Skip to content

Commit

Permalink
Fix panic when hitting ctrl+c to stop. (#85)
Browse files Browse the repository at this point in the history
  • Loading branch information
r12f committed Aug 7, 2021
1 parent fb6759e commit 1b54dde
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 4 deletions.
14 changes: 10 additions & 4 deletions src/rnp_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ pub struct RnpCore {

stop_event: Arc<ManualResetEvent>,
worker_join_handles: Vec<JoinHandle<()>>,
ping_result_processor_stop_event: Arc<ManualResetEvent>,
ping_result_processor_join_handle: Option<JoinHandle<()>>,
result_sender: mpsc::Sender<PingResult>,
}
Expand Down Expand Up @@ -83,18 +84,21 @@ impl RnpCore {
let mut extra_ping_result_processors = Vec::new();
extra_ping_result_processors.append(&mut config.extra_ping_result_processors);

let ping_result_processor_stop_event = Arc::new(ManualResetEvent::new(false));

let (result_sender, ping_result_processor_join_handle) =
RnpCore::create_ping_result_processing_worker(
config.result_processor_config.clone(),
extra_ping_result_processors,
config.worker_scheduler_config.parallel_ping_count,
stop_event.clone(),
ping_result_processor_stop_event.clone(),
);

let rnp_core = RnpCore {
config,
stop_event,
worker_join_handles: Vec::new(),
ping_result_processor_stop_event,
ping_result_processor_join_handle: Some(ping_result_processor_join_handle),
result_sender,
};
Expand Down Expand Up @@ -241,13 +245,15 @@ impl RnpCore {
self.worker_join_handles.clear();
tracing::debug!("All workers are stopped.");

// If all the ping jobs are finished, the workers will stop automatically.
// In this case, the stop events won't be set, and we set it here to be safe.
if !self.stop_event.is_set() {
tracing::debug!(
"All ping jobs are completed and all workers are stopped. Signal result processor to exit."
);
self.stop_event.set();
}

tracing::debug!("All ping jobs are completed and all workers are stopped. Signal result processor to exit.");
self.ping_result_processor_stop_event.set();

tracing::debug!("Waiting for result processor to be stopped.");
self.ping_result_processor_join_handle
.take()
Expand Down
19 changes: 19 additions & 0 deletions tests/rnp_core_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,25 @@ fn ping_with_rnp_core_stress_should_work() {
assert_eq!(1000, results.len());
}

#[test]
fn ping_with_rnp_core_stop_event_should_work() {
let actual_ping_results = Arc::new(Mutex::new(Vec::<MockPingClientResult>::new()));
let config = create_mock_rnp_config(actual_ping_results.clone(), 1000, 0, 10);
let rt = Runtime::new().unwrap();
rt.block_on(async {
let stop_event = Arc::new(ManualResetEvent::new(false));
let stop_event_clone = stop_event.clone();

let mut rp = RnpCore::new(config, stop_event);
rp.run_warmup_pings().await;

rp.start_running_normal_pings();
tokio::spawn(async move { stop_event_clone.set() });

rp.join().await;
});
}

fn create_mock_rnp_config(actual_ping_results: Arc<Mutex<Vec<MockPingClientResult>>>, ping_count: u32, warmup_count: u32, parallel_ping_count: u32) -> RnpCoreConfig {
RnpCoreConfig {
worker_config: PingWorkerConfig {
Expand Down

0 comments on commit 1b54dde

Please sign in to comment.