Skip to content

Commit

Permalink
Trim unnecessary abstractions
Browse files Browse the repository at this point in the history
  • Loading branch information
notgull committed Mar 31, 2023
1 parent be047ad commit 1a4df18
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 142 deletions.
67 changes: 4 additions & 63 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,6 @@ use std::panic::{RefUnwindSafe, UnwindSafe};
#[cfg(feature = "std")]
use std::time::{Duration, Instant};

use node::Node;

#[cfg(feature = "std")]
use parking::Unparker;

Expand Down Expand Up @@ -162,26 +160,6 @@ impl TaskRef<'_> {
}
}

/// Details of a notification.
#[derive(Copy, Clone)]
struct Notify {
/// The number of listeners to notify.
count: usize,

/// The notification strategy.
kind: NotifyKind,
}

/// The strategy for notifying listeners.
#[derive(Copy, Clone)]
enum NotifyKind {
/// Notify non-notified listeners.
Notify,

/// Notify all listeners.
NotifyAdditional,
}

/// Inner state of [`Event`].
struct Inner {
/// The number of notified entries, or `usize::MAX` if all of them have been notified.
Expand Down Expand Up @@ -209,15 +187,6 @@ impl Inner {
guard: Some(guard),
})
}

/// Push a pending operation to the queue.
#[cold]
pub(crate) fn push(&self, node: Node) {
self.list.queue.push(node);

// Acquire and drop the lock to make sure that the queue is flushed.
let _guard = self.lock();
}
}

/// A synchronization primitive for notifying async tasks and threads.
Expand Down Expand Up @@ -337,14 +306,7 @@ impl Event {
// Notify if there is at least one unnotified listener and the number of notified
// listeners is less than `n`.
if inner.notified.load(Ordering::Acquire) < n {
if let Some(mut lock) = inner.lock() {
lock.notify_unnotified(n);
} else {
inner.push(Node::Notify(Notify {
count: n,
kind: NotifyKind::Notify,
}));
}
inner.notify(n, false);
}
}
}
Expand Down Expand Up @@ -388,14 +350,7 @@ impl Event {
// Notify if there is at least one unnotified listener and the number of notified
// listeners is less than `n`.
if inner.notified.load(Ordering::Acquire) < n {
if let Some(mut lock) = inner.lock() {
lock.notify_unnotified(n);
} else {
inner.push(Node::Notify(Notify {
count: n,
kind: NotifyKind::Notify,
}));
}
inner.notify(n, false);
}
}
}
Expand Down Expand Up @@ -438,14 +393,7 @@ impl Event {
if let Some(inner) = self.try_inner() {
// Notify if there is at least one unnotified listener.
if inner.notified.load(Ordering::Acquire) < usize::MAX {
if let Some(mut lock) = inner.lock() {
lock.notify_additional(n);
} else {
inner.push(Node::Notify(Notify {
count: n,
kind: NotifyKind::NotifyAdditional,
}));
}
inner.notify(n, true);
}
}
}
Expand Down Expand Up @@ -490,14 +438,7 @@ impl Event {
if let Some(inner) = self.try_inner() {
// Notify if there is at least one unnotified listener.
if inner.notified.load(Ordering::Acquire) < usize::MAX {
if let Some(mut lock) = inner.lock() {
lock.notify_additional(n);
} else {
inner.push(Node::Notify(Notify {
count: n,
kind: NotifyKind::NotifyAdditional,
}));
}
inner.notify(n, true);
}
}
}
Expand Down
116 changes: 44 additions & 72 deletions src/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,27 @@ impl crate::Inner {
state
}

/// Notifies a number of entries.
#[cold]
pub(crate) fn notify(&self, n: usize, additional: bool) {
match self.lock() {
Some(mut guard) => {
// Notify the listeners.
guard.notify(n, additional);
}

None => {
// Push it to the queue.
let node = Node::Notify {
count: n,
additional,
};

self.list.queue.push(node);
}
}
}

/// Register a task to be notified when the event is triggered.
///
/// Returns `true` if the listener was already notified, and `false` otherwise. If the listener
Expand Down Expand Up @@ -247,7 +268,11 @@ impl ListenerSlab {
// Create a Slab with a permanent entry occupying index 0, so that
// it is never used (and we can therefore use 0 as a sentinel value).
let mut entries = Slab::new();
entries.insert(Entry::new());
entries.insert(Entry {
state: Cell::new(State::Created),
next: Cell::new(None),
prev: Cell::new(None),
});

Self {
entries,
Expand Down Expand Up @@ -339,64 +364,36 @@ impl ListenerSlab {
Some(state)
}

/// Notifies a number of entries, either normally or as an additional notification.
/// Notifies a number of listeners.
#[cold]
pub(crate) fn notify(&mut self, count: usize, additional: bool) {
if additional {
self.notify_additional(count);
} else {
self.notify_unnotified(count);
}
}

/// Notifies a number of entries.
#[cold]
pub(crate) fn notify_unnotified(&mut self, mut n: usize) {
if n <= self.notified {
return;
}
n -= self.notified;

while n > 0 {
n -= 1;

// Notify the first unnotified entry.
match self.start {
None => break,
Some(e) => {
// Get the entry and move the pointer forward.
let e = &self.entries[e.get()];
self.start = e.next.get();

// Set the state of this entry to `Notified` and notify.
let was_notified = e.notify(false);

// Update the counter.
self.notified += was_notified as usize;
}
pub(crate) fn notify(&mut self, mut n: usize, additional: bool) {
if !additional {
// Make sure we're not notifying more than we have.
if n <= self.notified {
return;
}
n -= self.notified;
}
}

/// Notifies a number of additional entries.
#[cold]
pub(crate) fn notify_additional(&mut self, mut n: usize) {
while n > 0 {
n -= 1;

// Notify the first unnotified entry.
// Notify the next entry.
match self.start {
None => break,

Some(e) => {
// Get the entry and move the pointer forward.
let e = &self.entries[e.get()];
self.start = e.next.get();
// Get the entry and move the pointer forwards.
let entry = &self.entries[e.get()];
self.start = entry.next.get();

// Set the state of this entry to `Notified` and notify.
let was_notified = e.notify(true);
// Set the state to `Notified` and notify.
if let State::Task(task) = entry.state.replace(State::Notified(additional)) {
task.wake();
}

// Update the counter.
self.notified += was_notified as usize;
// Bump the notified count.
self.notified += 1;
}
}
}
Expand Down Expand Up @@ -442,31 +439,6 @@ impl ListenerSlab {
}
}

impl Entry {
/// Create a new, empty entry.
pub(crate) fn new() -> Self {
Self {
state: Cell::new(State::Created),
prev: Cell::new(None),
next: Cell::new(None),
}
}

/// Indicate that this entry has been notified.
#[cold]
pub(crate) fn notify(&self, additional: bool) -> bool {
match self.state.replace(State::Notified(additional)) {
State::Notified(_) => {}
State::Created => {}
State::NotifiedTaken => {}
State::Task(w) => w.wake(),
}

// Return whether the notification would have had any effect.
true
}
}

pub(crate) enum Listener {
/// The listener has a node inside of the linked list.
HasNode(NonZeroUsize),
Expand Down
17 changes: 10 additions & 7 deletions src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
use crate::list::{Listener, ListenerSlab};
use crate::sync::atomic::{AtomicUsize, Ordering};
use crate::sync::Arc;
use crate::{Notify, NotifyKind, State, Task};
use crate::{State, Task};

use core::num::NonZeroUsize;
use crossbeam_utils::atomic::AtomicCell;
Expand All @@ -19,7 +19,13 @@ pub(crate) enum Node {
},

/// This node is notifying a listener.
Notify(Notify),
Notify {
/// The number of listeners to notify.
count: usize,

/// Whether to wake up notified listeners.
additional: bool,
},

/// This node is removing a listener.
RemoveListener {
Expand Down Expand Up @@ -71,12 +77,9 @@ impl Node {
task_waiting.entry_id.store(key.get(), Ordering::Release);
return task_waiting.task.take();
}
Node::Notify(Notify { count, kind }) => {
Node::Notify { count, additional } => {
// Notify the listener.
match kind {
NotifyKind::Notify => list.notify_unnotified(count),
NotifyKind::NotifyAdditional => list.notify_additional(count),
}
list.notify(count, additional);
}
Node::RemoveListener {
listener,
Expand Down

0 comments on commit 1a4df18

Please sign in to comment.