Permalink
Browse files

Merge pull request #537 from exrook/master

Add with_flat_map sink combinator
  • Loading branch information...
alexcrichton committed Aug 7, 2017
2 parents 3d4f5ae + f43c628 commit 3c067a7f93cd891b3f082bb8555c3a896095dd78
Showing with 179 additions and 0 deletions.
  1. +40 −0 src/sink/mod.rs
  2. +126 −0 src/sink/with_flat_map.rs
  3. +13 −0 tests/sink.rs
View
@@ -12,6 +12,7 @@ use {IntoFuture, Poll, StartSend};
use stream::Stream;
mod with;
mod with_flat_map;
// mod with_map;
// mod with_filter;
// mod with_filter_map;
@@ -73,6 +74,7 @@ if_std! {
}
pub use self::with::With;
pub use self::with_flat_map::WithFlatMap;
pub use self::flush::Flush;
pub use self::send::Send;
pub use self::send_all::SendAll;
@@ -315,6 +317,44 @@ pub trait Sink {
with::new(self, f)
}
/// Composes a function *in front of* the sink.
///
/// This adapter produces a new sink that passes each value through the
/// given function `f` before sending it to `self`.
///
/// To process each value, `f` produces a *stream*, of which each value
/// is passed to the underlying sink. A new value will not be accepted until
/// the stream has been drained
///
/// Note that this function consumes the given sink, returning a wrapped
/// version, much like `Iterator::flat_map`.
///
/// # Examples
/// ---
/// Using this function with an iterator through use of the `stream::iter()`
/// function
///
/// ```
/// use futures::{Sink,Future,Stream};
/// use futures::stream;
/// use futures::sync::mpsc;
///
/// let (tx, rx) = mpsc::channel::<i32>(5);
///
/// let tx = tx.with_flat_map(|x| {
/// stream::iter(vec![42; x].into_iter().map(|y|Ok(y)))
/// });
/// tx.send(5).wait().unwrap();
/// assert_eq!(rx.collect().wait(), Ok(vec![42, 42, 42, 42, 42]))
/// ```
fn with_flat_map<U, F, St>(self, f: F) -> WithFlatMap<Self, U, F, St>
where F: FnMut(U) -> St,
St: Stream<Item = Self::SinkItem, Error=Self::SinkError>,
Self: Sized
{
with_flat_map::new(self, f)
}
/*
fn with_map<U, F>(self, f: F) -> WithMap<Self, U, F>
where F: FnMut(U) -> Self::SinkItem,
View
@@ -0,0 +1,126 @@
use core::marker::PhantomData;
use {Poll, Async, StartSend, AsyncSink};
use sink::Sink;
use stream::Stream;
/// Sink for the `Sink::with_flat_map` combinator, chaining a computation that returns an iterator
/// to run prior to pushing a value into the underlying sink
#[derive(Debug)]
#[must_use = "sinks do nothing unless polled"]
pub struct WithFlatMap<S, U, F, St>
where
S: Sink,
F: FnMut(U) -> St,
St: Stream<Item = S::SinkItem, Error=S::SinkError>,
{
sink: S,
f: F,
stream: Option<St>,
buffer: Option<S::SinkItem>,
_phantom: PhantomData<fn(U)>,
}
pub fn new<S, U, F, St>(sink: S, f: F) -> WithFlatMap<S, U, F, St>
where
S: Sink,
F: FnMut(U) -> St,
St: Stream<Item = S::SinkItem, Error=S::SinkError>,
{
WithFlatMap {
sink: sink,
f: f,
stream: None,
buffer: None,
_phantom: PhantomData,
}
}
impl<S, U, F, St> WithFlatMap<S, U, F, St>
where
S: Sink,
F: FnMut(U) -> St,
St: Stream<Item = S::SinkItem, Error=S::SinkError>,
{
/// Get a shared reference to the inner sink.
pub fn get_ref(&self) -> &S {
&self.sink
}
/// Get a mutable reference to the inner sink.
pub fn get_mut(&mut self) -> &mut S {
&mut self.sink
}
/// Consumes this combinator, returning the underlying sink.
///
/// Note that this may discard intermediate state of this combinator, so
/// care should be taken to avoid losing resources when this is called.
pub fn into_inner(self) -> S {
self.sink
}
fn try_empty_stream(&mut self) -> Poll<(), S::SinkError> {
if let Some(x) = self.buffer.take() {
if let AsyncSink::NotReady(x) = try!(self.sink.start_send(x)) {
self.buffer = Some(x);
return Ok(Async::NotReady);
}
}
if let Some(mut stream) = self.stream.take() {
while let Some(x) = try_ready!(stream.poll()) {
if let AsyncSink::NotReady(x) = try!(self.sink.start_send(x)) {
self.stream = Some(stream);
self.buffer = Some(x);
return Ok(Async::NotReady);
}
}
}
Ok(Async::Ready(()))
}
}
impl<S, U, F, St> Stream for WithFlatMap<S, U, F, St>
where
S: Stream + Sink,
F: FnMut(U) -> St,
St: Stream<Item = S::SinkItem, Error=S::SinkError>,
{
type Item = S::Item;
type Error = S::Error;
fn poll(&mut self) -> Poll<Option<S::Item>, S::Error> {
self.sink.poll()
}
}
impl<S, U, F, St> Sink for WithFlatMap<S, U, F, St>
where
S: Sink,
F: FnMut(U) -> St,
St: Stream<Item = S::SinkItem, Error=S::SinkError>,
{
type SinkItem = U;
type SinkError = S::SinkError;
fn start_send(&mut self, i: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
if try!(self.try_empty_stream()).is_not_ready() {
return Ok(AsyncSink::NotReady(i));
}
assert!(self.stream.is_none());
self.stream = Some((self.f)(i));
try!(self.try_empty_stream());
Ok(AsyncSink::Ready)
}
fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
if try!(self.try_empty_stream()).is_not_ready() {
return Ok(Async::NotReady);
}
self.sink.poll_complete()
}
fn close(&mut self) -> Poll<(), Self::SinkError> {
if try!(self.try_empty_stream()).is_not_ready() {
return Ok(Async::NotReady);
}
assert!(self.stream.is_none());
self.sink.close()
}
}
View
@@ -167,6 +167,19 @@ fn with_as_map() {
assert_eq!(sink.get_ref(), &[0, 2, 4]);
}
#[test]
// test simple use of with_flat_map
fn with_flat_map() {
let sink = Vec::new().with_flat_map(|item| {
stream::iter(vec!(item; item).into_iter().map(Ok))
});
let sink = sink.send(0).wait().unwrap();
let sink = sink.send(1).wait().unwrap();
let sink = sink.send(2).wait().unwrap();
let sink = sink.send(3).wait().unwrap();
assert_eq!(sink.get_ref(), &[1,2,2,3,3,3]);
}
// Immediately accepts all requests to start pushing, but completion is managed
// by manually flushing
struct ManualFlush<T> {

0 comments on commit 3c067a7

Please sign in to comment.