
Avoids emitting the same output from multiple processes of the same process pool #3531

Merged 4 commits on Feb 1, 2023
74 changes: 70 additions & 4 deletions crates/turbopack-node/src/pool.rs
@@ -1,15 +1,20 @@
use std::{
collections::HashMap,
mem::take,
path::{Path, PathBuf},
process::{ExitStatus, Stdio},
sync::{Arc, Mutex},
time::Duration,
};

use anyhow::{bail, Context, Result};
use indexmap::IndexSet;
use serde::{de::DeserializeOwned, Serialize};
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
io::{
stderr, stdout, AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt,
BufReader,
},
net::{TcpListener, TcpStream},
process::{Child, Command},
select,
@@ -55,11 +60,53 @@ impl Drop for RunningNodeJsPoolProcess {

const CONNECT_TIMEOUT: Duration = Duration::from_secs(30);

type SharedOutputSet = Arc<Mutex<IndexSet<(Arc<[u8]>, u32)>>>;

/// Pipes `stream` into `final_stream`, using `shared` to deduplicate
/// lines that have been emitted by other `handle_output_stream` instances
/// with the same `shared` before.
async fn handle_output_stream(
stream: impl AsyncRead + Unpin,
shared: SharedOutputSet,
mut final_stream: impl AsyncWrite + Unpin,
) {
Contributor:
If stream A reports:

error A:
   shared line
   non-shared line (A)

while stream B reports:

error B:
  shared line
  non-shared line (B)

We would end up with:

error A:
  shared line
  non-shared line (A)
error B:
  non-shared line (B)

I expect this to also happen with empty lines printed for formatting purposes.

Should we instead only dedupe the start of a stream, up until the first different character?

let mut buffered = BufReader::new(stream);
let mut own_output = HashMap::<Arc<[u8]>, u32>::new();
let mut buffer = Vec::new();
loop {
match buffered.read_until(b'\n', &mut buffer).await {
Ok(0) => {
break;
}
Err(err) => {
eprintln!("error reading from stream: {}", err);
break;
}
Ok(_) => {}
}
let line = Arc::from(take(&mut buffer).into_boxed_slice());
let occurance_number = *own_output
Contributor (suggested change): `occurance_number` → `occurrence_number`

.entry(Arc::clone(&line))
Contributor:
Can we store a hash instead of the raw bytes? Wouldn't that save memory overall?

Member (author): That would work (with a small risk of collisions), but it would lose the ability to read the logged output from the browser UI in the future.

.and_modify(|c| *c += 1)
.or_insert(0);
let new_line = {
let mut shared = shared.lock().unwrap();
shared.insert((line.clone(), occurance_number))
Contributor:
Why do we need occurance_number? Wouldn't just a HashSet work?

Member (author):
I wanted to allow logging lines multiple times from a process.

Member (author):
I initially planned to use HashMap<line: String, count: u32>, but that won't preserve the order of logging. So it's an IndexSet<(line: String, occurance_number: u32)>, which preserves all logging in the correct order.

We can display that in the browser UI for inspection by iterating over the IndexSet. That will give all logging in the right order, merged from all processes of the pool.
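The order-preserving set semantics described above can be sketched with std-only types. This is a minimal, hypothetical stand-in for the PR's `SharedOutputSet`: a `Vec` preserves insertion order (what `indexmap::IndexSet` gives for free) while a `HashSet` answers membership, keyed by (line, occurrence number). The `SharedOutput` type is illustrative, not part of the PR.

```rust
use std::collections::HashSet;
use std::sync::{Arc, Mutex};

type Line = Arc<[u8]>;

// Hypothetical stand-in for indexmap::IndexSet<(Arc<[u8]>, u32)>.
struct SharedOutput {
    seen: HashSet<(Line, u32)>,
    order: Vec<(Line, u32)>,
}

impl SharedOutput {
    fn new() -> Self {
        SharedOutput { seen: HashSet::new(), order: Vec::new() }
    }

    /// Returns true when (line, occurrence) is new, i.e. the calling
    /// process should actually forward the line to stdout/stderr.
    fn insert(&mut self, line: Line, occurrence: u32) -> bool {
        if self.seen.insert((line.clone(), occurrence)) {
            self.order.push((line, occurrence));
            true
        } else {
            false
        }
    }
}

fn main() {
    let shared = Arc::new(Mutex::new(SharedOutput::new()));
    let line: Line = Arc::from(b"ready\n".to_vec().into_boxed_slice());

    // Process A logs the line first: new, so it is printed.
    assert!(shared.lock().unwrap().insert(line.clone(), 0));
    // Process B logs the same line: occurrence 0 is already shared, suppressed.
    assert!(!shared.lock().unwrap().insert(line.clone(), 0));
    // Process B logs it again: occurrence 1 is new, so it is printed.
    assert!(shared.lock().unwrap().insert(line.clone(), 1));
    // Iterating `order` replays every line once, in logging order, merged.
    assert_eq!(shared.lock().unwrap().order.len(), 2);
}
```

Iterating `order` is what the author describes for the browser UI: each logged line appears exactly once, in the order it was first emitted across the pool.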

Contributor (suggested change): spell it `occurrence_number` here as well.

};
if new_line && final_stream.write(&line).await.is_err() {
// Whatever happened with stdout/stderr, we can't write to it anymore.
break;
}
}
}
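The per-process occurrence counting inside the loop above can be isolated as a small std-only sketch. The helper `occurrence_number` is hypothetical (the PR inlines this `entry` chain); it shows why repeated identical lines stay distinguishable in the shared set.

```rust
use std::collections::HashMap;
use std::sync::Arc;

// First sighting of a line yields occurrence 0; each repeat increments it,
// so (line, occurrence) pairs are unique per process.
fn occurrence_number(own_output: &mut HashMap<Arc<[u8]>, u32>, line: &Arc<[u8]>) -> u32 {
    *own_output
        .entry(Arc::clone(line))
        .and_modify(|c| *c += 1)
        .or_insert(0)
}

fn main() {
    let mut own = HashMap::new();
    let line: Arc<[u8]> = Arc::from(b"hello\n".to_vec().into_boxed_slice());
    assert_eq!(occurrence_number(&mut own, &line), 0);
    assert_eq!(occurrence_number(&mut own, &line), 1);
    assert_eq!(occurrence_number(&mut own, &line), 2);
}
```

This is what lets a single process print the same line several times while still suppressing duplicates coming from sibling pool processes.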

impl NodeJsPoolProcess {
async fn new(
cwd: &Path,
env: &HashMap<String, String>,
entrypoint: &Path,
shared_stdout: SharedOutputSet,
shared_stderr: SharedOutputSet,
debug: bool,
) -> Result<Self> {
let listener = TcpListener::bind("127.0.0.1:0")
@@ -85,10 +132,21 @@ impl NodeJsPoolProcess {
.expect("the SystemRoot environment variable should always be set"),
);
cmd.envs(env);
cmd.stderr(Stdio::inherit());
cmd.stdout(Stdio::inherit());
cmd.stderr(Stdio::piped());
cmd.stdout(Stdio::piped());

let child = cmd.spawn().context("spawning node pooled process")?;
let mut child = cmd.spawn().context("spawning node pooled process")?;

tokio::spawn(handle_output_stream(
child.stdout.take().unwrap(),
shared_stdout,
stdout(),
));
tokio::spawn(handle_output_stream(
child.stderr.take().unwrap(),
shared_stderr,
stderr(),
));

Ok(Self::Spawned(SpawnedNodeJsPoolProcess {
listener,
@@ -184,6 +242,10 @@ pub struct NodeJsPool {
processes: Arc<Mutex<Vec<NodeJsPoolProcess>>>,
#[turbo_tasks(trace_ignore, debug_ignore)]
semaphore: Arc<Semaphore>,
#[turbo_tasks(trace_ignore, debug_ignore)]
shared_stdout: SharedOutputSet,
#[turbo_tasks(trace_ignore, debug_ignore)]
shared_stderr: SharedOutputSet,
debug: bool,
}
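As a hedged illustration of the `Stdio::inherit()` → `Stdio::piped()` switch in the spawn hunk above: once stdout is piped, the parent sees each child line and can decide whether to forward it. This synchronous, std-only sketch stands in for the tokio-based original; the helper `captured_lines` is hypothetical, and it shells out to `echo`, assuming a Unix-like environment.

```rust
use std::io::{BufRead, BufReader};
use std::process::{Command, Stdio};

// Capture a child's stdout in the parent instead of inheriting the
// descriptor, mirroring cmd.stdout(Stdio::piped()) in the PR.
fn captured_lines(program: &str, arg: &str) -> Vec<String> {
    let mut child = Command::new(program)
        .arg(arg)
        .stdout(Stdio::piped())
        .spawn()
        .expect("spawning child process");
    let stdout = child.stdout.take().expect("stdout was piped");
    let lines = BufReader::new(stdout)
        .lines()
        .collect::<Result<Vec<_>, _>>()
        .expect("reading child stdout");
    child.wait().expect("waiting for child");
    lines
}

fn main() {
    // The parent now observes each line before (optionally) re-emitting it.
    assert_eq!(captured_lines("echo", "hello"), vec!["hello".to_string()]);
}
```

In the PR the same idea runs asynchronously: one `tokio::spawn(handle_output_stream(...))` per stream filters lines through the shared set before writing them to the real stdout/stderr.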

@@ -203,6 +265,8 @@ impl NodeJsPool {
env,
processes: Arc::new(Mutex::new(Vec::new())),
semaphore: Arc::new(Semaphore::new(if debug { 1 } else { concurrency })),
shared_stdout: Arc::new(Mutex::new(IndexSet::new())),
shared_stderr: Arc::new(Mutex::new(IndexSet::new())),
debug,
}
}
@@ -220,6 +284,8 @@ impl NodeJsPool {
self.cwd.as_path(),
&self.env,
self.entrypoint.as_path(),
self.shared_stdout.clone(),
self.shared_stderr.clone(),
self.debug,
)
.await