Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add a way to reserve many permits on bounded mpsc channel #6205

Merged
merged 45 commits into from
Jan 2, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
14d0af6
mpsc: add a way to reserve many permit on bounded mpsc chan
Totodore Dec 9, 2023
92c26b5
mpsc: apply clippy rules + fmt
Totodore Dec 9, 2023
b684f3c
mpsc: export the `ManyPermit` struct
Totodore Dec 9, 2023
c40178b
mpsc: fix fmt
Totodore Dec 9, 2023
53fb399
mpsc: return an `Iterator` impl for the `*_reserve_many` fns
Totodore Dec 10, 2023
242f171
mpsc: fix rustdoc
Totodore Dec 10, 2023
81c91da
mpsc: fix fmt
Totodore Dec 10, 2023
ad21c6c
mpsc: add an implementation for `ExactSizeIterator`
Totodore Dec 10, 2023
71c48ba
mpsc: apply doc suggestions
Totodore Dec 10, 2023
8dd3078
mpsc: fix `Debug` implementation for `PermitIterator`
Totodore Dec 10, 2023
9d9435b
mpsc: move the `u32` n to `usize` and only cast to `u32` for the inte…
Totodore Dec 10, 2023
28d8c51
mpsc: improve doc
Totodore Dec 10, 2023
08a8622
mpsc: change the internal `batch_semaphore` API to take usize permit …
Totodore Dec 10, 2023
5f9cb1a
mpsc: fix usize casting in special features
Totodore Dec 10, 2023
5bcc3f9
mpsc: fix usize casting in `try_reserve` fn for batch_semaphore
Totodore Dec 11, 2023
a3fd29e
mpsc: return a `SendError` if `n` > MAX_PERMIT for `*reserve_many` fns
Totodore Dec 11, 2023
419bfd3
mpsc: return a `SendError` if `n` > max_capacity for `*reserve_many` fns
Totodore Dec 11, 2023
006d7aa
Merge branch 'master' into feat-mpsc-many-permit
Totodore Dec 12, 2023
8f9d4c1
mpsc: add `try_reserve_many_fails` test
Totodore Dec 13, 2023
59944ba
mpsc: fix fmt
Totodore Dec 13, 2023
bffa2fa
mpsc: switch to `assert_ok` expr for testing
Totodore Dec 17, 2023
d9bf134
mpsc: return an empty iterator for `try_reserve_many(0)`
Totodore Dec 17, 2023
2ee4f76
mpsc: test `PermitIterator` and `reserve_many`
Totodore Dec 17, 2023
6a1ffbf
mpsc: fix fmt
Totodore Dec 17, 2023
d585434
Merge branch 'master' into feat-mpsc-many-permit
Totodore Dec 21, 2023
323e6b6
mpsc: fix `reserve_many_and_send` test
Totodore Dec 22, 2023
5496399
mpsc: impl `FusedIterator` for `PermitIterator`
Totodore Dec 22, 2023
ddd44d1
mpsc: test `reserve_many_on_closed_channel`
Totodore Dec 23, 2023
f103641
mpsc: doc mention `reserve_many` for Cancel Safety part
Totodore Dec 23, 2023
b688a05
mpsc: improve doc for `reserve_many` fn
Totodore Dec 23, 2023
3b705be
mpsc: fix formatting
Totodore Dec 23, 2023
430286b
mpsc: fix formatting
Totodore Dec 23, 2023
58f7648
mpsc: switch to `maybe_tokio_test`
Totodore Dec 23, 2023
a25fb7c
mpsc: add tests for `try_reserve_many`
Totodore Dec 23, 2023
2bd5c8b
Merge branch 'master' into feat-mpsc-many-permit
Totodore Dec 23, 2023
bd8c3e6
mpsc: add an early return if `n == 0` to avoid `acquire` mechanism
Totodore Dec 23, 2023
6988079
mpsc: improve doc for `reserve_many`
Totodore Dec 23, 2023
133f2c1
mpsc: remove early return for `reserve_inner`
Totodore Dec 23, 2023
c39878e
mpsc: remove useless empty iterator guard for `try_reserve_many`
Totodore Jan 2, 2024
f7025ce
mpsc: apply doc suggestion
Totodore Jan 2, 2024
0d19901
mpsc: Apply suggestions from code review
Totodore Jan 2, 2024
3fdb5d9
mpsc: fix `sync_mpsc` tests
Totodore Jan 2, 2024
e5f1df8
mpsc: fix `sync_mpsc` tests
Totodore Jan 2, 2024
830188e
Merge branch 'master' into feat-mpsc-many-permit
Totodore Jan 2, 2024
ec8b537
mpsc: early return for empty `PermitIterator` for drop logic
Totodore Jan 2, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
217 changes: 213 additions & 4 deletions tokio/src/sync/mpsc/bounded.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,18 @@ pub struct Permit<'a, T> {
chan: &'a chan::Tx<T, Semaphore>,
}

/// Permits to send `n` values into the channel.
///
/// `ManyPermit` values are returned by [`Sender::reserve_many()`] and [`Sender::try_reserve_many()`]
/// and are used to guarantee channel capacity before generating `n` message to send.
Totodore marked this conversation as resolved.
Show resolved Hide resolved
///
/// [`Sender::reserve_many()`]: Sender::reserve_many
/// [`Sender::try_reserve_many()`]: Sender::try_reserve_many
pub struct ManyPermit<'a, T> {
chan: &'a chan::Tx<T, Semaphore>,
n: u32,
}

/// Owned permit to send one value into the channel.
///
/// This is identical to the [`Permit`] type, except that it moves the sender
Expand Down Expand Up @@ -849,10 +861,67 @@ impl<T> Sender<T> {
/// }
/// ```
pub async fn reserve(&self) -> Result<Permit<'_, T>, SendError<()>> {
self.reserve_inner().await?;
self.reserve_inner(1).await?;
Ok(Permit { chan: &self.chan })
}

/// Waits for channel capacity. Once capacity to send `n` message is
Totodore marked this conversation as resolved.
Show resolved Hide resolved
/// available, it is reserved for the caller.
///
/// If the channel is full, the function waits for the number of unreceived
/// messages to become less than the channel capacity. Capacity to send `n`
Totodore marked this conversation as resolved.
Show resolved Hide resolved
/// message is reserved for the caller. A [`ManyPermit`] is returned to track
/// the reserved capacity. The [`send`] function on [`ManyPermit`] consumes the
/// reserved capacity.
///
/// Dropping [`ManyPermit`] without sending a message releases the capacity back
/// to the channel.
///
/// [`ManyPermit`]: ManyPermit
/// [`send`]: ManyPermit::send
///
/// # Cancel safety
///
/// This channel uses a queue to ensure that calls to `send` and `reserve`
/// complete in the order they were requested. Cancelling a call to
/// `reserve` makes you lose your place in the queue.
Totodore marked this conversation as resolved.
Show resolved Hide resolved
///
/// # Examples
///
/// ```
/// use tokio::sync::mpsc;
///
/// #[tokio::main]
/// async fn main() {
/// let (tx, mut rx) = mpsc::channel(2);
///
/// // Reserve capacity
/// let mut permit = tx.reserve_many(2).await.unwrap();
///
/// // Trying to send directly on the `tx` will fail due to no
/// // available capacity.
/// assert!(tx.try_send(123).is_err());
///
/// // Sending on the permit succeeds
/// permit.send(456).unwrap();
/// permit.send(457).unwrap();
///
/// // The third should fail due to no available capacity
/// permit.send(458).unwrap_err();
///
/// // The value sent on the permit is received
/// assert_eq!(rx.recv().await.unwrap(), 456);
/// assert_eq!(rx.recv().await.unwrap(), 457);
/// }
/// ```
pub async fn reserve_many(&self, n: u32) -> Result<ManyPermit<'_, T>, SendError<()>> {
Totodore marked this conversation as resolved.
Show resolved Hide resolved
self.reserve_inner(n).await?;
Ok(ManyPermit {
chan: &self.chan,
n,
})
}

/// Waits for channel capacity, moving the `Sender` and returning an owned
/// permit. Once capacity to send one message is available, it is reserved
/// for the caller.
Expand Down Expand Up @@ -934,16 +1003,16 @@ impl<T> Sender<T> {
/// [`send`]: OwnedPermit::send
/// [`Arc::clone`]: std::sync::Arc::clone
pub async fn reserve_owned(self) -> Result<OwnedPermit<T>, SendError<()>> {
self.reserve_inner().await?;
self.reserve_inner(1).await?;
Ok(OwnedPermit {
chan: Some(self.chan),
})
}

async fn reserve_inner(&self) -> Result<(), SendError<()>> {
async fn reserve_inner(&self, n: u32) -> Result<(), SendError<()>> {
crate::trace::async_trace_leaf().await;

match self.chan.semaphore().semaphore.acquire(1).await {
match self.chan.semaphore().semaphore.acquire(n).await {
Ok(()) => Ok(()),
Err(_) => Err(SendError(())),
}
Expand Down Expand Up @@ -1002,6 +1071,67 @@ impl<T> Sender<T> {
Ok(Permit { chan: &self.chan })
}

/// Tries to acquire n slot in the channel without waiting for the slot to become
Totodore marked this conversation as resolved.
Show resolved Hide resolved
/// available.
///
/// If the channel is full this function will return [`TrySendError`], otherwise
/// if there is a slot available it will return a [`ManyPermit`] that will then allow you
/// to [`send`] on the channel with a guaranteed slot. This function is similar to
/// [`reserve_many`] except it does not await for the slot to become available.
///
/// Dropping [`ManyPermit`] without sending a message releases the capacity back
/// to the channel.
///
/// [`ManyPermit`]: ManyPermit
/// [`send`]: ManyPermit::send
/// [`reserve_many`]: Sender::reserve_many
///
/// # Examples
///
/// ```
/// use tokio::sync::mpsc;
///
/// #[tokio::main]
/// async fn main() {
/// let (tx, mut rx) = mpsc::channel(2);
///
/// // Reserve capacity
/// let mut permit = tx.try_reserve_many(2).unwrap();
///
/// // Trying to send directly on the `tx` will fail due to no
/// // available capacity.
/// assert!(tx.try_send(123).is_err());
///
/// // Trying to reserve an additional slot on the `tx` will
/// // fail because there is no capacity.
/// assert!(tx.try_reserve().is_err());
///
/// // Sending on the permit succeeds
/// permit.send(456).unwrap();
/// permit.send(457).unwrap();
///
/// // The third should fail due to no available capacity
/// permit.send(458).unwrap_err();
///
/// // The value sent on the permit is received
/// assert_eq!(rx.recv().await.unwrap(), 456);
/// assert_eq!(rx.recv().await.unwrap(), 457);
///
/// }
/// ```
pub fn try_reserve_many(&self, n: u32) -> Result<ManyPermit<'_, T>, TrySendError<()>> {
match self.chan.semaphore().semaphore.try_acquire(n) {
Ok(()) => {}
Err(TryAcquireError::Closed) => return Err(TrySendError::Closed(())),
Err(TryAcquireError::NoPermits) => return Err(TrySendError::Full(())),
}

Ok(ManyPermit {
chan: &self.chan,
n,
})
}

/// Tries to acquire a slot in the channel without waiting for the slot to become
/// available, returning an owned permit.
///
Expand Down Expand Up @@ -1278,6 +1408,85 @@ impl<T> fmt::Debug for Permit<'_, T> {
}
}

// ===== impl ManyPermit =====

impl<T> ManyPermit<'_, T> {
/// Sends a value using the reserved capacity.
///
/// Capacity for the messages has already been reserved. The message is sent
/// to the receiver and the permit capacity is reduced by one. If there is no
/// remaining capacity a [`SendError`] will be returned with the provided value.
/// The operation will succeed even if the receiver half has been closed.
/// See [`Receiver::close`] for more details on performing a clean shutdown.
///
/// [`Receiver::close`]: Receiver::close
///
/// # Examples
///
/// ```
/// use tokio::sync::mpsc;
///
/// #[tokio::main]
/// async fn main() {
/// let (tx, mut rx) = mpsc::channel(2);
///
/// // Reserve capacity
/// let mut permit = tx.reserve_many(2).await.unwrap();
///
/// // Trying to send directly on the `tx` will fail due to no
/// // available capacity.
/// assert!(tx.try_send(123).is_err());
///
/// // Send 2 messages on the permit
/// permit.send(456).unwrap();
/// permit.send(457).unwrap();
///
/// // The third should fail due to no available capacity
/// permit.send(458).unwrap_err();
///
/// // The value sent on the permit is received
/// assert_eq!(rx.recv().await.unwrap(), 456);
/// assert_eq!(rx.recv().await.unwrap(), 457);
/// }
/// ```
pub fn send(&mut self, value: T) -> Result<(), SendError<T>> {
// There is no remaining capacity on this permit
if self.n == 0 {
return Err(SendError(value));
}

self.chan.send(value);
self.n -= 1;
Ok(())
}
}

impl<T> Drop for ManyPermit<'_, T> {
fn drop(&mut self) {
use chan::Semaphore;

let semaphore = self.chan.semaphore();

// Add the remaining permits back to the semaphore
semaphore.add_permits(self.n as usize);

// If this is the last sender for this channel, wake the receiver so
// that it can be notified that the channel is closed.
if semaphore.is_closed() && semaphore.is_idle() {
self.chan.wake_rx();
}
}
}

impl<T> fmt::Debug for ManyPermit<'_, T> {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt.debug_struct("Permit")
Totodore marked this conversation as resolved.
Show resolved Hide resolved
.field("chan", &self.chan)
.field("capacity", &self.n)
.finish()
}
}

// ===== impl Permit =====

impl<T> OwnedPermit<T> {
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/sync/mpsc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@
pub(super) mod block;

mod bounded;
pub use self::bounded::{channel, OwnedPermit, Permit, Receiver, Sender, WeakSender};
pub use self::bounded::{channel, ManyPermit, OwnedPermit, Permit, Receiver, Sender, WeakSender};

mod chan;

Expand Down
Loading