Skip to content

Commit

Permalink
StreamExt: add a trait for additional Stream methods
Browse files Browse the repository at this point in the history
Primarily, it offers a `timeout` method for streams.
  • Loading branch information
mathstuf committed Sep 7, 2018
1 parent 1666418 commit b89ec6b
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 1 deletion.
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -599,6 +599,7 @@ pub mod prelude {

pub use util::{
FutureExt,
StreamExt,
};

pub use ::std::io::{
Expand Down
6 changes: 5 additions & 1 deletion src/util/mod.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
//! Utilities for working with Tokio.
//!
//! This module contains utilities that are useful for working with Tokio.
//! Currently, this only includes [`FutureExt`], but this may grow over time.
//! Currently, this only includes [`FutureExt`] and [`StreamExt`], but this
//! may grow over time.
//!
//! [`FutureExt`]: trait.FutureExt.html
//! [`StreamExt`]: trait.StreamExt.html

mod future;
mod stream;

pub use self::future::FutureExt;
pub use self::stream::StreamExt;
62 changes: 62 additions & 0 deletions src/util/stream.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
use tokio_timer::Timeout;

use futures::Stream;

use std::time::Duration;


/// An extension trait for `Stream` that provides a variety of convenient
/// combinator functions.
///
/// Currently, there only is a [`timeout`] function, but this will increase
/// over time.
///
/// Users are not expected to implement this trait. All types that implement
/// `Stream` already implement `StreamExt`.
///
/// This trait can be imported directly or via the Tokio prelude: `use
/// tokio::prelude::*`.
///
/// [`timeout`]: #method.timeout
pub trait StreamExt: Stream {

/// Creates a new stream which allows `self` until `timeout`.
///
/// This combinator creates a new stream which wraps the receiving stream
/// with a timeout. For each item, the returned stream is allowed to execute
/// until it completes or `timeout` has elapsed, whichever happens first.
///
/// If an item completes before `timeout` then the stream will yield
/// with that item. Otherwise the stream will yield to an error.
///
/// # Examples
///
/// ```
/// # extern crate tokio;
/// # extern crate futures;
/// use tokio::prelude::*;
/// use std::time::Duration;
/// # use futures::future::{self, FutureResult};
///
/// # fn long_future() -> FutureResult<(), ()> {
/// # future::ok(())
/// # }
/// #
/// # fn main() {
/// let stream = long_future()
/// .into_stream()
/// .timeout(Duration::from_secs(1))
/// .for_each(|i| future::ok(println!("item = {:?}", i)))
/// .map_err(|e| println!("error = {:?}", e));
///
/// tokio::run(stream);
/// # }
/// ```
fn timeout(self, timeout: Duration) -> Timeout<Self>
where Self: Sized,
{
Timeout::new(self, timeout)
}
}

impl<T: ?Sized> StreamExt for T where T: Stream {}

0 comments on commit b89ec6b

Please sign in to comment.