recv_async returning Disconnected in some highly-concurrent situations even though send succeeds #78

@ecton

I've run into a situation where flume appears to misbehave. On a single channel, I can verify that send() returned Ok(()) on the sender, yet the corresponding receiver gets a Disconnected error from recv_async(). This is the smallest example I could create.
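For reference, a minimal sketch of the ordering one would expect from a bounded channel: a message sent before the sender is dropped is still delivered, and only the next receive reports a disconnect. This uses std::sync::mpsc instead of flume to stay dependency-free, so it illustrates the expected semantics and is not a reproduction of the bug. The helper name send_then_drop is hypothetical.

```rust
use std::sync::mpsc::{sync_channel, RecvError};

/// Hypothetical helper: send one message, drop the sender, then receive twice.
fn send_then_drop() -> (Result<(), RecvError>, Result<(), RecvError>) {
    // Capacity 1, like flume::bounded(1) in the report, so the send does not block.
    let (sender, receiver) = sync_channel::<()>(1);

    sender.send(()).unwrap();
    drop(sender); // the only sender goes away after the send has succeeded

    let first = receiver.recv(); // the buffered message should still arrive
    let second = receiver.recv(); // channel is now empty and has no senders
    (first, second)
}

fn main() {
    let (first, second) = send_then_drop();
    assert_eq!(first, Ok(()));
    assert_eq!(second, Err(RecvError));
    println!("first = {:?}, second = {:?}", first, second);
}
```

The report describes the equivalent of `first` coming back as a disconnect error even though the send succeeded.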

Dependencies:

[dependencies]
tokio = { version = "1", features = ["full"] }
flume = { version = "0.10" }
futures = "0.3"

use flume::Sender;

#[tokio::main]
async fn main() {
    let tasks = (0..10).map(|_| sending_loop());

    futures::future::join_all(tasks).await;
}

async fn sending_loop() {
    for _ in 0..1000usize {
        sender_test().await
    }
}

async fn sender_test() {
    let (sender, receiver) = flume::bounded(1);

    tokio::spawn(sending(sender));

    receiver.recv_async().await.unwrap()
}

async fn sending(sender: Sender<()>) {
    sender.send(()).unwrap();
}

Running this on my machine, an 8-core/16-thread Ryzen, I regularly get the output: "thread 'main' panicked at 'called Result::unwrap() on an Err value: Disconnected', src/main.rs:21:33", which corresponds to the recv_async() line.

If the test is written to spawn a lot of tasks that all call sender_test in parallel at once, it doesn't seem to trigger the behavior. However, running multiple loops that each call sender_test repeatedly does trigger it.

I couldn't figure out how to simplify this example further.

To work around the issue, you can clone the sender before passing it into sending() and keep the original alive, but this prevents actual disconnections from being detected.
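The cost of that workaround can be sketched as follows. Again this uses std::sync::mpsc rather than flume, purely as a dependency-free illustration of the trade-off; the helper name recv_after_drain and its keep_clone parameter are hypothetical. While a spare clone of the sender is alive, an empty channel reports Empty instead of Disconnected, so the receiver can no longer tell that the real sender is gone.

```rust
use std::sync::mpsc::{sync_channel, TryRecvError};

/// Hypothetical helper: send one message, drop the original sender, drain the
/// channel, then poll once more. Optionally keeps a clone of the sender alive.
fn recv_after_drain(keep_clone: bool) -> Result<(), TryRecvError> {
    let (sender, receiver) = sync_channel::<()>(1);

    // The workaround: hold on to an extra sender so the channel never closes.
    let spare = if keep_clone { Some(sender.clone()) } else { None };

    sender.send(()).unwrap();
    drop(sender);

    receiver.recv().unwrap(); // consume the one message that was sent

    // With a spare sender alive this is Empty; without one it is Disconnected.
    let result = receiver.try_recv();
    drop(spare);
    result
}

fn main() {
    assert_eq!(recv_after_drain(false), Err(TryRecvError::Disconnected));
    assert_eq!(recv_after_drain(true), Err(TryRecvError::Empty));
    println!("without clone: {:?}", recv_after_drain(false));
    println!("with clone:    {:?}", recv_after_drain(true));
}
```

In an async setting the same trade-off would mean a receiver waiting indefinitely instead of observing the disconnect.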

Update 1: It appears this was introduced between 0.9.2 and 0.10.0. I'm not able to reproduce this behavior with 0.9.2.
