
Rethinking the bounded mpsc channel #800

Closed
danburkert opened this issue Feb 24, 2018 · 18 comments

@danburkert (Contributor)

Hi, perhaps this may eventually need to be an RFC, but I want to get a conversation started with a wider audience about a problematic aspect of the current bounded mpsc channel implementation. Quoting from internal implementation comments:

The general idea is that the channel is created with a buffer size of n.
The channel capacity is n + num-senders. Each sender gets one "guaranteed"
slot to hold a message.

So, every time a Sender is cloned, a new 'slot' is allocated internally, and the first item pushed to that Sender is guaranteed to succeed without blocking. The most common and easiest way to push a value into the channel is via Sink::send(..), which takes self by value.
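To make the "buffer of n plus one slot per sender" rule concrete, here is a minimal, hypothetical sketch of the capacity accounting it implies. The names (`SharedState`, `try_reserve`) are invented for illustration and this is not the actual futures-rs implementation; the real channel does the check-and-reserve atomically.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

// Hypothetical sketch of the "n + num_senders" capacity rule described above.
// Each live sender contributes one extra unit of capacity, so a sender's
// first send always finds room.
struct SharedState {
    buffer: usize,        // the `n` passed to `channel(n)`
    senders: AtomicUsize, // number of live senders
    len: AtomicUsize,     // messages currently queued
}

impl SharedState {
    fn capacity(&self) -> usize {
        self.buffer + self.senders.load(Ordering::SeqCst)
    }

    fn try_reserve(&self) -> bool {
        // Racy check-then-act: fine for a single-threaded illustration;
        // the real channel performs this reservation atomically.
        if self.len.load(Ordering::SeqCst) < self.capacity() {
            self.len.fetch_add(1, Ordering::SeqCst);
            true
        } else {
            false
        }
    }
}

fn main() {
    let state = SharedState {
        buffer: 0, // a zero-buffer channel
        senders: AtomicUsize::new(1),
        len: AtomicUsize::new(0),
    };
    assert!(state.try_reserve());  // the lone sender's guaranteed slot
    assert!(!state.try_reserve()); // a second send would have to block
    state.senders.fetch_add(1, Ordering::SeqCst); // cloning a sender...
    assert!(state.try_reserve());  // ...grows effective capacity by one
}
```

This is exactly why heavy `Sender` cloning inflates the effective bound past the `n` the user asked for.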

Bounded mpsc channels are commonly used to send requests to capacity-limited services (tasks). It's critical in such a scenario that the mpsc channel be bounded so that it can apply back-pressure to callers of the service, otherwise 'buffer bloat' can occur where requests are arbitrarily delayed while sitting in internal queues.
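The back-pressure behavior being asked for here is the same one the standard library's synchronous `std::sync::mpsc::sync_channel` already exhibits: once the bounded buffer is full, a send cannot proceed until the receiver drains a slot. A small stand-in example (synchronous, not futures-based):

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

fn main() {
    // A bounded channel with capacity 1: the std library's synchronous
    // analogue of the back-pressure behavior discussed here.
    let (tx, rx) = sync_channel::<u32>(1);

    tx.try_send(1).unwrap(); // fills the buffer
    match tx.try_send(2) {
        // Back-pressure: the caller gets its message back and must wait or retry.
        Err(TrySendError::Full(v)) => assert_eq!(v, 2),
        other => panic!("expected Full, got {:?}", other),
    }

    assert_eq!(rx.recv().unwrap(), 1); // the receiver drains one slot...
    tx.try_send(2).unwrap();           // ...and the sender can proceed
}
```

The complaint in this issue is that the futures channel's per-sender slots quietly defeat this "buffer full means wait" contract.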

Unfortunately, the 'slot' semantics of the current bounded mpsc channel, along with the API of Sink::send means it's very difficult to actually get the bounded sender to apply back-pressure. In circumstances where the caller needs to potentially retry the call if the response is a failure, and thus keep around the Sender as part of the response future, it becomes impossible.

I've been looking into how the current channel implementation might be changed to remove the 'slot' mechanism. There's a promising strategy that @carllerche and @stjepang and I have discussed that would require dropping the Sink impl from Sender and having Sender::send return a custom future type, but there's no working prototype yet. We've also been looking at how the internals of crossbeam-channel might be leveraged to help, which relates to crossbeam-rs/crossbeam-channel#22. Using crossbeam-channel internals may also open the door to making the channel mpmc.

I'm interested to see if others have run into the backpressure/slot issue, and if there is support behind changing the default mpsc bounded channel which ships with futures if a suitable solution can be found.

@ghost commented Feb 24, 2018

And here's what we think a new futures-based channel might look like, more or less:

pub fn unbounded<T>() -> (Sender<T>, Receiver<T>);
pub fn bounded<T>(cap: usize) -> (Sender<T>, Receiver<T>);

struct Receiver<T> { ... }

unsafe impl<T: Send> Send for Receiver<T> {}
impl<T> Clone for Receiver<T> { ... }

impl<T> Receiver<T> {
    pub fn try_recv(&self) -> Result<T, TryRecvError>;
    pub fn recv(&self) -> RecvFuture<T, RecvError>;
    pub fn poll_ready(&mut self) -> Poll<(), RecvError>;

    pub fn is_empty(&self) -> bool;
    pub fn len(&self) -> usize;
    pub fn capacity(&self) -> Option<usize>;

    pub fn close(&self) -> bool;
    pub fn is_closed(&self) -> bool;
}

impl<T> Stream for Receiver<T> {
    type Item = T;
    type Error = ();
    // ...
}

struct Sender<T> { ... }

unsafe impl<T: Send> Send for Sender<T> {}
impl<T> Clone for Sender<T> { ... }

impl<T> Sender<T> {
    pub fn try_send(&self, msg: T) -> Result<(), TrySendError<T>>;
    pub fn send(&self, msg: T) -> SendFuture<(), SendError<T>>;
    pub fn poll_ready(&mut self) -> Poll<(), SendError<()>>;

    pub fn is_empty(&self) -> bool;
    pub fn len(&self) -> usize;
    pub fn capacity(&self) -> Option<usize>;

    pub fn close(&self) -> bool;
    pub fn is_closed(&self) -> bool;
}

cc @cramertj

@cramertj (Member)

@stjepang @danburkert Interesting! Can you explain more about why your preferred design is incompatible with the current Sink trait?

@danburkert (Contributor, Author)

Sure, with a bit of background:

For a futures-aware channel to function correctly, a blocked Sender task must be notified when capacity becomes available (there is a reciprocal arrangement necessary for the Receiver with an mpmc queue, but I'm going to focus on just the Sender for now). There's a delicate balance here, though: you don't want to wake up every blocked sender if there is only capacity to send a single item. Otherwise you risk a 'thundering herd' issue where many senders compete for that single unit of capacity, which is just wasted wakeups and effort.

The simplest solution to the 'thundering herd' is to have the Receiver notify a single blocked task for every item it receives from the channel. This is problematic though, because notified tasks are not guaranteed to actually send an item to the channel. The idea @carllerche had to work around this is to have Sender::send return a future with a custom Drop impl which will wake up an alternate task whenever it is dropped before it was able to send its item to the channel. Since Sink::send returns a concrete type which doesn't do this, it's not suitable.
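The drop-based hand-off can be sketched with plain std types. Everything here (`WaitQueue`, `SendPermit`, a boolean flag standing in for a task handle) is hypothetical naming for illustration; in futures the "waker" would be a `Task` and the permit would live inside the send future.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};

// A "waker" here is just a flag we can set; in futures it would be a task handle.
type Waker = Arc<Mutex<bool>>;

struct WaitQueue {
    waiting: Mutex<VecDeque<Waker>>,
}

impl WaitQueue {
    // Wake exactly one blocked sender (what the Receiver does per received item).
    fn notify_one(&self) {
        if let Some(w) = self.waiting.lock().unwrap().pop_front() {
            *w.lock().unwrap() = true;
        }
    }
}

// Guard held by a notified sender. If it is dropped without sending,
// it passes the wakeup on to the next waiter instead of losing it.
struct SendPermit {
    queue: Arc<WaitQueue>,
    used: bool,
}

impl SendPermit {
    fn complete_send(mut self) {
        self.used = true; // the capacity was consumed; no hand-off needed
    }
}

impl Drop for SendPermit {
    fn drop(&mut self) {
        if !self.used {
            self.queue.notify_one(); // avoid the 'lost wakeup'
        }
    }
}

fn main() {
    let queue = Arc::new(WaitQueue { waiting: Mutex::new(VecDeque::new()) });
    let waker: Waker = Arc::new(Mutex::new(false));
    queue.waiting.lock().unwrap().push_back(waker.clone());

    // A permit is granted but dropped before its holder sends...
    drop(SendPermit { queue: queue.clone(), used: false });
    // ...so the wakeup is handed to the next waiting sender, not lost.
    assert!(*waker.lock().unwrap());

    // The woken sender takes its own permit and actually sends:
    let permit = SendPermit { queue: queue.clone(), used: false };
    permit.complete_send(); // no further wakeup is handed off
}
```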

In my opinion the Sink trait isn't really all that useful, since it doesn't have the big catalog of combinators and helper functions that, e.g., Future and Stream do. I also think it's much less common to need to abstract over sink types than it is over Futures and Streams.

@danburkert (Contributor, Author)

I glossed over it, but if the notified task doesn't push on to the channel, it's termed a 'lost wakeup', which can lead to deadlock, since other parked tasks will never get notified.

@cramertj (Member) commented Feb 24, 2018

The idea @carllerche had to work around this is to have Sender::send return a future with a custom Drop impl which will wake up an alternate task whenever it is dropped before it was able to send its item to the channel. Since Sink::send returns a concrete type which doesn't do this, it's not suitable.

Couldn't you add this impl to the Sender itself? Since Sink::send consumes the Sender, you could store the information you need inside the Sender itself so that when Send<Sender> is dropped, another Sender's task is awakened.

@danburkert (Contributor, Author)

That's an interesting idea, it might work.

@cramertj (Member)

(FWIW I've used the same trick for allowing multiplexed reads on sockets and channels in the past: keep a list of readers to awaken, mapped to by the ID of the response they're waiting for. When the socket becomes readable, awaken one of them to perform the read, and then have it notify the one the message is actually for. If the notified read handle is dropped, it awakens a different reader.)

@carllerche (Member)

The strategy works w/ Sink as well if poll_ready makes the reservation and sending or dropping releases it (same as holding a future).

@carllerche (Member)

I guess that I should specify that the newly proposed Sink API could support this strategy. Sink in 0.1 does not have a way to impl the strategy of claiming the capacity before producing the value.

@danburkert (Contributor, Author)

Here's a proof-of-concept implementation which leverages some of the crossbeam-channel internals: https://github.com/danburkert/crossbeam-channel/blob/futures-channel/src/futures/mpsc.rs. Using the strategy @cramertj suggested it's able to keep the Sink impl for Sender. I ported over the mpsc tests in the futures-rs repo and it passes them, but there are no doubt things to shake out. I think it's a promising sign in terms of the feasibility of removing the slot requirement, though.

If anyone wants to pick it up and run with it please go ahead.

@seunlanlege

@danburkert, I have a small program that I can try this on; will definitely give you feedback.

@seunlanlege

Just read through the code: Clone is not implemented for Receiver.

@danburkert (Contributor, Author)

Right, the proof of concept is an mpsc channel. I think it could be extended to be an mpmc channel, but it would be a bit more complex. I've never had a need for an mpmc channel with futures so I decided to skip that for the first cut.

@danburkert (Contributor, Author)

I pushed a commit to https://github.com/danburkert/crossbeam-channel/tree/futures-channel which greatly expands the implementation comments so that it should be easier to understand the internal channel mechanisms. Hopefully this should make it easier for others to review/extend the implementation.

@ghost commented Mar 3, 2018

EDIT: Moved the comment here since it is not really on topic.


Since we're rethinking our channel design, I'd like to take a step back and
challenge some of the core design decisions in crossbeam-channel, std::sync::mpsc and futures::sync::mpsc.

Here goes a lengthy motivational intro, but bear with me. Or skip it. :)

Motivation: select! for crossbeam-channel is hard to get right

The current select_loop! macro is kinda silly (it's a loop, you can't use break/continue inside it, it causes potentially subtle side effects on every iteration). I'm trying to come up with a new, nicer macro, with fewer surprises, and without the implicit loop.

This is what seems like the best solution so far:

select! {
    recv(receiver, msg)     => {} // `msg` is of type `T`
    send(sender, foo + bar) => {} // sends message `foo + bar`
    closed(receiver)        => {} // fires when `receiver` is closed
    closed(sender)          => {} // fires when `sender` is closed
    default(timeout)        => {} // fires when `timeout` expires
}

And this is how it works:

  1. If any of the recv or send cases can complete without blocking, a random one is chosen. The chosen operation is executed and its block (in this example {} for simplicity) is evaluated.
  2. If any of the closed cases are ready (because a channel is closed), it is evaluated.
  3. If the timeout has expired, the default case is evaluated.
  4. Otherwise, we wait until something changes.
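The four steps above can be emulated, in a much-simplified form, for a single `recv` case plus a `default(timeout)` case. This is a hypothetical polling sketch built on std's channel (the real macro would block on OS primitives instead of spinning, and would randomize among multiple ready cases):

```rust
use std::sync::mpsc::{channel, Receiver, TryRecvError};
use std::time::{Duration, Instant};

enum Selected<T> {
    Recv(T),  // a recv case completed
    Closed,   // a closed case fired
    TimedOut, // the default(timeout) case fired
}

fn select_recv<T>(rx: &Receiver<T>, timeout: Duration) -> Selected<T> {
    let deadline = Instant::now() + timeout;
    loop {
        match rx.try_recv() {
            Ok(msg) => return Selected::Recv(msg), // step 1: an op is ready
            Err(TryRecvError::Disconnected) => return Selected::Closed, // step 2
            Err(TryRecvError::Empty) => {}
        }
        if Instant::now() >= deadline {
            return Selected::TimedOut; // step 3: the timeout expired
        }
        std::thread::yield_now(); // step 4: wait until something changes
    }
}

fn main() {
    let (tx, rx) = channel();
    tx.send(7).unwrap();
    assert!(matches!(select_recv(&rx, Duration::from_millis(10)), Selected::Recv(7)));
    assert!(matches!(select_recv(&rx, Duration::from_millis(10)), Selected::TimedOut));
    drop(tx);
    assert!(matches!(select_recv(&rx, Duration::from_millis(10)), Selected::Closed));
}
```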

Pros:

  • No surprises (apart from the complexity) - it behaves just as one would expect.
  • Very flexible - allows a lot of freedom in choosing how to react to channel events.

Cons:

  • Using a recv or a send case doesn't nudge you into handling the closed case. For example, this is in contrast to a bare Receiver::recv operation, which returns a Result so the compiler advises you to do something with it (like call .unwrap()).
  • The macro looks kind of complicated.
  • Internal implementation is a little challenging (although not too bad).

There were some other different ideas for the macro but I won't go there.

Folks from Servo (more concretely, @SimonSapin and @nox) were cool with this macro idea, although it wasn't the absolute best option for their use case.

@matthieu-m had a good point that the select! macro should ideally be exhaustive in the sense that it makes sure you don't forget edge cases like a channel being unexpectedly closed.

Now let's see if we can somehow force the user to handle the closed case for every recv and send. One idea is to change recv so that it returns a Result<T, RecvError>:

select! {
    recv(receiver, msg) => {
        match msg {
            Ok(m) => println!("got {:?}", m),
            Err(RecvError) => println!("the channel is closed"),
        }
    }
}

But what about the send case? Here's a try:

let mut greeting = String::from("Howdy!");
select! {
    send(sender, greeting, result) => {
        match result {
            Ok(()) => println!("successfully sent"),
            Err(SendError(m)) => greeting = m,
        }
    }
}

That'd work and eliminate the need for the closed case, but it doesn't look very nice. To be fair, users of this select! syntax would typically just raise a panic on 'closed' events:

let mut greeting = String::from("Howdy!");
select! {
    recv(receiver, msg) => {
        let m = msg.expect("the channel is closed");
        println!("got {:?}", m);
    }
    send(sender, greeting, result) => {
        result.expect("the channel is closed");
    }
}

Looks a bit better, but still kind of clumsy.

When does a channel become closed/disconnected?

Depends on the implementation.

  • In std::sync::mpsc: when either the Receiver or all Senders get dropped.
  • In crossbeam-channel: when either all Receivers or all Senders get dropped.
  • In futures::sync::mpsc: when either the Receiver or all Senders get dropped, or when you call Receiver::close.
  • In chan: when all Senders get dropped.
  • In Go: when you call close(sender). It is not possible to close from the receiver side.

These differences are important. The chan crate follows Go's model very closely so they have very similar behavior. The reason why Go allows closing from the sender side is because it follows the principle of unidirectionality.

Unidirectionality means that information flows in one direction only. Closing a channel signals to the receivers that no more messages will arrive, ever. Note that even if all receivers get dropped, sending into the channel works as usual. The whole idea is that the receiver side cannot signal anything to the sender side!

Why do we want unidirectionality? See this and this comment by Russ Cox for an explanation.

Channels in Go

Channels in Go are quite peculiar. They come with a bunch of rules that seem arbitrary at first sight, but are actually well-thought-out. See Channel Axioms and Curious Channels by Dave Cheney. The controversial blog post titled Go channels are bad and you should feel bad is worth a read, too, despite the title.

Here are a few interesting rules:

  1. Sending into a closed channel panics. The idea is that senders have to coordinate among themselves so that the last sender closes the channel.

  2. Receiving from a closed channel returns a 'zero value'. This is like returning None, but Go doesn't have sum types.

  3. Sending into a nil channel blocks forever. This is handy when you want to disable a send operation inside a select - just set the sender to nil!

  4. Receiving from a nil channel blocks forever. This is useful for the same reason the previous rule is. See this StackOverflow answer for an example.

How would we port Go's channels to Rust

Let's solve some of the quirks in Go's channels by using two useful language features of Rust: destructors and sum types.

Keeping the idea of unidirectionality, we change the disconnection behavior: the channel gets closed only when all Senders get dropped. That's it. This means sending into a channel cannot panic because holding a sender implies the channel is not closed. Next, receiving from a closed channel returns an Option<T> rather than a 'zero value'. The chan crate follows the same design - see here and here.
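For the "close when the last Sender drops" half of this, std's own mpsc is already a working precedent: recv() returns Err(RecvError) once every Sender is gone and the buffer is drained, playing the role of the proposed `None`. A quick demonstration:

```rust
use std::sync::mpsc::channel;

fn main() {
    let (tx, rx) = channel();
    let tx2 = tx.clone();

    tx.send("hello").unwrap();
    drop(tx);
    drop(tx2); // last sender gone: the channel is now closed

    assert_eq!(rx.recv().unwrap(), "hello"); // buffered messages still drain
    assert!(rx.recv().is_err());             // then the closed state surfaces
}
```

(std differs from the proposal in the other direction, though: dropping the Receiver makes subsequent sends fail, which unidirectionality would forbid.)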

I'm feeling very tempted to redesign crossbeam-channel around the philosophy of Go channels:

impl<T> Sender<T> {
    fn send(&self, msg: T);
}

impl<T> Receiver<T> {
    fn recv(&self) -> Option<T>; // `None` if closed.
}

let greeting = String::from("Howdy!");
select! {
    recv(receiver, msg) => {
        // `msg` is of type `Option<T>`
        let m = msg.expect("the channel is closed");
        println!("got {}", m);
    }
    send(sender, greeting) => {
        println!("successfully sent");
    }
    default(timeout) => println!("timed out!"),
}

This is beautifully simple:

  • Easy to understand, especially for programmers coming from the world of Go.
  • No need for those annoying unwraps in sender.send(foo).unwrap(). @BurntSushi is going to like this. :)
  • select! macro doesn't need the closed case.
  • You're advised to handle the possibility of the channel being closed in the recv case. Perhaps we might want to change the Option type to Result.

Some drawbacks:

  • Dropping all receivers doesn't prevent senders from sending. This might or might not be a drawback - we could argue both ways.
  • select! is not as powerful as before. But it probably doesn't matter since all real-world cases should be covered by this simpler version.

Note that we could also accept Option<Sender<T>> and Option<Receiver<T>> in the recv and send cases. That would be equivalent to using nil channels in Go.
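The `Option<Receiver<T>>` idea can be sketched with std's channel. The `try_case` helper below is a hypothetical name for illustration: a `None` receiver plays the role of Go's nil channel, permanently disabling that case.

```rust
use std::sync::mpsc::{channel, Receiver};

// A disabled-able recv case: `None` means "never ready", like a nil channel in Go.
fn try_case<T>(rx: &Option<Receiver<T>>) -> Option<T> {
    rx.as_ref().and_then(|r| r.try_recv().ok())
}

fn main() {
    let (tx, rx) = channel();
    tx.send(1).unwrap();

    let mut slot = Some(rx);
    assert_eq!(try_case(&slot), Some(1)); // the enabled case fires
    slot = None;                          // "set the channel to nil"
    assert_eq!(try_case(&slot), None);    // the disabled case is never ready
}
```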

We can get all the benefits of Go channels without its weak spots like accidental panics (e.g. sending into a closed channel), accidental deadlocks (e.g. receiving from a nil channel), and incorrect closing (we close automatically when the last Sender gets dropped).

Final words

The simple Go-like channel interface currently seems to me to be sitting in some kind of sweet spot of the design space. I've been thinking about this problem for way too long now, constantly switching from one idea to another. In the end, I'm not sure whether this one is the way to go and need your opinion. Any thoughts? :)

Maybe we should take lessons from Go and design channels in futures::sync::mpsc around a similar philosophy, too? There is something to be said about the precedent of Go, after all. Honestly, the more I think about all this, the more I admire its channel interface, despite all the shortcomings stemming from the lack of language-level features.

cc @arcnmx - you might be interested in this comment, too.

@danburkert (Contributor, Author)

@stjepang is your comment meant to be in the crossbeam-channel repository? Most of it doesn't apply to non-blocking channels, as far as I can tell.

I do have some thoughts on the specific points, though:

  • I'm not going to comment on the select! macro discussion, since I haven't used it, and it's not usable from a futures/async context.
  • I'm fine with removing Receiver::close, although I find the 'unidirectional' argument pretty flimsy. I read the linked discussions and as far as I can tell they explained what unidirectional means in this context, but not why it's useful. In other words, I've never personally needed Receiver::close, but I also don't see a reason to discourage it either.
  • Making Sender::send() not return an error when the Receiver has been dropped would be a huge mistake. This is just asking for task leaks where producer tasks never realize that there's no one listening on the other side, and they continue on their merry way indefinitely.
  • Sender::close() is extremely useful for implementing more complex shutdown sequences. If all Sender instances have to be dropped before the channel is closed, that means all producer contexts need a separate way to be notified of shutdown. If Sender::send() can already fail due to the Receiver being dropped (as I argue it should in the previous bullet), then there's no reason not to support Sender::close().

@ghost commented Mar 4, 2018

@danburkert Yes - in hindsight, I thought the comment would be more relevant to futures but turned out to be mostly about crossbeam-channel. Sorry for the rambling. In order not to derail this thread, I've opened new issues (here and here).

@danburkert (Contributor, Author)

Thanks @stjepang! Those are definitely interesting questions as applied to synchronous channels. Another difference between a sync and async Sender::send API is that the async version pretty much has to return a future, and that currently means having an embedded error type, so the ergonomic improvements can't apply as cleanly.
