Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

io: make copy cooperative #6265

Merged
merged 2 commits into from
Jan 6, 2024
Merged

Conversation

hi-rustin
Copy link
Contributor

Motivation

close #6250

Solution

Add poll_proceed_and_make_progress to check if it exceeds the budget.

Copy link
Contributor Author

@hi-rustin hi-rustin left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔢 Self-check

@@ -138,3 +138,28 @@ async fn immediate_exit_on_read_error() {

assert!(copy_bidirectional(&mut a, &mut b).await.is_err());
}

#[tokio::test]
async fn copy_bidirectional_is_cooperative() {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems the copy_bidirectional is already cooperative before I make the CopyBuffer cooperative. I'm not sure why. Could I be using the wrong case to test it?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should probably change this test to use something different than tokio_test::io::Builder to fix that.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried to use symmetric, but it also didn't work.

@@ -82,6 +83,8 @@ impl CopyBuffer {
R: AsyncRead + ?Sized,
W: AsyncWrite + ?Sized,
{
ready!(crate::trace::trace_leaf(cx));
ready!(poll_proceed_and_make_progress(cx));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should only make progress if it actually copied data.

See the mpsc channel for an example:

// Keep track of task budget
let coop = ready!(crate::runtime::coop::poll_proceed(cx));

Some(Read::Value(value)) => {
self.inner.semaphore.add_permit();
coop.made_progress();
return Ready(Some(value));
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for your review.
How about now? After filling, writing, and flushing data, I called make_progress.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Like in the examples I linked, you need to call crate::runtime::coop::poll_proceed at the start of the function, and then call coop.made_progress() in the places where you are currently calling poll_proceed_and_make_progress.

It's not enough to just call it when it exits. The poll_proceed_and_make_progress function can only be used for functions that never return Poll::Pending.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Like in the examples I linked, you need to call crate::runtime::coop::poll_proceed at the start of the function, and then call coop.made_progress()

But the crate::runtime::coop::poll_proceed only exists if we enable the coop feature. What should we do if we don't have the coop feature?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Then do nothing.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated. I just used #[cfg(any( feature = "fs", feature = "io-std", feature = "net", feature = "process", feature = "rt", feature = "signal", feature = "sync", feature = "time", ))] to control it. I am not sure if we have a better way to do it. Thanks for your help. Sorry for those stupid questions. 😢

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe we have a macro for doing that. You should be able to write

cfg_coop! {
    let coop = ready!(crate::runtime::coop::poll_proceed(cx));
}

And similar for the calls to make_progress. The macro does not actually introduce a scope, so I believe this should work.

Your questions are not stupid :). You're asking questions about internal Tokio APIs that don't exist elsewhere. It's no surprise that you are not familiar with them. If we did not have the cfg_coop! macro, then what you did would be the only way to do it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for your reply.

Actually, I tried it. But it seemed this macro only supports starting with an item keyword.

Do you think I should extend it to support it? (I will try, but I am unsure if it is doable).

@hi-rustin hi-rustin force-pushed the rustin-patch-io-copy branch 4 times, most recently from 3ee6af2 to 8e0546a Compare January 5, 2024 16:03
@Darksonn Darksonn added A-tokio Area: The main tokio crate M-io Module: tokio/io M-coop Module: tokio/coop labels Jan 6, 2024
Copy link
Contributor

@Darksonn Darksonn left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you. If the cfg_coop! macro can't do this today, then this is good enough.

@Darksonn Darksonn merged commit 3275cfb into tokio-rs:master Jan 6, 2024
76 checks passed
@hi-rustin
Copy link
Contributor Author

Thanks for your review! 💚 💙 💜 💛 ❤️

kodiakhq bot pushed a commit to pdylanross/fatigue that referenced this pull request Feb 5, 2024
Bumps tokio from 1.35.1 to 1.36.0.

Release notes
Sourced from tokio's releases.

Tokio v1.36.0
1.36.0 (February 2nd, 2024)
Added

io: add tokio::io::Join (#6220)
io: implement AsyncWrite for Empty (#6235)
net: add support for anonymous unix pipes (#6127)
net: add UnixSocket (#6290)
net: expose keepalive option on TcpSocket (#6311)
sync: add {Receiver,UnboundedReceiver}::poll_recv_many (#6236)
sync: add Sender::{try_,}reserve_many (#6205)
sync: add watch::Receiver::mark_unchanged (#6252)
task: add JoinSet::try_join_next (#6280)

Changed

io: make copy cooperative (#6265)
io: make repeat and sink cooperative (#6254)
io: simplify check for empty slice (#6293)
process: use pidfd on Linux when available (#6152)
sync: use AtomicBool in broadcast channel future (#6298)

Documented

io: clarify clear_ready docs (#6304)
net: document that *Fd traits on TcpSocket are unix-only (#6294)
sync: document FIFO behavior of tokio::sync::Mutex (#6279)
chore: typographic improvements (#6262)
runtime: remove obsolete comment (#6303)
task: fix typo (#6261)

#6220: tokio-rs/tokio#6220
#6235: tokio-rs/tokio#6235
#6127: tokio-rs/tokio#6127
#6290: tokio-rs/tokio#6290
#6311: tokio-rs/tokio#6311
#6236: tokio-rs/tokio#6236
#6205: tokio-rs/tokio#6205
#6252: tokio-rs/tokio#6252
#6280: tokio-rs/tokio#6280
#6265: tokio-rs/tokio#6265
#6254: tokio-rs/tokio#6254
#6293: tokio-rs/tokio#6293
#6238: tokio-rs/tokio#6238
#6152: tokio-rs/tokio#6152
#6298: tokio-rs/tokio#6298
#6262: tokio-rs/tokio#6262
#6303: tokio-rs/tokio#6303
#6261: tokio-rs/tokio#6261


... (truncated)


Commits

eaf81ed chore: prepare Tokio v1.36.0 (#6312)
53f9e5a ci: make sure dictionary words are sorted and unique (#6316)
9077762 net: expose keepalive option on TcpSocket (#6311)
131e7b4 ci: add spellchecking (#6297)
e53b92a io: clarify clear_ready docs (#6304)
7536132 sync: use AtomicBool in broadcast channel future (#6298)
b6d0c90 macros: fix trait_method breaking change detection (#6308)
4846959 runtime: remove obsolete comment (#6303)
ec30383 net: add UnixSocket (#6290)
f80bbec io: simplify check for empty slice (#6293)
Additional commits viewable in compare view




Dependabot will resolve any conflicts with this PR as long as you don't alter it yourself. You can also trigger a rebase manually by commenting @dependabot rebase.


Dependabot commands and options

You can trigger Dependabot actions by commenting on this PR:

@dependabot rebase will rebase this PR
@dependabot recreate will recreate this PR, overwriting any edits that have been made to it
@dependabot merge will merge this PR after your CI passes on it
@dependabot squash and merge will squash and merge this PR after your CI passes on it
@dependabot cancel merge will cancel a previously requested merge and block automerging
@dependabot reopen will reopen this PR if it is closed
@dependabot close will close this PR and stop Dependabot recreating it. You can achieve the same result by closing it manually
@dependabot show <dependency name> ignore conditions will show all of the ignore conditions of the specified dependency
@dependabot ignore this major version will close this PR and stop Dependabot creating any more for this major version (unless you reopen the PR or upgrade to it yourself)
@dependabot ignore this minor version will close this PR and stop Dependabot creating any more for this minor version (unless you reopen the PR or upgrade to it yourself)
@dependabot ignore this dependency will close this PR and stop Dependabot creating any more for this dependency (unless you reopen the PR or upgrade to it yourself)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
A-tokio Area: The main tokio crate M-coop Module: tokio/coop M-io Module: tokio/io
Projects
None yet
Development

Successfully merging this pull request may close these issues.

tokio::io::copy should yield if it copies too many times
2 participants