Skip to content

Commit

Permalink
EventListener is now used through pinning
Browse files Browse the repository at this point in the history
The EventListener for the upcoming libstd-based implementation needs to
be pinned, so this commit sets up the infrastructure for the pinned
EventListener.

This is a breaking change.
  • Loading branch information
notgull committed Mar 31, 2023
1 parent c659cf8 commit 996ee4d
Show file tree
Hide file tree
Showing 6 changed files with 138 additions and 113 deletions.
4 changes: 2 additions & 2 deletions benches/bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ fn bench_events(c: &mut Criterion) {

ev.notify(COUNT);

for handle in handles {
handle.wait();
for mut handle in handles {
handle.as_mut().wait();
}
});
});
Expand Down
8 changes: 4 additions & 4 deletions examples/mutex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,9 @@ impl<T> Mutex<T> {
// Start listening and then try locking again.
listener = Some(self.lock_ops.listen());
}
Some(l) => {
Some(mut l) => {
// Wait until a notification is received.
l.wait();
l.as_mut().wait();
}
}
}
Expand All @@ -88,9 +88,9 @@ impl<T> Mutex<T> {
// Start listening and then try locking again.
listener = Some(self.lock_ops.listen());
}
Some(l) => {
Some(mut l) => {
// Wait until a notification is received.
if !l.wait_deadline(deadline) {
if !l.as_mut().wait_deadline(deadline) {
return None;
}
}
Expand Down
177 changes: 100 additions & 77 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,15 +48,15 @@
//! }
//!
//! // Start listening for events.
//! let listener = event.listen();
//! let mut listener = event.listen();
//!
//! // Check the flag again after creating the listener.
//! if flag.load(Ordering::SeqCst) {
//! break;
//! }
//!
//! // Wait for a notification and continue the loop.
//! listener.wait();
//! listener.as_mut().wait();
//! }
//! ```

Expand All @@ -76,6 +76,7 @@ use alloc::sync::Arc;
use core::borrow::Borrow;
use core::fmt;
use core::future::Future;
use core::marker::PhantomPinned;
use core::mem::ManuallyDrop;
use core::pin::Pin;
use core::ptr;
Expand Down Expand Up @@ -173,19 +174,9 @@ impl Event {
/// let listener = event.listen();
/// ```
#[cold]
pub fn listen(&self) -> EventListener {
let inner = self.inner();

// Register the listener.
let mut listener = EventListener(Listener {
event: unsafe { Arc::clone(&ManuallyDrop::new(Arc::from_raw(inner))) },
listener: None,
});

listener.0.event.insert(&mut listener.0.listener);

// Make sure the listener is registered before whatever happens next.
full_fence();
pub fn listen(&self) -> Pin<Box<EventListener>> {
let mut listener = Box::pin(EventListener::new(self));
listener.as_mut().listen();
listener
}

Expand Down Expand Up @@ -452,13 +443,31 @@ impl fmt::Debug for EventListener {
}
}

#[cfg(feature = "std")]
impl UnwindSafe for EventListener {}
#[cfg(feature = "std")]
impl RefUnwindSafe for EventListener {}

#[cfg(feature = "std")]
impl EventListener {
/// Create a new `EventListener` that will wait for a notification from the given [`Event`].
pub fn new(event: &Event) -> Self {
let inner = event.inner();

let listener = Listener {
event: unsafe { Arc::clone(&ManuallyDrop::new(Arc::from_raw(inner))) },
listener: None,
_pin: PhantomPinned,
};

Self(listener)
}

/// Register this listener into the given [`Event`].
///
/// This method can only be called after the listener has been pinned, and must be called before
/// the listener is polled.
pub fn listen(self: Pin<&mut Self>) {
self.listener().insert();

// Make sure the listener is registered before whatever happens next.
full_fence();
}

/// Blocks until a notification is received.
///
/// # Examples
Expand All @@ -467,16 +476,17 @@ impl EventListener {
/// use event_listener::Event;
///
/// let event = Event::new();
/// let listener = event.listen();
/// let mut listener = event.listen();
///
/// // Notify `listener`.
/// event.notify(1);
///
/// // Receive the notification.
/// listener.wait();
/// listener.as_mut().wait();
/// ```
pub fn wait(self) {
self.0.wait_internal(None);
#[cfg(feature = "std")]
pub fn wait(self: Pin<&mut Self>) {
self.listener().wait_internal(None);
}

/// Blocks until a notification is received or a timeout is reached.
Expand All @@ -490,13 +500,15 @@ impl EventListener {
/// use event_listener::Event;
///
/// let event = Event::new();
/// let listener = event.listen();
/// let mut listener = event.listen();
///
/// // There are no notification so this times out.
/// assert!(!listener.wait_timeout(Duration::from_secs(1)));
/// assert!(!listener.as_mut().wait_timeout(Duration::from_secs(1)));
/// ```
pub fn wait_timeout(self, timeout: Duration) -> bool {
self.0.wait_internal(Instant::now().checked_add(timeout))
#[cfg(feature = "std")]
pub fn wait_timeout(self: Pin<&mut Self>, timeout: Duration) -> bool {
self.listener()
.wait_internal(Instant::now().checked_add(timeout))
}

/// Blocks until a notification is received or a deadline is reached.
Expand All @@ -510,39 +522,36 @@ impl EventListener {
/// use event_listener::Event;
///
/// let event = Event::new();
/// let listener = event.listen();
/// let mut listener = event.listen();
///
/// // There are no notification so this times out.
/// assert!(!listener.wait_deadline(Instant::now() + Duration::from_secs(1)));
/// assert!(!listener.as_mut().wait_deadline(Instant::now() + Duration::from_secs(1)));
/// ```
#[cfg(feature = "std")]
pub fn wait_deadline(self, deadline: Instant) -> bool {
self.0.wait_internal(Some(deadline))
pub fn wait_deadline(self: Pin<&mut Self>, deadline: Instant) -> bool {
self.listener().wait_internal(Some(deadline))
}
}

impl EventListener {
/// Drops this listener and discards its notification (if any) without notifying another
/// active listener.
///
/// Returns `true` if a notification was discarded. Note that this function may spuriously
/// return `false` even if a notification was received by the listener.
/// Returns `true` if a notification was discarded.
///
/// # Examples
/// ```
/// use event_listener::Event;
///
/// let event = Event::new();
/// let listener1 = event.listen();
/// let listener2 = event.listen();
/// let mut listener1 = event.listen();
/// let mut listener2 = event.listen();
///
/// event.notify(1);
///
/// assert!(listener1.discard());
/// assert!(!listener2.discard());
/// assert!(listener1.as_mut().discard());
/// assert!(!listener2.as_mut().discard());
/// ```
pub fn discard(self) -> bool {
self.0.discard()
pub fn discard(self: Pin<&mut Self>) -> bool {
self.listener().discard()
}

/// Returns `true` if this listener listens to the given `Event`.
Expand Down Expand Up @@ -579,8 +588,8 @@ impl EventListener {
ptr::eq::<Inner>(&**self.inner(), &**other.inner())
}

fn listener(&mut self) -> &mut Listener<Arc<Inner>> {
&mut self.0
fn listener(self: Pin<&mut Self>) -> Pin<&mut Listener<Arc<Inner>>> {
unsafe { self.map_unchecked_mut(|this| &mut this.0) }
}

fn inner(&self) -> &Arc<Inner> {
Expand All @@ -591,7 +600,7 @@ impl EventListener {
impl Future for EventListener {
type Output = ();

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.listener().poll_internal(cx)
}
}
Expand All @@ -602,15 +611,36 @@ struct Listener<B: Borrow<Inner> + Unpin> {

/// The inner state of the listener.
listener: Option<sys::Listener>,

/// Enforce pinning.
_pin: PhantomPinned,
}

unsafe impl<B: Borrow<Inner> + Unpin + Send> Send for Listener<B> {}
unsafe impl<B: Borrow<Inner> + Unpin + Sync> Sync for Listener<B> {}

impl<B: Borrow<Inner> + Unpin> Listener<B> {
/// Pin-project this listener.
fn project(self: Pin<&mut Self>) -> (&Inner, Pin<&mut Option<sys::Listener>>) {
// SAFETY: `event` is `Unpin`, and `listener`'s pin status is preserved
unsafe {
let Listener {
event, listener, ..
} = self.get_unchecked_mut();

((*event).borrow(), Pin::new_unchecked(listener))
}
}

/// Register this listener with the event.
fn insert(self: Pin<&mut Self>) {
let (inner, listener) = self.project();
inner.insert(listener);
}

/// Wait until the provided deadline.
#[cfg(feature = "std")]
fn wait_internal(self, deadline: Option<Instant>) -> bool {
fn wait_internal(mut self: Pin<&mut Self>, deadline: Option<Instant>) -> bool {
use std::cell::RefCell;

std::thread_local! {
Expand All @@ -619,9 +649,9 @@ impl<B: Borrow<Inner> + Unpin> Listener<B> {
}

// Try to borrow the thread-local parker/unparker pair.
let mut this = Some(self);
PARKER
.try_with({
let this = self.as_mut();
|parker| {
let mut pair = parker
.try_borrow_mut()
Expand All @@ -631,33 +661,29 @@ impl<B: Borrow<Inner> + Unpin> Listener<B> {
(parker, Task::Unparker(unparker))
});

this.take()
.unwrap()
.wait_with_parker(deadline, parker, unparker.as_task_ref())
this.wait_with_parker(deadline, parker, unparker.as_task_ref())
}
})
.unwrap_or_else(|_| {
// If the pair isn't accessible, we may be being called in a destructor.
// Just create a new pair.
let (parker, unparker) = parking::pair();
this.take().unwrap().wait_with_parker(
deadline,
&parker,
TaskRef::Unparker(&unparker),
)
self.wait_with_parker(deadline, &parker, TaskRef::Unparker(&unparker))
})
}

/// Wait until the provided deadline using the specified parker/unparker pair.
#[cfg(feature = "std")]
fn wait_with_parker(
mut self,
self: Pin<&mut Self>,
deadline: Option<Instant>,
parker: &parking::Parker,
unparker: TaskRef<'_>,
) -> bool {
let (inner, mut listener) = self.project();

// Set the listener's state to `Task`.
match self.event.borrow().register(&mut self.listener, unparker) {
match inner.register(listener.as_mut(), unparker) {
Some(true) => {
// We were already notified, so we don't need to park.
return true;
Expand All @@ -683,21 +709,17 @@ impl<B: Borrow<Inner> + Unpin> Listener<B> {
let now = Instant::now();
if now >= deadline {
// Remove our entry and check if we were notified.
return self
.event
.borrow()
.remove(&mut self.listener, false)
return inner
.remove(listener, false)
.expect("We never removed ourself from the list")
.is_notified();
}
}
}

// See if we were notified.
if self
.event
.borrow()
.register(&mut self.listener, unparker)
if inner
.register(listener.as_mut(), unparker)
.expect("We never removed ourself from the list")
{
return true;
Expand All @@ -707,21 +729,20 @@ impl<B: Borrow<Inner> + Unpin> Listener<B> {

/// Drops this listener and discards its notification (if any) without notifying another
/// active listener.
fn discard(mut self) -> bool {
self.event
.borrow()
.remove(&mut self.listener, false)
fn discard(self: Pin<&mut Self>) -> bool {
let (inner, listener) = self.project();

inner
.remove(listener, false)
.map_or(false, |state| state.is_notified())
}

/// Poll this listener for a notification.
fn poll_internal(&mut self, cx: &mut Context<'_>) -> Poll<()> {
fn poll_internal(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
let (inner, mut listener) = self.project();

// Try to register the listener.
match self
.event
.borrow()
.register(&mut self.listener, TaskRef::Waker(cx.waker()))
{
match inner.register(listener.as_mut(), TaskRef::Waker(cx.waker())) {
Some(true) => {
// We were already notified, so we don't need to park.
Poll::Ready(())
Expand All @@ -743,7 +764,9 @@ impl<B: Borrow<Inner> + Unpin> Listener<B> {
impl<B: Borrow<Inner> + Unpin> Drop for Listener<B> {
fn drop(&mut self) {
// If we're being dropped, we need to remove ourself from the list.
self.event.borrow().remove(&mut self.listener, true);
let (inner, listener) = unsafe { Pin::new_unchecked(self).project() };

inner.remove(listener, true);
}
}

Expand Down

0 comments on commit 996ee4d

Please sign in to comment.