diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index daab6d8..f91813d 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -58,6 +58,7 @@ jobs: run: cargo hack build --all --no-dev-deps - run: cargo hack build --all --target thumbv7m-none-eabi --no-default-features --no-dev-deps - run: cargo hack build --target thumbv7m-none-eabi --no-default-features --no-dev-deps --features portable-atomic + - run: cargo hack build --target thumbv7m-none-eabi --no-default-features --no-dev-deps --features critical-section #- name: Install wasm-pack # uses: taiki-e/install-action@wasm-pack #- run: wasm-pack test --node @@ -93,6 +94,7 @@ jobs: uses: taiki-e/install-action@cargo-hack - run: cargo hack build --all --rust-version - run: cargo hack build --all --no-default-features --rust-version + - run: cargo hack build --all --no-default-features --rust-version --features critical-section clippy: runs-on: ubuntu-latest diff --git a/Cargo.toml b/Cargo.toml index b5ad282..082aa37 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,6 +29,7 @@ unexpected_cfgs = { level = "warn", check-cfg = ['cfg(loom)'] } [dependencies] concurrent-queue = { version = "2.4.0", default-features = false } +critical-section = { version = "1.2.0", default-features = false, optional = true } pin-project-lite = "0.2.12" portable-atomic-util = { version = "0.2.0", default-features = false, optional = true, features = ["alloc"] } @@ -45,6 +46,7 @@ default-features = false optional = true [dev-dependencies] +critical-section = { version = "1.2.0", features = ["std"] } futures-lite = "2.0.0" try-lock = "0.2.5" waker-fn = "1" diff --git a/examples/mutex.rs b/examples/mutex.rs index 873c8bb..59b03f7 100644 --- a/examples/mutex.rs +++ b/examples/mutex.rs @@ -2,7 +2,7 @@ //! //! This mutex exposes both blocking and async methods for acquiring a lock. -#[cfg(not(target_family = "wasm"))] +#[cfg(all(feature = "std", not(target_family = "wasm")))] mod example { #![allow(dead_code)] @@ -171,7 +171,7 @@ mod example { } } -#[cfg(target_family = "wasm")] +#[cfg(any(target_family = "wasm", not(feature = "std")))] mod example { pub(super) fn entry() { println!("This example is not supported on wasm yet."); diff --git a/src/std.rs b/src/intrusive.rs similarity index 57% rename from src/std.rs rename to src/intrusive.rs index c509ebc..69f460b 100644 --- a/src/std.rs +++ b/src/intrusive.rs @@ -1,20 +1,31 @@ -//! libstd-based implementation of `event-listener`. +//! Intrusive linked list-based implementation of `event-listener`. //! -//! This implementation crates an intrusive linked list of listeners. +//! This implementation crates an intrusive linked list of listeners. This list +//! is secured using either a libstd mutex or a critical section. use crate::notify::{GenericNotify, Internal, Notification}; use crate::sync::atomic::Ordering; use crate::sync::cell::{Cell, UnsafeCell}; -use crate::sync::{Mutex, MutexGuard}; use crate::{RegisterResult, State, TaskRef}; +#[cfg(feature = "critical-section")] +use core::cell::RefCell; +#[cfg(all(feature = "std", not(feature = "critical-section")))] +use core::ops::{Deref, DerefMut}; + use core::marker::PhantomPinned; use core::mem; -use core::ops::{Deref, DerefMut}; use core::pin::Pin; use core::ptr::NonNull; -pub(super) struct List(Mutex>); +pub(super) struct List( + /// libstd-based implementation uses a normal Muetx to secure the data. + #[cfg(all(feature = "std", not(feature = "critical-section")))] + crate::sync::Mutex>, + /// Critical-section-based implementation uses a CS cell that wraps a RefCell. + #[cfg(feature = "critical-section")] + critical_section::Mutex>>, +); struct Inner { /// The head of the linked list. @@ -36,67 +47,141 @@ struct Inner { impl List { /// Create a new, empty event listener list. pub(super) fn new() -> Self { - Self(Mutex::new(Inner { + let inner = Inner { head: None, tail: None, next: None, len: 0, notified: 0, - })) + }; + + #[cfg(feature = "critical-section")] + { + Self(critical_section::Mutex::new(RefCell::new(inner))) + } + + #[cfg(not(feature = "critical-section"))] + Self(crate::sync::Mutex::new(inner)) } /// Get the total number of listeners without blocking. + #[cfg(all(feature = "std", not(feature = "critical-section")))] pub(crate) fn try_total_listeners(&self) -> Option { self.0.try_lock().ok().map(|list| list.len) } + /// Get the total number of listeners without blocking. + #[cfg(feature = "critical-section")] + pub(crate) fn try_total_listeners(&self) -> Option { + Some(self.total_listeners()) + } + /// Get the total number of listeners with blocking. + #[cfg(all(feature = "std", not(feature = "critical-section")))] pub(crate) fn total_listeners(&self) -> usize { self.0.lock().unwrap_or_else(|e| e.into_inner()).len } + + /// Get the total number of listeners with blocking. + #[cfg(feature = "critical-section")] + #[allow(unused)] + pub(crate) fn total_listeners(&self) -> usize { + critical_section::with(|cs| self.0.borrow(cs).borrow().len) + } } impl crate::Inner { - fn lock(&self) -> ListLock<'_, '_, T> { - ListLock { + #[cfg(all(feature = "std", not(feature = "critical-section")))] + fn with_inner(&self, f: impl FnOnce(&mut Inner) -> R) -> R { + struct ListLock<'a, 'b, T> { + lock: crate::sync::MutexGuard<'a, Inner>, + inner: &'b crate::Inner, + } + + impl Deref for ListLock<'_, '_, T> { + type Target = Inner; + + fn deref(&self) -> &Self::Target { + &self.lock + } + } + + impl DerefMut for ListLock<'_, '_, T> { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.lock + } + } + + impl Drop for ListLock<'_, '_, T> { + fn drop(&mut self) { + update_notified(&self.inner.notified, &self.lock); + } + } + + let mut list = ListLock { inner: self, lock: self.list.0.lock().unwrap_or_else(|e| e.into_inner()), - } + }; + f(&mut list) } - /// Add a new listener to the list. - pub(crate) fn insert(&self, mut listener: Pin<&mut Option>>) { - let mut inner = self.lock(); - - listener.as_mut().set(Some(Listener { - link: UnsafeCell::new(Link { - state: Cell::new(State::Created), - prev: Cell::new(inner.tail), - next: Cell::new(None), - }), - _pin: PhantomPinned, - })); - let listener = listener.as_pin_mut().unwrap(); + #[cfg(feature = "critical-section")] + fn with_inner(&self, f: impl FnOnce(&mut Inner) -> R) -> R { + struct ListWrapper<'a, T> { + inner: &'a crate::Inner, + list: &'a mut Inner, + } - { - let entry_guard = listener.link.get(); - // SAFETY: We are locked, so we can access the inner `link`. - let entry = unsafe { entry_guard.deref() }; + impl Drop for ListWrapper<'_, T> { + fn drop(&mut self) { + update_notified(&self.inner.notified, self.list); + } + } - // Replace the tail with the new entry. - match mem::replace(&mut inner.tail, Some(entry.into())) { - None => inner.head = Some(entry.into()), - Some(t) => unsafe { t.as_ref().next.set(Some(entry.into())) }, + critical_section::with(move |cs| { + let mut list = self.list.0.borrow_ref_mut(cs); + let wrapper = ListWrapper { + inner: self, + list: &mut *list, }; - } - // If there are no unnotified entries, this is the first one. - if inner.next.is_none() { - inner.next = inner.tail; - } + f(wrapper.list) + }) + } + + /// Add a new listener to the list. + pub(crate) fn insert(&self, mut listener: Pin<&mut Option>>) { + self.with_inner(|inner| { + listener.as_mut().set(Some(Listener { + link: UnsafeCell::new(Link { + state: Cell::new(State::Created), + prev: Cell::new(inner.tail), + next: Cell::new(None), + }), + _pin: PhantomPinned, + })); + let listener = listener.as_pin_mut().unwrap(); + + { + let entry_guard = listener.link.get(); + // SAFETY: We are locked, so we can access the inner `link`. + let entry = unsafe { entry_guard.deref() }; + + // Replace the tail with the new entry. + match mem::replace(&mut inner.tail, Some(entry.into())) { + None => inner.head = Some(entry.into()), + Some(t) => unsafe { t.as_ref().next.set(Some(entry.into())) }, + }; + } + + // If there are no unnotified entries, this is the first one. + if inner.next.is_none() { + inner.next = inner.tail; + } - // Bump the entry count. - inner.len += 1; + // Bump the entry count. + inner.len += 1; + }); } /// Remove a listener from the list. @@ -105,13 +190,13 @@ impl crate::Inner { listener: Pin<&mut Option>>, propagate: bool, ) -> Option> { - self.lock().remove(listener, propagate) + self.with_inner(|inner| inner.remove(listener, propagate)) } /// Notifies a number of entries. #[cold] pub(crate) fn notify(&self, notify: impl Notification) -> usize { - self.lock().notify(notify) + self.with_inner(|inner| inner.notify(notify)) } /// Register a task to be notified when the event is triggered. @@ -123,41 +208,42 @@ impl crate::Inner { mut listener: Pin<&mut Option>>, task: TaskRef<'_>, ) -> RegisterResult { - let mut inner = self.lock(); - let entry_guard = match listener.as_mut().as_pin_mut() { - Some(listener) => listener.link.get(), - None => return RegisterResult::NeverInserted, - }; - // SAFETY: We are locked, so we can access the inner `link`. - let entry = unsafe { entry_guard.deref() }; - - // Take out the state and check it. - match entry.state.replace(State::NotifiedTaken) { - State::Notified { tag, .. } => { - // We have been notified, remove the listener. - inner.remove(listener, false); - RegisterResult::Notified(tag) - } + self.with_inner(|inner| { + let entry_guard = match listener.as_mut().as_pin_mut() { + Some(listener) => listener.link.get(), + None => return RegisterResult::NeverInserted, + }; + // SAFETY: We are locked, so we can access the inner `link`. + let entry = unsafe { entry_guard.deref() }; - State::Task(other_task) => { - // Only replace the task if it's different. - entry.state.set(State::Task({ - if !task.will_wake(other_task.as_task_ref()) { - task.into_task() - } else { - other_task - } - })); + // Take out the state and check it. + match entry.state.replace(State::NotifiedTaken) { + State::Notified { tag, .. } => { + // We have been notified, remove the listener. + inner.remove(listener, false); + RegisterResult::Notified(tag) + } - RegisterResult::Registered - } + State::Task(other_task) => { + // Only replace the task if it's different. + entry.state.set(State::Task({ + if !task.will_wake(other_task.as_task_ref()) { + task.into_task() + } else { + other_task + } + })); + + RegisterResult::Registered + } - _ => { - // We have not been notified, register the task. - entry.state.set(State::Task(task.into_task())); - RegisterResult::Registered + _ => { + // We have not been notified, register the task. + entry.state.set(State::Task(task.into_task())); + RegisterResult::Registered + } } - } + }) } } @@ -274,38 +360,15 @@ impl Inner { } } -struct ListLock<'a, 'b, T> { - lock: MutexGuard<'a, Inner>, - inner: &'b crate::Inner, -} - -impl Deref for ListLock<'_, '_, T> { - type Target = Inner; - - fn deref(&self) -> &Self::Target { - &self.lock - } -} - -impl DerefMut for ListLock<'_, '_, T> { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.lock - } -} - -impl Drop for ListLock<'_, '_, T> { - fn drop(&mut self) { - let list = &mut **self; +fn update_notified(slot: &crate::sync::atomic::AtomicUsize, list: &Inner) { + // Update the notified count. + let notified = if list.notified < list.len { + list.notified + } else { + usize::MAX + }; - // Update the notified count. - let notified = if list.notified < list.len { - list.notified - } else { - usize::MAX - }; - - self.inner.notified.store(notified, Ordering::Release); - } + slot.store(notified, Ordering::Release); } pub(crate) struct Listener { @@ -358,15 +421,15 @@ mod tests { inner.insert(listen2.as_mut()); inner.insert(listen3.as_mut()); - assert_eq!(inner.lock().len, 3); + assert_eq!(inner.list.try_total_listeners(), Some(3)); // Remove one. assert_eq!(inner.remove(listen2, false), Some(State::Created)); - assert_eq!(inner.lock().len, 2); + assert_eq!(inner.list.try_total_listeners(), Some(2)); // Remove another. assert_eq!(inner.remove(listen1, false), Some(State::Created)); - assert_eq!(inner.lock().len, 1); + assert_eq!(inner.list.try_total_listeners(), Some(1)); } #[test] diff --git a/src/lib.rs b/src/lib.rs index 7b59e99..d6a8e44 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -63,11 +63,15 @@ //! # Features //! //! - The `std` feature (enabled by default) enables the use of the Rust standard library. Disable it for `no_std` -//! support +//! support. +//! +//! - The `critical-section` feature enables usage of the [`critical-section`] crate to enable a +//! more efficient implementation of `event-listener` for `no_std` platforms. //! //! - The `portable-atomic` feature enables the use of the [`portable-atomic`] crate to provide //! atomic operations on platforms that don't support them. //! +//! [`critical-section`]: https://crates.io/crates/critical-section //! [`portable-atomic`]: https://crates.io/crates/portable-atomic #![cfg_attr(not(feature = "std"), no_std)] @@ -85,8 +89,14 @@ extern crate alloc; #[cfg(feature = "std")] extern crate std as alloc; -#[cfg_attr(feature = "std", path = "std.rs")] -#[cfg_attr(not(feature = "std"), path = "no_std.rs")] +#[cfg_attr( + any(feature = "std", feature = "critical-section"), + path = "intrusive.rs" +)] +#[cfg_attr( + not(any(feature = "std", feature = "critical-section")), + path = "slab.rs" +)] mod sys; mod notify; @@ -1359,7 +1369,8 @@ mod sync { #[cfg(feature = "portable-atomic")] pub(super) use portable_atomic_util::Arc; - #[cfg(all(feature = "std", not(loom)))] + #[allow(unused)] + #[cfg(all(feature = "std", not(feature = "critical-section"), not(loom)))] pub(super) use std::sync::{Mutex, MutexGuard}; #[cfg(all(feature = "std", not(target_family = "wasm"), not(loom)))] pub(super) use std::thread_local; diff --git a/src/no_std.rs b/src/slab.rs similarity index 99% rename from src/no_std.rs rename to src/slab.rs index a6a2f25..59e1c21 100644 --- a/src/no_std.rs +++ b/src/slab.rs @@ -9,7 +9,7 @@ //! atomic queue if the lock is contended. Benchmarks show that this is about 20% slower than the std //! implementation, but still much faster than using a queue. -#[path = "no_std/node.rs"] +#[path = "slab/node.rs"] mod node; use node::{Node, NothingProducer, TaskWaiting}; diff --git a/src/no_std/node.rs b/src/slab/node.rs similarity index 100% rename from src/no_std/node.rs rename to src/slab/node.rs