Skip to content

Commit

Permalink
broadcast: Stop notifying after we've woken all wakers
Browse files Browse the repository at this point in the history
Unboundedly looping within `notify_rx` as long as there are still
available wakers causes a quadratic slowdown as receivers which are
looping receiving from the channel are added. Instead of continually
waiting for new wakers, this commit modifies `notify_rx` to stop trying
to wake wakers once we've notified a number of wakers greater than or
equal to whatever the number of active wakers was when we started
notifying.

Fixes #5923
  • Loading branch information
glittershark committed Aug 10, 2023
1 parent 5d29bdf commit ad58664
Showing 1 changed file with 7 additions and 0 deletions.
7 changes: 7 additions & 0 deletions tokio/src/sync/broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -820,6 +820,8 @@ fn new_receiver<T>(shared: Arc<Shared<T>>) -> Receiver<T> {
impl<T> Shared<T> {
fn notify_rx<'a, 'b: 'a>(&'b self, mut tail: MutexGuard<'a, Tail>) {
let mut wakers = WakeList::new();
let active_receivers = tail.rx_cnt;
let mut woken = 0usize;
'outer: loop {
while wakers.can_push() {
match tail.waiters.pop_back() {
Expand All @@ -832,6 +834,7 @@ impl<T> Shared<T> {

if let Some(waker) = waiter.waker.take() {
wakers.push(waker);
woken += 1;
}
}
None => {
Expand All @@ -850,6 +853,10 @@ impl<T> Shared<T> {

wakers.wake_all();

if woken >= active_receivers {
return;
}

// Acquire the lock again.
tail = self.tail.lock();
}
Expand Down

0 comments on commit ad58664

Please sign in to comment.