Skip to content

Commit

Permalink
Add future::flatten_stream implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
kpp committed Feb 23, 2019
1 parent f333266 commit 06702ec
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 1 deletion.
2 changes: 1 addition & 1 deletion README.md
Expand Up @@ -18,7 +18,7 @@ Future
- [x] future::and_then
- [x] future::err_into
- [x] future::flatten
- [ ] future::flatten_stream
- [x] future::flatten_stream
- [x] future::inspect
- [ ] future::into_stream
- [x] future::map
Expand Down
37 changes: 37 additions & 0 deletions src/future.rs
@@ -1,4 +1,5 @@
use futures::future::Future;
use futures::stream::Stream;

pub async fn ready<T>(value: T) -> T {
value
Expand Down Expand Up @@ -101,6 +102,28 @@ pub async fn unwrap_or_else<Fut, T, E, F>(future: Fut, f: F) -> T
future_result.unwrap_or_else(f)
}

pub fn flatten_stream<Fut, St, T>(future: Fut) -> impl Stream<Item = T>
where Fut: Future<Output = St>,
St: Stream<Item = T>,
{
use crate::stream::next;
futures::stream::unfold((Some(future), None), async move | (future, stream)| {
match (future, stream) {
(Some(future), None) => {
let stream = await!(future);
let mut stream = Box::pin(stream);
let item = await!(next(&mut stream));
item.map(|item| (item, (None, Some(stream))))
},
(None, Some(mut stream)) => {
let item = await!(next(&mut stream));
item.map(|item| (item, (None, Some(stream))))
},
_ => unreachable!()
}
})
}

#[cfg(test)]
mod tests {
use futures::executor;
Expand Down Expand Up @@ -204,4 +227,18 @@ mod tests {
assert_eq!(await!(new_future), ());
});
}

#[test]
fn test_flatten_stream() {
use futures::stream;
use crate::stream::collect;
executor::block_on(async {
let stream_items = vec![17, 18, 19];
let future_of_a_stream = ready(stream::iter(stream_items));

let stream = flatten_stream(future_of_a_stream);
let list: Vec<_> = await!(collect(stream));
assert_eq!(list, vec![17, 18, 19]);
});
}
}

0 comments on commit 06702ec

Please sign in to comment.