Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add poll_recv_many for Receiver & UnboundedReceiver #6236

Merged
74 changes: 74 additions & 0 deletions tokio/src/sync/mpsc/bounded.rs
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,80 @@ impl<T> Receiver<T> {
pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
self.chan.recv(cx)
}

/// Polls to receive multiple messages on this channel, extending the provided buffer.
///
/// This method returns:
/// * `Poll::Pending` if no messages are available but the channel is not closed, or if a
/// spurious failure happens.
/// * `Poll::Ready(count)` where `count` is the number of messages successfully received and
/// stored in `buffer`. This can be less than, or equal to, `limit`.
/// * `Poll::Ready(0)` if `limit` is set to zero or when the channel is closed.
///
/// When the method returns `Poll::Pending`, the `Waker` from the `Context` is scheduled
/// to be woken up when new messages are sent on the channel or when the channel is closed.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please use the same wording as on poll_recv. That's a standardized wording taking from Future::poll, and we use the same wording on all poll functions with a note of this kind.

Suggested change
/// When the method returns `Poll::Pending`, the `Waker` from the `Context` is scheduled
/// to be woken up when new messages are sent on the channel or when the channel is closed.
/// When the method returns `Poll::Pending`, the `Waker` in the provided
/// `Context` is scheduled to receive a wakeup when a message is sent on any
/// receiver, or when the channel is closed. Note that on multiple calls to
/// `poll_recv` or `poll_recv_many`, only the `Waker` from the `Context`
/// passed to the most recent call is scheduled to receive a wakeup.

Please also update the documentation of poll_recv to say "poll_recv or poll_recv_many" like how I did here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Darksonn Thanks a lot. I've revised accordingly.

///
/// Note that this method does not guarantee that exactly `limit` messages
/// are received. Rather, if at least one message is available, it returns
/// as many messages as it can up to the given limit. This method returns
/// zero only if the channel is closed (or if `limit` is zero).
///
/// # Examples
///
/// ```
/// use std::task::{Context, Poll};
/// use std::pin::Pin;
/// use tokio::sync::mpsc;
/// use futures::Future;
///
/// struct MyReceiverFuture<'a> {
/// receiver: mpsc::Receiver<i32>,
/// buffer: &'a mut Vec<i32>,
/// limit: usize,
/// }
///
/// impl<'a> Future for MyReceiverFuture<'a> {
/// type Output = usize; // Number of messages received
///
/// fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
/// let MyReceiverFuture { receiver, buffer, limit } = &mut *self;
///
/// // Now `receiver` and `buffer` are mutable references, and `limit` is copied
/// match receiver.poll_recv_many(cx, *buffer, *limit) {
/// Poll::Pending => Poll::Pending,
/// Poll::Ready(count) => Poll::Ready(count),
/// }
/// }
/// }
///
/// #[tokio::main]
/// async fn main() {
/// let (tx, rx) = mpsc::channel(32);
/// let mut buffer = Vec::new();
///
/// let my_receiver_future = MyReceiverFuture {
/// receiver: rx,
/// buffer: &mut buffer,
/// limit: 3,
/// };
///
/// for i in 0..10 {
/// tx.send(i).await.unwrap();
/// }
///
/// let count = my_receiver_future.await;
/// assert_eq!(count, 3);
/// assert_eq!(buffer, vec![0,1,2])
/// }
/// ```
pub fn poll_recv_many(
&mut self,
cx: &mut Context<'_>,
buffer: &mut Vec<T>,
limit: usize,
) -> Poll<usize> {
self.chan.recv_many(cx, buffer, limit)
}
}

impl<T> fmt::Debug for Receiver<T> {
Expand Down
74 changes: 74 additions & 0 deletions tokio/src/sync/mpsc/unbounded.rs
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,80 @@ impl<T> UnboundedReceiver<T> {
pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
self.chan.recv(cx)
}

/// Polls to receive multiple messages on this channel, extending the provided buffer.
///
/// This method returns:
/// * `Poll::Pending` if no messages are available but the channel is not closed, or if a
/// spurious failure happens.
/// * `Poll::Ready(count)` where `count` is the number of messages successfully received and
/// stored in `buffer`. This can be less than, or equal to, `limit`.
/// * `Poll::Ready(0)` if `limit` is set to zero or when the channel is closed.
///
/// When the method returns `Poll::Pending`, the `Waker` from the `Context` is scheduled
/// to be woken up when new messages are sent on the channel or when the channel is closed.
///
/// Note that this method does not guarantee that exactly `limit` messages
/// are received. Rather, if at least one message is available, it returns
/// as many messages as it can up to the given limit. This method returns
/// zero only if the channel is closed (or if `limit` is zero).
///
/// # Examples
///
/// ```
/// use std::task::{Context, Poll};
/// use std::pin::Pin;
/// use tokio::sync::mpsc;
/// use futures::Future;
///
/// struct MyReceiverFuture<'a> {
/// receiver: mpsc::UnboundedReceiver<i32>,
/// buffer: &'a mut Vec<i32>,
/// limit: usize,
/// }
///
/// impl<'a> Future for MyReceiverFuture<'a> {
/// type Output = usize; // Number of messages received
///
/// fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
/// let MyReceiverFuture { receiver, buffer, limit } = &mut *self;
///
/// // Now `receiver` and `buffer` are mutable references, and `limit` is copied
/// match receiver.poll_recv_many(cx, *buffer, *limit) {
/// Poll::Pending => Poll::Pending,
/// Poll::Ready(count) => Poll::Ready(count),
/// }
/// }
/// }
///
/// #[tokio::main]
/// async fn main() {
/// let (tx, rx) = mpsc::unbounded_channel::<i32>();
/// let mut buffer = Vec::new();
///
/// let my_receiver_future = MyReceiverFuture {
/// receiver: rx,
/// buffer: &mut buffer,
/// limit: 3,
/// };
///
/// for i in 0..10 {
/// tx.send(i).expect("Unable to send integer");
/// }
///
/// let count = my_receiver_future.await;
/// assert_eq!(count, 3);
/// assert_eq!(buffer, vec![0,1,2])
/// }
/// ```
pub fn poll_recv_many(
&mut self,
cx: &mut Context<'_>,
buffer: &mut Vec<T>,
limit: usize,
) -> Poll<usize> {
self.chan.recv_many(cx, buffer, limit)
}
}

impl<T> UnboundedSender<T> {
Expand Down