diff --git a/crates/turbopack-node/src/pool.rs b/crates/turbopack-node/src/pool.rs index 2d68c59807b5d..a7088f54666dc 100644 --- a/crates/turbopack-node/src/pool.rs +++ b/crates/turbopack-node/src/pool.rs @@ -1,5 +1,6 @@ use std::{ collections::HashMap, + mem::take, path::{Path, PathBuf}, process::{ExitStatus, Stdio}, sync::{Arc, Mutex}, @@ -7,9 +8,13 @@ use std::{ }; use anyhow::{bail, Context, Result}; +use indexmap::IndexSet; use serde::{de::DeserializeOwned, Serialize}; use tokio::{ - io::{AsyncReadExt, AsyncWriteExt}, + io::{ + stderr, stdout, AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, + BufReader, + }, net::{TcpListener, TcpStream}, process::{Child, Command}, select, @@ -55,11 +60,53 @@ impl Drop for RunningNodeJsPoolProcess { const CONNECT_TIMEOUT: Duration = Duration::from_secs(30); +type SharedOutputSet = Arc, u32)>>>; + +/// Pipes the `stream` from `final_stream`, but uses `shared` to deduplicate +/// lines that has beem emitted by other `handle_output_stream` instances with +/// the same `shared` before. +async fn handle_output_stream( + stream: impl AsyncRead + Unpin, + shared: SharedOutputSet, + mut final_stream: impl AsyncWrite + Unpin, +) { + let mut buffered = BufReader::new(stream); + let mut own_output = HashMap::, u32>::new(); + let mut buffer = Vec::new(); + loop { + match buffered.read_until(b'\n', &mut buffer).await { + Ok(0) => { + break; + } + Err(err) => { + eprintln!("error reading from stream: {}", err); + break; + } + Ok(_) => {} + } + let line = Arc::from(take(&mut buffer).into_boxed_slice()); + let occurance_number = *own_output + .entry(Arc::clone(&line)) + .and_modify(|c| *c += 1) + .or_insert(0); + let new_line = { + let mut shared = shared.lock().unwrap(); + shared.insert((line.clone(), occurance_number)) + }; + if new_line && final_stream.write(&line).await.is_err() { + // Whatever happened with stdout/stderr, we can't write to it anymore. + break; + } + } +} + impl NodeJsPoolProcess { async fn new( cwd: &Path, env: &HashMap, entrypoint: &Path, + shared_stdout: SharedOutputSet, + shared_stderr: SharedOutputSet, debug: bool, ) -> Result { let listener = TcpListener::bind("127.0.0.1:0") @@ -85,10 +132,21 @@ impl NodeJsPoolProcess { .expect("the SystemRoot environment variable should always be set"), ); cmd.envs(env); - cmd.stderr(Stdio::inherit()); - cmd.stdout(Stdio::inherit()); + cmd.stderr(Stdio::piped()); + cmd.stdout(Stdio::piped()); - let child = cmd.spawn().context("spawning node pooled process")?; + let mut child = cmd.spawn().context("spawning node pooled process")?; + + tokio::spawn(handle_output_stream( + child.stdout.take().unwrap(), + shared_stdout, + stdout(), + )); + tokio::spawn(handle_output_stream( + child.stderr.take().unwrap(), + shared_stderr, + stderr(), + )); Ok(Self::Spawned(SpawnedNodeJsPoolProcess { listener, @@ -184,6 +242,10 @@ pub struct NodeJsPool { processes: Arc>>, #[turbo_tasks(trace_ignore, debug_ignore)] semaphore: Arc, + #[turbo_tasks(trace_ignore, debug_ignore)] + shared_stdout: SharedOutputSet, + #[turbo_tasks(trace_ignore, debug_ignore)] + shared_stderr: SharedOutputSet, debug: bool, } @@ -203,6 +265,8 @@ impl NodeJsPool { env, processes: Arc::new(Mutex::new(Vec::new())), semaphore: Arc::new(Semaphore::new(if debug { 1 } else { concurrency })), + shared_stdout: Arc::new(Mutex::new(IndexSet::new())), + shared_stderr: Arc::new(Mutex::new(IndexSet::new())), debug, } } @@ -220,6 +284,8 @@ impl NodeJsPool { self.cwd.as_path(), &self.env, self.entrypoint.as_path(), + self.shared_stdout.clone(), + self.shared_stderr.clone(), self.debug, ) .await