Skip to content

Commit

Permalink
rephrase code comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Oliver Giersch committed Apr 9, 2024
1 parent 744913d commit ddc89c4
Showing 1 changed file with 48 additions and 46 deletions.
94 changes: 48 additions & 46 deletions src/semaphore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,8 @@ impl Semaphore {
self.build_acquire(n)
}

/// Returns an correctly initialized [`Acquire`] future instance for
/// acquiring `wants` permits.
/// Returns an initialized [`Acquire`] future instance for acquiring `wants`
/// permits.
fn build_acquire(&self, wants: usize) -> Acquire<'_> {
Acquire { shared: &self.shared, waiter: Waiter::new(wants) }
}
Expand All @@ -128,7 +128,9 @@ impl fmt::Debug for Semaphore {

/// A permit representing access to the [`Semaphore`]'s guarded resource.
pub struct Permit<'a> {
/// The reference to the shared [`Semaphore`] state.
shared: &'a UnsafeCell<Shared>,
/// The number of already acquired permits.
count: usize,
}

Expand Down Expand Up @@ -212,7 +214,7 @@ impl std::error::Error for TryAcquireError {}
/// The [`Future`] returned by [`acquire`](Semaphore::acquire), which
/// resolves when the required number of permits becomes available.
pub struct Acquire<'a> {
/// The shared [`Semaphore`] state.
/// The reference to the shared [`Semaphore`] state.
shared: &'a UnsafeCell<Shared>,
/// The state for waiting and resolving the future.
waiter: Waiter,
Expand All @@ -226,10 +228,10 @@ impl<'a> Future for Acquire<'a> {
// SAFETY: The `Acquire` future can not be moved before being dropped
let waiter = unsafe { Pin::map_unchecked(self.as_ref(), |acquire| &acquire.waiter) };

// SAFETY: no mutable or aliased access to shared possible
// SAFETY: No mutable or aliased access to shared possible.
match unsafe { (*self.shared.get()).poll_acquire(waiter, cx) } {
Poll::Ready(res) => {
// unconditionally setting waiting to false here avoids having
// Unconditionally setting waiting to false here avoids having
// to traverse the waiter queue again when the future is
// dropped.
waiter.state.set(WaiterState::Woken);
Expand All @@ -249,23 +251,21 @@ impl<'a> Future for Acquire<'a> {

impl Drop for Acquire<'_> {
fn drop(&mut self) {
// SAFETY: no mutable or aliased access to shared possible
// SAFETY: No mutable or aliased access to shared possible.
let shared = unsafe { &mut (*self.shared.get()) };

// remove the queued waker, if it was already enqueued
// Remove the queued waker, if it was already enqueued.
if let WaiterState::Waiting = self.waiter.state.get() {
// check, if there exists some entry in queue of waiters with the
// same ID as this future
// SAFETY: non-live waiters did not exist in queue, no aliased
// Check, if there exists some entry in queue of waiters with the
// same ID as this future.
// SAFETY: Non-live waiters did not exist in queue, no aliased
// access possible
unsafe { shared.waiters.try_remove(&self.waiter) };
}

// return all "unused" (i.e., not passed on into a [`Permit`]) permits
// back to the semaphore
// Return all "unused" permits back to the semaphore.
let permits = self.waiter.permits.get();
// the order is important here, because `add_permits` may mark permits
// as handed out again, if they are transfered to other waiters
// The order is important here, because `add_permits` may mark permits
// as handed out again, if they are transfered to other waiters.
shared.add_permits(permits);
}
}
Expand All @@ -284,7 +284,7 @@ impl Shared {
/// Closes the semaphore and notifies all remaining waiters.
#[cold]
fn close(&mut self) -> usize {
// SAFETY: non-live waiters di not exist in queue, no aliased access
// SAFETY: Non-live waiters do not exist in queue, no aliased access
// possible
let woken = unsafe { self.waiters.wake_all() };
self.closed = true;
Expand All @@ -302,22 +302,22 @@ impl Shared {
/// completed.
fn add_permits(&mut self, mut n: usize) {
while n > 0 {
// keep checking the waiter queue until are permits are distributed
// Keep checking the waiter queue until are permits are distributed.
if let Some(waiter) = self.waiters.front() {
// SAFETY: All waiters remain valid while they are enqueued.
let waiter = unsafe { waiter.as_ref() };
// check, how many permits have already been assigned and
// how many were requested
// Check, how many permits have already been assigned and
// how many were requested.
let diff = waiter.wants - waiter.permits.get();
if diff > n {
// waiter wants more permits than are still available
// the waiter gets all available permits & the loop
// terminated (n = 0)
// The waiter wants more permits than are currently
// available so the waiter gets all available permits & the
// loop is terminated (n = 0).
waiter.permits.set(diff - n);
return;
} else {
// the waiters request can be completed, assign all
// missing permits, wake the waiter, continue the loop
// The waiters request can be fulfilled completely, assign
// all missing permits, wake the waiter & continue the loop
waiter.permits.set(waiter.wants);
n -= diff;

Expand All @@ -327,7 +327,7 @@ impl Shared {
unsafe {
waiter.state.set(WaiterState::Woken);
waiter.waker.get().wake_by_ref();
// ...finally, dequeue the notified waker
// ...finally, dequeue the notified waker.
self.waiters.pop_front(waiter);
};
}
Expand All @@ -350,12 +350,12 @@ impl Shared {
return Err(TryAcquireError::NoPermits);
}

// hand out all available permits
// Hand out all available permits.
let count = self.permits;
self.permits = 0;
Ok(count)
} else {
// can not underflow because n <= permits
// This can not underflow, because n <= permits holds.
self.permits -= n;
Ok(n)
}
Expand All @@ -367,8 +367,8 @@ impl Shared {
cx: &mut Context<'_>,
) -> Poll<Result<(), AcquireError>> {
if self.closed {
// a waiter *may* or *may not* be in the queue, but `Acquire::drop`
// will take care of this eventually
// A waiter *may* or *may not* be in the queue, but `Acquire::drop`
// will take care of this eventually.
return Poll::Ready(Err(AcquireError(())));
}

Expand All @@ -384,12 +384,12 @@ impl Shared {
waiter: Pin<&Waiter>,
cx: &mut Context<'_>,
) -> Poll<Result<(), AcquireError>> {
// on first poll, check if there are enough permits to resolve
// On the first poll, check if there are enough permits to resolve
// immediately or enqueue a waiter ticket to be notified (i.e. polled
// again) later
// again) later.
match self.try_acquire::<false>(waiter.wants) {
Ok(n) => {
// check if we got the desired amount or less
// Check if we got the desired amount or less
waiter.permits.set(n);
if n == waiter.wants {
return Poll::Ready(Ok(()));
Expand All @@ -399,9 +399,9 @@ impl Shared {
_ => {}
};

// if no or not enough permits are currently available, enqueue a
// waiter request ticket, to be notified when capacity becomes
// available
// If no (or not enough) permits are currently available, enqueue a
// waiter request "ticket", to be notified when capacity becomes
// available.
waiter.state.set(WaiterState::Waiting);
waiter.waker.set(cx.waker().clone());
// SAFETY: All waiters remain valid while they are enqueued.
Expand All @@ -425,6 +425,7 @@ impl Shared {
}
}

/// A simple doubly-linked list of pinned [`Waiter`]s.
struct WaiterQueue {
head: *const Waiter,
tail: *const Waiter,
Expand All @@ -448,10 +449,11 @@ impl WaiterQueue {
/// All pointers must reference valid, live and non-aliased `Waiter`s.
#[cold]
unsafe fn len(&self) -> usize {
// this is only used in the [`Debug`] implementation, so counting each
// waiter one by one here is irrelevant to performance
// This is only used in the [`Debug`] implementation, so counting each
// waiter one by one here is irrelevant to performance.
let mut curr = self.head;
let mut waiting = 0;

while !curr.is_null() {
// SAFETY: curr is non-null, validity is required by function safety
curr = unsafe { (*curr).next.get() };
Expand All @@ -468,11 +470,11 @@ impl WaiterQueue {
/// All pointers must reference valid, live and non-aliased `Waiter`s.
unsafe fn push_back(&mut self, waiter: &Waiter) {
if self.tail.is_null() {
// queue is empty, insert waiter at head
// The queue is empty, insert waiter at the head position.
self.head = waiter;
self.tail = waiter;
} else {
// queue is not empty, insert at tail
// The queue is not empty, insert at tail.
// SAFETY: non-live waiters did not exist in queue, no aliased
// access possible
unsafe { (*self.tail).next.set(waiter) };
Expand Down Expand Up @@ -627,7 +629,7 @@ mod tests {

use core::{
future::Future as _,
ptr,
pin, ptr,
task::{Context, Poll, RawWaker, RawWakerVTable, Waker},
};

Expand Down Expand Up @@ -661,7 +663,7 @@ mod tests {
fn acquire_never() {
future::block_on(async {
let sem = super::Semaphore::new(0);
let mut fut = core::pin::pin!(sem.acquire());
let mut fut = pin::pin!(sem.acquire());

core::future::poll_fn(|cx| {
assert!(fut.as_mut().poll(cx).is_pending());
Expand All @@ -677,8 +679,8 @@ mod tests {
fn acquire() {
future::block_on(async {
let sem = super::Semaphore::new(0);
let mut fut = core::pin::pin!(sem.acquire());
core::future::poll_fn(|cx| {
let mut fut = pin::pin!(sem.acquire());
future::poll_fn(|cx| {
assert!(fut.as_mut().poll(cx).is_pending());
Poll::Ready(())
})
Expand All @@ -695,10 +697,10 @@ mod tests {
fn acquire_one() {
future::block_on(async {
let sem = super::Semaphore::new(0);
let mut fut = core::pin::pin!(sem.acquire());
let mut fut = pin::pin!(sem.acquire());

// poll future once to enqueue waiter
core::future::poll_fn(|cx| {
future::poll_fn(|cx| {
assert!(fut.as_mut().poll(cx).is_pending());
assert_eq!(sem.waiters(), 1);
// add 2 permits, one goes directly to the enqueued waiter and
Expand All @@ -720,7 +722,7 @@ mod tests {
fn poll_acquire_after_completion() {
future::block_on(async {
let sem = super::Semaphore::new(0);
let mut fut = core::pin::pin!(sem.acquire());
let mut fut = pin::pin!(sem.acquire());
core::future::poll_fn(|cx| {
assert!(fut.as_mut().poll(cx).is_pending());
Poll::Ready(())
Expand Down

0 comments on commit ddc89c4

Please sign in to comment.