From 7341004535ffccc05ee8bd1fd856e587509335bf Mon Sep 17 00:00:00 2001 From: Owen Leung Date: Mon, 1 Jan 2024 21:26:58 +0800 Subject: [PATCH] sync: add `{Receiver,UnboundedReceiver}::poll_recv_many` (#6236) --- tokio/src/sync/mpsc/bounded.rs | 81 +++++++++++++++++++++++++++++++- tokio/src/sync/mpsc/unbounded.rs | 81 +++++++++++++++++++++++++++++++- 2 files changed, 158 insertions(+), 4 deletions(-) diff --git a/tokio/src/sync/mpsc/bounded.rs b/tokio/src/sync/mpsc/bounded.rs index 4aa8b6377ca..3a795d55774 100644 --- a/tokio/src/sync/mpsc/bounded.rs +++ b/tokio/src/sync/mpsc/bounded.rs @@ -464,8 +464,8 @@ impl Receiver { /// When the method returns `Poll::Pending`, the `Waker` in the provided /// `Context` is scheduled to receive a wakeup when a message is sent on any /// receiver, or when the channel is closed. Note that on multiple calls to - /// `poll_recv`, only the `Waker` from the `Context` passed to the most - /// recent call is scheduled to receive a wakeup. + /// `poll_recv` or `poll_recv_many`, only the `Waker` from the `Context` + /// passed to the most recent call is scheduled to receive a wakeup. /// /// If this method returns `Poll::Pending` due to a spurious failure, then /// the `Waker` will be notified when the situation causing the spurious @@ -475,6 +475,83 @@ impl Receiver { pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll> { self.chan.recv(cx) } + + /// Polls to receive multiple messages on this channel, extending the provided buffer. + /// + /// This method returns: + /// * `Poll::Pending` if no messages are available but the channel is not closed, or if a + /// spurious failure happens. + /// * `Poll::Ready(count)` where `count` is the number of messages successfully received and + /// stored in `buffer`. This can be less than, or equal to, `limit`. + /// * `Poll::Ready(0)` if `limit` is set to zero or when the channel is closed. + /// + /// When the method returns `Poll::Pending`, the `Waker` in the provided + /// `Context` is scheduled to receive a wakeup when a message is sent on any + /// receiver, or when the channel is closed. Note that on multiple calls to + /// `poll_recv` or `poll_recv_many`, only the `Waker` from the `Context` + /// passed to the most recent call is scheduled to receive a wakeup. + /// + /// Note that this method does not guarantee that exactly `limit` messages + /// are received. Rather, if at least one message is available, it returns + /// as many messages as it can up to the given limit. This method returns + /// zero only if the channel is closed (or if `limit` is zero). + /// + /// # Examples + /// + /// ``` + /// use std::task::{Context, Poll}; + /// use std::pin::Pin; + /// use tokio::sync::mpsc; + /// use futures::Future; + /// + /// struct MyReceiverFuture<'a> { + /// receiver: mpsc::Receiver, + /// buffer: &'a mut Vec, + /// limit: usize, + /// } + /// + /// impl<'a> Future for MyReceiverFuture<'a> { + /// type Output = usize; // Number of messages received + /// + /// fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + /// let MyReceiverFuture { receiver, buffer, limit } = &mut *self; + /// + /// // Now `receiver` and `buffer` are mutable references, and `limit` is copied + /// match receiver.poll_recv_many(cx, *buffer, *limit) { + /// Poll::Pending => Poll::Pending, + /// Poll::Ready(count) => Poll::Ready(count), + /// } + /// } + /// } + /// + /// #[tokio::main] + /// async fn main() { + /// let (tx, rx) = mpsc::channel(32); + /// let mut buffer = Vec::new(); + /// + /// let my_receiver_future = MyReceiverFuture { + /// receiver: rx, + /// buffer: &mut buffer, + /// limit: 3, + /// }; + /// + /// for i in 0..10 { + /// tx.send(i).await.unwrap(); + /// } + /// + /// let count = my_receiver_future.await; + /// assert_eq!(count, 3); + /// assert_eq!(buffer, vec![0,1,2]) + /// } + /// ``` + pub fn poll_recv_many( + &mut self, + cx: &mut Context<'_>, + buffer: &mut Vec, + limit: usize, + ) -> Poll { + self.chan.recv_many(cx, buffer, limit) + } } impl fmt::Debug for Receiver { diff --git a/tokio/src/sync/mpsc/unbounded.rs b/tokio/src/sync/mpsc/unbounded.rs index d996b8564af..7dff942ee70 100644 --- a/tokio/src/sync/mpsc/unbounded.rs +++ b/tokio/src/sync/mpsc/unbounded.rs @@ -343,8 +343,8 @@ impl UnboundedReceiver { /// When the method returns `Poll::Pending`, the `Waker` in the provided /// `Context` is scheduled to receive a wakeup when a message is sent on any /// receiver, or when the channel is closed. Note that on multiple calls to - /// `poll_recv`, only the `Waker` from the `Context` passed to the most - /// recent call is scheduled to receive a wakeup. + /// `poll_recv` or `poll_recv_many`, only the `Waker` from the `Context` + /// passed to the most recent call is scheduled to receive a wakeup. /// /// If this method returns `Poll::Pending` due to a spurious failure, then /// the `Waker` will be notified when the situation causing the spurious @@ -354,6 +354,83 @@ impl UnboundedReceiver { pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll> { self.chan.recv(cx) } + + /// Polls to receive multiple messages on this channel, extending the provided buffer. + /// + /// This method returns: + /// * `Poll::Pending` if no messages are available but the channel is not closed, or if a + /// spurious failure happens. + /// * `Poll::Ready(count)` where `count` is the number of messages successfully received and + /// stored in `buffer`. This can be less than, or equal to, `limit`. + /// * `Poll::Ready(0)` if `limit` is set to zero or when the channel is closed. + /// + /// When the method returns `Poll::Pending`, the `Waker` in the provided + /// `Context` is scheduled to receive a wakeup when a message is sent on any + /// receiver, or when the channel is closed. Note that on multiple calls to + /// `poll_recv` or `poll_recv_many`, only the `Waker` from the `Context` + /// passed to the most recent call is scheduled to receive a wakeup. + /// + /// Note that this method does not guarantee that exactly `limit` messages + /// are received. Rather, if at least one message is available, it returns + /// as many messages as it can up to the given limit. This method returns + /// zero only if the channel is closed (or if `limit` is zero). + /// + /// # Examples + /// + /// ``` + /// use std::task::{Context, Poll}; + /// use std::pin::Pin; + /// use tokio::sync::mpsc; + /// use futures::Future; + /// + /// struct MyReceiverFuture<'a> { + /// receiver: mpsc::UnboundedReceiver, + /// buffer: &'a mut Vec, + /// limit: usize, + /// } + /// + /// impl<'a> Future for MyReceiverFuture<'a> { + /// type Output = usize; // Number of messages received + /// + /// fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + /// let MyReceiverFuture { receiver, buffer, limit } = &mut *self; + /// + /// // Now `receiver` and `buffer` are mutable references, and `limit` is copied + /// match receiver.poll_recv_many(cx, *buffer, *limit) { + /// Poll::Pending => Poll::Pending, + /// Poll::Ready(count) => Poll::Ready(count), + /// } + /// } + /// } + /// + /// #[tokio::main] + /// async fn main() { + /// let (tx, rx) = mpsc::unbounded_channel::(); + /// let mut buffer = Vec::new(); + /// + /// let my_receiver_future = MyReceiverFuture { + /// receiver: rx, + /// buffer: &mut buffer, + /// limit: 3, + /// }; + /// + /// for i in 0..10 { + /// tx.send(i).expect("Unable to send integer"); + /// } + /// + /// let count = my_receiver_future.await; + /// assert_eq!(count, 3); + /// assert_eq!(buffer, vec![0,1,2]) + /// } + /// ``` + pub fn poll_recv_many( + &mut self, + cx: &mut Context<'_>, + buffer: &mut Vec, + limit: usize, + ) -> Poll { + self.chan.recv_many(cx, buffer, limit) + } } impl UnboundedSender {