diff --git a/crates/turborepo-lib/src/lib.rs b/crates/turborepo-lib/src/lib.rs index d7a698dd6b111..612f507452ae0 100644 --- a/crates/turborepo-lib/src/lib.rs +++ b/crates/turborepo-lib/src/lib.rs @@ -20,11 +20,11 @@ mod framework; pub(crate) mod globwatcher; pub mod graph; mod hash; -mod manager; mod opts; mod package_graph; mod package_json; mod package_manager; +mod process; mod rewrite_json; mod run; mod shim; diff --git a/crates/turborepo-lib/src/manager.rs b/crates/turborepo-lib/src/manager.rs deleted file mode 100644 index 66fe8fd7af691..0000000000000 --- a/crates/turborepo-lib/src/manager.rs +++ /dev/null @@ -1,11 +0,0 @@ -// Manager is a wrapper around child processes executed by turbo -#[derive(Debug)] -pub struct Manager { - // TODO -} - -impl Manager { - pub fn new() -> Self { - Self {} - } -} diff --git a/crates/turborepo-lib/src/process/child.rs b/crates/turborepo-lib/src/process/child.rs new file mode 100644 index 0000000000000..da13cc029d120 --- /dev/null +++ b/crates/turborepo-lib/src/process/child.rs @@ -0,0 +1,534 @@ +//! `child` +//! +//! This module contains the code for spawning a child process and managing it. +//! It is responsible for forwarding signals to the child process, and closing +//! the child process when the manager is closed. +//! +//! The child process is spawned from a `tokio::process::Command` through the +//! `command_group` crate's `group().spawn()`. +//! +//! Children can be closed in a few ways, either through killing, or more +//! gracefully by coupling a signal and a timeout. +//! +//! This loosely follows the actor model, where the child process is an actor +//! that is spawned and managed by the manager. The manager is responsible for +//! running these processes to completion, forwarding signals, and closing +//! them when the manager is closed. 
+ +use std::{ + sync::{Arc, Mutex}, + time::Duration, +}; + +use command_group::AsyncCommandGroup; +pub use tokio::process::Command; +use tokio::{ + join, + sync::{mpsc, watch, RwLock}, +}; +use tracing::{debug, info}; + +#[derive(Debug)] +pub enum ChildState { + Running(ChildCommandChannel), + Exited(ChildExit), +} + +impl ChildState { + pub fn command_channel(&self) -> Option<&ChildCommandChannel> { + match self { + ChildState::Running(c) => Some(c), + ChildState::Exited(_) => None, + } + } +} + +#[derive(Debug, Copy, Clone, PartialEq)] +pub enum ChildExit { + Finished(Option), + Killed, + KilledExternal, + Failed, +} + +#[derive(Clone)] +pub enum ShutdownStyle { + /// On windows this will immediately kill, and on posix systems it + /// will send a SIGINT. If `Duration` elapses, we then follow up with a + /// `Kill`. + Graceful(Duration), + + Kill, +} + +/// Child process stopped. +#[derive(Debug)] +pub struct ShutdownFailed; + +impl From for ShutdownFailed { + fn from(_: std::io::Error) -> Self { + ShutdownFailed + } +} + +impl ShutdownStyle { + /// Process the shutdown style for the given child process. + /// + /// If an exit channel is provided, the exit code will be sent to the + /// channel when the child process exits. 
+ async fn process(&self, child: &mut tokio::process::Child) -> ChildState { + match self { + ShutdownStyle::Graceful(timeout) => { + // try ro run the command for the given timeout + #[cfg(unix)] + { + let fut = async { + if let Some(pid) = child.id() { + debug!("sending SIGINT to child {}", pid); + unsafe { + libc::kill(pid as i32, libc::SIGINT); + } + debug!("waiting for child {}", pid); + child.wait().await.map(|es| es.code()) + } else { + // if there is no pid, then just report successful with no exit code + Ok(None) + } + }; + + info!("starting shutdown"); + + let result = tokio::time::timeout(*timeout, fut).await; + match result { + Ok(Ok(result)) => ChildState::Exited(ChildExit::Finished(result)), + Ok(Err(_)) => ChildState::Exited(ChildExit::Failed), + Err(_) => { + info!("graceful shutdown timed out, killing child"); + match child.kill().await { + Ok(_) => ChildState::Exited(ChildExit::Killed), + Err(_) => ChildState::Exited(ChildExit::Failed), + } + } + } + } + + #[cfg(windows)] + { + debug!("timeout not supported on windows, killing"); + match child.kill().await { + Ok(_) => ChildState::Exited(ChildExit::Killed), + Err(_) => ChildState::Exited(ChildExit::Failed), + } + } + } + ShutdownStyle::Kill => match child.kill().await { + Ok(_) => ChildState::Exited(ChildExit::Killed), + Err(_) => ChildState::Exited(ChildExit::Failed), + }, + } + } +} + +/// A child process that can be interacted with asynchronously. +/// +/// This is a wrapper around the `tokio::process::Child` struct, which provides +/// a cross platform interface for spawning and managing child processes. 
+#[derive(Clone, Debug)] +pub struct Child { + pid: Option, + gid: Option, + state: Arc>, + exit_channel: watch::Receiver>, + stdin: Arc>>, + stdout: Arc>>, + stderr: Arc>>, +} + +#[derive(Debug)] +pub struct ChildCommandChannel(mpsc::Sender); + +impl ChildCommandChannel { + pub fn new() -> (Self, mpsc::Receiver) { + let (tx, rx) = mpsc::channel(1); + (ChildCommandChannel(tx), rx) + } + + pub async fn kill(&self) -> Result<(), mpsc::error::SendError> { + self.0.send(ChildCommand::Kill).await + } + + pub async fn stop(&self) -> Result<(), mpsc::error::SendError> { + self.0.send(ChildCommand::Stop).await + } +} + +pub enum ChildCommand { + Stop, + Kill, +} + +impl Child { + /// Start a child process, returning a handle that can be used to interact + /// with it. The command will be started immediately. + pub fn spawn(mut command: Command, shutdown_style: ShutdownStyle) -> Self { + let group = command.group().spawn().expect("failed to start child"); + + let gid = group.id(); + let mut child = group.into_inner(); + let pid = child.id(); + + let stdin = child.stdin.take(); + let stdout = child.stdout.take(); + let stderr = child.stderr.take(); + + let (command_tx, mut command_rx) = ChildCommandChannel::new(); + + // we use a watch channel to communicate the exit code back to the + // caller. we are interested in three cases: + // - the child process exits + // - the child process is killed (and doesn't have an exit code) + // - the child process fails somehow (some syscall fails) + let (exit_tx, exit_rx) = watch::channel(None); + + let state = Arc::new(RwLock::new(ChildState::Running(command_tx))); + let task_state = state.clone(); + + let _task = tokio::spawn(async move { + info!("waiting for task"); + tokio::select! { + command = command_rx.recv() => { + let state = match command { + // we received a command to stop the child process, or the channel was closed. 
+ // in theory this happens when the last child is dropped, however in practice + // we will always get a `Permit` from the recv call before the channel can be + // dropped, and the cnannel is not closed while there are still permits + Some(ChildCommand::Stop) | None => { + debug!("stopping child process"); + shutdown_style.process(&mut child).await + } + // we received a command to kill the child process + Some(ChildCommand::Kill) => { + debug!("killing child process"); + ShutdownStyle::Kill.process(&mut child).await + } + }; + + match state { + ChildState::Exited(exit) => { + // ignore the send error, failure means the channel is dropped + exit_tx.send(Some(exit)).ok(); + } + ChildState::Running(_) => { + debug_assert!(false, "child state should not be running after shutdown"); + } + } + + { + let mut task_state = task_state.write().await; + *task_state = state; + } + } + status = child.wait() => { + // the child process exited + let child_exit = match status.map(|s| s.code()) { + Ok(Some(c)) => ChildExit::Finished(Some(c)), + // if we hit this case, it means that the child process was killed + // by someone else, and we should report that it was killed + Ok(None) => ChildExit::KilledExternal, + Err(_e) => ChildExit::Failed, + }; + { + let mut task_state = task_state.write().await; + *task_state = ChildState::Exited(child_exit); + } + + // ignore the send error, the channel is dropped anyways + exit_tx.send(Some(child_exit)).ok(); + + } + } + + debug!("child process exited"); + }); + + Self { + pid, + gid, + state, + exit_channel: exit_rx, + stdin: Arc::new(Mutex::new(stdin)), + stdout: Arc::new(Mutex::new(stdout)), + stderr: Arc::new(Mutex::new(stderr)), + } + } + + /// Wait for the `Child` to exit, returning the exit code. + pub async fn wait(&mut self) -> Option { + self.exit_channel.changed().await.ok()?; + *self.exit_channel.borrow() + } + + /// Perform a graceful shutdown of the `Child` process. 
+ pub async fn stop(&mut self) -> Option { + let mut watch = self.exit_channel.clone(); + + let fut = async { + let state = self.state.read().await; + let child = match state.command_channel() { + Some(child) => child, + None => return, + }; + + // if this fails, it's because the channel is dropped (toctou) + // we can just ignore it + child.stop().await.ok(); + }; + + let (_, code) = join! { + fut, + async { + watch.changed().await.ok()?; + *watch.borrow() + } + }; + + code + } + + /// Kill the `Child` process immediately. + pub async fn kill(&mut self) -> Option { + let mut watch = self.exit_channel.clone(); + + let fut = async { + let rw_lock_read_guard = self.state.read().await; + let child = match rw_lock_read_guard.command_channel() { + Some(child) => child, + None => return, + }; + + // if this fails, it's because the channel is dropped (toctou) + // we can just ignore it + child.kill().await.ok(); + }; + + let (_, code) = join! { + fut, + async { + // if this fails, it is because the watch receiver is dropped. 
just ignore it do a best-effort + watch.changed().await.ok(); + *watch.borrow() + } + }; + + code + } + + fn pid(&self) -> Option { + self.pid + } + + pub fn stdin(&mut self) -> Option { + self.stdin.lock().unwrap().take() + } + + pub fn stdout(&mut self) -> Option { + self.stdout.lock().unwrap().take() + } + + pub fn stderr(&mut self) -> Option { + self.stderr.lock().unwrap().take() + } +} + +#[cfg(test)] +mod test { + use std::{assert_matches::assert_matches, process::Stdio, time::Duration}; + + use tokio::{ + io::{AsyncReadExt, AsyncWriteExt}, + process::Command, + }; + use tracing_test::traced_test; + + use super::{Child, ChildState}; + use crate::process::child::{ChildExit, ShutdownStyle}; + + const STARTUP_DELAY: Duration = Duration::from_millis(500); + + #[tokio::test] + async fn test_pid() { + let mut cmd = Command::new("node"); + cmd.args(["./test/scripts/hello_world.js"]); + let mut child = Child::spawn(cmd, ShutdownStyle::Kill); + + assert_matches!(child.pid(), Some(_)); + child.stop().await; + + let state = child.state.read().await; + assert_matches!(&*state, ChildState::Exited(ChildExit::Killed)); + } + + #[tokio::test] + #[traced_test] + async fn test_spawn() { + let cmd = { + let mut cmd = Command::new("node"); + cmd.args(["./test/scripts/hello_world.js"]); + cmd.stdout(Stdio::piped()); + cmd + }; + + let mut child = Child::spawn(cmd, ShutdownStyle::Kill); + + { + let state = child.state.read().await; + assert_matches!(&*state, ChildState::Running(_)); + } + + let code = child.wait().await; + assert_eq!(code, Some(ChildExit::Finished(Some(0)))); + + { + let state = child.state.read().await; + assert_matches!(&*state, ChildState::Exited(ChildExit::Finished(Some(0)))); + } + } + + #[tokio::test] + #[traced_test] + async fn test_stdout() { + let mut cmd = Command::new("node"); + cmd.args(["./test/scripts/hello_world.js"]); + cmd.stdout(Stdio::piped()); + let mut child = Child::spawn(cmd, ShutdownStyle::Kill); + + tokio::time::sleep(STARTUP_DELAY).await; 
+ + child.wait().await; + + { + let mut output = Vec::new(); + child + .stdout() + .unwrap() + .read_to_end(&mut output) + .await + .expect("Failed to read stdout"); + + let output_str = String::from_utf8(output).expect("Failed to parse stdout"); + + assert!(output_str.contains("hello world")); + } + + let state = child.state.read().await; + + assert_matches!(&*state, ChildState::Exited(ChildExit::Finished(Some(0)))); + } + + #[tokio::test] + async fn test_stdio() { + let mut cmd = Command::new("node"); + cmd.args(["./test/scripts/stdin_stdout.js"]); + cmd.stdout(Stdio::piped()); + cmd.stdin(Stdio::piped()); + let mut child = Child::spawn(cmd, ShutdownStyle::Kill); + + let mut stdout = child.stdout().unwrap(); + + tokio::time::sleep(STARTUP_DELAY).await; + + // drop stdin to close the pipe + { + let mut stdin = child.stdin().unwrap(); + stdin.write_all(b"hello world").await.unwrap(); + } + + let mut output = Vec::new(); + stdout.read_to_end(&mut output).await.unwrap(); + + let output_str = String::from_utf8(output).expect("Failed to parse stdout"); + + assert_eq!(output_str, "hello world"); + + child.wait().await; + + let state = child.state.read().await; + + assert_matches!(&*state, ChildState::Exited(ChildExit::Finished(Some(0)))); + } + + #[tokio::test] + #[traced_test] + async fn test_graceful_shutdown_timeout() { + let cmd = { + let mut cmd = Command::new("node"); + cmd.args(["./test/scripts/sleep_5_ignore.js"]); + cmd + }; + + let mut child = Child::spawn(cmd, ShutdownStyle::Graceful(Duration::from_millis(500))); + + // give it a moment to register the signal handler + tokio::time::sleep(STARTUP_DELAY).await; + + child.stop().await; + + let state = child.state.read().await; + + // this should time out and be killed + assert_matches!(&*state, ChildState::Exited(ChildExit::Killed)); + } + + #[tokio::test] + #[traced_test] + async fn test_graceful_shutdown() { + let cmd = { + let mut cmd = Command::new("node"); + 
cmd.args(["./test/scripts/sleep_5_interruptable.js"]); + cmd + }; + + let mut child = Child::spawn(cmd, ShutdownStyle::Graceful(Duration::from_millis(500))); + + tokio::time::sleep(STARTUP_DELAY).await; + + child.stop().await; + child.wait().await; + + let state = child.state.read().await; + + // process exits with no code when interrupted + #[cfg(unix)] + assert_matches!(&*state, &ChildState::Exited(ChildExit::Finished(None))); + + #[cfg(not(unix))] + assert_matches!(&*state, &ChildState::Exited(ChildExit::Killed)); + } + + #[tokio::test] + #[traced_test] + async fn test_detect_killed_someone_else() { + let cmd = { + let mut cmd = Command::new("node"); + cmd.args(["./test/scripts/sleep_5_interruptable.js"]); + cmd + }; + + let mut child = Child::spawn(cmd, ShutdownStyle::Graceful(Duration::from_millis(500))); + + tokio::time::sleep(STARTUP_DELAY).await; + + #[cfg(unix)] + if let Some(pid) = child.pid() { + unsafe { + libc::kill(pid as i32, libc::SIGINT); + } + } + + child.wait().await; + + let state = child.state.read().await; + + assert_matches!(&*state, ChildState::Exited(ChildExit::KilledExternal)); + } +} diff --git a/crates/turborepo-lib/src/process/mod.rs b/crates/turborepo-lib/src/process/mod.rs new file mode 100644 index 0000000000000..08a7bbfb4a2c1 --- /dev/null +++ b/crates/turborepo-lib/src/process/mod.rs @@ -0,0 +1,284 @@ +//! `process` +//! +//! This module contains the code that is responsible for running the commands +//! that are queued by run. It consists of a set of child processes that are +//! spawned and managed by the manager. The manager is responsible for +//! running these processes to completion, forwarding signals, and closing +//! them when the manager is closed. +//! +//! As of now, the manager will execute futures in a random order, and +//! must be either `wait`ed on or `stop`ped to drive state. 
+ +mod child; + +use std::{ + sync::{atomic::AtomicBool, Arc, Mutex}, + time::Duration, +}; + +pub use child::Command; +use futures::Future; +use tokio::task::JoinSet; +use tracing::{debug, trace}; + +use self::child::{Child, ChildExit}; + +/// A process manager that is responsible for spawning and managing child +/// processes. When the manager is Open, new child processes can be spawned +/// using `spawn`. When the manager is Closed, all currently-running children +/// will be closed, and no new children can be spawned. +#[derive(Debug, Clone)] +pub struct ProcessManager(Arc>); + +#[derive(Debug)] +struct ProcessManagerInner { + is_closing: bool, + children: Vec, +} + +impl ProcessManager { + pub fn new() -> Self { + Self(Arc::new(Mutex::new(ProcessManagerInner { + is_closing: false, + children: Vec::new(), + }))) + } +} + +impl ProcessManager { + /// Spawn a new child process to run the given command. + /// + /// The handle of the child can be either waited or stopped by the caller, + /// as well as the entire process manager. + /// + /// If spawn returns None, + pub fn spawn(&self, command: child::Command, stop_timeout: Duration) -> Option { + let mut lock = self.0.lock().unwrap(); + if lock.is_closing { + return None; + } + let child = child::Child::spawn(command, child::ShutdownStyle::Graceful(stop_timeout)); + lock.children.push(child.clone()); + Some(child) + } + + /// Stop the process manager, closing all child processes. On posix + /// systems this will send a SIGINT, and on windows it will just kill + /// the process immediately. + pub async fn stop(&self) { + self.close(|mut c| async move { c.stop().await }).await + } + + /// Stop the process manager, waiting for all child processes to exit. + /// + /// If you want to set a timeout, use `tokio::time::timeout` and + /// `Self::stop` if the timeout elapses. 
+ pub async fn wait(&self) { + self.close(|mut c| async move { c.wait().await }).await + } + + /// Close the process manager, running the given callback on each child + /// + /// note: this is designed to be called multiple times, ie calling close + /// with two different strategies will propagate both signals to the child + /// processes. clearing the task queue and re-enabling spawning are both + /// idempotent operations + async fn close(&self, callback: F) + where + F: Fn(Child) -> C + Sync + Send + Copy + 'static, + C: Future> + Sync + Send + 'static, + { + let mut set = JoinSet::new(); + + { + let mut lock = self.0.lock().expect("not poisoned"); + lock.is_closing = true; + for child in lock.children.iter() { + let child = child.clone(); + set.spawn(async move { callback(child).await }); + } + } + + debug!("waiting for {} processes to exit", set.len()); + + while let Some(out) = set.join_next().await { + trace!("process exited: {:?}", out); + } + + { + let mut lock = self.0.lock().expect("not poisoned"); + + // just allocate a new vec rather than clearing the old one + lock.children = vec![]; + lock.is_closing = false; + } + } +} + +#[cfg(test)] +mod test { + + use futures::{stream::FuturesUnordered, StreamExt}; + use test_case::test_case; + use time::Instant; + use tokio::{join, process::Command, time::sleep}; + use tracing_test::traced_test; + + use super::*; + + fn get_command() -> Command { + let mut cmd = Command::new("node"); + cmd.arg("./test/scripts/sleep_5_interruptable.js"); + cmd + } + + #[tokio::test] + async fn test_basic() { + let manager = ProcessManager::new(); + manager.spawn(get_command(), Duration::from_secs(2)); + manager.stop().await; + } + + #[tokio::test] + async fn test_multiple() { + let manager = ProcessManager::new(); + + manager.spawn(get_command(), Duration::from_secs(2)); + manager.spawn(get_command(), Duration::from_secs(2)); + manager.spawn(get_command(), Duration::from_secs(2)); + + sleep(Duration::from_millis(100)).await; + + 
manager.stop().await; + } + + #[tokio::test] + async fn test_closed() { + let manager = ProcessManager::new(); + manager.spawn(get_command(), Duration::from_secs(2)); + manager.stop().await; + + manager.spawn(get_command(), Duration::from_secs(2)); + + sleep(Duration::from_millis(100)).await; + + manager.stop().await; + } + + #[tokio::test] + async fn test_exit_code() { + let manager = ProcessManager::new(); + let mut child = manager + .spawn(get_command(), Duration::from_secs(2)) + .expect("running"); + + sleep(Duration::from_millis(100)).await; + + let code = child.wait().await; + assert_eq!(code, Some(ChildExit::Finished(Some(0)))); + + manager.stop().await; + } + + #[tokio::test] + #[traced_test] + async fn test_message_after_stop() { + let manager = ProcessManager::new(); + let mut child = manager + .spawn(get_command(), Duration::from_secs(2)) + .expect("running"); + + sleep(Duration::from_millis(100)).await; + + let exit = child.wait().await; + assert_eq!(exit, Some(ChildExit::Finished(Some(0)))); + + manager.stop().await; + + // this is idempotent, so calling it after the manager is stopped is ok + child.kill().await; + + let code = child.wait().await; + assert_eq!(code, None); + } + + #[tokio::test] + async fn test_reuse_manager() { + let manager = ProcessManager::new(); + manager.spawn(get_command(), Duration::from_secs(2)); + + sleep(Duration::from_millis(100)).await; + + manager.stop().await; + + assert!(manager.0.lock().unwrap().children.is_empty()); + + // idempotent + manager.stop().await; + } + + #[test_case("stop", ChildExit::Finished(None))] + #[test_case("wait", ChildExit::Finished(Some(0)))] + #[tokio::test] + async fn test_stop_multiple_tasks_shared(strat: &str, expected: ChildExit) { + let manager = ProcessManager::new(); + let tasks = FuturesUnordered::new(); + + for _ in 0..10 { + let manager = manager.clone(); + tasks.push(tokio::spawn(async move { + let mut command = super::child::Command::new("sleep"); + command.arg("1"); + + manager + 
.spawn(command, Duration::from_secs(1)) + .unwrap() + .wait() + .await + })); + } + + // wait for tasks to start + tokio::time::sleep(Duration::from_millis(50)).await; + + match strat { + "stop" => manager.stop().await, + "wait" => manager.wait().await, + _ => panic!("unknown strat"), + } + + // tasks return proper exit code + assert!( + tasks.all(|v| async { v.unwrap() == Some(expected) }).await, + "not all tasks returned the correct code: {:?}", + expected + ); + } + + #[tokio::test] + async fn test_wait_multiple_tasks() { + let manager = ProcessManager::new(); + + let mut command = super::child::Command::new("sleep"); + command.arg("1"); + + manager.spawn(command, Duration::from_secs(1)); + + // let the task start + tokio::time::sleep(Duration::from_millis(50)).await; + + let start_time = Instant::now(); + + // we support 'close escalation'; someone can call + // stop even if others are waiting + let _ = join! { + manager.wait(), + manager.wait(), + manager.stop(), + }; + + let finish_time = Instant::now(); + + assert!((finish_time - start_time).lt(&Duration::from_secs(2))); + } +} diff --git a/crates/turborepo-lib/src/run/mod.rs b/crates/turborepo-lib/src/run/mod.rs index ce61eca6c179f..6974d563ef5c9 100644 --- a/crates/turborepo-lib/src/run/mod.rs +++ b/crates/turborepo-lib/src/run/mod.rs @@ -28,10 +28,10 @@ use crate::{ config::TurboJson, daemon::DaemonConnector, engine::EngineBuilder, - manager::Manager, opts::{GraphOpts, Opts}, package_graph::{PackageGraph, WorkspaceName}, package_json::PackageJson, + process::ProcessManager, run::global_hash::get_global_hash_inputs, task_graph::Visitor, task_hash::PackageInputsHashes, @@ -40,12 +40,12 @@ use crate::{ #[derive(Debug)] pub struct Run { base: CommandBase, - processes: Manager, + processes: ProcessManager, } impl Run { pub fn new(base: CommandBase) -> Self { - let processes = Manager::new(); + let processes = ProcessManager::new(); Self { base, processes } } @@ -250,6 +250,9 @@ impl Run { debug!("package 
inputs hashes: {:?}", package_inputs_hashes); + // remove dead code warnings + let _proc_manager = ProcessManager::new(); + let pkg_dep_graph = Arc::new(pkg_dep_graph); let engine = Arc::new(engine); let visitor = Visitor::new( diff --git a/crates/turborepo-lib/src/run/scope/filter.rs b/crates/turborepo-lib/src/run/scope/filter.rs index a46e5b159adda..af44783a48292 100644 --- a/crates/turborepo-lib/src/run/scope/filter.rs +++ b/crates/turborepo-lib/src/run/scope/filter.rs @@ -5,7 +5,7 @@ use std::{ }; use tracing::debug; -use turbopath::{AbsoluteSystemPath, AnchoredSystemPathBuf, RelativeUnixPath}; +use turbopath::{AbsoluteSystemPath, AnchoredSystemPathBuf}; use turborepo_scm::SCM; use wax::Pattern;