Fix TickleLatch reading stale data after set()
`TickleLatch` is used for synchronizing between multiple thread pools.
It sets an internal latch, and then tries to tickle the remote pool to
notice the completion. However, if the other pool is already awake and
sees the set flag, it may continue and drop the whole job, invalidating
further reads from the latch.

We can fix this by reading all fields from the `&self` latch _before_
calling the internal `set()`. Furthermore, I've updated this to take a
full `Arc<Registry>` handle, so we're also sure that the other pool
won't be destroyed too soon.
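
The ordering described above can be sketched outside of rayon. This is only an illustration under stated assumptions: `Pool`, `Signal`, and `demo` are hypothetical stand-ins (not rayon types), and because safe Rust will not let the waiter free `Signal` while it is borrowed, the sketch shows the order of operations rather than the original use-after-free.

```rust
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

/// Hypothetical stand-in for the target pool's registry.
pub struct Pool {
    tickles: AtomicUsize,
}

impl Pool {
    /// Stand-in for waking sleeping workers; here it only counts calls.
    pub fn tickle(&self) {
        self.tickles.fetch_add(1, Ordering::SeqCst);
    }
}

/// Hypothetical stand-in for a latch that also wakes another pool.
pub struct Signal {
    flag: AtomicBool,
    pool: Arc<Pool>,
}

impl Signal {
    pub fn set(&self) {
        // 1. Copy everything needed out of `&self` into owned values.
        let pool = Arc::clone(&self.pool);
        // 2. Publish completion. In the rayon case, the waiter may free
        //    the latch as soon as it observes this store.
        self.flag.store(true, Ordering::Release);
        // 3. Touch only the owned handle from here on, never `self`.
        pool.tickle();
    }
}

/// Runs one set/wait round and returns how many tickles the pool saw.
pub fn demo() -> usize {
    let pool = Arc::new(Pool { tickles: AtomicUsize::new(0) });
    let signal = Arc::new(Signal {
        flag: AtomicBool::new(false),
        pool: Arc::clone(&pool),
    });

    let setter = {
        let signal = Arc::clone(&signal);
        thread::spawn(move || signal.set())
    };

    // The waiter is already awake and polls the flag, as in the race above.
    while !signal.flag.load(Ordering::Acquire) {
        thread::yield_now();
    }
    drop(signal); // the waiter is done with its handle to the latch

    setter.join().unwrap();
    pool.tickles.load(Ordering::SeqCst)
}
```

Calling `demo()` runs a single round. The point of the sketch is step 1: after the store in step 2, nothing in `set()` dereferences `self` again.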

The new `tests/cross-pool.rs` would usually still pass before this fix, but
the race window could be widened by adding a manual `yield_now()` between the
`set()` and `tickle()`. This PR still passes under that same hack.

Fixes rayon-rs#739.
cuviper committed Mar 30, 2020
1 parent 3096967 commit 26da2f7
Showing 3 changed files with 37 additions and 7 deletions.
17 changes: 11 additions & 6 deletions rayon-core/src/latch.rs
@@ -1,8 +1,8 @@
 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
-use std::sync::{Condvar, Mutex};
+use std::sync::{Arc, Condvar, Mutex};
 use std::usize;
 
-use crate::sleep::Sleep;
+use crate::registry::Registry;
 
 /// We define various kinds of latches, which are all a primitive signaling
 /// mechanism. A latch starts as false. Eventually someone calls `set()` and
@@ -168,15 +168,16 @@ impl Latch for CountLatch {
 /// so the source pool can continue processing its own work while waiting.
 pub(super) struct TickleLatch<'a, L: Latch> {
     inner: L,
-    sleep: &'a Sleep,
+    registry: &'a Arc<Registry>,
 }
 
 impl<'a, L: Latch> TickleLatch<'a, L> {
     #[inline]
-    pub(super) fn new(latch: L, sleep: &'a Sleep) -> Self {
+    pub(super) fn new(latch: L, registry: &'a Arc<Registry>) -> Self {
+        registry.increment_terminate_count();
         TickleLatch {
             inner: latch,
-            sleep,
+            registry,
         }
     }
 }
@@ -191,8 +192,12 @@ impl<'a, L: Latch> LatchProbe for TickleLatch<'a, L> {
 impl<'a, L: Latch> Latch for TickleLatch<'a, L> {
     #[inline]
     fn set(&self) {
+        // Ensure the registry stays alive while we tickle it.
+        let registry = Arc::clone(self.registry);
+
+        // NOTE: Once we `set`, the target may proceed and invalidate `&self`!
         self.inner.set();
-        self.sleep.tickle(usize::MAX);
+        registry.tickle();
     }
 }

6 changes: 5 additions & 1 deletion rayon-core/src/registry.rs
@@ -463,7 +463,7 @@ impl Registry {
         // This thread is a member of a different pool, so let it process
         // other work while waiting for this `op` to complete.
         debug_assert!(current_thread.registry().id() != self.id());
-        let latch = TickleLatch::new(SpinLatch::new(), &current_thread.registry().sleep);
+        let latch = TickleLatch::new(SpinLatch::new(), current_thread.registry());
         let job = StackJob::new(
             |injected| {
                 let worker_thread = WorkerThread::current();
@@ -508,6 +508,10 @@ impl Registry {
         self.terminate_latch.set();
         self.sleep.tickle(usize::MAX);
     }
+
+    pub(super) fn tickle(&self) {
+        self.sleep.tickle(usize::MAX);
+    }
 }
 
 #[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
21 changes: 21 additions & 0 deletions tests/cross-pool.rs
@@ -0,0 +1,21 @@
+use rayon::prelude::*;
+use rayon::ThreadPoolBuilder;
+
+#[test]
+fn cross_pool_busy() {
+    let pool1 = ThreadPoolBuilder::new().num_threads(1).build().unwrap();
+    let pool2 = ThreadPoolBuilder::new().num_threads(1).build().unwrap();
+
+    let n: i32 = 100;
+    let sum: i32 = pool1.install(move || {
+        // Each item will block on pool2, but pool1 can continue processing other work from the
+        // parallel iterator in the meantime. There's a chance that pool1 will still be awake to
+        // see the latch set without being tickled, and then it will drop that stack job. The latch
+        // internals must not assume that the job will still be alive after it's set!
+        (1..=n)
+            .into_par_iter()
+            .map(|i| pool2.install(move || i))
+            .sum()
+    });
+    assert_eq!(sum, n * (n + 1) / 2);
+}
