Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix incorrect result count when ping speed is too fast. #140

Merged
merged 1 commit into from
Aug 29, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions src/ping_runners/ping_result_processing_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use tokio::{sync::mpsc, task, task::JoinHandle};
pub struct PingResultProcessingWorker {
stop_event: Arc<ManualResetEvent>,

receiver: mpsc::Receiver<PingResult>,
receiver: mpsc::UnboundedReceiver<PingResult>,
processors: Vec<Box<dyn PingResultProcessor + Send + Sync>>,
}

Expand All @@ -18,7 +18,7 @@ impl PingResultProcessingWorker {
extra_ping_result_processors: Vec<Box<dyn PingResultProcessor + Send + Sync>>,
stop_event: Arc<ManualResetEvent>,
ping_stop_event: Arc<ManualResetEvent>,
receiver: mpsc::Receiver<PingResult>,
receiver: mpsc::UnboundedReceiver<PingResult>,
) -> JoinHandle<()> {
let join_handle = task::spawn(async move {
let processors = ping_result_processor_factory::new(&config, extra_ping_result_processors, ping_stop_event);
Expand Down Expand Up @@ -52,7 +52,8 @@ impl PingResultProcessingWorker {
}

_ = self.stop_event.wait() => {
tracing::debug!("Stop event received, stopping ping result processing worker");
tracing::debug!("Stop event received, stopping receiver to avoid future message and drain till completed.");
self.receiver.close();
break;
}

Expand All @@ -62,6 +63,12 @@ impl PingResultProcessingWorker {
}
}
}

while let Some(ping_result) = self.receiver.recv().await {
self.process_ping_result(&ping_result);
}

tracing::debug!("All pending ping results are processed, exiting ping result processing worker loop.");
}

#[tracing::instrument(name = "Processing ping result", level = "debug", skip(self), fields(processor_count = %self.processors.len()))]
Expand Down
16 changes: 6 additions & 10 deletions src/ping_runners/ping_runner_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ pub struct PingRunnerCore {
worker_join_handles: Vec<JoinHandle<()>>,
ping_result_processor_stop_event: Arc<ManualResetEvent>,
ping_result_processor_join_handle: Option<JoinHandle<()>>,
result_sender: mpsc::Sender<PingResult>,
result_sender: mpsc::UnboundedSender<PingResult>,
}

impl PingRunnerCore {
Expand Down Expand Up @@ -120,13 +120,8 @@ impl PingRunnerCore {
parallel_ping_count: u32,
stop_event: Arc<ManualResetEvent>,
ping_stop_event: Arc<ManualResetEvent>,
) -> (mpsc::Sender<PingResult>, JoinHandle<()>) {
let mut ping_result_channel_size = parallel_ping_count * 2;
if ping_result_channel_size < 128 {
ping_result_channel_size = 128;
}

let (ping_result_sender, ping_result_receiver) = mpsc::channel(ping_result_channel_size as usize);
) -> (mpsc::UnboundedSender<PingResult>, JoinHandle<()>) {
let (ping_result_sender, ping_result_receiver) = mpsc::unbounded_channel();
let ping_result_processor_join_handle = PingResultProcessingWorker::run(
Arc::new(result_processor_config),
extra_ping_result_processors,
Expand Down Expand Up @@ -238,15 +233,16 @@ impl PingRunnerCore {
join_handle.await.unwrap();
}
self.worker_join_handles.clear();
tracing::debug!("All workers are stopped.");

// If all the ping jobs are finished, the workers will stop automatically.
// In this case, the stop events won't be set, and we set it here to be safe.
if !self.stop_event.is_set() {
tracing::debug!("All ping jobs are completed, hence all workers are stopped. Signal result processor to exit.");
self.stop_event.set();
} else {
tracing::debug!("All workers are stopped. Signal result processor to exit.");
}

tracing::debug!("All ping jobs are completed and all workers are stopped. Signal result processor to exit.");
self.ping_result_processor_stop_event.set();

tracing::debug!("Waiting for result processor to be stopped.");
Expand Down
8 changes: 4 additions & 4 deletions src/ping_runners/ping_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ pub struct PingWorker {
stop_event: Arc<ManualResetEvent>,
port_picker: Arc<Mutex<PingPortPicker>>,
ping_client: Box<dyn PingClient + Send + Sync>,
result_sender: mpsc::Sender<PingResult>,
result_sender: mpsc::UnboundedSender<PingResult>,
is_warmup_worker: bool,
}

Expand All @@ -28,7 +28,7 @@ impl PingWorker {
external_ping_client_factory: Option<PingClientFactory>,
port_picker: Arc<Mutex<PingPortPicker>>,
stop_event: Arc<ManualResetEvent>,
result_sender: mpsc::Sender<PingResult>,
result_sender: mpsc::UnboundedSender<PingResult>,
is_warmup_worker: bool,
) -> JoinHandle<()> {
let join_handle = task::spawn(async move {
Expand Down Expand Up @@ -103,7 +103,7 @@ impl PingWorker {
None,
);

self.result_sender.send(result).await.unwrap();
self.result_sender.send(result).unwrap();
}

#[tracing::instrument(name = "Processing ping client single ping error", level = "debug", skip(self), fields(worker_id = %self.id))]
Expand All @@ -124,7 +124,7 @@ impl PingWorker {
Some(error),
);

self.result_sender.send(result).await.unwrap();
self.result_sender.send(result).unwrap();
}

#[tracing::instrument(name = "Waiting for next schedule", level = "debug", skip(self), fields(worker_id = %self.id))]
Expand Down
11 changes: 10 additions & 1 deletion tests/ping_runner_core_tests.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
mod test_common;
mod test_mocks;

use futures_intrusive::sync::ManualResetEvent;
Expand All @@ -10,6 +11,8 @@ use tokio::runtime::Runtime;

#[test]
fn ping_with_rnp_core_should_work() {
test_common::initialize();

let actual_ping_results = Arc::new(Mutex::new(Vec::<MockPingClientResult>::new()));
let config = create_mock_rnp_config(actual_ping_results.clone(), 6, 3, 1);
let rt = Runtime::new().unwrap();
Expand Down Expand Up @@ -40,10 +43,12 @@ fn ping_with_rnp_core_should_work() {

#[test]
fn ping_with_rnp_core_stress_should_work() {
test_common::initialize();

let actual_ping_results = Arc::new(Mutex::new(Vec::<MockPingClientResult>::new()));
let config = create_mock_rnp_config(actual_ping_results.clone(), 1000, 0, 10);
let rt = Runtime::new().unwrap();
rt.block_on(async {
rt.block_on(async move {
let stop_event = Arc::new(ManualResetEvent::new(false));
let mut rp = PingRunnerCore::new(config, stop_event);
rp.run_warmup_pings().await;
Expand All @@ -57,6 +62,8 @@ fn ping_with_rnp_core_stress_should_work() {

#[test]
fn ping_with_rnp_core_stop_event_should_work() {
test_common::initialize();

let actual_ping_results = Arc::new(Mutex::new(Vec::<MockPingClientResult>::new()));
let config = create_mock_rnp_config(actual_ping_results.clone(), 1000, 0, 10);
let rt = Runtime::new().unwrap();
Expand All @@ -76,6 +83,8 @@ fn ping_with_rnp_core_stop_event_should_work() {

#[test]
fn ping_with_rnp_core_exit_on_fail_should_work() {
test_common::initialize();

let actual_ping_results = Arc::new(Mutex::new(Vec::<MockPingClientResult>::new()));
let exit_reason = Arc::new(Mutex::new(None));

Expand Down
9 changes: 9 additions & 0 deletions tests/test_common.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
use std::sync::Once;

static INIT: Once = Once::new();

pub fn initialize() {
INIT.call_once(|| {
let _ = env_logger::builder().format_timestamp_micros().is_test(true).try_init();
});
}
24 changes: 14 additions & 10 deletions tests/test_mocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,14 +113,6 @@ impl PingResultProcessor for MockPingResultProcessor {
return;
}

if let Some(warning) = ping_result.warning() {
match warning {
PingClientWarning::AppHandshakeFailed(_) => results.push(MockPingClientResult::AppHandshakeFailed(ping_result.round_trip_time())),
PingClientWarning::DisconnectFailed(_) => results.push(MockPingClientResult::DisconnectFailed(ping_result.round_trip_time())),
}
return;
}

if let Some(error) = ping_result.error() {
match error {
PingClientError::PreparationFailed(_) => results.push(MockPingClientResult::PreparationFailed),
Expand All @@ -129,9 +121,21 @@ impl PingResultProcessor for MockPingResultProcessor {
return;
}

if ping_result.is_succeeded() {
results.push(MockPingClientResult::Success(ping_result.round_trip_time()));
assert!(ping_result.is_succeeded());

if let Some(warning) = ping_result.warning() {
match warning {
PingClientWarning::AppHandshakeFailed(_) => results.push(MockPingClientResult::AppHandshakeFailed(ping_result.round_trip_time())),
PingClientWarning::DisconnectFailed(_) => results.push(MockPingClientResult::DisconnectFailed(ping_result.round_trip_time())),
}
return;
}

results.push(MockPingClientResult::Success(ping_result.round_trip_time()));
}

fn rundown(&mut self) {
let results = self.results.lock().unwrap();
println!("Ping runner shutting down. {} result received!", results.len());
}
}