Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

implement try_join_next #6280

Merged
merged 9 commits into from
Jan 14, 2024
Merged

Conversation

maminrayej
Copy link
Member

@maminrayej maminrayej commented Jan 11, 2024

This PR implements the try_join_next function on JoinSet. It also adds a test to make sure the implementation works.

There is also join_next_with_id which is unstable. I could also add try_join_next_with_id if this implementation is acceptable. Or, it could be added later on when join_next_with_id is stable.

Resolves #6277.

@Darksonn Darksonn added A-tokio Area: The main tokio crate M-task Module: tokio/task labels Jan 11, 2024
Comment on lines 320 to 322
} else {
None
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This case happens if we successfully pop a JoinHandle from the list of notified tasks, but it turns out that the JoinHandle is not actually ready yet. In this case, we should not return None because there might be another JoinHandle in the list of notified task that is actually ready.

Instead, please wrap this function in a loop and go around the loop to try again in this case.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right. Made the adjustments.

Comment on lines 320 to 323
let fut = unconstrained(jh);
pin!(fut);

fut.poll(ctx)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this will work:

Suggested change
let fut = unconstrained(jh);
pin!(fut);
fut.poll(ctx)
Pin::new(&mut unconstrained(jh)).poll(ctx)

Comment on lines +326 to +330
if let Poll::Ready(res) = res {
let _entry = entry.remove();

return Some(res);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add a comment about why you go around the loop when it returns Pending.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

// Loop over all notified `JoinHandle`s to find one that's ready, or until none are left.

I've added this comment at the top of the function to explain why the loop exists. Is it too concise?

Copy link
Contributor

@Darksonn Darksonn left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks reasonable. Adding the version with the id sounds reasonable as well.

@maminrayej
Copy link
Member Author

@Darksonn I've implemented try_join_next_with_id and added a test for it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test is nice. Especially the fact that you cover the case with coop budgeting. You don't have any tests that handle the case where it returns None even though there are tasks, because all the tasks are still running. Could you do that?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure!
I've updated the tests so they also test the case where there are no finished task.

Comment on lines +247 to +248
send.send_replace(());
send.closed().await;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not actually guaranteed that the tasks are considered finished by the JoinSet when closed().await returns. This is because tasks are marked finished after the destructor runs, but the channels are dropped while the destructor runs.

If you make this into a current-thread runtime, then this race goes away.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah you're right. Fixed it.

Copy link
Contributor

@Darksonn Darksonn left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks.

@Darksonn Darksonn enabled auto-merge (squash) January 14, 2024 10:30
@Darksonn Darksonn merged commit 12ce924 into tokio-rs:master Jan 14, 2024
71 checks passed
kodiakhq bot pushed a commit to pdylanross/fatigue that referenced this pull request Feb 5, 2024
Bumps tokio from 1.35.1 to 1.36.0.

Release notes
Sourced from tokio's releases.

Tokio v1.36.0
1.36.0 (February 2nd, 2024)
Added

io: add tokio::io::Join (#6220)
io: implement AsyncWrite for Empty (#6235)
net: add support for anonymous unix pipes (#6127)
net: add UnixSocket (#6290)
net: expose keepalive option on TcpSocket (#6311)
sync: add {Receiver,UnboundedReceiver}::poll_recv_many (#6236)
sync: add Sender::{try_,}reserve_many (#6205)
sync: add watch::Receiver::mark_unchanged (#6252)
task: add JoinSet::try_join_next (#6280)

Changed

io: make copy cooperative (#6265)
io: make repeat and sink cooperative (#6254)
io: simplify check for empty slice (#6293)
process: use pidfd on Linux when available (#6152)
sync: use AtomicBool in broadcast channel future (#6298)

Documented

io: clarify clear_ready docs (#6304)
net: document that *Fd traits on TcpSocket are unix-only (#6294)
sync: document FIFO behavior of tokio::sync::Mutex (#6279)
chore: typographic improvements (#6262)
runtime: remove obsolete comment (#6303)
task: fix typo (#6261)

#6220: tokio-rs/tokio#6220
#6235: tokio-rs/tokio#6235
#6127: tokio-rs/tokio#6127
#6290: tokio-rs/tokio#6290
#6311: tokio-rs/tokio#6311
#6236: tokio-rs/tokio#6236
#6205: tokio-rs/tokio#6205
#6252: tokio-rs/tokio#6252
#6280: tokio-rs/tokio#6280
#6265: tokio-rs/tokio#6265
#6254: tokio-rs/tokio#6254
#6293: tokio-rs/tokio#6293
#6238: tokio-rs/tokio#6238
#6152: tokio-rs/tokio#6152
#6298: tokio-rs/tokio#6298
#6262: tokio-rs/tokio#6262
#6303: tokio-rs/tokio#6303
#6261: tokio-rs/tokio#6261


... (truncated)


Commits

eaf81ed chore: prepare Tokio v1.36.0 (#6312)
53f9e5a ci: make sure dictionary words are sorted and unique (#6316)
9077762 net: expose keepalive option on TcpSocket (#6311)
131e7b4 ci: add spellchecking (#6297)
e53b92a io: clarify clear_ready docs (#6304)
7536132 sync: use AtomicBool in broadcast channel future (#6298)
b6d0c90 macros: fix trait_method breaking change detection (#6308)
4846959 runtime: remove obsolete comment (#6303)
ec30383 net: add UnixSocket (#6290)
f80bbec io: simplify check for empty slice (#6293)
Additional commits viewable in compare view




Dependabot will resolve any conflicts with this PR as long as you don't alter it yourself. You can also trigger a rebase manually by commenting @dependabot rebase.


Dependabot commands and options

You can trigger Dependabot actions by commenting on this PR:

@dependabot rebase will rebase this PR
@dependabot recreate will recreate this PR, overwriting any edits that have been made to it
@dependabot merge will merge this PR after your CI passes on it
@dependabot squash and merge will squash and merge this PR after your CI passes on it
@dependabot cancel merge will cancel a previously requested merge and block automerging
@dependabot reopen will reopen this PR if it is closed
@dependabot close will close this PR and stop Dependabot recreating it. You can achieve the same result by closing it manually
@dependabot show <dependency name> ignore conditions will show all of the ignore conditions of the specified dependency
@dependabot ignore this major version will close this PR and stop Dependabot creating any more for this major version (unless you reopen the PR or upgrade to it yourself)
@dependabot ignore this minor version will close this PR and stop Dependabot creating any more for this minor version (unless you reopen the PR or upgrade to it yourself)
@dependabot ignore this dependency will close this PR and stop Dependabot creating any more for this dependency (unless you reopen the PR or upgrade to it yourself)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
A-tokio Area: The main tokio crate M-task Module: tokio/task
Projects
None yet
Development

Successfully merging this pull request may close these issues.

try_join_next for JoinSet
2 participants