diff --git a/hyperactor_mesh/Cargo.toml b/hyperactor_mesh/Cargo.toml index 9f6385e7b..232fcf740 100644 --- a/hyperactor_mesh/Cargo.toml +++ b/hyperactor_mesh/Cargo.toml @@ -1,4 +1,4 @@ -# @generated by autocargo from //monarch/hyperactor_mesh:[hyperactor_mesh,hyperactor_mesh_test_bootstrap] +# @generated by autocargo from //monarch/hyperactor_mesh:[hyperactor_mesh,hyperactor_mesh_test_bootstrap,process_allocator_cleanup,process_allocator_test_bin,process_allocator_test_bootstrap] [package] name = "hyperactor_mesh" @@ -11,11 +11,24 @@ license = "BSD-3-Clause" name = "hyperactor_mesh_test_bootstrap" path = "test/bootstrap.rs" +[[bin]] +name = "process_allocator_test_bin" +path = "test/process_allocator_cleanup/process_allocator_test_bin.rs" + +[[bin]] +name = "process_allocator_test_bootstrap" +path = "test/process_allocator_cleanup/process_allocator_test_bootstrap.rs" + +[[test]] +name = "process_allocator_cleanup" +path = "test/process_allocator_cleanup/process_allocator_cleanup.rs" + [dependencies] anyhow = "1.0.98" async-trait = "0.1.86" bincode = "1.3.3" bitmaps = "3.2.1" +buck-resources = "1" enum-as-inner = "0.6.0" erased-serde = "0.3.27" futures = { version = "0.3.30", features = ["async-await", "compat"] } @@ -28,15 +41,16 @@ nix = { version = "0.29.0", features = ["dir", "event", "hostname", "inotify", " rand = { version = "0.8", features = ["small_rng"] } serde = { version = "1.0.185", features = ["derive", "rc"] } serde_bytes = "0.11" +serde_json = { version = "1.0.140", features = ["alloc", "float_roundtrip", "unbounded_depth"] } tempfile = "3.15" thiserror = "2.0.12" tokio = { version = "1.45.0", features = ["full", "test-util", "tracing"] } tokio-stream = { version = "0.1.17", features = ["fs", "io-util", "net", "signal", "sync", "time"] } tokio-util = { version = "0.7.15", features = ["full"] } tracing = { version = "0.1.41", features = ["attributes", "valuable"] } +tracing-subscriber = { version = "0.3.19", features = ["chrono", "env-filter", "json", "local-time", "parking_lot", "registry"] } [dev-dependencies] -buck-resources = "1" dir-diff = "0.3" maplit = "1.0" timed_test = { version = "0.0.0", path = "../timed_test" } diff --git a/hyperactor_mesh/src/alloc/local.rs b/hyperactor_mesh/src/alloc/local.rs index 6333c8627..5950eaa33 100644 --- a/hyperactor_mesh/src/alloc/local.rs +++ b/hyperactor_mesh/src/alloc/local.rs @@ -204,6 +204,7 @@ impl Alloc for LocalAlloc { let created = ProcState::Created { proc_id: proc_id.clone(), coords, + pid: std::process::id(), }; self.queue.push_back(ProcState::Running { proc_id, diff --git a/hyperactor_mesh/src/alloc/mod.rs b/hyperactor_mesh/src/alloc/mod.rs index 56981ad81..da6b9aa34 100644 --- a/hyperactor_mesh/src/alloc/mod.rs +++ b/hyperactor_mesh/src/alloc/mod.rs @@ -96,6 +96,8 @@ pub enum ProcState { proc_id: ProcId, /// Its assigned coordinates (in the alloc's shape). coords: Vec, + /// The system process ID of the created child process. + pid: u32, }, /// A proc was started. Running { @@ -130,16 +132,21 @@ pub enum ProcState { impl fmt::Display for ProcState { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { - ProcState::Created { proc_id, coords } => { + ProcState::Created { + proc_id, + coords, + pid, + } => { write!( f, - "{}: created at ({})", + "{}: created at ({}) with PID {}", proc_id, coords .iter() .map(|c| c.to_string()) .collect::>() - .join(",") + .join(","), + pid ) } ProcState::Running { proc_id, addr, .. } => { @@ -334,7 +341,9 @@ pub(crate) mod testing { let mut running = HashSet::new(); while running.len() != 4 { match alloc.next().await.unwrap() { - ProcState::Created { proc_id, coords } => { + ProcState::Created { + proc_id, coords, .. + } => { procs.insert(proc_id, coords); } ProcState::Running { proc_id, .. } => { diff --git a/hyperactor_mesh/src/alloc/process.rs b/hyperactor_mesh/src/alloc/process.rs index 862e72ad8..9401953ea 100644 --- a/hyperactor_mesh/src/alloc/process.rs +++ b/hyperactor_mesh/src/alloc/process.rs @@ -329,22 +329,29 @@ impl ProcessAlloc { description: message, }) } - Ok(mut process) => match self.ranks.assign(index) { - Err(_index) => { - tracing::info!("could not assign rank to {}", proc_id); - let _ = process.kill().await; - None - } - Ok(rank) => { - let (handle, monitor) = Child::monitored(process); - self.children.spawn(async move { (index, monitor.await) }); - self.active.insert(index, handle); - // Adjust for shape slice offset for non-zero shapes (sub-shapes). - let rank = rank + self.spec.shape.slice().offset(); - let coords = self.spec.shape.slice().coordinates(rank).unwrap(); - Some(ProcState::Created { proc_id, coords }) + Ok(mut process) => { + let pid = process.id().unwrap_or(0); + match self.ranks.assign(index) { + Err(_index) => { + tracing::info!("could not assign rank to {}", proc_id); + let _ = process.kill().await; + None + } + Ok(rank) => { + let (handle, monitor) = Child::monitored(process); + self.children.spawn(async move { (index, monitor.await) }); + self.active.insert(index, handle); + // Adjust for shape slice offset for non-zero shapes (sub-shapes). + let rank = rank + self.spec.shape.slice().offset(); + let coords = self.spec.shape.slice().coordinates(rank).unwrap(); + Some(ProcState::Created { + proc_id, + coords, + pid, + }) + } } - }, + } } } diff --git a/hyperactor_mesh/src/alloc/remoteprocess.rs b/hyperactor_mesh/src/alloc/remoteprocess.rs index 1cb1350df..632a2d5a3 100644 --- a/hyperactor_mesh/src/alloc/remoteprocess.rs +++ b/hyperactor_mesh/src/alloc/remoteprocess.rs @@ -936,29 +936,24 @@ impl Alloc for RemoteProcessAlloc { } break match update { - Some(ProcState::Created { proc_id, coords }) => { - match self.project_proc_into_global_shape(&proc_id, &coords) { - Ok(global_coords) => { - tracing::debug!( - "reprojected coords: {:?} -> {:?}", - coords, - global_coords - ); - Some(ProcState::Created { - proc_id, - coords: global_coords, - }) - } - Err(e) => { - tracing::error!( - "failed to project coords for proc: {}: {}", - proc_id, - e - ); - None - } + Some(ProcState::Created { + proc_id, + coords, + pid, + }) => match self.project_proc_into_global_shape(&proc_id, &coords) { + Ok(global_coords) => { + tracing::debug!("reprojected coords: {:?} -> {:?}", coords, global_coords); + Some(ProcState::Created { + proc_id, + coords: global_coords, + pid, + }) } - } + Err(e) => { + tracing::error!("failed to project coords for proc: {}: {}", proc_id, e); + None + } + }, Some(ProcState::Failed { world_id: _, @@ -1059,10 +1054,13 @@ mod test { for i in 0..alloc_len { let proc_id = format!("test[{}]", i).parse().unwrap(); let coords = shape.slice().coordinates(i).unwrap(); - alloc - .expect_next() - .times(1) - .return_once(|| Some(ProcState::Created { proc_id, coords })); + alloc.expect_next().times(1).return_once(|| { + Some(ProcState::Created { + proc_id, + coords, + pid: 0, + }) + }); } for i in 0..alloc_len { let proc_id = format!("test[{}]", i).parse().unwrap(); @@ -1155,7 +1153,11 @@ mod test { while i < alloc_len { let m = rx.recv().await.unwrap(); match m { - RemoteProcessProcStateMessage::Update(ProcState::Created { proc_id, coords }) => { + RemoteProcessProcStateMessage::Update(ProcState::Created { + proc_id, + coords, + .. + }) => { let expected_proc_id = format!("test[{}]", i).parse().unwrap(); let expected_coords = spec.shape.slice().coordinates(i).unwrap(); assert_eq!(proc_id, expected_proc_id); @@ -1660,7 +1662,9 @@ mod test_alloc { let proc_state = alloc.next().await.unwrap(); tracing::debug!("test got message: {:?}", proc_state); match proc_state { - ProcState::Created { proc_id, coords } => { + ProcState::Created { + proc_id, coords, .. + } => { procs.insert(proc_id); proc_coords.insert(coords); } @@ -1904,7 +1908,9 @@ mod test_alloc { let proc_state = alloc.next().await.unwrap(); tracing::debug!("test got message: {:?}", proc_state); match proc_state { - ProcState::Created { proc_id, coords } => { + ProcState::Created { + proc_id, coords, .. + } => { procs.insert(proc_id); proc_coords.insert(coords); } diff --git a/hyperactor_mesh/src/proc_mesh/mod.rs b/hyperactor_mesh/src/proc_mesh/mod.rs index afbe42881..69a815ac9 100644 --- a/hyperactor_mesh/src/proc_mesh/mod.rs +++ b/hyperactor_mesh/src/proc_mesh/mod.rs @@ -108,7 +108,9 @@ impl ProcMesh { }; match state { - ProcState::Created { proc_id, coords } => { + ProcState::Created { + proc_id, coords, .. + } => { let rank = shape .slice() .location(&coords) diff --git a/hyperactor_mesh/test/process_allocator_cleanup/process_allocator_cleanup.rs b/hyperactor_mesh/test/process_allocator_cleanup/process_allocator_cleanup.rs new file mode 100644 index 000000000..6e89d97d2 --- /dev/null +++ b/hyperactor_mesh/test/process_allocator_cleanup/process_allocator_cleanup.rs @@ -0,0 +1,161 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * All rights reserved. + * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. + */ + +//! Integration test for ProcessAllocator child process cleanup behavior. +//! Tests that when a ProcessAllocator parent process is killed, its children +//! are properly cleaned up. + +use std::process::Command; +use std::process::Stdio; +use std::time::Duration; +use std::time::Instant; + +use hyperactor_mesh::alloc::ProcState; +use nix::sys::signal::Signal; +use nix::sys::signal::{self}; +use nix::unistd::Pid; +use tokio::io::AsyncBufReadExt; +use tokio::io::BufReader; +use tokio::process::Command as TokioCommand; +use tokio::time::sleep; +use tokio::time::timeout; + +/// Test that ProcessAllocator children are cleaned up when parent is killed +#[tokio::test] +async fn test_process_allocator_child_cleanup() { + let test_binary_path = buck_resources::get("monarch/hyperactor_mesh/test_bin").unwrap(); + eprintln!("Starting test process allocator at: {:?}", test_binary_path); + + // Start the test process allocator with JSON output + let mut child = TokioCommand::new(&test_binary_path) + .stdout(Stdio::piped()) + // Let stderr through, for easier debugging + .spawn() + .expect("Failed to start test process allocator"); + + let parent_pid = child.id().expect("Failed to get child PID"); + eprintln!("Parent process started with PID: {}", parent_pid); + + // Set up stdout reader for JSON events + let stdout = child.stdout.take().expect("Failed to get stdout"); + let mut reader = BufReader::new(stdout).lines(); + + let expected_running_count = 4; // We know we're allocating 4 child processes + let mut running_count = 0; + let mut child_pids = Vec::new(); + + // Read events until we have enough running children + loop { + match timeout(Duration::from_secs(30), reader.next_line()).await { + Ok(Ok(Some(line))) => { + if let Ok(proc_state) = serde_json::from_str::(&line) { + eprintln!("Received ProcState: {:?}", proc_state); + + match proc_state { + ProcState::Created { pid, .. } => { + if pid != 0 { + child_pids.push(pid); + eprintln!("Collected child PID: {}", pid); + } + } + ProcState::Running { .. } => { + running_count += 1; + eprintln!( + "Child {} of {} is running", + running_count, expected_running_count + ); + + if running_count >= expected_running_count { + eprintln!("All {} children are running!", expected_running_count); + break; + } + } + ProcState::Failed { description, .. } => { + panic!("Allocation failed: {}", description); + } + _ => {} + } + } + } + Ok(Ok(None)) => { + eprintln!("Child process stdout closed"); + break; + } + Ok(Err(e)) => { + eprintln!("Error reading from child stdout: {}", e); + break; + } + Err(_) => { + eprintln!("Timeout waiting for child events"); + break; + } + } + } + + // Ensure we got all the running children we expected + assert_eq!( + running_count, expected_running_count, + "Expected {} running children but only got {}", + expected_running_count, running_count + ); + + // Ensure we collected PIDs from Created events + eprintln!("Collected child PIDs from Created events: {:?}", child_pids); + assert!( + !child_pids.is_empty(), + "No child PIDs were collected from Created events" + ); + + // Kill the parent process with SIGKILL + eprintln!("Killing parent process with PID: {}", parent_pid); + signal::kill(Pid::from_raw(parent_pid as i32), Signal::SIGKILL) + .expect("Failed to kill parent process"); + + // Wait for the parent to be killed + let wait_result = timeout(Duration::from_secs(5), child.wait()).await; + match wait_result { + Ok(Ok(status)) => eprintln!("Parent process exited with status: {:?}", status), + Ok(Err(e)) => panic!("Error waiting for parent process: {}", e), + Err(_) => { + panic!("Parent process did not exit within timeout"); + } + } + + eprintln!("Waiting longer to see if children eventually exit due to channel hangup..."); + let timeout = Duration::from_secs(60); + let start = Instant::now(); + + loop { + if Instant::now() - start >= timeout { + panic!("ProcessAllocator children not cleaned up after 60s"); + } + + #[allow(clippy::disallowed_methods)] + sleep(Duration::from_secs(2)).await; + + let still_running: Vec<_> = child_pids + .iter() + .filter(|&&pid| is_process_running(pid)) + .cloned() + .collect(); + + if still_running.is_empty() { + eprintln!("All children have exited!"); + return; + } + } +} + +/// Check if a process with the given PID is still running +fn is_process_running(pid: u32) -> bool { + Command::new("kill") + .args(["-0", &pid.to_string()]) + .output() + .map(|output| output.status.success()) + .unwrap_or(false) +} diff --git a/hyperactor_mesh/test/process_allocator_cleanup/process_allocator_test_bin.rs b/hyperactor_mesh/test/process_allocator_cleanup/process_allocator_test_bin.rs new file mode 100644 index 000000000..167b0b7f5 --- /dev/null +++ b/hyperactor_mesh/test/process_allocator_cleanup/process_allocator_test_bin.rs @@ -0,0 +1,56 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * All rights reserved. + * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. + */ + +/// Test binary for ProcessAllocator child process cleanup behavior. +/// This binary creates a ProcessAllocator and spawns several child processes, +/// then keeps running until killed. It's designed to test whether child +/// processes are properly cleaned up when the parent process is killed. +use hyperactor_mesh::alloc::Alloc; +use hyperactor_mesh::alloc::AllocConstraints; +use hyperactor_mesh::alloc::AllocSpec; +use hyperactor_mesh::alloc::Allocator; +use hyperactor_mesh::alloc::ProcState; +use hyperactor_mesh::alloc::ProcessAllocator; +use ndslice::shape; +use tokio::process::Command; + +fn emit_proc_state(state: &ProcState) { + if let Ok(json) = serde_json::to_string(state) { + println!("{}", json); + // Flush immediately to ensure parent can read events in real-time + use std::io::Write; + use std::io::{self}; + io::stdout().flush().unwrap(); + } +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + // Initialize tracing to stderr to avoid interfering with JSON output + tracing_subscriber::fmt() + .with_writer(std::io::stderr) + .init(); + + let bootstrap_path = buck_resources::get("monarch/hyperactor_mesh/bootstrap").unwrap(); + eprintln!("Bootstrap cmd: {:?}", bootstrap_path); + let cmd = Command::new(&bootstrap_path); + let mut allocator = ProcessAllocator::new(cmd); + + // Create an allocation with 4 child processes + let mut alloc = allocator + .allocate(AllocSpec { + shape: shape! { replica = 4 }, + constraints: AllocConstraints::default(), + }) + .await?; + + while let Some(state) = alloc.next().await { + emit_proc_state(&state); + } + Ok(()) +} diff --git a/hyperactor_mesh/test/process_allocator_cleanup/process_allocator_test_bootstrap.rs b/hyperactor_mesh/test/process_allocator_cleanup/process_allocator_test_bootstrap.rs new file mode 100644 index 000000000..a9ad2266a --- /dev/null +++ b/hyperactor_mesh/test/process_allocator_cleanup/process_allocator_test_bootstrap.rs @@ -0,0 +1,29 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * All rights reserved. + * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. + */ + +/// A simple bootstrap binary that writes logs out to a file. This is useful for +/// debugging, as normally the ProcessAllocator children logs are piped back to +/// ProcessAllocator. When we are testing what happens when we sigkill +/// ProcessAllocator, we want to see what is happening on the children. +#[tokio::main] +async fn main() { + // Initialize tracing to a separate log file per child + let pid = std::process::id(); + let log_file_path = format!("/tmp/child_log{}", pid); + let log_file = std::fs::File::create(&log_file_path).expect("Failed to create log file"); + + tracing_subscriber::fmt() + .with_writer(log_file) + .with_ansi(false) // No color codes in file + .init(); + + // Let the user know where to find our logs + eprintln!("CHILD_LOG_FILE:{}: {}", pid, log_file_path); + + hyperactor_mesh::bootstrap_or_die().await; +} diff --git a/monarch_hyperactor/src/bin/process_allocator/common.rs b/monarch_hyperactor/src/bin/process_allocator/common.rs index b6e2c2752..b525b7008 100644 --- a/monarch_hyperactor/src/bin/process_allocator/common.rs +++ b/monarch_hyperactor/src/bin/process_allocator/common.rs @@ -139,12 +139,12 @@ mod tests { while created_ranks.len() < world_size || stopped_ranks.len() < world_size { let proc_state = alloc.next().await.unwrap(); match proc_state { - alloc::ProcState::Created { proc_id, coords: _ } => { + alloc::ProcState::Created { proc_id, .. } => { // alloc.next() will keep creating procs and incrementing rank id // so we mod the rank by world_size to map it to its logical rank created_ranks.insert(proc_id.rank() % world_size); } - alloc::ProcState::Stopped { proc_id, reason: _ } => { + alloc::ProcState::Stopped { proc_id, .. } => { stopped_ranks.insert(proc_id.rank() % world_size); } _ => {}