Skip to content

Commit

Permalink
sync: add Notify::notify_last (#6520)
Browse files Browse the repository at this point in the history
  • Loading branch information
pfreixes committed May 27, 2024
1 parent 6c42d28 commit 9e00b26
Show file tree
Hide file tree
Showing 3 changed files with 160 additions and 17 deletions.
82 changes: 65 additions & 17 deletions tokio/src/sync/notify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,9 @@ struct Waiter {
/// `Notify`, or it is exclusively owned by the enclosing `Waiter`.
waker: UnsafeCell<Option<Waker>>,

/// Notification for this waiter.
/// Notification for this waiter. Uses 2 bits to store if and how was
/// notified, 1 bit for storing if it was woken up using FIFO or LIFO, and
/// the rest of it is unused.
/// * if it's `None`, then `waker` is protected by the `waiters` lock.
/// * if it's `Some`, then `waker` is exclusively owned by the
/// enclosing `Waiter` and can be accessed without locking.
Expand Down Expand Up @@ -253,13 +255,16 @@ generate_addr_of_methods! {
}

// No notification.
const NOTIFICATION_NONE: usize = 0;
const NOTIFICATION_NONE: usize = 0b000;

// Notification type used by `notify_one`.
const NOTIFICATION_ONE: usize = 1;
const NOTIFICATION_ONE: usize = 0b001;

// Notification type used by `notify_last`.
const NOTIFICATION_LAST: usize = 0b101;

// Notification type used by `notify_waiters`.
const NOTIFICATION_ALL: usize = 2;
const NOTIFICATION_ALL: usize = 0b010;

/// Notification for a `Waiter`.
/// This struct is equivalent to `Option<Notification>`, but uses
Expand All @@ -275,13 +280,20 @@ impl AtomicNotification {
/// Store-release a notification.
/// This method should be called exactly once.
fn store_release(&self, notification: Notification) {
self.0.store(notification as usize, Release);
let data: usize = match notification {
Notification::All => NOTIFICATION_ALL,
Notification::One(NotifyOneStrategy::Fifo) => NOTIFICATION_ONE,
Notification::One(NotifyOneStrategy::Lifo) => NOTIFICATION_LAST,
};
self.0.store(data, Release);
}

fn load(&self, ordering: Ordering) -> Option<Notification> {
match self.0.load(ordering) {
let data = self.0.load(ordering);
match data {
NOTIFICATION_NONE => None,
NOTIFICATION_ONE => Some(Notification::One),
NOTIFICATION_ONE => Some(Notification::One(NotifyOneStrategy::Fifo)),
NOTIFICATION_LAST => Some(Notification::One(NotifyOneStrategy::Lifo)),
NOTIFICATION_ALL => Some(Notification::All),
_ => unreachable!(),
}
Expand All @@ -296,11 +308,18 @@ impl AtomicNotification {
}
}

#[derive(Debug, PartialEq, Eq)]
#[repr(usize)]
enum NotifyOneStrategy {
Fifo,
Lifo,
}

#[derive(Debug, PartialEq, Eq)]
#[repr(usize)]
enum Notification {
One = NOTIFICATION_ONE,
All = NOTIFICATION_ALL,
One(NotifyOneStrategy),
All,
}

/// List used in `Notify::notify_waiters`. It wraps a guarded linked list
Expand Down Expand Up @@ -521,7 +540,7 @@ impl Notify {
}
}

/// Notifies a waiting task.
/// Notifies the first waiting task.
///
/// If a task is currently waiting, that task is notified. Otherwise, a
/// permit is stored in this `Notify` value and the **next** call to
Expand Down Expand Up @@ -558,6 +577,23 @@ impl Notify {
// Alias for old name in 0.x
#[cfg_attr(docsrs, doc(alias = "notify"))]
pub fn notify_one(&self) {
self.notify_with_strategy(NotifyOneStrategy::Fifo);
}

/// Notifies the last waiting task.
///
/// This function behaves similar to `notify_one`. The only difference is that it wakes
/// the most recently added waiter instead of the oldest waiter.
///
/// Check the [`notify_one()`] documentation for more info and
/// examples.
///
/// [`notify_one()`]: Notify::notify_one
pub fn notify_last(&self) {
self.notify_with_strategy(NotifyOneStrategy::Lifo);
}

fn notify_with_strategy(&self, strategy: NotifyOneStrategy) {
// Load the current state
let mut curr = self.state.load(SeqCst);

Expand Down Expand Up @@ -585,7 +621,7 @@ impl Notify {
// transition out of WAITING while the lock is held.
curr = self.state.load(SeqCst);

if let Some(waker) = notify_locked(&mut waiters, &self.state, curr) {
if let Some(waker) = notify_locked(&mut waiters, &self.state, curr, strategy) {
drop(waiters);
waker.wake();
}
Expand Down Expand Up @@ -708,7 +744,12 @@ impl Default for Notify {
impl UnwindSafe for Notify {}
impl RefUnwindSafe for Notify {}

fn notify_locked(waiters: &mut WaitList, state: &AtomicUsize, curr: usize) -> Option<Waker> {
fn notify_locked(
waiters: &mut WaitList,
state: &AtomicUsize,
curr: usize,
strategy: NotifyOneStrategy,
) -> Option<Waker> {
match get_state(curr) {
EMPTY | NOTIFIED => {
let res = state.compare_exchange(curr, set_state(curr, NOTIFIED), SeqCst, SeqCst);
Expand All @@ -728,8 +769,11 @@ fn notify_locked(waiters: &mut WaitList, state: &AtomicUsize, curr: usize) -> Op
// concurrently change as holding the lock is required to
// transition **out** of `WAITING`.
//
// Get a pending waiter
let waiter = waiters.pop_back().unwrap();
// Get a pending waiter using one of the available dequeue strategies.
let waiter = match strategy {
NotifyOneStrategy::Fifo => waiters.pop_back().unwrap(),
NotifyOneStrategy::Lifo => waiters.pop_front().unwrap(),
};

// Safety: we never make mutable references to waiters.
let waiter = unsafe { waiter.as_ref() };
Expand All @@ -738,7 +782,9 @@ fn notify_locked(waiters: &mut WaitList, state: &AtomicUsize, curr: usize) -> Op
let waker = unsafe { waiter.waker.with_mut(|waker| (*waker).take()) };

// This waiter is unlinked and will not be shared ever again, release it.
waiter.notification.store_release(Notification::One);
waiter
.notification
.store_release(Notification::One(strategy));

if waiters.is_empty() {
// As this the **final** waiter in the list, the state
Expand Down Expand Up @@ -1137,8 +1183,10 @@ impl Drop for Notified<'_> {
// See if the node was notified but not received. In this case, if
// the notification was triggered via `notify_one`, it must be sent
// to the next waiter.
if notification == Some(Notification::One) {
if let Some(waker) = notify_locked(&mut waiters, &notify.state, notify_state) {
if let Some(Notification::One(strategy)) = notification {
if let Some(waker) =
notify_locked(&mut waiters, &notify.state, notify_state, strategy)
{
drop(waiters);
waker.wake();
}
Expand Down
20 changes: 20 additions & 0 deletions tokio/src/util/linked_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,26 @@ impl<L: Link> LinkedList<L, L::Target> {
}
}

/// Removes the first element from a list and returns it, or None if it is
/// empty.
pub(crate) fn pop_front(&mut self) -> Option<L::Handle> {
unsafe {
let head = self.head?;
self.head = L::pointers(head).as_ref().get_next();

if let Some(new_head) = L::pointers(head).as_ref().get_next() {
L::pointers(new_head).as_mut().set_prev(None);
} else {
self.tail = None;
}

L::pointers(head).as_mut().set_prev(None);
L::pointers(head).as_mut().set_next(None);

Some(L::from_raw(head))
}
}

/// Removes the last element from a list and returns it, or None if it is
/// empty.
pub(crate) fn pop_back(&mut self) -> Option<L::Handle> {
Expand Down
75 changes: 75 additions & 0 deletions tokio/tests/sync_notify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,38 @@ fn notify_notified_one() {
assert_ready!(notified.poll());
}

#[test]
fn notify_multi_notified_one() {
let notify = Notify::new();
let mut notified1 = spawn(async { notify.notified().await });
let mut notified2 = spawn(async { notify.notified().await });

// add two waiters into the queue
assert_pending!(notified1.poll());
assert_pending!(notified2.poll());

// should wakeup the first one
notify.notify_one();
assert_ready!(notified1.poll());
assert_pending!(notified2.poll());
}

#[test]
fn notify_multi_notified_last() {
let notify = Notify::new();
let mut notified1 = spawn(async { notify.notified().await });
let mut notified2 = spawn(async { notify.notified().await });

// add two waiters into the queue
assert_pending!(notified1.poll());
assert_pending!(notified2.poll());

// should wakeup the last one
notify.notify_last();
assert_pending!(notified1.poll());
assert_ready!(notified2.poll());
}

#[test]
fn notified_one_notify() {
let notify = Notify::new();
Expand Down Expand Up @@ -105,6 +137,49 @@ fn notified_multi_notify_drop_one() {
assert_ready!(notified2.poll());
}

#[test]
fn notified_multi_notify_one_drop() {
let notify = Notify::new();
let mut notified1 = spawn(async { notify.notified().await });
let mut notified2 = spawn(async { notify.notified().await });
let mut notified3 = spawn(async { notify.notified().await });

// add waiters by order of poll execution
assert_pending!(notified1.poll());
assert_pending!(notified2.poll());
assert_pending!(notified3.poll());

// by default fifo
notify.notify_one();

drop(notified1);

// next waiter should be the one to be to woken up
assert_ready!(notified2.poll());
assert_pending!(notified3.poll());
}

#[test]
fn notified_multi_notify_last_drop() {
let notify = Notify::new();
let mut notified1 = spawn(async { notify.notified().await });
let mut notified2 = spawn(async { notify.notified().await });
let mut notified3 = spawn(async { notify.notified().await });

// add waiters by order of poll execution
assert_pending!(notified1.poll());
assert_pending!(notified2.poll());
assert_pending!(notified3.poll());

notify.notify_last();

drop(notified3);

// latest waiter added should be the one to woken up
assert_ready!(notified2.poll());
assert_pending!(notified1.poll());
}

#[test]
fn notify_in_drop_after_wake() {
use futures::task::ArcWake;
Expand Down

0 comments on commit 9e00b26

Please sign in to comment.