Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add poll_recv_many for Receiver & UnboundedReceiver #6236

Merged

Conversation

Owen-CH-Leung
Copy link
Contributor

fixes #6209

Add a new API poll_recv_many for Receiver & UnboundedReceiver

Motivation

Further to the implementation of recv_many, it is suggested to also add similar functionality for the API poll_recv. Hence, a new API poll_recv_many is created

Solution

This PR adds a new API poll_recv_many into Receiver & UnboundedReceiver. An example is also added for reference

Add a new API poll_recv_many for Receiver & UnboundedReceiver
@github-actions github-actions bot added the R-loom-sync Run loom sync tests on this PR label Dec 20, 2023
@Owen-CH-Leung Owen-CH-Leung marked this pull request as ready for review December 20, 2023 15:02
@Darksonn
Copy link
Contributor

Here is the documentation for poll_recv. I was hoping to see an explanation of what it does in the same style as this.

    /// Polls to receive the next message on this channel.
    ///
    /// This method returns:
    ///
    ///  * `Poll::Pending` if no messages are available but the channel is not
    ///    closed, or if a spurious failure happens.
    ///  * `Poll::Ready(Some(message))` if a message is available.
    ///  * `Poll::Ready(None)` if the channel has been closed and all messages
    ///    sent before it was closed have been received.
    ///
    /// When the method returns `Poll::Pending`, the `Waker` in the provided
    /// `Context` is scheduled to receive a wakeup when a message is sent on any
    /// receiver, or when the channel is closed.  Note that on multiple calls to
    /// `poll_recv`, only the `Waker` from the `Context` passed to the most
    /// recent call is scheduled to receive a wakeup.
    ///
    /// If this method returns `Poll::Pending` due to a spurious failure, then
    /// the `Waker` will be notified when the situation causing the spurious
    /// failure has been resolved. Note that receiving such a wakeup does not
    /// guarantee that the next call will succeed — it could fail with another
    /// spurious failure.
    pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
        self.chan.recv(cx)
    }

@Owen-CH-Leung
Copy link
Contributor Author

Here is the documentation for poll_recv. I was hoping to see an explanation of what it does in the same style as this.

    /// Polls to receive the next message on this channel.
    ///
    /// This method returns:
    ///
    ///  * `Poll::Pending` if no messages are available but the channel is not
    ///    closed, or if a spurious failure happens.
    ///  * `Poll::Ready(Some(message))` if a message is available.
    ///  * `Poll::Ready(None)` if the channel has been closed and all messages
    ///    sent before it was closed have been received.
    ///
    /// When the method returns `Poll::Pending`, the `Waker` in the provided
    /// `Context` is scheduled to receive a wakeup when a message is sent on any
    /// receiver, or when the channel is closed.  Note that on multiple calls to
    /// `poll_recv`, only the `Waker` from the `Context` passed to the most
    /// recent call is scheduled to receive a wakeup.
    ///
    /// If this method returns `Poll::Pending` due to a spurious failure, then
    /// the `Waker` will be notified when the situation causing the spurious
    /// failure has been resolved. Note that receiving such a wakeup does not
    /// guarantee that the next call will succeed — it could fail with another
    /// spurious failure.
    pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
        self.chan.recv(cx)
    }

@Darksonn Thanks for your feedback! I've revised the documentation to be more detailed & clear on what this API does. Could you review again ? Thanks

@Darksonn Darksonn added A-tokio Area: The main tokio crate M-sync Module: tokio/sync labels Dec 22, 2023
Comment on lines 492 to 493
/// spurious failure happens. In such cases, the `Waker` from the `Context` is scheduled
/// to be woken up when new messages are sent on the channel or when the channel is closed.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The part about the waker should be moved below this list, similar to poll_recv.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks & I've included your suggestions in my latest commit

Comment on lines 494 to 497
/// * `Poll::Ready(count)` where `count` is the number of messages successfully received and
/// stored in `buffer`. This can be less than, or equal to, `limit`. If the channel is closed
/// and there are no more messages to receive, `count` will be the number of messages received
/// before the channel was closed, which could be zero.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would split this into two cases: Poll::Ready(count) for count > 0 and Poll::Ready(0). The length-zero case is special because it only happens when the channel is closed (or n == 0).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup that will be much clearer. I've split it into 2

Comment on lines 481 to 488
/// This method attempts to receive messages from the channel and store them in the provided buffer,
/// up to the specified `limit`. It behaves similarly to `poll_recv` but for multiple messages. If `limit`
/// is zero, the function returns immediately with `0`.
///
/// This method also shares similarities with `recv_many`, including its behavior in response to channel closure
/// and message availability. For `limit > 0`, if there are no messages in the channel's queue,
/// but the channel has not yet been closed, this method will sleep until a message is sent or
/// the channel is closed. The channel is closed when all senders have been dropped, or when `close` is called.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These two paragraphs can probably just be deleted. As long as the list below is comprehensive, this isn't necessary, and it is obvious that it is similar to poll_recv and recv_many.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure & deleted

Comment on lines 499 to 501
/// Note that this method does not guarantee that the buffer will be filled up to `limit` on each call,
/// especially if fewer messages are available. Also, the actual number of messages received can be
/// zero if the channel is empty or closed.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The English used here is rather complicated. Perhaps we could simplify it like this?

Suggested change
/// Note that this method does not guarantee that the buffer will be filled up to `limit` on each call,
/// especially if fewer messages are available. Also, the actual number of messages received can be
/// zero if the channel is empty or closed.
/// Note that this method does not guarantee that exactly `limit` messages
/// are received. Rather, if at least one message is available, it returns
/// as many messages as it can up to the given limit. This method returns
/// zero only if the channel is closed (or if `limit` is zero).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks & I've adopted your suggestion

Copy link
Contributor

@Darksonn Darksonn left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only one changed I'd like left, then I think this is ready to be merged.

Comment on lines 488 to 489
/// When the method returns `Poll::Pending`, the `Waker` from the `Context` is scheduled
/// to be woken up when new messages are sent on the channel or when the channel is closed.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please use the same wording as on poll_recv. That's a standardized wording taking from Future::poll, and we use the same wording on all poll functions with a note of this kind.

Suggested change
/// When the method returns `Poll::Pending`, the `Waker` from the `Context` is scheduled
/// to be woken up when new messages are sent on the channel or when the channel is closed.
/// When the method returns `Poll::Pending`, the `Waker` in the provided
/// `Context` is scheduled to receive a wakeup when a message is sent on any
/// receiver, or when the channel is closed. Note that on multiple calls to
/// `poll_recv` or `poll_recv_many`, only the `Waker` from the `Context`
/// passed to the most recent call is scheduled to receive a wakeup.

Please also update the documentation of poll_recv to say "poll_recv or poll_recv_many" like how I did here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Darksonn Thanks a lot. I've revised accordingly.

Copy link
Contributor

@Darksonn Darksonn left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you.

@Darksonn Darksonn merged commit 7341004 into tokio-rs:master Jan 1, 2024
71 checks passed
kodiakhq bot pushed a commit to pdylanross/fatigue that referenced this pull request Feb 5, 2024
Bumps tokio from 1.35.1 to 1.36.0.

Release notes
Sourced from tokio's releases.

Tokio v1.36.0
1.36.0 (February 2nd, 2024)
Added

io: add tokio::io::Join (#6220)
io: implement AsyncWrite for Empty (#6235)
net: add support for anonymous unix pipes (#6127)
net: add UnixSocket (#6290)
net: expose keepalive option on TcpSocket (#6311)
sync: add {Receiver,UnboundedReceiver}::poll_recv_many (#6236)
sync: add Sender::{try_,}reserve_many (#6205)
sync: add watch::Receiver::mark_unchanged (#6252)
task: add JoinSet::try_join_next (#6280)

Changed

io: make copy cooperative (#6265)
io: make repeat and sink cooperative (#6254)
io: simplify check for empty slice (#6293)
process: use pidfd on Linux when available (#6152)
sync: use AtomicBool in broadcast channel future (#6298)

Documented

io: clarify clear_ready docs (#6304)
net: document that *Fd traits on TcpSocket are unix-only (#6294)
sync: document FIFO behavior of tokio::sync::Mutex (#6279)
chore: typographic improvements (#6262)
runtime: remove obsolete comment (#6303)
task: fix typo (#6261)

#6220: tokio-rs/tokio#6220
#6235: tokio-rs/tokio#6235
#6127: tokio-rs/tokio#6127
#6290: tokio-rs/tokio#6290
#6311: tokio-rs/tokio#6311
#6236: tokio-rs/tokio#6236
#6205: tokio-rs/tokio#6205
#6252: tokio-rs/tokio#6252
#6280: tokio-rs/tokio#6280
#6265: tokio-rs/tokio#6265
#6254: tokio-rs/tokio#6254
#6293: tokio-rs/tokio#6293
#6238: tokio-rs/tokio#6238
#6152: tokio-rs/tokio#6152
#6298: tokio-rs/tokio#6298
#6262: tokio-rs/tokio#6262
#6303: tokio-rs/tokio#6303
#6261: tokio-rs/tokio#6261


... (truncated)


Commits

eaf81ed chore: prepare Tokio v1.36.0 (#6312)
53f9e5a ci: make sure dictionary words are sorted and unique (#6316)
9077762 net: expose keepalive option on TcpSocket (#6311)
131e7b4 ci: add spellchecking (#6297)
e53b92a io: clarify clear_ready docs (#6304)
7536132 sync: use AtomicBool in broadcast channel future (#6298)
b6d0c90 macros: fix trait_method breaking change detection (#6308)
4846959 runtime: remove obsolete comment (#6303)
ec30383 net: add UnixSocket (#6290)
f80bbec io: simplify check for empty slice (#6293)
Additional commits viewable in compare view




Dependabot will resolve any conflicts with this PR as long as you don't alter it yourself. You can also trigger a rebase manually by commenting @dependabot rebase.


Dependabot commands and options

You can trigger Dependabot actions by commenting on this PR:

@dependabot rebase will rebase this PR
@dependabot recreate will recreate this PR, overwriting any edits that have been made to it
@dependabot merge will merge this PR after your CI passes on it
@dependabot squash and merge will squash and merge this PR after your CI passes on it
@dependabot cancel merge will cancel a previously requested merge and block automerging
@dependabot reopen will reopen this PR if it is closed
@dependabot close will close this PR and stop Dependabot recreating it. You can achieve the same result by closing it manually
@dependabot show <dependency name> ignore conditions will show all of the ignore conditions of the specified dependency
@dependabot ignore this major version will close this PR and stop Dependabot creating any more for this major version (unless you reopen the PR or upgrade to it yourself)
@dependabot ignore this minor version will close this PR and stop Dependabot creating any more for this minor version (unless you reopen the PR or upgrade to it yourself)
@dependabot ignore this dependency will close this PR and stop Dependabot creating any more for this dependency (unless you reopen the PR or upgrade to it yourself)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
A-tokio Area: The main tokio crate M-sync Module: tokio/sync R-loom-sync Run loom sync tests on this PR
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Add poll_recv_many for mpsc Receiver/UnboundedReceiver.
2 participants