Skip to content

Commit

Permalink
review some comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Oliver Giersch committed May 13, 2024
1 parent ddc89c4 commit 5776f41
Showing 1 changed file with 25 additions and 31 deletions.
56 changes: 25 additions & 31 deletions src/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,27 +30,27 @@ where
}

pub(crate) fn len(&self) -> usize {
// SAFETY: no mutable or aliased access to queue possible
// SAFETY: No mutable or aliased access to queue possible.
unsafe { (*self.0.get()).queue.len() }
}

pub(crate) fn close<const COUNTED: bool>(&self) {
// SAFETY: no mutable or aliased access to queue possible
// SAFETY: No mutable or aliased access to queue possible.
unsafe { &mut *self.0.get() }.close::<COUNTED>();
}

pub(crate) fn is_closed<const COUNTED: bool>(&self) -> bool {
// SAFETY: no mutable or aliased access to queue possible
// SAFETY: No mutable or aliased access to queue possible.
unsafe { (*self.0.get()).mask.is_closed::<COUNTED>() }
}

pub(crate) fn try_recv<const COUNTED: bool>(&self) -> Result<T, TryRecvError> {
// SAFETY: no mutable or aliased access to queue possible
// SAFETY: No mutable or aliased access to queue possible.
unsafe { (*self.0.get()).try_recv::<COUNTED>() }
}

pub(crate) fn poll_recv<const COUNTED: bool>(&self, cx: &mut Context<'_>) -> Poll<Option<T>> {
// SAFETY: no mutable or aliased access to queue possible
// SAFETY: No mutable or aliased access to queue possible.
unsafe { (*self.0.get()).poll_recv::<COUNTED>(cx) }
}

Expand All @@ -69,15 +69,12 @@ impl<T> UnsyncQueue<T, Unbounded> {
}

pub(crate) fn send<const COUNTED: bool>(&self, elem: T) -> Result<(), SendError<T>> {
// SAFETY: no mutable or aliased access to queue possible
// SAFETY: No mutable or aliased access to queue possible.
let queue = unsafe { &mut *self.0.get() };

// check if the channel was closed
if queue.mask.is_closed::<COUNTED>() {
return Err(SendError(elem));
}

// ..otherwise push `elem` and wake a potential waiter
queue.push_and_wake(elem);
Ok(())
}
Expand Down Expand Up @@ -128,26 +125,25 @@ impl<T> UnsyncQueue<T, Bounded> {
}

pub(crate) fn max_capacity(&self) -> usize {
// SAFETY: no mutable or aliased access to queue possible
// SAFETY: No mutable or aliased access to queue possible.
unsafe { (*self.0.get()).extra.max_capacity }
}

pub(crate) fn capacity(&self) -> usize {
// SAFETY: no mutable or aliased access to queue possible
// SAFETY: No mutable or aliased access to queue possible.
unsafe { (*self.0.get()).extra.semaphore.available_permits() }
}

pub(crate) fn unbounded_send(&self, elem: T) {
// SAFETY: no mutable or aliased access to queue possible
// SAFETY: No mutable or aliased access to queue possible.
let queue = unsafe { &mut *self.0.get() };
queue.push_and_wake(elem);
}

pub(crate) fn try_send<const COUNTED: bool>(&self, elem: T) -> Result<(), TrySendError<T>> {
// SAFETY: no mutable or aliased access to queue possible
// SAFETY: No mutable or aliased access to queue possible.
let queue = unsafe { &mut *self.0.get() };

// check if there is room in the channel and the channel is still open
// Check if there is room in the channel and the channel is still open.
let permit = match queue.extra.semaphore.try_acquire() {
Ok(permit) => permit,
Err(e) => return Err((e, elem).into()),
Expand All @@ -165,9 +161,8 @@ impl<T> UnsyncQueue<T, Bounded> {

/// Performs a bounded send.
pub(crate) async fn send<const COUNTED: bool>(&self, elem: T) -> Result<(), SendError<T>> {
// try to acquire a free slot in the queue
let ptr = self.0.get();
// SAFETY: no mutable or aliased access to queue possible (a mutable
// SAFETY: No mutable or aliased access to queue possible (a mutable
// reference **MUST NOT** be held across the await!)
let Ok(permit) = unsafe { (*ptr).extra.semaphore.acquire() }.await else {
return Err(SendError(elem));
Expand All @@ -178,17 +173,16 @@ impl<T> UnsyncQueue<T, Bounded> {
// The order, i.e., forget first, is somewhat important, because `wake`
// might panic (which can be caught), but only after `elem` is pushed.
permit.forget();
// SAFETY: no mutable or aliased access to queue possible
// SAFETY: No mutable or aliased access to queue possible.
unsafe { (*ptr).push_and_wake(elem) };

Ok(())
}

pub(crate) fn try_reserve<const COUNTED: bool>(&self) -> Result<(), TrySendError<()>> {
// SAFETY: no mutable or aliased access to queue possible
// SAFETY: No mutable or aliased access to queue possible.
let queue = unsafe { &mut *self.0.get() };

// check if there is room in the channel and the channel is still open
// Check if there is room in the channel and the channel is still open.
let permit = queue.extra.semaphore.try_acquire()?;

// Forgetting the permit permanently decreases the number of
Expand All @@ -203,9 +197,8 @@ impl<T> UnsyncQueue<T, Bounded> {
}

pub(crate) async fn reserve<const COUNTED: bool>(&self) -> Result<(), SendError<()>> {
// acquire a free slot in the queue
let ptr = self.0.get();
// SAFETY: no mutable or aliased access to queue possible (a mutable
// SAFETY: No mutable or aliased access to queue possible (a mutable
// reference **MUST NOT** be held across the await!)
let Ok(permit) = unsafe { (*ptr).extra.semaphore.acquire() }.await else {
return Err(SendError(()));
Expand All @@ -218,12 +211,13 @@ impl<T> UnsyncQueue<T, Bounded> {
// avoid storing an additional (redundant) reference in the `Permit`
// struct.
permit.forget();
// SAFETY: No mutable or aliased access to queue possible.
unsafe { (*ptr).extra.reserved += 1 };
Ok(())
}

pub(crate) fn unreserve(&self, consumed: bool) {
// SAFETY: no mutable or aliased access to queue possible
// SAFETY: No mutable or aliased access to queue possible.
let queue = unsafe { &mut (*self.0.get()) };
queue.extra.reserved -= 1;
if !consumed {
Expand Down Expand Up @@ -322,9 +316,9 @@ where
Ok(elem) => Poll::Ready(Some(elem)),
Err(TryRecvError::Disconnected) => Poll::Ready(None),
Err(TryRecvError::Empty) => {
// this overwrite any previous waker, this is unproblematic if
// This overwrites any previous waker, which is unproblematic if
// the same future is polled (spuriously) more than once, but
// would like result in one future to stay pending forever if
// would likely result in one future to stay pending forever, if
// more than one `RecvFuture`s for one channel with overlapping
// lifetimes were to be polled.
self.waker = Some(cx.waker().clone());
Expand Down Expand Up @@ -369,22 +363,22 @@ impl<T> MaybeBoundedQueue for Queue<T, Bounded> {

#[cold]
fn close<const COUNTED: bool>(&mut self) {
// must also close semaphore in order to notify all waiting senders
// Must also close semaphore in order to notify all waiting senders.
self.mask.close::<COUNTED>();
let _ = self.extra.semaphore.close();
}

fn try_recv<const COUNTED: bool>(&mut self) -> Result<Self::Item, TryRecvError> {
match self.pop_front() {
// an element exists in the channel, wake the next blocked
// sender, if any, and return the element
// An element exists in the channel, wake the next blocked
// sender (if any) and return the element.
Some(elem) => {
self.extra.semaphore.add_permits(1);
Ok(elem)
}
// the channel is empty, but may also have been closed already
// The channel is empty, but may also have been closed already, so
// we must also check, if there are outstanding reserved permits
// before the queue can be assessed to be empty
// before the queue can be ascertained to be empty.
None => match self.extra.reserved == 0 && self.mask.is_closed::<COUNTED>() {
true => Err(TryRecvError::Disconnected),
false => Err(TryRecvError::Empty),
Expand Down

0 comments on commit 5776f41

Please sign in to comment.