stream-util

Crate stream-util provides mechanisms for draining a futures::sync::mpsc Receiver or UnboundedReceiver and for canceling a Stream.

Usage

To use this crate, add stream-util as a dependency to your project's Cargo.toml:

[dependencies]
stream-util = { git = "https://github.com/lopopolo/punchtop" }

Drain

The extension trait Drainable provides a new Receiver and UnboundedReceiver combinator, drain. Drain yields elements from the underlying channel until the provided Future resolves. It then closes the receiver and continues to yield the remaining elements in the channel until it is empty.
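The drain behavior described above can be modeled with the standard library alone. The following is a minimal sketch, not the crate's implementation: it uses std::sync::mpsc instead of futures channels, an AtomicBool in place of the Future, and the hypothetical names drain_until and drain_demo. A std Receiver cannot be closed from the receiving side, so the sketch skips the close step and only shows that queued messages are still delivered after the trigger fires.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{self, Receiver, RecvTimeoutError};
use std::time::Duration;

/// Hypothetical helper: receive until `stop` is set, then collect whatever
/// is still queued and return, even if a sender is still alive.
fn drain_until<T>(rx: &Receiver<T>, stop: &AtomicBool) -> Vec<T> {
    let mut seen = Vec::new();
    loop {
        if stop.load(Ordering::SeqCst) {
            // Triggered: take the remaining queued items without blocking.
            seen.extend(rx.try_iter());
            return seen;
        }
        match rx.recv_timeout(Duration::from_millis(10)) {
            Ok(item) => seen.push(item),
            // Nothing arrived yet: check the stop flag again.
            Err(RecvTimeoutError::Timeout) => continue,
            // All senders are gone: nothing more can arrive.
            Err(RecvTimeoutError::Disconnected) => return seen,
        }
    }
}

/// Queue two messages, trigger the drain, then consume.
fn drain_demo() -> Vec<u32> {
    let (tx, rx) = mpsc::channel();
    let stop = AtomicBool::new(false);
    tx.send(1).unwrap();
    tx.send(2).unwrap();
    stop.store(true, Ordering::SeqCst);
    // The sender is still open here, yet drain_until returns.
    let drained = drain_until(&rx, &stop);
    drop(tx);
    drained
}
```

Calling drain_demo() returns both queued messages even though the drain was triggered before any message was consumed.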

Example: Drain a Channel

The following code creates an mpsc::unbounded channel, sends two messages, and triggers the drain before the receiver starts running. The receiver still yields both queued messages and then terminates.

use std::thread;
use futures::{Future, Stream};
use stream_util::{self, Drainable};
use futures::sync::mpsc;

let (trigger, valve) = stream_util::valve();
let (sender, receiver) = mpsc::unbounded::<()>();

sender.unbounded_send(()).unwrap();
sender.unbounded_send(()).unwrap();

// Trigger the drain before the receiver starts consuming messages. Expect all
// messages already in the channel to be drained.
trigger.terminate();
let chan = thread::spawn(move || {
    let task = receiver
        .drain(valve)
        .for_each(|_| Ok(()))
        .map_err(|e| eprintln!("receive failed: {:?}", e));
    // run the receiver on the tokio runtime until the channel is drained
    tokio::run(task);
});

// The receiver thread will normally never exit, since the sender is open. With a
// `Drain` we can close the receiver and drain any messages still in the channel.
chan.join().unwrap();

Cancel

The extension trait Cancelable provides a new Stream combinator, cancel. Cancel yields elements from the underlying Stream until the provided Future resolves. It then short-circuits by returning Async::Ready(None) and stops polling the underlying Stream.
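The same short-circuit idea can be illustrated with a synchronous iterator. This is a sketch under stated assumptions, not the crate's code: CancelIter and cancel_demo are hypothetical names, an AtomicBool stands in for the Future, and returning None from next plays the role of Async::Ready(None).

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

/// Hypothetical iterator adapter that stops yielding once `stop` is set.
struct CancelIter<I> {
    inner: I,
    stop: Arc<AtomicBool>,
}

impl<I: Iterator> Iterator for CancelIter<I> {
    type Item = I::Item;

    fn next(&mut self) -> Option<I::Item> {
        if self.stop.load(Ordering::SeqCst) {
            // Canceled: end the sequence without touching `inner` again.
            return None;
        }
        self.inner.next()
    }
}

/// Cancel an infinite counter after it has yielded 0, 1 and 2.
fn cancel_demo() -> Vec<u64> {
    let stop = Arc::new(AtomicBool::new(false));
    let mut ticks = CancelIter {
        inner: 0u64..,
        stop: Arc::clone(&stop),
    };
    let mut seen = Vec::new();
    while let Some(tick) = ticks.next() {
        seen.push(tick);
        if tick == 2 {
            stop.store(true, Ordering::SeqCst);
        }
    }
    seen
}
```

The inner iterator is infinite, so the loop in cancel_demo only ends because the adapter returns None once the flag is set.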

Example: Cancel an Interval

The following code creates an infinite stream from a tokio Interval and cancels it.

use std::thread;
use std::time::Duration;
use futures::{Future, Stream};
use stream_util::{self, Cancelable};
use tokio::timer::Interval;

let (trigger, valve) = stream_util::valve();
let interval = thread::spawn(move || {
    let task = Interval::new_interval(Duration::from_millis(250))
        .cancel(valve)
        .for_each(|_| Ok(()))
        .map_err(|e| eprintln!("interval failed: {:?}", e));
    // run the interval stream on the tokio runtime until it is canceled
    tokio::run(task);
});

// The interval thread will normally never exit, since the interval repeats
// forever. With a `Cancel` we can short circuit the stream.
trigger.terminate();
interval.join().unwrap();

Trigger and Valve

The valve function returns a (Trigger, Valve) tuple as a convenience. The Valve is a Future that can be passed to the drain and cancel combinators; it resolves when terminate is called on the paired Trigger.
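The shape of such a pair can be sketched with a shared flag. The types below are illustrative assumptions only (ModelTrigger, ModelValve, model_valve, is_open and valve_demo are not part of stream-util), and the real Valve is a Future rather than a flag that is polled by hand. The choice to have terminate consume the trigger is also an assumption of this sketch.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

/// Hypothetical stand-ins for the crate's Trigger and Valve.
struct ModelTrigger(Arc<AtomicBool>);
struct ModelValve(Arc<AtomicBool>);

/// Create a linked pair that shares one flag.
fn model_valve() -> (ModelTrigger, ModelValve) {
    let flag = Arc::new(AtomicBool::new(false));
    (ModelTrigger(Arc::clone(&flag)), ModelValve(flag))
}

impl ModelTrigger {
    /// Assumption of this sketch: firing consumes the trigger.
    fn terminate(self) {
        self.0.store(true, Ordering::SeqCst);
    }
}

impl ModelValve {
    /// True until the paired trigger has fired.
    fn is_open(&self) -> bool {
        !self.0.load(Ordering::SeqCst)
    }
}

/// Observe the valve before and after the trigger fires.
fn valve_demo() -> (bool, bool) {
    let (trigger, valve) = model_valve();
    let before = valve.is_open();
    trigger.terminate();
    (before, valve.is_open())
}
```

Splitting the pair into two values lets the trigger stay with the controlling thread while the valve moves into the task that runs the stream, as in the examples above.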

License

stream-util is licensed under the MIT license.

stream-util is based on stream-cancel by Jon Gjengset. stream-cancel is dual-licensed under the MIT and Apache 2.0 licenses.
