Skip to content

Commit

Permalink
Initial code and CI - code imported from ogle
Browse files Browse the repository at this point in the history
  • Loading branch information
lpenz committed Jan 5, 2022
1 parent c96124a commit ddb22d4
Show file tree
Hide file tree
Showing 6 changed files with 376 additions and 1 deletion.
77 changes: 77 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
---
name: CI
on: [ push, pull_request ]
jobs:
omnilint:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- uses: docker://lpenz/omnilint:v0.2
# Rust actions from:
# https://github.com/actions-rs/meta/blob/master/recipes/quickstart.md
cargo-check:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- uses: actions-rs/toolchain@v1
with:
profile: minimal
toolchain: stable
override: true
- uses: actions-rs/cargo@v1
with:
command: check
test-coverage:
name: cargo test with coveralls
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- id: coverage
uses: docker://lpenz/ghaction-rust-coverage:0.5.0
- uses: coverallsapp/github-action@v1.1.2
with:
github-token: ${{ secrets.GITHUB_TOKEN }}
path-to-lcov: ${{ steps.coverage.outputs.report }}
rustfmt:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- uses: actions-rs/toolchain@v1
with:
profile: minimal
toolchain: stable
override: true
- run: rustup component add rustfmt
- uses: actions-rs/cargo@v1
with:
command: fmt
args: --all -- --check
clippy:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- uses: actions-rs/toolchain@v1
with:
profile: minimal
toolchain: stable
override: true
- run: rustup component add clippy
- uses: actions-rs/cargo@v1
with:
command: clippy
args: -- -D warnings
publish-crate:
needs: [ omnilint, cargo-check, test-coverage, rustfmt, clippy ]
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- id: version
uses: docker://lpenz/ghaction-version-gen:0.8.0
- uses: actions-rs/toolchain@v1
with:
toolchain: stable
override: true
- uses: katyo/publish-crates@v1
if: steps.version.outputs.version_tagged != ''
with:
registry-token: ${{ secrets.CARGO_REGISTRY_TOKEN }}
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
/target
/Cargo.lock
16 changes: 16 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
[package]
name = "tokio-process-stream"
description = "Simple crate that wraps a tokio::process into a tokio::stream"
version = "0.1.0"
authors = ["Leandro Lisboa Penz <lpenz@lpenz.org>"]
edition = "2021"
license = "MIT"
readme = "README.md"
homepage = "https://github.com/lpenz/tokio-process-stream"
repository = "https://github.com/lpenz/tokio-process-stream"

[dependencies]
anyhow = "1"
tokio = { version = "1", features = ["macros", "rt-multi-thread", "process"] }
tokio-stream = { version = "0", features = ["io-util"] }
pin-project-lite = "0"
54 changes: 53 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,2 +1,54 @@
[![CI](https://github.com/lpenz/tokio-process-stream/actions/workflows/ci.yml/badge.svg)](https://github.com/lpenz/tokio-process-stream/actions/workflows/ci.yml)
[![coveralls](https://coveralls.io/repos/github/lpenz/tokio-process-stream/badge.svg?branch=main)](https://coveralls.io/github/lpenz/tokio-process-stream?branch=main)
[![crates.io](https://img.shields.io/crates/v/tokio-process-stream)](https://crates.io/crates/tokio-process-stream)

# tokio-process-stream
Simple crate that wraps a tokio::process into a tokio::stream

tokio-process-stream is a simple crate that wraps a [`tokio::process`] into a
[`tokio::stream`]

Having a stream interface to processes is useful when we have multiple sources of data that
we want to merge and start processing from a single entry point.

This crate provides a [`tokio_stream::Stream`] wrapper for [`tokio::process::Child`]. The
main struct is [`ProcessStream`], which implements the trait, yielding one [`Item`] enum at
a time, each containing one line from either stdout ([`Item::Stdout`]) or stderr
([`Item::Stderr`]) of the underlying process until it exits. At this point, the stream
yields a single [`Item::Done`] and finishes.

Example usage:

```rust
use tokio_process_stream::ProcessStream;
use tokio::process::Command;
use tokio_stream::StreamExt;
use std::error::Error;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
let mut sleep_cmd = Command::new("sleep");
sleep_cmd.args(&["1"]);
let ls_cmd = Command::new("ls");

let sleep_procstream = ProcessStream::try_from(sleep_cmd)?;
let ls_procstream = ProcessStream::try_from(ls_cmd)?;
let mut procstream = sleep_procstream.merge(ls_procstream);

while let Some(item) = procstream.next().await {
println!("{:?}", item);
}

Ok(())
}
```

[`tokio::process`]: https://docs.rs/tokio/latest/tokio/process
[`tokio::stream`]: https://docs.rs/futures-core/latest/futures_core/stream
[`tokio_stream::Stream`]: https://docs.rs/futures-core/latest/futures_core/stream/trait.Stream.html
[`tokio::process::Child`]: https://docs.rs/tokio/latest/tokio/process/struct.Child.html
[`ProcessStream`]: https://docs.rs/tokio-process-stream/latest/tokio-process-stream/tokio_process_stream/struct.ProcessStream.html
[`Item`]: https://docs.rs/tokio-process-stream/latest/tokio-process-stream/tokio_process_stream/enum.Item.html
[`Item::Stdout`]: https://docs.rs/tokio-process-stream/latest/tokio-process-stream/tokio_process_stream/enum.Item.html#variant.Stdout
[`Item::Stderr`]: https://docs.rs/tokio-process-stream/latest/tokio-process-stream/tokio_process_stream/enum.Item.html#variant.Stderr
[`Item::Done`]: https://docs.rs/tokio-process-stream/latest/tokio-process-stream/tokio_process_stream/enum.Item.html#variant.Done

168 changes: 168 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
// Copyright (C) 2022 Leandro Lisboa Penz <lpenz@lpenz.org>
// This file is subject to the terms and conditions defined in
// file 'LICENSE', which is part of this source code package.

#![deny(future_incompatible)]
#![deny(nonstandard_style)]
#![deny(missing_docs)]
#![deny(rustdoc::broken_intra_doc_links)]

//! tokio-process-stream is a simple crate that wraps a [`tokio::process`] into a
//! [`tokio::stream`]
//!
//! Having a stream interface to processes is useful when we have multiple sources of data that
//! we want to merge and start processing from a single entry point.
//!
//! This crate provides a [`tokio_stream::Stream`] wrapper for [`tokio::process::Child`]. The
//! main struct is [`ProcessStream`], which implements the trait, yielding one [`Item`] enum at
//! a time, each containing one line from either stdout ([`Item::Stdout`]) or stderr
//! ([`Item::Stderr`]) of the underlying process until it exits. At this point, the stream
//! yields a single [`Item::Done`] and finishes.
//!
//! Example usage:
//!
//! ```rust
//! use tokio_process_stream::ProcessStream;
//! use tokio::process::Command;
//! use tokio_stream::StreamExt;
//! use std::error::Error;
//!
//! #[tokio::main]
//! async fn main() -> Result<(), Box<dyn Error>> {
//! let mut sleep_cmd = Command::new("sleep");
//! sleep_cmd.args(&["1"]);
//! let ls_cmd = Command::new("ls");
//!
//! let sleep_procstream = ProcessStream::try_from(sleep_cmd)?;
//! let ls_procstream = ProcessStream::try_from(ls_cmd)?;
//! let mut procstream = sleep_procstream.merge(ls_procstream);
//!
//! while let Some(item) = procstream.next().await {
//! println!("{:?}", item);
//! }
//!
//! Ok(())
//! }
//! ```

use pin_project_lite::pin_project;
use std::convert;
use std::fmt;
use std::io;
use std::pin::Pin;
use std::process::ExitStatus;
use std::process::Stdio;
use std::task::Context;
use std::task::Poll;
use tokio::io::AsyncBufReadExt;
use tokio::io::BufReader;
use tokio::process::Child;
use tokio::process::Command;
use tokio::process::{ChildStderr, ChildStdout};
use tokio_stream::wrappers::LinesStream;
use tokio_stream::Stream;

/// [`ProcessStream`] yields a stream of `Items`.
#[derive(Debug, PartialEq, Eq)]
pub enum Item {
/// A stdout line printed by the process.
Stdout(String),
/// A stderr line printed by the process.
Stderr(String),
/// The [`ExitStatus`](std::process::ExitStatus), yielded after the process exits.
Done(ExitStatus),
}

impl fmt::Display for Item {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
Item::Stdout(s) => fmt::Display::fmt(&s, f),
Item::Stderr(s) => fmt::Display::fmt(&s, f),
_ => Ok(()),
}
}
}

pin_project! {
/// The main tokio-process-stream struct, which implements the
/// [`Stream`](tokio_stream::Stream) trait
#[derive(Debug)]
pub struct ProcessStream {
child: Option<Child>,
stdout: Option<LinesStream<BufReader<ChildStdout>>>,
stderr: Option<LinesStream<BufReader<ChildStderr>>>,
}
}

impl convert::From<Child> for ProcessStream {
fn from(mut child: Child) -> ProcessStream {
let stdout = child
.stdout
.take()
.map(|s| LinesStream::new(BufReader::new(s).lines()));
let stderr = child
.stderr
.take()
.map(|s| LinesStream::new(BufReader::new(s).lines()));
Self {
child: Some(child),
stdout,
stderr,
}
}
}

impl convert::TryFrom<Command> for ProcessStream {
type Error = io::Error;
fn try_from(mut command: Command) -> io::Result<ProcessStream> {
command.stdout(Stdio::piped());
command.stderr(Stdio::piped());
let child = command.spawn()?;
Ok(Self::from(child))
}
}

impl Stream for ProcessStream {
type Item = Item;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
if self.child.is_none() {
// Keep returning None after we are done and everything is dropped
return Poll::Ready(None);
}
let this = self.project();
if let Some(stderr) = this.stderr {
match Pin::new(stderr).poll_next(cx) {
Poll::Ready(Some(line)) => {
return Poll::Ready(Some(Item::Stderr(line.unwrap())));
}
Poll::Ready(None) => {
*this.stderr = None;
}
Poll::Pending => {}
}
}
if let Some(stdout) = this.stdout {
match Pin::new(stdout).poll_next(cx) {
Poll::Ready(Some(line)) => {
return Poll::Ready(Some(Item::Stdout(line.unwrap())));
}
Poll::Ready(None) => {
*this.stdout = None;
}
Poll::Pending => {}
}
}
if this.stdout.is_none() && this.stderr.is_none() {
if let Some(ref mut child) = this.child {
if let Ok(Some(sts)) = child.try_wait() {
*this.child = None;
return Poll::Ready(Some(Item::Done(sts)));
} else {
cx.waker().wake_by_ref();
}
}
}
Poll::Pending
}
}
60 changes: 60 additions & 0 deletions tests/test_basic.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Copyright (C) 2022 Leandro Lisboa Penz <lpenz@lpenz.org>
// This file is subject to the terms and conditions defined in
// file 'LICENSE', which is part of this source code package.

use tokio_process_stream::*;

use anyhow::Result;
use std::convert::TryFrom;
use std::process::Stdio;
use tokio::process::Command;
use tokio_stream::StreamExt;

#[tokio::test]
async fn basicout() -> Result<()> {
let mut cmd = Command::new("/bin/sh");
cmd.args(&["-c", "printf 'test1\ntest2'"]);
cmd.stdout(Stdio::piped());
cmd.stderr(Stdio::piped());
let child = cmd.spawn()?;
let mut procstream = ProcessStream::from(child);
assert_eq!(
procstream.next().await,
Some(Item::Stdout("test1".to_owned()))
);
assert_eq!(
procstream.next().await,
Some(Item::Stdout("test2".to_owned()))
);
let exitstatus = procstream.next().await;
if let Some(Item::Done(sts)) = exitstatus {
assert!(sts.success());
} else {
panic!("invalid exit status {:?}", exitstatus);
}
assert_eq!(procstream.next().await, None);
Ok(())
}

#[tokio::test]
async fn basicerr() -> Result<()> {
let mut cmd = Command::new("/bin/sh");
cmd.args(&["-c", "printf 'test1\ntest2' >&2"]);
let mut procstream = ProcessStream::try_from(cmd)?;
assert_eq!(
procstream.next().await,
Some(Item::Stderr("test1".to_owned()))
);
assert_eq!(
procstream.next().await,
Some(Item::Stderr("test2".to_owned()))
);
let exitstatus = procstream.next().await;
if let Some(Item::Done(sts)) = exitstatus {
assert!(sts.success());
} else {
panic!("invalid exit status {:?}", exitstatus);
}
assert_eq!(procstream.next().await, None);
Ok(())
}

0 comments on commit ddb22d4

Please sign in to comment.