Skip to content

Commit

Permalink
feat: Create Listener trait
Browse files Browse the repository at this point in the history
This commit creates the Listener trait and moves most of EventListener's
functionality to that trait.

Signed-off-by: John Nunley <dev@notgull.net>
  • Loading branch information
notgull committed Feb 3, 2024
1 parent 86b7780 commit d9144a8
Show file tree
Hide file tree
Showing 3 changed files with 110 additions and 78 deletions.
2 changes: 1 addition & 1 deletion benches/bench.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::iter;

use criterion::{criterion_group, criterion_main, Criterion};
use event_listener::Event;
use event_listener::{prelude::*, Event};

const COUNT: usize = 8000;

Expand Down
2 changes: 1 addition & 1 deletion examples/mutex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ mod example {
use std::thread;
use std::time::{Duration, Instant};

use event_listener::Event;
use event_listener::{Event, Listener};

/// A simple mutex.
struct Mutex<T> {
Expand Down
184 changes: 108 additions & 76 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
//! use std::thread;
//! use std::time::Duration;
//! use std::usize;
//! use event_listener::Event;
//! use event_listener::{Event, prelude::*};
//!
//! let flag = Arc::new(AtomicBool::new(false));
//! let event = Arc::new(Event::new());
Expand Down Expand Up @@ -108,7 +108,7 @@ pub use notify::{IntoNotification, Notification};

/// Useful traits for notifications.
pub mod prelude {
pub use crate::{IntoNotification, Notification};
pub use crate::{IntoNotification, Listener, Notification};
}

/// Inner state of [`Event`].
Expand Down Expand Up @@ -213,7 +213,7 @@ impl<T> Event<T> {
/// # Examples
///
/// ```
/// use event_listener::Event;
/// use event_listener::{Event, prelude::*};
///
/// let event = Event::<usize>::with_tag();
/// ```
Expand All @@ -230,7 +230,7 @@ impl<T> Event<T> {
/// # Examples
///
/// ```
/// use event_listener::Event;
/// use event_listener::{Event, prelude::*};
///
/// let event = Event::new();
/// let listener = event.listen();
Expand All @@ -254,7 +254,7 @@ impl<T> Event<T> {
/// # Examples
///
/// ```
/// use event_listener::Event;
/// use event_listener::{Event, prelude::*};
///
/// let event = Event::new();
/// let listener = event.listen();
Expand Down Expand Up @@ -283,7 +283,7 @@ impl<T> Event<T> {
let inner = ManuallyDrop::new(unsafe { Arc::from_raw(self.inner()) });

// Allocate the listener on the heap and insert it.
let mut listener = Box::pin(Listener {
let mut listener = Box::pin(InnerListener {
event: Arc::clone(&inner),
listener: None,
});
Expand Down Expand Up @@ -322,7 +322,7 @@ impl<T> Event<T> {
/// Use the default notification strategy:
///
/// ```
/// use event_listener::Event;
/// use event_listener::{Event, prelude::*};
///
/// let event = Event::new();
///
Expand Down Expand Up @@ -539,7 +539,7 @@ impl Event<()> {
/// # Examples
///
/// ```
/// use event_listener::Event;
/// use event_listener::{Event, prelude::*};
///
/// let event = Event::new();
/// ```
Expand Down Expand Up @@ -576,7 +576,7 @@ impl Event<()> {
/// # Examples
///
/// ```
/// use event_listener::Event;
/// use event_listener::{Event, prelude::*};
/// use std::sync::atomic::{self, Ordering};
///
/// let event = Event::new();
Expand Down Expand Up @@ -628,7 +628,7 @@ impl Event<()> {
/// # Examples
///
/// ```
/// use event_listener::Event;
/// use event_listener::{Event, prelude::*};
///
/// let event = Event::new();
///
Expand Down Expand Up @@ -678,7 +678,7 @@ impl Event<()> {
/// # Examples
///
/// ```
/// use event_listener::Event;
/// use event_listener::{Event, prelude::*};
/// use std::sync::atomic::{self, Ordering};
///
/// let event = Event::new();
Expand Down Expand Up @@ -720,47 +720,17 @@ impl<T> Drop for Event<T> {
}
}

/// A guard waiting for a notification from an [`Event`].
///
/// There are two ways for a listener to wait for a notification:
///
/// 1. In an asynchronous manner using `.await`.
/// 2. In a blocking manner by calling [`EventListener::wait()`] on it.
/// A handle that is listening to an [`Event`].
///
/// If a notified listener is dropped without receiving a notification, dropping will notify
/// another active listener. Whether one *additional* listener will be notified depends on what
/// kind of notification was delivered.
///
/// The listener is not registered into the linked list inside of the [`Event`] by default if
/// it is created via the `new()` method. It needs to be pinned first before being inserted
/// using the `listen()` method. After the listener has begun `listen`ing, the user can
/// `await` it like a future or call `wait()` to block the current thread until it is notified.
///
/// This structure allocates the listener on the heap.
pub struct EventListener<T = ()> {
listener: Pin<Box<Listener<T, Arc<Inner<T>>>>>,
}

unsafe impl<T: Send> Send for EventListener<T> {}
unsafe impl<T: Send> Sync for EventListener<T> {}

impl<T> core::panic::UnwindSafe for EventListener<T> {}
impl<T> core::panic::RefUnwindSafe for EventListener<T> {}
impl<T> Unpin for EventListener<T> {}

impl<T> fmt::Debug for EventListener<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("EventListener").finish_non_exhaustive()
}
}

impl<T> EventListener<T> {
/// This trait represents a type waiting for a notification from an [`Event`]. See the
/// [`EventListener`] type for more documentation on this trait's usage.
pub trait Listener<T>: Future<Output = T> + __private::Sealed {
/// Blocks until a notification is received.
///
/// # Examples
///
/// ```
/// use event_listener::Event;
/// use event_listener::{Event, prelude::*};
///
/// let event = Event::new();
/// let mut listener = event.listen();
Expand All @@ -772,9 +742,7 @@ impl<T> EventListener<T> {
/// listener.wait();
/// ```
#[cfg(all(feature = "std", not(target_family = "wasm")))]
pub fn wait(mut self) -> T {
self.listener.as_mut().wait_internal(None).unwrap()
}
fn wait(self) -> T;

/// Blocks until a notification is received or a timeout is reached.
///
Expand All @@ -784,7 +752,7 @@ impl<T> EventListener<T> {
///
/// ```
/// use std::time::Duration;
/// use event_listener::Event;
/// use event_listener::{Event, prelude::*};
///
/// let event = Event::new();
/// let mut listener = event.listen();
Expand All @@ -793,11 +761,7 @@ impl<T> EventListener<T> {
/// assert!(listener.wait_timeout(Duration::from_secs(1)).is_none());
/// ```
#[cfg(all(feature = "std", not(target_family = "wasm")))]
pub fn wait_timeout(mut self, timeout: Duration) -> Option<T> {
self.listener
.as_mut()
.wait_internal(Instant::now().checked_add(timeout))
}
fn wait_timeout(self, timeout: Duration) -> Option<T>;

/// Blocks until a notification is received or a deadline is reached.
///
Expand All @@ -807,7 +771,7 @@ impl<T> EventListener<T> {
///
/// ```
/// use std::time::{Duration, Instant};
/// use event_listener::Event;
/// use event_listener::{Event, prelude::*};
///
/// let event = Event::new();
/// let mut listener = event.listen();
Expand All @@ -816,19 +780,17 @@ impl<T> EventListener<T> {
/// assert!(listener.wait_deadline(Instant::now() + Duration::from_secs(1)).is_none());
/// ```
#[cfg(all(feature = "std", not(target_family = "wasm")))]
pub fn wait_deadline(mut self, deadline: Instant) -> Option<T> {
self.listener.as_mut().wait_internal(Some(deadline))
}
fn wait_deadline(self, deadline: Instant) -> Option<T>;

/// Drops this listener and discards its notification (if any) without notifying another
/// active listener.
///
/// Returns `true` if a notification was discarded.
///
/// # Examples
///
///
/// ```
/// use event_listener::Event;
/// use event_listener::{Event, prelude::*};
///
/// let event = Event::new();
/// let mut listener1 = event.listen();
Expand All @@ -839,41 +801,103 @@ impl<T> EventListener<T> {
/// assert!(listener1.discard());
/// assert!(!listener2.discard());
/// ```
pub fn discard(mut self) -> bool {
self.listener.as_mut().discard()
}
fn discard(self) -> bool;

/// Returns `true` if this listener listens to the given `Event`.
///
/// # Examples
///
/// ```
/// use event_listener::Event;
/// use event_listener::{Event, prelude::*};
///
/// let event = Event::new();
/// let listener = event.listen();
///
/// assert!(listener.listens_to(&event));
/// ```
#[inline]
pub fn listens_to(&self, event: &Event<T>) -> bool {
ptr::eq::<Inner<T>>(&*self.listener.event, event.inner.load(Ordering::Acquire))
}
fn listens_to(&self, event: &Event<T>) -> bool;

/// Returns `true` if both listeners listen to the same `Event`.
///
/// # Examples
///
/// ```
/// use event_listener::Event;
/// use event_listener::{Event, prelude::*};
///
/// let event = Event::new();
/// let listener1 = event.listen();
/// let listener2 = event.listen();
///
/// assert!(listener1.same_event(&listener2));
/// ```
pub fn same_event(&self, other: &EventListener<T>) -> bool {
fn same_event(&self, other: &Self) -> bool;
}

/// A guard waiting for a notification from an [`Event`].
///
/// There are two ways for a listener to wait for a notification:
///
/// 1. In an asynchronous manner using `.await`.
/// 2. In a blocking manner by calling [`EventListener::wait()`] on it.
///
/// If a notified listener is dropped without receiving a notification, dropping will notify
/// another active listener. Whether one *additional* listener will be notified depends on what
/// kind of notification was delivered.
///
/// See the [`Listener`] trait for the functionality exposed by this type.
///
/// The listener is not registered into the linked list inside of the [`Event`] by default if
/// it is created via the `new()` method. It needs to be pinned first before being inserted
/// using the `listen()` method. After the listener has begun `listen`ing, the user can
/// `await` it like a future or call `wait()` to block the current thread until it is notified.
///
/// This structure allocates the listener on the heap.
pub struct EventListener<T = ()> {
listener: Pin<Box<InnerListener<T, Arc<Inner<T>>>>>,
}

unsafe impl<T: Send> Send for EventListener<T> {}
unsafe impl<T: Send> Sync for EventListener<T> {}

impl<T> core::panic::UnwindSafe for EventListener<T> {}
impl<T> core::panic::RefUnwindSafe for EventListener<T> {}
impl<T> Unpin for EventListener<T> {}

impl<T> fmt::Debug for EventListener<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("EventListener").finish_non_exhaustive()
}
}

impl<T> Listener<T> for EventListener<T> {
#[cfg(all(feature = "std", not(target_family = "wasm")))]
fn wait(mut self) -> T {
self.listener.as_mut().wait_internal(None).unwrap()
}

#[cfg(all(feature = "std", not(target_family = "wasm")))]
fn wait_timeout(mut self, timeout: Duration) -> Option<T> {
self.listener
.as_mut()
.wait_internal(Instant::now().checked_add(timeout))
}

#[cfg(all(feature = "std", not(target_family = "wasm")))]
fn wait_deadline(mut self, deadline: Instant) -> Option<T> {
self.listener.as_mut().wait_internal(Some(deadline))
}

fn discard(mut self) -> bool {
self.listener.as_mut().discard()
}

#[inline]
fn listens_to(&self, event: &Event<T>) -> bool {
ptr::eq::<Inner<T>>(&*self.listener.event, event.inner.load(Ordering::Acquire))
}

#[inline]
fn same_event(&self, other: &EventListener<T>) -> bool {
ptr::eq::<Inner<T>>(&*self.listener.event, &*other.listener.event)
}
}
Expand All @@ -889,7 +913,7 @@ impl<T> Future for EventListener<T> {
pin_project_lite::pin_project! {
#[project(!Unpin)]
#[project = ListenerProject]
struct Listener<T, B: Borrow<Inner<T>>>
struct InnerListener<T, B: Borrow<Inner<T>>>
where
B: Unpin,
{
Expand All @@ -904,7 +928,7 @@ pin_project_lite::pin_project! {
listener: Option<sys::Listener<T>>,
}

impl<T, B: Borrow<Inner<T>>> PinnedDrop for Listener<T, B>
impl<T, B: Borrow<Inner<T>>> PinnedDrop for InnerListener<T, B>
where
B: Unpin,
{
Expand All @@ -916,10 +940,10 @@ pin_project_lite::pin_project! {
}
}

unsafe impl<T: Send, B: Borrow<Inner<T>> + Unpin + Send> Send for Listener<T, B> {}
unsafe impl<T: Send, B: Borrow<Inner<T>> + Unpin + Sync> Sync for Listener<T, B> {}
unsafe impl<T: Send, B: Borrow<Inner<T>> + Unpin + Send> Send for InnerListener<T, B> {}
unsafe impl<T: Send, B: Borrow<Inner<T>> + Unpin + Sync> Sync for InnerListener<T, B> {}

impl<T, B: Borrow<Inner<T>> + Unpin> Listener<T, B> {
impl<T, B: Borrow<Inner<T>> + Unpin> InnerListener<T, B> {
/// Wait until the provided deadline.
#[cfg(all(feature = "std", not(target_family = "wasm")))]
fn wait_internal(mut self: Pin<&mut Self>, deadline: Option<Instant>) -> Option<T> {
Expand Down Expand Up @@ -1225,3 +1249,11 @@ fn __test_send_and_sync() {
_assert_send::<EventListener<()>>();
_assert_sync::<EventListener<()>>();
}

#[doc(hidden)]
mod __private {
use super::EventListener;

pub trait Sealed {}
impl<T> Sealed for EventListener<T> {}
}

0 comments on commit d9144a8

Please sign in to comment.