Skip to content

Commit

Permalink
feat: add method for piping child output to writer (#6031)
Browse files Browse the repository at this point in the history
### Description

In order to capture task output we need to pipe it to our own writer
implementations. Since we want to stream the outputs we can't use
`wait_with_outputs` as that requires waiting for a process to finish
execution before we get the outputs.

### Testing Instructions

Added unit test to verify that the writers end up with the data that
should be written to stdout.


Closes TURBO-1362

Co-authored-by: Chris Olszewski <Chris Olszewski>
  • Loading branch information
chris-olszewski committed Sep 26, 2023
1 parent de4d82e commit ddc7ab5
Showing 1 changed file with 61 additions and 1 deletion.
62 changes: 61 additions & 1 deletion crates/turborepo-lib/src/process/child.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,16 @@
//! them when the manager is closed.

use std::{
io,
io::{self, Write},
sync::{Arc, Mutex},
time::Duration,
};

use command_group::AsyncCommandGroup;
use futures::future::try_join3;
pub use tokio::process::Command;
use tokio::{
io::{AsyncBufReadExt, AsyncRead, BufReader},
join,
sync::{mpsc, watch, RwLock},
};
Expand Down Expand Up @@ -343,6 +345,42 @@ impl Child {
pub fn stderr(&mut self) -> Option<tokio::process::ChildStderr> {
self.stderr.lock().unwrap().take()
}

/// Wait for the `Child` to exit and pipe any stdout and stderr to the
/// provided writers.
pub async fn wait_with_piped_outputs<W: Write>(
&mut self,
stdout_pipe: W,
stderr_pipe: W,
) -> Result<Option<ChildExit>, std::io::Error> {
// Note this is similar to tokio::process::Command::wait_with_outputs
// but allows us to provide our own sinks instead of just writing to a buffers.
async fn pipe_lines<R: AsyncRead + Unpin, W: Write>(
stream: Option<R>,
mut sink: W,
) -> std::io::Result<()> {
let Some(stream) = stream else { return Ok(()) };
let mut stream = BufReader::new(stream);
let mut buffer = String::new();
loop {
// If 0 bytes are read that indicates we've hit EOF
if stream.read_line(&mut buffer).await? == 0 {
break;
}
sink.write_all(buffer.as_bytes())?;
buffer.clear();
}
Ok(())
}

let stdout_fut = pipe_lines(self.stdout(), stdout_pipe);
let stderr_fut = pipe_lines(self.stderr(), stderr_pipe);

let (exit, _stdout, _stderr) =
try_join3(async { Ok(self.wait().await) }, stdout_fut, stderr_fut).await?;

Ok(exit)
}
}

#[cfg(test)]
Expand Down Expand Up @@ -578,4 +616,26 @@ mod test {

assert_matches!(&*state, ChildState::Exited(_expected));
}

#[tokio::test]
async fn test_wait_with_output() {
let script = find_script_dir().join_component("hello_world.js");
let mut cmd = Command::new("node");
cmd.args([script.as_std_path()]);
cmd.stdout(Stdio::piped());
cmd.stderr(Stdio::piped());
let mut child = Child::spawn(cmd, ShutdownStyle::Kill).unwrap();

let mut out = Vec::new();
let mut err = Vec::new();

let exit = child
.wait_with_piped_outputs(&mut out, &mut err)
.await
.unwrap();

assert_eq!(out, b"hello world\n");
assert!(err.is_empty());
assert_matches!(exit, Some(ChildExit::Finished(Some(0))));
}
}

0 comments on commit ddc7ab5

Please sign in to comment.