From 98e4dfce7e63e756c575eb0ef02e0685c3e3aa28 Mon Sep 17 00:00:00 2001 From: Zura Benashvili Date: Wed, 18 Nov 2020 20:56:46 +0400 Subject: [PATCH] feat(futures-util/stream): add StreamExt::unzip (#2263) --- futures-util/src/stream/mod.rs | 2 +- futures-util/src/stream/stream/mod.rs | 43 ++++++++++++++++ futures-util/src/stream/stream/unzip.rs | 66 +++++++++++++++++++++++++ futures/src/lib.rs | 2 +- 4 files changed, 111 insertions(+), 2 deletions(-) create mode 100644 futures-util/src/stream/stream/unzip.rs diff --git a/futures-util/src/stream/mod.rs b/futures-util/src/stream/mod.rs index 1b95837fc0..fe50de3f5c 100644 --- a/futures-util/src/stream/mod.rs +++ b/futures-util/src/stream/mod.rs @@ -15,7 +15,7 @@ mod stream; pub use self::stream::{ Chain, Collect, Concat, Cycle, Enumerate, Filter, FilterMap, FlatMap, Flatten, Fold, ForEach, Fuse, Inspect, Map, Next, Peek, Peekable, Scan, SelectNextSome, Skip, SkipWhile, StreamExt, - StreamFuture, Take, TakeWhile, TakeUntil, Then, Zip, + StreamFuture, Take, TakeUntil, TakeWhile, Then, Unzip, Zip, }; #[cfg(feature = "std")] diff --git a/futures-util/src/stream/stream/mod.rs b/futures-util/src/stream/stream/mod.rs index 84b0acd862..b1b4384279 100644 --- a/futures-util/src/stream/stream/mod.rs +++ b/futures-util/src/stream/stream/mod.rs @@ -29,6 +29,10 @@ mod collect; #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 pub use self::collect::Collect; +mod unzip; +#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 +pub use self::unzip::Unzip; + mod concat; #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 pub use self::concat::Concat; @@ -477,6 +481,45 @@ pub trait StreamExt: Stream { assert_future::(Collect::new(self)) } + /// Converts a stream of pairs into a future, which + /// resolves to pair of containers. + /// + /// `unzip()` produces a future, which resolves to two + /// collections: one from the left elements of the pairs, + /// and one from the right elements. + /// + /// The returned future will be resolved when the stream terminates. + /// + /// # Examples + /// + /// ``` + /// # futures::executor::block_on(async { + /// use futures::channel::mpsc; + /// use futures::stream::StreamExt; + /// use std::thread; + /// + /// let (tx, rx) = mpsc::unbounded(); + /// + /// thread::spawn(move || { + /// tx.unbounded_send((1, 2)).unwrap(); + /// tx.unbounded_send((3, 4)).unwrap(); + /// tx.unbounded_send((5, 6)).unwrap(); + /// }); + /// + /// let (o1, o2): (Vec<_>, Vec<_>) = rx.unzip().await; + /// assert_eq!(o1, vec![1, 3, 5]); + /// assert_eq!(o2, vec![2, 4, 6]); + /// # }); + /// ``` + fn unzip(self) -> Unzip + where + FromA: Default + Extend, + FromB: Default + Extend, + Self: Sized + Stream, + { + assert_future::<(FromA, FromB), _>(Unzip::new(self)) + } + /// Concatenate all items of a stream into a single extendable /// destination, returning a future representing the end result. /// diff --git a/futures-util/src/stream/stream/unzip.rs b/futures-util/src/stream/stream/unzip.rs new file mode 100644 index 0000000000..c7846ae23a --- /dev/null +++ b/futures-util/src/stream/stream/unzip.rs @@ -0,0 +1,66 @@ +use core::mem; +use core::pin::Pin; +use futures_core::future::{FusedFuture, Future}; +use futures_core::stream::{FusedStream, Stream}; +use futures_core::task::{Context, Poll}; +use pin_project::pin_project; + +/// Future for the [`unzip`](super::StreamExt::unzip) method. +#[pin_project] +#[derive(Debug)] +#[must_use = "futures do nothing unless you `.await` or poll them"] +pub struct Unzip { + #[pin] + stream: St, + left: FromA, + right: FromB, +} + +impl Unzip { + fn finish(self: Pin<&mut Self>) -> (FromA, FromB) { + let this = self.project(); + ( + mem::replace(this.left, Default::default()), + mem::replace(this.right, Default::default()), + ) + } + + pub(super) fn new(stream: St) -> Self { + Self { + stream, + left: Default::default(), + right: Default::default(), + } + } +} + +impl FusedFuture for Unzip +where St: FusedStream, + FromA: Default + Extend, + FromB: Default + Extend, +{ + fn is_terminated(&self) -> bool { + self.stream.is_terminated() + } +} + +impl Future for Unzip +where St: Stream, + FromA: Default + Extend, + FromB: Default + Extend, +{ + type Output = (FromA, FromB); + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<(FromA, FromB)> { + let mut this = self.as_mut().project(); + loop { + match ready!(this.stream.as_mut().poll_next(cx)) { + Some(e) => { + this.left.extend(Some(e.0)); + this.right.extend(Some(e.1)); + }, + None => return Poll::Ready(self.finish()), + } + } + } +} diff --git a/futures/src/lib.rs b/futures/src/lib.rs index 01f091a16a..4f312ecc77 100644 --- a/futures/src/lib.rs +++ b/futures/src/lib.rs @@ -459,7 +459,7 @@ pub mod stream { Chain, Collect, Concat, Cycle, Enumerate, Filter, FilterMap, FlatMap, Flatten, Fold, Forward, ForEach, Fuse, StreamFuture, Inspect, Map, Next, SelectNextSome, Peek, Peekable, Scan, Skip, SkipWhile, Take, TakeUntil, - TakeWhile, Then, Zip, + TakeWhile, Then, Unzip, Zip, TryStreamExt, AndThen, ErrInto, MapOk, MapErr, OrElse,