diff --git a/hyperactor_mesh/src/bootstrap.rs b/hyperactor_mesh/src/bootstrap.rs index b9a4ca61f..15021d587 100644 --- a/hyperactor_mesh/src/bootstrap.rs +++ b/hyperactor_mesh/src/bootstrap.rs @@ -1741,18 +1741,24 @@ impl BootstrapProcManager { if let Some(sig) = status.signal() { let _ = handle.mark_killed(sig, status.core_dumped()); let pid_str = remove_from_pid_table(); - tracing::debug!( + tracing::info!( + name = "ProcStatus", + status = "Exited::KilledBySignal", + %proc_id, tail = tail_str, - "proc {proc_id} killed by signal {sig}; proc's pid: {pid_str}" + "killed by signal {sig}; proc's pid: {pid_str}" ); } else if let Some(code) = status.code() { let _ = handle.mark_stopped(code, stderr_tail); let pid_str = remove_from_pid_table(); - if code == 0 { - tracing::debug!(%proc_id, exit_code = code, tail = tail_str.as_deref(), "proc exited; proc's pid: {pid_str}"); - } else { - tracing::info!(%proc_id, exit_code = code, tail = tail_str.as_deref(), "proc exited; proc's pid: {pid_str}"); - } + tracing::info!( + name = "ProcStatus", + status = "Exited::ExitWithCode", + %proc_id, + exit_code = code, + tail = tail_str, + "proc exited; proc's pid: {pid_str}" + ); } else { debug_assert!( false, @@ -1760,9 +1766,12 @@ impl BootstrapProcManager { ); let _ = handle.mark_failed("process exited with unknown status"); let pid_str = remove_from_pid_table(); - tracing::error!( + tracing::info!( + name = "ProcStatus", + status = "Exited::Unknown", + %proc_id, tail = tail_str, - "proc {proc_id} unknown exit: unreachable exit status (no code, no signal); proc's pid: {pid_str}" + "unknown exit: unreachable exit status (no code, no signal); proc's pid: {pid_str}" ); } } @@ -1770,6 +1779,9 @@ impl BootstrapProcManager { let _ = handle.mark_failed(format!("wait_inner() failed: {e}")); let pid_str = remove_from_pid_table(); tracing::info!( + name = "ProcStatus", + status = "Exited::WaitFailed", + %proc_id, tail = tail_str, "proc {proc_id} wait failed; proc's pid: {pid_str}" ); diff --git a/hyperactor_mesh/src/v1/actor_mesh.rs b/hyperactor_mesh/src/v1/actor_mesh.rs index 63704f84a..a0ff67588 100644 --- a/hyperactor_mesh/src/v1/actor_mesh.rs +++ b/hyperactor_mesh/src/v1/actor_mesh.rs @@ -76,7 +76,7 @@ impl ActorMesh { /// Detach this mesh from the lifetime of `self`, and return its reference. pub(crate) fn detach(self) -> ActorMeshRef { - self.current_ref + self.current_ref.clone() } /// Stop actors on this mesh across all procs. @@ -113,6 +113,16 @@ impl Clone for ActorMesh { } } +impl Drop for ActorMesh { + fn drop(&mut self) { + tracing::info!( + name = "ActorMeshStatus", + actor_mesh = %self.name, + status = "Dropped", + ); + } +} + /// Influences paging behavior for the lazy cache. Smaller pages /// reduce over-allocation for sparse access; larger pages reduce the /// number of heap allocations for contiguous scans. diff --git a/hyperactor_mesh/src/v1/host_mesh.rs b/hyperactor_mesh/src/v1/host_mesh.rs index b3c3c90b1..5b5edebb4 100644 --- a/hyperactor_mesh/src/v1/host_mesh.rs +++ b/hyperactor_mesh/src/v1/host_mesh.rs @@ -371,7 +371,7 @@ impl HostMesh { } // Use allocate_inner to set field mesh_name in span - #[hyperactor::instrument(fields(mesh_name=name.to_string()))] + #[hyperactor::instrument(fields(host_mesh=name.to_string()))] async fn allocate_inner( cx: &impl context::Actor, alloc: Box, @@ -475,18 +475,32 @@ impl HostMesh { /// `BootstrapProcManager`. On drop, the manager walks its PID /// table and sends SIGKILL to any procs it spawned—tying proc /// lifetimes to their hosts and preventing leaks. + #[hyperactor::instrument(fields(host_mesh=self.name.to_string()))] pub async fn shutdown(&self, cx: &impl hyperactor::context::Actor) -> anyhow::Result<()> { - let mut attempted = 0; - let mut ok = 0; + tracing::info!(name = "HostMeshStatus", status = "Shutdown::Attempt"); + let mut failed_hosts = vec![]; for host in self.current_ref.values() { - attempted += 1; if let Err(e) = host.shutdown(cx).await { - tracing::warn!(host = %host, error = %e, "host shutdown failed"); - } else { - ok += 1; + tracing::warn!( + name = "HostMeshStatus", + status = "Shutdown::Host::Failed", + host = %host, + error = %e, + "host shutdown failed" + ); + failed_hosts.push(host); } } - tracing::info!(attempted, ok, "hostmesh shutdown summary"); + if failed_hosts.is_empty() { + tracing::info!(name = "HostMeshStatus", status = "Shutdown::Success"); + } else { + tracing::error!( + name = "HostMeshStatus", + status = "Shutdown::Failed", + "host mesh shutdown failed; check the logs of the failed hosts for details: {:?}", + failed_hosts + ); + } Ok(()) } } @@ -520,7 +534,7 @@ impl Drop for HostMesh { fn drop(&mut self) { tracing::info!( name = "HostMeshStatus", - mesh_name = %self.name, + host_mesh = %self.name, status = "Dropping", ); // Snapshot the owned hosts we're responsible for. @@ -542,7 +556,7 @@ impl Drop for HostMesh { handle.spawn(async move { let span = tracing::info_span!( "hostmesh_drop_cleanup", - %mesh_name, + host_mesh = %mesh_name, allocation = %allocation_label, hosts = hosts.len(), ); @@ -605,7 +619,7 @@ impl Drop for HostMesh { // No runtime here; PDEATHSIG and manager Drop remain the // last-resort safety net. tracing::warn!( - mesh_name = %self.name, + host_mesh = %self.name, hosts = hosts.len(), "HostMesh dropped without a tokio runtime; skipping best-effort shutdown" ); @@ -613,7 +627,7 @@ impl Drop for HostMesh { tracing::info!( name = "HostMeshStatus", - mesh_name = %self.name, + host_mesh = %self.name, status = "Dropped", ); } @@ -714,11 +728,33 @@ impl HostMeshRef { self.spawn_inner(cx, Name::new(name), per_host).await } - #[hyperactor::instrument(fields(mesh_name=mesh_name.to_string()))] + #[hyperactor::instrument(fields(host_mesh=self.name.to_string(), proc_mesh=proc_mesh_name.to_string()))] async fn spawn_inner( &self, cx: &impl context::Actor, - mesh_name: Name, + proc_mesh_name: Name, + per_host: Extent, + ) -> v1::Result { + tracing::info!(name = "HostMeshStatus", status = "ProcMesh::Spawn::Attempt"); + tracing::info!(name = "ProcMeshStatus", status = "Spawn::Attempt",); + let result = self.spawn_inner_inner(cx, proc_mesh_name, per_host).await; + match &result { + Ok(_) => { + tracing::info!(name = "HostMeshStatus", status = "ProcMesh::Spawn::Success"); + tracing::info!(name = "ProcMeshStatus", status = "Spawn::Success"); + } + Err(error) => { + tracing::error!(name = "HostMeshStatus", status = "ProcMesh::Spawn::Failed", %error); + tracing::error!(name = "ProcMeshStatus", status = "Spawn::Failed", %error); + } + } + result + } + + async fn spawn_inner_inner( + &self, + cx: &impl context::Actor, + proc_mesh_name: Name, per_host: Extent, ) -> v1::Result { let per_host_labels = per_host.labels().iter().collect::>(); @@ -770,7 +806,7 @@ impl HostMeshRef { for (host_rank, host) in self.ranks.iter().enumerate() { for per_host_rank in 0..per_host.num_ranks() { let create_rank = per_host.num_ranks() * host_rank + per_host_rank; - let proc_name = Name::new(format!("{}_{}", mesh_name.name(), per_host_rank)); + let proc_name = Name::new(format!("{}_{}", proc_mesh_name.name(), per_host_rank)); proc_names.push(proc_name.clone()); host.mesh_agent() .create_or_update( @@ -902,8 +938,7 @@ impl HostMeshRef { } let mesh = - ProcMesh::create_owned_unchecked(cx, mesh_name, extent, self.clone(), procs).await; - tracing::info!(name = "ProcMeshStatus", status = "Spawn::Created",); + ProcMesh::create_owned_unchecked(cx, proc_mesh_name, extent, self.clone(), procs).await; if let Ok(ref mesh) = mesh { // Spawn a unique mesh controller for each proc mesh, so the type of the // mesh can be preserved. @@ -920,6 +955,7 @@ impl HostMeshRef { &self.name } + #[hyperactor::instrument(fields(host_mesh=self.name.to_string(), proc_mesh=proc_mesh_name.to_string()))] pub(crate) async fn stop_proc_mesh( &self, cx: &impl hyperactor::context::Actor, @@ -967,18 +1003,20 @@ impl HostMeshRef { tracing::info!( name = "ProcMeshStatus", - mesh_name = %proc_mesh_name, %proc_id, status = "Stop::Sent", ); } tracing::info!( - mesh_name = %self.name, name = "HostMeshStatus", status = "ProcMesh::Stop::Sent", - "Sending Stop to host mesh {} for {:?} procs", - self.name, + "sending Stop to proc mesh for {} procs: {}", + proc_names.len(), proc_names + .iter() + .map(|n| n.to_string()) + .collect::>() + .join(", ") ); let start_time = RealClock.now(); @@ -996,7 +1034,6 @@ impl HostMeshRef { if !all_stopped { tracing::error!( name = "ProcMeshStatus", - mesh_name = %proc_mesh_name, status = "FailedToStop", "failed to terminate proc mesh: {:?}", statuses, @@ -1006,11 +1043,7 @@ impl HostMeshRef { statuses, )); } - tracing::info!( - name = "ProcMeshStatus", - mesh_name = %proc_mesh_name, - status = "Stopped", - ); + tracing::info!(name = "ProcMeshStatus", status = "Stopped"); } Err(complete) => { // Fill remaining ranks with a timeout status via the @@ -1023,7 +1056,6 @@ impl HostMeshRef { ); tracing::error!( name = "ProcMeshStatus", - mesh_name = %proc_mesh_name, status = "StoppingTimeout", "failed to terminate proc mesh before timeout: {:?}", legacy, diff --git a/hyperactor_mesh/src/v1/proc_mesh.rs b/hyperactor_mesh/src/v1/proc_mesh.rs index 2a0a5103e..5b3c55328 100644 --- a/hyperactor_mesh/src/v1/proc_mesh.rs +++ b/hyperactor_mesh/src/v1/proc_mesh.rs @@ -306,7 +306,7 @@ impl ProcMesh { } // Use allocate_inner to set field mesh_name in span - #[hyperactor::instrument(fields(mesh_name=name.to_string()))] + #[hyperactor::instrument(fields(proc_mesh=name.to_string()))] async fn allocate_inner( cx: &impl context::Actor, mut alloc: Box, @@ -328,7 +328,7 @@ impl ProcMesh { .instrument(tracing::info_span!( "ProcMeshStatus::Allocate::Initialize", alloc_id, - mesh_name = name.to_string() + proc_mesh = %name )) .await?; @@ -348,7 +348,7 @@ impl ProcMesh { tracing::info!( name = "ProcMeshStatus", status = "Allocate::ChannelServe", - mesh_name = name.to_string(), + proc_mesh = %name, %addr, "proc started listening on addr: {addr}" ); @@ -516,7 +516,7 @@ impl ProcMesh { stop.notify_one(); tracing::info!( name = "ProcMeshStatus", - mesh_name = %self.name, + proc_mesh = %self.name, alloc_name, status = "StoppingAlloc", "sending stop to alloc {alloc_name}; check its log for stop status", @@ -551,7 +551,7 @@ impl Drop for ProcMesh { fn drop(&mut self) { tracing::info!( name = "ProcMeshStatus", - mesh_name = %self.name, + proc_mesh = %self.name, status = "Dropped", ); } @@ -688,6 +688,10 @@ impl ProcMeshRef { &self.name } + pub fn host_mesh_name(&self) -> Option<&Name> { + self.host_mesh.as_ref().map(|h| h.name()) + } + /// Returns the HostMeshRef that this ProcMeshRef might be backed by. /// Returns None if this ProcMeshRef is backed by an Alloc instead of a host mesh. pub fn hosts(&self) -> Option<&HostMeshRef> { @@ -875,12 +879,48 @@ impl ProcMeshRef { /// inside the `ActorMesh`. /// - `A::Params: RemoteMessage` - spawn parameters must be /// serializable and routable. + #[hyperactor::instrument(fields( + host_mesh=self.host_mesh_name().map(|n| n.to_string()), + proc_mesh=self.name.to_string(), + actor_mesh=name.to_string(), + ))] pub(crate) async fn spawn_with_name( &self, cx: &impl context::Actor, name: Name, params: &A::Params, ) -> v1::Result> + where + A::Params: RemoteMessage, + { + tracing::info!( + name = "ProcMeshStatus", + status = "ActorMesh::Spawn::Attempt", + ); + tracing::info!(name = "ActorMeshStatus", status = "Spawn::Attempt"); + let result = self.spawn_with_name_inner(cx, name, params).await; + match &result { + Ok(_) => { + tracing::info!( + name = "ProcMeshStatus", + status = "ActorMesh::Spawn::Success", + ); + tracing::info!(name = "ActorMeshStatus", status = "Spawn::Success"); + } + Err(error) => { + tracing::error!(name = "ProcMeshStatus", status = "ActorMesh::Spawn::Failed", %error); + tracing::error!(name = "ActorMeshStatus", status = "Spawn::Failed", %error); + } + } + result + } + + async fn spawn_with_name_inner( + &self, + cx: &impl context::Actor, + name: Name, + params: &A::Params, + ) -> v1::Result> where A::Params: RemoteMessage, { @@ -998,10 +1038,36 @@ impl ProcMeshRef { } /// Send stop actors message to all mesh agents for a specific mesh name + #[hyperactor::instrument(fields( + host_mesh = self.host_mesh_name().map(|n| n.to_string()), + proc_mesh = self.name.to_string(), + actor_mesh = mesh_name.to_string(), + ))] pub(crate) async fn stop_actor_by_name( &self, cx: &impl context::Actor, mesh_name: Name, + ) -> v1::Result<()> { + tracing::info!(name = "ProcMeshStatus", status = "ActorMesh::Stop::Attempt"); + tracing::info!(name = "ActorMeshStatus", status = "Stop::Attempt"); + let result = self.stop_actor_by_name_inner(cx, mesh_name).await; + match &result { + Ok(_) => { + tracing::info!(name = "ProcMeshStatus", status = "ActorMesh::Stop::Success"); + tracing::info!(name = "ActorMeshStatus", status = "Stop::Success"); + } + Err(error) => { + tracing::error!(name = "ProcMeshStatus", status = "ActorMesh::Stop::Failed", %error); + tracing::error!(name = "ActorMeshStatus", status = "Stop::Failed", %error); + } + } + result + } + + async fn stop_actor_by_name_inner( + &self, + cx: &impl context::Actor, + mesh_name: Name, ) -> v1::Result<()> { let region = self.region().clone(); let agent_mesh = self.agent_mesh(); diff --git a/monarch_hyperactor/src/v1/actor_mesh.rs b/monarch_hyperactor/src/v1/actor_mesh.rs index 7a71dae39..62f223a7c 100644 --- a/monarch_hyperactor/src/v1/actor_mesh.rs +++ b/monarch_hyperactor/src/v1/actor_mesh.rs @@ -406,7 +406,7 @@ fn actor_state_to_supervision_events( fn send_state_change( rank: usize, event: ActorSupervisionEvent, - mesh_name: &Name, + actor_mesh_name: &Name, owner: &Option>, is_owned: bool, is_proc_stopped: bool, @@ -416,7 +416,7 @@ fn send_state_change( ) where F: Fn(MeshFailure), { - let failure = MeshFailure::new(mesh_name, rank, event.clone()); + let failure = MeshFailure::new(actor_mesh_name, rank, event.clone()); // Any supervision event that is not a failure should not generate // call "unhandled". // This includes the Stopped status, which is a state that occurs when the @@ -426,30 +426,30 @@ fn send_state_change( let is_failed = event.is_error(); if is_failed { tracing::warn!( - name = "SupervisionEvent", - %mesh_name, + name = "ActorMeshStatus", + status = "SupervisionError", %event, - "detected supervision error on monitored mesh: name={mesh_name}", + "detected supervision error on monitored mesh: name={actor_mesh_name}", ); } else { tracing::debug!( - name = "SupervisionEvent", - %mesh_name, + name = "ActorMeshStatus", + status = "SupervisionEvent", %event, - "detected non-error supervision event on monitored mesh: name={mesh_name}", + "detected non-error supervision event on monitored mesh: name={actor_mesh_name}", ); } // Send a notification to the owning actor of this mesh, if there is one. if let Some(owner) = owner { if let Err(error) = owner.send(SupervisionFailureMessage { - mesh_name: mesh_name.to_string(), + mesh_name: actor_mesh_name.to_string(), rank, event: event.clone(), }) { tracing::warn!( - name = "SupervisionEvent", - %mesh_name, + name = "ActorMeshStatus", + status = "SupervisionError", %event, %error, "failed to send supervision event to owner {}: {}. dropping event", @@ -497,9 +497,14 @@ fn send_state_change( /// a message will be sent to "owner" if it is not None. If owner is None, /// then a panic will be raised instead to crash the client. /// * time_between_tasks 1trols how frequently to poll. +#[hyperactor::instrument_infallible(fields( + host_mesh=actor_mesh.proc_mesh().host_mesh_name().map(|n| n.to_string()), + proc_mesh=actor_mesh.proc_mesh().name().to_string(), + actor_mesh=actor_mesh.name().to_string(), +))] async fn actor_states_monitor( cx: &impl context::Actor, - mesh: ActorMeshRef, + actor_mesh: ActorMeshRef, owner: Option>, is_owned: bool, unhandled: F, @@ -526,7 +531,7 @@ async fn actor_states_monitor( _ = canceled.cancelled() => break, } // First check if the proc mesh is dead before trying to query their agents. - let proc_states = mesh.proc_mesh().proc_states(cx).await; + let proc_states = actor_mesh.proc_mesh().proc_states(cx).await; if let Err(e) = proc_states { send_state_change( 0, @@ -539,7 +544,7 @@ async fn actor_states_monitor( )), None, ), - mesh.name(), + actor_mesh.name(), &owner, is_owned, false, @@ -590,12 +595,12 @@ async fn actor_states_monitor( ActorSupervisionEvent::new( // Attribute this to the monitored actor, even if the underlying // cause is a proc_failure. We propagate the cause explicitly. - mesh.get(point.rank()).unwrap().actor_id().clone(), + actor_mesh.get(point.rank()).unwrap().actor_id().clone(), Some(display_name), actor_status, None, ), - mesh.name(), + actor_mesh.name(), &owner, is_owned, true, @@ -608,7 +613,7 @@ async fn actor_states_monitor( } // Now that we know the proc mesh is alive, check for actor state changes. - let events = mesh.actor_states(cx).await; + let events = actor_mesh.actor_states(cx).await; if let Err(e) = events { send_state_change( 0, @@ -621,7 +626,7 @@ async fn actor_states_monitor( )), None, ), - mesh.name(), + actor_mesh.name(), &owner, is_owned, false, @@ -649,7 +654,7 @@ async fn actor_states_monitor( send_state_change( rank, events[0].clone(), - mesh.name(), + actor_mesh.name(), &owner, is_owned, false, @@ -673,7 +678,7 @@ async fn actor_states_monitor( send_state_change( rank, events[0].clone(), - mesh.name(), + actor_mesh.name(), &owner, is_owned, false,