From 14d0af67d25f20d03585b743a5bd8b49b73e83c9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Th=C3=A9odore=20Pr=C3=A9vot?= Date: Sat, 9 Dec 2023 14:51:41 +0000 Subject: [PATCH 01/41] mpsc: add a way to reserve many permit on bounded mpsc chan --- tokio/src/sync/mpsc/bounded.rs | 213 ++++++++++++++++++++++++++++++++- 1 file changed, 209 insertions(+), 4 deletions(-) diff --git a/tokio/src/sync/mpsc/bounded.rs b/tokio/src/sync/mpsc/bounded.rs index 4aa8b6377ca..77aa76451e9 100644 --- a/tokio/src/sync/mpsc/bounded.rs +++ b/tokio/src/sync/mpsc/bounded.rs @@ -68,6 +68,18 @@ pub struct Permit<'a, T> { chan: &'a chan::Tx, } +/// Permits to send `n` values into the channel. +/// +/// `ManyPermit` values are returned by [`Sender::reserve_many()`] and [`Sender::try_reserve_many()`] +/// and are used to guarantee channel capacity before generating `n` message to send. +/// +/// [`Sender::reserve_many()`]: Sender::reserve_many +/// [`Sender::try_reserve_many()`]: Sender::try_reserve_many +pub struct ManyPermit<'a, T> { + chan: &'a chan::Tx, + n: u32, +} + /// Owned permit to send one value into the channel. /// /// This is identical to the [`Permit`] type, except that it moves the sender @@ -849,10 +861,64 @@ impl Sender { /// } /// ``` pub async fn reserve(&self) -> Result, SendError<()>> { - self.reserve_inner().await?; + self.reserve_inner(1).await?; Ok(Permit { chan: &self.chan }) } + /// Waits for channel capacity. Once capacity to send `n` message is + /// available, it is reserved for the caller. + /// + /// If the channel is full, the function waits for the number of unreceived + /// messages to become less than the channel capacity. Capacity to send `n` + /// message is reserved for the caller. A [`ManyPermit`] is returned to track + /// the reserved capacity. The [`send`] function on [`ManyPermit`] consumes the + /// reserved capacity. + /// + /// Dropping [`ManyPermit`] without sending a message releases the capacity back + /// to the channel. + /// + /// [`ManyPermit`]: ManyPermit + /// [`send`]: ManyPermit::send + /// + /// # Cancel safety + /// + /// This channel uses a queue to ensure that calls to `send` and `reserve` + /// complete in the order they were requested. Cancelling a call to + /// `reserve` makes you lose your place in the queue. + /// + /// # Examples + /// + /// ``` + /// use tokio::sync::mpsc; + /// + /// #[tokio::main] + /// async fn main() { + /// let (tx, mut rx) = mpsc::channel(2); + /// + /// // Reserve capacity + /// let mut permit = tx.reserve_many(2).await.unwrap(); + /// + /// // Trying to send directly on the `tx` will fail due to no + /// // available capacity. + /// assert!(tx.try_send(123).is_err()); + /// + /// // Sending on the permit succeeds + /// permit.send(456).unwrap(); + /// permit.send(457).unwrap(); + /// + /// // The third should fail due to no available capacity + /// permit.send(458).unwrap_err(); + /// + /// // The value sent on the permit is received + /// assert_eq!(rx.recv().await.unwrap(), 456); + /// assert_eq!(rx.recv().await.unwrap(), 457); + /// } + /// ``` + pub async fn reserve_many(&self, n: u32) -> Result, SendError<()>> { + self.reserve_inner(n).await?; + Ok(ManyPermit { chan: &self.chan, n }) + } + /// Waits for channel capacity, moving the `Sender` and returning an owned /// permit. Once capacity to send one message is available, it is reserved /// for the caller. @@ -934,16 +1000,16 @@ impl Sender { /// [`send`]: OwnedPermit::send /// [`Arc::clone`]: std::sync::Arc::clone pub async fn reserve_owned(self) -> Result, SendError<()>> { - self.reserve_inner().await?; + self.reserve_inner(1).await?; Ok(OwnedPermit { chan: Some(self.chan), }) } - async fn reserve_inner(&self) -> Result<(), SendError<()>> { + async fn reserve_inner(&self, n: u32) -> Result<(), SendError<()>> { crate::trace::async_trace_leaf().await; - match self.chan.semaphore().semaphore.acquire(1).await { + match self.chan.semaphore().semaphore.acquire(n).await { Ok(()) => Ok(()), Err(_) => Err(SendError(())), } @@ -1002,6 +1068,64 @@ impl Sender { Ok(Permit { chan: &self.chan }) } + /// Tries to acquire n slot in the channel without waiting for the slot to become + /// available. + /// + /// If the channel is full this function will return [`TrySendError`], otherwise + /// if there is a slot available it will return a [`ManyPermit`] that will then allow you + /// to [`send`] on the channel with a guaranteed slot. This function is similar to + /// [`reserve_many`] except it does not await for the slot to become available. + /// + /// Dropping [`ManyPermit`] without sending a message releases the capacity back + /// to the channel. + /// + /// [`ManyPermit`]: ManyPermit + /// [`send`]: ManyPermit::send + /// [`reserve_many`]: Sender::reserve_many + /// + /// # Examples + /// + /// ``` + /// use tokio::sync::mpsc; + /// + /// #[tokio::main] + /// async fn main() { + /// let (tx, mut rx) = mpsc::channel(2); + /// + /// // Reserve capacity + /// let mut permit = tx.try_reserve_many(2).unwrap(); + /// + /// // Trying to send directly on the `tx` will fail due to no + /// // available capacity. + /// assert!(tx.try_send(123).is_err()); + /// + /// // Trying to reserve an additional slot on the `tx` will + /// // fail because there is no capacity. + /// assert!(tx.try_reserve().is_err()); + /// + /// // Sending on the permit succeeds + /// permit.send(456).unwrap(); + /// permit.send(457).unwrap(); + /// + /// // The third should fail due to no available capacity + /// permit.send(458).unwrap_err(); + /// + /// // The value sent on the permit is received + /// assert_eq!(rx.recv().await.unwrap(), 456); + /// assert_eq!(rx.recv().await.unwrap(), 457); + /// + /// } + /// ``` + pub fn try_reserve_many(&self, n: u32) -> Result, TrySendError<()>> { + match self.chan.semaphore().semaphore.try_acquire(n) { + Ok(()) => {} + Err(TryAcquireError::Closed) => return Err(TrySendError::Closed(())), + Err(TryAcquireError::NoPermits) => return Err(TrySendError::Full(())), + } + + Ok(ManyPermit { chan: &self.chan, n }) + } + /// Tries to acquire a slot in the channel without waiting for the slot to become /// available, returning an owned permit. /// @@ -1278,6 +1402,87 @@ impl fmt::Debug for Permit<'_, T> { } } + +// ===== impl ManyPermit ===== + +impl ManyPermit<'_, T> { + /// Sends a value using the reserved capacity. + /// + /// Capacity for the messages has already been reserved. The message is sent + /// to the receiver and the permit capacity is reduced by one. If there is no + /// remaining capacity a [`SendError`] will be returned with the provided value. + /// The operation will succeed even if the receiver half has been closed. + /// See [`Receiver::close`] for more details on performing a clean shutdown. + /// + /// [`Receiver::close`]: Receiver::close + /// + /// # Examples + /// + /// ``` + /// use tokio::sync::mpsc; + /// + /// #[tokio::main] + /// async fn main() { + /// let (tx, mut rx) = mpsc::channel(2); + /// + /// // Reserve capacity + /// let mut permit = tx.reserve_many(2).await.unwrap(); + /// + /// // Trying to send directly on the `tx` will fail due to no + /// // available capacity. + /// assert!(tx.try_send(123).is_err()); + /// + /// // Send 2 messages on the permit + /// permit.send(456).unwrap(); + /// permit.send(457).unwrap(); + /// + /// // The third should fail due to no available capacity + /// permit.send(458).unwrap_err(); + /// + /// // The value sent on the permit is received + /// assert_eq!(rx.recv().await.unwrap(), 456); + /// assert_eq!(rx.recv().await.unwrap(), 457); + /// } + /// ``` + pub fn send(&mut self, value: T) -> Result<(), SendError> { + // There is no remaining capacity on this permit + if self.n <= 0 { + return Err(SendError(value)); + } + + self.chan.send(value); + self.n -= 1; + Ok(()) + } +} + +impl Drop for ManyPermit<'_, T> { + fn drop(&mut self) { + use chan::Semaphore; + + let semaphore = self.chan.semaphore(); + + // Add the remaining permits back to the semaphore + semaphore.add_permits(self.n as usize); + + // If this is the last sender for this channel, wake the receiver so + // that it can be notified that the channel is closed. + if semaphore.is_closed() && semaphore.is_idle() { + self.chan.wake_rx(); + } + } +} + +impl fmt::Debug for ManyPermit<'_, T> { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("Permit") + .field("chan", &self.chan) + .field("capacity", &self.n) + .finish() + } +} + + // ===== impl Permit ===== impl OwnedPermit { From 92c26b592887ccdfdf6c876d00ef7588d46ef2b4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Th=C3=A9odore=20Pr=C3=A9vot?= Date: Sat, 9 Dec 2023 15:00:08 +0000 Subject: [PATCH 02/41] mpsc: apply clippy rules + fmt --- tokio/src/sync/mpsc/bounded.rs | 66 ++++++++++++++++++---------------- 1 file changed, 35 insertions(+), 31 deletions(-) diff --git a/tokio/src/sync/mpsc/bounded.rs b/tokio/src/sync/mpsc/bounded.rs index 77aa76451e9..ec127ec58e0 100644 --- a/tokio/src/sync/mpsc/bounded.rs +++ b/tokio/src/sync/mpsc/bounded.rs @@ -76,8 +76,8 @@ pub struct Permit<'a, T> { /// [`Sender::reserve_many()`]: Sender::reserve_many /// [`Sender::try_reserve_many()`]: Sender::try_reserve_many pub struct ManyPermit<'a, T> { - chan: &'a chan::Tx, - n: u32, + chan: &'a chan::Tx, + n: u32, } /// Owned permit to send one value into the channel. @@ -865,7 +865,7 @@ impl Sender { Ok(Permit { chan: &self.chan }) } - /// Waits for channel capacity. Once capacity to send `n` message is + /// Waits for channel capacity. Once capacity to send `n` message is /// available, it is reserved for the caller. /// /// If the channel is full, the function waits for the number of unreceived @@ -906,17 +906,20 @@ impl Sender { /// permit.send(456).unwrap(); /// permit.send(457).unwrap(); /// - /// // The third should fail due to no available capacity + /// // The third should fail due to no available capacity /// permit.send(458).unwrap_err(); /// - /// // The value sent on the permit is received + /// // The value sent on the permit is received /// assert_eq!(rx.recv().await.unwrap(), 456); /// assert_eq!(rx.recv().await.unwrap(), 457); /// } /// ``` pub async fn reserve_many(&self, n: u32) -> Result, SendError<()>> { self.reserve_inner(n).await?; - Ok(ManyPermit { chan: &self.chan, n }) + Ok(ManyPermit { + chan: &self.chan, + n, + }) } /// Waits for channel capacity, moving the `Sender` and returning an owned @@ -1068,7 +1071,7 @@ impl Sender { Ok(Permit { chan: &self.chan }) } - /// Tries to acquire n slot in the channel without waiting for the slot to become + /// Tries to acquire n slot in the channel without waiting for the slot to become /// available. /// /// If the channel is full this function will return [`TrySendError`], otherwise @@ -1106,10 +1109,10 @@ impl Sender { /// // Sending on the permit succeeds /// permit.send(456).unwrap(); /// permit.send(457).unwrap(); - /// - /// // The third should fail due to no available capacity + /// + /// // The third should fail due to no available capacity /// permit.send(458).unwrap_err(); - /// + /// /// // The value sent on the permit is received /// assert_eq!(rx.recv().await.unwrap(), 456); /// assert_eq!(rx.recv().await.unwrap(), 457); @@ -1123,7 +1126,10 @@ impl Sender { Err(TryAcquireError::NoPermits) => return Err(TrySendError::Full(())), } - Ok(ManyPermit { chan: &self.chan, n }) + Ok(ManyPermit { + chan: &self.chan, + n, + }) } /// Tries to acquire a slot in the channel without waiting for the slot to become @@ -1402,17 +1408,16 @@ impl fmt::Debug for Permit<'_, T> { } } - // ===== impl ManyPermit ===== impl ManyPermit<'_, T> { /// Sends a value using the reserved capacity. /// /// Capacity for the messages has already been reserved. The message is sent - /// to the receiver and the permit capacity is reduced by one. If there is no - /// remaining capacity a [`SendError`] will be returned with the provided value. - /// The operation will succeed even if the receiver half has been closed. - /// See [`Receiver::close`] for more details on performing a clean shutdown. + /// to the receiver and the permit capacity is reduced by one. If there is no + /// remaining capacity a [`SendError`] will be returned with the provided value. + /// The operation will succeed even if the receiver half has been closed. + /// See [`Receiver::close`] for more details on performing a clean shutdown. /// /// [`Receiver::close`]: Receiver::close /// @@ -1435,30 +1440,30 @@ impl ManyPermit<'_, T> { /// // Send 2 messages on the permit /// permit.send(456).unwrap(); /// permit.send(457).unwrap(); - /// - /// // The third should fail due to no available capacity + /// + /// // The third should fail due to no available capacity /// permit.send(458).unwrap_err(); - /// - /// // The value sent on the permit is received + /// + /// // The value sent on the permit is received /// assert_eq!(rx.recv().await.unwrap(), 456); /// assert_eq!(rx.recv().await.unwrap(), 457); /// } /// ``` pub fn send(&mut self, value: T) -> Result<(), SendError> { - // There is no remaining capacity on this permit - if self.n <= 0 { - return Err(SendError(value)); - } - - self.chan.send(value); - self.n -= 1; - Ok(()) + // There is no remaining capacity on this permit + if self.n == 0 { + return Err(SendError(value)); + } + + self.chan.send(value); + self.n -= 1; + Ok(()) } } impl Drop for ManyPermit<'_, T> { fn drop(&mut self) { - use chan::Semaphore; + use chan::Semaphore; let semaphore = self.chan.semaphore(); @@ -1477,12 +1482,11 @@ impl fmt::Debug for ManyPermit<'_, T> { fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { fmt.debug_struct("Permit") .field("chan", &self.chan) - .field("capacity", &self.n) + .field("capacity", &self.n) .finish() } } - // ===== impl Permit ===== impl OwnedPermit { From b684f3c931fd821a98e4b9a59d430145bd095f78 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Th=C3=A9odore=20Pr=C3=A9vot?= Date: Sat, 9 Dec 2023 15:06:09 +0000 Subject: [PATCH 03/41] mpsc: export the `ManyPermit` struct --- tokio/src/sync/mpsc/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio/src/sync/mpsc/mod.rs b/tokio/src/sync/mpsc/mod.rs index b2af084b2ae..6438ebdd773 100644 --- a/tokio/src/sync/mpsc/mod.rs +++ b/tokio/src/sync/mpsc/mod.rs @@ -95,7 +95,7 @@ pub(super) mod block; mod bounded; -pub use self::bounded::{channel, OwnedPermit, Permit, Receiver, Sender, WeakSender}; +pub use self::bounded::{channel, OwnedPermit, Permit, ManyPermit, Receiver, Sender, WeakSender}; mod chan; From c40178bfdcf226b64c97e56edcf53352f54dcc88 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Th=C3=A9odore=20Pr=C3=A9vot?= Date: Sat, 9 Dec 2023 15:09:40 +0000 Subject: [PATCH 04/41] mpsc: fix fmt --- tokio/src/sync/mpsc/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio/src/sync/mpsc/mod.rs b/tokio/src/sync/mpsc/mod.rs index 6438ebdd773..ce201412bd9 100644 --- a/tokio/src/sync/mpsc/mod.rs +++ b/tokio/src/sync/mpsc/mod.rs @@ -95,7 +95,7 @@ pub(super) mod block; mod bounded; -pub use self::bounded::{channel, OwnedPermit, Permit, ManyPermit, Receiver, Sender, WeakSender}; +pub use self::bounded::{channel, ManyPermit, OwnedPermit, Permit, Receiver, Sender, WeakSender}; mod chan; From 53fb399ba1872d2bcac73fad5da27f9646296285 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Th=C3=A9odore=20Pr=C3=A9vot?= Date: Sun, 10 Dec 2023 00:00:22 +0000 Subject: [PATCH 05/41] mpsc: return an `Iterator` impl for the `*_reserve_many` fns --- tokio/src/sync/mpsc/bounded.rs | 111 +++++++++++---------------------- tokio/src/sync/mpsc/mod.rs | 4 +- 2 files changed, 41 insertions(+), 74 deletions(-) diff --git a/tokio/src/sync/mpsc/bounded.rs b/tokio/src/sync/mpsc/bounded.rs index ec127ec58e0..7b950288690 100644 --- a/tokio/src/sync/mpsc/bounded.rs +++ b/tokio/src/sync/mpsc/bounded.rs @@ -68,14 +68,14 @@ pub struct Permit<'a, T> { chan: &'a chan::Tx, } -/// Permits to send `n` values into the channel. +/// An [`Iterator`] of [`Permit`] that can be used to reserve `n` slots in the channel /// -/// `ManyPermit` values are returned by [`Sender::reserve_many()`] and [`Sender::try_reserve_many()`] +/// `PermitIterator` values are returned by [`Sender::reserve_many()`] and [`Sender::try_reserve_many()`] /// and are used to guarantee channel capacity before generating `n` message to send. /// /// [`Sender::reserve_many()`]: Sender::reserve_many /// [`Sender::try_reserve_many()`]: Sender::try_reserve_many -pub struct ManyPermit<'a, T> { +pub struct PermitIterator<'a, T> { chan: &'a chan::Tx, n: u32, } @@ -870,15 +870,16 @@ impl Sender { /// /// If the channel is full, the function waits for the number of unreceived /// messages to become less than the channel capacity. Capacity to send `n` - /// message is reserved for the caller. A [`ManyPermit`] is returned to track - /// the reserved capacity. The [`send`] function on [`ManyPermit`] consumes the - /// reserved capacity. + /// message is reserved for the caller. A [`PermitIterator`] is returned to track + /// the reserved capacity. You can call this [`Iterator`] until it is exhausted to + /// get a [`Permit`] and then call [`Permit::send`]. /// - /// Dropping [`ManyPermit`] without sending a message releases the capacity back + /// Dropping [`PermitIterator`] without sending a message releases the capacity back /// to the channel. /// - /// [`ManyPermit`]: ManyPermit - /// [`send`]: ManyPermit::send + /// [`PermitIterator`]: PermitIterator + /// [`Permit`]: Permit + /// [`send`]: Permit::send /// /// # Cancel safety /// @@ -902,21 +903,21 @@ impl Sender { /// // available capacity. /// assert!(tx.try_send(123).is_err()); /// - /// // Sending on the permit succeeds - /// permit.send(456).unwrap(); - /// permit.send(457).unwrap(); + /// // Sending with the permit iterator succeeds + /// permit.next().unwrap().send(456); + /// permit.next().unwrap().send(457); /// - /// // The third should fail due to no available capacity - /// permit.send(458).unwrap_err(); + /// // The iterator should now be exhausted + /// assert!(permit.next().is_none()); /// /// // The value sent on the permit is received /// assert_eq!(rx.recv().await.unwrap(), 456); /// assert_eq!(rx.recv().await.unwrap(), 457); /// } /// ``` - pub async fn reserve_many(&self, n: u32) -> Result, SendError<()>> { + pub async fn reserve_many(&self, n: u32) -> Result, SendError<()>> { self.reserve_inner(n).await?; - Ok(ManyPermit { + Ok(PermitIterator { chan: &self.chan, n, }) @@ -1076,14 +1077,16 @@ impl Sender { /// /// If the channel is full this function will return [`TrySendError`], otherwise /// if there is a slot available it will return a [`ManyPermit`] that will then allow you - /// to [`send`] on the channel with a guaranteed slot. This function is similar to + /// to [`send`] on the channel with a guaranteed slot. A [`PermitIterator`] is returned to track + /// the reserved capacity. You can call this [`Iterator`] until it is exhausted to + /// get a [`Permit`] and then call [`Permit::send`]. This function is similar to /// [`reserve_many`] except it does not await for the slot to become available. /// /// Dropping [`ManyPermit`] without sending a message releases the capacity back /// to the channel. /// - /// [`ManyPermit`]: ManyPermit - /// [`send`]: ManyPermit::send + /// [`PermitIterator`]: PermitIterator + /// [`send`]: Permit::send /// [`reserve_many`]: Sender::reserve_many /// /// # Examples @@ -1106,12 +1109,12 @@ impl Sender { /// // fail because there is no capacity. /// assert!(tx.try_reserve().is_err()); /// - /// // Sending on the permit succeeds - /// permit.send(456).unwrap(); - /// permit.send(457).unwrap(); + /// // Sending with the permit iterator succeeds + /// permit.next().unwrap().send(456); + /// permit.next().unwrap().send(457); /// - /// // The third should fail due to no available capacity - /// permit.send(458).unwrap_err(); + /// // The iterator should now be exhausted + /// assert!(permit.next().is_none()); /// /// // The value sent on the permit is received /// assert_eq!(rx.recv().await.unwrap(), 456); @@ -1119,14 +1122,14 @@ impl Sender { /// /// } /// ``` - pub fn try_reserve_many(&self, n: u32) -> Result, TrySendError<()>> { + pub fn try_reserve_many(&self, n: u32) -> Result, TrySendError<()>> { match self.chan.semaphore().semaphore.try_acquire(n) { Ok(()) => {} Err(TryAcquireError::Closed) => return Err(TrySendError::Closed(())), Err(TryAcquireError::NoPermits) => return Err(TrySendError::Full(())), } - Ok(ManyPermit { + Ok(PermitIterator { chan: &self.chan, n, }) @@ -1408,60 +1411,22 @@ impl fmt::Debug for Permit<'_, T> { } } -// ===== impl ManyPermit ===== +// ===== impl PermitIterator ===== -impl ManyPermit<'_, T> { - /// Sends a value using the reserved capacity. - /// - /// Capacity for the messages has already been reserved. The message is sent - /// to the receiver and the permit capacity is reduced by one. If there is no - /// remaining capacity a [`SendError`] will be returned with the provided value. - /// The operation will succeed even if the receiver half has been closed. - /// See [`Receiver::close`] for more details on performing a clean shutdown. - /// - /// [`Receiver::close`]: Receiver::close - /// - /// # Examples - /// - /// ``` - /// use tokio::sync::mpsc; - /// - /// #[tokio::main] - /// async fn main() { - /// let (tx, mut rx) = mpsc::channel(2); - /// - /// // Reserve capacity - /// let mut permit = tx.reserve_many(2).await.unwrap(); - /// - /// // Trying to send directly on the `tx` will fail due to no - /// // available capacity. - /// assert!(tx.try_send(123).is_err()); - /// - /// // Send 2 messages on the permit - /// permit.send(456).unwrap(); - /// permit.send(457).unwrap(); - /// - /// // The third should fail due to no available capacity - /// permit.send(458).unwrap_err(); - /// - /// // The value sent on the permit is received - /// assert_eq!(rx.recv().await.unwrap(), 456); - /// assert_eq!(rx.recv().await.unwrap(), 457); - /// } - /// ``` - pub fn send(&mut self, value: T) -> Result<(), SendError> { - // There is no remaining capacity on this permit +impl<'a, T> Iterator for PermitIterator<'a, T> { + type Item = Permit<'a, T>; + + fn next(&mut self) -> Option { if self.n == 0 { - return Err(SendError(value)); + return None; } - self.chan.send(value); self.n -= 1; - Ok(()) + Some(Permit { chan: self.chan }) } } -impl Drop for ManyPermit<'_, T> { +impl Drop for PermitIterator<'_, T> { fn drop(&mut self) { use chan::Semaphore; @@ -1478,7 +1443,7 @@ impl Drop for ManyPermit<'_, T> { } } -impl fmt::Debug for ManyPermit<'_, T> { +impl fmt::Debug for PermitIterator<'_, T> { fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { fmt.debug_struct("Permit") .field("chan", &self.chan) diff --git a/tokio/src/sync/mpsc/mod.rs b/tokio/src/sync/mpsc/mod.rs index ce201412bd9..052620be1a9 100644 --- a/tokio/src/sync/mpsc/mod.rs +++ b/tokio/src/sync/mpsc/mod.rs @@ -95,7 +95,9 @@ pub(super) mod block; mod bounded; -pub use self::bounded::{channel, ManyPermit, OwnedPermit, Permit, Receiver, Sender, WeakSender}; +pub use self::bounded::{ + channel, OwnedPermit, Permit, PermitIterator, Receiver, Sender, WeakSender, +}; mod chan; From 242f171eae6fddb73f5af7b25d4455baea3149c3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Th=C3=A9odore=20Pr=C3=A9vot?= Date: Sun, 10 Dec 2023 00:03:25 +0000 Subject: [PATCH 06/41] mpsc: fix rustdoc --- tokio/src/sync/mpsc/bounded.rs | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/tokio/src/sync/mpsc/bounded.rs b/tokio/src/sync/mpsc/bounded.rs index 7b950288690..f03df0f954f 100644 --- a/tokio/src/sync/mpsc/bounded.rs +++ b/tokio/src/sync/mpsc/bounded.rs @@ -1075,14 +1075,13 @@ impl Sender { /// Tries to acquire n slot in the channel without waiting for the slot to become /// available. /// - /// If the channel is full this function will return [`TrySendError`], otherwise - /// if there is a slot available it will return a [`ManyPermit`] that will then allow you - /// to [`send`] on the channel with a guaranteed slot. A [`PermitIterator`] is returned to track - /// the reserved capacity. You can call this [`Iterator`] until it is exhausted to + /// If the channel is full this function will return a [`TrySendError`], otherwise + /// A [`PermitIterator`] is returned to track the reserved capacity. + /// You can call this [`Iterator`] until it is exhausted to /// get a [`Permit`] and then call [`Permit::send`]. This function is similar to /// [`reserve_many`] except it does not await for the slot to become available. /// - /// Dropping [`ManyPermit`] without sending a message releases the capacity back + /// Dropping [`PermitIterator`] without sending a message releases the capacity back /// to the channel. /// /// [`PermitIterator`]: PermitIterator From 81c91da1c05917e62e3c9a715f24295560b670f8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Th=C3=A9odore=20Pr=C3=A9vot?= Date: Sun, 10 Dec 2023 00:04:23 +0000 Subject: [PATCH 07/41] mpsc: fix fmt --- tokio/src/sync/mpsc/bounded.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tokio/src/sync/mpsc/bounded.rs b/tokio/src/sync/mpsc/bounded.rs index f03df0f954f..9e0fc114309 100644 --- a/tokio/src/sync/mpsc/bounded.rs +++ b/tokio/src/sync/mpsc/bounded.rs @@ -1076,8 +1076,8 @@ impl Sender { /// available. /// /// If the channel is full this function will return a [`TrySendError`], otherwise - /// A [`PermitIterator`] is returned to track the reserved capacity. - /// You can call this [`Iterator`] until it is exhausted to + /// A [`PermitIterator`] is returned to track the reserved capacity. + /// You can call this [`Iterator`] until it is exhausted to /// get a [`Permit`] and then call [`Permit::send`]. This function is similar to /// [`reserve_many`] except it does not await for the slot to become available. /// From ad21c6cb264f3620dc59b5e1e68a551a197be3b2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Th=C3=A9odore=20Pr=C3=A9vot?= Date: Sun, 10 Dec 2023 10:26:34 +0000 Subject: [PATCH 08/41] mpsc: add an implementation for `ExactSizeIterator` --- tokio/src/sync/mpsc/bounded.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/tokio/src/sync/mpsc/bounded.rs b/tokio/src/sync/mpsc/bounded.rs index 9e0fc114309..27348ad4340 100644 --- a/tokio/src/sync/mpsc/bounded.rs +++ b/tokio/src/sync/mpsc/bounded.rs @@ -1423,7 +1423,13 @@ impl<'a, T> Iterator for PermitIterator<'a, T> { self.n -= 1; Some(Permit { chan: self.chan }) } + + fn size_hint(&self) -> (usize, Option) { + let n = self.n as usize; + (n, Some(n)) + } } +impl ExactSizeIterator for PermitIterator<'_, T> {} impl Drop for PermitIterator<'_, T> { fn drop(&mut self) { From 71c48bae793ed430cc400bfa301e69045db1131f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Th=C3=A9odore=20Pr=C3=A9vot?= Date: Sun, 10 Dec 2023 11:30:24 +0100 Subject: [PATCH 09/41] mpsc: apply doc suggestions MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Tymoteusz Wiśniewski --- tokio/src/sync/mpsc/bounded.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tokio/src/sync/mpsc/bounded.rs b/tokio/src/sync/mpsc/bounded.rs index 27348ad4340..cf1fbb19d0e 100644 --- a/tokio/src/sync/mpsc/bounded.rs +++ b/tokio/src/sync/mpsc/bounded.rs @@ -68,7 +68,7 @@ pub struct Permit<'a, T> { chan: &'a chan::Tx, } -/// An [`Iterator`] of [`Permit`] that can be used to reserve `n` slots in the channel +/// An [`Iterator`] of [`Permit`] that can be used to reserve `n` slots in the channel. /// /// `PermitIterator` values are returned by [`Sender::reserve_many()`] and [`Sender::try_reserve_many()`] /// and are used to guarantee channel capacity before generating `n` message to send. @@ -865,16 +865,16 @@ impl Sender { Ok(Permit { chan: &self.chan }) } - /// Waits for channel capacity. Once capacity to send `n` message is + /// Waits for channel capacity. Once capacity to send `n` messages is /// available, it is reserved for the caller. /// /// If the channel is full, the function waits for the number of unreceived - /// messages to become less than the channel capacity. Capacity to send `n` + /// messages to become `n` less than the channel capacity. Capacity to send `n` /// message is reserved for the caller. A [`PermitIterator`] is returned to track /// the reserved capacity. You can call this [`Iterator`] until it is exhausted to /// get a [`Permit`] and then call [`Permit::send`]. /// - /// Dropping [`PermitIterator`] without sending a message releases the capacity back + /// Dropping [`PermitIterator`] without sending all messages releases the capacity back /// to the channel. /// /// [`PermitIterator`]: PermitIterator From 8dd30784268865c061bfbab3fe72f0ff2cf094b6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Th=C3=A9odore=20Pr=C3=A9vot?= Date: Sun, 10 Dec 2023 11:43:24 +0100 Subject: [PATCH 10/41] mpsc: fix `Debug` implementation for `PermitIterator` Co-authored-by: Alice Ryhl --- tokio/src/sync/mpsc/bounded.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio/src/sync/mpsc/bounded.rs b/tokio/src/sync/mpsc/bounded.rs index cf1fbb19d0e..aa2d187e585 100644 --- a/tokio/src/sync/mpsc/bounded.rs +++ b/tokio/src/sync/mpsc/bounded.rs @@ -1450,7 +1450,7 @@ impl Drop for PermitIterator<'_, T> { impl fmt::Debug for PermitIterator<'_, T> { fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - fmt.debug_struct("Permit") + fmt.debug_struct("PermitIterator") .field("chan", &self.chan) .field("capacity", &self.n) .finish() From 9d9435bffc5d185a91e7c98d178282091544187a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Th=C3=A9odore=20Pr=C3=A9vot?= Date: Sun, 10 Dec 2023 11:04:44 +0000 Subject: [PATCH 11/41] mpsc: move the `u32` n to `usize` and only cast to `u32` for the internal `Semaphore` --- tokio/src/sync/mpsc/bounded.rs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/tokio/src/sync/mpsc/bounded.rs b/tokio/src/sync/mpsc/bounded.rs index aa2d187e585..64e493db1a8 100644 --- a/tokio/src/sync/mpsc/bounded.rs +++ b/tokio/src/sync/mpsc/bounded.rs @@ -915,11 +915,11 @@ impl Sender { /// assert_eq!(rx.recv().await.unwrap(), 457); /// } /// ``` - pub async fn reserve_many(&self, n: u32) -> Result, SendError<()>> { + pub async fn reserve_many(&self, n: usize) -> Result, SendError<()>> { self.reserve_inner(n).await?; Ok(PermitIterator { chan: &self.chan, - n, + n: n as u32, }) } @@ -1010,10 +1010,10 @@ impl Sender { }) } - async fn reserve_inner(&self, n: u32) -> Result<(), SendError<()>> { + async fn reserve_inner(&self, n: usize) -> Result<(), SendError<()>> { crate::trace::async_trace_leaf().await; - match self.chan.semaphore().semaphore.acquire(n).await { + match self.chan.semaphore().semaphore.acquire(n as u32).await { Ok(()) => Ok(()), Err(_) => Err(SendError(())), } @@ -1121,8 +1121,8 @@ impl Sender { /// /// } /// ``` - pub fn try_reserve_many(&self, n: u32) -> Result, TrySendError<()>> { - match self.chan.semaphore().semaphore.try_acquire(n) { + pub fn try_reserve_many(&self, n: usize) -> Result, TrySendError<()>> { + match self.chan.semaphore().semaphore.try_acquire(n as u32) { Ok(()) => {} Err(TryAcquireError::Closed) => return Err(TrySendError::Closed(())), Err(TryAcquireError::NoPermits) => return Err(TrySendError::Full(())), @@ -1130,7 +1130,7 @@ impl Sender { Ok(PermitIterator { chan: &self.chan, - n, + n: n as u32, }) } From 28d8c512c066107d60dbdfa87ce65e302d9e1231 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Th=C3=A9odore=20Pr=C3=A9vot?= Date: Sun, 10 Dec 2023 12:13:45 +0100 Subject: [PATCH 12/41] mpsc: improve doc Co-authored-by: oliver <151407407+kwfn@users.noreply.github.com> --- tokio/src/sync/mpsc/bounded.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio/src/sync/mpsc/bounded.rs b/tokio/src/sync/mpsc/bounded.rs index 64e493db1a8..221ac806886 100644 --- a/tokio/src/sync/mpsc/bounded.rs +++ b/tokio/src/sync/mpsc/bounded.rs @@ -1072,7 +1072,7 @@ impl Sender { Ok(Permit { chan: &self.chan }) } - /// Tries to acquire n slot in the channel without waiting for the slot to become + /// Tries to acquire `n` slot in the channel without waiting for the slot to become /// available. /// /// If the channel is full this function will return a [`TrySendError`], otherwise From 08a86220bc08c3dbd3ec6fdabf59dfcb51f96cfd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Th=C3=A9odore=20Pr=C3=A9vot?= Date: Sun, 10 Dec 2023 22:12:17 +0000 Subject: [PATCH 13/41] mpsc: change the internal `batch_semaphore` API to take usize permit count --- tokio/src/sync/batch_semaphore.rs | 18 +++++++++--------- tokio/src/sync/mpsc/bounded.rs | 12 ++++++------ tokio/src/sync/rwlock.rs | 4 ++-- tokio/src/sync/semaphore.rs | 4 ++-- 4 files changed, 19 insertions(+), 19 deletions(-) diff --git a/tokio/src/sync/batch_semaphore.rs b/tokio/src/sync/batch_semaphore.rs index 35de9a57436..c1960a1c54b 100644 --- a/tokio/src/sync/batch_semaphore.rs +++ b/tokio/src/sync/batch_semaphore.rs @@ -71,7 +71,7 @@ pub struct AcquireError(()); pub(crate) struct Acquire<'a> { node: Waiter, semaphore: &'a Semaphore, - num_permits: u32, + num_permits: usize, queued: bool, } @@ -293,7 +293,7 @@ impl Semaphore { } } - pub(crate) fn acquire(&self, num_permits: u32) -> Acquire<'_> { + pub(crate) fn acquire(&self, num_permits: usize) -> Acquire<'_> { Acquire::new(self, num_permits) } @@ -371,7 +371,7 @@ impl Semaphore { fn poll_acquire( &self, cx: &mut Context<'_>, - num_permits: u32, + num_permits: usize, node: Pin<&mut Waiter>, queued: bool, ) -> Poll> { @@ -380,7 +380,7 @@ impl Semaphore { let needed = if queued { node.state.load(Acquire) << Self::PERMIT_SHIFT } else { - (num_permits as usize) << Self::PERMIT_SHIFT + num_permits << Self::PERMIT_SHIFT }; let mut lock = None; @@ -506,12 +506,12 @@ impl fmt::Debug for Semaphore { impl Waiter { fn new( - num_permits: u32, + num_permits: usize, #[cfg(all(tokio_unstable, feature = "tracing"))] ctx: trace::AsyncOpTracingCtx, ) -> Self { Waiter { waker: UnsafeCell::new(None), - state: AtomicUsize::new(num_permits as usize), + state: AtomicUsize::new(num_permits), pointers: linked_list::Pointers::new(), #[cfg(all(tokio_unstable, feature = "tracing"))] ctx, @@ -591,7 +591,7 @@ impl Future for Acquire<'_> { } impl<'a> Acquire<'a> { - fn new(semaphore: &'a Semaphore, num_permits: u32) -> Self { + fn new(semaphore: &'a Semaphore, num_permits: usize) -> Self { #[cfg(any(not(tokio_unstable), not(feature = "tracing")))] return Self { node: Waiter::new(num_permits), @@ -635,7 +635,7 @@ impl<'a> Acquire<'a> { }); } - fn project(self: Pin<&mut Self>) -> (Pin<&mut Waiter>, &Semaphore, u32, &mut bool) { + fn project(self: Pin<&mut Self>) -> (Pin<&mut Waiter>, &Semaphore, usize, &mut bool) { fn is_unpin() {} unsafe { // Safety: all fields other than `node` are `Unpin` @@ -673,7 +673,7 @@ impl Drop for Acquire<'_> { // Safety: we have locked the wait list. unsafe { waiters.queue.remove(node) }; - let acquired_permits = self.num_permits as usize - self.node.state.load(Acquire); + let acquired_permits = self.num_permits - self.node.state.load(Acquire); if acquired_permits > 0 { self.semaphore.add_permits_locked(acquired_permits, waiters); } diff --git a/tokio/src/sync/mpsc/bounded.rs b/tokio/src/sync/mpsc/bounded.rs index 221ac806886..10258561849 100644 --- a/tokio/src/sync/mpsc/bounded.rs +++ b/tokio/src/sync/mpsc/bounded.rs @@ -77,7 +77,7 @@ pub struct Permit<'a, T> { /// [`Sender::try_reserve_many()`]: Sender::try_reserve_many pub struct PermitIterator<'a, T> { chan: &'a chan::Tx, - n: u32, + n: usize, } /// Owned permit to send one value into the channel. @@ -919,7 +919,7 @@ impl Sender { self.reserve_inner(n).await?; Ok(PermitIterator { chan: &self.chan, - n: n as u32, + n, }) } @@ -1013,7 +1013,7 @@ impl Sender { async fn reserve_inner(&self, n: usize) -> Result<(), SendError<()>> { crate::trace::async_trace_leaf().await; - match self.chan.semaphore().semaphore.acquire(n as u32).await { + match self.chan.semaphore().semaphore.acquire(n).await { Ok(()) => Ok(()), Err(_) => Err(SendError(())), } @@ -1130,7 +1130,7 @@ impl Sender { Ok(PermitIterator { chan: &self.chan, - n: n as u32, + n, }) } @@ -1425,7 +1425,7 @@ impl<'a, T> Iterator for PermitIterator<'a, T> { } fn size_hint(&self) -> (usize, Option) { - let n = self.n as usize; + let n = self.n; (n, Some(n)) } } @@ -1438,7 +1438,7 @@ impl Drop for PermitIterator<'_, T> { let semaphore = self.chan.semaphore(); // Add the remaining permits back to the semaphore - semaphore.add_permits(self.n as usize); + semaphore.add_permits(self.n); // If this is the last sender for this channel, wake the receiver so // that it can be notified that the channel is closed. diff --git a/tokio/src/sync/rwlock.rs b/tokio/src/sync/rwlock.rs index 877458a57fb..5495d4514ed 100644 --- a/tokio/src/sync/rwlock.rs +++ b/tokio/src/sync/rwlock.rs @@ -772,7 +772,7 @@ impl RwLock { /// ``` pub async fn write(&self) -> RwLockWriteGuard<'_, T> { let acquire_fut = async { - self.s.acquire(self.mr).await.unwrap_or_else(|_| { + self.s.acquire(self.mr as usize).await.unwrap_or_else(|_| { // The semaphore was closed. but, we never explicitly close it, and we have a // handle to it through the Arc, which means that this can never happen. unreachable!() @@ -907,7 +907,7 @@ impl RwLock { let resource_span = self.resource_span.clone(); let acquire_fut = async { - self.s.acquire(self.mr).await.unwrap_or_else(|_| { + self.s.acquire(self.mr as usize).await.unwrap_or_else(|_| { // The semaphore was closed. but, we never explicitly close it, and we have a // handle to it through the Arc, which means that this can never happen. unreachable!() diff --git a/tokio/src/sync/semaphore.rs b/tokio/src/sync/semaphore.rs index 8b8fdb23871..cecb3f4477d 100644 --- a/tokio/src/sync/semaphore.rs +++ b/tokio/src/sync/semaphore.rs @@ -574,7 +574,7 @@ impl Semaphore { .await?; #[cfg(not(all(tokio_unstable, feature = "tracing")))] - self.ll_sem.acquire(n).await?; + self.ll_sem.acquire(n as usize).await?; Ok(SemaphorePermit { sem: self, @@ -771,7 +771,7 @@ impl Semaphore { true, ); #[cfg(not(all(tokio_unstable, feature = "tracing")))] - let inner = self.ll_sem.acquire(n); + let inner = self.ll_sem.acquire(n as usize); inner.await?; Ok(OwnedSemaphorePermit { From 5f9cb1aa8434d70b5db09bd8cd6ca6e235016291 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Th=C3=A9odore=20Pr=C3=A9vot?= Date: Sun, 10 Dec 2023 22:18:28 +0000 Subject: [PATCH 14/41] mpsc: fix usize casting in special features --- tokio/src/sync/semaphore.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tokio/src/sync/semaphore.rs b/tokio/src/sync/semaphore.rs index cecb3f4477d..9feb953ea94 100644 --- a/tokio/src/sync/semaphore.rs +++ b/tokio/src/sync/semaphore.rs @@ -565,7 +565,7 @@ impl Semaphore { pub async fn acquire_many(&self, n: u32) -> Result, AcquireError> { #[cfg(all(tokio_unstable, feature = "tracing"))] trace::async_op( - || self.ll_sem.acquire(n), + || self.ll_sem.acquire(n as usize), self.resource_span.clone(), "Semaphore::acquire_many", "poll", @@ -764,7 +764,7 @@ impl Semaphore { ) -> Result { #[cfg(all(tokio_unstable, feature = "tracing"))] let inner = trace::async_op( - || self.ll_sem.acquire(n), + || self.ll_sem.acquire(n as usize), self.resource_span.clone(), "Semaphore::acquire_many_owned", "poll", From 5bcc3f9e436638b529cbd95d41f1df86a712a30b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Th=C3=A9odore=20Pr=C3=A9vot?= Date: Mon, 11 Dec 2023 11:46:23 +0000 Subject: [PATCH 15/41] mpsc: fix usize casting in `try_reserve` fn for batch_semaphore --- tokio/src/sync/batch_semaphore.rs | 8 ++++---- tokio/src/sync/mpsc/bounded.rs | 2 +- tokio/src/sync/rwlock.rs | 4 ++-- tokio/src/sync/semaphore.rs | 4 ++-- 4 files changed, 9 insertions(+), 9 deletions(-) diff --git a/tokio/src/sync/batch_semaphore.rs b/tokio/src/sync/batch_semaphore.rs index c1960a1c54b..aa23dea7d3c 100644 --- a/tokio/src/sync/batch_semaphore.rs +++ b/tokio/src/sync/batch_semaphore.rs @@ -262,13 +262,13 @@ impl Semaphore { self.permits.load(Acquire) & Self::CLOSED == Self::CLOSED } - pub(crate) fn try_acquire(&self, num_permits: u32) -> Result<(), TryAcquireError> { + pub(crate) fn try_acquire(&self, num_permits: usize) -> Result<(), TryAcquireError> { assert!( - num_permits as usize <= Self::MAX_PERMITS, + num_permits <= Self::MAX_PERMITS, "a semaphore may not have more than MAX_PERMITS permits ({})", Self::MAX_PERMITS ); - let num_permits = (num_permits as usize) << Self::PERMIT_SHIFT; + let num_permits = num_permits << Self::PERMIT_SHIFT; let mut curr = self.permits.load(Acquire); loop { // Has the semaphore closed? @@ -642,7 +642,7 @@ impl<'a> Acquire<'a> { is_unpin::<&Semaphore>(); is_unpin::<&mut bool>(); - is_unpin::(); + is_unpin::(); let this = self.get_unchecked_mut(); ( diff --git a/tokio/src/sync/mpsc/bounded.rs b/tokio/src/sync/mpsc/bounded.rs index 10258561849..22285895a80 100644 --- a/tokio/src/sync/mpsc/bounded.rs +++ b/tokio/src/sync/mpsc/bounded.rs @@ -1122,7 +1122,7 @@ impl Sender { /// } /// ``` pub fn try_reserve_many(&self, n: usize) -> Result, TrySendError<()>> { - match self.chan.semaphore().semaphore.try_acquire(n as u32) { + match self.chan.semaphore().semaphore.try_acquire(n) { Ok(()) => {} Err(TryAcquireError::Closed) => return Err(TrySendError::Closed(())), Err(TryAcquireError::NoPermits) => return Err(TrySendError::Full(())), diff --git a/tokio/src/sync/rwlock.rs b/tokio/src/sync/rwlock.rs index 5495d4514ed..37cf73c5905 100644 --- a/tokio/src/sync/rwlock.rs +++ b/tokio/src/sync/rwlock.rs @@ -971,7 +971,7 @@ impl RwLock { /// } /// ``` pub fn try_write(&self) -> Result, TryLockError> { - match self.s.try_acquire(self.mr) { + match self.s.try_acquire(self.mr as usize) { Ok(permit) => permit, Err(TryAcquireError::NoPermits) => return Err(TryLockError(())), Err(TryAcquireError::Closed) => unreachable!(), @@ -1029,7 +1029,7 @@ impl RwLock { /// } /// ``` pub fn try_write_owned(self: Arc) -> Result, TryLockError> { - match self.s.try_acquire(self.mr) { + match self.s.try_acquire(self.mr as usize) { Ok(permit) => permit, Err(TryAcquireError::NoPermits) => return Err(TryLockError(())), Err(TryAcquireError::Closed) => unreachable!(), diff --git a/tokio/src/sync/semaphore.rs b/tokio/src/sync/semaphore.rs index 9feb953ea94..25e4134373c 100644 --- a/tokio/src/sync/semaphore.rs +++ b/tokio/src/sync/semaphore.rs @@ -646,7 +646,7 @@ impl Semaphore { /// [`TryAcquireError::NoPermits`]: crate::sync::TryAcquireError::NoPermits /// [`SemaphorePermit`]: crate::sync::SemaphorePermit pub fn try_acquire_many(&self, n: u32) -> Result, TryAcquireError> { - match self.ll_sem.try_acquire(n) { + match self.ll_sem.try_acquire(n as usize) { Ok(()) => Ok(SemaphorePermit { sem: self, permits: n, @@ -855,7 +855,7 @@ impl Semaphore { self: Arc, n: u32, ) -> Result { - match self.ll_sem.try_acquire(n) { + match self.ll_sem.try_acquire(n as usize) { Ok(()) => Ok(OwnedSemaphorePermit { sem: self, permits: n, From a3fd29e48124a36c092a4957938a2abfc254829b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Th=C3=A9odore=20Pr=C3=A9vot?= Date: Mon, 11 Dec 2023 11:49:33 +0000 Subject: [PATCH 16/41] mpsc: return a `SendError` if `n` > MAX_PERMIT for `*reserve_many` fns --- tokio/src/sync/mpsc/bounded.rs | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/tokio/src/sync/mpsc/bounded.rs b/tokio/src/sync/mpsc/bounded.rs index 22285895a80..accee2c9587 100644 --- a/tokio/src/sync/mpsc/bounded.rs +++ b/tokio/src/sync/mpsc/bounded.rs @@ -1013,6 +1013,9 @@ impl Sender { async fn reserve_inner(&self, n: usize) -> Result<(), SendError<()>> { crate::trace::async_trace_leaf().await; + if n > semaphore::Semaphore::MAX_PERMITS { + return Err(SendError(())); + } match self.chan.semaphore().semaphore.acquire(n).await { Ok(()) => Ok(()), Err(_) => Err(SendError(())), @@ -1122,6 +1125,10 @@ impl Sender { /// } /// ``` pub fn try_reserve_many(&self, n: usize) -> Result, TrySendError<()>> { + if n > semaphore::Semaphore::MAX_PERMITS { + return Err(TrySendError::Full(())); + } + match self.chan.semaphore().semaphore.try_acquire(n) { Ok(()) => {} Err(TryAcquireError::Closed) => return Err(TrySendError::Closed(())), From 419bfd36314d8cd284939e22824878d18ab3bbab Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Th=C3=A9odore=20Pr=C3=A9vot?= Date: Mon, 11 Dec 2023 12:12:45 +0000 Subject: [PATCH 17/41] mpsc: return a `SendError` if `n` > max_capacity for `*reserve_many` fns --- tokio/src/sync/mpsc/bounded.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tokio/src/sync/mpsc/bounded.rs b/tokio/src/sync/mpsc/bounded.rs index accee2c9587..f4c4b4dc5a0 100644 --- a/tokio/src/sync/mpsc/bounded.rs +++ b/tokio/src/sync/mpsc/bounded.rs @@ -1013,7 +1013,7 @@ impl Sender { async fn reserve_inner(&self, n: usize) -> Result<(), SendError<()>> { crate::trace::async_trace_leaf().await; - if n > semaphore::Semaphore::MAX_PERMITS { + if n > self.max_capacity() { return Err(SendError(())); } match self.chan.semaphore().semaphore.acquire(n).await { @@ -1125,7 +1125,7 @@ impl Sender { /// } /// ``` pub fn try_reserve_many(&self, n: usize) -> Result, TrySendError<()>> { - if n > semaphore::Semaphore::MAX_PERMITS { + if n > self.max_capacity() { return Err(TrySendError::Full(())); } From 8f9d4c125d366ee54a6b174b633298363ba1334c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Th=C3=A9odore=20Pr=C3=A9vot?= Date: Wed, 13 Dec 2023 15:32:11 +0000 Subject: [PATCH 18/41] mpsc: add `try_reserve_many_fails` test --- tokio/tests/sync_mpsc.rs | 30 ++++++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/tokio/tests/sync_mpsc.rs b/tokio/tests/sync_mpsc.rs index a5c15a4cfc6..5232a915d90 100644 --- a/tokio/tests/sync_mpsc.rs +++ b/tokio/tests/sync_mpsc.rs @@ -545,6 +545,36 @@ async fn try_reserve_fails() { let _permit = tx.try_reserve().unwrap(); } +#[maybe_tokio_test] +async fn try_reserve_many_fails() { + for i in 4..20 { + let (tx, mut rx) = mpsc::channel(i); + + let mut permit = tx.try_reserve_many(i - 2).unwrap(); + + // This should fail, there is only two remaining permits + match assert_err!(tx.try_reserve_many(3)) { + TrySendError::Full(()) => {} + _ => panic!(), + } + + permit.next().unwrap().send("foo"); + permit.next().unwrap().send("foo"); + + assert_eq!(rx.recv().await, Some("foo")); + assert_eq!(rx.recv().await, Some("foo")); + + // There are now 4 remaining permits because of the 2 sends/recv + let _permit = tx.try_reserve_many(4).unwrap(); + + // Dropping permit iterator releases the remaining slots. + drop(permit); + drop(_permit); + + let _permit = tx.try_reserve_many(i).unwrap(); + } +} + #[tokio::test] #[cfg(feature = "full")] async fn drop_permit_releases_permit() { From 59944ba11987a8111ec31ae09a1a5fc2afdff8d1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Th=C3=A9odore=20Pr=C3=A9vot?= Date: Wed, 13 Dec 2023 15:48:03 +0000 Subject: [PATCH 19/41] mpsc: fix fmt --- tokio/tests/sync_mpsc.rs | 52 ++++++++++++++++++++-------------------- 1 file changed, 26 insertions(+), 26 deletions(-) diff --git a/tokio/tests/sync_mpsc.rs b/tokio/tests/sync_mpsc.rs index 5232a915d90..a6203bb3bb7 100644 --- a/tokio/tests/sync_mpsc.rs +++ b/tokio/tests/sync_mpsc.rs @@ -547,32 +547,32 @@ async fn try_reserve_fails() { #[maybe_tokio_test] async fn try_reserve_many_fails() { - for i in 4..20 { - let (tx, mut rx) = mpsc::channel(i); - - let mut permit = tx.try_reserve_many(i - 2).unwrap(); - - // This should fail, there is only two remaining permits - match assert_err!(tx.try_reserve_many(3)) { - TrySendError::Full(()) => {} - _ => panic!(), - } - - permit.next().unwrap().send("foo"); - permit.next().unwrap().send("foo"); - - assert_eq!(rx.recv().await, Some("foo")); - assert_eq!(rx.recv().await, Some("foo")); - - // There are now 4 remaining permits because of the 2 sends/recv - let _permit = tx.try_reserve_many(4).unwrap(); - - // Dropping permit iterator releases the remaining slots. - drop(permit); - drop(_permit); - - let _permit = tx.try_reserve_many(i).unwrap(); - } + for i in 4..20 { + let (tx, mut rx) = mpsc::channel(i); + + let mut permit = tx.try_reserve_many(i - 2).unwrap(); + + // This should fail, there is only two remaining permits + match assert_err!(tx.try_reserve_many(3)) { + TrySendError::Full(()) => {} + _ => panic!(), + } + + permit.next().unwrap().send("foo"); + permit.next().unwrap().send("foo"); + + assert_eq!(rx.recv().await, Some("foo")); + assert_eq!(rx.recv().await, Some("foo")); + + // There are now 4 remaining permits because of the 2 sends/recv + let _permit = tx.try_reserve_many(4).unwrap(); + + // Dropping permit iterator releases the remaining slots. + drop(permit); + drop(_permit); + + let _permit = tx.try_reserve_many(i).unwrap(); + } } #[tokio::test] From bffa2fafd417cabb7636ebf343888252b66faf58 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Th=C3=A9odore=20Pr=C3=A9vot?= Date: Sun, 17 Dec 2023 14:26:45 +0000 Subject: [PATCH 20/41] mpsc: switch to `assert_ok` expr for testing --- tokio/tests/sync_mpsc.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tokio/tests/sync_mpsc.rs b/tokio/tests/sync_mpsc.rs index a6203bb3bb7..f454c1f18de 100644 --- a/tokio/tests/sync_mpsc.rs +++ b/tokio/tests/sync_mpsc.rs @@ -550,7 +550,7 @@ async fn try_reserve_many_fails() { for i in 4..20 { let (tx, mut rx) = mpsc::channel(i); - let mut permit = tx.try_reserve_many(i - 2).unwrap(); + let mut permit = assert_ok!(tx.try_reserve_many(i - 2)); // This should fail, there is only two remaining permits match assert_err!(tx.try_reserve_many(3)) { @@ -565,13 +565,13 @@ async fn try_reserve_many_fails() { assert_eq!(rx.recv().await, Some("foo")); // There are now 4 remaining permits because of the 2 sends/recv - let _permit = tx.try_reserve_many(4).unwrap(); + let _permit = assert_ok!(tx.try_reserve_many(4)); // Dropping permit iterator releases the remaining slots. drop(permit); drop(_permit); - let _permit = tx.try_reserve_many(i).unwrap(); + let _permit = assert_ok!(tx.try_reserve_many(i)); } } From d9bf13433c4dc72b5a492f8c9afab89c13ccc9a2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Th=C3=A9odore=20Pr=C3=A9vot?= Date: Sun, 17 Dec 2023 14:46:01 +0000 Subject: [PATCH 21/41] mpsc: return an empty iterator for `try_reserve_many(0)` --- tokio/src/sync/mpsc/bounded.rs | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/tokio/src/sync/mpsc/bounded.rs b/tokio/src/sync/mpsc/bounded.rs index f4c4b4dc5a0..dd2364c65d3 100644 --- a/tokio/src/sync/mpsc/bounded.rs +++ b/tokio/src/sync/mpsc/bounded.rs @@ -1121,13 +1121,27 @@ impl Sender { /// // The value sent on the permit is received /// assert_eq!(rx.recv().await.unwrap(), 456); /// assert_eq!(rx.recv().await.unwrap(), 457); + /// + /// // Trying to call try_reserve_many with 0 will return an empty iterator + /// let mut permit = tx.try_reserve_many(0).unwrap(); + /// assert!(permit.next().is_none()); /// + /// // Trying to call try_reserve_many with a number greater than the channel + /// // capacity will return an error + /// let permit = tx.try_reserve_many(3); + /// assert!(permit.is_err()); /// } /// ``` pub fn try_reserve_many(&self, n: usize) -> Result, TrySendError<()>> { if n > self.max_capacity() { return Err(TrySendError::Full(())); } + if n == 0 { + return Ok(PermitIterator { + chan: &self.chan, + n, + }); + } match self.chan.semaphore().semaphore.try_acquire(n) { Ok(()) => {} From 2ee4f7668ea141da1c321881128f451cc859b44b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Th=C3=A9odore=20Pr=C3=A9vot?= Date: Sun, 17 Dec 2023 15:21:59 +0000 Subject: [PATCH 22/41] mpsc: test `PermitIterator` and `reserve_many` --- tokio/tests/sync_mpsc.rs | 56 ++++++++++++++++++++++++++++++++++------ 1 file changed, 48 insertions(+), 8 deletions(-) diff --git a/tokio/tests/sync_mpsc.rs b/tokio/tests/sync_mpsc.rs index f454c1f18de..12e0e9d9adb 100644 --- a/tokio/tests/sync_mpsc.rs +++ b/tokio/tests/sync_mpsc.rs @@ -550,7 +550,7 @@ async fn try_reserve_many_fails() { for i in 4..20 { let (tx, mut rx) = mpsc::channel(i); - let mut permit = assert_ok!(tx.try_reserve_many(i - 2)); + let mut permit1 = assert_ok!(tx.try_reserve_many(i - 2)); // This should fail, there is only two remaining permits match assert_err!(tx.try_reserve_many(3)) { @@ -558,23 +558,43 @@ async fn try_reserve_many_fails() { _ => panic!(), } - permit.next().unwrap().send("foo"); - permit.next().unwrap().send("foo"); + permit1.next().unwrap().send("foo"); + permit1.next().unwrap().send("foo"); assert_eq!(rx.recv().await, Some("foo")); assert_eq!(rx.recv().await, Some("foo")); // There are now 4 remaining permits because of the 2 sends/recv - let _permit = assert_ok!(tx.try_reserve_many(4)); + let permit2 = assert_ok!(tx.try_reserve_many(4)); // Dropping permit iterator releases the remaining slots. - drop(permit); - drop(_permit); - - let _permit = assert_ok!(tx.try_reserve_many(i)); + drop(permit1); + drop(permit2); + + // It is possible to reserve all the permits + assert_ok!(tx.try_reserve_many(i)); + + // This should fail because the channel is closed + drop(rx); + match assert_err!(tx.try_reserve_many(3)) { + TrySendError::Closed(()) => {} + _ => panic!(), + }; } } +#[maybe_tokio_test] +async fn reserve_many_and_send() { + let (tx, mut rx) = mpsc::channel(100); + for i in 1..100 { + for permit in assert_ok!(tx.reserve_many(i).await) { + permit.send("foo"); + assert_eq!(rx.recv().await, Some("foo")); + } + assert_eq!(rx.try_recv(), Err(TryRecvError::Empty)); + } +} + #[tokio::test] #[cfg(feature = "full")] async fn drop_permit_releases_permit() { @@ -594,6 +614,25 @@ async fn drop_permit_releases_permit() { assert_ready_ok!(reserve2.poll()); } +#[tokio::test] +#[cfg(feature = "full")] +async fn drop_permit_iterator_releases_permits() { + // poll_ready reserves capacity, ensure that the capacity is released if tx + // is dropped w/o sending a value. + let (tx1, _rx) = mpsc::channel::(10); + let tx2 = tx1.clone(); + + let permits = assert_ok!(tx1.reserve_many(10).await); + + let mut reserve2 = tokio_test::task::spawn(tx2.reserve_many(10)); + assert_pending!(reserve2.poll()); + + drop(permits); + + assert!(reserve2.is_woken()); + assert_ready_ok!(reserve2.poll()); +} + #[maybe_tokio_test] async fn dropping_rx_closes_channel() { let (tx, rx) = mpsc::channel(100); @@ -603,6 +642,7 @@ async fn dropping_rx_closes_channel() { drop(rx); assert_err!(tx.reserve().await); + assert_err!(tx.reserve_many(10).await); assert_eq!(1, Arc::strong_count(&msg)); } From 6a1ffbfbd31d56ed55724be677b8cee100aeb7a0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Th=C3=A9odore=20Pr=C3=A9vot?= Date: Sun, 17 Dec 2023 15:22:16 +0000 Subject: [PATCH 23/41] mpsc: fix fmt --- tokio/tests/sync_mpsc.rs | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/tokio/tests/sync_mpsc.rs b/tokio/tests/sync_mpsc.rs index 12e0e9d9adb..99bd938d9e7 100644 --- a/tokio/tests/sync_mpsc.rs +++ b/tokio/tests/sync_mpsc.rs @@ -571,11 +571,11 @@ async fn try_reserve_many_fails() { drop(permit1); drop(permit2); - // It is possible to reserve all the permits + // It is possible to reserve all the permits assert_ok!(tx.try_reserve_many(i)); - + // This should fail because the channel is closed - drop(rx); + drop(rx); match assert_err!(tx.try_reserve_many(3)) { TrySendError::Closed(()) => {} _ => panic!(), @@ -585,14 +585,14 @@ async fn try_reserve_many_fails() { #[maybe_tokio_test] async fn reserve_many_and_send() { - let (tx, mut rx) = mpsc::channel(100); - for i in 1..100 { - for permit in assert_ok!(tx.reserve_many(i).await) { - permit.send("foo"); - assert_eq!(rx.recv().await, Some("foo")); - } - assert_eq!(rx.try_recv(), Err(TryRecvError::Empty)); - } + let (tx, mut rx) = mpsc::channel(100); + for i in 1..100 { + for permit in assert_ok!(tx.reserve_many(i).await) { + permit.send("foo"); + assert_eq!(rx.recv().await, Some("foo")); + } + assert_eq!(rx.try_recv(), Err(TryRecvError::Empty)); + } } #[tokio::test] @@ -642,7 +642,7 @@ async fn dropping_rx_closes_channel() { drop(rx); assert_err!(tx.reserve().await); - assert_err!(tx.reserve_many(10).await); + assert_err!(tx.reserve_many(10).await); assert_eq!(1, Arc::strong_count(&msg)); } From 323e6b6b36a2b34203f3b0960c077b55bee31cc2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Th=C3=A9odore=20Pr=C3=A9vot?= Date: Fri, 22 Dec 2023 18:24:41 +0000 Subject: [PATCH 24/41] mpsc: fix `reserve_many_and_send` test --- tokio/tests/sync_mpsc.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio/tests/sync_mpsc.rs b/tokio/tests/sync_mpsc.rs index 99bd938d9e7..9b988ac47dd 100644 --- a/tokio/tests/sync_mpsc.rs +++ b/tokio/tests/sync_mpsc.rs @@ -586,7 +586,7 @@ async fn try_reserve_many_fails() { #[maybe_tokio_test] async fn reserve_many_and_send() { let (tx, mut rx) = mpsc::channel(100); - for i in 1..100 { + for i in 0..100 { for permit in assert_ok!(tx.reserve_many(i).await) { permit.send("foo"); assert_eq!(rx.recv().await, Some("foo")); From 5496399996d193b81b69e296bd4cf0dd7995f603 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Th=C3=A9odore=20Pr=C3=A9vot?= Date: Fri, 22 Dec 2023 18:26:51 +0000 Subject: [PATCH 25/41] mpsc: impl `FusedIterator` for `PermitIterator` --- tokio/src/sync/mpsc/bounded.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/tokio/src/sync/mpsc/bounded.rs b/tokio/src/sync/mpsc/bounded.rs index dd2364c65d3..210b0a7e8e6 100644 --- a/tokio/src/sync/mpsc/bounded.rs +++ b/tokio/src/sync/mpsc/bounded.rs @@ -1451,6 +1451,7 @@ impl<'a, T> Iterator for PermitIterator<'a, T> { } } impl ExactSizeIterator for PermitIterator<'_, T> {} +impl FusedIterator for PermitIterator<'_, T> {} impl Drop for PermitIterator<'_, T> { fn drop(&mut self) { From ddd44d17f10c5d7e0d628541d0e9c6ed1c388dde Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Th=C3=A9odore=20Pr=C3=A9vot?= Date: Sat, 23 Dec 2023 11:09:15 +0000 Subject: [PATCH 26/41] mpsc: test `reserve_many_on_closed_channel` --- tokio/src/sync/mpsc/bounded.rs | 2 +- tokio/tests/sync_mpsc.rs | 19 +++++++++++++++++++ 2 files changed, 20 insertions(+), 1 deletion(-) diff --git a/tokio/src/sync/mpsc/bounded.rs b/tokio/src/sync/mpsc/bounded.rs index 210b0a7e8e6..0456dc0d2f5 100644 --- a/tokio/src/sync/mpsc/bounded.rs +++ b/tokio/src/sync/mpsc/bounded.rs @@ -1136,7 +1136,7 @@ impl Sender { if n > self.max_capacity() { return Err(TrySendError::Full(())); } - if n == 0 { + if n == 0 && !self.is_closed() { return Ok(PermitIterator { chan: &self.chan, n, diff --git a/tokio/tests/sync_mpsc.rs b/tokio/tests/sync_mpsc.rs index 9b988ac47dd..5a879a0d774 100644 --- a/tokio/tests/sync_mpsc.rs +++ b/tokio/tests/sync_mpsc.rs @@ -595,6 +595,25 @@ async fn reserve_many_and_send() { } } +#[maybe_tokio_test] +async fn reserve_many_on_closed_channel() { + let (tx, rx) = mpsc::channel::<()>(100); + drop(rx); + assert_err!(tx.reserve_many(10).await); + match assert_err!(tx.try_reserve_many(10)) { + TrySendError::Closed(()) => {} + _ => panic!(), + }; + + let (tx, rx) = mpsc::channel::<()>(100); + drop(rx); + assert_err!(tx.reserve_many(0).await); + match assert_err!(tx.try_reserve_many(0)) { + TrySendError::Closed(()) => {} + _ => panic!(), + }; +} + #[tokio::test] #[cfg(feature = "full")] async fn drop_permit_releases_permit() { From f103641684a4e0eb7c4b189fca40d54dc3a44774 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Th=C3=A9odore=20Pr=C3=A9vot?= Date: Sat, 23 Dec 2023 11:10:11 +0000 Subject: [PATCH 27/41] mpsc: doc mention `reserve_many` for Cancel Safety part --- tokio/src/sync/mpsc/bounded.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tokio/src/sync/mpsc/bounded.rs b/tokio/src/sync/mpsc/bounded.rs index 0456dc0d2f5..c17686a315a 100644 --- a/tokio/src/sync/mpsc/bounded.rs +++ b/tokio/src/sync/mpsc/bounded.rs @@ -883,9 +883,9 @@ impl Sender { /// /// # Cancel safety /// - /// This channel uses a queue to ensure that calls to `send` and `reserve` + /// This channel uses a queue to ensure that calls to `send` and `reserve_many` /// complete in the order they were requested. Cancelling a call to - /// `reserve` makes you lose your place in the queue. + /// `reserve_many` makes you lose your place in the queue. /// /// # Examples /// From b688a05a9b60d97bf2ba3026cd91bc3b3d7db66e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Th=C3=A9odore=20Pr=C3=A9vot?= Date: Sat, 23 Dec 2023 11:13:39 +0000 Subject: [PATCH 28/41] mpsc: improve doc for `reserve_many` fn --- tokio/src/sync/mpsc/bounded.rs | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/tokio/src/sync/mpsc/bounded.rs b/tokio/src/sync/mpsc/bounded.rs index c17686a315a..09f491def14 100644 --- a/tokio/src/sync/mpsc/bounded.rs +++ b/tokio/src/sync/mpsc/bounded.rs @@ -873,6 +873,8 @@ impl Sender { /// message is reserved for the caller. A [`PermitIterator`] is returned to track /// the reserved capacity. You can call this [`Iterator`] until it is exhausted to /// get a [`Permit`] and then call [`Permit::send`]. + /// + /// If the channel is closed, the function returns a [`SendError`]. /// /// Dropping [`PermitIterator`] without sending all messages releases the capacity back /// to the channel. @@ -1130,6 +1132,14 @@ impl Sender { /// // capacity will return an error /// let permit = tx.try_reserve_many(3); /// assert!(permit.is_err()); + /// + /// // Trying to call try_reserve_many on a closed channel will return an error + /// drop(rx); + /// let permit = tx.try_reserve_many(1); + /// assert!(permit.is_err()); + /// + /// let permit = tx.try_reserve_many(0); + /// assert!(permit.is_err()); /// } /// ``` pub fn try_reserve_many(&self, n: usize) -> Result, TrySendError<()>> { @@ -1451,7 +1461,7 @@ impl<'a, T> Iterator for PermitIterator<'a, T> { } } impl ExactSizeIterator for PermitIterator<'_, T> {} -impl FusedIterator for PermitIterator<'_, T> {} +impl std::iter::FusedIterator for PermitIterator<'_, T> {} impl Drop for PermitIterator<'_, T> { fn drop(&mut self) { From 3b705be1ee67eda78995b4303b60d49e991e8d77 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Th=C3=A9odore=20Pr=C3=A9vot?= Date: Sat, 23 Dec 2023 11:14:02 +0000 Subject: [PATCH 29/41] mpsc: fix formatting --- tokio/src/sync/mpsc/bounded.rs | 20 ++++++++++---------- tokio/tests/sync_mpsc.rs | 30 +++++++++++++++--------------- 2 files changed, 25 insertions(+), 25 deletions(-) diff --git a/tokio/src/sync/mpsc/bounded.rs b/tokio/src/sync/mpsc/bounded.rs index 09f491def14..da89ee80116 100644 --- a/tokio/src/sync/mpsc/bounded.rs +++ b/tokio/src/sync/mpsc/bounded.rs @@ -873,8 +873,8 @@ impl Sender { /// message is reserved for the caller. A [`PermitIterator`] is returned to track /// the reserved capacity. You can call this [`Iterator`] until it is exhausted to /// get a [`Permit`] and then call [`Permit::send`]. - /// - /// If the channel is closed, the function returns a [`SendError`]. + /// + /// If the channel is closed, the function returns a [`SendError`]. /// /// Dropping [`PermitIterator`] without sending all messages releases the capacity back /// to the channel. @@ -1132,14 +1132,14 @@ impl Sender { /// // capacity will return an error /// let permit = tx.try_reserve_many(3); /// assert!(permit.is_err()); - /// - /// // Trying to call try_reserve_many on a closed channel will return an error - /// drop(rx); - /// let permit = tx.try_reserve_many(1); - /// assert!(permit.is_err()); - /// - /// let permit = tx.try_reserve_many(0); - /// assert!(permit.is_err()); + /// + /// // Trying to call try_reserve_many on a closed channel will return an error + /// drop(rx); + /// let permit = tx.try_reserve_many(1); + /// assert!(permit.is_err()); + /// + /// let permit = tx.try_reserve_many(0); + /// assert!(permit.is_err()); /// } /// ``` pub fn try_reserve_many(&self, n: usize) -> Result, TrySendError<()>> { diff --git a/tokio/tests/sync_mpsc.rs b/tokio/tests/sync_mpsc.rs index 5a879a0d774..22cb6cb1843 100644 --- a/tokio/tests/sync_mpsc.rs +++ b/tokio/tests/sync_mpsc.rs @@ -597,21 +597,21 @@ async fn reserve_many_and_send() { #[maybe_tokio_test] async fn reserve_many_on_closed_channel() { - let (tx, rx) = mpsc::channel::<()>(100); - drop(rx); - assert_err!(tx.reserve_many(10).await); - match assert_err!(tx.try_reserve_many(10)) { - TrySendError::Closed(()) => {} - _ => panic!(), - }; - - let (tx, rx) = mpsc::channel::<()>(100); - drop(rx); - assert_err!(tx.reserve_many(0).await); - match assert_err!(tx.try_reserve_many(0)) { - TrySendError::Closed(()) => {} - _ => panic!(), - }; + let (tx, rx) = mpsc::channel::<()>(100); + drop(rx); + assert_err!(tx.reserve_many(10).await); + match assert_err!(tx.try_reserve_many(10)) { + TrySendError::Closed(()) => {} + _ => panic!(), + }; + + let (tx, rx) = mpsc::channel::<()>(100); + drop(rx); + assert_err!(tx.reserve_many(0).await); + match assert_err!(tx.try_reserve_many(0)) { + TrySendError::Closed(()) => {} + _ => panic!(), + }; } #[tokio::test] From 430286b893fce482fc61b544c1123bdae66cad24 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Th=C3=A9odore=20Pr=C3=A9vot?= Date: Sat, 23 Dec 2023 11:19:27 +0000 Subject: [PATCH 30/41] mpsc: fix formatting --- tokio/src/sync/mpsc/bounded.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/tokio/src/sync/mpsc/bounded.rs b/tokio/src/sync/mpsc/bounded.rs index da89ee80116..946be5b22f2 100644 --- a/tokio/src/sync/mpsc/bounded.rs +++ b/tokio/src/sync/mpsc/bounded.rs @@ -1133,13 +1133,13 @@ impl Sender { /// let permit = tx.try_reserve_many(3); /// assert!(permit.is_err()); /// - /// // Trying to call try_reserve_many on a closed channel will return an error - /// drop(rx); - /// let permit = tx.try_reserve_many(1); - /// assert!(permit.is_err()); + /// // Trying to call try_reserve_many on a closed channel will return an error + /// drop(rx); + /// let permit = tx.try_reserve_many(1); + /// assert!(permit.is_err()); /// - /// let permit = tx.try_reserve_many(0); - /// assert!(permit.is_err()); + /// let permit = tx.try_reserve_many(0); + /// assert!(permit.is_err()); /// } /// ``` pub fn try_reserve_many(&self, n: usize) -> Result, TrySendError<()>> { From 58f764830b57d483df599ab39e1a0c0287c6c20b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Th=C3=A9odore=20Pr=C3=A9vot?= Date: Sat, 23 Dec 2023 11:38:27 +0000 Subject: [PATCH 31/41] mpsc: switch to `maybe_tokio_test` --- tokio/tests/sync_mpsc.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio/tests/sync_mpsc.rs b/tokio/tests/sync_mpsc.rs index 22cb6cb1843..0c55b9c50b1 100644 --- a/tokio/tests/sync_mpsc.rs +++ b/tokio/tests/sync_mpsc.rs @@ -633,7 +633,7 @@ async fn drop_permit_releases_permit() { assert_ready_ok!(reserve2.poll()); } -#[tokio::test] +#[maybe_tokio_test] #[cfg(feature = "full")] async fn drop_permit_iterator_releases_permits() { // poll_ready reserves capacity, ensure that the capacity is released if tx From a25fb7c83063a9c87d4a638d7d373313ead30dbb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Th=C3=A9odore=20Pr=C3=A9vot?= Date: Sat, 23 Dec 2023 14:50:05 +0000 Subject: [PATCH 32/41] mpsc: add tests for `try_reserve_many` --- tokio/tests/sync_mpsc.rs | 168 +++++++++++++++++++++++++++------------ 1 file changed, 116 insertions(+), 52 deletions(-) diff --git a/tokio/tests/sync_mpsc.rs b/tokio/tests/sync_mpsc.rs index 0c55b9c50b1..09f7cbf6af5 100644 --- a/tokio/tests/sync_mpsc.rs +++ b/tokio/tests/sync_mpsc.rs @@ -522,6 +522,53 @@ async fn try_send_fail_with_try_recv() { assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected)); } +#[maybe_tokio_test] +async fn reserve_many_edge_cases() { + // MAX_PERMITS as defined in `sync/batch_semaphore.rs` + const MAX_PERMITS: usize = std::usize::MAX >> 3; + + let (tx, rx) = mpsc::channel::<()>(1); + let mut permit = assert_ok!(tx.reserve_many(0).await); + assert!(permit.next().is_none()); + + assert_err!(tx.reserve_many(MAX_PERMITS + 1).await); + assert_err!(tx.reserve_many(usize::MAX).await); + + // Dropping the receiver should close the channel + drop(rx); + match assert_err!(tx.try_reserve_many(0)) { + TrySendError::Closed(()) => {} + _ => panic!(), + }; +} + +#[maybe_tokio_test] +async fn try_reserve_many_edge_cases() { + // MAX_PERMITS as defined in `sync/batch_semaphore.rs` + const MAX_PERMITS: usize = std::usize::MAX >> 3; + + let (tx, rx) = mpsc::channel::<()>(1); + + let mut permit = assert_ok!(tx.try_reserve_many(0)); + assert!(permit.next().is_none()); + + let permit = tx.try_reserve_many(MAX_PERMITS + 1); + match assert_err!(permit) { + TrySendError::Full(..) => {} + _ => panic!(), + } + + let permit = tx.try_reserve_many(usize::MAX); + match assert_err!(permit) { + TrySendError::Full(..) => {} + _ => panic!(), + } + + // Dropping the receiver should close the channel + drop(rx); + assert_err!(tx.reserve_many(0).await); +} + #[maybe_tokio_test] async fn try_reserve_fails() { let (tx, mut rx) = mpsc::channel(1); @@ -546,48 +593,21 @@ async fn try_reserve_fails() { } #[maybe_tokio_test] -async fn try_reserve_many_fails() { - for i in 4..20 { - let (tx, mut rx) = mpsc::channel(i); - - let mut permit1 = assert_ok!(tx.try_reserve_many(i - 2)); - - // This should fail, there is only two remaining permits - match assert_err!(tx.try_reserve_many(3)) { - TrySendError::Full(()) => {} - _ => panic!(), +async fn reserve_many_and_send() { + let (tx, mut rx) = mpsc::channel(100); + for i in 0..100 { + for permit in assert_ok!(tx.reserve_many(i).await) { + permit.send("foo"); + assert_eq!(rx.recv().await, Some("foo")); } - - permit1.next().unwrap().send("foo"); - permit1.next().unwrap().send("foo"); - - assert_eq!(rx.recv().await, Some("foo")); - assert_eq!(rx.recv().await, Some("foo")); - - // There are now 4 remaining permits because of the 2 sends/recv - let permit2 = assert_ok!(tx.try_reserve_many(4)); - - // Dropping permit iterator releases the remaining slots. - drop(permit1); - drop(permit2); - - // It is possible to reserve all the permits - assert_ok!(tx.try_reserve_many(i)); - - // This should fail because the channel is closed - drop(rx); - match assert_err!(tx.try_reserve_many(3)) { - TrySendError::Closed(()) => {} - _ => panic!(), - }; + assert_eq!(rx.try_recv(), Err(TryRecvError::Empty)); } } - #[maybe_tokio_test] -async fn reserve_many_and_send() { +async fn try_reserve_many_and_send() { let (tx, mut rx) = mpsc::channel(100); for i in 0..100 { - for permit in assert_ok!(tx.reserve_many(i).await) { + for permit in assert_ok!(tx.try_reserve_many(i)) { permit.send("foo"); assert_eq!(rx.recv().await, Some("foo")); } @@ -600,20 +620,59 @@ async fn reserve_many_on_closed_channel() { let (tx, rx) = mpsc::channel::<()>(100); drop(rx); assert_err!(tx.reserve_many(10).await); - match assert_err!(tx.try_reserve_many(10)) { - TrySendError::Closed(()) => {} - _ => panic!(), - }; +} - let (tx, rx) = mpsc::channel::<()>(100); +#[maybe_tokio_test] +async fn try_reserve_many_on_closed_channel() { + let (tx, rx) = mpsc::channel::(100); drop(rx); - assert_err!(tx.reserve_many(0).await); - match assert_err!(tx.try_reserve_many(0)) { + match assert_err!(tx.try_reserve_many(10)) { TrySendError::Closed(()) => {} _ => panic!(), }; } +#[maybe_tokio_test] +async fn try_reserve_many_full() { + // Reserve n capacity and send k messages + for n in 1..100 { + for k in 0..n { + let (tx, mut rx) = mpsc::channel::(n); + let permits = assert_ok!(tx.try_reserve_many(n)); + + assert_eq!(permits.len(), n); + assert_eq!(tx.capacity(), 0); + + match assert_err!(tx.try_reserve_many(1)) { + TrySendError::Full(..) => {} + _ => panic!(), + }; + + for permit in permits.take(k) { + permit.send(0); + } + // We only used k permits on the n reserved + assert_eq!(tx.capacity(), n - k); + + // We can reserve more permits + assert_ok!(tx.try_reserve_many(1)); + + // But not more than the current capacity + match assert_err!(tx.try_reserve_many(n - k + 1)) { + TrySendError::Full(..) => {} + _ => panic!(), + }; + + for _i in 0..k { + assert_eq!(rx.recv().await, Some(0)); + } + + // Now that we've received everything, capacity should be back to n + assert_eq!(tx.capacity(), n); + } + } +} + #[tokio::test] #[cfg(feature = "full")] async fn drop_permit_releases_permit() { @@ -634,22 +693,27 @@ async fn drop_permit_releases_permit() { } #[maybe_tokio_test] -#[cfg(feature = "full")] async fn drop_permit_iterator_releases_permits() { // poll_ready reserves capacity, ensure that the capacity is released if tx // is dropped w/o sending a value. - let (tx1, _rx) = mpsc::channel::(10); - let tx2 = tx1.clone(); + for n in 1..100 { + let (tx1, _rx) = mpsc::channel::(n); + let tx2 = tx1.clone(); - let permits = assert_ok!(tx1.reserve_many(10).await); + let permits = assert_ok!(tx1.reserve_many(n).await); - let mut reserve2 = tokio_test::task::spawn(tx2.reserve_many(10)); - assert_pending!(reserve2.poll()); + let mut reserve2 = tokio_test::task::spawn(tx2.reserve_many(n)); + assert_pending!(reserve2.poll()); - drop(permits); + drop(permits); - assert!(reserve2.is_woken()); - assert_ready_ok!(reserve2.poll()); + assert!(reserve2.is_woken()); + + let permits = assert_ready_ok!(reserve2.poll()); + drop(permits); + + assert_eq!(tx1.capacity(), n); + } } #[maybe_tokio_test] From bd8c3e6832ee724efffb841b51df19d9dfbedec9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Th=C3=A9odore=20Pr=C3=A9vot?= Date: Sat, 23 Dec 2023 15:18:27 +0000 Subject: [PATCH 33/41] mpsc: add an early return if `n == 0` to avoid `acquire` mechanism --- tokio/src/sync/mpsc/bounded.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tokio/src/sync/mpsc/bounded.rs b/tokio/src/sync/mpsc/bounded.rs index 946be5b22f2..bc8499027cb 100644 --- a/tokio/src/sync/mpsc/bounded.rs +++ b/tokio/src/sync/mpsc/bounded.rs @@ -1015,6 +1015,9 @@ impl Sender { async fn reserve_inner(&self, n: usize) -> Result<(), SendError<()>> { crate::trace::async_trace_leaf().await; + if n == 0 { + return Ok(()); + } if n > self.max_capacity() { return Err(SendError(())); } From 6988079c6edabec05a99fb27b63cdcb6c5db390f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Th=C3=A9odore=20Pr=C3=A9vot?= Date: Sat, 23 Dec 2023 16:01:36 +0000 Subject: [PATCH 34/41] mpsc: improve doc for `reserve_many` --- tokio/src/sync/mpsc/bounded.rs | 31 +++++++++++++++++++------------ 1 file changed, 19 insertions(+), 12 deletions(-) diff --git a/tokio/src/sync/mpsc/bounded.rs b/tokio/src/sync/mpsc/bounded.rs index bc8499027cb..dfdbdc007d9 100644 --- a/tokio/src/sync/mpsc/bounded.rs +++ b/tokio/src/sync/mpsc/bounded.rs @@ -868,20 +868,24 @@ impl Sender { /// Waits for channel capacity. Once capacity to send `n` messages is /// available, it is reserved for the caller. /// - /// If the channel is full, the function waits for the number of unreceived - /// messages to become `n` less than the channel capacity. Capacity to send `n` - /// message is reserved for the caller. A [`PermitIterator`] is returned to track - /// the reserved capacity. You can call this [`Iterator`] until it is exhausted to - /// get a [`Permit`] and then call [`Permit::send`]. + /// If the channel is full or if there are fewer than `n` permits available, the function waits + /// for the number of unreceived messages to become `n` less than the channel capacity. + /// Capacity to send `n` message is then reserved for the caller. + /// + /// A [`PermitIterator`] is returned to track the reserved capacity. + /// You can call this [`Iterator`] until it is exhausted to + /// get a [`Permit`] and then call [`Permit::send`]. This function is similar to + /// [`try_reserve_many`] except it awaits for the slots to become available. /// /// If the channel is closed, the function returns a [`SendError`]. /// - /// Dropping [`PermitIterator`] without sending all messages releases the capacity back - /// to the channel. + /// Dropping [`PermitIterator`] without consuming it entirely releases the remaining + /// permits back to the channel. /// /// [`PermitIterator`]: PermitIterator /// [`Permit`]: Permit /// [`send`]: Permit::send + /// [`try_reserve_many`]: Sender::try_reserve_many /// /// # Cancel safety /// @@ -1080,17 +1084,20 @@ impl Sender { Ok(Permit { chan: &self.chan }) } - /// Tries to acquire `n` slot in the channel without waiting for the slot to become + /// Tries to acquire `n` slots in the channel without waiting for the slot to become /// available. /// - /// If the channel is full this function will return a [`TrySendError`], otherwise /// A [`PermitIterator`] is returned to track the reserved capacity. /// You can call this [`Iterator`] until it is exhausted to /// get a [`Permit`] and then call [`Permit::send`]. This function is similar to - /// [`reserve_many`] except it does not await for the slot to become available. + /// [`reserve_many`] except it does not await for the slots to become available. /// - /// Dropping [`PermitIterator`] without sending a message releases the capacity back - /// to the channel. + /// If the channel is full or if there are fewer than `n` permits available + /// this function will return a [`TrySendError::Full`]. If the channel is closed + /// this function will return a [`TrySendError::Closed`]. + /// + /// Dropping [`PermitIterator`] without consuming it entirely releases the remaining + /// permits back to the channel. /// /// [`PermitIterator`]: PermitIterator /// [`send`]: Permit::send From 133f2c1493bc0fe4daf21da7530ff8ba565b5bc5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Th=C3=A9odore=20Pr=C3=A9vot?= Date: Sat, 23 Dec 2023 16:08:20 +0000 Subject: [PATCH 35/41] mpsc: remove early return for `reserve_inner` --- tokio/src/sync/mpsc/bounded.rs | 3 --- 1 file changed, 3 deletions(-) diff --git a/tokio/src/sync/mpsc/bounded.rs b/tokio/src/sync/mpsc/bounded.rs index dfdbdc007d9..5e6482402ac 100644 --- a/tokio/src/sync/mpsc/bounded.rs +++ b/tokio/src/sync/mpsc/bounded.rs @@ -1019,9 +1019,6 @@ impl Sender { async fn reserve_inner(&self, n: usize) -> Result<(), SendError<()>> { crate::trace::async_trace_leaf().await; - if n == 0 { - return Ok(()); - } if n > self.max_capacity() { return Err(SendError(())); } From c39878ea342d4e0b405eea9f0f35fdb5e372e20e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Th=C3=A9odore=20Pr=C3=A9vot?= Date: Tue, 2 Jan 2024 13:35:00 +0000 Subject: [PATCH 36/41] mpsc: remove useless empty iterator guard for `try_reserve_many` --- tokio/src/sync/mpsc/bounded.rs | 6 ------ 1 file changed, 6 deletions(-) diff --git a/tokio/src/sync/mpsc/bounded.rs b/tokio/src/sync/mpsc/bounded.rs index 5e6482402ac..71df7580f89 100644 --- a/tokio/src/sync/mpsc/bounded.rs +++ b/tokio/src/sync/mpsc/bounded.rs @@ -1153,12 +1153,6 @@ impl Sender { if n > self.max_capacity() { return Err(TrySendError::Full(())); } - if n == 0 && !self.is_closed() { - return Ok(PermitIterator { - chan: &self.chan, - n, - }); - } match self.chan.semaphore().semaphore.try_acquire(n) { Ok(()) => {} From f7025ce7e340e51881fadb0f9f4732f56ba6d76c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Th=C3=A9odore=20Pr=C3=A9vot?= Date: Tue, 2 Jan 2024 13:35:04 +0000 Subject: [PATCH 37/41] mpsc: apply doc suggestion --- tokio/src/sync/mpsc/bounded.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio/src/sync/mpsc/bounded.rs b/tokio/src/sync/mpsc/bounded.rs index 71df7580f89..c2941152d51 100644 --- a/tokio/src/sync/mpsc/bounded.rs +++ b/tokio/src/sync/mpsc/bounded.rs @@ -68,7 +68,7 @@ pub struct Permit<'a, T> { chan: &'a chan::Tx, } -/// An [`Iterator`] of [`Permit`] that can be used to reserve `n` slots in the channel. +/// An [`Iterator`] of [`Permit`] that can be used to hold `n` slots in the channel. /// /// `PermitIterator` values are returned by [`Sender::reserve_many()`] and [`Sender::try_reserve_many()`] /// and are used to guarantee channel capacity before generating `n` message to send. From 0d19901c76977faa312096bbe688a76a1bcd2d0d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Th=C3=A9odore=20Pr=C3=A9vot?= Date: Tue, 2 Jan 2024 14:36:55 +0100 Subject: [PATCH 38/41] mpsc: Apply suggestions from code review Co-authored-by: Alice Ryhl --- tokio/src/sync/mpsc/bounded.rs | 4 +-- tokio/tests/sync_mpsc.rs | 47 ++++++++++++++++++++++++++-------- 2 files changed, 38 insertions(+), 13 deletions(-) diff --git a/tokio/src/sync/mpsc/bounded.rs b/tokio/src/sync/mpsc/bounded.rs index c2941152d51..04bdb6f0a55 100644 --- a/tokio/src/sync/mpsc/bounded.rs +++ b/tokio/src/sync/mpsc/bounded.rs @@ -71,7 +71,7 @@ pub struct Permit<'a, T> { /// An [`Iterator`] of [`Permit`] that can be used to hold `n` slots in the channel. /// /// `PermitIterator` values are returned by [`Sender::reserve_many()`] and [`Sender::try_reserve_many()`] -/// and are used to guarantee channel capacity before generating `n` message to send. +/// and are used to guarantee channel capacity before generating `n` messages to send. /// /// [`Sender::reserve_many()`]: Sender::reserve_many /// [`Sender::try_reserve_many()`]: Sender::try_reserve_many @@ -1089,7 +1089,7 @@ impl Sender { /// get a [`Permit`] and then call [`Permit::send`]. This function is similar to /// [`reserve_many`] except it does not await for the slots to become available. /// - /// If the channel is full or if there are fewer than `n` permits available + /// If there are fewer than `n` permits available on the channel, then /// this function will return a [`TrySendError::Full`]. If the channel is closed /// this function will return a [`TrySendError::Closed`]. /// diff --git a/tokio/tests/sync_mpsc.rs b/tokio/tests/sync_mpsc.rs index 09f7cbf6af5..9f960ca8958 100644 --- a/tokio/tests/sync_mpsc.rs +++ b/tokio/tests/sync_mpsc.rs @@ -522,24 +522,49 @@ async fn try_send_fail_with_try_recv() { assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected)); } -#[maybe_tokio_test] -async fn reserve_many_edge_cases() { - // MAX_PERMITS as defined in `sync/batch_semaphore.rs` - const MAX_PERMITS: usize = std::usize::MAX >> 3; +use tokio::sync::Semaphore::MAX_PERMITS; +#[maybe_tokio_test] +async fn reserve_many_above_cap() { let (tx, rx) = mpsc::channel::<()>(1); - let mut permit = assert_ok!(tx.reserve_many(0).await); - assert!(permit.next().is_none()); + assert_err!(tx.reserve_many(2).await); assert_err!(tx.reserve_many(MAX_PERMITS + 1).await); assert_err!(tx.reserve_many(usize::MAX).await); +} + +#[test] +fn try_reserve_many_zero() { + let (tx, rx) = mpsc::channel::<()>(1); + + // Succeeds when not closed. + assert!(assert_ok!(tx.try_reserve_many(0)).next().is_none()); + + // Even when channel is full. + tx.send(()).unwrap(); + assert!(assert_ok!(tx.try_reserve_many(0)).next().is_none()); - // Dropping the receiver should close the channel drop(rx); - match assert_err!(tx.try_reserve_many(0)) { - TrySendError::Closed(()) => {} - _ => panic!(), - }; + + // Closed error when closed. + assert!(tx.try_reserve_many(0) == TrySendError::Closed(())); +} + +#[maybe_tokio_test] +async fn reserve_many_zero() { + let (tx, rx) = mpsc::channel::<()>(1); + + // Succeeds when not closed. + assert!(assert_ok!(tx.reserve_many(0).await).next().is_none()); + + // Even when channel is full. + tx.send(()).unwrap(); + assert!(assert_ok!(tx.reserve_many(0).await).next().is_none()); + + drop(rx); + + // Closed error when closed. + assert!(tx.reserve_many(0).await == TrySendError::Closed(())); } #[maybe_tokio_test] From 3fdb5d9b4ac6f4e2fff883ccbd5d179d0e984b06 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Th=C3=A9odore=20Pr=C3=A9vot?= Date: Tue, 2 Jan 2024 13:52:20 +0000 Subject: [PATCH 39/41] mpsc: fix `sync_mpsc` tests --- tokio/tests/sync_mpsc.rs | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/tokio/tests/sync_mpsc.rs b/tokio/tests/sync_mpsc.rs index 9f960ca8958..95b04c13c18 100644 --- a/tokio/tests/sync_mpsc.rs +++ b/tokio/tests/sync_mpsc.rs @@ -522,11 +522,11 @@ async fn try_send_fail_with_try_recv() { assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected)); } -use tokio::sync::Semaphore::MAX_PERMITS; #[maybe_tokio_test] async fn reserve_many_above_cap() { - let (tx, rx) = mpsc::channel::<()>(1); + const MAX_PERMITS: usize = tokio::sync::Semaphore::MAX_PERMITS; + let (tx, _rx) = mpsc::channel::<()>(1); assert_err!(tx.reserve_many(2).await); assert_err!(tx.reserve_many(MAX_PERMITS + 1).await); @@ -541,13 +541,13 @@ fn try_reserve_many_zero() { assert!(assert_ok!(tx.try_reserve_many(0)).next().is_none()); // Even when channel is full. - tx.send(()).unwrap(); + tx.try_send(()).unwrap(); assert!(assert_ok!(tx.try_reserve_many(0)).next().is_none()); drop(rx); // Closed error when closed. - assert!(tx.try_reserve_many(0) == TrySendError::Closed(())); + assert_eq!(assert_err!(tx.try_reserve_many(0)), TrySendError::Closed(())); } #[maybe_tokio_test] @@ -558,19 +558,18 @@ async fn reserve_many_zero() { assert!(assert_ok!(tx.reserve_many(0).await).next().is_none()); // Even when channel is full. - tx.send(()).unwrap(); + tx.send(()).await.unwrap(); assert!(assert_ok!(tx.reserve_many(0).await).next().is_none()); drop(rx); // Closed error when closed. - assert!(tx.reserve_many(0).await == TrySendError::Closed(())); + assert_err!(tx.reserve_many(0).await); } #[maybe_tokio_test] async fn try_reserve_many_edge_cases() { - // MAX_PERMITS as defined in `sync/batch_semaphore.rs` - const MAX_PERMITS: usize = std::usize::MAX >> 3; + const MAX_PERMITS: usize = tokio::sync::Semaphore::MAX_PERMITS; let (tx, rx) = mpsc::channel::<()>(1); From e5f1df8123544ae970307634b0dffa644957481d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Th=C3=A9odore=20Pr=C3=A9vot?= Date: Tue, 2 Jan 2024 13:54:17 +0000 Subject: [PATCH 40/41] mpsc: fix `sync_mpsc` tests --- tokio/tests/sync_mpsc.rs | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/tokio/tests/sync_mpsc.rs b/tokio/tests/sync_mpsc.rs index 95b04c13c18..1b581ce98c1 100644 --- a/tokio/tests/sync_mpsc.rs +++ b/tokio/tests/sync_mpsc.rs @@ -522,10 +522,9 @@ async fn try_send_fail_with_try_recv() { assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected)); } - #[maybe_tokio_test] async fn reserve_many_above_cap() { - const MAX_PERMITS: usize = tokio::sync::Semaphore::MAX_PERMITS; + const MAX_PERMITS: usize = tokio::sync::Semaphore::MAX_PERMITS; let (tx, _rx) = mpsc::channel::<()>(1); assert_err!(tx.reserve_many(2).await); @@ -547,7 +546,10 @@ fn try_reserve_many_zero() { drop(rx); // Closed error when closed. - assert_eq!(assert_err!(tx.try_reserve_many(0)), TrySendError::Closed(())); + assert_eq!( + assert_err!(tx.try_reserve_many(0)), + TrySendError::Closed(()) + ); } #[maybe_tokio_test] @@ -569,7 +571,7 @@ async fn reserve_many_zero() { #[maybe_tokio_test] async fn try_reserve_many_edge_cases() { - const MAX_PERMITS: usize = tokio::sync::Semaphore::MAX_PERMITS; + const MAX_PERMITS: usize = tokio::sync::Semaphore::MAX_PERMITS; let (tx, rx) = mpsc::channel::<()>(1); From ec8b53758c572437cb8d36f94c53a762e681f5f0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Th=C3=A9odore=20Pr=C3=A9vot?= Date: Tue, 2 Jan 2024 16:23:03 +0000 Subject: [PATCH 41/41] mpsc: early return for empty `PermitIterator` for drop logic --- tokio/src/sync/mpsc/bounded.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tokio/src/sync/mpsc/bounded.rs b/tokio/src/sync/mpsc/bounded.rs index 95801931631..a1e0a82d9e2 100644 --- a/tokio/src/sync/mpsc/bounded.rs +++ b/tokio/src/sync/mpsc/bounded.rs @@ -1545,6 +1545,10 @@ impl Drop for PermitIterator<'_, T> { fn drop(&mut self) { use chan::Semaphore; + if self.n == 0 { + return; + } + let semaphore = self.chan.semaphore(); // Add the remaining permits back to the semaphore