Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Basic StreamExt::{flatten_unordered, flat_map_unordered} impls #2083

Merged
merged 23 commits into from Feb 6, 2022
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
7dc39b0
Basic `StreamExt::flat_map_unordered` impl
olegnn Feb 14, 2020
090d95d
Return `StreamFut` instead of stream, small code refactoring
olegnn Feb 17, 2020
a710587
Exposed FlatMapUnordered
olegnn Feb 17, 2020
6e38d6e
Reordered next_item and inner `stream` and `futures` checks
olegnn Feb 17, 2020
aba9d26
Correct feature gate for FlatMapUnordered
olegnn Feb 17, 2020
3bbfefe
`StreamFut` => `PollStreamFut` with updated constructor
olegnn Feb 17, 2020
dae2bb9
Removed unnecessary parentheses + fixed test
olegnn Feb 17, 2020
abe6e0b
Updated comments
olegnn Feb 17, 2020
c559563
Changed `SharedPollState::set_or` Ordering + added more tests for `fl…
olegnn Feb 18, 2020
c2641d4
Fixed unrelated test (`https://github.com/rust-lang/rust/issues/69238…
olegnn Feb 18, 2020
bd9a86c
Improve performance by modifying already created `PollWaker`s instead…
olegnn Feb 18, 2020
b7c27bc
Fixed docs
olegnn Feb 22, 2020
42a8152
FlattenUnordered + merged master
olegnn May 17, 2020
ead85ec
FlattenUnordered + FlatMapUnordered
olegnn May 17, 2020
15f5866
Merge remote-tracking branch 'origin2/master' into stream_flat_map_un…
olegnn Nov 28, 2021
9bc4e4e
FlattenUnordered: check waker panic
olegnn Jan 9, 2021
b39e690
Make panic handling better + significantly improve performance by pol…
olegnn Jan 9, 2022
952ccf7
Improve logic and docs
olegnn Jan 9, 2022
12bee98
Misc tweaks
olegnn Jan 17, 2022
4091d66
Attempting to fix Miri
olegnn Jan 23, 2022
6d0ccde
Misc improvements
olegnn Jan 26, 2022
69f60cc
Add `woken` state
olegnn Jan 26, 2022
a50c9e4
Tweaks
olegnn Jan 27, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
66 changes: 66 additions & 0 deletions futures-util/benches/flatten_unordered.rs
@@ -0,0 +1,66 @@
#![feature(test)]

extern crate test;
use crate::test::Bencher;

use futures::channel::oneshot;
use futures::executor::block_on;
use futures::future::{self, FutureExt};
use futures::stream::{self, StreamExt};
use futures::task::Poll;
use std::collections::VecDeque;
use std::thread;

#[bench]
fn oneshot_streams(b: &mut Bencher) {
const STREAM_COUNT: usize = 10_000;
const STREAM_ITEM_COUNT: usize = 1;

b.iter(|| {
let mut txs = VecDeque::with_capacity(STREAM_COUNT);
let mut rxs = Vec::new();

for _ in 0..STREAM_COUNT {
let (tx, rx) = oneshot::channel();
txs.push_back(tx);
rxs.push(rx);
}

thread::spawn(move || {
let mut last = 1;
while let Some(tx) = txs.pop_front() {
let _ = tx.send(stream::iter(last..last + STREAM_ITEM_COUNT));
last += STREAM_ITEM_COUNT;
}
});

let mut flatten = stream::unfold(rxs.into_iter(), |mut vals| {
async {
if let Some(next) = vals.next() {
let val = next.await.unwrap();
Some((val, vals))
} else {
None
}
}
.boxed()
})
.flatten_unordered(None);

block_on(future::poll_fn(move |cx| {
let mut count = 0;
loop {
match flatten.poll_next_unpin(cx) {
Poll::Ready(None) => break,
Poll::Ready(Some(_)) => {
count += 1;
}
_ => {}
}
}
assert_eq!(count, STREAM_COUNT * STREAM_ITEM_COUNT);

Poll::Ready(())
}))
});
}