turn the futures-unordered footgun into a status quo story #172

@nikomatsakis nikomatsakis commented Apr 24, 2021

Closes #131

@nikomatsakis
Contributor Author

Hey @farnz, does everything in here read true to you? I tried to take what you said during that writing session last week and write it out.

Also @Darksonn, I'd appreciate you making sure I don't have any factual inaccuracies.

@Darksonn
Contributor

In general, this is a really nasty footgun in the design of Stream. Besides the placement of stream::iter, the story seems correct to me. Another way to fix it would be to write it as:

async fn do_work(database: &Database) {
    let work = do_select(database, FIND_WORK_QUERY).await?;

    stream::iter(work)
        .map(|item| async move {
            let work_item = do_select(database, work_from_item(item)).await;
            process_work_item(database, work_item).await;
        })
        .buffered(5)
        .for_each(|()| std::future::ready(()))
        .await;
}

The above could be further rewritten to use for_each_concurrent. Written this way, each work item is processed as soon as its select completes, but the processing counts toward the upper limit of five concurrent tasks.

To keep the processing out of that limit of five, a further way to write it would be:

async fn do_work(database: &Database) {
    let work = do_select(database, FIND_WORK_QUERY).await?;

    let selects = stream::iter(work)
        .map(|item| do_select(database, work_from_item(item)))
        .buffered(5)
        .fuse();
    tokio::pin!(selects);

    let mut results = FuturesUnordered::new();

    loop {
        tokio::select! {
            Some(work_item) = selects.next() => {
                results.push(process_work_item(database, work_item));
            },
            Some(()) = results.next() => { /* do nothing */ },
            else => break,
        }
    }
}

I'm not sure if there is a simpler way to write the above.
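
For readers without the story at hand, the footgun these rewrites work around comes down to one rule: a future makes progress only while something polls it, so futures held inside `buffered` stall whenever the stream itself is not being polled. Below is a minimal std-only model of that rule, not the `futures` or Tokio API. `CountdownQuery`, `NoopWake`, and `poll_n_times` are hypothetical names invented for this illustration.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical stand-in for a database query: it needs `remaining`
/// extra polls before it can return its result.
struct CountdownQuery {
    remaining: u32,
}

impl Future for CountdownQuery {
    type Output = &'static str;

    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
        if self.remaining == 0 {
            Poll::Ready("row")
        } else {
            // Progress happens here and nowhere else.
            self.remaining -= 1;
            Poll::Pending
        }
    }
}

/// A waker that does nothing; enough for polling by hand.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Polls `query` at most `times` times; returns true once it completes.
fn poll_n_times(query: &mut CountdownQuery, times: u32) -> bool {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    for _ in 0..times {
        if Pin::new(&mut *query).poll(&mut cx).is_ready() {
            return true;
        }
    }
    false
}
```

A query created with `remaining: 2` is still unfinished after two polls and completes on the third. If nothing polls it, `remaining` never changes, which is the position the buffered futures are in while the consumer is busy elsewhere.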

@nikomatsakis
Contributor Author

nikomatsakis commented Apr 25, 2021

Ah, that's interesting. I had not considered the select! version. I'll include that in the FAQ I think. I imagine there are probably utilities that aim to do something similar. I was also thinking about join, but for that, you'd need a future that 'drains' a stream completely, and I guess maybe one doesn't exist?

I was imagining something like:

join!(
    selects.map(|work_item| async move { results.push(process_work_item(database, work_item)) }).complete(),
    results.complete(),
);

where complete is a method like

impl<T: Stream> StreamExt for T {
    async fn complete(self) {
        while let Some(_) = self.next().await { }
    }
}

@Darksonn
Contributor

I mean, my select! is essentially that solution except that by cancelling the results.next() call when a new work item becomes available, you gain the mutable access to results that you need to push a future into it.

> but for that, you'd need a future that 'drains' a stream completely, and I guess maybe one doesn't exist?

You can do that with .for_each(|()| std::future::ready(())).await.

src/vision/status_quo/barbara_battles_buffered_streams.md (outdated)
| ----------- ...and is required to live as long as `'static` here
```

"Ah, right," she says, "spawned tasks can't use borrowed data. I wish I had [rayon] or the scoped threads from [crossbeam]." (What Barbara doesn't realize is that spawning wouldn't actually have fixed her problem anyway: the `for_each` combinator would have awaited the resulting `JoinHandle` and hence it would have blocked... but she could have tweaked her program to fix that if she had gotten that far.)
Contributor

It's true that the spawn used here would not have fixed her problem, but spawning would have fixed her problem if she had put the spawn around do_select rather than around process_work_item. (Ignoring any issues regarding 'static)

Contributor Author

Yes, I should note that in the FAQ.

Contributor

The fix we use most commonly is described below - we spawn inside do_select to ensure that the work done there can complete.

@nikomatsakis
Contributor Author

> I mean, my select! is essentially that solution except that by cancelling the results.next() call when a new work item becomes available, you gain the mutable access to results that you need to push a future into it.

Right, I know they're equivalent, I was just playing around with what it might look like to not "open code" it. But you're right about the borrow checker interactions, good point.
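
The borrow-checker interaction can be seen in a small std-only model. `Results` and `Next` are hypothetical stand-ins for `FuturesUnordered` and the future returned by `results.next()`. The sketch assumes only that such a future holds an exclusive borrow of the collection until it is dropped.

```rust
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

/// Hypothetical stand-in for `FuturesUnordered`: a bag of finished results.
struct Results {
    items: Vec<u32>,
}

/// Hypothetical stand-in for the future returned by `results.next()`.
/// It keeps an exclusive borrow of `Results` for as long as it is alive.
struct Next<'a> {
    results: &'a mut Results,
}

impl<'a> Future for Next<'a> {
    type Output = Option<u32>;

    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
        // `Next` is `Unpin`, so `get_mut` is available.
        Poll::Ready(self.get_mut().results.items.pop())
    }
}

impl Results {
    fn next(&mut self) -> Next<'_> {
        Next { results: self }
    }

    fn push(&mut self, item: u32) {
        self.items.push(item);
    }
}

/// Creates a `next()` future, cancels it by dropping it, then pushes.
fn push_after_cancel() -> Vec<u32> {
    let mut results = Results { items: vec![1] };

    let pending_next = results.next(); // exclusive borrow of `results` begins
    // results.push(2); // rejected here: `results` is still mutably borrowed
    drop(pending_next); // cancelling the future ends the borrow

    results.push(2); // accepted now
    results.items
}
```

In this model the dropped future was never polled, so nothing is popped and both items remain. In the `select!` loop the same thing happens implicitly: the unfinished `results.next()` future is dropped before the winning branch's handler runs, which frees `results` for the `push`.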

Contributor

@farnz farnz left a comment

Otherwise, LGTM. Thanks for writing this for me!

nikomatsakis and others added 2 commits April 26, 2021 14:53
Co-authored-by: Simon Farnsworth <simon@farnz.org.uk>
Co-authored-by: Ryan Levick <rylev@users.noreply.github.com>
@nikomatsakis nikomatsakis added the status-quo-story-ideas "Status quo" user story ideas label Apr 28, 2021
@nikomatsakis
Contributor Author

I incorporated @farnz's feedback, which I think really improved the story, thanks again for that. It's much more precise now.

Successfully merging this pull request may close these issues.

Footgun with futures unordered