Add a way to request many permits on a mpsc::channel #6174

Closed
Totodore opened this issue Nov 28, 2023 · 1 comment · Fixed by #6205
Labels
A-tokio Area: The main tokio crate C-feature-request Category: A feature request. M-sync Module: tokio/sync

Comments

@Totodore
Contributor

Is your feature request related to a problem? Please describe.
I'm writing a library that exposes an API which transforms a message into many payloads and then inserts them into an mpsc::channel. Because I want to return an error containing the original message when the channel is full, I need to use the Permit API. However, since I send n payloads for one message, I need a way to request n permits efficiently before doing the message transformation.
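To make the problem concrete, here is a minimal sketch of what happens without up-front reservation. It uses `std::sync::mpsc::sync_channel` as a stand-in for the tokio channel so that it compiles without external crates; `to_payloads` and `send_naively` are hypothetical names invented for this illustration, not part of any library.

```rust
use std::sync::mpsc::{SyncSender, TrySendError};

/// Hypothetical transformation: one message becomes one payload per word.
fn to_payloads(message: &str) -> Vec<String> {
    message.split_whitespace().map(str::to_owned).collect()
}

/// Naive approach: transform first, then push the payloads one at a time.
/// `Ok(n)` means all `n` payloads were enqueued; `Err(n)` means the channel
/// rejected a payload after `n` of them had already been enqueued.
fn send_naively(tx: &SyncSender<String>, message: &str) -> Result<usize, usize> {
    let mut sent = 0;
    for payload in to_payloads(message) {
        match tx.try_send(payload) {
            Ok(()) => sent += 1,
            // The earlier payloads are already in the queue and cannot be
            // taken back, so the message ends up only partially delivered.
            Err(TrySendError::Full(_)) | Err(TrySendError::Disconnected(_)) => {
                return Err(sent);
            }
        }
    }
    Ok(sent)
}
```

With a channel of capacity 2 and a message that expands to three payloads, `send_naively` fails after two payloads are already queued. Reserving all n slots before transforming the message avoids this partial delivery.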

Describe the solution you'd like
I noticed that the Semaphore API can already acquire n permits at once, and that the Permit API provided by the mpsc::channel is just a layer on top of it, so it shouldn't be too difficult to extend the API. It could look something like this:

pub struct ManyPermit<'a, T> {
    chan: &'a chan::Tx<T, Semaphore>,
    n: u32,
}
pub fn try_reserve_many(&self, n: u32) -> Result<ManyPermit<'_, T>, TrySendError<()>> {
    match self.chan.semaphore().semaphore.try_acquire(n) {
        Ok(()) => {}
        Err(TryAcquireError::Closed) => return Err(TrySendError::Closed(())),
        Err(TryAcquireError::NoPermits) => return Err(TrySendError::Full(())),
    }

    Ok(ManyPermit {
        chan: &self.chan,
        n,
    })
}
impl<T> ManyPermit<'_, T> {
    pub fn send(&mut self, value: T) {
        // The behavior when no permits are left still needs clarification.
        // The options are:
        // * returning an error
        // * doing nothing
        // * panicking
        if self.n > 0 {
            self.chan.send(value);
            self.n -= 1;
        }
    }
}

impl<T> Drop for ManyPermit<'_, T> {
    fn drop(&mut self) {
        use chan::Semaphore;

        let semaphore = self.chan.semaphore();

        // Add the remaining permits back to the semaphore
        semaphore.add_permits(self.n as usize);

        // If this is the last sender for this channel, wake the receiver so
        // that it can be notified that the channel is closed.
        if semaphore.is_closed() && semaphore.is_idle() {
            self.chan.wake_rx();
        }
    }
}

Describe alternatives you've considered
There are two other possibilities I considered:

  • Request multiple permits and return a Vec<Permit>, but this would allocate a new Vec, so it is quite inefficient.
  • Put a Semaphore with the same size next to my sender and keep it synchronized with the sender to track the number of permits handed out.
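The second alternative can be sketched roughly as follows. This is only an illustration under stated assumptions: it uses a bounded `std::sync::mpsc::sync_channel` and an `AtomicUsize` counter in place of tokio's channel and Semaphore, it assumes a capacity greater than zero, and `CountedSender`, `CountedReceiver`, `counted_channel` and this `ManyPermit` are hypothetical types made up for the example. Here `send` hands the value back when no permits are left, which is one of the options still under discussion above.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TrySendError};
use std::sync::Arc;

/// Hypothetical sender: a bounded std channel plus a counter of free slots.
struct CountedSender<T> {
    tx: SyncSender<T>,
    free: Arc<AtomicUsize>,
}

/// Hypothetical receiver: frees one slot for every message it takes out.
struct CountedReceiver<T> {
    rx: Receiver<T>,
    free: Arc<AtomicUsize>,
}

/// A batch of `n` reserved slots; unused slots are returned on drop.
struct ManyPermit<'a, T> {
    sender: &'a CountedSender<T>,
    n: usize,
}

fn counted_channel<T>(capacity: usize) -> (CountedSender<T>, CountedReceiver<T>) {
    let (tx, rx) = sync_channel(capacity);
    let free = Arc::new(AtomicUsize::new(capacity));
    (CountedSender { tx, free: free.clone() }, CountedReceiver { rx, free })
}

impl<T> CountedSender<T> {
    /// Reserve `n` slots atomically, or none at all.
    fn try_reserve_many(&self, n: usize) -> Option<ManyPermit<'_, T>> {
        self.free
            .fetch_update(Ordering::AcqRel, Ordering::Acquire, |free| free.checked_sub(n))
            .ok()
            .map(|_| ManyPermit { sender: self, n })
    }
}

impl<T> ManyPermit<'_, T> {
    /// Send one value using a reserved slot; returns the value if none are left
    /// or if the receiver has been dropped.
    fn send(&mut self, value: T) -> Result<(), T> {
        if self.n == 0 {
            return Err(value);
        }
        self.n -= 1;
        // A slot was reserved, so the buffer has room unless the receiver is gone.
        match self.sender.tx.try_send(value) {
            Ok(()) => Ok(()),
            Err(TrySendError::Full(v)) | Err(TrySendError::Disconnected(v)) => Err(v),
        }
    }
}

impl<T> Drop for ManyPermit<'_, T> {
    fn drop(&mut self) {
        // Give the unused slots back to the counter.
        self.sender.free.fetch_add(self.n, Ordering::AcqRel);
    }
}

impl<T> CountedReceiver<T> {
    /// Take one message out of the queue and free its slot.
    fn try_recv(&self) -> Option<T> {
        let value = self.rx.try_recv().ok()?;
        self.free.fetch_add(1, Ordering::AcqRel);
        Some(value)
    }
}
```

The sketch only stays consistent if every send goes through a `ManyPermit`, which is exactly the extra bookkeeping that a built-in method on the sender would make unnecessary.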

Additional context
I'm willing to work on this feature if it is accepted.

@Totodore Totodore added A-tokio Area: The main tokio crate C-feature-request Category: A feature request. labels Nov 28, 2023
@Totodore Totodore changed the title from "Add a way to request many permits on an mpsc::channel" to "Add a way to request many permits on a mpsc::channel" Nov 28, 2023
@Darksonn Darksonn added the M-sync Module: tokio/sync label Nov 29, 2023
@Totodore
Contributor Author

Totodore commented Dec 6, 2023

I'm going to open a PR in the next few days to propose changes. Please let me know if this is not doable or if there is another way to solve this issue.

Thanks
