Refactor sync::Once #65719

Merged
merged 12 commits on Nov 10, 2019
258 changes: 152 additions & 106 deletions src/libstd/sync/once.rs
@@ -51,7 +51,40 @@
//
// You'll find a few more details in the implementation, but that's the gist of
// it!

//
// Atomic orderings:
// When running `Once` we deal with multiple atomics:
// `Once.state_and_queue` and an unknown number of `Waiter.signaled`.
// * `state_and_queue` is used (1) as a state flag, (2) for synchronizing the
//   result of the `Once`, and (3) for synchronizing `Waiter` nodes.
//     - At the end of the `call_inner` function we have to make sure the result
//       of the `Once` is acquired. So every load which can be the only one to
//       load COMPLETE must have at least Acquire ordering, which means all
//       three of them.
//     - `WaiterQueue::Drop` is the only place that may store COMPLETE, and
//       must do so with Release ordering to make the result available.
//     - `wait` inserts `Waiter` nodes as a pointer in `state_and_queue`, and
//       needs to make the nodes available with Release ordering. The load in
//       its `compare_and_swap` can be Relaxed because it only has to compare
//       the atomic, not to read other data.
//     - `WaiterQueue::Drop` must see the `Waiter` nodes, so it must load
//       `state_and_queue` with Acquire ordering.
//     - There is just one store where `state_and_queue` is used only as a
//       state flag, without having to synchronize data: switching the state
//       from INCOMPLETE to RUNNING in `call_inner`. This store can be Relaxed,
//       but the read has to be Acquire because of the requirements mentioned
//       above.
// * `Waiter.signaled` is both used as a flag, and to protect a field with
//   interior mutability in `Waiter`. `Waiter.thread` is changed in
//   `WaiterQueue::Drop` which then sets `signaled` with Release ordering.
//   After `wait` loads `signaled` with Acquire and sees it is true, it needs to
//   see the changes to drop the `Waiter` struct correctly.
// * There is one place where the two atomics `Once.state_and_queue` and
//   `Waiter.signaled` come together, and might be reordered by the compiler or
//   processor. Because both use Acquire ordering such a reordering is not
//   allowed, so no need for SeqCst.
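//
// A minimal sketch (not part of this patch) of the Release/Acquire pairing
// described above, with made-up names `PAYLOAD` and `READY`: a Release store
// publishes every write sequenced before it to any thread whose Acquire load
// observes that store. This is the same guarantee `Once` relies on when
// publishing its result and its `Waiter` nodes.
//
//     use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
//     use std::thread;
//
//     static PAYLOAD: AtomicUsize = AtomicUsize::new(0);
//     static READY: AtomicBool = AtomicBool::new(false);
//
//     fn sketch_release_acquire() {
//         let writer = thread::spawn(|| {
//             PAYLOAD.store(42, Ordering::Relaxed); // write the data first...
//             READY.store(true, Ordering::Release); // ...then publish it.
//         });
//         let reader = thread::spawn(|| {
//             while !READY.load(Ordering::Acquire) {} // pairs with the Release store
//             assert_eq!(PAYLOAD.load(Ordering::Relaxed), 42); // now visible
//         });
//         writer.join().unwrap();
//         reader.join().unwrap();
//     }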

use crate::cell::Cell;
use crate::fmt;
use crate::marker;
use crate::ptr;
@@ -78,10 +78,10 @@ use crate::thread::{self, Thread};
/// ```
#[stable(feature = "rust1", since = "1.0.0")]
pub struct Once {
// This `state` word is actually an encoded version of just a pointer to a
// `Waiter`, so we add the `PhantomData` appropriately.
state: AtomicUsize,
_marker: marker::PhantomData<*mut Waiter>,
// `state_and_queue` is actually a pointer to a `Waiter` with extra state
// bits, so we add the `PhantomData` appropriately.
state_and_queue: AtomicUsize,
_marker: marker::PhantomData<*const Waiter>,
}

// The `PhantomData` of a raw pointer removes these two auto traits, but we
@@ -121,8 +121,8 @@ pub struct OnceState {
)]
pub const ONCE_INIT: Once = Once::new();

// Four states that a Once can be in, encoded into the lower bits of `state` in
// the Once structure.
// Four states that a Once can be in, encoded into the lower bits of
// `state_and_queue` in the Once structure.
const INCOMPLETE: usize = 0x0;
const POISONED: usize = 0x1;
const RUNNING: usize = 0x2;
@@ -132,26 +132,34 @@ const COMPLETE: usize = 0x3;
// this is in the RUNNING state.
const STATE_MASK: usize = 0x3;
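
// A sketch (not part of this patch) of the pointer/state packing this mask
// enables; `encode` and `decode` are hypothetical helpers, not code from this
// PR. `#[repr(align(4))]` on `Waiter` below keeps its addresses 4-byte
// aligned, so the two low bits of a `*const Waiter` are always zero and can
// carry one of the four states above.
fn encode(waiter: *const Waiter, state: usize) -> usize {
    debug_assert!(waiter as usize & STATE_MASK == 0);
    debug_assert!(state & !STATE_MASK == 0);
    waiter as usize | state
}

fn decode(state_and_queue: usize) -> (*const Waiter, usize) {
    let waiter = (state_and_queue & !STATE_MASK) as *const Waiter;
    let state = state_and_queue & STATE_MASK;
    (waiter, state)
}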

// Representation of a node in the linked list of waiters in the RUNNING state.
// Representation of a node in the linked list of waiters, used while in the
// RUNNING state.
// Note: `Waiter` can't hold a mutable pointer to the next thread, because then
// `wait` would both hand out a mutable reference to its `Waiter` node, and keep
// a shared reference to check `signaled`. Instead we hold shared references and
// use interior mutability.
#[repr(align(4))] // Ensure the two lower bits are free to use as state bits.
struct Waiter {
thread: Option<Thread>,
thread: Cell<Option<Thread>>,
signaled: AtomicBool,
next: *mut Waiter,
next: *const Waiter,
}
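
// A sketch (not part of this patch, names made up) of why `thread` is a
// `Cell<Option<Thread>>`: `Cell::take` moves the value out through a shared
// reference, which is all that `wait` can safely hand out while it keeps
// reading `signaled` through the same node.
fn sketch_take_through_shared_ref() {
    use std::cell::Cell;

    let slot: Cell<Option<String>> = Cell::new(Some(String::from("handle")));
    let shared: &Cell<Option<String>> = &slot; // no `&mut` needed
    let taken = shared.take(); // moves the value out, leaves `None` behind
    assert_eq!(taken, Some(String::from("handle")));
    assert!(shared.take().is_none());
}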

// Helper struct used to clean up after a closure call with a `Drop`
// implementation to also run on panic.
struct Finish<'a> {
panicked: bool,
me: &'a Once,
// Head of a linked list of waiters.
// Every node is a struct on the stack of a waiting thread.
// Will wake up the waiters when it gets dropped, i.e. also on panic.
struct WaiterQueue<'a> {
state_and_queue: &'a AtomicUsize,
set_state_on_drop_to: usize,
}
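
// A sketch (not part of this patch, all names made up) of the drop-guard
// pattern `WaiterQueue` relies on: `Drop` also runs during unwinding, so the
// cleanup (for `Once`: poisoning the state and waking the waiters) happens
// even when the init closure panics.
fn sketch_guard_runs_on_panic() {
    use std::cell::Cell;
    use std::panic::{catch_unwind, AssertUnwindSafe};

    struct Guard<'a> {
        cleaned_up: &'a Cell<bool>,
    }
    impl Drop for Guard<'_> {
        fn drop(&mut self) {
            self.cleaned_up.set(true); // runs on normal exit *and* on panic
        }
    }

    let cleaned_up = Cell::new(false);
    let result = catch_unwind(AssertUnwindSafe(|| {
        let _guard = Guard { cleaned_up: &cleaned_up };
        panic!("init failed");
    }));
    assert!(result.is_err());
    assert!(cleaned_up.get()); // the guard still ran
}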


impl Once {
/// Creates a new `Once` value.
#[stable(feature = "once_new", since = "1.2.0")]
pub const fn new() -> Once {
Once {
state: AtomicUsize::new(INCOMPLETE),
state_and_queue: AtomicUsize::new(INCOMPLETE),
_marker: marker::PhantomData,
}
}
@@ -329,8 +329,8 @@ impl Once {
// An `Acquire` load is enough because that makes all the initialization
// operations visible to us, and, this being a fast path, weaker
// ordering helps with performance. This `Acquire` synchronizes with
// `SeqCst` operations on the slow path.
self.state.load(Ordering::Acquire) == COMPLETE
// `Release` operations on the slow path.
self.state_and_queue.load(Ordering::Acquire) == COMPLETE
}
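
// A usage sketch (not part of this patch) of the fast path above;
// `init_logging` is a made-up function. Only the first call pays for
// initialization, every later call boils down to one Acquire load.
//
//     use std::sync::Once;
//
//     static INIT: Once = Once::new();
//
//     fn init_logging() { /* expensive, must run at most once */ }
//
//     fn ensure_init() {
//         INIT.call_once(init_logging); // blocks only while another call runs
//     }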

// This is a non-generic function to reduce the monomorphization cost of
@@ -347,122 +347,127 @@ impl Once {
#[cold]
fn call_inner(&self,
ignore_poisoning: bool,
init: &mut dyn FnMut(bool)) {

// This cold path uses SeqCst consistently because the
// performance difference really does not matter there, and
// SeqCst minimizes the chances of something going wrong.
let mut state = self.state.load(Ordering::SeqCst);

'outer: loop {
match state {
// If we're complete, then there's nothing to do, we just
// jettison out as we shouldn't run the closure.
COMPLETE => return,

// If we're poisoned and we're not in a mode to ignore
// poisoning, then we panic here to propagate the poison.
init: &mut dyn FnMut(bool))
{
let mut state_and_queue = self.state_and_queue.load(Ordering::Acquire);
loop {
match state_and_queue {
COMPLETE => break,
POISONED if !ignore_poisoning => {
// Panic to propagate the poison.
panic!("Once instance has previously been poisoned");
}

// Otherwise if we see a poisoned or otherwise incomplete state
// we will attempt to move ourselves into the RUNNING state. If
// we succeed, then the queue of waiters starts at null (all 0
// bits).
POISONED |
INCOMPLETE => {
let old = self.state.compare_and_swap(state, RUNNING,
Ordering::SeqCst);
if old != state {
state = old;
// Try to register this thread as the one RUNNING.
let old = self.state_and_queue.compare_and_swap(state_and_queue,
RUNNING,
Ordering::Acquire);
if old != state_and_queue {
state_and_queue = old;
continue
}

// Run the initialization routine, letting it know if we're
// poisoned or not. The `Finish` struct is then dropped, and
// the `Drop` implementation here is responsible for waking
// up other waiters both in the normal return and panicking
// case.
let mut complete = Finish {
panicked: true,
me: self,
// `waiter_queue` will manage other waiting threads, and
// wake them up on drop.
let mut waiter_queue = WaiterQueue {
state_and_queue: &self.state_and_queue,
set_state_on_drop_to: POISONED,
};
init(state == POISONED);
complete.panicked = false;
return
// Run the initialization function, letting it know if we're
// poisoned or not.
init(state_and_queue == POISONED);
waiter_queue.set_state_on_drop_to = COMPLETE;
break
}

// All other values we find should correspond to the RUNNING
// state with an encoded waiter list in the more significant
// bits. We attempt to enqueue ourselves by moving us to the
// head of the list and bail out if we ever see a state that's
// not RUNNING.
_ => {
assert!(state & STATE_MASK == RUNNING);
let mut node = Waiter {
thread: Some(thread::current()),
signaled: AtomicBool::new(false),
next: ptr::null_mut(),
};
let me = &mut node as *mut Waiter as usize;
assert!(me & STATE_MASK == 0);

while state & STATE_MASK == RUNNING {
node.next = (state & !STATE_MASK) as *mut Waiter;
let old = self.state.compare_and_swap(state,
me | RUNNING,
Ordering::SeqCst);
if old != state {
state = old;
continue
}

// Once we've enqueued ourselves, wait in a loop.
// Afterwards reload the state and continue with what we
// were doing from before.
while !node.signaled.load(Ordering::SeqCst) {
thread::park();
}
state = self.state.load(Ordering::SeqCst);
continue 'outer
}
// All other values must be RUNNING with possibly a
// pointer to the waiter queue in the more significant bits.
assert!(state_and_queue & STATE_MASK == RUNNING);
wait(&self.state_and_queue, state_and_queue);
state_and_queue = self.state_and_queue.load(Ordering::Acquire);
}
}
}
}
}
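
// A sketch (not part of this patch, names made up) of the `compare_and_swap`
// retry loop used by both `call_inner` and `wait`: on failure the return
// value is whatever is currently stored, so the loop re-examines that value
// instead of issuing a separate reload.
use std::sync::atomic::{AtomicUsize, Ordering};

fn sketch_try_claim(state: &AtomicUsize, from: usize, to: usize) -> bool {
    let mut current = state.load(Ordering::Acquire);
    loop {
        if current != from {
            return false; // someone else changed the state; give up
        }
        let old = state.compare_and_swap(current, to, Ordering::Acquire);
        if old == current {
            return true; // we installed `to`
        }
        current = old; // lost the race; re-check the fresh value
    }
}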

fn wait(state_and_queue: &AtomicUsize, current_state: usize) {
// Create the node for our current thread that we are going to try to slot
// in at the head of the linked list.
let mut node = Waiter {
thread: Cell::new(Some(thread::current())),
signaled: AtomicBool::new(false),
next: ptr::null(),
};
let me = &node as *const Waiter as usize;
assert!(me & STATE_MASK == 0); // We assume pointers have 2 free bits that
// we can use for state.

// Try to slide in the node at the head of the linked list.
// Run in a loop where we make sure the status is still RUNNING, and that
// another thread did not just replace the head of the linked list.
let mut old_head_and_status = current_state;
loop {
if old_head_and_status & STATE_MASK != RUNNING {
return; // No need anymore to enqueue ourselves.
}

node.next = (old_head_and_status & !STATE_MASK) as *const Waiter;
let old = state_and_queue.compare_and_swap(old_head_and_status,
me | RUNNING,
Ordering::Release);
if old == old_head_and_status {
break; // Success!
}
old_head_and_status = old;
}

// We have enqueued ourselves, now lets wait.
// It is important not to return before being signaled, otherwise we would
// drop our `Waiter` node and leave a hole in the linked list (and a
// dangling reference). Guard against spurious wakeups by reparking
// ourselves until we are signaled.
while !node.signaled.load(Ordering::Acquire) {
// If the managing thread happens to signal and unpark us before we can
// park ourselves, the result could be this thread never gets unparked.
// Luckily `park` comes with the guarantee that if it got an `unpark`
// just before, it does not park but returns immediately.
thread::park();
}
}
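
// A sketch (not part of this patch, names made up) of the park/unpark
// handshake `wait` relies on: an `unpark` that arrives before `park` is not
// lost, so the race between the waking thread storing `signaled` and this
// thread parking is benign.
fn sketch_park_handshake() {
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::Arc;
    use std::thread;

    let signaled = Arc::new(AtomicBool::new(false));
    let waiter = {
        let signaled = Arc::clone(&signaled);
        thread::spawn(move || {
            while !signaled.load(Ordering::Acquire) {
                thread::park(); // returns at once if an unpark already arrived
            }
        })
    };
    signaled.store(true, Ordering::Release);
    waiter.thread().unpark(); // even if this races with `park`, nothing is lost
    waiter.join().unwrap();
}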

#[stable(feature = "std_debug", since = "1.16.0")]
impl fmt::Debug for Once {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.pad("Once { .. }")
}
}

impl Drop for Finish<'_> {
impl Drop for WaiterQueue<'_> {
fn drop(&mut self) {
// Swap out our state with however we finished. We should only ever see
// an old state which was RUNNING.
let queue = if self.panicked {
self.me.state.swap(POISONED, Ordering::SeqCst)
} else {
self.me.state.swap(COMPLETE, Ordering::SeqCst)
};
assert_eq!(queue & STATE_MASK, RUNNING);

// Decode the RUNNING to a list of waiters, then walk that entire list
// and wake them up. Note that it is crucial that after we store `true`
// in the node it can be free'd! As a result we load the `thread` to
// signal ahead of time and then unpark it after the store.
// Swap out our state with however we finished.
let state_and_queue = self.state_and_queue.swap(self.set_state_on_drop_to,
Ordering::AcqRel);

// We should only ever see an old state which was RUNNING.
assert_eq!(state_and_queue & STATE_MASK, RUNNING);

// Walk the entire linked list of waiters and wake them up (in LIFO
// order: the last thread to register is the first to be woken up).
unsafe {
let mut queue = (queue & !STATE_MASK) as *mut Waiter;
// Right after setting `node.signaled = true` the other thread may
// free `node` if there happens to be a spurious wakeup.
// So we have to take out the `thread` field and copy the pointer to
// `next` first.
let mut queue = (state_and_queue & !STATE_MASK) as *const Waiter;
while !queue.is_null() {
let next = (*queue).next;
let thread = (*queue).thread.take().unwrap();
(*queue).signaled.store(true, Ordering::SeqCst);
thread.unpark();
let thread = (*queue).thread.replace(None).unwrap();
(*queue).signaled.store(true, Ordering::Release);
// ^- FIXME (maybe): This is another case of issue #55005
// `store()` has a potentially dangling ref to `signaled`.
queue = next;
thread.unpark();
}
}
}
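
// A sketch (not part of this patch, types made up) of the LIFO walk above, on
// a miniature owned list instead of stack-allocated `Waiter` nodes. The
// crucial ordering carries over: copy out `next` (and anything else needed)
// before whatever step lets the node's owner free it.
fn sketch_lifo_walk() {
    struct Node {
        value: u32,
        next: Option<Box<Node>>,
    }

    // Push head-first, like `wait` enqueueing itself onto the queue.
    let mut head: Option<Box<Node>> = None;
    for value in 0..3 {
        head = Some(Box::new(Node { value, next: head.take() }));
    }

    let mut order = Vec::new();
    let mut current = head;
    while let Some(node) = current {
        let Node { value, next } = *node; // take what we need, then drop the node
        current = next;
        order.push(value);
    }
    assert_eq!(order, [2, 1, 0]); // last to register, first to be visited
}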