Skip to content

Commit

Permalink
Semaphore::acquire[_many] returns non-anonymous future
Browse files Browse the repository at this point in the history
  • Loading branch information
Oliver Giersch committed Apr 22, 2023
1 parent 5d24c84 commit 08e9eb9
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 32 deletions.
1 change: 1 addition & 0 deletions src/oneshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,7 @@ impl<T> Future for Receiver<T> {

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let slot = &self.get_mut().slot;
// SAFETY: no mutable or aliased access to slot possible
unsafe { (*slot.get()).poll_recv(cx) }
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ impl<T> UnsyncQueue<T, Bounded> {

pub(crate) fn unreserve(&self) {
// SAFETY: no mutable or aliased access to queue possible
drop(unsafe { (*self.0.get()).extra.semaphore.make_permit(1) });
unsafe { (*self.0.get()).extra.semaphore.return_permits(1) }
}

#[cfg(test)]
Expand Down Expand Up @@ -441,8 +441,8 @@ where
type Output = Option<T>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// SAFETY: no mutable or aliased access to queue possible
let queue = self.get_mut().queue;
// SAFETY: no mutable or aliased access to queue possible
unsafe { (*queue.get()).poll_recv::<COUNTED>(cx) }
}
}
85 changes: 55 additions & 30 deletions src/semaphore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,38 +97,37 @@ impl Semaphore {
/// permits.
pub fn try_acquire_many(&self, n: usize) -> Result<Permit<'_>, TryAcquireError> {
// SAFETY: no mutable or aliased access to shared possible
unsafe { (*self.shared.get()).try_acquire::<true>(n) }.map(|_| self.make_permit(n))
unsafe { (*self.shared.get()).try_acquire::<true>(n) }.map(|_| Permit::new(&self.shared, n))
}

/// Acquires a single [`Permit`], potentially blocking until one becomes
/// available.
///
/// # Errors
///
/// Fails, if the semaphore has been closed.
pub async fn acquire(&self) -> Result<Permit<'_>, AcquireError> {
self.acquire_many(1).await
/// Awaiting the [`Future`] fails, if the semaphore has been closed.
pub fn acquire(&self) -> Acquire<'_> {
self.build_acquire(1)
}

/// Acquires `n` [`Permit`]s, potentially blocking until they become
/// available.
///
/// # Errors
///
/// Fails, if the semaphore has been closed.
pub async fn acquire_many(&self, n: usize) -> Result<Permit<'_>, AcquireError> {
let _ = self.build_acquire(n).await?;
Ok(self.make_permit(n))
/// Awaiting the [`Future`] fails, if the semaphore has been closed.
pub fn acquire_many(&self, n: usize) -> Acquire<'_> {
self.build_acquire(n)
}

/// Returns a new `Permit` without actually acquiring it.
///
/// NOTE: Only use this to "revive" a Permit that has been explicitly
/// [forgotten](Permit::forget)!
pub(crate) fn make_permit(&self, count: usize) -> Permit<'_> {
Permit { shared: &self.shared, count }
/// Returns `n` previously "forgotten" permits back to the semaphore.
pub(crate) fn return_permits(&self, n: usize) {
// SAFETY: no mutable or aliased access to shared possible
unsafe { (*self.shared.get()).return_permits(n) }
}

/// Returns an correctly initialized [`Acquire`] future instance for
/// acquiring `wants` permits.
fn build_acquire(&self, wants: usize) -> Acquire<'_> {
Acquire {
shared: &self.shared,
Expand Down Expand Up @@ -161,6 +160,14 @@ pub struct Permit<'a> {
}

impl<'a> Permit<'a> {
/// Returns a new [`Permit`] without actually acquiring it.
///
/// NOTE: Only use this to "revive" a Permit that has been explicitly
/// [forgotten](Permit::forget)!
fn new(shared: &'a UnsafeCell<Shared>, count: usize) -> Self {
Self { shared, count }
}

/// Drops the permit without returning it to the [`Semaphore`].
///
/// This permanently reduces the number of available permits.
Expand All @@ -175,14 +182,7 @@ impl Drop for Permit<'_> {
fn drop(&mut self) {
// SAFETY: no mutable or aliased access to shared possible
let shared = unsafe { &mut (*self.shared.get()) };

// underflow *would* be possible, since `Permit`s can be forgotten,
// which reduces outstanding permits and then be created again
// (internally) "out of thin air"; the order is important here, because
// `add_permits` may mark permits as handed out again, if they are
// transfered to other waiters
shared.outstanding_permits = shared.outstanding_permits.saturating_sub(self.count);
shared.add_permits(self.count);
shared.return_permits(self.count);
}
}

Expand Down Expand Up @@ -240,15 +240,15 @@ impl std::error::Error for TryAcquireError {}

/// The [`Future`] returned by [`acquire`](Semaphore::acquire), which
/// resolves when the required number of permits becomes available.
struct Acquire<'a> {
pub struct Acquire<'a> {
/// The shared [`Semaphore`] state.
shared: &'a UnsafeCell<Shared>,
/// The state for waiting and resolving the future.
waiter: Waiter,
}

impl Future for Acquire<'_> {
type Output = Result<usize, AcquireError>;
impl<'a> Future for Acquire<'a> {
type Output = Result<Permit<'a>, AcquireError>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// SAFETY: The `Acquire` future can not be moved before being dropped
Expand All @@ -263,8 +263,9 @@ impl Future for Acquire<'_> {
waiter.waiting.set(false);
match res {
Ok(_) => {
let shared = self.as_ref().shared;
let count = waiter.permits.take();
Poll::Ready(Ok(count))
Poll::Ready(Ok(Permit::new(shared, count)))
}
Err(e) => Poll::Ready(Err(e)),
}
Expand All @@ -283,6 +284,8 @@ impl Drop for Acquire<'_> {
if self.waiter.waiting.get() {
// check, if there exists some entry in queue of waiters with the
// same ID as this future
// SAFETY: non-live waiters di not exist in queue, no aliased access
// possible
unsafe { shared.waiters.try_remove(&self.waiter) };
}

Expand Down Expand Up @@ -311,8 +314,10 @@ struct Shared {
impl Shared {
/// Closes the semaphore and notifies all remaining waiters.
fn close(&mut self) -> usize {
self.closed = true;
// SAFETY: non-live waiters di not exist in queue, no aliased access
// possible
let woken = unsafe { self.waiters.wake_all() };
self.closed = true;
self.waiters = WaiterQueue::new();

woken
Expand Down Expand Up @@ -365,6 +370,16 @@ impl Shared {
}
}

fn return_permits(&mut self, n: usize) {
// underflow *would* be possible, since `Permit`s can be forgotten,
// which reduces outstanding permits and then be created again
// (internally) "out of thin air"; the order is important here, because
// `add_permits` may mark permits as handed out again, if they are
// transfered to other waiters
self.outstanding_permits = self.outstanding_permits.saturating_sub(n);
self.add_permits(n);
}

/// Attempts to reduce available permits by up to `n` or returns an error,
/// if the semaphore has been closed or has no available permits.
fn try_acquire<const STRICT: bool>(&mut self, n: usize) -> Result<usize, TryAcquireError> {
Expand Down Expand Up @@ -477,8 +492,7 @@ impl Shared {
// actually prevent the destructor from running, since only the pinned
// reference can be leaked.
unsafe { self.waiters.push_back(waiter.get_ref()) }

return Poll::Pending;
Poll::Pending
}
}

Expand Down Expand Up @@ -507,6 +521,7 @@ impl WaiterQueue {
let mut curr = self.head;
let mut waiting = 0;
while !curr.is_null() {
// SAFETY: curr is non-null, validity is required by function safety
curr = unsafe { (*curr).next.get() };
waiting += 1;
}
Expand All @@ -525,7 +540,8 @@ impl WaiterQueue {
if curr == waiter {
return true;
}

// SAFETY: non-live waiters di not exist in queue, no aliased access
// possible
let next = unsafe { (*curr).next.get() };
curr = next;
}
Expand All @@ -545,6 +561,8 @@ impl WaiterQueue {
self.tail = waiter;
} else {
// queue is not empty, insert at tail
// SAFETY: non-live waiters di not exist in queue, no aliased access
// possible
unsafe { (*self.tail).next.set(waiter) };
self.tail = waiter;
}
Expand All @@ -559,6 +577,8 @@ impl WaiterQueue {
let mut pred = Cell::from_mut(&mut self.head);
while !pred.get().is_null() {
let curr = pred.get();
// SAFETY: curr is non-null, liveness is required by function safety
// invariant
let next = unsafe { &(*curr).next };
if curr == waiter {
pred.set(next.get());
Expand Down Expand Up @@ -593,6 +613,8 @@ impl WaiterQueue {
let mut woken = 0;

while !curr.is_null() {
// SAFETY: liveness/non-aliasedness required for all waiters by
// function invariant, curr is non-null and valid
unsafe {
(*curr).waiting.set(false);
(*curr).waker.get().wake_by_ref();
Expand Down Expand Up @@ -640,10 +662,13 @@ impl LateInitWaker {
}

fn set(&self, waker: Waker) {
// SAFETY: no mutable or aliased access to waker possible, writing the
// waker is unproblematic due to the required liveness of the pointer.
unsafe { (*self.0.get()).as_mut_ptr().write(waker) };
}

unsafe fn get(&self) -> &Waker {
// SAFETY: initness required as function invariant, no al
unsafe { (*self.0.get()).assume_init_ref() }
}
}
Expand Down

0 comments on commit 08e9eb9

Please sign in to comment.