Skip to content

Commit

Permalink
feat: Add stack-based listener
Browse files Browse the repository at this point in the history
It is instantiated with the listener!() macro.

Signed-off-by: John Nunley <dev@notgull.net>
  • Loading branch information
notgull committed Feb 3, 2024
1 parent d9144a8 commit 68be528
Show file tree
Hide file tree
Showing 2 changed files with 187 additions and 66 deletions.
64 changes: 28 additions & 36 deletions examples/mutex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ mod example {
use std::thread;
use std::time::{Duration, Instant};

use event_listener::{Event, Listener};
use event_listener::{listener, Event, Listener};

/// A simple mutex.
struct Mutex<T> {
Expand Down Expand Up @@ -51,74 +51,66 @@ mod example {

/// Blocks until a lock is acquired.
fn lock(&self) -> MutexGuard<'_, T> {
let mut listener = None;

loop {
// Attempt grabbing a lock.
if let Some(guard) = self.try_lock() {
return guard;
}

// Set up an event listener or wait for a notification.
match listener.take() {
None => {
// Start listening and then try locking again.
listener = Some(self.lock_ops.listen());
}
Some(l) => {
// Wait until a notification is received.
l.wait();
}
// Set up an event listener.
listener!(self.lock_ops => listener);

// Try again.
if let Some(guard) = self.try_lock() {
return guard;
}

// Wait for a notification.
listener.wait();
}
}

/// Blocks until a lock is acquired or the timeout is reached.
fn lock_timeout(&self, timeout: Duration) -> Option<MutexGuard<'_, T>> {
let deadline = Instant::now() + timeout;
let mut listener = None;

loop {
// Attempt grabbing a lock.
if let Some(guard) = self.try_lock() {
return Some(guard);
}

// Set up an event listener or wait for an event.
match listener.take() {
None => {
// Start listening and then try locking again.
listener = Some(self.lock_ops.listen());
}
Some(l) => {
// Wait until a notification is received.
l.wait_deadline(deadline)?;
}
// Set up an event listener.
listener!(self.lock_ops => listener);

// Try again.
if let Some(guard) = self.try_lock() {
return Some(guard);
}

// Wait until a notification is received.
listener.wait_deadline(deadline)?;
}
}

/// Acquires a lock asynchronously.
async fn lock_async(&self) -> MutexGuard<'_, T> {
let mut listener = None;

loop {
// Attempt grabbing a lock.
if let Some(guard) = self.try_lock() {
return guard;
}

// Set up an event listener or wait for an event.
match listener.take() {
None => {
// Start listening and then try locking again.
listener = Some(self.lock_ops.listen());
}
Some(l) => {
// Wait until a notification is received.
l.await;
}
// Set up an event listener.
listener!(self.lock_ops => listener);

// Try again.
if let Some(guard) = self.try_lock() {
return guard;
}

// Wait until a notification is received.
listener.await;
}
}
}
Expand Down
189 changes: 159 additions & 30 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ impl<T> Event<T> {
event: Arc::clone(&inner),
listener: None,
});
inner.insert(listener.as_mut().project().listener);
listener.as_mut().listen();

// Return the listener.
EventListener { listener }
Expand Down Expand Up @@ -724,7 +724,7 @@ impl<T> Drop for Event<T> {
///
/// This trait represents a type waiting for a notification from an [`Event`]. See the
/// [`EventListener`] type for more documentation on this trait's usage.
pub trait Listener<T>: Future<Output = T> + __private::Sealed {
pub trait Listener<T>: Future<Output = T> + __sealed::Sealed {
/// Blocks until a notification is received.
///
/// # Examples
Expand Down Expand Up @@ -833,6 +833,45 @@ pub trait Listener<T>: Future<Output = T> + __private::Sealed {
fn same_event(&self, other: &Self) -> bool;
}

macro_rules! forward_impl_to_listener {
($gen:ident => $ty:ty) => {
impl<$gen> crate::Listener<$gen> for $ty {
#[cfg(all(feature = "std", not(target_family = "wasm")))]
fn wait(mut self) -> $gen {
self.listener_mut().wait_internal(None).unwrap()
}

#[cfg(all(feature = "std", not(target_family = "wasm")))]
fn wait_timeout(mut self, timeout: std::time::Duration) -> Option<$gen> {
self.listener_mut()
.wait_internal(std::time::Instant::now().checked_add(timeout))
}

#[cfg(all(feature = "std", not(target_family = "wasm")))]
fn wait_deadline(mut self, deadline: std::time::Instant) -> Option<$gen> {
self.listener_mut().wait_internal(Some(deadline))
}

fn discard(mut self) -> bool {
self.listener_mut().discard()
}

#[inline]
fn listens_to(&self, event: &Event<$gen>) -> bool {
core::ptr::eq::<Inner<$gen>>(
&*self.listener().event,
event.inner.load(core::sync::atomic::Ordering::Acquire),
)
}

#[inline]
fn same_event(&self, other: &$ty) -> bool {
core::ptr::eq::<Inner<$gen>>(&*self.listener().event, &*other.listener().event)
}
}
};
}

/// A guard waiting for a notification from an [`Event`].
///
/// There are two ways for a listener to wait for a notification:
Expand Down Expand Up @@ -869,39 +908,20 @@ impl<T> fmt::Debug for EventListener<T> {
}
}

impl<T> Listener<T> for EventListener<T> {
#[cfg(all(feature = "std", not(target_family = "wasm")))]
fn wait(mut self) -> T {
self.listener.as_mut().wait_internal(None).unwrap()
}

#[cfg(all(feature = "std", not(target_family = "wasm")))]
fn wait_timeout(mut self, timeout: Duration) -> Option<T> {
self.listener
.as_mut()
.wait_internal(Instant::now().checked_add(timeout))
}

#[cfg(all(feature = "std", not(target_family = "wasm")))]
fn wait_deadline(mut self, deadline: Instant) -> Option<T> {
self.listener.as_mut().wait_internal(Some(deadline))
}

fn discard(mut self) -> bool {
self.listener.as_mut().discard()
}

impl<T> EventListener<T> {
#[inline]
fn listens_to(&self, event: &Event<T>) -> bool {
ptr::eq::<Inner<T>>(&*self.listener.event, event.inner.load(Ordering::Acquire))
fn listener(&self) -> &InnerListener<T, Arc<Inner<T>>> {
&self.listener
}

#[inline]
fn same_event(&self, other: &EventListener<T>) -> bool {
ptr::eq::<Inner<T>>(&*self.listener.event, &*other.listener.event)
fn listener_mut(&mut self) -> Pin<&mut InnerListener<T, Arc<Inner<T>>>> {
self.listener.as_mut()
}
}

forward_impl_to_listener! { T => EventListener<T> }

impl<T> Future for EventListener<T> {
type Output = T;

Expand All @@ -910,6 +930,18 @@ impl<T> Future for EventListener<T> {
}
}

/// Create a stack-based event listener for an [`Event`].
#[macro_export]
macro_rules! listener {
($event:expr => $listener:ident) => {
let mut $listener = $crate::__private::StackSlot::new(&$event);
// SAFETY: We shadow $listener so it can't be moved after.
let mut $listener = unsafe { $crate::__private::Pin::new_unchecked(&mut $listener) };
#[allow(unused_mut)]
let mut $listener = $listener.listen();
};
}

pin_project_lite::pin_project! {
#[project(!Unpin)]
#[project = ListenerProject]
Expand Down Expand Up @@ -944,6 +976,13 @@ unsafe impl<T: Send, B: Borrow<Inner<T>> + Unpin + Send> Send for InnerListener<
unsafe impl<T: Send, B: Borrow<Inner<T>> + Unpin + Sync> Sync for InnerListener<T, B> {}

impl<T, B: Borrow<Inner<T>> + Unpin> InnerListener<T, B> {
/// Insert this listener into the linked list.
#[inline]
fn listen(self: Pin<&mut Self>) {
let this = self.project();
(*this.event).borrow().insert(this.listener);
}

/// Wait until the provided deadline.
#[cfg(all(feature = "std", not(target_family = "wasm")))]
fn wait_internal(mut self: Pin<&mut Self>, deadline: Option<Instant>) -> Option<T> {
Expand Down Expand Up @@ -1251,9 +1290,99 @@ fn __test_send_and_sync() {
}

#[doc(hidden)]
mod __private {
use super::EventListener;
mod __sealed {
use super::{EventListener, __private::StackListener};

pub trait Sealed {}
impl<T> Sealed for EventListener<T> {}
impl<T> Sealed for StackListener<'_, '_, T> {}
}

/// Semver exempt module.
#[doc(hidden)]
pub mod __private {
pub use core::pin::Pin;

use super::{Event, Inner, InnerListener};
use core::fmt;
use core::future::Future;
use core::task::{Context, Poll};

pin_project_lite::pin_project! {
/// Space on the stack where a stack-based listener can be allocated.
#[doc(hidden)]
pub struct StackSlot<'ev, T> {
#[pin]
listener: InnerListener<T, &'ev Inner<T>>
}
}

impl<T> fmt::Debug for StackSlot<'_, T> {
#[inline]
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("StackSlot").finish_non_exhaustive()
}
}

impl<'ev, T> StackSlot<'ev, T> {
/// Create a new `StackSlot` on the stack.
#[inline]
#[doc(hidden)]
pub fn new(event: &'ev Event<T>) -> Self {
let inner = unsafe { &*event.inner() };
Self {
listener: InnerListener {
event: inner,
listener: None,
},
}
}

/// Start listening on this `StackSlot`.
#[inline]
#[doc(hidden)]
pub fn listen(mut self: Pin<&mut Self>) -> StackListener<'ev, '_, T> {
// Insert ourselves into the list.
self.as_mut().project().listener.listen();

// We are now listening.
StackListener { slot: self }
}
}

/// A stack-based `EventListener`.
#[doc(hidden)]
pub struct StackListener<'ev, 'stack, T> {
slot: Pin<&'stack mut StackSlot<'ev, T>>,
}

impl<T> fmt::Debug for StackListener<'_, '_, T> {
#[inline]
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("StackListener").finish_non_exhaustive()
}
}

impl<'ev, T> StackListener<'ev, '_, T> {
#[inline]
fn listener(&self) -> &InnerListener<T, &'ev Inner<T>> {
&self.slot.listener
}

#[inline]
fn listener_mut(&mut self) -> Pin<&mut InnerListener<T, &'ev Inner<T>>> {
self.slot.as_mut().project().listener
}
}

forward_impl_to_listener! { T => StackListener<'_, '_, T> }

impl<T> Future for StackListener<'_, '_, T> {
type Output = T;

#[inline]
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.listener_mut().poll_internal(cx)
}
}
}

0 comments on commit 68be528

Please sign in to comment.