Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add poll_recv_many for Receiver & UnboundedReceiver #6236

Merged
81 changes: 79 additions & 2 deletions tokio/src/sync/mpsc/bounded.rs
Original file line number Diff line number Diff line change
Expand Up @@ -464,8 +464,8 @@ impl<T> Receiver<T> {
/// When the method returns `Poll::Pending`, the `Waker` in the provided
/// `Context` is scheduled to receive a wakeup when a message is sent on any
/// receiver, or when the channel is closed. Note that on multiple calls to
/// `poll_recv`, only the `Waker` from the `Context` passed to the most
/// recent call is scheduled to receive a wakeup.
/// `poll_recv` or `poll_recv_many`, only the `Waker` from the `Context`
/// passed to the most recent call is scheduled to receive a wakeup.
///
/// If this method returns `Poll::Pending` due to a spurious failure, then
/// the `Waker` will be notified when the situation causing the spurious
Expand All @@ -475,6 +475,83 @@ impl<T> Receiver<T> {
pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
self.chan.recv(cx)
}

/// Polls to receive multiple messages on this channel, extending the provided buffer.
///
/// This method returns:
/// * `Poll::Pending` if no messages are available but the channel is not closed, or if a
/// spurious failure happens.
/// * `Poll::Ready(count)` where `count` is the number of messages successfully received and
/// stored in `buffer`. This can be less than, or equal to, `limit`.
/// * `Poll::Ready(0)` if `limit` is set to zero or when the channel is closed.
///
/// When the method returns `Poll::Pending`, the `Waker` in the provided
/// `Context` is scheduled to receive a wakeup when a message is sent on any
/// receiver, or when the channel is closed. Note that on multiple calls to
/// `poll_recv` or `poll_recv_many`, only the `Waker` from the `Context`
/// passed to the most recent call is scheduled to receive a wakeup.
///
/// Note that this method does not guarantee that exactly `limit` messages
/// are received. Rather, if at least one message is available, it returns
/// as many messages as it can up to the given limit. This method returns
/// zero only if the channel is closed (or if `limit` is zero).
///
/// # Examples
///
/// ```
/// use std::task::{Context, Poll};
/// use std::pin::Pin;
/// use tokio::sync::mpsc;
/// use futures::Future;
///
/// struct MyReceiverFuture<'a> {
/// receiver: mpsc::Receiver<i32>,
/// buffer: &'a mut Vec<i32>,
/// limit: usize,
/// }
///
/// impl<'a> Future for MyReceiverFuture<'a> {
/// type Output = usize; // Number of messages received
///
/// fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
/// let MyReceiverFuture { receiver, buffer, limit } = &mut *self;
///
/// // Now `receiver` and `buffer` are mutable references, and `limit` is copied
/// match receiver.poll_recv_many(cx, *buffer, *limit) {
/// Poll::Pending => Poll::Pending,
/// Poll::Ready(count) => Poll::Ready(count),
/// }
/// }
/// }
///
/// #[tokio::main]
/// async fn main() {
/// let (tx, rx) = mpsc::channel(32);
/// let mut buffer = Vec::new();
///
/// let my_receiver_future = MyReceiverFuture {
/// receiver: rx,
/// buffer: &mut buffer,
/// limit: 3,
/// };
///
/// for i in 0..10 {
/// tx.send(i).await.unwrap();
/// }
///
/// let count = my_receiver_future.await;
/// assert_eq!(count, 3);
/// assert_eq!(buffer, vec![0,1,2])
/// }
/// ```
pub fn poll_recv_many(
&mut self,
cx: &mut Context<'_>,
buffer: &mut Vec<T>,
limit: usize,
) -> Poll<usize> {
self.chan.recv_many(cx, buffer, limit)
}
}

impl<T> fmt::Debug for Receiver<T> {
Expand Down
81 changes: 79 additions & 2 deletions tokio/src/sync/mpsc/unbounded.rs
Original file line number Diff line number Diff line change
Expand Up @@ -343,8 +343,8 @@ impl<T> UnboundedReceiver<T> {
/// When the method returns `Poll::Pending`, the `Waker` in the provided
/// `Context` is scheduled to receive a wakeup when a message is sent on any
/// receiver, or when the channel is closed. Note that on multiple calls to
/// `poll_recv`, only the `Waker` from the `Context` passed to the most
/// recent call is scheduled to receive a wakeup.
/// `poll_recv` or `poll_recv_many`, only the `Waker` from the `Context`
/// passed to the most recent call is scheduled to receive a wakeup.
///
/// If this method returns `Poll::Pending` due to a spurious failure, then
/// the `Waker` will be notified when the situation causing the spurious
Expand All @@ -354,6 +354,83 @@ impl<T> UnboundedReceiver<T> {
pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
self.chan.recv(cx)
}

/// Polls to receive multiple messages on this channel, extending the provided buffer.
///
/// This method returns:
/// * `Poll::Pending` if no messages are available but the channel is not closed, or if a
/// spurious failure happens.
/// * `Poll::Ready(count)` where `count` is the number of messages successfully received and
/// stored in `buffer`. This can be less than, or equal to, `limit`.
/// * `Poll::Ready(0)` if `limit` is set to zero or when the channel is closed.
///
/// When the method returns `Poll::Pending`, the `Waker` in the provided
/// `Context` is scheduled to receive a wakeup when a message is sent on any
/// receiver, or when the channel is closed. Note that on multiple calls to
/// `poll_recv` or `poll_recv_many`, only the `Waker` from the `Context`
/// passed to the most recent call is scheduled to receive a wakeup.
///
/// Note that this method does not guarantee that exactly `limit` messages
/// are received. Rather, if at least one message is available, it returns
/// as many messages as it can up to the given limit. This method returns
/// zero only if the channel is closed (or if `limit` is zero).
///
/// # Examples
///
/// ```
/// use std::task::{Context, Poll};
/// use std::pin::Pin;
/// use tokio::sync::mpsc;
/// use futures::Future;
///
/// struct MyReceiverFuture<'a> {
/// receiver: mpsc::UnboundedReceiver<i32>,
/// buffer: &'a mut Vec<i32>,
/// limit: usize,
/// }
///
/// impl<'a> Future for MyReceiverFuture<'a> {
/// type Output = usize; // Number of messages received
///
/// fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
/// let MyReceiverFuture { receiver, buffer, limit } = &mut *self;
///
/// // Now `receiver` and `buffer` are mutable references, and `limit` is copied
/// match receiver.poll_recv_many(cx, *buffer, *limit) {
/// Poll::Pending => Poll::Pending,
/// Poll::Ready(count) => Poll::Ready(count),
/// }
/// }
/// }
///
/// #[tokio::main]
/// async fn main() {
/// let (tx, rx) = mpsc::unbounded_channel::<i32>();
/// let mut buffer = Vec::new();
///
/// let my_receiver_future = MyReceiverFuture {
/// receiver: rx,
/// buffer: &mut buffer,
/// limit: 3,
/// };
///
/// for i in 0..10 {
/// tx.send(i).expect("Unable to send integer");
/// }
///
/// let count = my_receiver_future.await;
/// assert_eq!(count, 3);
/// assert_eq!(buffer, vec![0,1,2])
/// }
/// ```
pub fn poll_recv_many(
&mut self,
cx: &mut Context<'_>,
buffer: &mut Vec<T>,
limit: usize,
) -> Poll<usize> {
self.chan.recv_many(cx, buffer, limit)
}
}

impl<T> UnboundedSender<T> {
Expand Down