diff --git a/hyperactor/src/actor.rs b/hyperactor/src/actor.rs index 835c7f882..a1ed5bdb1 100644 --- a/hyperactor/src/actor.rs +++ b/hyperactor/src/actor.rs @@ -20,6 +20,7 @@ use std::sync::Arc; use std::time::SystemTime; use async_trait::async_trait; +use enum_as_inner::EnumAsInner; use futures::FutureExt; use futures::future::BoxFuture; use serde::Deserialize; @@ -472,7 +473,16 @@ impl fmt::Display for Signal { } /// The runtime status of an actor. -#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, Named)] +#[derive( + Debug, + Serialize, + Deserialize, + PartialEq, + Eq, + Clone, + Named, + EnumAsInner +)] pub enum ActorStatus { /// The actor status is unknown. Unknown, @@ -505,12 +515,7 @@ pub enum ActorStatus { impl ActorStatus { /// Tells whether the status is a terminal state. pub fn is_terminal(&self) -> bool { - matches!(self, Self::Stopped | Self::Failed(_)) - } - - /// Tells whether the status represents a failure. - pub fn is_failed(&self) -> bool { - matches!(self, Self::Failed(_)) + self.is_stopped() || self.is_failed() } /// Create a generic failure status with the provided error message. diff --git a/hyperactor/src/channel.rs b/hyperactor/src/channel.rs index 288a3625a..d463868c6 100644 --- a/hyperactor/src/channel.rs +++ b/hyperactor/src/channel.rs @@ -828,8 +828,9 @@ impl Rx for ChannelRx { /// if the channel cannot be established. The underlying connection is /// dropped whenever the returned Tx is dropped. #[allow(clippy::result_large_err)] // TODO: Consider reducing the size of `ChannelError`. +#[track_caller] pub fn dial(addr: ChannelAddr) -> Result, ChannelError> { - tracing::debug!(name = "dial", "dialing channel {}", addr); + tracing::debug!(name = "dial", caller = %Location::caller(), %addr, "dialing channel {}", addr); let inner = match addr { ChannelAddr::Local(port) => ChannelTxKind::Local(local::dial(port)?), ChannelAddr::Tcp(addr) => ChannelTxKind::Tcp(net::tcp::dial(addr)), diff --git a/hyperactor/src/proc.rs b/hyperactor/src/proc.rs index 4d1d36c52..6417f4251 100644 --- a/hyperactor/src/proc.rs +++ b/hyperactor/src/proc.rs @@ -21,6 +21,7 @@ use std::hash::Hasher; use std::ops::Deref; use std::panic; use std::panic::AssertUnwindSafe; +use std::panic::Location; use std::pin::Pin; use std::sync::Arc; use std::sync::OnceLock; @@ -47,6 +48,7 @@ use tokio::sync::watch; use tokio::task::JoinHandle; use tracing::Instrument; use tracing::Level; +use tracing::Span; use uuid::Uuid; use crate as hyperactor; @@ -54,6 +56,7 @@ use crate::Actor; use crate::ActorRef; use crate::Handler; use crate::Message; +use crate::Named as _; use crate::RemoteMessage; use crate::actor::ActorError; use crate::actor::ActorErrorKind; @@ -789,7 +792,6 @@ impl Proc { } /// Create a root allocation in the proc. - #[hyperactor::instrument(fields(actor_name=name))] fn allocate_root_id(&self, name: &str) -> Result { let name = name.to_string(); match self.state().roots.entry(name.to_string()) { @@ -1024,8 +1026,36 @@ impl Instance { /// Notify subscribers of a change in the actors status and bump counters with the duration which /// the last status was active for. + #[track_caller] fn change_status(&self, new: ActorStatus) { - self.status_tx.send_replace(new.clone()); + let old = self.status_tx.send_replace(new.clone()); + // Actor status changes between Idle and Processing when handling every + // message. It creates too many logs if we want to log these 2 states. + // Therefore we skip the status changes between them. + if !((old.is_idle() && new.is_processing()) || (old.is_processing() && new.is_idle())) { + let new_status = new.arm().unwrap_or("unknown"); + let change_reason = match new { + ActorStatus::Failed(reason) => reason.to_string(), + _ => "".to_string(), + }; + tracing::info!( + name = "ActorStatus", + actor_id = %self.self_id(), + actor_name = self.self_id().name(), + status = new_status, + prev_status = old.arm().unwrap_or("unknown"), + caller = %Location::caller(), + change_reason, + ); + } + } + + fn is_terminal(&self) -> bool { + self.status_tx.borrow().is_terminal() + } + + fn is_stopped(&self) -> bool { + self.status_tx.borrow().is_stopped() } /// This instance's actor ID. @@ -1091,9 +1121,14 @@ impl Instance { let instance_cell = self.cell.clone(); let actor_id = self.cell.actor_id().clone(); let actor_handle = ActorHandle::new(self.cell.clone(), self.ports.clone()); - let actor_task_handle = A::spawn_server_task(panic_handler::with_backtrace_tracking( - self.serve(actor, actor_loop_receivers, work_rx), - )); + let actor_task_handle = A::spawn_server_task( + panic_handler::with_backtrace_tracking(self.serve( + actor, + actor_loop_receivers, + work_rx, + )) + .instrument(Span::current()), + ); tracing::debug!("{}: spawned with {:?}", actor_id, actor_task_handle); instance_cell .inner @@ -1119,23 +1154,34 @@ impl Instance { .run_actor_tree(&mut actor, actor_loop_receivers, &mut work_rx) .await; - let (actor_status, event) = match result { - Ok(_) => (ActorStatus::Stopped, None), - Err(ActorError { - kind: box ActorErrorKind::UnhandledSupervisionEvent(box event), - .. - }) => (event.actor_status.clone(), Some(event)), + let event = match result { + Ok(_) => { + // actor should have been stopped by run_actor_tree + assert!(self.is_stopped()); + None + } Err(err) => { - let error_kind = ActorErrorKind::Generic(err.kind.to_string()); - ( - ActorStatus::Failed(error_kind.clone()), - Some(ActorSupervisionEvent::new( - self.cell.actor_id().clone(), - ActorStatus::Failed(error_kind), - None, - None, - )), - ) + match *err.kind { + ActorErrorKind::UnhandledSupervisionEvent(box event) => { + // Currently only terminated actors are allowed to raise supervision events. + // If we want to change that in the future, we need to modify the exit + // status here too, because we use event's actor_status as this actor's + // terminal status. + assert!(event.actor_status.is_terminal()); + self.change_status(event.actor_status.clone()); + Some(event) + } + _ => { + let error_kind = ActorErrorKind::Generic(err.kind.to_string()); + self.change_status(ActorStatus::Failed(error_kind.clone())); + Some(ActorSupervisionEvent::new( + self.cell.actor_id().clone(), + ActorStatus::Failed(error_kind), + None, + None, + )) + } + } } }; @@ -1164,7 +1210,6 @@ impl Instance { self.proc.handle_supervision_event(event); } } - self.change_status(actor_status); } /// Runs the actor, and manages its supervision tree. When the function returns, @@ -1207,10 +1252,16 @@ impl Instance { } }; - if let Err(ref err) = result { - tracing::error!("{}: actor failure: {}", self.self_id(), err); + match &result { + Ok(_) => assert!(self.is_stopped()), + Err(err) => { + tracing::error!("{}: actor failure: {}", self.self_id(), err); + assert!(!self.is_terminal()); + // Send Stopping instead of Failed, because we still need to + // unlink child actors. + self.change_status(ActorStatus::Stopping); + } } - self.change_status(ActorStatus::Stopping); // After this point, we know we won't spawn any more children, // so we can safely read the current child keys. @@ -1292,7 +1343,6 @@ impl Instance { } /// Initialize and run the actor until it fails or is stopped. - #[tracing::instrument(level = "info", skip_all, fields(actor_id = %self.self_id()))] async fn run( &mut self, actor: &mut A, @@ -1407,6 +1457,7 @@ impl Instance { } } + #[hyperactor::instrument(fields(actor_id = self.self_id().to_string(), actor_name = self.self_id().name()))] async unsafe fn handle_message( &mut self, actor: &mut A, @@ -1436,18 +1487,12 @@ impl Instance { &headers, self.self_id().to_string(), ); - let span = tracing::debug_span!( - "actor_status", - actor_id = self.self_id().to_string(), - actor_name = self.self_id().name(), - name = self.cell.status().borrow().to_string(), - ); let context = Context::new(self, headers); // Pass a reference to the context to the handler, so that deref // coercion allows the `this` argument to be treated exactly like // &Instance. - actor.handle(&context, message).instrument(span).await + actor.handle(&context, message).await } // Spawn on child on this instance. Currently used only by cap::CanSpawn. @@ -1536,6 +1581,15 @@ impl Drop for Instance { if status.is_terminal() { false } else { + tracing::info!( + name = "ActorStatus", + actor_id = %self.self_id(), + actor_name = self.self_id().name(), + status = "Stopped", + prev_status = status.arm().unwrap_or("unknown"), + caller = %Location::caller(), + "Instance is dropped", + ); *status = ActorStatus::Stopped; true } diff --git a/hyperactor_mesh/src/alloc.rs b/hyperactor_mesh/src/alloc.rs index d11864266..84b9fdc6b 100644 --- a/hyperactor_mesh/src/alloc.rs +++ b/hyperactor_mesh/src/alloc.rs @@ -418,7 +418,11 @@ impl AllocExt for A { return Err(AllocatorError::Incomplete(self.extent().clone())); }; - let name = state.arm().unwrap_or("unknown"); + let name = tracing::Span::current() + .metadata() + .map(|m| m.name()) + .unwrap_or("initialize"); + let status = format!("ProcState:{}", state.arm().unwrap_or("unknown")); match state { ProcState::Created { @@ -427,17 +431,20 @@ impl AllocExt for A { let rank = point.rank(); if let Some(old_create_key) = created.insert(rank, create_key.clone()) { tracing::warn!( + name, + status, + rank, "rank {rank} reassigned from {old_create_key} to {create_key}" ); } tracing::info!( - name = name, - rank = rank, + name, + status, + rank, "proc with create key {}, rank {}: created", create_key, rank ); - // tracing::info!("created: {} rank {}: created", create_key, rank); } ProcState::Running { create_key, @@ -447,7 +454,9 @@ impl AllocExt for A { } => { let Some(rank) = created.rank(&create_key) else { tracing::warn!( - name = name, + name, + %proc_id, + status, "proc id {proc_id} with create key {create_key} \ is running, but was not created" ); @@ -463,14 +472,19 @@ impl AllocExt for A { if let Some(old_allocated_proc) = running.insert(*rank, allocated_proc.clone()) { tracing::warn!( - name = name, + name, + %proc_id, + status, + rank, "duplicate running notifications for {rank}: \ old:{old_allocated_proc}; \ new:{allocated_proc}" ) } tracing::info!( - name = name, + name, + %proc_id, + status, "proc {} rank {}: running at addr:{addr} mesh_agent:{mesh_agent}", proc_id, rank @@ -481,7 +495,8 @@ impl AllocExt for A { // ProcState::Failed to fail the whole allocation. ProcState::Stopped { create_key, reason } => { tracing::error!( - name = name, + name, + status, "allocation failed for proc with create key {}: {}", create_key, reason @@ -493,7 +508,8 @@ impl AllocExt for A { description, } => { tracing::error!( - name = name, + name, + status, "allocation failed for world {}: {}", world_id, description diff --git a/hyperactor_mesh/src/v1/proc_mesh.rs b/hyperactor_mesh/src/v1/proc_mesh.rs index ce8f0bc22..57d10ddd1 100644 --- a/hyperactor_mesh/src/v1/proc_mesh.rs +++ b/hyperactor_mesh/src/v1/proc_mesh.rs @@ -51,8 +51,6 @@ use serde::Deserialize; use serde::Serialize; use tokio::sync::Notify; use tracing::Instrument; -use tracing::Level; -use tracing::span; use crate::CommActor; use crate::alloc::Alloc; @@ -295,28 +293,40 @@ impl ProcMesh { /// Allocate a new ProcMesh from the provided alloc. /// Allocate does not require an owning actor because references are not owned. - #[tracing::instrument(skip_all)] #[track_caller] pub async fn allocate( cx: &impl context::Actor, - mut alloc: Box, + alloc: Box, name: &str, + ) -> v1::Result { + let caller = Location::caller(); + Self::allocate_inner(cx, alloc, Name::new(name), caller).await + } + + // Use allocate_inner to set field mesh_name in span + #[hyperactor::instrument(fields(mesh_name=name.to_string()))] + async fn allocate_inner( + cx: &impl context::Actor, + mut alloc: Box, + name: Name, + caller: &'static Location<'static>, ) -> v1::Result { let alloc_id = Self::alloc_counter().fetch_add(1, Ordering::Relaxed) + 1; tracing::info!( - name = "ProcMesh::Allocate::Attempt", + name = "ProcMeshStatus", + status = "Allocate::Attempt", + %caller, alloc_id, - caller = %Location::caller(), shape = ?alloc.shape(), "allocating proc mesh" ); let running = alloc .initialize() - .instrument(span!( - Level::INFO, - "ProcMesh::Allocate::Initialize", - alloc_id + .instrument(tracing::info_span!( + "ProcMeshStatus::Allocate::Initialize", + alloc_id, + mesh_name = name.to_string() )) .await?; @@ -330,17 +340,18 @@ impl ProcMesh { // First make sure we can serve the proc: let proc_channel_addr = { let _guard = - tracing::span!(Level::INFO, "allocate_serve_proc", proc_id = %proc.proc_id()) - .entered(); + tracing::info_span!("allocate_serve_proc", proc_id = %proc.proc_id()).entered(); let (addr, rx) = channel::serve(ChannelAddr::any(alloc.transport()))?; proc.clone().serve(rx); + tracing::info!( + name = "ProcMeshStatus", + status = "Allocate::ChannelServe", + mesh_name = name.to_string(), + %addr, + "proc started listening on addr: {addr}" + ); addr }; - tracing::info!( - name = "ProcMesh::Allocate::ChannelServe", - alloc_id = alloc_id, - "proc started listening on addr: {proc_channel_addr}" - ); let bind_allocated_procs = |router: &DialMailboxRouter| { // Route all of the allocated procs: @@ -423,7 +434,6 @@ impl ProcMesh { .collect(); let stop = Arc::new(Notify::new()); - let name = Name::new(name); let extent = alloc.extent().clone(); { @@ -453,10 +463,10 @@ impl ProcMesh { } } } - }); + }.instrument(tracing::info_span!("alloc_monitor"))); } - Self::create( + let mesh = Self::create( cx, name, ProcMeshAllocation::Allocated { @@ -466,14 +476,21 @@ impl ProcMesh { }, true, // alloc-based meshes support comm actors ) - .await + .await; + match &mesh { + Ok(_) => tracing::info!(name = "ProcMeshStatus", status = "Allocate::Created"), + Err(error) => { + tracing::info!(name = "ProcMeshStatus", status = "Allocate::Failed", %error) + } + } + mesh } /// Detach the proc mesh from the lifetime of `self`, and return its reference. - #[allow(dead_code)] + #[cfg(test)] pub(crate) fn detach(self) -> ProcMeshRef { // This also keeps the ProcMeshAllocation::Allocated alloc task alive. - self.current_ref + self.current_ref.clone() } /// Stop this mesh gracefully. @@ -508,6 +525,16 @@ impl Deref for ProcMesh { } } +impl Drop for ProcMesh { + fn drop(&mut self) { + tracing::info!( + name = "ProcMeshStatus", + mesh_name = %self.name, + status = "Dropped", + ); + } +} + /// Represents different ways ProcMeshes can be allocated. enum ProcMeshAllocation { /// A mesh that has been allocated from an `Alloc`.