From dde81045bf629968842b6f23358807fde7d177e0 Mon Sep 17 00:00:00 2001 From: Peng Zhang Date: Thu, 20 Nov 2025 06:28:37 -0800 Subject: [PATCH 1/4] Make logs from `spawn_exit_monitor` part of `ProcStatus` (#1935) Summary: As title. This will be especially helpful when the process is forcefully killed by a signal, and the `Proc` did not get a chance to log any exit status. Reviewed By: shayne-fletcher Differential Revision: D87345385 --- hyperactor_mesh/src/bootstrap.rs | 30 +++++++++++++++++++++--------- 1 file changed, 21 insertions(+), 9 deletions(-) diff --git a/hyperactor_mesh/src/bootstrap.rs b/hyperactor_mesh/src/bootstrap.rs index b9a4ca61f..15021d587 100644 --- a/hyperactor_mesh/src/bootstrap.rs +++ b/hyperactor_mesh/src/bootstrap.rs @@ -1741,18 +1741,24 @@ impl BootstrapProcManager { if let Some(sig) = status.signal() { let _ = handle.mark_killed(sig, status.core_dumped()); let pid_str = remove_from_pid_table(); - tracing::debug!( + tracing::info!( + name = "ProcStatus", + status = "Exited::KilledBySignal", + %proc_id, tail = tail_str, - "proc {proc_id} killed by signal {sig}; proc's pid: {pid_str}" + "killed by signal {sig}; proc's pid: {pid_str}" ); } else if let Some(code) = status.code() { let _ = handle.mark_stopped(code, stderr_tail); let pid_str = remove_from_pid_table(); - if code == 0 { - tracing::debug!(%proc_id, exit_code = code, tail = tail_str.as_deref(), "proc exited; proc's pid: {pid_str}"); - } else { - tracing::info!(%proc_id, exit_code = code, tail = tail_str.as_deref(), "proc exited; proc's pid: {pid_str}"); - } + tracing::info!( + name = "ProcStatus", + status = "Exited::ExitWithCode", + %proc_id, + exit_code = code, + tail = tail_str, + "proc exited; proc's pid: {pid_str}" + ); } else { debug_assert!( false, @@ -1760,9 +1766,12 @@ impl BootstrapProcManager { ); let _ = handle.mark_failed("process exited with unknown status"); let pid_str = remove_from_pid_table(); - tracing::error!( + tracing::info!( + name = "ProcStatus", + status = "Exited::Unknown", + %proc_id, tail = tail_str, - "proc {proc_id} unknown exit: unreachable exit status (no code, no signal); proc's pid: {pid_str}" + "unknown exit: unreachable exit status (no code, no signal); proc's pid: {pid_str}" ); } } @@ -1770,6 +1779,9 @@ impl BootstrapProcManager { let _ = handle.mark_failed(format!("wait_inner() failed: {e}")); let pid_str = remove_from_pid_table(); tracing::info!( + name = "ProcStatus", + status = "Exited::WaitFailed", + %proc_id, tail = tail_str, "proc {proc_id} wait failed; proc's pid: {pid_str}" ); From 03eb4ff47d21dcf09b639a029a614974e8ff26be Mon Sep 17 00:00:00 2001 From: Peng Zhang Date: Thu, 20 Nov 2025 06:28:37 -0800 Subject: [PATCH 2/4] Add event logs for HostMesh::shutdown and HostMesh::spawn (#1936) Summary: As title. Reviewed By: mariusae Differential Revision: D87366183 --- hyperactor_mesh/src/v1/host_mesh.rs | 60 +++++++++++++++++++++++------ 1 file changed, 49 insertions(+), 11 deletions(-) diff --git a/hyperactor_mesh/src/v1/host_mesh.rs b/hyperactor_mesh/src/v1/host_mesh.rs index b3c3c90b1..fc9b7a4e5 100644 --- a/hyperactor_mesh/src/v1/host_mesh.rs +++ b/hyperactor_mesh/src/v1/host_mesh.rs @@ -475,18 +475,32 @@ impl HostMesh { /// `BootstrapProcManager`. On drop, the manager walks its PID /// table and sends SIGKILL to any procs it spawned—tying proc /// lifetimes to their hosts and preventing leaks. + #[hyperactor::instrument(fields(mesh_name=self.name.to_string()))] pub async fn shutdown(&self, cx: &impl hyperactor::context::Actor) -> anyhow::Result<()> { - let mut attempted = 0; - let mut ok = 0; + tracing::info!(name = "HostMeshStatus", status = "Shutdown::Attempt"); + let mut failed_hosts = vec![]; for host in self.current_ref.values() { - attempted += 1; if let Err(e) = host.shutdown(cx).await { - tracing::warn!(host = %host, error = %e, "host shutdown failed"); - } else { - ok += 1; + tracing::warn!( + name = "HostMeshStatus", + status = "Shutdown::Host::Failed", + host = %host, + error = %e, + "host shutdown failed" + ); + failed_hosts.push(host); } } - tracing::info!(attempted, ok, "hostmesh shutdown summary"); + if failed_hosts.is_empty() { + tracing::info!(name = "HostMeshStatus", status = "Shutdown::Success"); + } else { + tracing::error!( + name = "HostMeshStatus", + status = "Shutdown::Failed", + "host mesh shutdown failed; check the logs of the failed hosts for details: {:?}", + failed_hosts + ); + } Ok(()) } } @@ -711,7 +725,30 @@ impl HostMeshRef { name: &str, per_host: Extent, ) -> v1::Result { - self.spawn_inner(cx, Name::new(name), per_host).await + let proc_mesh_name = Name::new(name); + tracing::info!( + name = "HostMeshStatus", + status = "ProcMesh::Spawn::Attempt", + mesh_name = %self.name, + "spawning proc mesh {}", proc_mesh_name + ); + let result = self.spawn_inner(cx, proc_mesh_name.clone(), per_host).await; + if result.is_ok() { + tracing::info!( + name = "HostMeshStatus", + status = "ProcMesh::Spawn::Success", + mesh_name = %self.name, + "spawned proc mesh {}", proc_mesh_name + ); + } else { + tracing::error!( + name = "HostMeshStatus", + status = "ProcMesh::Spawn::Failed", + mesh_name = %self.name, + "failed to spawn proc mesh {}", proc_mesh_name + ); + } + result } #[hyperactor::instrument(fields(mesh_name=mesh_name.to_string()))] @@ -976,9 +1013,10 @@ impl HostMeshRef { mesh_name = %self.name, name = "HostMeshStatus", status = "ProcMesh::Stop::Sent", - "Sending Stop to host mesh {} for {:?} procs", - self.name, - proc_names + "sending Stop to proc mesh {} for {} procs: {}", + proc_mesh_name, + proc_names.len(), + proc_names.iter().map(|n| n.to_string()).collect::>().join(", ") ); let start_time = RealClock.now(); From ccda3959d0eb64f2fec00ac7758529fb37915987 Mon Sep 17 00:00:00 2001 From: Peng Zhang Date: Thu, 20 Nov 2025 06:28:37 -0800 Subject: [PATCH 3/4] Replace field mesh_name with host_mesh and proc_mesh (#1937) Summary: Right now I use `mesh_name` for both proc and host mesh. I find it is not adequate, since for many logs, I could have both fields. This diff deprecates `mesh_name`, and add two new columns: `proc_mesh` and `host_mesh`. Reviewed By: shayne-fletcher Differential Revision: D87374168 --- hyperactor_mesh/src/v1/host_mesh.rs | 86 ++++++++++++++--------------- hyperactor_mesh/src/v1/proc_mesh.rs | 10 ++-- 2 files changed, 45 insertions(+), 51 deletions(-) diff --git a/hyperactor_mesh/src/v1/host_mesh.rs b/hyperactor_mesh/src/v1/host_mesh.rs index fc9b7a4e5..5b5edebb4 100644 --- a/hyperactor_mesh/src/v1/host_mesh.rs +++ b/hyperactor_mesh/src/v1/host_mesh.rs @@ -371,7 +371,7 @@ impl HostMesh { } // Use allocate_inner to set field mesh_name in span - #[hyperactor::instrument(fields(mesh_name=name.to_string()))] + #[hyperactor::instrument(fields(host_mesh=name.to_string()))] async fn allocate_inner( cx: &impl context::Actor, alloc: Box, @@ -475,7 +475,7 @@ impl HostMesh { /// `BootstrapProcManager`. On drop, the manager walks its PID /// table and sends SIGKILL to any procs it spawned—tying proc /// lifetimes to their hosts and preventing leaks. - #[hyperactor::instrument(fields(mesh_name=self.name.to_string()))] + #[hyperactor::instrument(fields(host_mesh=self.name.to_string()))] pub async fn shutdown(&self, cx: &impl hyperactor::context::Actor) -> anyhow::Result<()> { tracing::info!(name = "HostMeshStatus", status = "Shutdown::Attempt"); let mut failed_hosts = vec![]; @@ -534,7 +534,7 @@ impl Drop for HostMesh { fn drop(&mut self) { tracing::info!( name = "HostMeshStatus", - mesh_name = %self.name, + host_mesh = %self.name, status = "Dropping", ); // Snapshot the owned hosts we're responsible for. @@ -556,7 +556,7 @@ impl Drop for HostMesh { handle.spawn(async move { let span = tracing::info_span!( "hostmesh_drop_cleanup", - %mesh_name, + host_mesh = %mesh_name, allocation = %allocation_label, hosts = hosts.len(), ); @@ -619,7 +619,7 @@ impl Drop for HostMesh { // No runtime here; PDEATHSIG and manager Drop remain the // last-resort safety net. tracing::warn!( - mesh_name = %self.name, + host_mesh = %self.name, hosts = hosts.len(), "HostMesh dropped without a tokio runtime; skipping best-effort shutdown" ); @@ -627,7 +627,7 @@ impl Drop for HostMesh { tracing::info!( name = "HostMeshStatus", - mesh_name = %self.name, + host_mesh = %self.name, status = "Dropped", ); } @@ -725,37 +725,36 @@ impl HostMeshRef { name: &str, per_host: Extent, ) -> v1::Result { - let proc_mesh_name = Name::new(name); - tracing::info!( - name = "HostMeshStatus", - status = "ProcMesh::Spawn::Attempt", - mesh_name = %self.name, - "spawning proc mesh {}", proc_mesh_name - ); - let result = self.spawn_inner(cx, proc_mesh_name.clone(), per_host).await; - if result.is_ok() { - tracing::info!( - name = "HostMeshStatus", - status = "ProcMesh::Spawn::Success", - mesh_name = %self.name, - "spawned proc mesh {}", proc_mesh_name - ); - } else { - tracing::error!( - name = "HostMeshStatus", - status = "ProcMesh::Spawn::Failed", - mesh_name = %self.name, - "failed to spawn proc mesh {}", proc_mesh_name - ); + self.spawn_inner(cx, Name::new(name), per_host).await + } + + #[hyperactor::instrument(fields(host_mesh=self.name.to_string(), proc_mesh=proc_mesh_name.to_string()))] + async fn spawn_inner( + &self, + cx: &impl context::Actor, + proc_mesh_name: Name, + per_host: Extent, + ) -> v1::Result { + tracing::info!(name = "HostMeshStatus", status = "ProcMesh::Spawn::Attempt"); + tracing::info!(name = "ProcMeshStatus", status = "Spawn::Attempt",); + let result = self.spawn_inner_inner(cx, proc_mesh_name, per_host).await; + match &result { + Ok(_) => { + tracing::info!(name = "HostMeshStatus", status = "ProcMesh::Spawn::Success"); + tracing::info!(name = "ProcMeshStatus", status = "Spawn::Success"); + } + Err(error) => { + tracing::error!(name = "HostMeshStatus", status = "ProcMesh::Spawn::Failed", %error); + tracing::error!(name = "ProcMeshStatus", status = "Spawn::Failed", %error); + } } result } - #[hyperactor::instrument(fields(mesh_name=mesh_name.to_string()))] - async fn spawn_inner( + async fn spawn_inner_inner( &self, cx: &impl context::Actor, - mesh_name: Name, + proc_mesh_name: Name, per_host: Extent, ) -> v1::Result { let per_host_labels = per_host.labels().iter().collect::>(); @@ -807,7 +806,7 @@ impl HostMeshRef { for (host_rank, host) in self.ranks.iter().enumerate() { for per_host_rank in 0..per_host.num_ranks() { let create_rank = per_host.num_ranks() * host_rank + per_host_rank; - let proc_name = Name::new(format!("{}_{}", mesh_name.name(), per_host_rank)); + let proc_name = Name::new(format!("{}_{}", proc_mesh_name.name(), per_host_rank)); proc_names.push(proc_name.clone()); host.mesh_agent() .create_or_update( @@ -939,8 +938,7 @@ impl HostMeshRef { } let mesh = - ProcMesh::create_owned_unchecked(cx, mesh_name, extent, self.clone(), procs).await; - tracing::info!(name = "ProcMeshStatus", status = "Spawn::Created",); + ProcMesh::create_owned_unchecked(cx, proc_mesh_name, extent, self.clone(), procs).await; if let Ok(ref mesh) = mesh { // Spawn a unique mesh controller for each proc mesh, so the type of the // mesh can be preserved. @@ -957,6 +955,7 @@ impl HostMeshRef { &self.name } + #[hyperactor::instrument(fields(host_mesh=self.name.to_string(), proc_mesh=proc_mesh_name.to_string()))] pub(crate) async fn stop_proc_mesh( &self, cx: &impl hyperactor::context::Actor, @@ -1004,19 +1003,20 @@ impl HostMeshRef { tracing::info!( name = "ProcMeshStatus", - mesh_name = %proc_mesh_name, %proc_id, status = "Stop::Sent", ); } tracing::info!( - mesh_name = %self.name, name = "HostMeshStatus", status = "ProcMesh::Stop::Sent", - "sending Stop to proc mesh {} for {} procs: {}", - proc_mesh_name, + "sending Stop to proc mesh for {} procs: {}", proc_names.len(), - proc_names.iter().map(|n| n.to_string()).collect::>().join(", ") + proc_names + .iter() + .map(|n| n.to_string()) + .collect::>() + .join(", ") ); let start_time = RealClock.now(); @@ -1034,7 +1034,6 @@ impl HostMeshRef { if !all_stopped { tracing::error!( name = "ProcMeshStatus", - mesh_name = %proc_mesh_name, status = "FailedToStop", "failed to terminate proc mesh: {:?}", statuses, @@ -1044,11 +1043,7 @@ impl HostMeshRef { statuses, )); } - tracing::info!( - name = "ProcMeshStatus", - mesh_name = %proc_mesh_name, - status = "Stopped", - ); + tracing::info!(name = "ProcMeshStatus", status = "Stopped"); } Err(complete) => { // Fill remaining ranks with a timeout status via the @@ -1061,7 +1056,6 @@ impl HostMeshRef { ); tracing::error!( name = "ProcMeshStatus", - mesh_name = %proc_mesh_name, status = "StoppingTimeout", "failed to terminate proc mesh before timeout: {:?}", legacy, diff --git a/hyperactor_mesh/src/v1/proc_mesh.rs b/hyperactor_mesh/src/v1/proc_mesh.rs index 2a0a5103e..09d8ea861 100644 --- a/hyperactor_mesh/src/v1/proc_mesh.rs +++ b/hyperactor_mesh/src/v1/proc_mesh.rs @@ -306,7 +306,7 @@ impl ProcMesh { } // Use allocate_inner to set field mesh_name in span - #[hyperactor::instrument(fields(mesh_name=name.to_string()))] + #[hyperactor::instrument(fields(proc_mesh=name.to_string()))] async fn allocate_inner( cx: &impl context::Actor, mut alloc: Box, @@ -328,7 +328,7 @@ impl ProcMesh { .instrument(tracing::info_span!( "ProcMeshStatus::Allocate::Initialize", alloc_id, - mesh_name = name.to_string() + proc_mesh = %name )) .await?; @@ -348,7 +348,7 @@ impl ProcMesh { tracing::info!( name = "ProcMeshStatus", status = "Allocate::ChannelServe", - mesh_name = name.to_string(), + proc_mesh = %name, %addr, "proc started listening on addr: {addr}" ); @@ -516,7 +516,7 @@ impl ProcMesh { stop.notify_one(); tracing::info!( name = "ProcMeshStatus", - mesh_name = %self.name, + proc_mesh = %self.name, alloc_name, status = "StoppingAlloc", "sending stop to alloc {alloc_name}; check its log for stop status", @@ -551,7 +551,7 @@ impl Drop for ProcMesh { fn drop(&mut self) { tracing::info!( name = "ProcMeshStatus", - mesh_name = %self.name, + proc_mesh = %self.name, status = "Dropped", ); } From 03e5d62b2d19cb3ab09ae605e6938fc9071b1233 Mon Sep 17 00:00:00 2001 From: Peng Zhang Date: Thu, 20 Nov 2025 06:28:37 -0800 Subject: [PATCH 4/4] Add events for ActorMesh (#1938) Summary: As title. Reviewed By: mariusae, vidhyav Differential Revision: D87382588 --- hyperactor_mesh/src/v1/actor_mesh.rs | 12 ++++- hyperactor_mesh/src/v1/proc_mesh.rs | 66 ++++++++++++++++++++++++++++ 2 files changed, 77 insertions(+), 1 deletion(-) diff --git a/hyperactor_mesh/src/v1/actor_mesh.rs b/hyperactor_mesh/src/v1/actor_mesh.rs index 63704f84a..a0ff67588 100644 --- a/hyperactor_mesh/src/v1/actor_mesh.rs +++ b/hyperactor_mesh/src/v1/actor_mesh.rs @@ -76,7 +76,7 @@ impl ActorMesh { /// Detach this mesh from the lifetime of `self`, and return its reference. pub(crate) fn detach(self) -> ActorMeshRef { - self.current_ref + self.current_ref.clone() } /// Stop actors on this mesh across all procs. @@ -113,6 +113,16 @@ impl Clone for ActorMesh { } } +impl Drop for ActorMesh { + fn drop(&mut self) { + tracing::info!( + name = "ActorMeshStatus", + actor_mesh = %self.name, + status = "Dropped", + ); + } +} + /// Influences paging behavior for the lazy cache. Smaller pages /// reduce over-allocation for sparse access; larger pages reduce the /// number of heap allocations for contiguous scans. diff --git a/hyperactor_mesh/src/v1/proc_mesh.rs b/hyperactor_mesh/src/v1/proc_mesh.rs index 09d8ea861..2b3035415 100644 --- a/hyperactor_mesh/src/v1/proc_mesh.rs +++ b/hyperactor_mesh/src/v1/proc_mesh.rs @@ -688,6 +688,10 @@ impl ProcMeshRef { &self.name } + fn host_mesh_name(&self) -> Option<&Name> { + self.host_mesh.as_ref().map(|h| h.name()) + } + /// Returns the HostMeshRef that this ProcMeshRef might be backed by. /// Returns None if this ProcMeshRef is backed by an Alloc instead of a host mesh. pub fn hosts(&self) -> Option<&HostMeshRef> { @@ -875,12 +879,48 @@ impl ProcMeshRef { /// inside the `ActorMesh`. /// - `A::Params: RemoteMessage` - spawn parameters must be /// serializable and routable. + #[hyperactor::instrument(fields( + host_mesh=self.host_mesh_name().map(|n| n.to_string()), + proc_mesh=self.name.to_string(), + actor_mesh=name.to_string(), + ))] pub(crate) async fn spawn_with_name( &self, cx: &impl context::Actor, name: Name, params: &A::Params, ) -> v1::Result> + where + A::Params: RemoteMessage, + { + tracing::info!( + name = "ProcMeshStatus", + status = "ActorMesh::Spawn::Attempt", + ); + tracing::info!(name = "ActorMeshStatus", status = "Spawn::Attempt"); + let result = self.spawn_with_name_inner(cx, name, params).await; + match &result { + Ok(_) => { + tracing::info!( + name = "ProcMeshStatus", + status = "ActorMesh::Spawn::Success", + ); + tracing::info!(name = "ActorMeshStatus", status = "Spawn::Success"); + } + Err(error) => { + tracing::error!(name = "ProcMeshStatus", status = "ActorMesh::Spawn::Failed", %error); + tracing::error!(name = "ActorMeshStatus", status = "Spawn::Failed", %error); + } + } + result + } + + async fn spawn_with_name_inner( + &self, + cx: &impl context::Actor, + name: Name, + params: &A::Params, + ) -> v1::Result> where A::Params: RemoteMessage, { @@ -998,10 +1038,36 @@ impl ProcMeshRef { } /// Send stop actors message to all mesh agents for a specific mesh name + #[hyperactor::instrument(fields( + host_mesh = self.host_mesh_name().map(|n| n.to_string()), + proc_mesh = self.name.to_string(), + actor_mesh = mesh_name.to_string(), + ))] pub(crate) async fn stop_actor_by_name( &self, cx: &impl context::Actor, mesh_name: Name, + ) -> v1::Result<()> { + tracing::info!(name = "ProcMeshStatus", status = "ActorMesh::Stop::Attempt"); + tracing::info!(name = "ActorMeshStatus", status = "Stop::Attempt"); + let result = self.stop_actor_by_name_inner(cx, mesh_name).await; + match &result { + Ok(_) => { + tracing::info!(name = "ProcMeshStatus", status = "ActorMesh::Stop::Success"); + tracing::info!(name = "ActorMeshStatus", status = "Stop::Success"); + } + Err(error) => { + tracing::error!(name = "ProcMeshStatus", status = "ActorMesh::Stop::Failed", %error); + tracing::error!(name = "ActorMeshStatus", status = "Stop::Failed", %error); + } + } + result + } + + async fn stop_actor_by_name_inner( + &self, + cx: &impl context::Actor, + mesh_name: Name, ) -> v1::Result<()> { let region = self.region().clone(); let agent_mesh = self.agent_mesh();