Skip to content

Commit

Permalink
Fixing flaky test (#2407)
Browse files Browse the repository at this point in the history
  • Loading branch information
fulmicoton authored May 22, 2024
1 parent e1679f3 commit b806122
Showing 1 changed file with 42 additions and 14 deletions.
56 changes: 42 additions & 14 deletions src/core/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,28 +184,56 @@ mod tests {
fn test_cancel_cpu_intensive_tasks() {
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Duration;

let counter: Arc<AtomicU64> = Default::default();

let other_counter: Arc<AtomicU64> = Default::default();

let mut futures = Vec::new();
let mut other_futures = Vec::new();

let (tx, rx) = crossbeam_channel::bounded::<()>(0);
let rx = Arc::new(rx);
let executor = Executor::multi_thread(3, "search-test").unwrap();
for _ in 0..1_000 {
let counter_clone = counter.clone();
for i in 0..1000 {
let counter_clone: Arc<AtomicU64> = counter.clone();
let other_counter_clone: Arc<AtomicU64> = other_counter.clone();

let rx_clone = rx.clone();
let rx_clone2 = rx.clone();
let fut = executor.spawn_blocking(move || {
std::thread::sleep(Duration::from_millis(4));
counter_clone.fetch_add(1, Ordering::SeqCst)
counter_clone.fetch_add(1, Ordering::SeqCst);
let () = rx_clone.recv().unwrap();
});
futures.push(fut);
let other_fut = executor.spawn_blocking(move || {
other_counter_clone.fetch_add(1, Ordering::SeqCst);
let () = rx_clone2.recv().unwrap();
});
other_futures.push(other_fut);
}

// We execute 100 futures.
for i in 0..100 {
tx.send(()).unwrap();
}
std::thread::sleep(Duration::from_millis(5));
// The first few num_cores tasks should run, but the other should get cancelled.
drop(futures);
while Arc::strong_count(&counter) > 1 {
std::thread::sleep(Duration::from_millis(10));

let counter_val = counter.load(Ordering::SeqCst);
let other_counter_val = other_counter.load(Ordering::SeqCst);
assert!(counter_val >= 30);
assert!(other_counter_val >= 30);

drop(other_futures);

// We execute 100 futures.
for i in 0..100 {
tx.send(()).unwrap();
}
// with ideal timing, we expect the result to always be 6, but as long as we run some, and
// cancelled most, the test is a success
assert!(counter.load(Ordering::SeqCst) > 0);
assert!(counter.load(Ordering::SeqCst) < 50);

let counter_val2 = counter.load(Ordering::SeqCst);
assert!(counter_val2 >= counter_val + 100 - 6);

let other_counter_val2 = other_counter.load(Ordering::SeqCst);
assert!(other_counter_val2 <= other_counter_val + 6);
}
}

0 comments on commit b806122

Please sign in to comment.