Skip to content

Commit

Permalink
simplify & optimize semaphore, prepare v0.2.3
Browse files Browse the repository at this point in the history
  • Loading branch information
Oliver Giersch committed Mar 12, 2024
1 parent 8683166 commit 63a96b3
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 92 deletions.
12 changes: 12 additions & 0 deletions BENCHMARK.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,16 @@ test uncontented_unbounded_sync ... bench: 280,363 ns/iter (+/- 3,565)
test uncontented_unbounded_unsync ... bench: 29,767 ns/iter (+/- 205)
test result: ok. 0 passed; 0 failed; 0 ignored; 4 measured; 0 filtered out; finished in 3.94s
```

Version `0.2.3`:

```
running 4 tests
test uncontented_bounded_sync ... bench: 323,460 ns/iter (+/- 19,827)
test uncontented_bounded_unsync ... bench: 54,519 ns/iter (+/- 2,647)
test uncontented_unbounded_sync ... bench: 204,268 ns/iter (+/- 17,505)
test uncontented_unbounded_unsync ... bench: 17,078 ns/iter (+/- 874)
test result: ok. 0 passed; 0 failed; 0 ignored; 4 measured; 0 filtered out; finished in 16.17s
```
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "async-unsync"
description = "asynchronous, unsynchronized (thread-local) channels and semaphores"
version = "0.2.2"
version = "0.2.3"
authors = ["Oliver Giersch"]
license = "MIT/Apache-2.0"
repository = "https://github.com/oliver-giersch/async-unsync.git"
Expand Down
2 changes: 1 addition & 1 deletion src/bounded.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1025,7 +1025,7 @@ mod tests {

use futures_lite::future;

use crate::{alloc::boxed::Box, queue::RecvFuture};
use crate::queue::RecvFuture;

#[test]
fn recv_split() {
Expand Down
1 change: 0 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@

#[cfg(feature = "std")]
mod alloc {
pub use std::boxed;
pub use std::collections;
pub use std::rc;
}
Expand Down
148 changes: 59 additions & 89 deletions src/semaphore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,10 +136,11 @@ impl Semaphore {
waiter: Waiter {
wants,
waker: LateInitWaker::new(),
waiting: Cell::new(false),
state: Cell::new(WaiterState::Inert),
permits: Cell::new(0),
next: Cell::new(ptr::null()),
_pin: PhantomPinned,
prev: Cell::new(ptr::null()),
_marker: PhantomPinned,
},
}
}
Expand Down Expand Up @@ -262,7 +263,7 @@ impl<'a> Future for Acquire<'a> {
// unconditionally setting waiting to false here avoids having
// to traverse the waiter queue again when the future is
// dropped.
waiter.waiting.set(false);
waiter.state.set(WaiterState::Woken);
match res {
Ok(_) => {
let shared = self.as_ref().shared;
Expand All @@ -283,11 +284,11 @@ impl Drop for Acquire<'_> {
let shared = unsafe { &mut (*self.shared.get()) };

// remove the queued waker, if it was already enqueued
if self.waiter.waiting.get() {
if let WaiterState::Waiting = self.waiter.state.get() {
// check, if there exists some entry in queue of waiters with the
// same ID as this future
// SAFETY: non-live waiters di not exist in queue, no aliased access
// possible
// SAFETY: non-live waiters did not exist in queue, no aliased
// access possible
unsafe { shared.waiters.try_remove(&self.waiter) };
}

Expand Down Expand Up @@ -359,6 +360,7 @@ impl Shared {
// are enqueued and all waiters remain valid while they are
// enqueued.
unsafe {
waiter.state.set(WaiterState::Woken);
waiter.waker.get().wake_by_ref();
// ...finally, dequeue the notified waker
self.waiters.pop_front(waiter);
Expand Down Expand Up @@ -411,54 +413,27 @@ impl Shared {
waiter: Pin<&Waiter>,
cx: &mut Context<'_>,
) -> Poll<Result<(), AcquireError>> {
// on first poll, check if there are enough permits to resolve
// immediately or enqueue a waiter ticket to be notified (i.e. polled
// again) later
if !waiter.waiting.get() {
return self.poll_acquire_initial(waiter, cx);
}

/*
* on any subsequent poll (which may be spurious!), the following
* conditions must be checked:
*
* 1. has the semaphore been closed in the meantime?
* 2. is a waiter for this request still enqueued? (this will occur
* frequently with spurious polls)
*/

if self.is_closed() {
if self.closed {
// a waiter *may* or *may not* be in the queue, but `Acquire::drop`
// will take care of this eventually
return Poll::Ready(Err(AcquireError(())));
}

/*
* check, if the waiter is still enqueued in the `waiters` queue, in
* which case the poll must be spurious, since any poll caused by a
* deliberate wake-call would have been preceded by removing the ticket
* from the queue
*/

// SAFETY: The `AcquireState`s are part of each `Acquire` future and
// thus share the same lifetime. When such a future is dropped, it will
// make sure to remove itself from this list. Should a future be
// "forgotten" it either exists on the heap and its memory location will
// remain valid or, if it exists on the stack, it can't actually be
// leaked, since it has to be pinned before it can insert itself into
// the list
if unsafe { self.waiters.contains(waiter.get_ref()) } {
return Poll::Pending;
match waiter.state.get() {
WaiterState::Woken => Poll::Ready(Ok(())),
WaiterState::Inert => self.poll_acquire_initial(waiter, cx),
WaiterState::Waiting => Poll::Pending,
}

Poll::Ready(Ok(()))
}

fn poll_acquire_initial(
&mut self,
waiter: Pin<&Waiter>,
cx: &mut Context<'_>,
) -> Poll<Result<(), AcquireError>> {
// on first poll, check if there are enough permits to resolve
// immediately or enqueue a waiter ticket to be notified (i.e. polled
// again) later
match self.try_acquire::<false>(waiter.wants) {
Ok(n) => {
// check if we got the desired amount or less
Expand All @@ -474,7 +449,7 @@ impl Shared {
// if no or not enough permits are currently available, enqueue a
// waiter request ticket, to be notified when capacity becomes
// available
waiter.waiting.set(true);
waiter.state.set(WaiterState::Waiting);
waiter.waker.set(cx.waker().clone());
// SAFETY: All waiters remain valid while they are enqueued.
//
Expand Down Expand Up @@ -530,41 +505,22 @@ impl WaiterQueue {
waiting
}

/// Returns `true` if the queue contains `waiter`.
///
/// # Safety
///
/// All pointers must reference valid, live and non-aliased `Waiter`s.
unsafe fn contains(&self, waiter: *const Waiter) -> bool {
let mut curr = self.head;
while !curr.is_null() {
if curr == waiter {
return true;
}
// SAFETY: non-live waiters di not exist in queue, no aliased access
// possible
let next = unsafe { (*curr).next.get() };
curr = next;
}

false
}

/// Enqueues `waiter` at the back of the queue.
///
/// # Safety
///
/// All pointers must reference valid, live and non-aliased `Waiter`s.
unsafe fn push_back(&mut self, waiter: *const Waiter) {
unsafe fn push_back(&mut self, waiter: &Waiter) {
if self.tail.is_null() {
// queue is empty, insert waiter at head
self.head = waiter;
self.tail = waiter;
} else {
// queue is not empty, insert at tail
// SAFETY: non-live waiters di not exist in queue, no aliased access
// possible
// SAFETY: non-live waiters did not exist in queue, no aliased
// access possible
unsafe { (*self.tail).next.set(waiter) };
waiter.prev.set(self.tail);
self.tail = waiter;
}
}
Expand All @@ -574,25 +530,21 @@ impl WaiterQueue {
/// # Safety
///
/// All pointers must reference valid, live and non-aliased `Waiter`s.
unsafe fn try_remove(&mut self, waiter: *const Waiter) {
let mut pred = Cell::from_mut(&mut self.head);
while !pred.get().is_null() {
let curr = pred.get();
// SAFETY: curr is non-null, liveness is required by function safety
// invariant
let next = unsafe { &(*curr).next };
if curr == waiter {
pred.set(next.get());

// if the last waiter is removed, tail must also become `null`
if self.head.is_null() {
self.tail = ptr::null();
}

return;
}
unsafe fn try_remove(&mut self, waiter: &Waiter) {
let prev = waiter.prev.get();
if prev.is_null() {
self.head = waiter.next.get();
} else {
// SAFETY: prev is non-null, liveness required by function invariant
unsafe { (*prev).next.set(waiter.next.get()) };
}

pred = next;
let next = waiter.next.get();
if next.is_null() {
self.tail = waiter.prev.get();
} else {
// SAFETY: next non-null, liveness required by function invariant
unsafe { (*next).prev.set(waiter.prev.get()) };
}
}

Expand All @@ -606,6 +558,8 @@ impl WaiterQueue {
self.head = head.next.get();
if self.head.is_null() {
self.tail = ptr::null();
} else {
unsafe { (*self.head).prev.set(ptr::null()) };
}
}

Expand All @@ -617,9 +571,10 @@ impl WaiterQueue {
// SAFETY: liveness/non-aliasedness required for all waiters by
// function invariant, curr is non-null and valid
unsafe {
(*curr).waiting.set(false);
(*curr).waker.get().wake_by_ref();
curr = (*curr).next.get();
let waiter = &*curr;
waiter.state.set(WaiterState::Woken);
waiter.waker.get().wake_by_ref();
curr = waiter.next.get();
}

woken += 1;
Expand All @@ -639,15 +594,30 @@ struct Waiter {
/// This field is **never** used, if the waiter does not get enqueued,
/// because its request can be fulfilled immediately.
waker: LateInitWaker,
/// The flag determining, whether this future has already been polled.
waiting: Cell<bool>,
/// The flag indicating the waiter's state.
state: Cell<WaiterState>,
/// The counter of already collected permits.
permits: Cell<usize>,
/// The pointer to the next enqueued waiter
next: Cell<*const Self>,
/// The pointer to the previous enqueued waiter
prev: Cell<*const Self>,
// see: https://gist.github.com/Darksonn/1567538f56af1a8038ecc3c664a42462
// this marker lets miri pass the self-referential nature of this struct
_pin: PhantomPinned,
_marker: PhantomPinned,
}

/// The current state of a [`Waiter`].
#[derive(Clone, Copy)]
enum WaiterState {
/// The waiter is inert and its future has not yet been polled.
Inert,
/// The waiter's future has been polled and the waiter was enqueued.
Waiting,
/// The waiter's future has been polled to completion.
///
/// If the waiter had been queued it is now no longer queued.
Woken,
}

/// The `Waker` in an `Acquire` future is only used in case it gets enqueued
Expand Down

0 comments on commit 63a96b3

Please sign in to comment.