Skip to content

Commit

Permalink
fix: unintentional spin-wait loop in spawn
Browse files Browse the repository at this point in the history
This addresses a bug introduced with named timers.

Even more importantly, the fact that the bug made it into a published
release highlights a gap in the test coverage for the real-world actor
runtime, which has always gotten less attention than the model checking
side of this library. I planned to add a regression test, but I believe
I have to first add support for an `Out::exit` command or similar and
don't want to block the fix on that.
  • Loading branch information
jonnadal committed Aug 27, 2023
1 parent 1466b3c commit 91f98ef
Showing 1 changed file with 36 additions and 32 deletions.
68 changes: 36 additions & 32 deletions src/actor/spawn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,42 +92,46 @@ where
loop {
// Apply an interrupt if present, otherwise wait for a message.
let mut out = Out::new();
if let Some((min_timer, min_instant)) = next_interrupts.iter().min_by_key(|(_, instant)| *instant).map(|(t, i)| (t.clone(), *i)) {
if let Some(max_wait) = min_instant.checked_duration_since(Instant::now()) {
socket.set_read_timeout(Some(max_wait)).expect("set_read_timeout failed");
match socket.recv_from(&mut in_buf) {
Err(e) => {
// Timeout (`WouldBlock`) ignored since next iteration will apply interrupt.
if e.kind() != std::io::ErrorKind::WouldBlock {
log::warn!("Unable to read socket. Ignoring. id={}, err={:?}", addr, e);
}
continue;
},
Ok((count, src_addr)) => {
match deserialize(&in_buf[..count]) {
Ok(msg) => {
if let SocketAddr::V4(src_addr) = src_addr {
log::info!("Received message. id={}, src={}, msg={}",
addr, src_addr, format!("{:?}", msg));
actor.on_msg(id, &mut state, Id::from(src_addr), msg, &mut out);
} else {
log::debug!("Received non-IPv4 message. Ignoring. id={}, src={}, msg={}",
addr, src_addr, format!("{:?}", msg));
continue;
}
},
Err(e) => {
log::debug!("Unable to parse message. Ignoring. id={}, src={}, buf={:?}, err={:?}",
addr, src_addr, &in_buf[..count], e);
let (min_timer, min_instant) = next_interrupts
.iter()
.min_by_key(|(_, instant)| *instant)
.map(|(t, i)| (Some(t.clone()), *i))
.unwrap_or_else(|| (None, practically_never()));
if let Some(max_wait) = min_instant.checked_duration_since(Instant::now()) {
socket.set_read_timeout(Some(max_wait)).expect("set_read_timeout failed");
match socket.recv_from(&mut in_buf) {
Err(e) => {
// Timeout (`WouldBlock`) ignored since next iteration will apply interrupt.
if e.kind() != std::io::ErrorKind::WouldBlock {
log::warn!("Unable to read socket. Ignoring. id={}, err={:?}", addr, e);
}
continue;
},
Ok((count, src_addr)) => {
match deserialize(&in_buf[..count]) {
Ok(msg) => {
if let SocketAddr::V4(src_addr) = src_addr {
log::info!("Received message. id={}, src={}, msg={}",
addr, src_addr, format!("{:?}", msg));
actor.on_msg(id, &mut state, Id::from(src_addr), msg, &mut out);
} else {
log::debug!("Received non-IPv4 message. Ignoring. id={}, src={}, msg={}",
addr, src_addr, format!("{:?}", msg));
continue;
}
},
Err(e) => {
log::debug!("Unable to parse message. Ignoring. id={}, src={}, buf={:?}, err={:?}",
addr, src_addr, &in_buf[..count], e);
continue;
}
},
}
} else {
next_interrupts.remove(&min_timer); // timer is no longer valid
actor.on_timeout(id, &mut state, &min_timer, &mut out);
}
},
}
} else {
let min_timer = min_timer.unwrap();
next_interrupts.remove(&min_timer); // timer is no longer valid
actor.on_timeout(id, &mut state, &min_timer, &mut out);
}

// Handle commands and update state.
Expand Down

0 comments on commit 91f98ef

Please sign in to comment.