Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Don't take the tail lock when dropping broadcast channel's Recv future #6298

Merged
merged 9 commits into from
Jan 27, 2024

Conversation

vnetserg
Copy link
Contributor

@vnetserg vnetserg commented Jan 19, 2024

Motivation

As stated in #5465, the broadcast channel is subject to contention when the number of consumers is high. This PR tries to alleviate this issue by allowing Recv::drop to skip taking a lock if a value is successfully received.

Solution

The proposed solution is to make the queued variable atomic. This way, Recv can atomically determine that the contained waiter is not enqueued for notification, so no synchronization with producers is required.

The PR includes a benchmark that on my machine shows the following results vs master:

contention/10           time:   [3.6555 ms 3.6723 ms 3.6895 ms]
                        change: [+1.6649% +2.3708% +3.0364%] (p = 0.00 < 0.05)

contention/100          time:   [18.266 ms 18.300 ms 18.336 ms]
                        change: [-13.471% -13.245% -13.032%] (p = 0.00 < 0.05)

contention/500          time:   [75.161 ms 75.410 ms 75.631 ms]
                        change: [-22.168% -21.858% -21.588%] (p = 0.00 < 0.05)

contention/1000         time:   [126.48 ms 130.42 ms 134.38 ms]
                        change: [-43.144% -41.486% -39.650%] (p = 0.00 < 0.05)

The low-contention case is slightly degraded (maybe just noise), but under high contention the proposed solution performs significantly better.

See #6284 for other approaches and more discussion.

@github-actions github-actions bot added the R-loom-sync Run loom sync tests on this PR label Jan 19, 2024
@Darksonn Darksonn added A-tokio Area: The main tokio crate M-sync Module: tokio/sync labels Jan 19, 2024
Comment on lines 902 to 903
// Safety: `tail` lock is still held.
let waiter = unsafe { waiter.as_mut() };
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can no longer take a mutable reference to the entire waiter. You don't have mutable access to queued anymore.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch.

// to synchronize with `Recv::drop` here (calling
// `Receiver::recv_ref` with a waiter implies ownership
// of the corresponding `Recv`).
if !(*ptr).queued.swap(true, Relaxed) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here you have exclusive access, so just perform a normal write.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It turned out that loom AtomicBool does not support get_mut, so I resolved to using relaxed load/store.

// Relaxed order suffices because we hold the tail lock.
let queued = self
.waiter
.with(|ptr| unsafe { (*ptr).queued.load(Relaxed) });
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably here we can also use a non-atomic access.

Copy link
Contributor

@Darksonn Darksonn left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall LGTM.

// `Release` is needed to synchronize with `Recv::drop`.
// It is critical to set this variable **after** waker
// is extracted, otherwise we may data race with `Recv::drop`.
assert!((*waiter.as_ptr()).queued.swap(false, Release));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This swap is equivalent to store.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you think it would be better to use load for the assertion and then store? Or simply remove the assertion?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I missed the assertion.

A load-then-store is fine.

@@ -127,7 +127,8 @@ use std::future::Future;
use std::marker::PhantomPinned;
use std::pin::Pin;
use std::ptr::NonNull;
use std::sync::atomic::Ordering::SeqCst;
use std::sync::atomic::AtomicBool;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, this needs to be a loom import

@hawkw hawkw self-requested a review January 24, 2024 18:05
Copy link
Contributor

@Darksonn Darksonn left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM.

@Darksonn Darksonn enabled auto-merge (squash) January 27, 2024 18:43
@Darksonn Darksonn merged commit 7536132 into tokio-rs:master Jan 27, 2024
75 checks passed
kodiakhq bot pushed a commit to pdylanross/fatigue that referenced this pull request Feb 5, 2024
Bumps tokio from 1.35.1 to 1.36.0.

Release notes
Sourced from tokio's releases.

Tokio v1.36.0
1.36.0 (February 2nd, 2024)
Added

io: add tokio::io::Join (#6220)
io: implement AsyncWrite for Empty (#6235)
net: add support for anonymous unix pipes (#6127)
net: add UnixSocket (#6290)
net: expose keepalive option on TcpSocket (#6311)
sync: add {Receiver,UnboundedReceiver}::poll_recv_many (#6236)
sync: add Sender::{try_,}reserve_many (#6205)
sync: add watch::Receiver::mark_unchanged (#6252)
task: add JoinSet::try_join_next (#6280)

Changed

io: make copy cooperative (#6265)
io: make repeat and sink cooperative (#6254)
io: simplify check for empty slice (#6293)
process: use pidfd on Linux when available (#6152)
sync: use AtomicBool in broadcast channel future (#6298)

Documented

io: clarify clear_ready docs (#6304)
net: document that *Fd traits on TcpSocket are unix-only (#6294)
sync: document FIFO behavior of tokio::sync::Mutex (#6279)
chore: typographic improvements (#6262)
runtime: remove obsolete comment (#6303)
task: fix typo (#6261)

#6220: tokio-rs/tokio#6220
#6235: tokio-rs/tokio#6235
#6127: tokio-rs/tokio#6127
#6290: tokio-rs/tokio#6290
#6311: tokio-rs/tokio#6311
#6236: tokio-rs/tokio#6236
#6205: tokio-rs/tokio#6205
#6252: tokio-rs/tokio#6252
#6280: tokio-rs/tokio#6280
#6265: tokio-rs/tokio#6265
#6254: tokio-rs/tokio#6254
#6293: tokio-rs/tokio#6293
#6238: tokio-rs/tokio#6238
#6152: tokio-rs/tokio#6152
#6298: tokio-rs/tokio#6298
#6262: tokio-rs/tokio#6262
#6303: tokio-rs/tokio#6303
#6261: tokio-rs/tokio#6261


... (truncated)


Commits

eaf81ed chore: prepare Tokio v1.36.0 (#6312)
53f9e5a ci: make sure dictionary words are sorted and unique (#6316)
9077762 net: expose keepalive option on TcpSocket (#6311)
131e7b4 ci: add spellchecking (#6297)
e53b92a io: clarify clear_ready docs (#6304)
7536132 sync: use AtomicBool in broadcast channel future (#6298)
b6d0c90 macros: fix trait_method breaking change detection (#6308)
4846959 runtime: remove obsolete comment (#6303)
ec30383 net: add UnixSocket (#6290)
f80bbec io: simplify check for empty slice (#6293)
Additional commits viewable in compare view




Dependabot will resolve any conflicts with this PR as long as you don't alter it yourself. You can also trigger a rebase manually by commenting @dependabot rebase.


Dependabot commands and options

You can trigger Dependabot actions by commenting on this PR:

@dependabot rebase will rebase this PR
@dependabot recreate will recreate this PR, overwriting any edits that have been made to it
@dependabot merge will merge this PR after your CI passes on it
@dependabot squash and merge will squash and merge this PR after your CI passes on it
@dependabot cancel merge will cancel a previously requested merge and block automerging
@dependabot reopen will reopen this PR if it is closed
@dependabot close will close this PR and stop Dependabot recreating it. You can achieve the same result by closing it manually
@dependabot show <dependency name> ignore conditions will show all of the ignore conditions of the specified dependency
@dependabot ignore this major version will close this PR and stop Dependabot creating any more for this major version (unless you reopen the PR or upgrade to it yourself)
@dependabot ignore this minor version will close this PR and stop Dependabot creating any more for this minor version (unless you reopen the PR or upgrade to it yourself)
@dependabot ignore this dependency will close this PR and stop Dependabot creating any more for this dependency (unless you reopen the PR or upgrade to it yourself)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
A-tokio Area: The main tokio crate M-sync Module: tokio/sync R-loom-sync Run loom sync tests on this PR
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

2 participants