From 1d2dc576d989b562ec00b565e349a53fc8d9c3b4 Mon Sep 17 00:00:00 2001 From: suo Date: Wed, 25 Jun 2025 19:28:55 -0700 Subject: [PATCH 1/6] [monarch] add test for process allocator child cleanup Test that if we kill the ProcessAllocator process, children will get cleaned up. This will be used to confirm our removal of HYPERACTOR_MANAGED_SUBPROCESS is ok. Differential Revision: [D77348271](https://our.internmc.facebook.com/intern/diff/D77348271/) [ghstack-poisoned] --- hyperactor_mesh/Cargo.toml | 13 +- hyperactor_mesh/test/process_allocator.rs | 91 ++++++++ .../test/process_allocator_cleanup.rs | 194 ++++++++++++++++++ 3 files changed, 296 insertions(+), 2 deletions(-) create mode 100644 hyperactor_mesh/test/process_allocator.rs create mode 100644 hyperactor_mesh/test/process_allocator_cleanup.rs diff --git a/hyperactor_mesh/Cargo.toml b/hyperactor_mesh/Cargo.toml index 9f6385e7b..cb52f1140 100644 --- a/hyperactor_mesh/Cargo.toml +++ b/hyperactor_mesh/Cargo.toml @@ -1,4 +1,4 @@ -# @generated by autocargo from //monarch/hyperactor_mesh:[hyperactor_mesh,hyperactor_mesh_test_bootstrap] +# @generated by autocargo from //monarch/hyperactor_mesh:[hyperactor_mesh,hyperactor_mesh_test_bootstrap,process_allocator_cleanup,process_allocator_test_bin] [package] name = "hyperactor_mesh" @@ -11,11 +11,20 @@ license = "BSD-3-Clause" name = "hyperactor_mesh_test_bootstrap" path = "test/bootstrap.rs" +[[bin]] +name = "process_allocator_test_bin" +path = "test/process_allocator.rs" + +[[test]] +name = "process_allocator_cleanup" +path = "test/process_allocator_cleanup.rs" + [dependencies] anyhow = "1.0.98" async-trait = "0.1.86" bincode = "1.3.3" bitmaps = "3.2.1" +buck-resources = "1" enum-as-inner = "0.6.0" erased-serde = "0.3.27" futures = { version = "0.3.30", features = ["async-await", "compat"] } @@ -34,9 +43,9 @@ tokio = { version = "1.45.0", features = ["full", "test-util", "tracing"] } tokio-stream = { version = "0.1.17", features = ["fs", "io-util", "net", "signal", "sync", "time"] } tokio-util = { version = "0.7.15", features = ["full"] } tracing = { version = "0.1.41", features = ["attributes", "valuable"] } +tracing-subscriber = { version = "0.3.19", features = ["chrono", "env-filter", "json", "local-time", "parking_lot", "registry"] } [dev-dependencies] -buck-resources = "1" dir-diff = "0.3" maplit = "1.0" timed_test = { version = "0.0.0", path = "../timed_test" } diff --git a/hyperactor_mesh/test/process_allocator.rs b/hyperactor_mesh/test/process_allocator.rs new file mode 100644 index 000000000..8d612742b --- /dev/null +++ b/hyperactor_mesh/test/process_allocator.rs @@ -0,0 +1,91 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * All rights reserved. + * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. + */ + +/// Test binary for ProcessAllocator child process cleanup behavior. +/// This binary creates a ProcessAllocator and spawns several child processes, +/// then keeps running until killed. It's designed to test whether child +/// processes are properly cleaned up when the parent process is killed. +use std::time::Duration; + +use hyperactor_mesh::alloc::Alloc; +use hyperactor_mesh::alloc::AllocConstraints; +use hyperactor_mesh::alloc::AllocSpec; +use hyperactor_mesh::alloc::Allocator; +use hyperactor_mesh::alloc::ProcState; +use hyperactor_mesh::alloc::ProcessAllocator; +use ndslice::shape; +use tokio::process::Command; +use tokio::time::sleep; + +#[tokio::main] +async fn main() -> Result<(), Box> { + // Initialize tracing + tracing_subscriber::fmt::init(); + + println!("ProcessAllocator test binary starting..."); + + // Create a ProcessAllocator using the bootstrap binary + let bootstrap_path = buck_resources::get("monarch/hyperactor_mesh/bootstrap").unwrap(); + + let cmd = Command::new(&bootstrap_path); + let mut allocator = ProcessAllocator::new(cmd); + + // Create an allocation with 4 child processes + let mut alloc = allocator + .allocate(AllocSpec { + shape: shape! { replica = 4 }, + constraints: AllocConstraints::default(), + }) + .await?; + + println!("Allocation created, waiting for children to start..."); + + // Wait for all children to be running + let mut running_count = 0; + while running_count < 4 { + match alloc.next().await { + Some(ProcState::Created { proc_id, coords }) => { + println!("Child process created: {:?} at {:?}", proc_id, coords); + } + Some(ProcState::Running { proc_id, addr, .. }) => { + println!("Child process running: {:?} at {:?}", proc_id, addr); + running_count += 1; + } + Some(ProcState::Stopped { proc_id, reason }) => { + println!("Child process stopped: {:?}, reason: {:?}", proc_id, reason); + } + Some(ProcState::Failed { + world_id, + description, + }) => { + println!( + "Allocation failed: {:?}, description: {}", + world_id, description + ); + return Err(format!("Allocation failed: {}", description).into()); + } + None => { + println!("No more allocation events"); + break; + } + } + } + + println!( + "All {} children are running. Parent PID: {}", + running_count, + std::process::id() + ); + + // Keep the process running indefinitely + // In the test, we'll kill this process and check if children are cleaned up + loop { + sleep(Duration::from_secs(1)).await; + println!("Parent process still alive, children should be running..."); + } +} diff --git a/hyperactor_mesh/test/process_allocator_cleanup.rs b/hyperactor_mesh/test/process_allocator_cleanup.rs new file mode 100644 index 000000000..814a5dfd3 --- /dev/null +++ b/hyperactor_mesh/test/process_allocator_cleanup.rs @@ -0,0 +1,194 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * All rights reserved. + * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. + */ + +use std::process::Command; +use std::process::Stdio; +use std::time::Duration; + +use nix::sys::signal::Signal; +use nix::sys::signal::{self}; +use nix::unistd::Pid; +use tokio::time::sleep; +use tokio::time::timeout; + +/// Integration test for ProcessAllocator child process cleanup behavior. +/// Tests that when a ProcessAllocator parent process is killed, its children +/// are properly cleaned up. +#[tokio::test] +async fn test_process_allocator_child_cleanup() { + let test_binary_path = buck_resources::get("monarch/hyperactor_mesh/test_bin").unwrap(); + println!("Starting test process allocator at: {:?}", test_binary_path); + + // Start the test process allocator + let mut child = Command::new(&test_binary_path) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .spawn() + .expect("Failed to start test process allocator"); + + let parent_pid = child.id(); + println!("Parent process started with PID: {}", parent_pid); + + // Give the parent time to spawn its children + // We'll monitor the output to see when children are running + #[allow(clippy::disallowed_methods)] + sleep(Duration::from_secs(10)).await; + + // Get the list of child processes before killing the parent + let children_before = get_child_processes(parent_pid); + println!("Children before kill: {:?}", children_before); + + // Ensure we have some children processes + assert!( + !children_before.is_empty(), + "Expected child processes to be spawned, but none were found" + ); + + // Kill the parent process with SIGKILL + println!("Killing parent process with PID: {}", parent_pid); + child.kill().unwrap(); + + // Wait for the parent to be killed (in a blocking task since std::process::Child::wait is blocking) + let wait_future = tokio::task::spawn_blocking(move || child.wait()); + + let result = timeout(Duration::from_secs(5), wait_future).await; + match result { + Ok(Ok(exit_status)) => match exit_status { + Ok(status) => println!("Parent process exited with status: {:?}", status), + Err(e) => println!("Error waiting for parent process: {}", e), + }, + Ok(Err(_)) => println!("Error in spawn_blocking task"), + Err(_) => { + println!("Parent process did not exit within timeout"); + } + } + + // Give some time for children to be cleaned up + #[allow(clippy::disallowed_methods)] + sleep(Duration::from_secs(2)).await; + + // Check if children are still running + let children_after = get_child_processes(parent_pid); + println!("Children after kill: {:?}", children_after); + + // Check detailed process information for each child + for child_pid in &children_before { + let process_info = get_detailed_process_info(*child_pid); + println!("Child {} detailed info: {:?}", child_pid, process_info); + } + + // Wait much longer to see if children eventually exit due to channel hangup + println!("Waiting longer to see if children eventually exit due to channel hangup..."); + for i in 1..=12 { + // Wait up to 60 more seconds (5 second intervals) + #[allow(clippy::disallowed_methods)] + sleep(Duration::from_secs(5)).await; + let remaining = children_before + .iter() + .filter(|&&pid| is_process_running(pid)) + .count(); + println!( + "After {} seconds: {} children still running", + i * 5, + remaining + ); + + if remaining == 0 { + println!("SUCCESS: All children eventually exited due to channel hangup!"); + break; + } + } + + // Final check + let children_final = get_child_processes(parent_pid); + println!("Children final check: {:?}", children_final); + + // Verify that all children have been cleaned up + for child_pid in &children_before { + let is_running = is_process_running(*child_pid); + if is_running { + println!( + "WARNING: Child process {} is still running after parent was killed", + child_pid + ); + let process_info = get_detailed_process_info(*child_pid); + println!( + "Child {} detailed info after wait: {:?}", + child_pid, process_info + ); + } + } + + // The test passes if children are cleaned up + // If children are still running, we'll print warnings but not fail immediately + // since cleanup might take a bit more time + let remaining_children: Vec<_> = children_before + .iter() + .filter(|&&pid| is_process_running(pid)) + .collect(); + + if !remaining_children.is_empty() { + println!("=== TEST RESULT ==="); + println!("ProcessAllocator child cleanup test FAILED:"); + println!("Expected all child processes to be cleaned up when parent is killed,"); + println!( + "but {} children are still running after 65+ seconds: {:?}", + remaining_children.len(), + remaining_children + ); + + // Clean up the remaining children manually to be good citizens + for &&pid in &remaining_children { + println!("Manually killing child process: {}", pid); + let _ = signal::kill(Pid::from_raw(pid as i32), Signal::SIGKILL); + } + + panic!( + "ProcessAllocator child cleanup test failed - children were not cleaned up even after extended wait" + ); + } +} + +/// Get the list of child processes for a given parent PID +fn get_child_processes(parent_pid: u32) -> Vec { + let output = Command::new("pgrep") + .args(["-P", &parent_pid.to_string()]) + .output(); + + match output { + Ok(output) if output.status.success() => String::from_utf8_lossy(&output.stdout) + .lines() + .filter_map(|line| line.trim().parse::().ok()) + .collect(), + _ => Vec::new(), + } +} + +/// Check if a process with the given PID is still running +fn is_process_running(pid: u32) -> bool { + Command::new("kill") + .args(["-0", &pid.to_string()]) + .output() + .map(|output| output.status.success()) + .unwrap_or(false) +} + +/// Get detailed process information for debugging +fn get_detailed_process_info(pid: u32) -> Option { + Command::new("ps") + .args(["-p", &pid.to_string(), "-o", "pid,ppid,command"]) + .output() + .ok() + .and_then(|output| { + if output.status.success() { + Some(String::from_utf8_lossy(&output.stdout).to_string()) + } else { + None + } + }) +} From 5e34e809b899f6651b5153c91646017e18818fa6 Mon Sep 17 00:00:00 2001 From: suo Date: Thu, 26 Jun 2025 11:35:11 -0700 Subject: [PATCH 2/6] Update on "[monarch] add test for process allocator child cleanup" Test that if we kill the ProcessAllocator process, children will get cleaned up. This will be used to confirm our removal of HYPERACTOR_MANAGED_SUBPROCESS is ok. Differential Revision: [D77348271](https://our.internmc.facebook.com/intern/diff/D77348271/) [ghstack-poisoned] --- hyperactor_mesh/src/alloc/local.rs | 1 + hyperactor_mesh/src/alloc/mod.rs | 17 +- hyperactor_mesh/src/alloc/process.rs | 37 ++-- hyperactor_mesh/src/alloc/remoteprocess.rs | 30 ++- hyperactor_mesh/src/proc_mesh/mod.rs | 4 +- .../test/process_allocator_cleanup.rs | 194 ------------------ .../process_allocator_cleanup.rs | 161 +++++++++++++++ .../process_allocator_test_bin.rs} | 65 +++--- .../process_allocator_test_bootstrap.rs | 28 +++ 9 files changed, 279 insertions(+), 258 deletions(-) delete mode 100644 hyperactor_mesh/test/process_allocator_cleanup.rs create mode 100644 hyperactor_mesh/test/process_allocator_cleanup/process_allocator_cleanup.rs rename hyperactor_mesh/test/{process_allocator.rs => process_allocator_cleanup/process_allocator_test_bin.rs} (55%) create mode 100644 hyperactor_mesh/test/process_allocator_cleanup/process_allocator_test_bootstrap.rs diff --git a/hyperactor_mesh/src/alloc/local.rs b/hyperactor_mesh/src/alloc/local.rs index 6333c8627..6595e52e6 100644 --- a/hyperactor_mesh/src/alloc/local.rs +++ b/hyperactor_mesh/src/alloc/local.rs @@ -204,6 +204,7 @@ impl Alloc for LocalAlloc { let created = ProcState::Created { proc_id: proc_id.clone(), coords, + pid: 0, // Local allocator doesn't have real system PIDs }; self.queue.push_back(ProcState::Running { proc_id, diff --git a/hyperactor_mesh/src/alloc/mod.rs b/hyperactor_mesh/src/alloc/mod.rs index 56981ad81..da6b9aa34 100644 --- a/hyperactor_mesh/src/alloc/mod.rs +++ b/hyperactor_mesh/src/alloc/mod.rs @@ -96,6 +96,8 @@ pub enum ProcState { proc_id: ProcId, /// Its assigned coordinates (in the alloc's shape). coords: Vec, + /// The system process ID of the created child process. + pid: u32, }, /// A proc was started. Running { @@ -130,16 +132,21 @@ pub enum ProcState { impl fmt::Display for ProcState { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { - ProcState::Created { proc_id, coords } => { + ProcState::Created { + proc_id, + coords, + pid, + } => { write!( f, - "{}: created at ({})", + "{}: created at ({}) with PID {}", proc_id, coords .iter() .map(|c| c.to_string()) .collect::>() - .join(",") + .join(","), + pid ) } ProcState::Running { proc_id, addr, .. } => { @@ -334,7 +341,9 @@ pub(crate) mod testing { let mut running = HashSet::new(); while running.len() != 4 { match alloc.next().await.unwrap() { - ProcState::Created { proc_id, coords } => { + ProcState::Created { + proc_id, coords, .. + } => { procs.insert(proc_id, coords); } ProcState::Running { proc_id, .. } => { diff --git a/hyperactor_mesh/src/alloc/process.rs b/hyperactor_mesh/src/alloc/process.rs index 862e72ad8..9401953ea 100644 --- a/hyperactor_mesh/src/alloc/process.rs +++ b/hyperactor_mesh/src/alloc/process.rs @@ -329,22 +329,29 @@ impl ProcessAlloc { description: message, }) } - Ok(mut process) => match self.ranks.assign(index) { - Err(_index) => { - tracing::info!("could not assign rank to {}", proc_id); - let _ = process.kill().await; - None - } - Ok(rank) => { - let (handle, monitor) = Child::monitored(process); - self.children.spawn(async move { (index, monitor.await) }); - self.active.insert(index, handle); - // Adjust for shape slice offset for non-zero shapes (sub-shapes). - let rank = rank + self.spec.shape.slice().offset(); - let coords = self.spec.shape.slice().coordinates(rank).unwrap(); - Some(ProcState::Created { proc_id, coords }) + Ok(mut process) => { + let pid = process.id().unwrap_or(0); + match self.ranks.assign(index) { + Err(_index) => { + tracing::info!("could not assign rank to {}", proc_id); + let _ = process.kill().await; + None + } + Ok(rank) => { + let (handle, monitor) = Child::monitored(process); + self.children.spawn(async move { (index, monitor.await) }); + self.active.insert(index, handle); + // Adjust for shape slice offset for non-zero shapes (sub-shapes). + let rank = rank + self.spec.shape.slice().offset(); + let coords = self.spec.shape.slice().coordinates(rank).unwrap(); + Some(ProcState::Created { + proc_id, + coords, + pid, + }) + } } - }, + } } } diff --git a/hyperactor_mesh/src/alloc/remoteprocess.rs b/hyperactor_mesh/src/alloc/remoteprocess.rs index 1cb1350df..470e0c338 100644 --- a/hyperactor_mesh/src/alloc/remoteprocess.rs +++ b/hyperactor_mesh/src/alloc/remoteprocess.rs @@ -936,7 +936,9 @@ impl Alloc for RemoteProcessAlloc { } break match update { - Some(ProcState::Created { proc_id, coords }) => { + Some(ProcState::Created { + proc_id, coords, .. + }) => { match self.project_proc_into_global_shape(&proc_id, &coords) { Ok(global_coords) => { tracing::debug!( @@ -947,6 +949,7 @@ impl Alloc for RemoteProcessAlloc { Some(ProcState::Created { proc_id, coords: global_coords, + pid: 0, // Remote processes don't track real PIDs in this context }) } Err(e) => { @@ -1059,10 +1062,13 @@ mod test { for i in 0..alloc_len { let proc_id = format!("test[{}]", i).parse().unwrap(); let coords = shape.slice().coordinates(i).unwrap(); - alloc - .expect_next() - .times(1) - .return_once(|| Some(ProcState::Created { proc_id, coords })); + alloc.expect_next().times(1).return_once(|| { + Some(ProcState::Created { + proc_id, + coords, + pid: 0, + }) + }); } for i in 0..alloc_len { let proc_id = format!("test[{}]", i).parse().unwrap(); @@ -1155,7 +1161,11 @@ mod test { while i < alloc_len { let m = rx.recv().await.unwrap(); match m { - RemoteProcessProcStateMessage::Update(ProcState::Created { proc_id, coords }) => { + RemoteProcessProcStateMessage::Update(ProcState::Created { + proc_id, + coords, + .. + }) => { let expected_proc_id = format!("test[{}]", i).parse().unwrap(); let expected_coords = spec.shape.slice().coordinates(i).unwrap(); assert_eq!(proc_id, expected_proc_id); @@ -1660,7 +1670,9 @@ mod test_alloc { let proc_state = alloc.next().await.unwrap(); tracing::debug!("test got message: {:?}", proc_state); match proc_state { - ProcState::Created { proc_id, coords } => { + ProcState::Created { + proc_id, coords, .. + } => { procs.insert(proc_id); proc_coords.insert(coords); } @@ -1904,7 +1916,9 @@ mod test_alloc { let proc_state = alloc.next().await.unwrap(); tracing::debug!("test got message: {:?}", proc_state); match proc_state { - ProcState::Created { proc_id, coords } => { + ProcState::Created { + proc_id, coords, .. + } => { procs.insert(proc_id); proc_coords.insert(coords); } diff --git a/hyperactor_mesh/src/proc_mesh/mod.rs b/hyperactor_mesh/src/proc_mesh/mod.rs index afbe42881..69a815ac9 100644 --- a/hyperactor_mesh/src/proc_mesh/mod.rs +++ b/hyperactor_mesh/src/proc_mesh/mod.rs @@ -108,7 +108,9 @@ impl ProcMesh { }; match state { - ProcState::Created { proc_id, coords } => { + ProcState::Created { + proc_id, coords, .. + } => { let rank = shape .slice() .location(&coords) diff --git a/hyperactor_mesh/test/process_allocator_cleanup.rs b/hyperactor_mesh/test/process_allocator_cleanup.rs deleted file mode 100644 index 814a5dfd3..000000000 --- a/hyperactor_mesh/test/process_allocator_cleanup.rs +++ /dev/null @@ -1,194 +0,0 @@ -/* - * Copyright (c) Meta Platforms, Inc. and affiliates. - * All rights reserved. - * - * This source code is licensed under the BSD-style license found in the - * LICENSE file in the root directory of this source tree. - */ - -use std::process::Command; -use std::process::Stdio; -use std::time::Duration; - -use nix::sys::signal::Signal; -use nix::sys::signal::{self}; -use nix::unistd::Pid; -use tokio::time::sleep; -use tokio::time::timeout; - -/// Integration test for ProcessAllocator child process cleanup behavior. -/// Tests that when a ProcessAllocator parent process is killed, its children -/// are properly cleaned up. -#[tokio::test] -async fn test_process_allocator_child_cleanup() { - let test_binary_path = buck_resources::get("monarch/hyperactor_mesh/test_bin").unwrap(); - println!("Starting test process allocator at: {:?}", test_binary_path); - - // Start the test process allocator - let mut child = Command::new(&test_binary_path) - .stdout(Stdio::piped()) - .stderr(Stdio::piped()) - .spawn() - .expect("Failed to start test process allocator"); - - let parent_pid = child.id(); - println!("Parent process started with PID: {}", parent_pid); - - // Give the parent time to spawn its children - // We'll monitor the output to see when children are running - #[allow(clippy::disallowed_methods)] - sleep(Duration::from_secs(10)).await; - - // Get the list of child processes before killing the parent - let children_before = get_child_processes(parent_pid); - println!("Children before kill: {:?}", children_before); - - // Ensure we have some children processes - assert!( - !children_before.is_empty(), - "Expected child processes to be spawned, but none were found" - ); - - // Kill the parent process with SIGKILL - println!("Killing parent process with PID: {}", parent_pid); - child.kill().unwrap(); - - // Wait for the parent to be killed (in a blocking task since std::process::Child::wait is blocking) - let wait_future = tokio::task::spawn_blocking(move || child.wait()); - - let result = timeout(Duration::from_secs(5), wait_future).await; - match result { - Ok(Ok(exit_status)) => match exit_status { - Ok(status) => println!("Parent process exited with status: {:?}", status), - Err(e) => println!("Error waiting for parent process: {}", e), - }, - Ok(Err(_)) => println!("Error in spawn_blocking task"), - Err(_) => { - println!("Parent process did not exit within timeout"); - } - } - - // Give some time for children to be cleaned up - #[allow(clippy::disallowed_methods)] - sleep(Duration::from_secs(2)).await; - - // Check if children are still running - let children_after = get_child_processes(parent_pid); - println!("Children after kill: {:?}", children_after); - - // Check detailed process information for each child - for child_pid in &children_before { - let process_info = get_detailed_process_info(*child_pid); - println!("Child {} detailed info: {:?}", child_pid, process_info); - } - - // Wait much longer to see if children eventually exit due to channel hangup - println!("Waiting longer to see if children eventually exit due to channel hangup..."); - for i in 1..=12 { - // Wait up to 60 more seconds (5 second intervals) - #[allow(clippy::disallowed_methods)] - sleep(Duration::from_secs(5)).await; - let remaining = children_before - .iter() - .filter(|&&pid| is_process_running(pid)) - .count(); - println!( - "After {} seconds: {} children still running", - i * 5, - remaining - ); - - if remaining == 0 { - println!("SUCCESS: All children eventually exited due to channel hangup!"); - break; - } - } - - // Final check - let children_final = get_child_processes(parent_pid); - println!("Children final check: {:?}", children_final); - - // Verify that all children have been cleaned up - for child_pid in &children_before { - let is_running = is_process_running(*child_pid); - if is_running { - println!( - "WARNING: Child process {} is still running after parent was killed", - child_pid - ); - let process_info = get_detailed_process_info(*child_pid); - println!( - "Child {} detailed info after wait: {:?}", - child_pid, process_info - ); - } - } - - // The test passes if children are cleaned up - // If children are still running, we'll print warnings but not fail immediately - // since cleanup might take a bit more time - let remaining_children: Vec<_> = children_before - .iter() - .filter(|&&pid| is_process_running(pid)) - .collect(); - - if !remaining_children.is_empty() { - println!("=== TEST RESULT ==="); - println!("ProcessAllocator child cleanup test FAILED:"); - println!("Expected all child processes to be cleaned up when parent is killed,"); - println!( - "but {} children are still running after 65+ seconds: {:?}", - remaining_children.len(), - remaining_children - ); - - // Clean up the remaining children manually to be good citizens - for &&pid in &remaining_children { - println!("Manually killing child process: {}", pid); - let _ = signal::kill(Pid::from_raw(pid as i32), Signal::SIGKILL); - } - - panic!( - "ProcessAllocator child cleanup test failed - children were not cleaned up even after extended wait" - ); - } -} - -/// Get the list of child processes for a given parent PID -fn get_child_processes(parent_pid: u32) -> Vec { - let output = Command::new("pgrep") - .args(["-P", &parent_pid.to_string()]) - .output(); - - match output { - Ok(output) if output.status.success() => String::from_utf8_lossy(&output.stdout) - .lines() - .filter_map(|line| line.trim().parse::().ok()) - .collect(), - _ => Vec::new(), - } -} - -/// Check if a process with the given PID is still running -fn is_process_running(pid: u32) -> bool { - Command::new("kill") - .args(["-0", &pid.to_string()]) - .output() - .map(|output| output.status.success()) - .unwrap_or(false) -} - -/// Get detailed process information for debugging -fn get_detailed_process_info(pid: u32) -> Option { - Command::new("ps") - .args(["-p", &pid.to_string(), "-o", "pid,ppid,command"]) - .output() - .ok() - .and_then(|output| { - if output.status.success() { - Some(String::from_utf8_lossy(&output.stdout).to_string()) - } else { - None - } - }) -} diff --git a/hyperactor_mesh/test/process_allocator_cleanup/process_allocator_cleanup.rs b/hyperactor_mesh/test/process_allocator_cleanup/process_allocator_cleanup.rs new file mode 100644 index 000000000..6e89d97d2 --- /dev/null +++ b/hyperactor_mesh/test/process_allocator_cleanup/process_allocator_cleanup.rs @@ -0,0 +1,161 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * All rights reserved. + * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. + */ + +//! Integration test for ProcessAllocator child process cleanup behavior. +//! Tests that when a ProcessAllocator parent process is killed, its children +//! are properly cleaned up. + +use std::process::Command; +use std::process::Stdio; +use std::time::Duration; +use std::time::Instant; + +use hyperactor_mesh::alloc::ProcState; +use nix::sys::signal::Signal; +use nix::sys::signal::{self}; +use nix::unistd::Pid; +use tokio::io::AsyncBufReadExt; +use tokio::io::BufReader; +use tokio::process::Command as TokioCommand; +use tokio::time::sleep; +use tokio::time::timeout; + +/// Test that ProcessAllocator children are cleaned up when parent is killed +#[tokio::test] +async fn test_process_allocator_child_cleanup() { + let test_binary_path = buck_resources::get("monarch/hyperactor_mesh/test_bin").unwrap(); + eprintln!("Starting test process allocator at: {:?}", test_binary_path); + + // Start the test process allocator with JSON output + let mut child = TokioCommand::new(&test_binary_path) + .stdout(Stdio::piped()) + // Let stderr through, for easier debugging + .spawn() + .expect("Failed to start test process allocator"); + + let parent_pid = child.id().expect("Failed to get child PID"); + eprintln!("Parent process started with PID: {}", parent_pid); + + // Set up stdout reader for JSON events + let stdout = child.stdout.take().expect("Failed to get stdout"); + let mut reader = BufReader::new(stdout).lines(); + + let expected_running_count = 4; // We know we're allocating 4 child processes + let mut running_count = 0; + let mut child_pids = Vec::new(); + + // Read events until we have enough running children + loop { + match timeout(Duration::from_secs(30), reader.next_line()).await { + Ok(Ok(Some(line))) => { + if let Ok(proc_state) = serde_json::from_str::(&line) { + eprintln!("Received ProcState: {:?}", proc_state); + + match proc_state { + ProcState::Created { pid, .. } => { + if pid != 0 { + child_pids.push(pid); + eprintln!("Collected child PID: {}", pid); + } + } + ProcState::Running { .. } => { + running_count += 1; + eprintln!( + "Child {} of {} is running", + running_count, expected_running_count + ); + + if running_count >= expected_running_count { + eprintln!("All {} children are running!", expected_running_count); + break; + } + } + ProcState::Failed { description, .. } => { + panic!("Allocation failed: {}", description); + } + _ => {} + } + } + } + Ok(Ok(None)) => { + eprintln!("Child process stdout closed"); + break; + } + Ok(Err(e)) => { + eprintln!("Error reading from child stdout: {}", e); + break; + } + Err(_) => { + eprintln!("Timeout waiting for child events"); + break; + } + } + } + + // Ensure we got all the running children we expected + assert_eq!( + running_count, expected_running_count, + "Expected {} running children but only got {}", + expected_running_count, running_count + ); + + // Ensure we collected PIDs from Created events + eprintln!("Collected child PIDs from Created events: {:?}", child_pids); + assert!( + !child_pids.is_empty(), + "No child PIDs were collected from Created events" + ); + + // Kill the parent process with SIGKILL + eprintln!("Killing parent process with PID: {}", parent_pid); + signal::kill(Pid::from_raw(parent_pid as i32), Signal::SIGKILL) + .expect("Failed to kill parent process"); + + // Wait for the parent to be killed + let wait_result = timeout(Duration::from_secs(5), child.wait()).await; + match wait_result { + Ok(Ok(status)) => eprintln!("Parent process exited with status: {:?}", status), + Ok(Err(e)) => panic!("Error waiting for parent process: {}", e), + Err(_) => { + panic!("Parent process did not exit within timeout"); + } + } + + eprintln!("Waiting longer to see if children eventually exit due to channel hangup..."); + let timeout = Duration::from_secs(60); + let start = Instant::now(); + + loop { + if Instant::now() - start >= timeout { + panic!("ProcessAllocator children not cleaned up after 60s"); + } + + #[allow(clippy::disallowed_methods)] + sleep(Duration::from_secs(2)).await; + + let still_running: Vec<_> = child_pids + .iter() + .filter(|&&pid| is_process_running(pid)) + .cloned() + .collect(); + + if still_running.is_empty() { + eprintln!("All children have exited!"); + return; + } + } +} + +/// Check if a process with the given PID is still running +fn is_process_running(pid: u32) -> bool { + Command::new("kill") + .args(["-0", &pid.to_string()]) + .output() + .map(|output| output.status.success()) + .unwrap_or(false) +} diff --git a/hyperactor_mesh/test/process_allocator.rs b/hyperactor_mesh/test/process_allocator_cleanup/process_allocator_test_bin.rs similarity index 55% rename from hyperactor_mesh/test/process_allocator.rs rename to hyperactor_mesh/test/process_allocator_cleanup/process_allocator_test_bin.rs index 8d612742b..56b2af5ab 100644 --- a/hyperactor_mesh/test/process_allocator.rs +++ b/hyperactor_mesh/test/process_allocator_cleanup/process_allocator_test_bin.rs @@ -1,6 +1,6 @@ /* * Copyright (c) Meta Platforms, Inc. and affiliates. - * All rights reserved. +// * All rights reserved. * * This source code is licensed under the BSD-style license found in the * LICENSE file in the root directory of this source tree. @@ -22,16 +22,25 @@ use ndslice::shape; use tokio::process::Command; use tokio::time::sleep; +fn emit_proc_state(state: &ProcState) { + if let Ok(json) = serde_json::to_string(state) { + println!("{}", json); + // Flush immediately to ensure parent can read events in real-time + use std::io::Write; + use std::io::{self}; + io::stdout().flush().unwrap(); + } +} + #[tokio::main] async fn main() -> Result<(), Box> { - // Initialize tracing - tracing_subscriber::fmt::init(); + // Initialize tracing to stderr to avoid interfering with JSON output + tracing_subscriber::fmt() + .with_writer(std::io::stderr) + .init(); - println!("ProcessAllocator test binary starting..."); - - // Create a ProcessAllocator using the bootstrap binary let bootstrap_path = buck_resources::get("monarch/hyperactor_mesh/bootstrap").unwrap(); - + eprintln!("Bootstrap cmd: {:?}", bootstrap_path); let cmd = Command::new(&bootstrap_path); let mut allocator = ProcessAllocator::new(cmd); @@ -43,49 +52,33 @@ async fn main() -> Result<(), Box> { }) .await?; - println!("Allocation created, waiting for children to start..."); - // Wait for all children to be running let mut running_count = 0; while running_count < 4 { match alloc.next().await { - Some(ProcState::Created { proc_id, coords }) => { - println!("Child process created: {:?} at {:?}", proc_id, coords); - } - Some(ProcState::Running { proc_id, addr, .. }) => { - println!("Child process running: {:?} at {:?}", proc_id, addr); - running_count += 1; - } - Some(ProcState::Stopped { proc_id, reason }) => { - println!("Child process stopped: {:?}, reason: {:?}", proc_id, reason); - } - Some(ProcState::Failed { - world_id, - description, - }) => { - println!( - "Allocation failed: {:?}, description: {}", - world_id, description - ); - return Err(format!("Allocation failed: {}", description).into()); + Some(state) => { + emit_proc_state(&state); + + match &state { + ProcState::Running { .. } => { + running_count += 1; + } + ProcState::Failed { description, .. } => { + return Err(format!("Allocation failed: {}", description).into()); + } + _ => {} + } } None => { - println!("No more allocation events"); break; } } } - println!( - "All {} children are running. Parent PID: {}", - running_count, - std::process::id() - ); - // Keep the process running indefinitely // In the test, we'll kill this process and check if children are cleaned up loop { + #[allow(clippy::disallowed_methods)] sleep(Duration::from_secs(1)).await; - println!("Parent process still alive, children should be running..."); } } diff --git a/hyperactor_mesh/test/process_allocator_cleanup/process_allocator_test_bootstrap.rs b/hyperactor_mesh/test/process_allocator_cleanup/process_allocator_test_bootstrap.rs new file mode 100644 index 000000000..75ed0c25f --- /dev/null +++ b/hyperactor_mesh/test/process_allocator_cleanup/process_allocator_test_bootstrap.rs @@ -0,0 +1,28 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * All rights reserved. + * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. + */ + +/// A simple bootstrap binary that writes logs out to a file. This is useful for +/// debugging, as normally the ProcessAllocator children logs are piped back to +/// ProcessAllocator. When we are testing what happens when we sigkill ProcessAllocator, we want to see what is happening on the chilre +#[tokio::main] +async fn main() { + // Initialize tracing to a separate log file per child + let pid = std::process::id(); + let log_file_path = format!("/tmp/child_log{}", pid); + let log_file = std::fs::File::create(&log_file_path).expect("Failed to create log file"); + + tracing_subscriber::fmt() + .with_writer(log_file) + .with_ansi(false) // No color codes in file + .init(); + + // Let the test know where to find our logs + eprintln!("CHILD_LOG_FILE:{}: {}", pid, log_file_path); + + hyperactor_mesh::bootstrap_or_die().await; +} From f7567e39ab203fdbe2f896146e22ccb557571c8b Mon Sep 17 00:00:00 2001 From: suo Date: Thu, 26 Jun 2025 11:36:41 -0700 Subject: [PATCH 3/6] Update on "[monarch] add test for process allocator child cleanup" Test that if we kill the ProcessAllocator process, children will get cleaned up. This will be used to confirm our removal of HYPERACTOR_MANAGED_SUBPROCESS is ok. Differential Revision: [D77348271](https://our.internmc.facebook.com/intern/diff/D77348271/) [ghstack-poisoned] --- hyperactor_mesh/Cargo.toml | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/hyperactor_mesh/Cargo.toml b/hyperactor_mesh/Cargo.toml index cb52f1140..232fcf740 100644 --- a/hyperactor_mesh/Cargo.toml +++ b/hyperactor_mesh/Cargo.toml @@ -1,4 +1,4 @@ -# @generated by autocargo from //monarch/hyperactor_mesh:[hyperactor_mesh,hyperactor_mesh_test_bootstrap,process_allocator_cleanup,process_allocator_test_bin] +# @generated by autocargo from //monarch/hyperactor_mesh:[hyperactor_mesh,hyperactor_mesh_test_bootstrap,process_allocator_cleanup,process_allocator_test_bin,process_allocator_test_bootstrap] [package] name = "hyperactor_mesh" @@ -13,11 +13,15 @@ path = "test/bootstrap.rs" [[bin]] name = "process_allocator_test_bin" -path = "test/process_allocator.rs" +path = "test/process_allocator_cleanup/process_allocator_test_bin.rs" + +[[bin]] +name = "process_allocator_test_bootstrap" +path = "test/process_allocator_cleanup/process_allocator_test_bootstrap.rs" [[test]] name = "process_allocator_cleanup" -path = "test/process_allocator_cleanup.rs" +path = "test/process_allocator_cleanup/process_allocator_cleanup.rs" [dependencies] anyhow = "1.0.98" @@ -37,6 +41,7 @@ nix = { version = "0.29.0", features = ["dir", "event", "hostname", "inotify", " rand = { version = "0.8", features = ["small_rng"] } serde = { version = "1.0.185", features = ["derive", "rc"] } serde_bytes = "0.11" +serde_json = { version = "1.0.140", features = ["alloc", "float_roundtrip", "unbounded_depth"] } tempfile = "3.15" thiserror = "2.0.12" tokio = { version = "1.45.0", features = ["full", "test-util", "tracing"] } From f154e44b8250723c270418ec56eb04fc8bf03b52 Mon Sep 17 00:00:00 2001 From: suo Date: Thu, 26 Jun 2025 12:24:58 -0700 Subject: [PATCH 4/6] Update on "[monarch] add test for process allocator child cleanup" Test that if we kill the ProcessAllocator process, children will get cleaned up. This will be used to confirm our removal of HYPERACTOR_MANAGED_SUBPROCESS is ok. Differential Revision: [D77348271](https://our.internmc.facebook.com/intern/diff/D77348271/) [ghstack-poisoned] --- .../process_allocator_cleanup/process_allocator_test_bin.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hyperactor_mesh/test/process_allocator_cleanup/process_allocator_test_bin.rs b/hyperactor_mesh/test/process_allocator_cleanup/process_allocator_test_bin.rs index 56b2af5ab..8784dd2c1 100644 --- a/hyperactor_mesh/test/process_allocator_cleanup/process_allocator_test_bin.rs +++ b/hyperactor_mesh/test/process_allocator_cleanup/process_allocator_test_bin.rs @@ -1,6 +1,6 @@ /* * Copyright (c) Meta Platforms, Inc. and affiliates. -// * All rights reserved. + * All rights reserved. * * This source code is licensed under the BSD-style license found in the * LICENSE file in the root directory of this source tree. From 5dbc3026180bb6d0162ecab5f8ec28be73618a79 Mon Sep 17 00:00:00 2001 From: suo Date: Thu, 26 Jun 2025 12:51:57 -0700 Subject: [PATCH 5/6] Update on "[monarch] add test for process allocator child cleanup" Test that if we kill the ProcessAllocator process, children will get cleaned up. This will be used to confirm our removal of HYPERACTOR_MANAGED_SUBPROCESS is ok. Differential Revision: [D77348271](https://our.internmc.facebook.com/intern/diff/D77348271/) [ghstack-poisoned] --- hyperactor_mesh/src/alloc/local.rs | 2 +- hyperactor_mesh/src/alloc/remoteprocess.rs | 40 ++++++++----------- .../process_allocator_test_bin.rs | 34 ++-------------- .../process_allocator_test_bootstrap.rs | 5 ++- 4 files changed, 23 insertions(+), 58 deletions(-) diff --git a/hyperactor_mesh/src/alloc/local.rs b/hyperactor_mesh/src/alloc/local.rs index 6595e52e6..5950eaa33 100644 --- a/hyperactor_mesh/src/alloc/local.rs +++ b/hyperactor_mesh/src/alloc/local.rs @@ -204,7 +204,7 @@ impl Alloc for LocalAlloc { let created = ProcState::Created { proc_id: proc_id.clone(), coords, - pid: 0, // Local allocator doesn't have real system PIDs + pid: std::process::id(), }; self.queue.push_back(ProcState::Running { proc_id, diff --git a/hyperactor_mesh/src/alloc/remoteprocess.rs b/hyperactor_mesh/src/alloc/remoteprocess.rs index 470e0c338..632a2d5a3 100644 --- a/hyperactor_mesh/src/alloc/remoteprocess.rs +++ b/hyperactor_mesh/src/alloc/remoteprocess.rs @@ -937,31 +937,23 @@ impl Alloc for RemoteProcessAlloc { break match update { Some(ProcState::Created { - proc_id, coords, .. - }) => { - match self.project_proc_into_global_shape(&proc_id, &coords) { - Ok(global_coords) => { - tracing::debug!( - "reprojected coords: {:?} -> {:?}", - coords, - global_coords - ); - Some(ProcState::Created { - proc_id, - coords: global_coords, - pid: 0, // Remote processes don't track real PIDs in this context - }) - } - Err(e) => { - tracing::error!( - "failed to project coords for proc: {}: {}", - proc_id, - e - ); - None - } + proc_id, + coords, + pid, + }) => match self.project_proc_into_global_shape(&proc_id, &coords) { + Ok(global_coords) => { + tracing::debug!("reprojected coords: {:?} -> {:?}", coords, global_coords); + Some(ProcState::Created { + proc_id, + coords: global_coords, + pid, + }) } - } + Err(e) => { + tracing::error!("failed to project coords for proc: {}: {}", proc_id, e); + None + } + }, Some(ProcState::Failed { world_id: _, diff --git a/hyperactor_mesh/test/process_allocator_cleanup/process_allocator_test_bin.rs b/hyperactor_mesh/test/process_allocator_cleanup/process_allocator_test_bin.rs index 8784dd2c1..167b0b7f5 100644 --- a/hyperactor_mesh/test/process_allocator_cleanup/process_allocator_test_bin.rs +++ b/hyperactor_mesh/test/process_allocator_cleanup/process_allocator_test_bin.rs @@ -10,8 +10,6 @@ /// This binary creates a ProcessAllocator and spawns several child processes, /// then keeps running until killed. It's designed to test whether child /// processes are properly cleaned up when the parent process is killed. -use std::time::Duration; - use hyperactor_mesh::alloc::Alloc; use hyperactor_mesh::alloc::AllocConstraints; use hyperactor_mesh::alloc::AllocSpec; @@ -20,7 +18,6 @@ use hyperactor_mesh::alloc::ProcState; use hyperactor_mesh::alloc::ProcessAllocator; use ndslice::shape; use tokio::process::Command; -use tokio::time::sleep; fn emit_proc_state(state: &ProcState) { if let Ok(json) = serde_json::to_string(state) { @@ -52,33 +49,8 @@ async fn main() -> Result<(), Box> { }) .await?; - // Wait for all children to be running - let mut running_count = 0; - while running_count < 4 { - match alloc.next().await { - Some(state) => { - emit_proc_state(&state); - - match &state { - ProcState::Running { .. } => { - running_count += 1; - } - ProcState::Failed { description, .. } => { - return Err(format!("Allocation failed: {}", description).into()); - } - _ => {} - } - } - None => { - break; - } - } - } - - // Keep the process running indefinitely - // In the test, we'll kill this process and check if children are cleaned up - loop { - #[allow(clippy::disallowed_methods)] - sleep(Duration::from_secs(1)).await; + while let Some(state) = alloc.next().await { + emit_proc_state(&state); } + Ok(()) } diff --git a/hyperactor_mesh/test/process_allocator_cleanup/process_allocator_test_bootstrap.rs b/hyperactor_mesh/test/process_allocator_cleanup/process_allocator_test_bootstrap.rs index 75ed0c25f..a9ad2266a 100644 --- a/hyperactor_mesh/test/process_allocator_cleanup/process_allocator_test_bootstrap.rs +++ b/hyperactor_mesh/test/process_allocator_cleanup/process_allocator_test_bootstrap.rs @@ -8,7 +8,8 @@ /// A simple bootstrap binary that writes logs out to a file. This is useful for /// debugging, as normally the ProcessAllocator children logs are piped back to -/// ProcessAllocator. When we are testing what happens when we sigkill ProcessAllocator, we want to see what is happening on the chilre +/// ProcessAllocator. When we are testing what happens when we sigkill +/// ProcessAllocator, we want to see what is happening on the children. #[tokio::main] async fn main() { // Initialize tracing to a separate log file per child @@ -21,7 +22,7 @@ async fn main() { .with_ansi(false) // No color codes in file .init(); - // Let the test know where to find our logs + // Let the user know where to find our logs eprintln!("CHILD_LOG_FILE:{}: {}", pid, log_file_path); hyperactor_mesh::bootstrap_or_die().await; From cc154cdd64f0e17a8f19019717e77e1b30fdc481 Mon Sep 17 00:00:00 2001 From: suo Date: Thu, 26 Jun 2025 13:18:34 -0700 Subject: [PATCH 6/6] Update on "[monarch] add test for process allocator child cleanup" Test that if we kill the ProcessAllocator process, children will get cleaned up. This will be used to confirm our removal of HYPERACTOR_MANAGED_SUBPROCESS is ok. Differential Revision: [D77348271](https://our.internmc.facebook.com/intern/diff/D77348271/) [ghstack-poisoned] --- monarch_hyperactor/src/bin/process_allocator/common.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/monarch_hyperactor/src/bin/process_allocator/common.rs b/monarch_hyperactor/src/bin/process_allocator/common.rs index b6e2c2752..b525b7008 100644 --- a/monarch_hyperactor/src/bin/process_allocator/common.rs +++ b/monarch_hyperactor/src/bin/process_allocator/common.rs @@ -139,12 +139,12 @@ mod tests { while created_ranks.len() < world_size || stopped_ranks.len() < world_size { let proc_state = alloc.next().await.unwrap(); match proc_state { - alloc::ProcState::Created { proc_id, coords: _ } => { + alloc::ProcState::Created { proc_id, .. } => { // alloc.next() will keep creating procs and incrementing rank id // so we mod the rank by world_size to map it to its logical rank created_ranks.insert(proc_id.rank() % world_size); } - alloc::ProcState::Stopped { proc_id, reason: _ } => { + alloc::ProcState::Stopped { proc_id, .. } => { stopped_ranks.insert(proc_id.rank() % world_size); } _ => {}