Skip to content

Commit

Permalink
Adding the into_stdio make async-process more flexible. (#13)
Browse files Browse the repository at this point in the history
  • Loading branch information
BinChengZhao committed Apr 24, 2021
1 parent 716cc8a commit 7d9dc0b
Show file tree
Hide file tree
Showing 2 changed files with 165 additions and 2 deletions.
5 changes: 3 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "async-process"
version = "1.0.2"
version = "1.1.0"
authors = ["Stjepan Glavina <stjepang@gmail.com>"]
edition = "2018"
description = "Async interface for working with processes"
Expand All @@ -19,6 +19,7 @@ once_cell = "1.4.1"

[target.'cfg(unix)'.dependencies]
async-io = "1.0.0"
libc = "0.2.88"

[target.'cfg(unix)'.dependencies.signal-hook]
version = "0.3.0"
Expand All @@ -33,5 +34,5 @@ version = "0.3.9"
default-features = false
features = [
"winbase",
"winnt",
"winnt"
]
162 changes: 162 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,12 @@ use std::thread;

#[cfg(unix)]
use async_io::Async;
#[cfg(unix)]
use std::os::unix::io::AsRawFd;

#[cfg(windows)]
use blocking::Unblock;

use event_listener::Event;
use futures_lite::{future, io, prelude::*};
use once_cell::sync::Lazy;
Expand Down Expand Up @@ -443,6 +447,38 @@ pub struct ChildStdin(
#[cfg(unix)] Async<std::process::ChildStdin>,
);

impl ChildStdin {
/// Convert async_process::ChildStdin into std::process::Stdio.
///
/// You can use it to associate to the next process.
///
/// # Examples
///
/// ```no_run
/// # futures_lite::future::block_on(async {
/// use async_process::Command;
/// use std::process::Stdio;
///
/// let mut ls_child = Command::new("ls").stdin(Stdio::piped()).spawn()?;
/// let stdio:Stdio = ls_child.stdin.take().unwrap().into_stdio().await?;
///
/// let mut echo_child = Command::new("echo").arg("./").stdout(stdio).spawn()?;
///
/// # std::io::Result::Ok(()) });
/// ```
pub async fn into_stdio(self) -> io::Result<std::process::Stdio> {
cfg_if::cfg_if! {
if #[cfg(windows)] {
Ok(self.0.into_inner().await.into())
} else if #[cfg(unix)] {
let child_stdin = self.0.into_inner()?;
blocking_fd(child_stdin.as_raw_fd())?;
Ok(child_stdin.into())
}
}
}
}

impl io::AsyncWrite for ChildStdin {
fn poll_write(
mut self: Pin<&mut Self>,
Expand Down Expand Up @@ -470,6 +506,41 @@ pub struct ChildStdout(
#[cfg(unix)] Async<std::process::ChildStdout>,
);

impl ChildStdout {
/// Convert async_process::ChildStdout into std::process::Stdio.
///
/// You can use it to associate to the next process.
///
/// # Examples
///
/// ```no_run
/// # futures_lite::future::block_on(async {
/// use async_process::Command;
/// use std::process::Stdio;
/// use std::io::Read;
/// use futures_lite::AsyncReadExt;
///
/// let mut ls_child = Command::new("ls").stdout(Stdio::piped()).spawn()?;
/// let stdio:Stdio = ls_child.stdout.take().unwrap().into_stdio().await?;
///
/// let mut echo_child = Command::new("echo").stdin(stdio).stdout(Stdio::piped()).spawn()?;
/// let mut buf = vec![];
/// echo_child.stdout.take().unwrap().read(&mut buf).await;
/// # std::io::Result::Ok(()) });
/// ```
pub async fn into_stdio(self) -> io::Result<std::process::Stdio> {
cfg_if::cfg_if! {
if #[cfg(windows)] {
Ok(self.0.into_inner().await.into())
} else if #[cfg(unix)] {
let child_stdout = self.0.into_inner()?;
blocking_fd(child_stdout.as_raw_fd())?;
Ok(child_stdout.into())
}
}
}
}

impl io::AsyncRead for ChildStdout {
fn poll_read(
mut self: Pin<&mut Self>,
Expand All @@ -489,6 +560,37 @@ pub struct ChildStderr(
#[cfg(unix)] Async<std::process::ChildStderr>,
);

impl ChildStderr {
/// Convert async_process::ChildStderr into std::process::Stdio.
///
/// You can use it to associate to the next process.
///
/// # Examples
///
/// ```no_run
/// # futures_lite::future::block_on(async {
/// use async_process::Command;
/// use std::process::Stdio;
///
/// let mut ls_child = Command::new("ls").arg("x").stderr(Stdio::piped()).spawn()?;
/// let stdio:Stdio = ls_child.stderr.take().unwrap().into_stdio().await?;
///
/// let mut echo_child = Command::new("echo").stdin(stdio).spawn()?;
/// # std::io::Result::Ok(()) });
/// ```
pub async fn into_stdio(self) -> io::Result<std::process::Stdio> {
cfg_if::cfg_if! {
if #[cfg(windows)] {
Ok(self.0.into_inner().await.into())
} else if #[cfg(unix)] {
let child_stderr = self.0.into_inner()?;
blocking_fd(child_stderr.as_raw_fd())?;
Ok(child_stderr.into())
}
}
}
}

impl io::AsyncRead for ChildStderr {
fn poll_read(
mut self: Pin<&mut Self>,
Expand Down Expand Up @@ -837,3 +939,63 @@ impl Command {
async { child?.output().await }
}
}

#[cfg(unix)]
/// Moves `Fd` out of nonblocking mode.
fn blocking_fd(fd: std::os::unix::io::RawFd) -> io::Result<()> {
// Helper macro to execute a system call that returns an `io::Result`.
macro_rules! syscall {
($fn: ident ( $($arg: expr),* $(,)* ) ) => {{
let res = unsafe { libc::$fn($($arg, )*) };
if res == -1 {
return Err(std::io::Error::last_os_error());
} else {
res
}
}};
}

let res = syscall!(fcntl(fd, libc::F_GETFL));
syscall!(fcntl(fd, libc::F_SETFL, res & !libc::O_NONBLOCK));

Ok(())
}

#[cfg(unix)]
mod test {

#[test]
fn test_into_inner() {
futures_lite::future::block_on(async {
use crate::Command;

use std::io::Result;
use std::process::Stdio;
use std::str::from_utf8;

use futures_lite::AsyncReadExt;

let mut ls_child = Command::new("cat")
.arg("Cargo.toml")
.stdout(Stdio::piped())
.spawn()?;

let stdio: Stdio = ls_child.stdout.take().unwrap().into_stdio().await?;

let mut echo_child = Command::new("grep")
.arg("async")
.stdin(stdio)
.stdout(Stdio::piped())
.spawn()?;

let mut buf = vec![];
let mut stdout = echo_child.stdout.take().unwrap();

stdout.read_to_end(&mut buf).await?;
dbg!(from_utf8(&buf).unwrap_or(""));

Result::Ok(())
})
.unwrap();
}
}

0 comments on commit 7d9dc0b

Please sign in to comment.