tokio-process-stream is a simple crate that wraps a tokio::process into a tokio::stream.
Having a stream interface to processes is useful when we have multiple sources of data that we want to merge and start processing from a single entry point.
This crate provides a futures::stream::Stream wrapper for tokio::process::Child. The main struct is ProcessLineStream, which implements the trait, yielding one Item enum at a time, each containing one line from either stdout (Item::Stdout) or stderr (Item::Stderr) of the underlying process until it exits. At this point, the stream yields a single Item::Done and finishes.
Example usage:
    use tokio_process_stream::ProcessLineStream;
    use tokio::process::Command;
    use tokio_stream::StreamExt;
    use std::error::Error;

    #[tokio::main]
    async fn main() -> Result<(), Box<dyn Error>> {
        // Build two commands; neither is spawned yet.
        let mut sleep_cmd = Command::new("sleep");
        sleep_cmd.args(&["1"]);
        let ls_cmd = Command::new("ls");

        // Converting a Command into a ProcessLineStream starts the process.
        let sleep_procstream = ProcessLineStream::try_from(sleep_cmd)?;
        let ls_procstream = ProcessLineStream::try_from(ls_cmd)?;

        // Merge both streams so their items are handled in a single loop.
        let mut procstream = sleep_procstream.merge(ls_procstream);

        while let Some(item) = procstream.next().await {
            println!("{:?}", item);
        }

        Ok(())
    }
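To make the Stdout/Stderr/Done sequence described above more concrete, here is a minimal sketch of the same idea using only the standard library. It is not how the crate is implemented: it uses threads and a channel instead of async streams, and LineItem and merge_lines are hypothetical names invented for this illustration. Unlike the crate's Item::Done, the Done variant here carries no exit information.

```rust
use std::io::{BufRead, BufReader, Read};
use std::sync::mpsc::{channel, Receiver};
use std::thread;

/// Hypothetical stand-in for the crate's `Item`; not the crate's real type.
#[derive(Debug, PartialEq)]
enum LineItem {
    Stdout(String),
    Stderr(String),
    Done,
}

/// Hypothetical helper: forward lines from two readers into one channel,
/// then send a single `Done` once both readers are exhausted.
fn merge_lines<O, E>(stdout: O, stderr: E) -> Receiver<LineItem>
where
    O: Read + Send + 'static,
    E: Read + Send + 'static,
{
    let (tx, rx) = channel();
    let (tx_out, tx_err) = (tx.clone(), tx.clone());

    // One thread per source, each tagging its lines with the origin.
    let out = thread::spawn(move || {
        for line in BufReader::new(stdout).lines().map_while(Result::ok) {
            let _ = tx_out.send(LineItem::Stdout(line));
        }
    });
    let err = thread::spawn(move || {
        for line in BufReader::new(stderr).lines().map_while(Result::ok) {
            let _ = tx_err.send(LineItem::Stderr(line));
        }
    });

    // A third thread waits for both sources, then emits the final item.
    thread::spawn(move || {
        let _ = out.join();
        let _ = err.join();
        let _ = tx.send(LineItem::Done);
    });

    rx
}

fn main() {
    // In-memory byte slices stand in for a child's stdout and stderr pipes.
    let rx = merge_lines(&b"out 1\nout 2\n"[..], &b"err 1\n"[..]);
    for item in rx {
        println!("{:?}", item);
    }
}
```

The interleaving of Stdout and Stderr items depends on thread scheduling, but Done is always last. The same function should accept the piped stdout and stderr handles of a std::process::Child, since both implement Read.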
It is also possible to stream Item<Bytes> chunks with ProcessChunkStream.
    use tokio_process_stream::{Item, ProcessChunkStream};
    use tokio::process::Command;
    use tokio_stream::StreamExt;
    use std::error::Error;

    #[tokio::main]
    async fn main() -> Result<(), Box<dyn Error>> {
        // The shell prints a partial line, waits, then overwrites it using "\r".
        let mut procstream: ProcessChunkStream = Command::new("/bin/sh")
            .arg("-c")
            .arg(r#"printf "1/2"; sleep 0.1; printf "\r2/2 done\n""#)
            .try_into()?;

        // Each item carries a chunk of raw bytes rather than a complete line.
        while let Some(item) = procstream.next().await {
            println!("{:?}", item);
        }

        Ok(())
    }
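One likely reason to prefer chunks over lines is output like the progress text in the example above, where "1/2" is written without a trailing newline. The sketch below, which uses only the standard library, shows what a newline-based splitter does with such input. LineBuffer is a hypothetical helper written for this illustration, and the assumption that the two printf calls arrive as two separate chunks is ours; actual chunk boundaries depend on buffering and timing.

```rust
/// Hypothetical helper: accumulates raw chunks and releases complete lines.
#[derive(Default)]
struct LineBuffer {
    pending: String,
}

impl LineBuffer {
    /// Feed one chunk; return every line it completes (without the '\n').
    fn push(&mut self, chunk: &str) -> Vec<String> {
        self.pending.push_str(chunk);
        let mut lines = Vec::new();
        while let Some(pos) = self.pending.find('\n') {
            // Keep everything after the newline for the next round.
            let rest = self.pending.split_off(pos + 1);
            let mut line = std::mem::replace(&mut self.pending, rest);
            line.pop(); // drop the trailing '\n'
            lines.push(line);
        }
        lines
    }
}

fn main() {
    // Assumed chunk boundaries: one chunk per printf call.
    let chunks = ["1/2", "\r2/2 done\n"];
    let mut buf = LineBuffer::default();
    for chunk in chunks {
        let lines = buf.push(chunk);
        println!("chunk {:?} -> completed lines {:?}", chunk, lines);
    }
}
```

Under these assumptions, the first chunk completes no line, so a line-oriented consumer sees nothing until the newline arrives, and then receives a single line that still contains the carriage return. A chunk-oriented consumer can react to "1/2" as soon as it is written.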