diff --git a/hyperactor_mesh/src/alloc.rs b/hyperactor_mesh/src/alloc.rs index 84b9fdc6b..439c7b276 100644 --- a/hyperactor_mesh/src/alloc.rs +++ b/hyperactor_mesh/src/alloc.rs @@ -359,10 +359,23 @@ pub trait Alloc { /// Stop this alloc and wait for all procs to stop. Call will /// block until all ProcState events have been drained. async fn stop_and_wait(&mut self) -> Result<(), AllocatorError> { + tracing::error!( + name = "AllocStatus", + alloc_name = %self.world_id(), + status = "StopAndWait", + ); self.stop().await?; while let Some(event) = self.next().await { - tracing::debug!("drained event: {:?}", event); + tracing::debug!( + alloc_name = %self.world_id(), + "drained event: {event:?}" + ); } + tracing::error!( + name = "AllocStatus", + alloc_name = %self.world_id(), + status = "Stopped", + ); Ok(()) } diff --git a/hyperactor_mesh/src/v1/host_mesh.rs b/hyperactor_mesh/src/v1/host_mesh.rs index 8557fb1bd..c56570b26 100644 --- a/hyperactor_mesh/src/v1/host_mesh.rs +++ b/hyperactor_mesh/src/v1/host_mesh.rs @@ -900,6 +900,7 @@ impl HostMeshRef { pub(crate) async fn stop_proc_mesh( &self, cx: &impl hyperactor::context::Actor, + proc_mesh_name: &Name, procs: impl IntoIterator, region: Region, ) -> anyhow::Result<()> { @@ -940,8 +941,18 @@ impl HostMeshRef { host.mesh_agent() .get_rank_status(cx, proc_name, port.bind()) .await?; + + tracing::info!( + name = "ProcMeshStatus", + mesh_name = %proc_mesh_name, + %proc_id, + status = "Stop::Sent", + ); } tracing::info!( + mesh_name = %self.name, + name = "HostMeshStatus", + status = "ProcMesh::Stop::Sent", "Sending Stop to host mesh {} for {:?} procs", self.name, proc_names @@ -960,11 +971,23 @@ impl HostMeshRef { Ok(statuses) => { let failed = statuses.values().any(|s| s.is_failure()); if failed { + tracing::error!( + name = "ProcMeshStatus", + mesh_name = %proc_mesh_name, + status = "FailedToStop", + "failed to terminate proc mesh: {:?}", + statuses, + ); return Err(anyhow::anyhow!( "failed to terminate proc mesh: {:?}", statuses, )); } + tracing::info!( + name = "ProcMeshStatus", + mesh_name = %proc_mesh_name, + status = "Stopped", + ); } Err(complete) => { // Fill remaining ranks with a timeout status via the @@ -975,8 +998,16 @@ impl HostMeshRef { Status::is_not_exist, num_ranks, ); + tracing::error!( + name = "ProcMeshStatus", + mesh_name = %proc_mesh_name, + status = "StoppingTimeout", + "failed to terminate proc mesh before timeout: {:?}", + legacy, + ); return Err(anyhow::anyhow!( - "failed to terminate proc mesh: {:?}", + "failed to terminate proc mesh {} before timeout: {:?}", + proc_mesh_name, legacy )); } diff --git a/hyperactor_mesh/src/v1/proc_mesh.rs b/hyperactor_mesh/src/v1/proc_mesh.rs index 909a86315..6665a33d1 100644 --- a/hyperactor_mesh/src/v1/proc_mesh.rs +++ b/hyperactor_mesh/src/v1/proc_mesh.rs @@ -435,41 +435,52 @@ impl ProcMesh { let stop = Arc::new(Notify::new()); let extent = alloc.extent().clone(); + let alloc_name = alloc.world_id().to_string(); { let stop = Arc::clone(&stop); - let name = name.clone(); - - tokio::spawn(async move { - loop { - tokio::select! { - _ = stop.notified() => { - // If we are explicitly stopped, the alloc is torn down. - if let Err(e) = alloc.stop_and_wait().await { - tracing::error!("alloc {}: failed to stop: {}", name, e); - } - break; - } - // We are mostly just using this to drive allocation events. - proc_state = alloc.next() => { - match proc_state { - // The alloc was stopped. - None => break, - Some(proc_state) => { - tracing::info!("unmonitored allocation event for {}: {}", name, proc_state); + + tokio::spawn( + async move { + loop { + tokio::select! { + _ = stop.notified() => { + // If we are explicitly stopped, the alloc is torn down. + if let Err(error) = alloc.stop_and_wait().await { + tracing::error!( + name = "ProcMeshStatus", + alloc_name = %alloc.world_id(), + status = "FailedToStopAlloc", + %error, + ); } + break; } + // We are mostly just using this to drive allocation events. + proc_state = alloc.next() => { + match proc_state { + // The alloc was stopped. + None => break, + Some(proc_state) => { + tracing::debug!( + alloc_name = %alloc.world_id(), + "unmonitored allocation event: {}", proc_state); + } + } + } } } } - }.instrument(tracing::info_span!("alloc_monitor"))); + .instrument(tracing::info_span!("alloc_monitor")), + ); } let mesh = Self::create( cx, name, ProcMeshAllocation::Allocated { + alloc_name, stop, extent, ranks: Arc::new(ranks), @@ -497,15 +508,24 @@ impl ProcMesh { pub async fn stop(&mut self, cx: &impl context::Actor) -> anyhow::Result<()> { let region = self.region.clone(); match &mut self.allocation { - ProcMeshAllocation::Allocated { stop, .. } => { + ProcMeshAllocation::Allocated { + stop, alloc_name, .. + } => { stop.notify_one(); + tracing::info!( + name = "ProcMeshStatus", + mesh_name = %self.name, + alloc_name, + status = "StoppingAlloc", + "sending stop to alloc {alloc_name}; check its log for stop status", + ); Ok(()) } ProcMeshAllocation::Owned { hosts, .. } => { - let names = self.current_ref.proc_ids().collect::>(); + let procs = self.current_ref.proc_ids().collect::>(); // We use the proc mesh region rather than the host mesh region // because the host agent stores one entry per proc, not per host. - hosts.stop_proc_mesh(cx, names, region).await + hosts.stop_proc_mesh(cx, &self.name, procs, region).await } } } @@ -539,6 +559,9 @@ impl Drop for ProcMesh { enum ProcMeshAllocation { /// A mesh that has been allocated from an `Alloc`. Allocated { + // The name of the alloc from which this mesh was allocated. + alloc_name: String, + // A cancellation token used to stop the task keeping the alloc alive. stop: Arc,