diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 9c4f0e2..8fcd2ec 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -37,8 +37,9 @@ jobs: - name: Run cargo check (without dev-dependencies to catch missing feature flags) if: startsWith(matrix.rust, 'nightly') run: cargo check -Z features=dev_dep - - run: cargo test --features __test --all - - run: cargo build --no-default-features --all + - run: cargo test --all + - run: cargo test --no-default-features --tests + - run: cargo build -p event-listener-strategy --no-default-features - name: Install cargo-hack uses: taiki-e/install-action@cargo-hack - run: rustup target add thumbv7m-none-eabi @@ -56,6 +57,7 @@ jobs: - name: Install Rust run: rustup update ${{ matrix.rust }} && rustup default ${{ matrix.rust }} - run: cargo build --all + - run: cargo build --all --no-default-features clippy: runs-on: ubuntu-latest @@ -79,7 +81,7 @@ jobs: - uses: actions/checkout@v3 - name: Install Rust run: rustup toolchain install nightly --component miri && rustup default nightly - - run: cargo miri test --features __test --all + - run: cargo miri test --all env: MIRIFLAGS: -Zmiri-strict-provenance -Zmiri-symbolic-alignment-check -Zmiri-disable-isolation RUSTFLAGS: ${{ env.RUSTFLAGS }} -Z randomize-layout diff --git a/Cargo.toml b/Cargo.toml index f601937..6f3acdb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,15 +18,11 @@ exclude = ["/.*"] default = ["std"] std = ["parking"] -# Unstable, test only feature. Do not enable this. -__test = [] - [dependencies] -crossbeam-utils = { version = "0.8.12", default-features = false } parking = { version = "2.0.0", optional = true } -slab = { version = "0.4.7", default-features = false } [dev-dependencies] +futures-lite = "1.12.0" waker-fn = "1" [dev-dependencies.criterion] diff --git a/benches/bench.rs b/benches/bench.rs index 26840e5..a3412c0 100644 --- a/benches/bench.rs +++ b/benches/bench.rs @@ -1,22 +1,31 @@ +use std::iter; +use std::pin::Pin; + use criterion::{criterion_group, criterion_main, Criterion}; -use event_listener::Event; +use event_listener::{Event, EventListener}; const COUNT: usize = 8000; fn bench_events(c: &mut Criterion) { c.bench_function("notify_and_wait", |b| { let ev = Event::new(); - b.iter(|| { - let mut handles = Vec::with_capacity(COUNT); + let mut handles = iter::repeat_with(|| EventListener::new(&ev)) + .take(COUNT) + .collect::>(); - for _ in 0..COUNT { - handles.push(ev.listen()); + b.iter(|| { + for handle in &mut handles { + // SAFETY: The handle is not moved out. + let listener = unsafe { Pin::new_unchecked(handle) }; + listener.listen(); } ev.notify(COUNT); - for handle in handles { - handle.wait(); + for handle in &mut handles { + // SAFETY: The handle is not moved out. + let listener = unsafe { Pin::new_unchecked(handle) }; + listener.wait(); } }); }); diff --git a/examples/mutex.rs b/examples/mutex.rs index 0d28e41..3faa341 100644 --- a/examples/mutex.rs +++ b/examples/mutex.rs @@ -63,9 +63,9 @@ impl Mutex { // Start listening and then try locking again. listener = Some(self.lock_ops.listen()); } - Some(l) => { + Some(mut l) => { // Wait until a notification is received. - l.wait(); + l.as_mut().wait(); } } } @@ -88,9 +88,9 @@ impl Mutex { // Start listening and then try locking again. listener = Some(self.lock_ops.listen()); } - Some(l) => { + Some(mut l) => { // Wait until a notification is received. - if !l.wait_deadline(deadline) { + if !l.as_mut().wait_deadline(deadline) { return None; } } diff --git a/src/inner.rs b/src/inner.rs deleted file mode 100644 index 1c5a758..0000000 --- a/src/inner.rs +++ /dev/null @@ -1,213 +0,0 @@ -//! The inner mechanism powering the `Event` type. - -use crate::list::List; -use crate::node::Node; -use crate::queue::Queue; -use crate::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; -use crate::sync::cell::UnsafeCell; -use crate::Task; - -use alloc::vec; -use alloc::vec::Vec; - -use core::ops; - -/// Inner state of [`Event`]. -pub(crate) struct Inner { - /// The number of notified entries, or `usize::MAX` if all of them have been notified. - /// - /// If there are no entries, this value is set to `usize::MAX`. - pub(crate) notified: AtomicUsize, - - /// A linked list holding registered listeners. - list: Mutex, - - /// Queue of nodes waiting to be processed. - queue: Queue, -} - -impl Inner { - /// Create a new `Inner`. - pub(crate) fn new() -> Self { - Self { - notified: AtomicUsize::new(core::usize::MAX), - list: Mutex::new(List::new()), - queue: Queue::new(), - } - } - - /// Locks the list. - pub(crate) fn lock(&self) -> Option> { - self.list.try_lock().map(|guard| ListGuard { - inner: self, - guard: Some(guard), - }) - } - - /// Push a pending operation to the queue. - #[cold] - pub(crate) fn push(&self, node: Node) { - self.queue.push(node); - - // Acquire and drop the lock to make sure that the queue is flushed. - let _guard = self.lock(); - } -} - -/// The guard returned by [`Inner::lock`]. -pub(crate) struct ListGuard<'a> { - /// Reference to the inner state. - inner: &'a Inner, - - /// The locked list. - guard: Option>, -} - -impl ListGuard<'_> { - #[cold] - fn process_nodes_slow( - &mut self, - start_node: Node, - tasks: &mut Vec, - guard: &mut MutexGuard<'_, List>, - ) { - // Process the start node. - tasks.extend(start_node.apply(guard)); - - // Process all remaining nodes. - while let Some(node) = self.inner.queue.pop() { - tasks.extend(node.apply(guard)); - } - } -} - -impl ops::Deref for ListGuard<'_> { - type Target = List; - - fn deref(&self) -> &Self::Target { - self.guard.as_ref().unwrap() - } -} - -impl ops::DerefMut for ListGuard<'_> { - fn deref_mut(&mut self) -> &mut Self::Target { - self.guard.as_mut().unwrap() - } -} - -impl Drop for ListGuard<'_> { - fn drop(&mut self) { - let Self { inner, guard } = self; - let mut list = guard.take().unwrap(); - - // Tasks to wakeup after releasing the lock. - let mut tasks = vec![]; - - // Process every node left in the queue. - if let Some(start_node) = inner.queue.pop() { - self.process_nodes_slow(start_node, &mut tasks, &mut list); - } - - // Update the atomic `notified` counter. - let notified = if list.notified < list.len() { - list.notified - } else { - core::usize::MAX - }; - - self.inner.notified.store(notified, Ordering::Release); - - // Drop the actual lock. - drop(list); - - // Wakeup all tasks. - for task in tasks { - task.wake(); - } - } -} - -/// A simple mutex type that optimistically assumes that the lock is uncontended. -struct Mutex { - /// The inner value. - value: UnsafeCell, - - /// Whether the mutex is locked. - locked: AtomicBool, -} - -impl Mutex { - /// Create a new mutex. - pub(crate) fn new(value: T) -> Self { - Self { - value: UnsafeCell::new(value), - locked: AtomicBool::new(false), - } - } - - /// Lock the mutex. - pub(crate) fn try_lock(&self) -> Option> { - // Try to lock the mutex. - if self - .locked - .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed) - .is_ok() - { - // We have successfully locked the mutex. - Some(MutexGuard { mutex: self }) - } else { - self.try_lock_slow() - } - } - - #[cold] - fn try_lock_slow(&self) -> Option> { - // Assume that the contention is short-term. - // Spin for a while to see if the mutex becomes unlocked. - let mut spins = 100u32; - - loop { - if self - .locked - .compare_exchange_weak(false, true, Ordering::Acquire, Ordering::Relaxed) - .is_ok() - { - // We have successfully locked the mutex. - return Some(MutexGuard { mutex: self }); - } - - // Use atomic loads instead of compare-exchange. - while self.locked.load(Ordering::Relaxed) { - // Return None once we've exhausted the number of spins. - spins = spins.checked_sub(1)?; - } - } - } -} - -struct MutexGuard<'a, T> { - mutex: &'a Mutex, -} - -impl<'a, T> Drop for MutexGuard<'a, T> { - fn drop(&mut self) { - self.mutex.locked.store(false, Ordering::Release); - } -} - -impl<'a, T> ops::Deref for MutexGuard<'a, T> { - type Target = T; - - fn deref(&self) -> &T { - unsafe { &*self.mutex.value.get() } - } -} - -impl<'a, T> ops::DerefMut for MutexGuard<'a, T> { - fn deref_mut(&mut self) -> &mut T { - unsafe { &mut *self.mutex.value.get() } - } -} - -unsafe impl Send for Mutex {} -unsafe impl Sync for Mutex {} diff --git a/src/lib.rs b/src/lib.rs index 0459056..0868105 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -48,7 +48,7 @@ //! } //! //! // Start listening for events. -//! let listener = event.listen(); +//! let mut listener = event.listen(); //! //! // Check the flag again after creating the listener. //! if flag.load(Ordering::SeqCst) { @@ -56,89 +56,70 @@ //! } //! //! // Wait for a notification and continue the loop. -//! listener.wait(); +//! listener.as_mut().wait(); //! } //! ``` -#![cfg_attr(not(feature = "std"), no_std)] +#![cfg_attr(all(not(feature = "std"), not(test)), no_std)] #![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)] extern crate alloc; -#[cfg(feature = "std")] -extern crate std; - -mod inner; -mod list; -mod node; -mod queue; -mod sync; +#[cfg_attr(feature = "std", path = "std.rs")] +#[cfg_attr(not(feature = "std"), path = "no_std.rs")] +mod sys; -use alloc::sync::Arc; +use alloc::boxed::Box; +use core::borrow::Borrow; use core::fmt; use core::future::Future; -use core::mem::{self, ManuallyDrop}; -use core::num::NonZeroUsize; +use core::marker::PhantomPinned; +use core::mem::ManuallyDrop; use core::pin::Pin; use core::ptr; -use core::sync::atomic::{self, AtomicPtr, AtomicUsize, Ordering}; use core::task::{Context, Poll, Waker}; -use core::usize; #[cfg(feature = "std")] -use std::panic::{RefUnwindSafe, UnwindSafe}; +use parking::{Parker, Unparker}; #[cfg(feature = "std")] use std::time::{Duration, Instant}; -use inner::Inner; -use list::{Entry, State}; -use node::{Node, TaskWaiting}; - -#[cfg(feature = "std")] -use parking::Unparker; - -/// An asynchronous waker or thread unparker that can be used to notify a task or thread. -enum Task { - /// A waker that can be used to notify a task. - Waker(Waker), - - /// An unparker that can be used to notify a thread. - #[cfg(feature = "std")] - Thread(Unparker), -} +use sync::atomic::{AtomicPtr, AtomicUsize, Ordering}; +use sync::{Arc, WithMut}; -impl Task { - /// Notifies the task or thread. - fn wake(self) { - match self { - Task::Waker(waker) => waker.wake(), - #[cfg(feature = "std")] - Task::Thread(unparker) => { - unparker.unpark(); - } +/// 1.39-compatible replacement for `matches!` +macro_rules! matches { + ($expr:expr, $($pattern:pat)|+ $(if $guard: expr)?) => { + match $expr { + $($pattern)|+ $(if $guard)? => true, + _ => false, } - } + }; } -/// Details of a notification. -#[derive(Copy, Clone)] -struct Notify { - /// The number of listeners to notify. - count: usize, +/// Inner state of [`Event`]. +struct Inner { + /// The number of notified entries, or `usize::MAX` if all of them have been notified. + /// + /// If there are no entries, this value is set to `usize::MAX`. + notified: AtomicUsize, - /// The notification strategy. - kind: NotifyKind, + /// Inner queue of event listeners. + /// + /// On `std` platforms, this is an intrusive linked list. On `no_std` platforms, this is a + /// more traditional `Vec` of listeners, with an atomic queue used as a backup for high + /// contention. + list: sys::List, } -/// The strategy for notifying listeners. -#[derive(Copy, Clone)] -enum NotifyKind { - /// Notify non-notified listeners. - Notify, - - /// Notify all listeners. - NotifyAdditional, +impl Inner { + fn new() -> Self { + Self { + notified: AtomicUsize::new(core::usize::MAX), + list: sys::List::new(), + } + } } /// A synchronization primitive for notifying async tasks and threads. @@ -169,10 +150,26 @@ pub struct Event { inner: AtomicPtr, } +unsafe impl Send for Event {} +unsafe impl Sync for Event {} + #[cfg(feature = "std")] -impl UnwindSafe for Event {} +impl std::panic::UnwindSafe for Event {} #[cfg(feature = "std")] -impl RefUnwindSafe for Event {} +impl std::panic::RefUnwindSafe for Event {} + +impl fmt::Debug for Event { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str("Pad { .. }") + } +} + +impl Default for Event { + #[inline] + fn default() -> Self { + Self::new() + } +} impl Event { /// Creates a new [`Event`]. @@ -185,15 +182,17 @@ impl Event { /// let event = Event::new(); /// ``` #[inline] - pub const fn new() -> Event { - Event { + pub const fn new() -> Self { + Self { inner: AtomicPtr::new(ptr::null_mut()), } } /// Returns a guard listening for a notification. /// - /// This method emits a `SeqCst` fence after registering a listener. + /// This method emits a `SeqCst` fence after registering a listener. For now, this method + /// is an alias for calling [`EventListener::new()`], pinning it to the heap, and then + /// inserting it into a list. /// /// # Examples /// @@ -204,39 +203,9 @@ impl Event { /// let listener = event.listen(); /// ``` #[cold] - pub fn listen(&self) -> EventListener { - let inner = self.inner(); - - // Try to acquire a lock in the inner list. - let state = { - let inner = unsafe { &*inner }; - if let Some(mut lock) = inner.lock() { - let entry = lock.insert(Entry::new()); - - ListenerState::HasNode(entry) - } else { - // Push entries into the queue indicating that we want to push a listener. - let (node, entry) = Node::listener(); - inner.push(node); - - // Indicate that there are nodes waiting to be notified. - inner - .notified - .compare_exchange(usize::MAX, 0, Ordering::AcqRel, Ordering::Relaxed) - .ok(); - - ListenerState::Queued(entry) - } - }; - - // Register the listener. - let listener = EventListener { - inner: unsafe { Arc::clone(&ManuallyDrop::new(Arc::from_raw(inner))) }, - state, - }; - - // Make sure the listener is registered before whatever happens next. - full_fence(); + pub fn listen(&self) -> Pin> { + let mut listener = Box::pin(EventListener::new(self)); + listener.as_mut().listen(); listener } @@ -278,14 +247,7 @@ impl Event { // Notify if there is at least one unnotified listener and the number of notified // listeners is less than `n`. if inner.notified.load(Ordering::Acquire) < n { - if let Some(mut lock) = inner.lock() { - lock.notify_unnotified(n); - } else { - inner.push(Node::Notify(Notify { - count: n, - kind: NotifyKind::Notify, - })); - } + inner.notify(n, false); } } } @@ -329,14 +291,7 @@ impl Event { // Notify if there is at least one unnotified listener and the number of notified // listeners is less than `n`. if inner.notified.load(Ordering::Acquire) < n { - if let Some(mut lock) = inner.lock() { - lock.notify_unnotified(n); - } else { - inner.push(Node::Notify(Notify { - count: n, - kind: NotifyKind::Notify, - })); - } + inner.notify(n, true); } } } @@ -378,15 +333,8 @@ impl Event { if let Some(inner) = self.try_inner() { // Notify if there is at least one unnotified listener. - if inner.notified.load(Ordering::Acquire) < usize::MAX { - if let Some(mut lock) = inner.lock() { - lock.notify_additional(n); - } else { - inner.push(Node::Notify(Notify { - count: n, - kind: NotifyKind::NotifyAdditional, - })); - } + if inner.notified.load(Ordering::Acquire) < core::usize::MAX { + inner.notify(n, true); } } } @@ -430,41 +378,35 @@ impl Event { pub fn notify_additional_relaxed(&self, n: usize) { if let Some(inner) = self.try_inner() { // Notify if there is at least one unnotified listener. - if inner.notified.load(Ordering::Acquire) < usize::MAX { - if let Some(mut lock) = inner.lock() { - lock.notify_additional(n); - } else { - inner.push(Node::Notify(Notify { - count: n, - kind: NotifyKind::NotifyAdditional, - })); - } + if inner.notified.load(Ordering::Acquire) < core::usize::MAX { + inner.notify(n, true); } } } - /// Returns a reference to the inner state if it was initialized. + /// Return a reference to the inner state if it has been initialized. #[inline] fn try_inner(&self) -> Option<&Inner> { let inner = self.inner.load(Ordering::Acquire); unsafe { inner.as_ref() } } - /// Returns a raw pointer to the inner state, initializing it if necessary. + /// Returns a raw, initialized pointer to the inner state. /// /// This returns a raw pointer instead of reference because `from_raw` - /// requires raw/mut provenance: + /// requires raw/mut provenance: . fn inner(&self) -> *const Inner { let mut inner = self.inner.load(Ordering::Acquire); - // Initialize the state if this is its first use. + // If this is the first use, initialize the state. if inner.is_null() { - // Allocate on the heap. + // Allocate the state on the heap. let new = Arc::new(Inner::new()); - // Convert the heap-allocated state into a raw pointer. + + // Convert the state to a raw pointer. let new = Arc::into_raw(new) as *mut Inner; - // Attempt to replace the null-pointer with the new state pointer. + // Replace the null pointer with the new state pointer. inner = self .inner .compare_exchange(inner, new, Ordering::AcqRel, Ordering::Acquire) @@ -490,26 +432,14 @@ impl Event { impl Drop for Event { #[inline] fn drop(&mut self) { - let inner: *mut Inner = *self.inner.get_mut(); - - // If the state pointer has been initialized, deallocate it. - if !inner.is_null() { - unsafe { - drop(Arc::from_raw(inner)); + self.inner.with_mut(|&mut inner| { + // If the state pointer has been initialized, drop it. + if !inner.is_null() { + unsafe { + drop(Arc::from_raw(inner)); + } } - } - } -} - -impl fmt::Debug for Event { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.pad("Event { .. }") - } -} - -impl Default for Event { - fn default() -> Event { - Event::new() + }) } } @@ -523,32 +453,39 @@ impl Default for Event { /// If a notified listener is dropped without receiving a notification, dropping will notify /// another active listener. Whether one *additional* listener will be notified depends on what /// kind of notification was delivered. -pub struct EventListener { - /// A reference to [`Event`]'s inner state. - inner: Arc, +pub struct EventListener(Listener>); - /// The current state of the listener. - state: ListenerState, +impl fmt::Debug for EventListener { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str("EventListener { .. }") + } } -enum ListenerState { - /// The listener has a node inside of the linked list. - HasNode(NonZeroUsize), +impl EventListener { + /// Create a new `EventListener` that will wait for a notification from the given [`Event`]. + pub fn new(event: &Event) -> Self { + let inner = event.inner(); + + let listener = Listener { + event: unsafe { Arc::clone(&ManuallyDrop::new(Arc::from_raw(inner))) }, + listener: None, + _pin: PhantomPinned, + }; - /// The listener has already been notified and has discarded its entry. - Discarded, + Self(listener) + } - /// The listener has an entry in the queue that may or may not have a task waiting. - Queued(Arc), -} + /// Register this listener into the given [`Event`]. + /// + /// This method can only be called after the listener has been pinned, and must be called before + /// the listener is polled. + pub fn listen(self: Pin<&mut Self>) { + self.listener().insert(); -#[cfg(feature = "std")] -impl UnwindSafe for EventListener {} -#[cfg(feature = "std")] -impl RefUnwindSafe for EventListener {} + // Make sure the listener is registered before whatever happens next. + full_fence(); + } -#[cfg(feature = "std")] -impl EventListener { /// Blocks until a notification is received. /// /// # Examples @@ -557,16 +494,17 @@ impl EventListener { /// use event_listener::Event; /// /// let event = Event::new(); - /// let listener = event.listen(); + /// let mut listener = event.listen(); /// /// // Notify `listener`. /// event.notify(1); /// /// // Receive the notification. - /// listener.wait(); + /// listener.as_mut().wait(); /// ``` - pub fn wait(self) { - self.wait_internal(None); + #[cfg(feature = "std")] + pub fn wait(self: Pin<&mut Self>) { + self.listener().wait_internal(None); } /// Blocks until a notification is received or a timeout is reached. @@ -580,13 +518,15 @@ impl EventListener { /// use event_listener::Event; /// /// let event = Event::new(); - /// let listener = event.listen(); + /// let mut listener = event.listen(); /// /// // There are no notification so this times out. - /// assert!(!listener.wait_timeout(Duration::from_secs(1))); + /// assert!(!listener.as_mut().wait_timeout(Duration::from_secs(1))); /// ``` - pub fn wait_timeout(self, timeout: Duration) -> bool { - self.wait_internal(Some(Instant::now() + timeout)) + #[cfg(feature = "std")] + pub fn wait_timeout(self: Pin<&mut Self>, timeout: Duration) -> bool { + self.listener() + .wait_internal(Instant::now().checked_add(timeout)) } /// Blocks until a notification is received or a deadline is reached. @@ -600,143 +540,36 @@ impl EventListener { /// use event_listener::Event; /// /// let event = Event::new(); - /// let listener = event.listen(); + /// let mut listener = event.listen(); /// /// // There are no notification so this times out. - /// assert!(!listener.wait_deadline(Instant::now() + Duration::from_secs(1))); + /// assert!(!listener.as_mut().wait_deadline(Instant::now() + Duration::from_secs(1))); /// ``` - pub fn wait_deadline(self, deadline: Instant) -> bool { - self.wait_internal(Some(deadline)) - } - - fn wait_internal(mut self, deadline: Option) -> bool { - // Take out the entry pointer and set it to `None`. - let (parker, unparker) = parking::pair(); - let entry = match self.state.take() { - ListenerState::HasNode(entry) => entry, - ListenerState::Queued(task_waiting) => { - // This listener is stuck in the backup queue. - // Wait for the task to be notified. - loop { - match task_waiting.status() { - Some(entry_id) => break entry_id, - None => { - // Register a task and park until it is notified. - task_waiting.register(Task::Thread(unparker.clone())); - - parker.park(); - } - } - } - } - ListenerState::Discarded => panic!("Cannot wait on a discarded listener"), - }; - - // Wait for the lock to be available. - let lock = || { - loop { - match self.inner.lock() { - Some(lock) => return lock, - None => { - // Wake us up when the lock is free. - let unparker = parker.unparker(); - self.inner.push(Node::Waiting(Task::Thread(unparker))); - parker.park() - } - } - } - }; - - // Set this listener's state to `Waiting`. - { - let mut list = lock(); - - // If the listener was notified, we're done. - match list.state(entry).replace(State::Notified(false)) { - State::Notified(_) => { - list.remove(entry); - return true; - } - _ => list.state(entry).set(State::Task(Task::Thread(unparker))), - } - } - - // Wait until a notification is received or the timeout is reached. - loop { - match deadline { - None => parker.park(), - - Some(deadline) => { - // Check for timeout. - let now = Instant::now(); - if now >= deadline { - // Remove the entry and check if notified. - let mut list = lock(); - let state = list.remove(entry); - return state.is_notified(); - } - - // Park until the deadline. - parker.park_timeout(deadline - now); - } - } - - let mut list = lock(); - - // Do a dummy replace operation in order to take out the state. - match list.state(entry).replace(State::Notified(false)) { - State::Notified(_) => { - // If this listener has been notified, remove it from the list and return. - list.remove(entry); - return true; - } - // Otherwise, set the state back to `Waiting`. - state => list.state(entry).set(state), - } - } + #[cfg(feature = "std")] + pub fn wait_deadline(self: Pin<&mut Self>, deadline: Instant) -> bool { + self.listener().wait_internal(Some(deadline)) } -} -impl EventListener { /// Drops this listener and discards its notification (if any) without notifying another /// active listener. /// - /// Returns `true` if a notification was discarded. Note that this function may spuriously - /// return `false` even if a notification was received by the listener. + /// Returns `true` if a notification was discarded. /// /// # Examples /// ``` /// use event_listener::Event; /// /// let event = Event::new(); - /// let listener1 = event.listen(); - /// let listener2 = event.listen(); + /// let mut listener1 = event.listen(); + /// let mut listener2 = event.listen(); /// /// event.notify(1); /// - /// assert!(listener1.discard()); - /// assert!(!listener2.discard()); + /// assert!(listener1.as_mut().discard()); + /// assert!(!listener2.as_mut().discard()); /// ``` - pub fn discard(mut self) -> bool { - // If this listener has never picked up a notification... - if let ListenerState::HasNode(entry) = self.state.take() { - // Remove the listener from the list and return `true` if it was notified. - if let Some(mut lock) = self.inner.lock() { - let state = lock.remove(entry); - - if let State::Notified(_) = state { - return true; - } - } else { - // Let someone else do it for us. - self.inner.push(Node::RemoveListener { - listener: entry, - propagate: false, - }); - } - } - - false + pub fn discard(self: Pin<&mut Self>) -> bool { + self.listener().discard() } /// Returns `true` if this listener listens to the given `Event`. @@ -753,7 +586,7 @@ impl EventListener { /// ``` #[inline] pub fn listens_to(&self, event: &Event) -> bool { - ptr::eq::(&*self.inner, event.inner.load(Ordering::Acquire)) + ptr::eq::(&**self.inner(), event.inner.load(Ordering::Acquire)) } /// Returns `true` if both listeners listen to the same `Event`. @@ -770,119 +603,285 @@ impl EventListener { /// assert!(listener1.same_event(&listener2)); /// ``` pub fn same_event(&self, other: &EventListener) -> bool { - ptr::eq::(&*self.inner, &*other.inner) + ptr::eq::(&**self.inner(), &**other.inner()) } -} -impl fmt::Debug for EventListener { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.pad("EventListener { .. }") + fn listener(self: Pin<&mut Self>) -> Pin<&mut Listener>> { + unsafe { self.map_unchecked_mut(|this| &mut this.0) } + } + + fn inner(&self) -> &Arc { + &self.0.event } } impl Future for EventListener { type Output = (); - #[allow(unreachable_patterns)] - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let entry = match self.state { - ListenerState::Discarded => { - unreachable!("cannot poll a completed `EventListener` future") - } - ListenerState::HasNode(ref entry) => *entry, - ListenerState::Queued(ref task_waiting) => { - loop { - // See if the task waiting has been completed. - match task_waiting.status() { - Some(entry_id) => { - self.state = ListenerState::HasNode(entry_id); - break entry_id; - } - None => { - // If not, wait for it to complete. - task_waiting.register(Task::Waker(cx.waker().clone())); - if task_waiting.status().is_none() { - return Poll::Pending; - } - } - } + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + self.listener().poll_internal(cx) + } +} + +struct Listener + Unpin> { + /// The reference to the original event. + event: B, + + /// The inner state of the listener. + listener: Option, + + /// Enforce pinning. + _pin: PhantomPinned, +} + +unsafe impl + Unpin + Send> Send for Listener {} +unsafe impl + Unpin + Sync> Sync for Listener {} + +impl + Unpin> Listener { + /// Pin-project this listener. + fn project(self: Pin<&mut Self>) -> (&Inner, Pin<&mut Option>) { + // SAFETY: `event` is `Unpin`, and `listener`'s pin status is preserved + unsafe { + let Listener { + event, listener, .. + } = self.get_unchecked_mut(); + + ((*event).borrow(), Pin::new_unchecked(listener)) + } + } + + /// Register this listener with the event. + fn insert(self: Pin<&mut Self>) { + let (inner, listener) = self.project(); + inner.insert(listener); + } + + /// Wait until the provided deadline. + #[cfg(feature = "std")] + fn wait_internal(mut self: Pin<&mut Self>, deadline: Option) -> bool { + use std::cell::RefCell; + + std::thread_local! { + /// Cached thread-local parker/unparker pair. + static PARKER: RefCell> = RefCell::new(None); + } + + // Try to borrow the thread-local parker/unparker pair. + PARKER + .try_with({ + let this = self.as_mut(); + |parker| { + let mut pair = parker + .try_borrow_mut() + .expect("Shouldn't be able to borrow parker reentrantly"); + let (parker, unparker) = pair.get_or_insert_with(|| { + let (parker, unparker) = parking::pair(); + (parker, Task::Unparker(unparker)) + }); + + this.wait_with_parker(deadline, parker, unparker.as_task_ref()) } + }) + .unwrap_or_else(|_| { + // If the pair isn't accessible, we may be being called in a destructor. + // Just create a new pair. + let (parker, unparker) = parking::pair(); + self.wait_with_parker(deadline, &parker, TaskRef::Unparker(&unparker)) + }) + } + + /// Wait until the provided deadline using the specified parker/unparker pair. + #[cfg(feature = "std")] + fn wait_with_parker( + self: Pin<&mut Self>, + deadline: Option, + parker: &Parker, + unparker: TaskRef<'_>, + ) -> bool { + let (inner, mut listener) = self.project(); + + // Set the listener's state to `Task`. + match inner.register(listener.as_mut(), unparker) { + Some(true) => { + // We were already notified, so we don't need to park. + return true; } - }; - let mut list = match self.inner.lock() { - Some(list) => list, + + Some(false) => { + // We're now waiting for a notification. + } + None => { - // Wait for the lock to be available. - self.inner - .push(Node::Waiting(Task::Waker(cx.waker().clone()))); - - // If the lock is suddenly available, we need to poll again. - if let Some(list) = self.inner.lock() { - list - } else { - return Poll::Pending; + // We were never inserted into the list. + panic!("listener was never inserted into the list"); + } + } + + // Wait until a notification is received or the timeout is reached. + loop { + match deadline { + None => parker.park(), + + Some(deadline) => { + // Make sure we're not timed out already. + let now = Instant::now(); + if now >= deadline { + // Remove our entry and check if we were notified. + return inner + .remove(listener, false) + .expect("We never removed ourself from the list") + .is_notified(); + } } } - }; - let state = list.state(entry); - - // Do a dummy replace operation in order to take out the state. - match state.replace(State::Notified(false)) { - State::Notified(_) => { - // If this listener has been notified, remove it from the list and return. - list.remove(entry); - drop(list); - self.state = ListenerState::Discarded; - return Poll::Ready(()); + + // See if we were notified. + if inner + .register(listener.as_mut(), unparker) + .expect("We never removed ourself from the list") + { + return true; } - State::Created => { - // If the listener was just created, put it in the `Polling` state. - state.set(State::Task(Task::Waker(cx.waker().clone()))); + } + } + + /// Drops this listener and discards its notification (if any) without notifying another + /// active listener. + fn discard(self: Pin<&mut Self>) -> bool { + let (inner, listener) = self.project(); + + inner + .remove(listener, false) + .map_or(false, |state| state.is_notified()) + } + + /// Poll this listener for a notification. + fn poll_internal(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { + let (inner, mut listener) = self.project(); + + // Try to register the listener. + match inner.register(listener.as_mut(), TaskRef::Waker(cx.waker())) { + Some(true) => { + // We were already notified, so we don't need to park. + Poll::Ready(()) } - State::Task(Task::Waker(w)) => { - // If the listener was in the `Polling` state, update the waker. - if w.will_wake(cx.waker()) { - state.set(State::Task(Task::Waker(w))); - } else { - state.set(State::Task(Task::Waker(cx.waker().clone()))); - } + + Some(false) => { + // We're now waiting for a notification. + Poll::Pending } - State::Task(_) => { - unreachable!("cannot poll and wait on `EventListener` at the same time") + + None => { + // We were never inserted into the list. + panic!("listener was never inserted into the list"); } } - - Poll::Pending } } -impl Drop for EventListener { +impl + Unpin> Drop for Listener { fn drop(&mut self) { - // If this listener has never picked up a notification... - if let ListenerState::HasNode(entry) = self.state.take() { - match self.inner.lock() { - Some(mut list) => { - // But if a notification was delivered to it... - if let State::Notified(additional) = list.remove(entry) { - // Then pass it on to another active listener. - list.notify(1, additional); - } - } - None => { - // Request that someone else do it. - self.inner.push(Node::RemoveListener { - listener: entry, - propagate: true, - }); - } + // If we're being dropped, we need to remove ourself from the list. + let (inner, listener) = unsafe { Pin::new_unchecked(self).project() }; + + inner.remove(listener, true); + } +} + +/// The state of a listener. +#[derive(Debug, PartialEq)] +enum State { + /// The listener was just created. + Created, + + /// The listener has received a notification. + /// + /// The `bool` is `true` if this was an "additional" notification. + Notified(bool), + + /// A task is waiting for a notification. + Task(Task), + + /// Empty hole used to replace a notified listener. + NotifiedTaken, +} + +impl State { + fn is_notified(&self) -> bool { + matches!(self, Self::Notified(_) | Self::NotifiedTaken) + } +} + +/// A task that can be woken up. +#[derive(Debug, Clone)] +enum Task { + /// A waker that wakes up a future. + Waker(Waker), + + /// An unparker that wakes up a thread. + #[cfg(feature = "std")] + Unparker(Unparker), +} + +impl Task { + fn as_task_ref(&self) -> TaskRef<'_> { + match self { + Self::Waker(waker) => TaskRef::Waker(waker), + #[cfg(feature = "std")] + Self::Unparker(unparker) => TaskRef::Unparker(unparker), + } + } + + fn wake(self) { + match self { + Self::Waker(waker) => waker.wake(), + #[cfg(feature = "std")] + Self::Unparker(unparker) => { + unparker.unpark(); } } } } -impl ListenerState { - fn take(&mut self) -> Self { - mem::replace(self, ListenerState::Discarded) +impl PartialEq for Task { + fn eq(&self, other: &Self) -> bool { + self.as_task_ref().will_wake(other.as_task_ref()) + } +} + +/// A reference to a task. +#[derive(Clone, Copy)] +enum TaskRef<'a> { + /// A waker that wakes up a future. + Waker(&'a Waker), + + /// An unparker that wakes up a thread. + #[cfg(feature = "std")] + Unparker(&'a Unparker), +} + +impl TaskRef<'_> { + /// Tells if this task will wake up the other task. + #[allow(unreachable_patterns)] + fn will_wake(self, other: Self) -> bool { + match (self, other) { + (Self::Waker(a), Self::Waker(b)) => a.will_wake(b), + #[cfg(feature = "std")] + (Self::Unparker(_), Self::Unparker(_)) => { + // TODO: Use unreleased will_unpark API. + false + } + _ => false, + } + } + + /// Converts this task reference to a task by cloning. + fn into_task(self) -> Task { + match self { + Self::Waker(waker) => Task::Waker(waker.clone()), + #[cfg(feature = "std")] + Self::Unparker(unparker) => Task::Unparker(unparker.clone()), + } } } @@ -905,42 +904,41 @@ fn full_fence() { // The ideal solution here would be to use inline assembly, but we're instead creating a // temporary atomic variable and compare-and-exchanging its value. No sane compiler to // x86 platforms is going to optimize this away. - atomic::compiler_fence(Ordering::SeqCst); + sync::atomic::compiler_fence(Ordering::SeqCst); let a = AtomicUsize::new(0); let _ = a.compare_exchange(0, 1, Ordering::SeqCst, Ordering::SeqCst); - atomic::compiler_fence(Ordering::SeqCst); + sync::atomic::compiler_fence(Ordering::SeqCst); } else { - atomic::fence(Ordering::SeqCst); + sync::atomic::fence(Ordering::SeqCst); } } -#[cfg(any(feature = "__test", test))] -impl Event { - /// Locks the event. - /// - /// This is useful for simulating contention, but otherwise serves no other purpose for users. - /// It is used only in testing. - /// - /// This method and `EventLock` are not part of the public API. - #[doc(hidden)] - pub fn __lock_event(&self) -> EventLock<'_> { - unsafe { - EventLock { - _lock: (*self.inner()).lock().unwrap(), - } - } +/// Synchronization primitive implementation. +mod sync { + pub(super) use alloc::sync::Arc; + pub(super) use core::cell; + pub(super) use core::sync::atomic; + + #[cfg(feature = "std")] + pub(super) use std::sync::{Mutex, MutexGuard}; + + pub(super) trait WithMut { + type Output; + + fn with_mut(&mut self, f: F) -> R + where + F: FnOnce(&mut Self::Output) -> R; } -} -#[cfg(any(feature = "__test", test))] -#[doc(hidden)] -pub struct EventLock<'a> { - _lock: inner::ListGuard<'a>, -} + impl WithMut for atomic::AtomicPtr { + type Output = *mut T; -#[cfg(any(feature = "__test", test))] -impl fmt::Debug for EventLock<'_> { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.pad("EventLock { .. }") + #[inline] + fn with_mut(&mut self, f: F) -> R + where + F: FnOnce(&mut Self::Output) -> R, + { + f(self.get_mut()) + } } } diff --git a/src/list.rs b/src/list.rs deleted file mode 100644 index 009c8e3..0000000 --- a/src/list.rs +++ /dev/null @@ -1,237 +0,0 @@ -//! The inner list of listeners. - -use crate::sync::cell::Cell; -use crate::Task; - -use core::mem; -use core::num::NonZeroUsize; - -use slab::Slab; - -/// The state of a listener. -pub(crate) enum State { - /// It has just been created. - Created, - - /// It has received a notification. - /// - /// The `bool` is `true` if this was an "additional" notification. - Notified(bool), - - /// A task is polling it. - Task(Task), -} - -impl State { - /// Returns `true` if this is the `Notified` state. - #[inline] - pub(crate) fn is_notified(&self) -> bool { - match self { - State::Notified(_) => true, - _ => false, - } - } -} - -/// An entry representing a registered listener. -pub(crate) struct Entry { - /// The state of this listener. - state: Cell, - - /// Previous entry in the linked list. - prev: Cell>, - - /// Next entry in the linked list. - next: Cell>, -} - -/// A linked list of entries. -pub(crate) struct List { - /// The raw list of entries. - entries: Slab, - - /// First entry in the list. - head: Option, - - /// Last entry in the list. - tail: Option, - - /// The first unnotified entry in the list. - start: Option, - - /// The number of notified entries in the list. - pub(crate) notified: usize, -} - -impl List { - /// Create a new, empty list. - pub(crate) fn new() -> Self { - // Create a Slab with a permanent entry occupying index 0, so that - // it is never used (and we can therefore use 0 as a sentinel value). - let mut entries = Slab::new(); - entries.insert(Entry::new()); - - Self { - entries, - head: None, - tail: None, - start: None, - notified: 0, - } - } - - /// Get the number of entries in the list. - pub(crate) fn len(&self) -> usize { - self.entries.len() - 1 - } - - /// Get the state of the entry at the given index. - pub(crate) fn state(&self, index: NonZeroUsize) -> &Cell { - &self.entries[index.get()].state - } - - /// Inserts a new entry into the list. - pub(crate) fn insert(&mut self, entry: Entry) -> NonZeroUsize { - // Replace the tail with the new entry. - let key = NonZeroUsize::new(self.entries.vacant_key()).unwrap(); - match mem::replace(&mut self.tail, Some(key)) { - None => self.head = Some(key), - Some(t) => { - self.entries[t.get()].next.set(Some(key)); - entry.prev.set(Some(t)); - } - } - - // If there were no unnotified entries, this one is the first now. - if self.start.is_none() { - self.start = self.tail; - } - - // Insert the entry into the slab. - self.entries.insert(entry); - - // Return the key. - key - } - - /// Removes an entry from the list and returns its state. - pub(crate) fn remove(&mut self, key: NonZeroUsize) -> State { - let entry = self.entries.remove(key.get()); - let prev = entry.prev.get(); - let next = entry.next.get(); - - // Unlink from the previous entry. - match prev { - None => self.head = next, - Some(p) => self.entries[p.get()].next.set(next), - } - - // Unlink from the next entry. - match next { - None => self.tail = prev, - Some(n) => self.entries[n.get()].prev.set(prev), - } - - // If this was the first unnotified entry, move the pointer to the next one. - if self.start == Some(key) { - self.start = next; - } - - // Extract the state. - let state = entry.state.replace(State::Created); - - // Update the counters. - if state.is_notified() { - self.notified = self.notified.saturating_sub(1); - } - - state - } - - /// Notifies a number of entries, either normally or as an additional notification. - #[cold] - pub(crate) fn notify(&mut self, count: usize, additional: bool) { - if additional { - self.notify_additional(count); - } else { - self.notify_unnotified(count); - } - } - - /// Notifies a number of entries. - #[cold] - pub(crate) fn notify_unnotified(&mut self, mut n: usize) { - if n <= self.notified { - return; - } - n -= self.notified; - - while n > 0 { - n -= 1; - - // Notify the first unnotified entry. - match self.start { - None => break, - Some(e) => { - // Get the entry and move the pointer forward. - let e = &self.entries[e.get()]; - self.start = e.next.get(); - - // Set the state of this entry to `Notified` and notify. - let was_notified = e.notify(false); - - // Update the counter. - self.notified += was_notified as usize; - } - } - } - } - - /// Notifies a number of additional entries. - #[cold] - pub(crate) fn notify_additional(&mut self, mut n: usize) { - while n > 0 { - n -= 1; - - // Notify the first unnotified entry. - match self.start { - None => break, - Some(e) => { - // Get the entry and move the pointer forward. - let e = &self.entries[e.get()]; - self.start = e.next.get(); - - // Set the state of this entry to `Notified` and notify. - let was_notified = e.notify(true); - - // Update the counter. - self.notified += was_notified as usize; - } - } - } - } -} - -impl Entry { - /// Create a new, empty entry. - pub(crate) fn new() -> Self { - Self { - state: Cell::new(State::Created), - prev: Cell::new(None), - next: Cell::new(None), - } - } - - /// Indicate that this entry has been notified. - #[cold] - pub(crate) fn notify(&self, additional: bool) -> bool { - match self.state.replace(State::Notified(additional)) { - State::Notified(_) => {} - State::Created => {} - State::Task(w) => w.wake(), - } - - // Return whether the notification would have had any effect. - true - } -} diff --git a/src/no_std.rs b/src/no_std.rs new file mode 100644 index 0000000..36727d2 --- /dev/null +++ b/src/no_std.rs @@ -0,0 +1,1273 @@ +//! Implementation of `event-listener` built exclusively on atomics. +//! +//! On `no_std`, we don't have access to `Mutex`, so we can't use intrusive linked lists like the `std` +//! implementation. Normally, we would use a concurrent atomic queue to store listeners, but benchmarks +//! show that using queues in this way is very slow, especially for the single threaded use-case. +//! +//! We've found that it's easier to assume that the `Event` won't be under high contention in most use +//! cases. Therefore, we use a spinlock that protects a linked list of listeners, and fall back to an +//! atomic queue if the lock is contended. Benchmarks show that this is about 20% slower than the std +//! implementation, but still much faster than using a queue. + +#[path = "no_std/node.rs"] +mod node; + +#[path = "no_std/queue.rs"] +mod queue; + +use node::{Node, TaskWaiting}; +use queue::Queue; + +use crate::sync::atomic::{AtomicBool, Ordering}; +use crate::sync::cell::{Cell, UnsafeCell}; +use crate::sync::Arc; +use crate::{State, Task, TaskRef}; + +use core::fmt; +use core::mem; +use core::num::NonZeroUsize; +use core::ops; +use core::pin::Pin; + +use alloc::vec::Vec; + +impl crate::Inner { + /// Locks the list. + fn try_lock(&self) -> Option> { + self.list.inner.try_lock().map(|guard| ListGuard { + inner: self, + guard: Some(guard), + }) + } + + /// Add a new listener to the list. + /// + /// Does nothing if the list is already registered. + pub(crate) fn insert(&self, mut listener: Pin<&mut Option>) { + if listener.as_ref().as_pin_ref().is_some() { + // Already inserted. + return; + } + + match self.try_lock() { + Some(mut lock) => { + let key = lock.insert(State::Created); + *listener = Some(Listener::HasNode(key)); + } + + None => { + // Push it to the queue. + let (node, task_waiting) = Node::listener(); + self.list.queue.push(node); + *listener = Some(Listener::Queued(task_waiting)); + } + } + } + + /// Remove a listener from the list. + pub(crate) fn remove( + &self, + mut listener: Pin<&mut Option>, + propogate: bool, + ) -> Option { + let state = match listener.as_mut().take() { + Some(Listener::HasNode(key)) => { + match self.try_lock() { + Some(mut list) => { + // Fast path removal. + list.remove(key, propogate) + } + + None => { + // Slow path removal. + // This is why intrusive lists don't work on no_std. + let node = Node::RemoveListener { + listener: key, + propagate: propogate, + }; + + self.list.queue.push(node); + + None + } + } + } + + Some(Listener::Queued(_)) => { + // This won't be added after we drop the lock. + None + } + + None => None, + }; + + state + } + + /// Notifies a number of entries. + #[cold] + pub(crate) fn notify(&self, n: usize, additional: bool) { + match self.try_lock() { + Some(mut guard) => { + // Notify the listeners. + guard.notify(n, additional); + } + + None => { + // Push it to the queue. + let node = Node::Notify { + count: n, + additional, + }; + + self.list.queue.push(node); + } + } + } + + /// Register a task to be notified when the event is triggered. + /// + /// Returns `true` if the listener was already notified, and `false` otherwise. If the listener + /// isn't inserted, returns `None`. + pub(crate) fn register( + &self, + mut listener: Pin<&mut Option>, + task: TaskRef<'_>, + ) -> Option { + loop { + match listener.as_mut().take() { + Some(Listener::HasNode(key)) => { + *listener = Some(Listener::HasNode(key)); + match self.try_lock() { + Some(mut guard) => { + // Fast path registration. + return guard.register(listener, task); + } + + None => { + // Wait for the lock. + let node = Node::Waiting(task.into_task()); + self.list.queue.push(node); + return Some(false); + } + } + } + + Some(Listener::Queued(task_waiting)) => { + // Are we done yet? + match task_waiting.status() { + Some(key) => { + // We're inserted now, adjust state. + *listener = Some(Listener::HasNode(key)); + } + + None => { + // We're still queued, so register the task. + task_waiting.register(task.into_task()); + *listener = Some(Listener::Queued(task_waiting)); + return None; + } + } + } + + _ => return None, + } + } + } +} + +pub(crate) struct List { + /// The inner list. + inner: Mutex, + + /// The queue of pending operations. + queue: Queue, +} + +impl List { + pub(super) fn new() -> List { + List { + inner: Mutex::new(ListenerSlab::new()), + queue: Queue::new(), + } + } +} + +/// The guard returned by [`Inner::lock`]. +pub(crate) struct ListGuard<'a> { + /// Reference to the inner state. + pub(crate) inner: &'a crate::Inner, + + /// The locked list. + pub(crate) guard: Option>, +} + +impl ListGuard<'_> { + #[cold] + fn process_nodes_slow( + &mut self, + start_node: Node, + tasks: &mut Vec, + guard: &mut MutexGuard<'_, ListenerSlab>, + ) { + // Process the start node. + tasks.extend(start_node.apply(guard)); + + // Process all remaining nodes. + while let Some(node) = self.inner.list.queue.pop() { + tasks.extend(node.apply(guard)); + } + } +} + +impl ops::Deref for ListGuard<'_> { + type Target = ListenerSlab; + + fn deref(&self) -> &Self::Target { + self.guard.as_ref().unwrap() + } +} + +impl ops::DerefMut for ListGuard<'_> { + fn deref_mut(&mut self) -> &mut Self::Target { + self.guard.as_mut().unwrap() + } +} + +impl Drop for ListGuard<'_> { + fn drop(&mut self) { + let Self { inner, guard } = self; + let mut list = guard.take().unwrap(); + + // Tasks to wakeup after releasing the lock. + let mut tasks = alloc::vec![]; + + // Process every node left in the queue. + if let Some(start_node) = inner.list.queue.pop() { + self.process_nodes_slow(start_node, &mut tasks, &mut list); + } + + // Update the atomic `notified` counter. + let notified = if list.notified < list.len { + list.notified + } else { + core::usize::MAX + }; + + self.inner.notified.store(notified, Ordering::Release); + + // Drop the actual lock. + drop(list); + + // Wakeup all tasks. + for task in tasks { + task.wake(); + } + } +} + +/// An entry representing a registered listener. +enum Entry { + /// Contains the listener state. + Listener { + /// The state of the listener. + state: Cell, + + /// The previous listener in the list. + prev: Cell>, + + /// The next listener in the list. + next: Cell>, + }, + + /// An empty slot that contains the index of the next empty slot. + Empty(NonZeroUsize), + + /// Sentinel value. + Sentinel, +} + +struct TakenState<'a> { + slot: &'a Cell, + state: State, +} + +impl Drop for TakenState<'_> { + fn drop(&mut self) { + self.slot + .set(mem::replace(&mut self.state, State::NotifiedTaken)); + } +} + +impl fmt::Debug for TakenState<'_> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt::Debug::fmt(&self.state, f) + } +} + +impl PartialEq for TakenState<'_> { + fn eq(&self, other: &Self) -> bool { + self.state == other.state + } +} + +impl<'a> TakenState<'a> { + fn new(slot: &'a Cell) -> Self { + let state = slot.replace(State::NotifiedTaken); + Self { slot, state } + } +} + +impl fmt::Debug for Entry { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Entry::Listener { state, next, prev } => f + .debug_struct("Listener") + .field("state", &TakenState::new(state)) + .field("prev", prev) + .field("next", next) + .finish(), + Entry::Empty(next) => f.debug_tuple("Empty").field(next).finish(), + Entry::Sentinel => f.debug_tuple("Sentinel").finish(), + } + } +} + +impl PartialEq for Entry { + fn eq(&self, other: &Entry) -> bool { + match (self, other) { + ( + Self::Listener { + state: state1, + prev: prev1, + next: next1, + }, + Self::Listener { + state: state2, + prev: prev2, + next: next2, + }, + ) => { + if TakenState::new(state1) != TakenState::new(state2) { + return false; + } + + prev1.get() == prev2.get() && next1.get() == next2.get() + } + (Self::Empty(next1), Self::Empty(next2)) => next1 == next2, + (Self::Sentinel, Self::Sentinel) => true, + _ => false, + } + } +} + +impl Entry { + fn state(&self) -> &Cell { + match self { + Entry::Listener { state, .. } => state, + _ => unreachable!(), + } + } + + fn prev(&self) -> &Cell> { + match self { + Entry::Listener { prev, .. } => prev, + _ => unreachable!(), + } + } + + fn next(&self) -> &Cell> { + match self { + Entry::Listener { next, .. } => next, + _ => unreachable!(), + } + } +} + +/// A linked list of entries. +pub(crate) struct ListenerSlab { + /// The raw list of entries. + listeners: Vec, + + /// First entry in the list. + head: Option, + + /// Last entry in the list. + tail: Option, + + /// The first unnotified entry in the list. + start: Option, + + /// The number of notified entries in the list. + notified: usize, + + /// The total number of listeners. + len: usize, + + /// The index of the first `Empty` entry, or the length of the list plus one if there + /// are no empty entries. + first_empty: NonZeroUsize, +} + +impl ListenerSlab { + /// Create a new, empty list. + pub(crate) fn new() -> Self { + Self { + listeners: alloc::vec![Entry::Sentinel], + head: None, + tail: None, + start: None, + notified: 0, + len: 0, + first_empty: unsafe { NonZeroUsize::new_unchecked(1) }, + } + } + + /// Inserts a new entry into the list. + pub(crate) fn insert(&mut self, state: State) -> NonZeroUsize { + // Add the new entry into the list. + let key = { + let entry = Entry::Listener { + state: Cell::new(state), + prev: Cell::new(self.tail), + next: Cell::new(None), + }; + + let key = self.first_empty; + if self.first_empty.get() == self.listeners.len() { + // No empty entries, so add a new entry. + self.listeners.push(entry); + + // SAFETY: Guaranteed to not overflow, since the Vec would have panicked already. + self.first_empty = unsafe { NonZeroUsize::new_unchecked(self.listeners.len()) }; + } else { + // There is an empty entry, so replace it. + let slot = &mut self.listeners[key.get()]; + let next = match mem::replace(slot, entry) { + Entry::Empty(next) => next, + _ => unreachable!(), + }; + + self.first_empty = next; + } + + key + }; + + // Replace the tail with the new entry. + match mem::replace(&mut self.tail, Some(key)) { + None => self.head = Some(key), + Some(tail) => { + let tail = &self.listeners[tail.get()]; + tail.next().set(Some(key)); + } + } + + // If there are no listeners that have been notified, then the new listener is the next + // listener to be notified. + if self.start.is_none() { + self.start = Some(key); + } + + // Increment the length. + self.len += 1; + + key + } + + /// Removes an entry from the list and returns its state. + pub(crate) fn remove(&mut self, key: NonZeroUsize, propogate: bool) -> Option { + let entry = &self.listeners[key.get()]; + let prev = entry.prev().get(); + let next = entry.next().get(); + + // Unlink from the previous entry. + match prev { + None => self.head = next, + Some(p) => self.listeners[p.get()].next().set(next), + } + + // Unlink from the next entry. + match next { + None => self.tail = prev, + Some(n) => self.listeners[n.get()].prev().set(prev), + } + + // If this was the first unnotified entry, move the pointer to the next one. + if self.start == Some(key) { + self.start = next; + } + + // Extract the state. + let entry = mem::replace( + &mut self.listeners[key.get()], + Entry::Empty(self.first_empty), + ); + self.first_empty = key; + + let state = match entry { + Entry::Listener { state, .. } => state.into_inner(), + _ => unreachable!(), + }; + + // Update the counters. + if state.is_notified() { + self.notified = self.notified.saturating_sub(1); + + if propogate { + // Propogate the notification to the next entry. + if let State::Notified(additional) = state { + self.notify(1, additional); + } + } + } + self.len -= 1; + + Some(state) + } + + /// Notifies a number of listeners. + #[cold] + pub(crate) fn notify(&mut self, mut n: usize, additional: bool) { + if !additional { + // Make sure we're not notifying more than we have. + if n <= self.notified { + return; + } + n -= self.notified; + } + + while n > 0 { + n -= 1; + + // Notify the next entry. + match self.start { + None => break, + + Some(e) => { + // Get the entry and move the pointer forwards. + let entry = &self.listeners[e.get()]; + self.start = entry.next().get(); + + // Set the state to `Notified` and notify. + if let State::Task(task) = entry.state().replace(State::Notified(additional)) { + task.wake(); + } + + // Bump the notified count. + self.notified += 1; + } + } + } + } + + /// Register a task to be notified when the event is triggered. + /// + /// Returns `true` if the listener was already notified, and `false` otherwise. If the listener + /// isn't inserted, returns `None`. + pub(crate) fn register( + &mut self, + mut listener: Pin<&mut Option>, + task: TaskRef<'_>, + ) -> Option { + let key = match *listener { + Some(Listener::HasNode(key)) => key, + _ => return None, + }; + + let entry = &self.listeners[key.get()]; + + // Take the state out and check it. + match entry.state().replace(State::NotifiedTaken) { + State::Notified(_) | State::NotifiedTaken => { + // The listener was already notified, so we don't need to do anything. + self.remove(key, false)?; + *listener = None; + Some(true) + } + + State::Task(other_task) => { + // Only replace the task if it's not the same as the one we're registering. + if task.will_wake(other_task.as_task_ref()) { + entry.state().set(State::Task(other_task)); + } else { + entry.state().set(State::Task(task.into_task())); + } + + Some(false) + } + + _ => { + // Register the task. + entry.state().set(State::Task(task.into_task())); + Some(false) + } + } + } +} + +#[derive(Debug)] +pub(crate) enum Listener { + /// The listener has a node inside of the linked list. + HasNode(NonZeroUsize), + + /// The listener has an entry in the queue that may or may not have a task waiting. + Queued(Arc), +} + +impl PartialEq for Listener { + fn eq(&self, other: &Self) -> bool { + match (self, other) { + (Self::HasNode(a), Self::HasNode(b)) => a == b, + (Self::Queued(a), Self::Queued(b)) => Arc::ptr_eq(a, b), + _ => false, + } + } +} + +/// A simple mutex type that optimistically assumes that the lock is uncontended. +pub(crate) struct Mutex { + /// The inner value. + value: UnsafeCell, + + /// Whether the mutex is locked. + locked: AtomicBool, +} + +impl Mutex { + /// Create a new mutex. + pub(crate) fn new(value: T) -> Self { + Self { + value: UnsafeCell::new(value), + locked: AtomicBool::new(false), + } + } + + /// Lock the mutex. + pub(crate) fn try_lock(&self) -> Option> { + // Try to lock the mutex. + if self + .locked + .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed) + .is_ok() + { + // We have successfully locked the mutex. + Some(MutexGuard { mutex: self }) + } else { + self.try_lock_slow() + } + } + + #[cold] + fn try_lock_slow(&self) -> Option> { + // Assume that the contention is short-term. + // Spin for a while to see if the mutex becomes unlocked. + let mut spins = 100u32; + + loop { + if self + .locked + .compare_exchange_weak(false, true, Ordering::Acquire, Ordering::Relaxed) + .is_ok() + { + // We have successfully locked the mutex. + return Some(MutexGuard { mutex: self }); + } + + // Use atomic loads instead of compare-exchange. + while self.locked.load(Ordering::Relaxed) { + // Return None once we've exhausted the number of spins. + spins = spins.checked_sub(1)?; + } + } + } +} + +pub(crate) struct MutexGuard<'a, T> { + mutex: &'a Mutex, +} + +impl<'a, T> Drop for MutexGuard<'a, T> { + fn drop(&mut self) { + self.mutex.locked.store(false, Ordering::Release); + } +} + +impl<'a, T> ops::Deref for MutexGuard<'a, T> { + type Target = T; + + fn deref(&self) -> &T { + unsafe { &*self.mutex.value.get() } + } +} + +impl<'a, T> ops::DerefMut for MutexGuard<'a, T> { + fn deref_mut(&mut self) -> &mut T { + unsafe { &mut *self.mutex.value.get() } + } +} + +unsafe impl Send for Mutex {} +unsafe impl Sync for Mutex {} + +#[cfg(test)] +mod tests { + use super::*; + use crate::Task; + + #[test] + fn smoke_mutex() { + let mutex = Mutex::new(0); + + { + let mut guard = mutex.try_lock().unwrap(); + *guard += 1; + } + + { + let mut guard = mutex.try_lock().unwrap(); + *guard += 1; + } + + let guard = mutex.try_lock().unwrap(); + assert_eq!(*guard, 2); + } + + #[test] + fn smoke_listener_slab() { + let mut listeners = ListenerSlab::new(); + + // Insert a few listeners. + let key1 = listeners.insert(State::Created); + let key2 = listeners.insert(State::Created); + let key3 = listeners.insert(State::Created); + + assert_eq!(listeners.len, 3); + assert_eq!(listeners.notified, 0); + assert_eq!(listeners.tail, Some(key3)); + assert_eq!(listeners.head, Some(key1)); + assert_eq!(listeners.start, Some(key1)); + assert_eq!(listeners.first_empty, NonZeroUsize::new(4).unwrap()); + assert_eq!(listeners.listeners[0], Entry::Sentinel); + assert_eq!( + listeners.listeners[1], + Entry::Listener { + state: Cell::new(State::Created), + prev: Cell::new(None), + next: Cell::new(Some(key2)), + } + ); + assert_eq!( + listeners.listeners[2], + Entry::Listener { + state: Cell::new(State::Created), + prev: Cell::new(Some(key1)), + next: Cell::new(Some(key3)), + } + ); + assert_eq!( + listeners.listeners[3], + Entry::Listener { + state: Cell::new(State::Created), + prev: Cell::new(Some(key2)), + next: Cell::new(None), + } + ); + + // Remove one. + assert_eq!(listeners.remove(key2, false), Some(State::Created)); + + assert_eq!(listeners.len, 2); + assert_eq!(listeners.notified, 0); + assert_eq!(listeners.tail, Some(key3)); + assert_eq!(listeners.head, Some(key1)); + assert_eq!(listeners.start, Some(key1)); + assert_eq!(listeners.first_empty, NonZeroUsize::new(2).unwrap()); + assert_eq!(listeners.listeners[0], Entry::Sentinel); + assert_eq!( + listeners.listeners[1], + Entry::Listener { + state: Cell::new(State::Created), + prev: Cell::new(None), + next: Cell::new(Some(key3)), + } + ); + assert_eq!( + listeners.listeners[2], + Entry::Empty(NonZeroUsize::new(4).unwrap()) + ); + assert_eq!( + listeners.listeners[3], + Entry::Listener { + state: Cell::new(State::Created), + prev: Cell::new(Some(key1)), + next: Cell::new(None), + } + ); + } + + #[test] + fn listener_slab_notify() { + let mut listeners = ListenerSlab::new(); + + // Insert a few listeners. + let key1 = listeners.insert(State::Created); + let key2 = listeners.insert(State::Created); + let key3 = listeners.insert(State::Created); + + // Notify one. + listeners.notify(1, true); + + assert_eq!(listeners.len, 3); + assert_eq!(listeners.notified, 1); + assert_eq!(listeners.tail, Some(key3)); + assert_eq!(listeners.head, Some(key1)); + assert_eq!(listeners.start, Some(key2)); + assert_eq!(listeners.first_empty, NonZeroUsize::new(4).unwrap()); + assert_eq!(listeners.listeners[0], Entry::Sentinel); + assert_eq!( + listeners.listeners[1], + Entry::Listener { + state: Cell::new(State::Notified(true)), + prev: Cell::new(None), + next: Cell::new(Some(key2)), + } + ); + assert_eq!( + listeners.listeners[2], + Entry::Listener { + state: Cell::new(State::Created), + prev: Cell::new(Some(key1)), + next: Cell::new(Some(key3)), + } + ); + assert_eq!( + listeners.listeners[3], + Entry::Listener { + state: Cell::new(State::Created), + prev: Cell::new(Some(key2)), + next: Cell::new(None), + } + ); + + // Remove the notified listener. + assert_eq!(listeners.remove(key1, false), Some(State::Notified(true))); + + assert_eq!(listeners.len, 2); + assert_eq!(listeners.notified, 0); + assert_eq!(listeners.tail, Some(key3)); + assert_eq!(listeners.head, Some(key2)); + assert_eq!(listeners.start, Some(key2)); + assert_eq!(listeners.first_empty, NonZeroUsize::new(1).unwrap()); + assert_eq!(listeners.listeners[0], Entry::Sentinel); + assert_eq!( + listeners.listeners[1], + Entry::Empty(NonZeroUsize::new(4).unwrap()) + ); + assert_eq!( + listeners.listeners[2], + Entry::Listener { + state: Cell::new(State::Created), + prev: Cell::new(None), + next: Cell::new(Some(key3)), + } + ); + assert_eq!( + listeners.listeners[3], + Entry::Listener { + state: Cell::new(State::Created), + prev: Cell::new(Some(key2)), + next: Cell::new(None), + } + ); + } + + #[test] + fn listener_slab_register() { + let woken = Arc::new(AtomicBool::new(false)); + let waker = waker_fn::waker_fn({ + let woken = woken.clone(); + move || woken.store(true, Ordering::SeqCst) + }); + + let mut listeners = ListenerSlab::new(); + + // Insert a few listeners. + let key1 = listeners.insert(State::Created); + let key2 = listeners.insert(State::Created); + let key3 = listeners.insert(State::Created); + + // Register one. + assert_eq!( + listeners.register( + Pin::new(&mut Some(Listener::HasNode(key2))), + TaskRef::Waker(&waker) + ), + Some(false) + ); + + assert_eq!(listeners.len, 3); + assert_eq!(listeners.notified, 0); + assert_eq!(listeners.tail, Some(key3)); + assert_eq!(listeners.head, Some(key1)); + assert_eq!(listeners.start, Some(key1)); + assert_eq!(listeners.first_empty, NonZeroUsize::new(4).unwrap()); + assert_eq!(listeners.listeners[0], Entry::Sentinel); + assert_eq!( + listeners.listeners[1], + Entry::Listener { + state: Cell::new(State::Created), + prev: Cell::new(None), + next: Cell::new(Some(key2)), + } + ); + assert_eq!( + listeners.listeners[2], + Entry::Listener { + state: Cell::new(State::Task(Task::Waker(waker.clone()))), + prev: Cell::new(Some(key1)), + next: Cell::new(Some(key3)), + } + ); + assert_eq!( + listeners.listeners[3], + Entry::Listener { + state: Cell::new(State::Created), + prev: Cell::new(Some(key2)), + next: Cell::new(None), + } + ); + + // Notify the listener. + listeners.notify(2, false); + + assert_eq!(listeners.len, 3); + assert_eq!(listeners.notified, 2); + assert_eq!(listeners.tail, Some(key3)); + assert_eq!(listeners.head, Some(key1)); + assert_eq!(listeners.start, Some(key3)); + assert_eq!(listeners.first_empty, NonZeroUsize::new(4).unwrap()); + assert_eq!(listeners.listeners[0], Entry::Sentinel); + assert_eq!( + listeners.listeners[1], + Entry::Listener { + state: Cell::new(State::Notified(false)), + prev: Cell::new(None), + next: Cell::new(Some(key2)), + } + ); + assert_eq!( + listeners.listeners[2], + Entry::Listener { + state: Cell::new(State::Notified(false)), + prev: Cell::new(Some(key1)), + next: Cell::new(Some(key3)), + } + ); + assert_eq!( + listeners.listeners[3], + Entry::Listener { + state: Cell::new(State::Created), + prev: Cell::new(Some(key2)), + next: Cell::new(None), + } + ); + + assert!(woken.load(Ordering::SeqCst)); + assert_eq!( + listeners.register( + Pin::new(&mut Some(Listener::HasNode(key2))), + TaskRef::Waker(&waker) + ), + Some(true) + ); + } + + #[test] + fn listener_slab_notify_prop() { + let woken = Arc::new(AtomicBool::new(false)); + let waker = waker_fn::waker_fn({ + let woken = woken.clone(); + move || woken.store(true, Ordering::SeqCst) + }); + + let mut listeners = ListenerSlab::new(); + + // Insert a few listeners. + let key1 = listeners.insert(State::Created); + let key2 = listeners.insert(State::Created); + let key3 = listeners.insert(State::Created); + + // Register one. + assert_eq!( + listeners.register( + Pin::new(&mut Some(Listener::HasNode(key2))), + TaskRef::Waker(&waker) + ), + Some(false) + ); + + assert_eq!(listeners.len, 3); + assert_eq!(listeners.notified, 0); + assert_eq!(listeners.tail, Some(key3)); + assert_eq!(listeners.head, Some(key1)); + assert_eq!(listeners.start, Some(key1)); + assert_eq!(listeners.first_empty, NonZeroUsize::new(4).unwrap()); + assert_eq!(listeners.listeners[0], Entry::Sentinel); + assert_eq!( + listeners.listeners[1], + Entry::Listener { + state: Cell::new(State::Created), + prev: Cell::new(None), + next: Cell::new(Some(key2)), + } + ); + assert_eq!( + listeners.listeners[2], + Entry::Listener { + state: Cell::new(State::Task(Task::Waker(waker.clone()))), + prev: Cell::new(Some(key1)), + next: Cell::new(Some(key3)), + } + ); + assert_eq!( + listeners.listeners[3], + Entry::Listener { + state: Cell::new(State::Created), + prev: Cell::new(Some(key2)), + next: Cell::new(None), + } + ); + + // Notify the first listener. + listeners.notify(1, false); + + assert_eq!(listeners.len, 3); + assert_eq!(listeners.notified, 1); + assert_eq!(listeners.tail, Some(key3)); + assert_eq!(listeners.head, Some(key1)); + assert_eq!(listeners.start, Some(key2)); + assert_eq!(listeners.first_empty, NonZeroUsize::new(4).unwrap()); + assert_eq!(listeners.listeners[0], Entry::Sentinel); + assert_eq!( + listeners.listeners[1], + Entry::Listener { + state: Cell::new(State::Notified(false)), + prev: Cell::new(None), + next: Cell::new(Some(key2)), + } + ); + assert_eq!( + listeners.listeners[2], + Entry::Listener { + state: Cell::new(State::Task(Task::Waker(waker.clone()))), + prev: Cell::new(Some(key1)), + next: Cell::new(Some(key3)), + } + ); + assert_eq!( + listeners.listeners[3], + Entry::Listener { + state: Cell::new(State::Created), + prev: Cell::new(Some(key2)), + next: Cell::new(None), + } + ); + + // Calling notify again should not change anything. + listeners.notify(1, false); + + assert_eq!(listeners.len, 3); + assert_eq!(listeners.notified, 1); + assert_eq!(listeners.tail, Some(key3)); + assert_eq!(listeners.head, Some(key1)); + assert_eq!(listeners.start, Some(key2)); + assert_eq!(listeners.first_empty, NonZeroUsize::new(4).unwrap()); + assert_eq!(listeners.listeners[0], Entry::Sentinel); + assert_eq!( + listeners.listeners[1], + Entry::Listener { + state: Cell::new(State::Notified(false)), + prev: Cell::new(None), + next: Cell::new(Some(key2)), + } + ); + assert_eq!( + listeners.listeners[2], + Entry::Listener { + state: Cell::new(State::Task(Task::Waker(waker.clone()))), + prev: Cell::new(Some(key1)), + next: Cell::new(Some(key3)), + } + ); + assert_eq!( + listeners.listeners[3], + Entry::Listener { + state: Cell::new(State::Created), + prev: Cell::new(Some(key2)), + next: Cell::new(None), + } + ); + + // Remove the first listener. + assert_eq!(listeners.remove(key1, false), Some(State::Notified(false))); + + assert_eq!(listeners.len, 2); + assert_eq!(listeners.notified, 0); + assert_eq!(listeners.tail, Some(key3)); + assert_eq!(listeners.head, Some(key2)); + assert_eq!(listeners.start, Some(key2)); + assert_eq!(listeners.first_empty, NonZeroUsize::new(1).unwrap()); + assert_eq!(listeners.listeners[0], Entry::Sentinel); + assert_eq!( + listeners.listeners[1], + Entry::Empty(NonZeroUsize::new(4).unwrap()) + ); + assert_eq!( + listeners.listeners[2], + Entry::Listener { + state: Cell::new(State::Task(Task::Waker(waker))), + prev: Cell::new(None), + next: Cell::new(Some(key3)), + } + ); + assert_eq!( + listeners.listeners[3], + Entry::Listener { + state: Cell::new(State::Created), + prev: Cell::new(Some(key2)), + next: Cell::new(None), + } + ); + + // Notify the second listener. + listeners.notify(1, false); + assert!(woken.load(Ordering::SeqCst)); + + assert_eq!(listeners.len, 2); + assert_eq!(listeners.notified, 1); + assert_eq!(listeners.tail, Some(key3)); + assert_eq!(listeners.head, Some(key2)); + assert_eq!(listeners.start, Some(key3)); + assert_eq!(listeners.first_empty, NonZeroUsize::new(1).unwrap()); + assert_eq!(listeners.listeners[0], Entry::Sentinel); + assert_eq!( + listeners.listeners[1], + Entry::Empty(NonZeroUsize::new(4).unwrap()) + ); + assert_eq!( + listeners.listeners[2], + Entry::Listener { + state: Cell::new(State::Notified(false)), + prev: Cell::new(None), + next: Cell::new(Some(key3)), + } + ); + assert_eq!( + listeners.listeners[3], + Entry::Listener { + state: Cell::new(State::Created), + prev: Cell::new(Some(key2)), + next: Cell::new(None), + } + ); + + // Remove and propogate the second listener. + assert_eq!(listeners.remove(key2, true), Some(State::Notified(false))); + + // The third listener should be notified. + assert_eq!(listeners.len, 1); + assert_eq!(listeners.notified, 1); + assert_eq!(listeners.tail, Some(key3)); + assert_eq!(listeners.head, Some(key3)); + assert_eq!(listeners.start, None); + assert_eq!(listeners.first_empty, NonZeroUsize::new(2).unwrap()); + assert_eq!(listeners.listeners[0], Entry::Sentinel); + assert_eq!( + listeners.listeners[1], + Entry::Empty(NonZeroUsize::new(4).unwrap()) + ); + assert_eq!( + listeners.listeners[2], + Entry::Empty(NonZeroUsize::new(1).unwrap()) + ); + assert_eq!( + listeners.listeners[3], + Entry::Listener { + state: Cell::new(State::Notified(false)), + prev: Cell::new(None), + next: Cell::new(None), + } + ); + + // Remove the third listener. + assert_eq!(listeners.remove(key3, false), Some(State::Notified(false))); + } + + #[test] + fn uncontended_inner() { + let inner = crate::Inner::new(); + + // Register two listeners. + let (mut listener1, mut listener2, mut listener3) = (None, None, None); + inner.insert(Pin::new(&mut listener1)); + inner.insert(Pin::new(&mut listener2)); + inner.insert(Pin::new(&mut listener3)); + + assert_eq!( + listener1, + Some(Listener::HasNode(NonZeroUsize::new(1).unwrap())) + ); + assert_eq!( + listener2, + Some(Listener::HasNode(NonZeroUsize::new(2).unwrap())) + ); + + // Register a waker in the second listener. + let woken = Arc::new(AtomicBool::new(false)); + let waker = waker_fn::waker_fn({ + let woken = woken.clone(); + move || woken.store(true, Ordering::SeqCst) + }); + assert_eq!( + inner.register(Pin::new(&mut listener2), TaskRef::Waker(&waker)), + Some(false) + ); + + // Notify the first listener. + inner.notify(1, false); + assert!(!woken.load(Ordering::SeqCst)); + + // Another notify should do nothing. + inner.notify(1, false); + assert!(!woken.load(Ordering::SeqCst)); + + // Receive the notification. + assert_eq!( + inner.register(Pin::new(&mut listener1), TaskRef::Waker(&waker)), + Some(true) + ); + + // First listener is already removed. + assert!(listener1.is_none()); + + // Notify the second listener. + inner.notify(1, false); + assert!(woken.load(Ordering::SeqCst)); + + // Remove the second listener and propogate the notification. + assert_eq!( + inner.remove(Pin::new(&mut listener2), true), + Some(State::Notified(false)) + ); + + // Second listener is already removed. + assert!(listener2.is_none()); + + // Third listener should be notified. + assert_eq!( + inner.register(Pin::new(&mut listener3), TaskRef::Waker(&waker)), + Some(true) + ); + } +} diff --git a/src/node.rs b/src/no_std/node.rs similarity index 60% rename from src/node.rs rename to src/no_std/node.rs index 1d39762..feba69c 100644 --- a/src/node.rs +++ b/src/no_std/node.rs @@ -1,12 +1,16 @@ +//! An operation that can be delayed. + //! The node that makes up queues. -use crate::list::{Entry, List, State}; -use crate::sync::atomic::{AtomicUsize, Ordering}; +use crate::sync::atomic::{AtomicPtr, AtomicUsize, Ordering}; use crate::sync::Arc; -use crate::{Notify, NotifyKind, Task}; +use crate::sys::ListenerSlab; +use crate::{State, Task}; + +use alloc::boxed::Box; use core::num::NonZeroUsize; -use crossbeam_utils::atomic::AtomicCell; +use core::ptr; /// A node in the backup queue. pub(crate) enum Node { @@ -19,7 +23,13 @@ pub(crate) enum Node { }, /// This node is notifying a listener. - Notify(Notify), + Notify { + /// The number of listeners to notify. + count: usize, + + /// Whether to wake up notified listeners. + additional: bool, + }, /// This node is removing a listener. RemoveListener { @@ -34,9 +44,10 @@ pub(crate) enum Node { Waiting(Task), } +#[derive(Debug)] pub(crate) struct TaskWaiting { /// The task that is being waited on. - task: AtomicCell>, + task: AtomicCell, /// The ID of the new entry. /// @@ -48,7 +59,7 @@ impl Node { pub(crate) fn listener() -> (Self, Arc) { // Create a new `TaskWaiting` structure. let task_waiting = Arc::new(TaskWaiting { - task: AtomicCell::new(None), + task: AtomicCell::new(), entry_id: AtomicUsize::new(0), }); @@ -61,35 +72,27 @@ impl Node { } /// Apply the node to the list. - pub(crate) fn apply(self, list: &mut List) -> Option { + pub(super) fn apply(self, list: &mut ListenerSlab) -> Option { match self { Node::AddListener { task_waiting } => { // Add a new entry to the list. - let entry = Entry::new(); - let key = list.insert(entry); + let key = list.insert(State::Created); // Send the new key to the listener and wake it if necessary. task_waiting.entry_id.store(key.get(), Ordering::Release); - return task_waiting.task.take(); + + return task_waiting.task.take().map(|t| *t); } - Node::Notify(Notify { count, kind }) => { - // Notify the listener. - match kind { - NotifyKind::Notify => list.notify_unnotified(count), - NotifyKind::NotifyAdditional => list.notify_additional(count), - } + Node::Notify { count, additional } => { + // Notify the next `count` listeners. + list.notify(count, additional); } Node::RemoveListener { listener, propagate, } => { // Remove the listener from the list. - let state = list.remove(listener); - - if let (true, State::Notified(additional)) = (propagate, state) { - // Propagate the notification to the next listener. - list.notify(1, additional); - } + list.remove(listener, propagate); } Node::Waiting(task) => { return Some(task); @@ -111,7 +114,7 @@ impl TaskWaiting { /// Register a listener. pub(crate) fn register(&self, task: Task) { // Set the task. - if let Some(task) = self.task.swap(Some(task)) { + if let Some(task) = self.task.replace(Some(Box::new(task))) { task.wake(); } @@ -122,3 +125,39 @@ impl TaskWaiting { } } } + +/// A shared pointer to a value. +/// +/// The inner value is a `Box`. +#[derive(Debug)] +struct AtomicCell(AtomicPtr); + +impl AtomicCell { + /// Create a new `AtomicCell`. + fn new() -> Self { + Self(AtomicPtr::new(ptr::null_mut())) + } + + /// Swap the value out. + fn replace(&self, value: Option>) -> Option> { + let value = value.map_or(ptr::null_mut(), |value| Box::into_raw(value)); + let old_value = self.0.swap(value, Ordering::SeqCst); + + if old_value.is_null() { + None + } else { + Some(unsafe { Box::from_raw(old_value) }) + } + } + + /// Take the value out. + fn take(&self) -> Option> { + self.replace(None) + } +} + +impl Drop for AtomicCell { + fn drop(&mut self) { + self.take(); + } +} diff --git a/src/no_std/queue.rs b/src/no_std/queue.rs new file mode 100644 index 0000000..1d676a1 --- /dev/null +++ b/src/no_std/queue.rs @@ -0,0 +1,221 @@ +//! An atomic queue of operations to process. + +use super::node::Node; +use crate::sync::atomic::{AtomicPtr, Ordering}; + +use alloc::boxed::Box; +use core::ptr; + +/// An naive atomic queue of operations to process. +pub(super) struct Queue { + /// The head of the queue. + head: AtomicPtr, + + /// The tail of the queue. + tail: AtomicPtr, +} + +struct Link { + /// The inner node. + node: Node, + + /// The next node in the queue. + next: AtomicPtr, +} + +impl Queue { + /// Create a new, empty queue. + pub(super) fn new() -> Self { + Self { + head: AtomicPtr::new(ptr::null_mut()), + tail: AtomicPtr::new(ptr::null_mut()), + } + } + + /// Push a new node onto the queue. + pub(super) fn push(&self, node: Node) { + // Allocate a new link. + let link = Box::into_raw(Box::new(Link { + node, + next: AtomicPtr::new(ptr::null_mut()), + })); + + // Push the link onto the queue. + let mut tail = self.tail.load(Ordering::Acquire); + loop { + // If the tail is null, then the queue is empty. + if tail.is_null() { + // Try to set the head to the new link. + if self + .head + .compare_exchange(ptr::null_mut(), link, Ordering::AcqRel, Ordering::Acquire) + .is_ok() + { + // We successfully set the head, so we can set the tail. + self.tail.store(link, Ordering::Release); + return; + } + + // The head was set by another thread, so we need to try again. + tail = self.tail.load(Ordering::Acquire); + } + + unsafe { + // Try to set the next pointer of the current tail. + let next = (*tail).next.load(Ordering::Acquire); + if next.is_null() + && (*tail) + .next + .compare_exchange( + ptr::null_mut(), + link, + Ordering::AcqRel, + Ordering::Acquire, + ) + .is_ok() + { + // We successfully set the next pointer, so we can set the tail. + self.tail.store(link, Ordering::Release); + return; + } + } + + // The next pointer was set by another thread, so we need to try again. + tail = self.tail.load(Ordering::Acquire); + } + } + + /// Pop a node from the queue. + pub(super) fn pop(&self) -> Option { + // Pop the head of the queue. + let mut head = self.head.load(Ordering::Acquire); + loop { + // If the head is null, then the queue is empty. + if head.is_null() { + return None; + } + + unsafe { + // Try to set the head to the next pointer. + let next = (*head).next.load(Ordering::Acquire); + if self + .head + .compare_exchange(head, next, Ordering::AcqRel, Ordering::Acquire) + .is_ok() + { + // We successfully set the head, so we can set the tail if the queue is now empty. + if next.is_null() { + self.tail.store(ptr::null_mut(), Ordering::Release); + } + + // Return the popped node. + let boxed = Box::from_raw(head); + return Some(boxed.node); + } + } + + // The head was set by another thread, so we need to try again. + head = self.head.load(Ordering::Acquire); + } + } +} + +impl Drop for Queue { + fn drop(&mut self) { + // Pop all nodes from the queue. + while self.pop().is_some() {} + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn node_from_num(num: usize) -> Node { + Node::Notify { + count: num, + additional: true, + } + } + + fn node_to_num(node: Node) -> usize { + match node { + Node::Notify { + count, + additional: true, + } => count, + _ => panic!("unexpected node"), + } + } + + #[test] + fn push_pop() { + let queue = Queue::new(); + + queue.push(node_from_num(1)); + queue.push(node_from_num(2)); + queue.push(node_from_num(3)); + + assert_eq!(node_to_num(queue.pop().unwrap()), 1); + assert_eq!(node_to_num(queue.pop().unwrap()), 2); + assert_eq!(node_to_num(queue.pop().unwrap()), 3); + assert!(queue.pop().is_none()); + } + + #[test] + fn push_pop_many() { + const COUNT: usize = if cfg!(miri) { 10 } else { 1_000 }; + + for i in 0..COUNT { + let queue = Queue::new(); + + for j in 0..i { + queue.push(node_from_num(j)); + } + + for j in 0..i { + assert_eq!(node_to_num(queue.pop().unwrap()), j); + } + + assert!(queue.pop().is_none()); + } + } + + #[cfg(not(miri))] + #[test] + fn push_pop_many_threads() { + use crate::sync::Arc; + + const NUM_THREADS: usize = 3; + const COUNT: usize = 50; + + let mut handles = Vec::new(); + let queue = Arc::new(Queue::new()); + + for _ in 0..NUM_THREADS { + let queue = queue.clone(); + + handles.push(std::thread::spawn(move || { + for i in 0..COUNT { + queue.push(node_from_num(i)); + } + })); + } + + for handle in handles { + handle.join().unwrap(); + } + + let mut items = Vec::new(); + while let Some(node) = queue.pop() { + items.push(node_to_num(node)); + } + + items.sort_unstable(); + for i in 0..COUNT { + for j in 0..NUM_THREADS { + assert_eq!(items[i * NUM_THREADS + j], i); + } + } + } +} diff --git a/src/queue.rs b/src/queue.rs deleted file mode 100644 index 7b61700..0000000 --- a/src/queue.rs +++ /dev/null @@ -1,113 +0,0 @@ -//! The queue of nodes that keeps track of pending operations. - -use crate::node::Node; -use crate::sync::atomic::{AtomicPtr, Ordering}; - -use crossbeam_utils::CachePadded; - -use alloc::boxed::Box; -use core::ptr; - -/// A queue of nodes. -pub(crate) struct Queue { - /// The head of the queue. - head: CachePadded>, - - /// The tail of the queue. - tail: CachePadded>, -} - -/// A single node in the `Queue`. -struct QueueNode { - /// The next node in the queue. - next: AtomicPtr, - - /// Associated node data. - node: Node, -} - -impl Queue { - /// Create a new queue. - pub(crate) fn new() -> Self { - Self { - head: CachePadded::new(AtomicPtr::new(ptr::null_mut())), - tail: CachePadded::new(AtomicPtr::new(ptr::null_mut())), - } - } - - /// Push a node to the tail end of the queue. - pub(crate) fn push(&self, node: Node) { - let node = Box::into_raw(Box::new(QueueNode { - next: AtomicPtr::new(ptr::null_mut()), - node, - })); - - // Push the node to the tail end of the queue. - let mut tail = self.tail.load(Ordering::Relaxed); - - // Get the next() pointer we have to overwrite. - let next_ptr = if tail.is_null() { - &self.head - } else { - unsafe { &(*tail).next } - }; - - loop { - match next_ptr.compare_exchange( - ptr::null_mut(), - node, - Ordering::Release, - Ordering::Relaxed, - ) { - Ok(_) => { - // Either set the tail to the new node, or let whoever beat us have it - let _ = self.tail.compare_exchange( - tail, - node, - Ordering::Release, - Ordering::Relaxed, - ); - - return; - } - Err(next) => tail = next, - } - } - } - - /// Pop the oldest node from the head of the queue. - pub(crate) fn pop(&self) -> Option { - let mut head = self.head.load(Ordering::Relaxed); - - loop { - if head.is_null() { - return None; - } - - let next = unsafe { (*head).next.load(Ordering::Relaxed) }; - - match self - .head - .compare_exchange(head, next, Ordering::Release, Ordering::Relaxed) - { - Ok(_) => { - // We have successfully popped the head of the queue. - let node = unsafe { Box::from_raw(head) }; - - // If next is also null, set the tail to null as well. - if next.is_null() { - let _ = self.tail.compare_exchange( - head, - ptr::null_mut(), - Ordering::Release, - Ordering::Relaxed, - ); - } - - return Some(node.node); - } - Err(h) => head = h, - } - } - } -} diff --git a/src/std.rs b/src/std.rs new file mode 100644 index 0000000..f317c18 --- /dev/null +++ b/src/std.rs @@ -0,0 +1,376 @@ +//! libstd-based implementation of `event-listener`. +//! +//! This implementation crates an intrusive linked list of listeners. + +use crate::sync::atomic::Ordering; +use crate::sync::cell::{Cell, UnsafeCell}; +use crate::sync::{Mutex, MutexGuard}; +use crate::{State, TaskRef}; + +use core::marker::PhantomPinned; +use core::mem; +use core::ops::{Deref, DerefMut}; +use core::pin::Pin; +use core::ptr::NonNull; + +pub(super) struct List(Mutex); + +struct Inner { + /// The head of the linked list. + head: Option>, + + /// The tail of the linked list. + tail: Option>, + + /// The first unnotified listener. + next: Option>, + + /// Total number of listeners. + len: usize, + + /// The number of notified listeners. + notified: usize, +} + +impl List { + /// Create a new, empty event listener list. + pub(super) fn new() -> Self { + Self(Mutex::new(Inner { + head: None, + tail: None, + next: None, + len: 0, + notified: 0, + })) + } +} + +impl crate::Inner { + fn lock(&self) -> ListLock<'_, '_> { + ListLock { + inner: self, + lock: self.list.0.lock().unwrap_or_else(|e| e.into_inner()), + } + } + + /// Add a new listener to the list. + /// + /// Does nothing is the listener is already registered. + pub(crate) fn insert(&self, listener: Pin<&mut Option>) { + let mut inner = self.lock(); + + // SAFETY: We are locked, so we can access the inner `link`. + let entry = unsafe { + // SAFETY: We never move out the `link` field. + let listener = match listener.get_unchecked_mut() { + listener @ None => { + // TODO: Use Option::insert once the MSRV is high enough. + *listener = Some(Listener { + link: UnsafeCell::new(Link { + state: Cell::new(State::Created), + prev: Cell::new(inner.tail), + next: Cell::new(None), + }), + _pin: PhantomPinned, + }); + + listener.as_mut().unwrap() + } + Some(_) => return, + }; + + // Get the inner pointer. + &*listener.link.get() + }; + + // Replace the tail with the new entry. + match mem::replace(&mut inner.tail, Some(entry.into())) { + None => inner.head = Some(entry.into()), + Some(t) => unsafe { t.as_ref().next.set(Some(entry.into())) }, + }; + + // If there are no unnotified entries, this is the first one. + if inner.next.is_none() { + inner.next = inner.tail; + } + + // Bump the entry count. + inner.len += 1; + } + + /// Remove a listener from the list. + pub(crate) fn remove( + &self, + listener: Pin<&mut Option>, + propogate: bool, + ) -> Option { + self.lock().remove(listener, propogate) + } + + /// Notifies a number of entries. + #[cold] + pub(crate) fn notify(&self, n: usize, additional: bool) { + self.lock().notify(n, additional) + } + + /// Register a task to be notified when the event is triggered. + /// + /// Returns `true` if the listener was already notified, and `false` otherwise. If the listener + /// isn't inserted, returns `None`. + pub(crate) fn register( + &self, + mut listener: Pin<&mut Option>, + task: TaskRef<'_>, + ) -> Option { + let mut inner = self.lock(); + + // SAFETY: We are locked, so we can access the inner `link`. + let entry = unsafe { + // SAFETY: We never move out the `link` field. + let listener = listener.as_mut().get_unchecked_mut().as_mut()?; + &*listener.link.get() + }; + + // Take out the state and check it. + match entry.state.replace(State::NotifiedTaken) { + State::Notified(_) => { + // We have been notified, remove the listener. + inner.remove(listener, false); + Some(true) + } + + State::Task(other_task) => { + // Only replace the task if it's different. + entry.state.set(State::Task({ + if !task.will_wake(other_task.as_task_ref()) { + task.into_task() + } else { + other_task + } + })); + + Some(false) + } + + _ => { + // We have not been notified, register the task. + entry.state.set(State::Task(task.into_task())); + Some(false) + } + } + } +} + +impl Inner { + fn remove( + &mut self, + mut listener: Pin<&mut Option>, + propogate: bool, + ) -> Option { + let entry = unsafe { + // SAFETY: We never move out the `link` field. + let listener = listener.as_mut().get_unchecked_mut().as_mut()?; + + // Get the inner pointer. + &*listener.link.get() + }; + + let prev = entry.prev.get(); + let next = entry.next.get(); + + // Unlink from the previous entry. + match prev { + None => self.head = next, + Some(p) => unsafe { + p.as_ref().next.set(next); + }, + } + + // Unlink from the next entry. + match next { + None => self.tail = prev, + Some(n) => unsafe { + n.as_ref().prev.set(prev); + }, + } + + // If this was the first unnotified entry, update the next pointer. + if self.next == Some(entry.into()) { + self.next = next; + } + + // The entry is now fully unlinked, so we can now take it out safely. + let entry = unsafe { + listener + .get_unchecked_mut() + .take() + .unwrap() + .link + .into_inner() + }; + + let state = entry.state.into_inner(); + + // Update the notified count. + if state.is_notified() { + self.notified -= 1; + + if propogate { + if let State::Notified(additional) = state { + self.notify(1, additional); + } + } + } + self.len -= 1; + + Some(state) + } + + #[cold] + fn notify(&mut self, mut n: usize, additional: bool) { + if !additional { + // Make sure we're not notifying more than we have. + if n <= self.notified { + return; + } + n -= self.notified; + } + + while n > 0 { + n -= 1; + + // Notify the next entry. + match self.next { + None => break, + + Some(e) => { + // Get the entry and move the pointer forwards. + let entry = unsafe { e.as_ref() }; + self.next = entry.next.get(); + + // Set the state to `Notified` and notify. + if let State::Task(task) = entry.state.replace(State::Notified(additional)) { + task.wake(); + } + + // Bump the notified count. + self.notified += 1; + } + } + } + } +} + +struct ListLock<'a, 'b> { + lock: MutexGuard<'a, Inner>, + inner: &'b crate::Inner, +} + +impl Deref for ListLock<'_, '_> { + type Target = Inner; + + fn deref(&self) -> &Self::Target { + &self.lock + } +} + +impl DerefMut for ListLock<'_, '_> { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.lock + } +} + +impl Drop for ListLock<'_, '_> { + fn drop(&mut self) { + let list = &mut **self; + + // Update the notified count. + let notified = if list.notified < list.len { + list.notified + } else { + core::usize::MAX + }; + + self.inner.notified.store(notified, Ordering::Release); + } +} + +pub(crate) struct Listener { + /// The inner link in the linked list. + /// + /// # Safety + /// + /// This can only be accessed while the central mutex is locked. + link: UnsafeCell, + + /// This listener cannot be moved after being pinned. + _pin: PhantomPinned, +} + +struct Link { + /// The current state of the listener. + state: Cell, + + /// The previous link in the linked list. + prev: Cell>>, + + /// The next link in the linked list. + next: Cell>>, +} + +#[cfg(test)] +mod tests { + use super::*; + use futures_lite::pin; + + macro_rules! make_listeners { + ($($id:ident),*) => { + $( + let $id = Option::::None; + pin!($id); + )* + }; + } + + #[test] + fn insert() { + let inner = crate::Inner::new(); + make_listeners!(listen1, listen2, listen3); + + // Register the listeners. + inner.insert(listen1.as_mut()); + inner.insert(listen2.as_mut()); + inner.insert(listen3.as_mut()); + + assert_eq!(inner.lock().len, 3); + + // Remove one. + assert_eq!(inner.remove(listen2, false), Some(State::Created)); + assert_eq!(inner.lock().len, 2); + + // Remove another. + assert_eq!(inner.remove(listen1, false), Some(State::Created)); + assert_eq!(inner.lock().len, 1); + } + + #[test] + fn drop_non_notified() { + let inner = crate::Inner::new(); + make_listeners!(listen1, listen2, listen3); + + // Register the listeners. + inner.insert(listen1.as_mut()); + inner.insert(listen2.as_mut()); + inner.insert(listen3.as_mut()); + + // Notify one. + inner.notify(1, false); + + // Remove one. + inner.remove(listen3, true); + + // Remove the rest. + inner.remove(listen1, true); + inner.remove(listen2, true); + } +} diff --git a/src/sync.rs b/src/sync.rs deleted file mode 100644 index 267dc6a..0000000 --- a/src/sync.rs +++ /dev/null @@ -1,7 +0,0 @@ -//! Implementation of synchronization primitives. - -// TODO: portable_atomic or loom implementations - -pub use alloc::sync::Arc; -pub use core::cell; -pub use core::sync::atomic; diff --git a/strategy/src/lib.rs b/strategy/src/lib.rs index 027f202..9bfff2b 100644 --- a/strategy/src/lib.rs +++ b/strategy/src/lib.rs @@ -27,28 +27,22 @@ //! event.notify(1); //! }); //! -//! WaitThreeSeconds { listener: Some(listener) } +//! WaitThreeSeconds { listener } //! } //! //! struct WaitThreeSeconds { -//! listener: Option, +//! listener: Pin>, //! } //! //! impl EventListenerFuture for WaitThreeSeconds { //! type Output = (); //! -//! fn poll_with_strategy( -//! mut self: Pin<&mut Self>, +//! fn poll_with_strategy<'a, S: Strategy<'a>>( +//! mut self: Pin<&'a mut Self>, //! strategy: &mut S, //! context: &mut S::Context, //! ) -> Poll { -//! match strategy.poll(self.listener.take().unwrap(), context) { -//! Ok(()) => Poll::Ready(()), -//! Err(listener) => { -//! self.listener = Some(listener); -//! Poll::Pending -//! } -//! } +//! strategy.poll(self.listener.as_mut(), context) //! } //! } //! @@ -95,7 +89,7 @@ pub use pin_project_lite::pin_project; /// impl EventListenerFuture for MyFuture { /// type Output = (); /// -/// fn poll_with_strategy( +/// fn poll_with_strategy<'a, S: Strategy<'a>>( /// self: Pin<&mut Self>, /// strategy: &mut S, /// context: &mut S::Context, @@ -199,8 +193,8 @@ pub trait EventListenerFuture { /// /// This function should use the `Strategy::poll` method to poll the future, and proceed /// based on the result. - fn poll_with_strategy( - self: Pin<&mut Self>, + fn poll_with_strategy<'a, S: Strategy<'a>>( + self: Pin<&'a mut Self>, strategy: &mut S, context: &mut S::Context, ) -> Poll; @@ -314,42 +308,43 @@ impl Future for FutureWrapper { /// ``` /// use event_listener::{Event, EventListener}; /// use event_listener_strategy::{EventListenerFuture, Strategy, Blocking, NonBlocking}; +/// use std::pin::Pin; /// -/// async fn wait_on(evl: EventListener, strategy: &mut S) { +/// async fn wait_on<'a, S: Strategy<'a>>(evl: Pin<&'a mut EventListener>, strategy: &mut S) { /// strategy.wait(evl).await; /// } /// /// # futures_lite::future::block_on(async { /// // Block on the future. /// let ev = Event::new(); -/// let listener = ev.listen(); +/// let mut listener = ev.listen(); /// ev.notify(1); /// -/// wait_on(listener, &mut Blocking::default()).await; +/// wait_on(listener.as_mut(), &mut Blocking::default()).await; /// /// // Poll the future. -/// let listener = ev.listen(); +/// listener.as_mut().listen(); /// ev.notify(1); /// -/// wait_on(listener, &mut NonBlocking::default()).await; +/// wait_on(listener.as_mut(), &mut NonBlocking::default()).await; /// # }); /// ``` -pub trait Strategy { +pub trait Strategy<'a> { /// The context needed to poll the future. type Context: ?Sized; /// The future returned by the [`Strategy::wait`] method. - type Future: Future; + type Future: Future + 'a; /// Poll the event listener until it is ready. fn poll( &mut self, - event_listener: EventListener, + event_listener: Pin<&mut EventListener>, context: &mut Self::Context, - ) -> Result<(), EventListener>; + ) -> Poll<()>; /// Wait for the event listener to become ready. - fn wait(&mut self, evl: EventListener) -> Self::Future; + fn wait(&mut self, evl: Pin<&'a mut EventListener>) -> Self::Future; } /// A strategy that uses polling to efficiently wait for an event. @@ -358,25 +353,22 @@ pub struct NonBlocking<'a> { _marker: PhantomData>, } -impl<'a> Strategy for NonBlocking<'a> { +impl<'a, 'evl> Strategy<'evl> for NonBlocking<'a> { type Context = Context<'a>; - type Future = EventListener; + type Future = Pin<&'evl mut EventListener>; #[inline] - fn wait(&mut self, evl: EventListener) -> Self::Future { + fn wait(&mut self, evl: Pin<&'evl mut EventListener>) -> Self::Future { evl } #[inline] fn poll( &mut self, - mut event_listener: EventListener, + event_listener: Pin<&mut EventListener>, context: &mut Self::Context, - ) -> Result<(), EventListener> { - match Pin::new(&mut event_listener).poll(context) { - Poll::Ready(()) => Ok(()), - Poll::Pending => Err(event_listener), - } + ) -> Poll<()> { + event_listener.poll(context) } } @@ -388,12 +380,12 @@ pub struct Blocking { } #[cfg(feature = "std")] -impl Strategy for Blocking { +impl<'evl> Strategy<'evl> for Blocking { type Context = (); type Future = Ready; #[inline] - fn wait(&mut self, evl: EventListener) -> Self::Future { + fn wait(&mut self, evl: Pin<&'evl mut EventListener>) -> Self::Future { evl.wait(); Ready { _private: () } } @@ -401,11 +393,11 @@ impl Strategy for Blocking { #[inline] fn poll( &mut self, - event_listener: EventListener, + event_listener: Pin<&mut EventListener>, _context: &mut Self::Context, - ) -> Result<(), EventListener> { + ) -> Poll<()> { event_listener.wait(); - Ok(()) + Poll::Ready(()) } } diff --git a/tests/notify.rs b/tests/notify.rs index e725db5..520a087 100644 --- a/tests/notify.rs +++ b/tests/notify.rs @@ -7,11 +7,9 @@ use std::usize; use event_listener::{Event, EventListener}; use waker_fn::waker_fn; -fn is_notified(listener: &mut EventListener) -> bool { +fn is_notified(listener: Pin<&mut EventListener>) -> bool { let waker = waker_fn(|| ()); - Pin::new(listener) - .poll(&mut Context::from_waker(&waker)) - .is_ready() + listener.poll(&mut Context::from_waker(&waker)).is_ready() } #[test] @@ -22,16 +20,16 @@ fn notify() { let mut l2 = event.listen(); let mut l3 = event.listen(); - assert!(!is_notified(&mut l1)); - assert!(!is_notified(&mut l2)); - assert!(!is_notified(&mut l3)); + assert!(!is_notified(l1.as_mut())); + assert!(!is_notified(l2.as_mut())); + assert!(!is_notified(l3.as_mut())); event.notify(2); event.notify(1); - assert!(is_notified(&mut l1)); - assert!(is_notified(&mut l2)); - assert!(!is_notified(&mut l3)); + assert!(is_notified(l1.as_mut())); + assert!(is_notified(l2.as_mut())); + assert!(!is_notified(l3.as_mut())); } #[test] @@ -46,9 +44,9 @@ fn notify_additional() { event.notify(1); event.notify_additional(1); - assert!(is_notified(&mut l1)); - assert!(is_notified(&mut l2)); - assert!(!is_notified(&mut l3)); + assert!(is_notified(l1.as_mut())); + assert!(is_notified(l2.as_mut())); + assert!(!is_notified(l3.as_mut())); } #[test] @@ -58,15 +56,15 @@ fn notify_one() { let mut l1 = event.listen(); let mut l2 = event.listen(); - assert!(!is_notified(&mut l1)); - assert!(!is_notified(&mut l2)); + assert!(!is_notified(l1.as_mut())); + assert!(!is_notified(l2.as_mut())); event.notify(1); - assert!(is_notified(&mut l1)); - assert!(!is_notified(&mut l2)); + assert!(is_notified(l1.as_mut())); + assert!(!is_notified(l2.as_mut())); event.notify(1); - assert!(is_notified(&mut l2)); + assert!(is_notified(l2.as_mut())); } #[test] @@ -76,12 +74,12 @@ fn notify_all() { let mut l1 = event.listen(); let mut l2 = event.listen(); - assert!(!is_notified(&mut l1)); - assert!(!is_notified(&mut l2)); + assert!(!is_notified(l1.as_mut())); + assert!(!is_notified(l2.as_mut())); event.notify(usize::MAX); - assert!(is_notified(&mut l1)); - assert!(is_notified(&mut l2)); + assert!(is_notified(l1.as_mut())); + assert!(is_notified(l2.as_mut())); } #[test] @@ -94,8 +92,8 @@ fn drop_notified() { event.notify(1); drop(l1); - assert!(is_notified(&mut l2)); - assert!(!is_notified(&mut l3)); + assert!(is_notified(l2.as_mut())); + assert!(!is_notified(l3.as_mut())); } #[test] @@ -108,8 +106,8 @@ fn drop_notified2() { event.notify(2); drop(l1); - assert!(is_notified(&mut l2)); - assert!(!is_notified(&mut l3)); + assert!(is_notified(l2.as_mut())); + assert!(!is_notified(l3.as_mut())); } #[test] @@ -124,9 +122,9 @@ fn drop_notified_additional() { event.notify_additional(1); event.notify(2); drop(l1); - assert!(is_notified(&mut l2)); - assert!(is_notified(&mut l3)); - assert!(!is_notified(&mut l4)); + assert!(is_notified(l2.as_mut())); + assert!(is_notified(l3.as_mut())); + assert!(!is_notified(l4.as_mut())); } #[test] @@ -139,8 +137,8 @@ fn drop_non_notified() { event.notify(1); drop(l3); - assert!(is_notified(&mut l1)); - assert!(!is_notified(&mut l2)); + assert!(is_notified(l1.as_mut())); + assert!(!is_notified(l2.as_mut())); } #[test] diff --git a/tests/queue.rs b/tests/queue.rs deleted file mode 100644 index 1040fb1..0000000 --- a/tests/queue.rs +++ /dev/null @@ -1,71 +0,0 @@ -//! Tests involving the backup queue used under heavy contention. - -use std::future::Future; -use std::pin::Pin; -use std::task::Context; - -use event_listener::{Event, EventListener}; -use waker_fn::waker_fn; - -fn is_notified(listener: &mut EventListener) -> bool { - let waker = waker_fn(|| ()); - Pin::new(listener) - .poll(&mut Context::from_waker(&waker)) - .is_ready() -} - -#[test] -fn insert_and_notify() { - let event = Event::new(); - - // Lock to simulate contention. - let lock = event.__lock_event(); - - let mut l1 = event.listen(); - let mut l2 = event.listen(); - let mut l3 = event.listen(); - - assert!(!is_notified(&mut l1)); - assert!(!is_notified(&mut l2)); - assert!(!is_notified(&mut l3)); - - event.notify(2); - event.notify(1); - - // Unlock to simulate contention being released. - drop(lock); - - assert!(is_notified(&mut l1)); - assert!(is_notified(&mut l2)); - assert!(!is_notified(&mut l3)); -} - -#[test] -fn insert_then_contention() { - let event = Event::new(); - - // Allow the listeners to be created without contention. - let mut l1 = event.listen(); - let mut l2 = event.listen(); - let mut l3 = event.listen(); - - assert!(!is_notified(&mut l1)); - assert!(!is_notified(&mut l2)); - assert!(!is_notified(&mut l3)); - - // Lock to simulate contention. - let lock = event.__lock_event(); - - assert!(!is_notified(&mut l1)); - assert!(!is_notified(&mut l2)); - assert!(!is_notified(&mut l3)); - - event.notify(2); - - // Unlock to simulate contention being released. - drop(lock); - - assert!(is_notified(&mut l1)); - assert!(is_notified(&mut l2)); - assert!(!is_notified(&mut l3)); -}