Skip to content

Commit

Permalink
Implement Clone for With combinator (#2290)
Browse files Browse the repository at this point in the history
Consider the following use-case of mpsc channels. There are two
categories of producers which produce items of two distinct types,
later unified in one using `With` combinator:

```
let (sender, receiver) = mspc::channel(100);

let download_status = sender.clone().with(|item| {
    futures::future::ok(Status::Download(item))
});

let unpack_status = sender.clone().with(|item| {
    futures::future::ok(Status::Unpack(item))
});
```

It would be convenient for `With` combinator to implement `Clone`,
since the underlying sink, `futures::channel::mpsc::Sender`,
implements it.
  • Loading branch information
akhramov committed Dec 22, 2020
1 parent da71932 commit 8a04b51
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 0 deletions.
16 changes: 16 additions & 0 deletions futures-util/src/sink/with.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,22 @@ where Si: Sink<Item>,
}
}

impl<Si, Item, U, Fut, F> Clone for With<Si, Item, U, Fut, F>
where
Si: Clone,
F: Clone,
Fut: Clone,
{
fn clone(&self) -> Self {
Self {
state: self.state.clone(),
sink: self.sink.clone(),
f: self.f.clone(),
_phantom: PhantomData,
}
}
}

// Forwarding impl of Stream from the underlying sink
impl<S, Item, U, Fut, F> Stream for With<S, Item, U, Fut, F>
where S: Stream + Sink<Item>,
Expand Down
35 changes: 35 additions & 0 deletions futures/tests/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -483,6 +483,41 @@ fn with_flush_propagate() {
})
}

// test that `Clone` is implemented on `with` sinks
#[test]
fn with_implements_clone() {
use futures::channel::mpsc;
use futures::executor::block_on;
use futures::future;
use futures::{SinkExt, StreamExt};

let (mut tx, rx) = mpsc::channel(5);

{
let mut is_positive = tx
.clone()
.with(|item| future::ok::<bool, mpsc::SendError>(item > 0));

let mut is_long = tx
.clone()
.with(|item: &str| future::ok::<bool, mpsc::SendError>(item.len() > 5));

block_on(is_positive.clone().send(-1)).unwrap();
block_on(is_long.clone().send("123456")).unwrap();
block_on(is_long.send("123")).unwrap();
block_on(is_positive.send(1)).unwrap();
}

block_on(tx.send(false)).unwrap();

block_on(tx.close()).unwrap();

assert_eq!(
block_on(rx.collect::<Vec<_>>()),
vec![false, true, false, true, false]
);
}

// test that a buffer is a no-nop around a sink that always accepts sends
#[test]
fn buffer_noop() {
Expand Down

0 comments on commit 8a04b51

Please sign in to comment.