
feat: cancel running timers on drop
madadam committed Nov 3, 2020
1 parent 75f0a5c commit d8f420f
Showing 3 changed files with 43 additions and 11 deletions.
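
For orientation, the block below is a condensed, self-contained sketch of the cancellation pattern this commit introduces: a tokio watch channel is stored alongside the state, every scheduled timeout races against it in a select!, and a single broadcast wins that race for all pending and future timers. It assumes the same tokio 0.2-era APIs the diff uses (watch::Sender::broadcast, watch::Receiver::recv, time::delay_for) plus the futures crate for block_on; the standalone TimerCanceller type and its method names are illustrative only, while in the commit the logic lives on Stage and is triggered from Routing's Drop impl.

use std::time::Duration;
use tokio::{sync::watch, time};

struct TimerCanceller {
    cancel_tx: watch::Sender<bool>,
    cancel_rx: watch::Receiver<bool>,
}

impl TimerCanceller {
    fn new() -> Self {
        let (cancel_tx, mut cancel_rx) = watch::channel(false);
        // Observe the initial `false` so later `recv` calls only wake on an
        // actual cancellation broadcast (mirrors `Stage::new` in the diff).
        let _ = futures::executor::block_on(cancel_rx.recv());
        Self { cancel_tx, cancel_rx }
    }

    // Resolves to Some(token) after `duration`, or to None if cancelled first.
    async fn schedule(&self, duration: Duration, token: u64) -> Option<u64> {
        let mut cancel_rx = self.cancel_rx.clone();
        if *cancel_rx.borrow() {
            return None; // already cancelled
        }
        tokio::select! {
            _ = time::delay_for(duration) => Some(token),
            _ = cancel_rx.recv() => None,
        }
    }

    // Cancels every pending timer and any timer scheduled later.
    fn cancel(&self) {
        let _ = self.cancel_tx.broadcast(true);
    }
}

impl Drop for TimerCanceller {
    fn drop(&mut self) {
        self.cancel();
    }
}

Calling cancel (or dropping the struct) makes every in-flight schedule future resolve to None promptly instead of waiting out its full duration.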
6 changes: 6 additions & 0 deletions src/routing/mod.rs
@@ -318,3 +318,9 @@ impl Routing {
             .ok_or(Error::InvalidState)
     }
 }
+
+impl Drop for Routing {
+    fn drop(&mut self) {
+        self.stage.cancel_timers()
+    }
+}
43 changes: 36 additions & 7 deletions src/routing/stage.rs
@@ -15,20 +15,33 @@ use crate::{
 };
 use bytes::Bytes;
 use std::{net::SocketAddr, sync::Arc, time::Duration};
-use tokio::{sync::mpsc, sync::Mutex, time};
+use tokio::{
+    sync::{mpsc, watch, Mutex},
+    time,
+};
 
 // Node's current stage which is responsible
 // for accessing current info and triggering operations.
 pub(crate) struct Stage {
     pub(super) state: Mutex<Approved>,
     pub(super) comm: Comm,
+
+    cancel_timer_tx: watch::Sender<bool>,
+    cancel_timer_rx: watch::Receiver<bool>,
 }
 
 impl Stage {
     pub fn new(state: Approved, comm: Comm) -> Self {
+        let (cancel_timer_tx, mut cancel_timer_rx) = watch::channel(false);
+
+        // Take out the initial value.
+        let _ = futures::executor::block_on(cancel_timer_rx.recv());
+
         Self {
             state: Mutex::new(state),
             comm,
+            cancel_timer_tx,
+            cancel_timer_rx,
         }
     }

@@ -96,9 +109,11 @@ impl Stage {
             Command::SendUserMessage { src, dst, content } => {
                 self.state.lock().await.send_user_message(src, dst, content)
             }
-            Command::ScheduleTimeout { duration, token } => {
-                Ok(vec![self.handle_schedule_timeout(duration, token).await])
-            }
+            Command::ScheduleTimeout { duration, token } => Ok(self
+                .handle_schedule_timeout(duration, token)
+                .await
+                .into_iter()
+                .collect()),
             Command::Relocate {
                 bootstrap_addrs,
                 details,
@@ -118,6 +133,11 @@
         result
     }
 
+    // Cancels any scheduled timers, currently or in the future.
+    pub fn cancel_timers(&self) {
+        let _ = self.cancel_timer_tx.broadcast(true);
+    }
+
     // Note: this indirection is needed. Trying to call `spawn(self.handle_commands(...))` directly
     // inside `handle_commands` causes a compile error about a type-check cycle.
     fn spawn_handle_commands(self: Arc<Self>, command: Command) {
@@ -144,9 +164,18 @@
         }
     }
 
-    async fn handle_schedule_timeout(&self, duration: Duration, token: u64) -> Command {
-        time::delay_for(duration).await;
-        Command::HandleTimeout(token)
+    async fn handle_schedule_timeout(&self, duration: Duration, token: u64) -> Option<Command> {
+        let mut cancel_rx = self.cancel_timer_rx.clone();
+
+        if *cancel_rx.borrow() {
+            // Timers are already cancelled, do nothing.
+            return None;
+        }
+
+        tokio::select! {
+            _ = time::delay_for(duration) => Some(Command::HandleTimeout(token)),
+            _ = cancel_rx.recv() => None,
+        }
     }
 
     async fn handle_relocate(
5 changes: 1 addition & 4 deletions tests/bootstrap.rs
@@ -135,10 +135,7 @@ async fn test_section_bootstrapping() -> Result<()> {
 // Test that the first `ELDER_SIZE` nodes in the network are promoted to elders.
 #[tokio::test]
 async fn test_startup_elders() -> Result<()> {
-    // FIXME: using only 3 nodes for now because with 4 or more the test takes too long (but still
-    // succeeds). Needs further investigation.
-    let network_size = 4;
-    let mut nodes = create_connected_nodes(network_size).await?;
+    let mut nodes = create_connected_nodes(ELDER_SIZE).await?;
 
     async fn expect_promote_event(stream: &mut EventStream) {
         while let Some(event) = stream.next().await {
