Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sync: add broadcast channel #1943

Merged
merged 10 commits into from
Dec 18, 2019
Merged

sync: add broadcast channel #1943

merged 10 commits into from
Dec 18, 2019

Conversation

carllerche
Copy link
Member

@carllerche carllerche commented Dec 11, 2019

Adds a broadcast channel implementation. A broadcast channel is a
multi-producer, multi-consumer channel where each consumer receives a
clone of every value sent. This is useful for implementing pub / sub
style patterns.

Implemented as a ring buffer, a Vec of the specified capacity is
allocated on initialization of the channel. Values are pushed into
slots.

When the channel is full, a send overwrites the oldest value. Receivers
detect this and return an error on the next call to receive. This
prevents unbounded buffering and does not make the channel vulnerable to
the slowest consumer.

This is ready for review, but should not be merged until it has some
application validation.

Closes: #1585

Adds a broadcast channel implementation. A broadcast channel is a
multi-producer, multi-consumer channel where each consumer receives a
clone of every value sent. This is useful for implementing pub / sub
style patterns.

Implemented as a ring buffer, a Vec of the specified capacity is
allocated on initialization of the channel. Values are pushed into
slots.

When the channel is full, a send overwrites the oldest value. Receivers
detect this and return an error on the next call to receive. This
prevents unbounded buffering and does not make the channel vulnerable to
the slowest consumer.

Closes: #1585
@carllerche
Copy link
Member Author

One question is if Sender::send should return the number of active receivers at the time of send.

@Matthias247
Copy link
Contributor

+1 on the bounded implementation! That part it the hardest for any "broadcast" like implementation.

API-wise you could cross-check against futures-intrusive::channel::StateBroadcastChannel, which is pretty much the same thing - but with a channel size preconfigured to 1, so it will only retain the latest element. This is the "latest state", which consumers are typically interested in. Actually it could be made configurable there too.

Anyway, what future-intrusive does is having the items IDs inside the API. Every successful receive gives you the ID of the next item to receive. Here it seems like you capture it in the Stream. Which obviously has the benefit of staying more true to the Stream API. With explicit IDs

  • users could see how many updates they missed. But you probably could (or should!) provide this error in the Err variant of receive.
  • can stop and resume reads from the channel without having to persist a Future or the channel directly. Just persisting the index is good enough. But to be fair I didn't really had a use-case for this yet.

@jonhoo
Copy link
Sponsor Contributor

jonhoo commented Dec 11, 2019

cc @habnabit

tokio/src/sync/broadcast.rs Outdated Show resolved Hide resolved
tokio/src/sync/broadcast.rs Show resolved Hide resolved
tokio/src/sync/broadcast.rs Show resolved Hide resolved
tokio/src/sync/broadcast.rs Show resolved Hide resolved
tokio/src/sync/broadcast.rs Outdated Show resolved Hide resolved
tokio/src/sync/broadcast.rs Show resolved Hide resolved
tokio/src/sync/broadcast.rs Show resolved Hide resolved
tokio/src/sync/broadcast.rs Show resolved Hide resolved
tokio/src/sync/broadcast.rs Show resolved Hide resolved
tokio/src/sync/broadcast.rs Outdated Show resolved Hide resolved
tokio/src/sync/broadcast.rs Outdated Show resolved Hide resolved
tokio/src/sync/broadcast.rs Outdated Show resolved Hide resolved
`send` now returns the number of receivers subscribed at the time `send`
is called.

The `Lagged` error now includes the number of skipped messages.
@carllerche
Copy link
Member Author

@Matthias247

I added the number of skipped messages to the Lagged error.

I also return from send the number of receivers subscribed at that moment.

carllerche and others added 6 commits December 11, 2019 13:52
Co-Authored-By: Eliza Weisman <eliza@buoyant.io>
Co-Authored-By: Eliza Weisman <eliza@buoyant.io>
Comment on lines +506 to +508
if tail.rx_cnt == MAX_RECEIVERS {
panic!("max receivers");
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit/tioli: could be

assert!(tail.rx_cnt != MAX_RECEIVERS, "max receivers");

@jebrosen
Copy link
Contributor

I've been trying this out in a demo application of mine (commit at https://git.jebrosen.com/jeb/rocket-rooms/commit/e3fa0b106566a55a9bbcb82854b6be71cd254fff) and I really like it so far. It's as good or better than my naive wrapper over Watch was in terms of throughput - not sure exactly how much, because my test harness is a bit disappointing right now.

@carllerche
Copy link
Member Author

@jebrosen thanks for giving it a run, I tried it a bit locally and also works for me. I'm happy merging this if others are 👍

@carllerche
Copy link
Member Author

@LucioFranco @jonhoo @hawkw i need a +1 on this to merge :)


/// Receiving-half of the [`broadcast`] channel.
///
/// May not be used concurrently. Messages may be retrieved using
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/// May not be used concurrently. Messages may be retrieved using
/// Should not be used concurrently. Messages may be retrieved using

I think this sounds a bit better.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"should not" seems like a suggestion and "may not" is prohibited?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah then you want Must not

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't ask my why I know this is here but https://www.faa.gov/air_traffic/publications/atpubs/atc_html/chap1_section_2.html is a good reference :)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I personally find all of these wordings confusing. "May not / must not / should not" reads like something I have to keep in mind when I use the API otherwise some bad thing will happen, but the only consequence I can see of using this concurrently is that rustc will complain because all of the (public) methods take &mut self. std::sync::mpsc::Receiver uses the wording "This half can only be owned by one thread" which I'm also not quite happy with but IMO is less open to confusion.

/// assert_eq!(20, value);
/// }
/// ```
pub fn subscribe(&self) -> Receiver<T> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is there value in having this versus the way we normally do it with a mpsc? May make sense to align the apis

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Subscribing starts at the head of the list. A receiver may currently be pointing to any index. A clone() would then not return an "identical" item. I thought it didn't match.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah I see, Im not gonna reread the docs but can we make sure this is documented somewhere. This point is totally fair.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we maybe add a method to Reciever that can get you one that starts at the head?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would opt to punt on that until there is a clear case where that is helpful. I'm not sure what it would be named or if it makes sense 🤷

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You know me I like to punt 👍

/// internal cursor is updated to point to the oldest value still held by
/// the channel. A subsequent call to [`try_recv`] will return this value
/// **unless** it has been since overwritten.
///
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

missing doc on TryRecvError::Empty

Copy link
Member

@LucioFranco LucioFranco left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At a high level this LGTM, hopping loom covers most of the cases 😄

@carllerche carllerche merged commit 7c010ed into master Dec 18, 2019
@carllerche carllerche deleted the broadcast branch January 10, 2020 19:30
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Adding a broadcast channel to Tokio Sync?
6 participants