From e09a7e4bbac814257b15f620718873f46808db37 Mon Sep 17 00:00:00 2001 From: Artem Khramov Date: Sun, 13 Dec 2020 17:32:33 +0300 Subject: [PATCH 1/2] Derive Clone on With combinator Consider the following use-case of mpsc channels. There are two categories of producers which produce items of two distinct types, later unified in one using `With` combinator: ``` let (sender, receiver) = mspc::channel(100); let download_status = sender.clone().with(|item| { futures::future::ok(Status::Download(item)) }); let unpack_status = sender.clone().with(|item| { futures::future::ok(Status::Unpack(item)) }); ``` It would be convenient for `With` combinator to implement `Clone`, since the underlying sink, `futures::channel::mpsc::Sender`, implements it. This change adds `#[derive(Clone)]` to `With` combinator --- futures-util/src/sink/with.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/futures-util/src/sink/with.rs b/futures-util/src/sink/with.rs index b1d2869541..7773c2faa4 100644 --- a/futures-util/src/sink/with.rs +++ b/futures-util/src/sink/with.rs @@ -11,6 +11,7 @@ use pin_project_lite::pin_project; pin_project! { /// Sink for the [`with`](super::SinkExt::with) method. #[must_use = "sinks do nothing unless polled"] + #[derive(Clone)] pub struct With { #[pin] sink: Si, From 192f8e36e5d36edd24aa3d0d25411186417e6fc8 Mon Sep 17 00:00:00 2001 From: Artem Khramov Date: Sun, 13 Dec 2020 23:54:55 +0300 Subject: [PATCH 2/2] Derive Clone on With combinator Address review comments This change * Implements `Clone` for `With` manually since we don't want to have `U: Clone, Item: Clone` bounds. * Adds a test case for `Clone` implementation. --- futures-util/src/sink/with.rs | 17 ++++++++++++++++- futures/tests/sink.rs | 35 +++++++++++++++++++++++++++++++++++ 2 files changed, 51 insertions(+), 1 deletion(-) diff --git a/futures-util/src/sink/with.rs b/futures-util/src/sink/with.rs index 7773c2faa4..73b87b72fc 100644 --- a/futures-util/src/sink/with.rs +++ b/futures-util/src/sink/with.rs @@ -11,7 +11,6 @@ use pin_project_lite::pin_project; pin_project! { /// Sink for the [`with`](super::SinkExt::with) method. #[must_use = "sinks do nothing unless polled"] - #[derive(Clone)] pub struct With { #[pin] sink: Si, @@ -54,6 +53,22 @@ where Si: Sink, } } +impl Clone for With +where + Si: Clone, + F: Clone, + Fut: Clone, +{ + fn clone(&self) -> Self { + Self { + state: self.state.clone(), + sink: self.sink.clone(), + f: self.f.clone(), + _phantom: PhantomData, + } + } +} + // Forwarding impl of Stream from the underlying sink impl Stream for With where S: Stream + Sink, diff --git a/futures/tests/sink.rs b/futures/tests/sink.rs index 83a6a78532..597ed34c7a 100644 --- a/futures/tests/sink.rs +++ b/futures/tests/sink.rs @@ -483,6 +483,41 @@ fn with_flush_propagate() { }) } +// test that `Clone` is implemented on `with` sinks +#[test] +fn with_implements_clone() { + use futures::channel::mpsc; + use futures::executor::block_on; + use futures::future; + use futures::{SinkExt, StreamExt}; + + let (mut tx, rx) = mpsc::channel(5); + + { + let mut is_positive = tx + .clone() + .with(|item| future::ok::(item > 0)); + + let mut is_long = tx + .clone() + .with(|item: &str| future::ok::(item.len() > 5)); + + block_on(is_positive.clone().send(-1)).unwrap(); + block_on(is_long.clone().send("123456")).unwrap(); + block_on(is_long.send("123")).unwrap(); + block_on(is_positive.send(1)).unwrap(); + } + + block_on(tx.send(false)).unwrap(); + + block_on(tx.close()).unwrap(); + + assert_eq!( + block_on(rx.collect::>()), + vec![false, true, false, true, false] + ); +} + // test that a buffer is a no-nop around a sink that always accepts sends #[test] fn buffer_noop() {