Skip to content

Commit

Permalink
utils: handle race condition gracefully (paritytech#1583)
Browse files Browse the repository at this point in the history
* utils: handle race condition gracefully

* utils: add a test

* update Cargo.lock

* utils: remove a warning

* utils: init logger in tests

* utils: update the outdated comment

* util: wait for both subsystem and test_future to finish

* Revert "util: wait for both subsystem and test_future to finish"

This reverts commit 075b392.
  • Loading branch information
ordian committed Aug 21, 2020
1 parent bf7ccb8 commit 8e4e79f
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 4 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions node/subsystem-util/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,4 @@ async-trait = "0.1"
futures = { version = "0.3.5", features = ["thread-pool"] }
parking_lot = "0.10.0"
polkadot-node-subsystem-test-helpers = { path = "../subsystem-test-helpers" }
env_logger = "0.7.1"
40 changes: 36 additions & 4 deletions node/subsystem-util/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -611,10 +611,13 @@ impl<Spawner: SpawnNamed, Job: 'static + JobTrait> Jobs<Spawner, Job> {
}

/// Send a message to the appropriate job for this `parent_hash`.
/// Will not return an error if the job is not running.
async fn send_msg(&mut self, parent_hash: Hash, msg: Job::ToJob) -> Result<(), Error> {
match self.running.get_mut(&parent_hash) {
Some(job) => job.send_msg(msg).await?,
None => return Err(Error::JobNotFound(parent_hash)),
None => {
// don't bring down the subsystem, this can happen to due a race condition
},
}
Ok(())
}
Expand Down Expand Up @@ -1140,6 +1143,14 @@ mod tests {
run_args: HashMap<Hash, Vec<FromJob>>,
test: impl FnOnce(OverseerHandle, mpsc::Receiver<(Option<Hash>, JobsError<Error>)>) -> T,
) {
let _ = env_logger::builder()
.is_test(true)
.filter(
None,
log::LevelFilter::Trace,
)
.try_init();

let pool = sp_core::testing::TaskExecutor::new();
let (context, overseer_handle) = make_subsystem_context(pool.clone());
let (err_tx, err_rx) = mpsc::channel(16);
Expand All @@ -1148,9 +1159,7 @@ mod tests {
let test_future = test(overseer_handle, err_rx);
let timeout = Delay::new(Duration::from_secs(2));

futures::pin_mut!(test_future);
futures::pin_mut!(subsystem);
futures::pin_mut!(timeout);
futures::pin_mut!(subsystem, test_future, timeout);

executor::block_on(async move {
futures::select! {
Expand Down Expand Up @@ -1213,6 +1222,29 @@ mod tests {
});
}

#[test]
fn sending_to_a_non_running_job_do_not_stop_the_subsystem() {
let run_args = HashMap::new();

test_harness(run_args, |mut overseer_handle, err_rx| async move {
// send to a non running job
overseer_handle
.send(FromOverseer::Communication {
msg: Default::default(),
})
.await;

// the subsystem is still alive
assert_matches!(
overseer_handle.recv().await,
AllMessages::CandidateSelection(_)
);

let errs: Vec<_> = err_rx.collect().await;
assert_eq!(errs.len(), 0);
});
}

#[test]
fn test_subsystem_impl_and_name_derivation() {
let pool = sp_core::testing::TaskExecutor::new();
Expand Down

0 comments on commit 8e4e79f

Please sign in to comment.