diff --git a/hyperactor/src/actor.rs b/hyperactor/src/actor.rs index 835c7f882..a1ed5bdb1 100644 --- a/hyperactor/src/actor.rs +++ b/hyperactor/src/actor.rs @@ -20,6 +20,7 @@ use std::sync::Arc; use std::time::SystemTime; use async_trait::async_trait; +use enum_as_inner::EnumAsInner; use futures::FutureExt; use futures::future::BoxFuture; use serde::Deserialize; @@ -472,7 +473,16 @@ impl fmt::Display for Signal { } /// The runtime status of an actor. -#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, Named)] +#[derive( + Debug, + Serialize, + Deserialize, + PartialEq, + Eq, + Clone, + Named, + EnumAsInner +)] pub enum ActorStatus { /// The actor status is unknown. Unknown, @@ -505,12 +515,7 @@ pub enum ActorStatus { impl ActorStatus { /// Tells whether the status is a terminal state. pub fn is_terminal(&self) -> bool { - matches!(self, Self::Stopped | Self::Failed(_)) - } - - /// Tells whether the status represents a failure. - pub fn is_failed(&self) -> bool { - matches!(self, Self::Failed(_)) + self.is_stopped() || self.is_failed() } /// Create a generic failure status with the provided error message. diff --git a/hyperactor/src/channel.rs b/hyperactor/src/channel.rs index 288a3625a..d463868c6 100644 --- a/hyperactor/src/channel.rs +++ b/hyperactor/src/channel.rs @@ -828,8 +828,9 @@ impl Rx for ChannelRx { /// if the channel cannot be established. The underlying connection is /// dropped whenever the returned Tx is dropped. #[allow(clippy::result_large_err)] // TODO: Consider reducing the size of `ChannelError`. +#[track_caller] pub fn dial(addr: ChannelAddr) -> Result, ChannelError> { - tracing::debug!(name = "dial", "dialing channel {}", addr); + tracing::debug!(name = "dial", caller = %Location::caller(), %addr, "dialing channel {}", addr); let inner = match addr { ChannelAddr::Local(port) => ChannelTxKind::Local(local::dial(port)?), ChannelAddr::Tcp(addr) => ChannelTxKind::Tcp(net::tcp::dial(addr)), diff --git a/hyperactor/src/proc.rs b/hyperactor/src/proc.rs index 4d1d36c52..1880339ff 100644 --- a/hyperactor/src/proc.rs +++ b/hyperactor/src/proc.rs @@ -21,6 +21,7 @@ use std::hash::Hasher; use std::ops::Deref; use std::panic; use std::panic::AssertUnwindSafe; +use std::panic::Location; use std::pin::Pin; use std::sync::Arc; use std::sync::OnceLock; @@ -47,6 +48,7 @@ use tokio::sync::watch; use tokio::task::JoinHandle; use tracing::Instrument; use tracing::Level; +use tracing::Span; use uuid::Uuid; use crate as hyperactor; @@ -54,6 +56,7 @@ use crate::Actor; use crate::ActorRef; use crate::Handler; use crate::Message; +use crate::Named as _; use crate::RemoteMessage; use crate::actor::ActorError; use crate::actor::ActorErrorKind; @@ -149,6 +152,19 @@ struct ProcState { clock: ClockKind, } +impl Drop for ProcState { + fn drop(&mut self) { + // We only want log ProcStatus::Dropped when ProcState is dropped, + // rather than Proc is dropped. This is because we need to wait for + // Proc::inner's ref count becomes 0. + tracing::info!( + proc_id = %self.proc_id, + name = "ProcStatus", + status = "Dropped" + ); + } +} + /// A snapshot view of the proc's actor ledger. #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] pub struct ActorLedgerSnapshot { @@ -334,7 +350,6 @@ impl Proc { } /// Create a new direct-addressed proc. - #[tracing::instrument] pub async fn direct(addr: ChannelAddr, name: String) -> Result { let (addr, rx) = channel::serve(addr)?; let proc_id = ProcId::Direct(addr, name); @@ -344,7 +359,6 @@ impl Proc { } /// Create a new direct-addressed proc with a default sender for the forwarder. - #[tracing::instrument(skip(default))] pub fn direct_with_default( addr: ChannelAddr, name: String, @@ -366,6 +380,11 @@ impl Proc { forwarder: BoxedMailboxSender, clock: ClockKind, ) -> Self { + tracing::info!( + proc_id = %proc_id, + name = "ProcStatus", + status = "Created" + ); Self { inner: Arc::new(ProcState { proc_id, @@ -789,7 +808,6 @@ impl Proc { } /// Create a root allocation in the proc. - #[hyperactor::instrument(fields(actor_name=name))] fn allocate_root_id(&self, name: &str) -> Result { let name = name.to_string(); match self.state().roots.entry(name.to_string()) { @@ -1024,8 +1042,36 @@ impl Instance { /// Notify subscribers of a change in the actors status and bump counters with the duration which /// the last status was active for. + #[track_caller] fn change_status(&self, new: ActorStatus) { - self.status_tx.send_replace(new.clone()); + let old = self.status_tx.send_replace(new.clone()); + // Actor status changes between Idle and Processing when handling every + // message. It creates too many logs if we want to log these 2 states. + // Therefore we skip the status changes between them. + if !((old.is_idle() && new.is_processing()) || (old.is_processing() && new.is_idle())) { + let new_status = new.arm().unwrap_or("unknown"); + let change_reason = match new { + ActorStatus::Failed(reason) => reason.to_string(), + _ => "".to_string(), + }; + tracing::info!( + name = "ActorStatus", + actor_id = %self.self_id(), + actor_name = self.self_id().name(), + status = new_status, + prev_status = old.arm().unwrap_or("unknown"), + caller = %Location::caller(), + change_reason, + ); + } + } + + fn is_terminal(&self) -> bool { + self.status_tx.borrow().is_terminal() + } + + fn is_stopped(&self) -> bool { + self.status_tx.borrow().is_stopped() } /// This instance's actor ID. @@ -1091,9 +1137,14 @@ impl Instance { let instance_cell = self.cell.clone(); let actor_id = self.cell.actor_id().clone(); let actor_handle = ActorHandle::new(self.cell.clone(), self.ports.clone()); - let actor_task_handle = A::spawn_server_task(panic_handler::with_backtrace_tracking( - self.serve(actor, actor_loop_receivers, work_rx), - )); + let actor_task_handle = A::spawn_server_task( + panic_handler::with_backtrace_tracking(self.serve( + actor, + actor_loop_receivers, + work_rx, + )) + .instrument(Span::current()), + ); tracing::debug!("{}: spawned with {:?}", actor_id, actor_task_handle); instance_cell .inner @@ -1119,23 +1170,34 @@ impl Instance { .run_actor_tree(&mut actor, actor_loop_receivers, &mut work_rx) .await; - let (actor_status, event) = match result { - Ok(_) => (ActorStatus::Stopped, None), - Err(ActorError { - kind: box ActorErrorKind::UnhandledSupervisionEvent(box event), - .. - }) => (event.actor_status.clone(), Some(event)), + let event = match result { + Ok(_) => { + // actor should have been stopped by run_actor_tree + assert!(self.is_stopped()); + None + } Err(err) => { - let error_kind = ActorErrorKind::Generic(err.kind.to_string()); - ( - ActorStatus::Failed(error_kind.clone()), - Some(ActorSupervisionEvent::new( - self.cell.actor_id().clone(), - ActorStatus::Failed(error_kind), - None, - None, - )), - ) + match *err.kind { + ActorErrorKind::UnhandledSupervisionEvent(box event) => { + // Currently only terminated actors are allowed to raise supervision events. + // If we want to change that in the future, we need to modify the exit + // status here too, because we use event's actor_status as this actor's + // terminal status. + assert!(event.actor_status.is_terminal()); + self.change_status(event.actor_status.clone()); + Some(event) + } + _ => { + let error_kind = ActorErrorKind::Generic(err.kind.to_string()); + self.change_status(ActorStatus::Failed(error_kind.clone())); + Some(ActorSupervisionEvent::new( + self.cell.actor_id().clone(), + ActorStatus::Failed(error_kind), + None, + None, + )) + } + } } }; @@ -1164,7 +1226,6 @@ impl Instance { self.proc.handle_supervision_event(event); } } - self.change_status(actor_status); } /// Runs the actor, and manages its supervision tree. When the function returns, @@ -1207,10 +1268,16 @@ impl Instance { } }; - if let Err(ref err) = result { - tracing::error!("{}: actor failure: {}", self.self_id(), err); + match &result { + Ok(_) => assert!(self.is_stopped()), + Err(err) => { + tracing::error!("{}: actor failure: {}", self.self_id(), err); + assert!(!self.is_terminal()); + // Send Stopping instead of Failed, because we still need to + // unlink child actors. + self.change_status(ActorStatus::Stopping); + } } - self.change_status(ActorStatus::Stopping); // After this point, we know we won't spawn any more children, // so we can safely read the current child keys. @@ -1292,7 +1359,6 @@ impl Instance { } /// Initialize and run the actor until it fails or is stopped. - #[tracing::instrument(level = "info", skip_all, fields(actor_id = %self.self_id()))] async fn run( &mut self, actor: &mut A, @@ -1407,6 +1473,7 @@ impl Instance { } } + #[hyperactor::instrument(fields(actor_id = self.self_id().to_string(), actor_name = self.self_id().name()))] async unsafe fn handle_message( &mut self, actor: &mut A, @@ -1436,18 +1503,12 @@ impl Instance { &headers, self.self_id().to_string(), ); - let span = tracing::debug_span!( - "actor_status", - actor_id = self.self_id().to_string(), - actor_name = self.self_id().name(), - name = self.cell.status().borrow().to_string(), - ); let context = Context::new(self, headers); // Pass a reference to the context to the handler, so that deref // coercion allows the `this` argument to be treated exactly like // &Instance. - actor.handle(&context, message).instrument(span).await + actor.handle(&context, message).await } // Spawn on child on this instance. Currently used only by cap::CanSpawn. @@ -1536,6 +1597,15 @@ impl Drop for Instance { if status.is_terminal() { false } else { + tracing::info!( + name = "ActorStatus", + actor_id = %self.self_id(), + actor_name = self.self_id().name(), + status = "Stopped", + prev_status = status.arm().unwrap_or("unknown"), + caller = %Location::caller(), + "Instance is dropped", + ); *status = ActorStatus::Stopped; true } diff --git a/hyperactor_mesh/src/alloc.rs b/hyperactor_mesh/src/alloc.rs index d11864266..84b9fdc6b 100644 --- a/hyperactor_mesh/src/alloc.rs +++ b/hyperactor_mesh/src/alloc.rs @@ -418,7 +418,11 @@ impl AllocExt for A { return Err(AllocatorError::Incomplete(self.extent().clone())); }; - let name = state.arm().unwrap_or("unknown"); + let name = tracing::Span::current() + .metadata() + .map(|m| m.name()) + .unwrap_or("initialize"); + let status = format!("ProcState:{}", state.arm().unwrap_or("unknown")); match state { ProcState::Created { @@ -427,17 +431,20 @@ impl AllocExt for A { let rank = point.rank(); if let Some(old_create_key) = created.insert(rank, create_key.clone()) { tracing::warn!( + name, + status, + rank, "rank {rank} reassigned from {old_create_key} to {create_key}" ); } tracing::info!( - name = name, - rank = rank, + name, + status, + rank, "proc with create key {}, rank {}: created", create_key, rank ); - // tracing::info!("created: {} rank {}: created", create_key, rank); } ProcState::Running { create_key, @@ -447,7 +454,9 @@ impl AllocExt for A { } => { let Some(rank) = created.rank(&create_key) else { tracing::warn!( - name = name, + name, + %proc_id, + status, "proc id {proc_id} with create key {create_key} \ is running, but was not created" ); @@ -463,14 +472,19 @@ impl AllocExt for A { if let Some(old_allocated_proc) = running.insert(*rank, allocated_proc.clone()) { tracing::warn!( - name = name, + name, + %proc_id, + status, + rank, "duplicate running notifications for {rank}: \ old:{old_allocated_proc}; \ new:{allocated_proc}" ) } tracing::info!( - name = name, + name, + %proc_id, + status, "proc {} rank {}: running at addr:{addr} mesh_agent:{mesh_agent}", proc_id, rank @@ -481,7 +495,8 @@ impl AllocExt for A { // ProcState::Failed to fail the whole allocation. ProcState::Stopped { create_key, reason } => { tracing::error!( - name = name, + name, + status, "allocation failed for proc with create key {}: {}", create_key, reason @@ -493,7 +508,8 @@ impl AllocExt for A { description, } => { tracing::error!( - name = name, + name, + status, "allocation failed for world {}: {}", world_id, description diff --git a/hyperactor_mesh/src/v1/host_mesh.rs b/hyperactor_mesh/src/v1/host_mesh.rs index a4342cdb8..ecf90af58 100644 --- a/hyperactor_mesh/src/v1/host_mesh.rs +++ b/hyperactor_mesh/src/v1/host_mesh.rs @@ -670,6 +670,16 @@ impl HostMeshRef { cx: &impl context::Actor, name: &str, per_host: Extent, + ) -> v1::Result { + self.spawn_inner(cx, Name::new(name), per_host).await + } + + #[hyperactor::instrument(fields(mesh_name=mesh_name.to_string()))] + async fn spawn_inner( + &self, + cx: &impl context::Actor, + mesh_name: Name, + per_host: Extent, ) -> v1::Result { let per_host_labels = per_host.labels().iter().collect::>(); let host_labels = self.region.labels().iter().collect::>(); @@ -690,7 +700,13 @@ impl HostMeshRef { .map_err(|err| v1::Error::ConfigurationError(err.into()))?; let region: Region = extent.clone().into(); - let mesh_name = Name::new(name); + + tracing::info!( + name = "ProcMeshStatus", + status = "Spawn::Attempt", + %region, + "spawning proc mesh" + ); let mut procs = Vec::new(); let num_ranks = region.num_ranks(); @@ -714,7 +730,7 @@ impl HostMeshRef { for (host_rank, host) in self.ranks.iter().enumerate() { for per_host_rank in 0..per_host.num_ranks() { let create_rank = per_host.num_ranks() * host_rank + per_host_rank; - let proc_name = Name::new(format!("{}_{}", name, per_host_rank)); + let proc_name = Name::new(format!("{}_{}", mesh_name.name(), per_host_rank)); proc_names.push(proc_name.clone()); host.mesh_agent() .create_or_update( @@ -743,8 +759,15 @@ impl HostMeshRef { format!("failed while querying proc status: {}", e), ) })?; + let proc_id = host.named_proc(&proc_name); + tracing::info!( + name = "ProcMeshStatus", + status = "Spawn::CreatingProc", + %proc_id, + rank = create_rank, + ); procs.push(ProcRef::new( - host.named_proc(&proc_name), + proc_id, create_rank, // TODO: specify or retrieve from state instead, to avoid attestation. ActorRef::attest(host.named_proc(&proc_name).actor_id("agent", 0)), @@ -803,6 +826,15 @@ impl HostMeshRef { }, }; + tracing::error!( + name = "ProcMeshStatus", + status = "Spawn::GetRankStatus", + rank = host_rank, + "rank {} is terminating with state: {}", + host_rank, + state + ); + return Err(v1::Error::ProcCreationError { state, host_rank, @@ -811,6 +843,12 @@ impl HostMeshRef { } } Err(complete) => { + tracing::error!( + name = "ProcMeshStatus", + status = "Spawn::GetRankStatus", + "timeout after {:?} when waiting for procs being created", + config::global::get(PROC_SPAWN_MAX_IDLE), + ); // Fill remaining ranks with a timeout status via the // legacy shim. let legacy = mesh_to_rankedvalues_with_default( @@ -823,7 +861,10 @@ impl HostMeshRef { } } - ProcMesh::create_owned_unchecked(cx, mesh_name, extent, self.clone(), procs).await + let mesh = + ProcMesh::create_owned_unchecked(cx, mesh_name, extent, self.clone(), procs).await; + tracing::info!(name = "ProcMeshStatus", status = "Spawn::Created",); + mesh } /// The name of the referenced host mesh. diff --git a/hyperactor_mesh/src/v1/proc_mesh.rs b/hyperactor_mesh/src/v1/proc_mesh.rs index ce8f0bc22..57d10ddd1 100644 --- a/hyperactor_mesh/src/v1/proc_mesh.rs +++ b/hyperactor_mesh/src/v1/proc_mesh.rs @@ -51,8 +51,6 @@ use serde::Deserialize; use serde::Serialize; use tokio::sync::Notify; use tracing::Instrument; -use tracing::Level; -use tracing::span; use crate::CommActor; use crate::alloc::Alloc; @@ -295,28 +293,40 @@ impl ProcMesh { /// Allocate a new ProcMesh from the provided alloc. /// Allocate does not require an owning actor because references are not owned. - #[tracing::instrument(skip_all)] #[track_caller] pub async fn allocate( cx: &impl context::Actor, - mut alloc: Box, + alloc: Box, name: &str, + ) -> v1::Result { + let caller = Location::caller(); + Self::allocate_inner(cx, alloc, Name::new(name), caller).await + } + + // Use allocate_inner to set field mesh_name in span + #[hyperactor::instrument(fields(mesh_name=name.to_string()))] + async fn allocate_inner( + cx: &impl context::Actor, + mut alloc: Box, + name: Name, + caller: &'static Location<'static>, ) -> v1::Result { let alloc_id = Self::alloc_counter().fetch_add(1, Ordering::Relaxed) + 1; tracing::info!( - name = "ProcMesh::Allocate::Attempt", + name = "ProcMeshStatus", + status = "Allocate::Attempt", + %caller, alloc_id, - caller = %Location::caller(), shape = ?alloc.shape(), "allocating proc mesh" ); let running = alloc .initialize() - .instrument(span!( - Level::INFO, - "ProcMesh::Allocate::Initialize", - alloc_id + .instrument(tracing::info_span!( + "ProcMeshStatus::Allocate::Initialize", + alloc_id, + mesh_name = name.to_string() )) .await?; @@ -330,17 +340,18 @@ impl ProcMesh { // First make sure we can serve the proc: let proc_channel_addr = { let _guard = - tracing::span!(Level::INFO, "allocate_serve_proc", proc_id = %proc.proc_id()) - .entered(); + tracing::info_span!("allocate_serve_proc", proc_id = %proc.proc_id()).entered(); let (addr, rx) = channel::serve(ChannelAddr::any(alloc.transport()))?; proc.clone().serve(rx); + tracing::info!( + name = "ProcMeshStatus", + status = "Allocate::ChannelServe", + mesh_name = name.to_string(), + %addr, + "proc started listening on addr: {addr}" + ); addr }; - tracing::info!( - name = "ProcMesh::Allocate::ChannelServe", - alloc_id = alloc_id, - "proc started listening on addr: {proc_channel_addr}" - ); let bind_allocated_procs = |router: &DialMailboxRouter| { // Route all of the allocated procs: @@ -423,7 +434,6 @@ impl ProcMesh { .collect(); let stop = Arc::new(Notify::new()); - let name = Name::new(name); let extent = alloc.extent().clone(); { @@ -453,10 +463,10 @@ impl ProcMesh { } } } - }); + }.instrument(tracing::info_span!("alloc_monitor"))); } - Self::create( + let mesh = Self::create( cx, name, ProcMeshAllocation::Allocated { @@ -466,14 +476,21 @@ impl ProcMesh { }, true, // alloc-based meshes support comm actors ) - .await + .await; + match &mesh { + Ok(_) => tracing::info!(name = "ProcMeshStatus", status = "Allocate::Created"), + Err(error) => { + tracing::info!(name = "ProcMeshStatus", status = "Allocate::Failed", %error) + } + } + mesh } /// Detach the proc mesh from the lifetime of `self`, and return its reference. - #[allow(dead_code)] + #[cfg(test)] pub(crate) fn detach(self) -> ProcMeshRef { // This also keeps the ProcMeshAllocation::Allocated alloc task alive. - self.current_ref + self.current_ref.clone() } /// Stop this mesh gracefully. @@ -508,6 +525,16 @@ impl Deref for ProcMesh { } } +impl Drop for ProcMesh { + fn drop(&mut self) { + tracing::info!( + name = "ProcMeshStatus", + mesh_name = %self.name, + status = "Dropped", + ); + } +} + /// Represents different ways ProcMeshes can be allocated. enum ProcMeshAllocation { /// A mesh that has been allocated from an `Alloc`.