diff --git a/tokio/src/sync/notify.rs b/tokio/src/sync/notify.rs index 879b89b4069..5d344f70411 100644 --- a/tokio/src/sync/notify.rs +++ b/tokio/src/sync/notify.rs @@ -223,7 +223,9 @@ struct Waiter { /// `Notify`, or it is exclusively owned by the enclosing `Waiter`. waker: UnsafeCell>, - /// Notification for this waiter. + /// Notification for this waiter. Uses 2 bits to store if and how was + /// notified, 1 bit for storing if it was woken up using FIFO or LIFO, and + /// the rest of it is unused. /// * if it's `None`, then `waker` is protected by the `waiters` lock. /// * if it's `Some`, then `waker` is exclusively owned by the /// enclosing `Waiter` and can be accessed without locking. @@ -253,13 +255,16 @@ generate_addr_of_methods! { } // No notification. -const NOTIFICATION_NONE: usize = 0; +const NOTIFICATION_NONE: usize = 0b000; // Notification type used by `notify_one`. -const NOTIFICATION_ONE: usize = 1; +const NOTIFICATION_ONE: usize = 0b001; + +// Notification type used by `notify_last`. +const NOTIFICATION_LAST: usize = 0b101; // Notification type used by `notify_waiters`. -const NOTIFICATION_ALL: usize = 2; +const NOTIFICATION_ALL: usize = 0b010; /// Notification for a `Waiter`. /// This struct is equivalent to `Option`, but uses @@ -275,13 +280,20 @@ impl AtomicNotification { /// Store-release a notification. /// This method should be called exactly once. fn store_release(&self, notification: Notification) { - self.0.store(notification as usize, Release); + let data: usize = match notification { + Notification::All => NOTIFICATION_ALL, + Notification::One(NotifyOneStrategy::Fifo) => NOTIFICATION_ONE, + Notification::One(NotifyOneStrategy::Lifo) => NOTIFICATION_LAST, + }; + self.0.store(data, Release); } fn load(&self, ordering: Ordering) -> Option { - match self.0.load(ordering) { + let data = self.0.load(ordering); + match data { NOTIFICATION_NONE => None, - NOTIFICATION_ONE => Some(Notification::One), + NOTIFICATION_ONE => Some(Notification::One(NotifyOneStrategy::Fifo)), + NOTIFICATION_LAST => Some(Notification::One(NotifyOneStrategy::Lifo)), NOTIFICATION_ALL => Some(Notification::All), _ => unreachable!(), } @@ -296,11 +308,18 @@ impl AtomicNotification { } } +#[derive(Debug, PartialEq, Eq)] +#[repr(usize)] +enum NotifyOneStrategy { + Fifo, + Lifo, +} + #[derive(Debug, PartialEq, Eq)] #[repr(usize)] enum Notification { - One = NOTIFICATION_ONE, - All = NOTIFICATION_ALL, + One(NotifyOneStrategy), + All, } /// List used in `Notify::notify_waiters`. It wraps a guarded linked list @@ -521,7 +540,7 @@ impl Notify { } } - /// Notifies a waiting task. + /// Notifies the first waiting task. /// /// If a task is currently waiting, that task is notified. Otherwise, a /// permit is stored in this `Notify` value and the **next** call to @@ -558,6 +577,23 @@ impl Notify { // Alias for old name in 0.x #[cfg_attr(docsrs, doc(alias = "notify"))] pub fn notify_one(&self) { + self.notify_with_strategy(NotifyOneStrategy::Fifo); + } + + /// Notifies the last waiting task. + /// + /// This function behaves similar to `notify_one`. The only difference is that it wakes + /// the most recently added waiter instead of the oldest waiter. + /// + /// Check the [`notify_one()`] documentation for more info and + /// examples. + /// + /// [`notify_one()`]: Notify::notify_one + pub fn notify_last(&self) { + self.notify_with_strategy(NotifyOneStrategy::Lifo); + } + + fn notify_with_strategy(&self, strategy: NotifyOneStrategy) { // Load the current state let mut curr = self.state.load(SeqCst); @@ -585,7 +621,7 @@ impl Notify { // transition out of WAITING while the lock is held. curr = self.state.load(SeqCst); - if let Some(waker) = notify_locked(&mut waiters, &self.state, curr) { + if let Some(waker) = notify_locked(&mut waiters, &self.state, curr, strategy) { drop(waiters); waker.wake(); } @@ -708,7 +744,12 @@ impl Default for Notify { impl UnwindSafe for Notify {} impl RefUnwindSafe for Notify {} -fn notify_locked(waiters: &mut WaitList, state: &AtomicUsize, curr: usize) -> Option { +fn notify_locked( + waiters: &mut WaitList, + state: &AtomicUsize, + curr: usize, + strategy: NotifyOneStrategy, +) -> Option { match get_state(curr) { EMPTY | NOTIFIED => { let res = state.compare_exchange(curr, set_state(curr, NOTIFIED), SeqCst, SeqCst); @@ -728,8 +769,11 @@ fn notify_locked(waiters: &mut WaitList, state: &AtomicUsize, curr: usize) -> Op // concurrently change as holding the lock is required to // transition **out** of `WAITING`. // - // Get a pending waiter - let waiter = waiters.pop_back().unwrap(); + // Get a pending waiter using one of the available dequeue strategies. + let waiter = match strategy { + NotifyOneStrategy::Fifo => waiters.pop_back().unwrap(), + NotifyOneStrategy::Lifo => waiters.pop_front().unwrap(), + }; // Safety: we never make mutable references to waiters. let waiter = unsafe { waiter.as_ref() }; @@ -738,7 +782,9 @@ fn notify_locked(waiters: &mut WaitList, state: &AtomicUsize, curr: usize) -> Op let waker = unsafe { waiter.waker.with_mut(|waker| (*waker).take()) }; // This waiter is unlinked and will not be shared ever again, release it. - waiter.notification.store_release(Notification::One); + waiter + .notification + .store_release(Notification::One(strategy)); if waiters.is_empty() { // As this the **final** waiter in the list, the state @@ -1137,8 +1183,10 @@ impl Drop for Notified<'_> { // See if the node was notified but not received. In this case, if // the notification was triggered via `notify_one`, it must be sent // to the next waiter. - if notification == Some(Notification::One) { - if let Some(waker) = notify_locked(&mut waiters, ¬ify.state, notify_state) { + if let Some(Notification::One(strategy)) = notification { + if let Some(waker) = + notify_locked(&mut waiters, ¬ify.state, notify_state, strategy) + { drop(waiters); waker.wake(); } diff --git a/tokio/src/util/linked_list.rs b/tokio/src/util/linked_list.rs index ab20292e21d..0274849b0c6 100644 --- a/tokio/src/util/linked_list.rs +++ b/tokio/src/util/linked_list.rs @@ -137,6 +137,26 @@ impl LinkedList { } } + /// Removes the first element from a list and returns it, or None if it is + /// empty. + pub(crate) fn pop_front(&mut self) -> Option { + unsafe { + let head = self.head?; + self.head = L::pointers(head).as_ref().get_next(); + + if let Some(new_head) = L::pointers(head).as_ref().get_next() { + L::pointers(new_head).as_mut().set_prev(None); + } else { + self.tail = None; + } + + L::pointers(head).as_mut().set_prev(None); + L::pointers(head).as_mut().set_next(None); + + Some(L::from_raw(head)) + } + } + /// Removes the last element from a list and returns it, or None if it is /// empty. pub(crate) fn pop_back(&mut self) -> Option { diff --git a/tokio/tests/sync_notify.rs b/tokio/tests/sync_notify.rs index 01b8ce86537..13b3f921e98 100644 --- a/tokio/tests/sync_notify.rs +++ b/tokio/tests/sync_notify.rs @@ -21,6 +21,38 @@ fn notify_notified_one() { assert_ready!(notified.poll()); } +#[test] +fn notify_multi_notified_one() { + let notify = Notify::new(); + let mut notified1 = spawn(async { notify.notified().await }); + let mut notified2 = spawn(async { notify.notified().await }); + + // add two waiters into the queue + assert_pending!(notified1.poll()); + assert_pending!(notified2.poll()); + + // should wakeup the first one + notify.notify_one(); + assert_ready!(notified1.poll()); + assert_pending!(notified2.poll()); +} + +#[test] +fn notify_multi_notified_last() { + let notify = Notify::new(); + let mut notified1 = spawn(async { notify.notified().await }); + let mut notified2 = spawn(async { notify.notified().await }); + + // add two waiters into the queue + assert_pending!(notified1.poll()); + assert_pending!(notified2.poll()); + + // should wakeup the last one + notify.notify_last(); + assert_pending!(notified1.poll()); + assert_ready!(notified2.poll()); +} + #[test] fn notified_one_notify() { let notify = Notify::new(); @@ -105,6 +137,49 @@ fn notified_multi_notify_drop_one() { assert_ready!(notified2.poll()); } +#[test] +fn notified_multi_notify_one_drop() { + let notify = Notify::new(); + let mut notified1 = spawn(async { notify.notified().await }); + let mut notified2 = spawn(async { notify.notified().await }); + let mut notified3 = spawn(async { notify.notified().await }); + + // add waiters by order of poll execution + assert_pending!(notified1.poll()); + assert_pending!(notified2.poll()); + assert_pending!(notified3.poll()); + + // by default fifo + notify.notify_one(); + + drop(notified1); + + // next waiter should be the one to be to woken up + assert_ready!(notified2.poll()); + assert_pending!(notified3.poll()); +} + +#[test] +fn notified_multi_notify_last_drop() { + let notify = Notify::new(); + let mut notified1 = spawn(async { notify.notified().await }); + let mut notified2 = spawn(async { notify.notified().await }); + let mut notified3 = spawn(async { notify.notified().await }); + + // add waiters by order of poll execution + assert_pending!(notified1.poll()); + assert_pending!(notified2.poll()); + assert_pending!(notified3.poll()); + + notify.notify_last(); + + drop(notified3); + + // latest waiter added should be the one to woken up + assert_ready!(notified2.poll()); + assert_pending!(notified1.poll()); +} + #[test] fn notify_in_drop_after_wake() { use futures::task::ArcWake;