Skip to content

Commit

Permalink
feat(futures-util/stream): add StreamExt::unzip (#2263)
Browse files Browse the repository at this point in the history
  • Loading branch information
binier committed Nov 18, 2020
1 parent 34015c8 commit 98e4dfc
Show file tree
Hide file tree
Showing 4 changed files with 111 additions and 2 deletions.
2 changes: 1 addition & 1 deletion futures-util/src/stream/mod.rs
Expand Up @@ -15,7 +15,7 @@ mod stream;
pub use self::stream::{
Chain, Collect, Concat, Cycle, Enumerate, Filter, FilterMap, FlatMap, Flatten, Fold, ForEach,
Fuse, Inspect, Map, Next, Peek, Peekable, Scan, SelectNextSome, Skip, SkipWhile, StreamExt,
StreamFuture, Take, TakeWhile, TakeUntil, Then, Zip,
StreamFuture, Take, TakeUntil, TakeWhile, Then, Unzip, Zip,
};

#[cfg(feature = "std")]
Expand Down
43 changes: 43 additions & 0 deletions futures-util/src/stream/stream/mod.rs
Expand Up @@ -29,6 +29,10 @@ mod collect;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::collect::Collect;

mod unzip;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::unzip::Unzip;

mod concat;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::concat::Concat;
Expand Down Expand Up @@ -477,6 +481,45 @@ pub trait StreamExt: Stream {
assert_future::<C, _>(Collect::new(self))
}

/// Converts a stream of pairs into a future, which
/// resolves to pair of containers.
///
/// `unzip()` produces a future, which resolves to two
/// collections: one from the left elements of the pairs,
/// and one from the right elements.
///
/// The returned future will be resolved when the stream terminates.
///
/// # Examples
///
/// ```
/// # futures::executor::block_on(async {
/// use futures::channel::mpsc;
/// use futures::stream::StreamExt;
/// use std::thread;
///
/// let (tx, rx) = mpsc::unbounded();
///
/// thread::spawn(move || {
/// tx.unbounded_send((1, 2)).unwrap();
/// tx.unbounded_send((3, 4)).unwrap();
/// tx.unbounded_send((5, 6)).unwrap();
/// });
///
/// let (o1, o2): (Vec<_>, Vec<_>) = rx.unzip().await;
/// assert_eq!(o1, vec![1, 3, 5]);
/// assert_eq!(o2, vec![2, 4, 6]);
/// # });
/// ```
fn unzip<A, B, FromA, FromB>(self) -> Unzip<Self, FromA, FromB>
where
FromA: Default + Extend<A>,
FromB: Default + Extend<B>,
Self: Sized + Stream<Item = (A, B)>,
{
assert_future::<(FromA, FromB), _>(Unzip::new(self))
}

/// Concatenate all items of a stream into a single extendable
/// destination, returning a future representing the end result.
///
Expand Down
66 changes: 66 additions & 0 deletions futures-util/src/stream/stream/unzip.rs
@@ -0,0 +1,66 @@
use core::mem;
use core::pin::Pin;
use futures_core::future::{FusedFuture, Future};
use futures_core::stream::{FusedStream, Stream};
use futures_core::task::{Context, Poll};
use pin_project::pin_project;

/// Future for the [`unzip`](super::StreamExt::unzip) method.
#[pin_project]
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Unzip<St, FromA, FromB> {
#[pin]
stream: St,
left: FromA,
right: FromB,
}

impl<St: Stream, FromA: Default, FromB: Default> Unzip<St, FromA, FromB> {
fn finish(self: Pin<&mut Self>) -> (FromA, FromB) {
let this = self.project();
(
mem::replace(this.left, Default::default()),
mem::replace(this.right, Default::default()),
)
}

pub(super) fn new(stream: St) -> Self {
Self {
stream,
left: Default::default(),
right: Default::default(),
}
}
}

impl<St, A, B, FromA, FromB> FusedFuture for Unzip<St, FromA, FromB>
where St: FusedStream<Item = (A, B)>,
FromA: Default + Extend<A>,
FromB: Default + Extend<B>,
{
fn is_terminated(&self) -> bool {
self.stream.is_terminated()
}
}

impl<St, A, B, FromA, FromB> Future for Unzip<St, FromA, FromB>
where St: Stream<Item = (A, B)>,
FromA: Default + Extend<A>,
FromB: Default + Extend<B>,
{
type Output = (FromA, FromB);

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<(FromA, FromB)> {
let mut this = self.as_mut().project();
loop {
match ready!(this.stream.as_mut().poll_next(cx)) {
Some(e) => {
this.left.extend(Some(e.0));
this.right.extend(Some(e.1));
},
None => return Poll::Ready(self.finish()),
}
}
}
}
2 changes: 1 addition & 1 deletion futures/src/lib.rs
Expand Up @@ -459,7 +459,7 @@ pub mod stream {
Chain, Collect, Concat, Cycle, Enumerate, Filter, FilterMap, FlatMap, Flatten,
Fold, Forward, ForEach, Fuse, StreamFuture, Inspect, Map, Next,
SelectNextSome, Peek, Peekable, Scan, Skip, SkipWhile, Take, TakeUntil,
TakeWhile, Then, Zip,
TakeWhile, Then, Unzip, Zip,

TryStreamExt,
AndThen, ErrInto, MapOk, MapErr, OrElse,
Expand Down

0 comments on commit 98e4dfc

Please sign in to comment.