Clarify behavior of rendezvous channels #145
What do you mean by 'wake up'? In the extreme case, pessimistic scheduler behaviour could certainly mean that the sender starts running before the receiver does, and there's pretty much nothing you can do to stop that.

That's entirely governed by the behaviour of the async runtime. Theoretically, flume won't wake up the sender until it knows that a receiver is currently ready and waiting to receive a message (note that this is not the same thing as the receiver having woken up!), but there's no telling what order the async runtime's scheduler will actually decide to wake the tasks up. I strongly recommend not building your system around very strict assumptions like this. If you want to guarantee ordering, use the synchronisation primitives available to you - like mutexes, signals, etc. - so that you're not relying on changeable behaviour of the runtime.
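For illustration (a sketch, not from the thread): the claim that the send future does not resolve until a receiver is actually waiting can be seen by hand-polling the futures, in the same spirit as the examples later in this thread. This assumes `Waker::noop`, which was unstable at the time of this discussion.

```rust
use std::future::Future;
use std::pin::pin;
use std::task::{Context, Poll, Waker};

fn main() {
    let (tx, rx) = flume::bounded::<u32>(0);
    let mut cx = Context::from_waker(Waker::noop());

    let mut send = pin!(tx.send_async(1));
    // No receiver is waiting yet, so the send cannot complete.
    assert!(matches!(send.as_mut().poll(&mut cx), Poll::Pending));

    let mut recv = pin!(rx.recv_async());
    // The receiver rendezvouses with the parked sender and takes the value.
    assert!(matches!(recv.as_mut().poll(&mut cx), Poll::Ready(Ok(1))));
    // Only now does the send future resolve.
    assert!(matches!(send.as_mut().poll(&mut cx), Poll::Ready(Ok(()))));
}
```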
Unblocking is probably the better word. I mean that the sender should not unblock before the receiver has received its value.

In a single-threaded executor that is literally impossible, given how it's implemented right now.

You absolutely could, and flume already does. It returns `Poll::Pending` until a receiver is actually waiting.

I would consider a rendezvous channel a synchronization primitive.

What I'm describing is runtime-agnostic.
I think you're getting confused here. There are two distinct concepts at play that are not the same: an async task being placed into a ready state first does not imply that it will be the first task to run when the scheduler next gets to make such a decision. Consider the following timeline:

Note that all of this applies to a single-threaded async runtime too.
As demonstrated, it does not. What it does guarantee is that, for rendezvous queues (i.e. those with a bound of 0), the sender will not be unblocked until a receiver is ready and waiting to take the message.

It is not. One could feasibly imagine reasonable scheduler algorithms that result in either Task 1 or Task 2 being run first after the send occurs (again, this applies to single-threaded runtimes too).

A rendezvous channel is not a synchronisation primitive in the sense that it does not guarantee a canonical post-rendezvous ordering for sender and receiver. Why? Because to guarantee such an ordering you need a critical section, and there is no critical section between the two. The instant the rendezvous completes, both tasks are runnable and nothing constrains which one runs first.
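To illustrate the point (a sketch, not from the thread): even with flume's blocking, sync API, the instant the rendezvous completes both sides are runnable, and the OS scheduler alone decides which line prints first.

```rust
use std::thread;

// With a bound of 0, `send` returns only once a receiver has taken the
// value; after that instant *both* threads are runnable and the OS
// scheduler is free to run either first.
fn main() {
    let (tx, rx) = flume::bounded::<()>(0);

    let receiver = thread::spawn(move || {
        rx.recv().unwrap();
        println!("receiver continued"); // may print before or after the line below
    });

    tx.send(()).unwrap();
    println!("sender continued");

    receiver.join().unwrap();
}
```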
I understand the confusion now; I misread the code. You are right that

I assumed an implementation where the sender is blocked until the receiver unblocks it. An implementation I wrote does have this feature, and I need it, so I guess I can't use flume's
For the record, this does mean that a value can be lost when the receiving future is cancelled, even though it is a rendezvous channel where the sender has already returned `Ok(())`. Consider this example:

```rust
#![feature(noop_waker)] // `Waker::noop` was unstable at the time of writing

use std::pin::Pin;
use std::task::{Waker, Context};
use std::future::Future;
use futures::FutureExt; // for `.boxed()`

#[derive(Debug)]
struct MayNeverDrop;

impl Drop for MayNeverDrop {
    fn drop(&mut self) {
        panic!("dropped");
    }
}

async fn produces_neverdrops(s: flume::Sender<MayNeverDrop>) {
    loop {
        let p = MayNeverDrop;
        println!("produced");
        match s.send_async(p).await {
            Ok(()) => continue,
            Err(x) => {
                // A failed send hands the value back; forget it so its
                // panicking destructor never runs.
                core::mem::forget(x);
                break;
            }
        }
    }
}

async fn consumes_neverdrops(r: flume::Receiver<MayNeverDrop>) {
    while let Ok(x) = r.recv_async().await {
        println!("consumed");
        core::mem::forget(x);
    }
}

fn main() {
    let (send, recv) = flume::bounded(0);
    let mut producer = produces_neverdrops(send).boxed();
    let mut consumer = consumes_neverdrops(recv).boxed();
    let dummy_waker = Waker::noop();
    let mut cx = Context::from_waker(&dummy_waker);
    // Hand-poll the two futures alternately, consumer first.
    dbg!(Future::poll(Pin::new(&mut consumer), &mut cx));
    dbg!(Future::poll(Pin::new(&mut producer), &mut cx));
    dbg!(Future::poll(Pin::new(&mut consumer), &mut cx));
    dbg!(Future::poll(Pin::new(&mut producer), &mut cx));
    dbg!(Future::poll(Pin::new(&mut consumer), &mut cx));
    dbg!(Future::poll(Pin::new(&mut producer), &mut cx));
    println!("cancelling consumer");
    drop(consumer);
    println!("simulate join() on producer");
    dbg!(Future::poll(Pin::new(&mut producer), &mut cx));
    println!("leaving scope")
}
```

One might expect that, as long as you `mem::forget` every value you receive, no `MayNeverDrop` is ever dropped, since our channel has zero capacity. Instead, cancelling the consumer drops a `MayNeverDrop` that the channel was still holding, and the `dropped` panic fires. One might also wonder how we managed to pass TWO values per poll.
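As a side note (a sketch, not from the thread): where the receiver is still accessible at shutdown, one mitigation is to drain the channel before letting the receiver go, so any in-flight value is recovered rather than silently dropped. `shutdown` here is an illustrative helper; `Receiver::drain` is real flume API.

```rust
// Illustrative helper (not part of flume): recover any values the
// channel still holds before the receiver goes away.
fn shutdown<T>(rx: flume::Receiver<T>) -> Vec<T> {
    // `drain` takes every message currently sitting in the channel.
    rx.drain().collect()
}
```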
To further support the idea that the current design of `flume::bounded(0)` still lets a value live inside the channel, consider this example:

```rust
#![feature(noop_waker)]

use std::pin::Pin;
use std::task::{Waker, Context};
use std::future::Future;
use futures::FutureExt; // for `.boxed()`

async fn producer(s: flume::Sender<()>) {
    loop {
        println!("produced");
        s.send_async(()).await.ok();
    }
}

async fn consumer(r: flume::Receiver<()>) {
    loop {
        r.recv_async().await.ok();
        println!("consumed");
    }
}

fn main() {
    for capacity in [3, 2, 1, 0] {
        let (send, recv) = flume::bounded(capacity);
        let mut producer = producer(send).boxed();
        let mut consumer = consumer(recv).boxed();
        let dummy_waker = Waker::noop();
        let mut cx = Context::from_waker(&dummy_waker);
        println!("capacity: {capacity}");
        let _ = Future::poll(Pin::new(&mut producer), &mut cx);
        let _ = Future::poll(Pin::new(&mut consumer), &mut cx);
        let _ = Future::poll(Pin::new(&mut producer), &mut cx);
        let _ = Future::poll(Pin::new(&mut consumer), &mut cx);
        println!();
    }
}
```

This prints, for each capacity, the interleaving of `produced` and `consumed` lines.
As you would expect, for capacity `n` the producer gets `n + 1` values in flight before the consumer runs: `n` buffered, plus one held by the pending `send_async`. This pattern holds for all capacities, including `0`.
This is actually a broader issue in the async ecosystem: that of future cancellation. It's a very difficult problem to engineer around (even more so in the case of streams), and it is the main thing that makes having flume support both sync and async in the same implementation quite difficult. The sync implementations don't need to deal with this, since you can't remotely cancel a thread (or, at least, it's reasonable to expect lost values if this is done).
I don't think that this example demonstrates what you think it's demonstrating. It is entirely possible for both tasks to be ready to run at the same time; which one actually runs first is up to the scheduler.

As I said, this ordering is governed by the async runtime, not by flume. If you've got the impression that this is a guarantee made by flume, then I apologise: that's not what it's designed to do, and this isn't a bug. If you want that sort of behaviour, then the thing to do is to send some sort of sync primitive to the receiving task. For example, the sending task might send a
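A sketch of that kind of approach, assuming a per-message acknowledgement channel (the names and structure here are illustrative, not flume API beyond `bounded`/`send_async`/`recv_async`):

```rust
use futures::executor::block_on;
use futures::join;

// Each message carries its own ack channel; the sender awaits the ack,
// so it cannot resume before the receiver has processed the value.
async fn demo() {
    let (tx, rx) = flume::bounded::<(u32, flume::Sender<()>)>(0);

    let send_side = async move {
        for value in 0..3 {
            let (ack_tx, ack_rx) = flume::bounded(1);
            if tx.send_async((value, ack_tx)).await.is_err() {
                break;
            }
            // Resolves only once the receiver has handled `value`.
            let _ = ack_rx.recv_async().await;
            println!("sender resumed after {value}");
        }
    };

    let recv_side = async move {
        while let Ok((value, ack)) = rx.recv_async().await {
            println!("receiver processed {value}");
            let _ = ack.send(()); // release the sender
        }
    };

    join!(send_side, recv_side);
}

fn main() {
    block_on(demo());
}
```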
My initial opening of the issue was based on a misunderstanding of the current behavior, but I've since come to feel more strongly that my expected behavior is actually the correct behavior for a zero-capacity channel. A zero-capacity channel should never be able to end up in a position where it owns data, in my opinion, as it models something with no capacity whatsoever. However, if your interpretation is that it's not incorrect and thus not a bug, we can close this.
I don't see this as the channel 'owning' data: the receiving task is being given ownership of data by the sending task, but is not yet in a state to process that change in ownership. It's an interesting and subtle distinction though, so it might be worth further documentation.
For my purpose it is important that the sender does not wake up before the receiver has received its value in a rendezvous channel.

It does appear (reading from the code) that flume actually does this, only returning `Poll::Ready` once a value has physically been received. However, the docs do not guarantee this (emphasis mine):

Technically in this scenario (where `send`, `recv` are from `flume::bounded(0)`):

it is allowed to print `ho, hey` instead of `hey, ho` on a single-threaded executor, since by the time `send.send_async(()).await` occurs the waiter could be "available" and thus the sender immediately continues.

Could the guarantee be given that this never happens? That is, instead of this:

flume would guarantee this:
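A sketch of a scenario matching that description (the task bodies here are a reconstruction for illustration, not the original snippet):

```rust
use futures::executor::block_on;
use futures::join;

async fn scenario() {
    let (send, recv) = flume::bounded::<()>(0);

    let receiver = async move {
        let _ = recv.recv_async().await;
        println!("hey");
    };

    let sender = async move {
        let _ = send.send_async(()).await;
        println!("ho");
    };

    // Poll and wake order are up to the executor: with this simple
    // single-threaded setup the output can be "ho" before "hey".
    join!(receiver, sender);
}

fn main() {
    block_on(scenario());
}
```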