Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TryFlattenUnordered #2577

Merged
merged 11 commits into from
Apr 16, 2022
7 changes: 3 additions & 4 deletions futures-util/benches/flatten_unordered.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::test::Bencher;

use futures::channel::oneshot;
use futures::executor::block_on;
use futures::future::{self, FutureExt};
use futures::future;
use futures::stream::{self, StreamExt};
use futures::task::Poll;
use std::collections::VecDeque;
Expand Down Expand Up @@ -35,15 +35,14 @@ fn oneshot_streams(b: &mut Bencher) {
});

let mut flatten = stream::unfold(rxs.into_iter(), |mut vals| {
async {
Box::pin(async {
if let Some(next) = vals.next() {
let val = next.await.unwrap();
Some((val, vals))
} else {
None
}
}
.boxed()
})
})
.flatten_unordered(None);

Expand Down
12 changes: 7 additions & 5 deletions futures-util/src/future/try_select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ pub struct TrySelect<A, B> {

impl<A: Unpin, B: Unpin> Unpin for TrySelect<A, B> {}

type EitherOk<A, B> = Either<(<A as TryFuture>::Ok, B), (<B as TryFuture>::Ok, A)>;
type EitherErr<A, B> = Either<(<A as TryFuture>::Error, B), (<B as TryFuture>::Error, A)>;

/// Waits for either one of two differently-typed futures to complete.
///
/// This function will return a new future which awaits for either one of both
Expand Down Expand Up @@ -52,18 +55,17 @@ where
A: TryFuture + Unpin,
B: TryFuture + Unpin,
{
super::assert_future::<
Result<Either<(A::Ok, B), (B::Ok, A)>, Either<(A::Error, B), (B::Error, A)>>,
_,
>(TrySelect { inner: Some((future1, future2)) })
super::assert_future::<Result<EitherOk<A, B>, EitherErr<A, B>>, _>(TrySelect {
inner: Some((future1, future2)),
})
}

impl<A: Unpin, B: Unpin> Future for TrySelect<A, B>
where
A: TryFuture,
B: TryFuture,
{
type Output = Result<Either<(A::Ok, B), (B::Ok, A)>, Either<(A::Error, B), (B::Error, A)>>;
type Output = Result<EitherOk<A, B>, EitherErr<A, B>>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let (mut a, mut b) = self.inner.take().expect("cannot poll Select twice");
Expand Down
15 changes: 9 additions & 6 deletions futures-util/src/stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ pub use futures_core::stream::{FusedStream, Stream, TryStream};
#[allow(clippy::module_inception)]
mod stream;
pub use self::stream::{
Chain, Collect, Concat, Cycle, Enumerate, Filter, FilterMap, FlatMap, Flatten, Fold, ForEach,
Fuse, Inspect, Map, Next, NextIf, NextIfEq, Peek, PeekMut, Peekable, Scan, SelectNextSome,
Skip, SkipWhile, StreamExt, StreamFuture, Take, TakeUntil, TakeWhile, Then, TryFold,
TryForEach, Unzip, Zip,
All, Any, Chain, Collect, Concat, Count, Cycle, Enumerate, Filter, FilterMap, FlatMap, Flatten,
Fold, ForEach, Fuse, Inspect, Map, Next, NextIf, NextIfEq, Peek, PeekMut, Peekable, Scan,
SelectNextSome, Skip, SkipWhile, StreamExt, StreamFuture, Take, TakeUntil, TakeWhile, Then,
TryFold, TryForEach, Unzip, Zip,
};

#[cfg(feature = "std")]
Expand All @@ -39,7 +39,10 @@ pub use self::stream::Forward;

#[cfg(not(futures_no_atomic_cas))]
#[cfg(feature = "alloc")]
pub use self::stream::{BufferUnordered, Buffered, ForEachConcurrent, TryForEachConcurrent};
pub use self::stream::{
BufferUnordered, Buffered, FlatMapUnordered, FlattenUnordered, ForEachConcurrent,
TryForEachConcurrent,
};

#[cfg(not(futures_no_atomic_cas))]
#[cfg(feature = "sink")]
Expand All @@ -61,7 +64,7 @@ pub use self::try_stream::IntoAsyncRead;

#[cfg(not(futures_no_atomic_cas))]
#[cfg(feature = "alloc")]
pub use self::try_stream::{TryBufferUnordered, TryBuffered};
pub use self::try_stream::{TryBufferUnordered, TryBuffered, TryFlattenUnordered};

#[cfg(feature = "sink")]
#[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
Expand Down
Loading