Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 21 additions & 9 deletions hyperactor_mesh/src/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1741,35 +1741,47 @@ impl BootstrapProcManager {
if let Some(sig) = status.signal() {
let _ = handle.mark_killed(sig, status.core_dumped());
let pid_str = remove_from_pid_table();
tracing::debug!(
tracing::info!(
name = "ProcStatus",
status = "Exited::KilledBySignal",
%proc_id,
tail = tail_str,
"proc {proc_id} killed by signal {sig}; proc's pid: {pid_str}"
"killed by signal {sig}; proc's pid: {pid_str}"
);
} else if let Some(code) = status.code() {
let _ = handle.mark_stopped(code, stderr_tail);
let pid_str = remove_from_pid_table();
if code == 0 {
tracing::debug!(%proc_id, exit_code = code, tail = tail_str.as_deref(), "proc exited; proc's pid: {pid_str}");
} else {
tracing::info!(%proc_id, exit_code = code, tail = tail_str.as_deref(), "proc exited; proc's pid: {pid_str}");
}
tracing::info!(
name = "ProcStatus",
status = "Exited::ExitWithCode",
%proc_id,
exit_code = code,
tail = tail_str,
"proc exited; proc's pid: {pid_str}"
);
} else {
debug_assert!(
false,
"unreachable: process terminated with neither signal nor exit code"
);
let _ = handle.mark_failed("process exited with unknown status");
let pid_str = remove_from_pid_table();
tracing::error!(
tracing::info!(
name = "ProcStatus",
status = "Exited::Unknown",
%proc_id,
tail = tail_str,
"proc {proc_id} unknown exit: unreachable exit status (no code, no signal); proc's pid: {pid_str}"
"unknown exit: unreachable exit status (no code, no signal); proc's pid: {pid_str}"
);
}
}
Err(e) => {
let _ = handle.mark_failed(format!("wait_inner() failed: {e}"));
let pid_str = remove_from_pid_table();
tracing::info!(
name = "ProcStatus",
status = "Exited::WaitFailed",
%proc_id,
tail = tail_str,
"proc {proc_id} wait failed; proc's pid: {pid_str}"
);
Expand Down
12 changes: 11 additions & 1 deletion hyperactor_mesh/src/v1/actor_mesh.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ impl<A: Referable> ActorMesh<A> {

/// Detach this mesh from the lifetime of `self`, and return its reference.
pub(crate) fn detach(self) -> ActorMeshRef<A> {
self.current_ref
self.current_ref.clone()
}

/// Stop actors on this mesh across all procs.
Expand Down Expand Up @@ -113,6 +113,16 @@ impl<A: Referable> Clone for ActorMesh<A> {
}
}

impl<A: Referable> Drop for ActorMesh<A> {
fn drop(&mut self) {
tracing::info!(
name = "ActorMeshStatus",
actor_name = %self.name,
status = "Dropped",
);
}
}

/// Influences paging behavior for the lazy cache. Smaller pages
/// reduce over-allocation for sparse access; larger pages reduce the
/// number of heap allocations for contiguous scans.
Expand Down
88 changes: 60 additions & 28 deletions hyperactor_mesh/src/v1/host_mesh.rs
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,7 @@ impl HostMesh {
}

// Use allocate_inner to set field mesh_name in span
#[hyperactor::instrument(fields(mesh_name=name.to_string()))]
#[hyperactor::instrument(fields(host_mesh=name.to_string()))]
async fn allocate_inner(
cx: &impl context::Actor,
alloc: Box<dyn Alloc + Send + Sync>,
Expand Down Expand Up @@ -475,18 +475,32 @@ impl HostMesh {
/// `BootstrapProcManager`. On drop, the manager walks its PID
/// table and sends SIGKILL to any procs it spawned—tying proc
/// lifetimes to their hosts and preventing leaks.
#[hyperactor::instrument(fields(host_mesh=self.name.to_string()))]
pub async fn shutdown(&self, cx: &impl hyperactor::context::Actor) -> anyhow::Result<()> {
let mut attempted = 0;
let mut ok = 0;
tracing::info!(name = "HostMeshStatus", status = "Shutdown::Attempt");
let mut failed_hosts = vec![];
for host in self.current_ref.values() {
attempted += 1;
if let Err(e) = host.shutdown(cx).await {
tracing::warn!(host = %host, error = %e, "host shutdown failed");
} else {
ok += 1;
tracing::warn!(
name = "HostMeshStatus",
status = "Shutdown::Host::Failed",
host = %host,
error = %e,
"host shutdown failed"
);
failed_hosts.push(host);
}
}
tracing::info!(attempted, ok, "hostmesh shutdown summary");
if failed_hosts.is_empty() {
tracing::info!(name = "HostMeshStatus", status = "Shutdown::Success");
} else {
tracing::error!(
name = "HostMeshStatus",
status = "Shutdown::Failed",
"host mesh shutdown failed; check the logs of the failed hosts for details: {:?}",
failed_hosts
);
}
Ok(())
}
}
Expand Down Expand Up @@ -520,7 +534,7 @@ impl Drop for HostMesh {
fn drop(&mut self) {
tracing::info!(
name = "HostMeshStatus",
mesh_name = %self.name,
host_mesh = %self.name,
status = "Dropping",
);
// Snapshot the owned hosts we're responsible for.
Expand All @@ -542,7 +556,7 @@ impl Drop for HostMesh {
handle.spawn(async move {
let span = tracing::info_span!(
"hostmesh_drop_cleanup",
%mesh_name,
host_mesh = %mesh_name,
allocation = %allocation_label,
hosts = hosts.len(),
);
Expand Down Expand Up @@ -605,15 +619,15 @@ impl Drop for HostMesh {
// No runtime here; PDEATHSIG and manager Drop remain the
// last-resort safety net.
tracing::warn!(
mesh_name = %self.name,
host_mesh = %self.name,
hosts = hosts.len(),
"HostMesh dropped without a tokio runtime; skipping best-effort shutdown"
);
}

tracing::info!(
name = "HostMeshStatus",
mesh_name = %self.name,
host_mesh = %self.name,
status = "Dropped",
);
}
Expand Down Expand Up @@ -714,11 +728,33 @@ impl HostMeshRef {
self.spawn_inner(cx, Name::new(name), per_host).await
}

#[hyperactor::instrument(fields(mesh_name=mesh_name.to_string()))]
#[hyperactor::instrument(fields(host_mesh=self.name.to_string(), proc_mesh=proc_mesh_name.to_string()))]
async fn spawn_inner(
&self,
cx: &impl context::Actor,
mesh_name: Name,
proc_mesh_name: Name,
per_host: Extent,
) -> v1::Result<ProcMesh> {
tracing::info!(name = "HostMeshStatus", status = "ProcMesh::Spawn::Attempt");
tracing::info!(name = "ProcMeshStatus", status = "Spawn::Attempt",);
let result = self.spawn_inner_inner(cx, proc_mesh_name, per_host).await;
match &result {
Ok(_) => {
tracing::info!(name = "HostMeshStatus", status = "ProcMesh::Spawn::Success");
tracing::info!(name = "ProcMeshStatus", status = "Spawn::Success");
}
Err(error) => {
tracing::error!(name = "HostMeshStatus", status = "ProcMesh::Spawn::Failed", %error);
tracing::error!(name = "ProcMeshStatus", status = "Spawn::Failed", %error);
}
}
result
}

async fn spawn_inner_inner(
&self,
cx: &impl context::Actor,
proc_mesh_name: Name,
per_host: Extent,
) -> v1::Result<ProcMesh> {
let per_host_labels = per_host.labels().iter().collect::<HashSet<_>>();
Expand Down Expand Up @@ -770,7 +806,7 @@ impl HostMeshRef {
for (host_rank, host) in self.ranks.iter().enumerate() {
for per_host_rank in 0..per_host.num_ranks() {
let create_rank = per_host.num_ranks() * host_rank + per_host_rank;
let proc_name = Name::new(format!("{}_{}", mesh_name.name(), per_host_rank));
let proc_name = Name::new(format!("{}_{}", proc_mesh_name.name(), per_host_rank));
proc_names.push(proc_name.clone());
host.mesh_agent()
.create_or_update(
Expand Down Expand Up @@ -902,8 +938,7 @@ impl HostMeshRef {
}

let mesh =
ProcMesh::create_owned_unchecked(cx, mesh_name, extent, self.clone(), procs).await;
tracing::info!(name = "ProcMeshStatus", status = "Spawn::Created",);
ProcMesh::create_owned_unchecked(cx, proc_mesh_name, extent, self.clone(), procs).await;
if let Ok(ref mesh) = mesh {
// Spawn a unique mesh controller for each proc mesh, so the type of the
// mesh can be preserved.
Expand All @@ -920,6 +955,7 @@ impl HostMeshRef {
&self.name
}

#[hyperactor::instrument(fields(host_mesh=self.name.to_string(), proc_mesh=proc_mesh_name.to_string()))]
pub(crate) async fn stop_proc_mesh(
&self,
cx: &impl hyperactor::context::Actor,
Expand Down Expand Up @@ -967,18 +1003,20 @@ impl HostMeshRef {

tracing::info!(
name = "ProcMeshStatus",
mesh_name = %proc_mesh_name,
%proc_id,
status = "Stop::Sent",
);
}
tracing::info!(
mesh_name = %self.name,
name = "HostMeshStatus",
status = "ProcMesh::Stop::Sent",
"Sending Stop to host mesh {} for {:?} procs",
self.name,
"sending Stop to proc mesh for {} procs: {}",
proc_names.len(),
proc_names
.iter()
.map(|n| n.to_string())
.collect::<Vec<_>>()
.join(", ")
);

let start_time = RealClock.now();
Expand All @@ -996,7 +1034,6 @@ impl HostMeshRef {
if !all_stopped {
tracing::error!(
name = "ProcMeshStatus",
mesh_name = %proc_mesh_name,
status = "FailedToStop",
"failed to terminate proc mesh: {:?}",
statuses,
Expand All @@ -1006,11 +1043,7 @@ impl HostMeshRef {
statuses,
));
}
tracing::info!(
name = "ProcMeshStatus",
mesh_name = %proc_mesh_name,
status = "Stopped",
);
tracing::info!(name = "ProcMeshStatus", status = "Stopped");
}
Err(complete) => {
// Fill remaining ranks with a timeout status via the
Expand All @@ -1023,7 +1056,6 @@ impl HostMeshRef {
);
tracing::error!(
name = "ProcMeshStatus",
mesh_name = %proc_mesh_name,
status = "StoppingTimeout",
"failed to terminate proc mesh before timeout: {:?}",
legacy,
Expand Down
Loading
Loading