From 4f2b07abbbef77e5c237c4a47f0065945b286347 Mon Sep 17 00:00:00 2001 From: Peng Zhang Date: Mon, 17 Nov 2025 07:30:33 -0800 Subject: [PATCH 1/2] Add InstanceState (#1888) Summary: There is a bug in `Instance`'s current Drop impl: https://www.internalfb.com/code/fbsource/[5b70b5b30638d5db5dfef1cbf929dea6df7d3f56]/fbcode/monarch/hyperactor/src/proc.rs?lines=1594-1596 since we clone an `Instance` object on the Python side: https://www.internalfb.com/code/fbsource/[5b70b5b30638d5db5dfef1cbf929dea6df7d3f56]/fbcode/monarch/hyperactor/src/proc.rs?lines=1566 It means the Instance's actor status will be changed to `Stopped` when a clone is dropped, although the actor is still alive. This diff adds an `InstanceState` struct, so the instance's lifetime can be tracked by `Arc`'s ref counter. Then we move the `Drop` impl to this `InstanceState`, so it will only be called when the ref count goes to 0. Reviewed By: dulinriley Differential Revision: D87075455 --- hyperactor/src/proc.rs | 192 +++++++++++++++++++++-------------------- 1 file changed, 98 insertions(+), 94 deletions(-) diff --git a/hyperactor/src/proc.rs b/hyperactor/src/proc.rs index 70e2c9466..9fbfa2e5b 100644 --- a/hyperactor/src/proc.rs +++ b/hyperactor/src/proc.rs @@ -531,7 +531,7 @@ impl Proc { // from its weak cell. 
self.state() .ledger - .insert(actor_id.clone(), instance.cell.downgrade())?; + .insert(actor_id.clone(), instance.inner.cell.downgrade())?; Ok(instance .start(actor, actor_loop_receivers.take().unwrap(), work_rx) @@ -554,7 +554,7 @@ impl Proc { ); let (instance, _, _) = Instance::new(self.clone(), actor_id.clone(), true, None); - let handle = ActorHandle::new(instance.cell.clone(), instance.ports.clone()); + let handle = ActorHandle::new(instance.inner.cell.clone(), instance.inner.ports.clone()); instance.change_status(ActorStatus::Client); @@ -575,7 +575,7 @@ impl Proc { ); let (instance, _, _) = Instance::new(self.clone(), actor_id, false, Some(parent)); - let handle = ActorHandle::new(instance.cell.clone(), instance.ports.clone()); + let handle = ActorHandle::new(instance.inner.cell.clone(), instance.inner.ports.clone()); instance.change_status(ActorStatus::Client); Ok((instance, handle)) } @@ -955,6 +955,10 @@ impl Deref for Context<'_, A> { /// its full lifecycle, supervision, signal management, etc. Instances can represent /// a managed actor or a "client" actor that has joined the proc. pub struct Instance { + inner: Arc>, +} + +struct InstanceState { /// The proc that owns this instance. proc: Proc, @@ -976,6 +980,33 @@ pub struct Instance { sequencer: Sequencer, } +impl InstanceState { + fn self_id(&self) -> &ActorId { + self.mailbox.actor_id() + } +} + +impl Drop for InstanceState { + fn drop(&mut self) { + self.status_tx.send_if_modified(|status| { + if status.is_terminal() { + false + } else { + tracing::info!( + name = "ActorStatus", + actor_id = %self.self_id(), + actor_name = self.self_id().name(), + status = "Stopped", + prev_status = status.arm().unwrap_or("unknown"), + "instance is dropped", + ); + *status = ActorStatus::Stopped; + true + } + }); + } +} + impl Instance { /// Create a new actor instance in Created state. 
fn new( @@ -1025,26 +1056,23 @@ impl Instance { ports.clone(), ); let instance_id = Uuid::now_v7(); - ( - Self { - proc, - cell, - mailbox, - ports, - status_tx, - sequencer: Sequencer::new(instance_id), - id: instance_id, - }, - actor_loop_receivers, - work_rx, - ) + let inner = Arc::new(InstanceState { + proc, + cell, + mailbox, + ports, + status_tx, + sequencer: Sequencer::new(instance_id), + id: instance_id, + }); + (Self { inner }, actor_loop_receivers, work_rx) } /// Notify subscribers of a change in the actors status and bump counters with the duration which /// the last status was active for. #[track_caller] fn change_status(&self, new: ActorStatus) { - let old = self.status_tx.send_replace(new.clone()); + let old = self.inner.status_tx.send_replace(new.clone()); // Actor status changes between Idle and Processing when handling every // message. It creates too many logs if we want to log these 2 states. // Therefore we skip the status changes between them. @@ -1067,22 +1095,22 @@ impl Instance { } fn is_terminal(&self) -> bool { - self.status_tx.borrow().is_terminal() + self.inner.status_tx.borrow().is_terminal() } fn is_stopped(&self) -> bool { - self.status_tx.borrow().is_stopped() + self.inner.status_tx.borrow().is_stopped() } /// This instance's actor ID. pub fn self_id(&self) -> &ActorId { - self.mailbox.actor_id() + self.inner.self_id() } /// Signal the actor to stop. pub fn stop(&self) -> Result<(), ActorError> { - tracing::info!("Instance::stop called, {}", self.cell.actor_id()); - self.cell.signal(Signal::DrainAndStop) + tracing::info!("Instance::stop called, {}", self.inner.cell.actor_id()); + self.inner.cell.signal(Signal::DrainAndStop) } /// Open a new port that accepts M-typed messages. The returned @@ -1090,14 +1118,14 @@ impl Instance { /// returned receiver should only be retained by the actor responsible /// for processing the delivered messages. 
pub fn open_port(&self) -> (PortHandle, PortReceiver) { - self.mailbox.open_port() + self.inner.mailbox.open_port() } /// Open a new one-shot port that accepts M-typed messages. The /// returned port may be used to send a single message; ditto the /// receiver may receive a single message. pub fn open_once_port(&self) -> (OncePortHandle, OncePortReceiver) { - self.mailbox.open_once_port() + self.inner.mailbox.open_once_port() } /// Send a message to the actor running on the proc. @@ -1113,7 +1141,7 @@ impl Instance { { let port = self.port(); let self_id = self.self_id().clone(); - let clock = self.proc.state().clock.clone(); + let clock = self.inner.proc.state().clock.clone(); tokio::spawn(async move { clock.non_advancing_sleep(delay).await; if let Err(e) = port.send(message) { @@ -1127,16 +1155,16 @@ impl Instance { /// Start an A-typed actor onto this instance with the provided params. When spawn returns, /// the actor has been linked with its parent, if it has one. - #[hyperactor::instrument_infallible(fields(actor_id=self.cell.actor_id().clone().to_string(), actor_name=self.cell.actor_id().name()))] + #[hyperactor::instrument_infallible(fields(actor_id=self.inner.cell.actor_id().clone().to_string(), actor_name=self.inner.cell.actor_id().name()))] async fn start( self, actor: A, actor_loop_receivers: (PortReceiver, PortReceiver), work_rx: mpsc::UnboundedReceiver>, ) -> ActorHandle { - let instance_cell = self.cell.clone(); - let actor_id = self.cell.actor_id().clone(); - let actor_handle = ActorHandle::new(self.cell.clone(), self.ports.clone()); + let instance_cell = self.inner.cell.clone(); + let actor_id = self.inner.cell.actor_id().clone(); + let actor_handle = ActorHandle::new(self.inner.cell.clone(), self.inner.ports.clone()); let actor_task_handle = A::spawn_server_task( panic_handler::with_backtrace_tracking(self.serve( actor, @@ -1191,7 +1219,7 @@ impl Instance { let status = ActorStatus::Failed(error_kind); self.change_status(status.clone()); 
Some(ActorSupervisionEvent::new( - self.cell.actor_id().clone(), + self.inner.cell.actor_id().clone(), actor.display_name(), status, None, @@ -1201,14 +1229,14 @@ impl Instance { } }; - if let Some(parent) = self.cell.maybe_unlink_parent() { + if let Some(parent) = self.inner.cell.maybe_unlink_parent() { if let Some(event) = event { // Parent exists, failure should be propagated to the parent. parent.send_supervision_event_or_crash(event); } // TODO: we should get rid of this signal, and use *only* supervision events for // the purpose of conveying lifecycle changes - if let Err(err) = parent.signal(Signal::ChildStopped(self.cell.pid())) { + if let Err(err) = parent.signal(Signal::ChildStopped(self.inner.cell.pid())) { tracing::error!( "{}: failed to send stop message to parent pid {}: {:?}", self.self_id(), @@ -1223,7 +1251,7 @@ impl Instance { // Note that orphaned actor is unexpected and would only happen if // there is a bug. if let Some(event) = event { - self.proc.handle_supervision_event(event); + self.inner.proc.handle_supervision_event(event); } } } @@ -1282,7 +1310,7 @@ impl Instance { // After this point, we know we won't spawn any more children, // so we can safely read the current child keys. let mut to_unlink = Vec::new(); - for child in self.cell.child_iter() { + for child in self.inner.cell.child_iter() { if let Err(err) = child.value().signal(Signal::Stop) { tracing::error!( "{}: failed to send stop signal to child pid {}: {:?}", @@ -1295,18 +1323,18 @@ impl Instance { } // Manually unlink children that have already been stopped. for child in to_unlink { - self.cell.unlink(&child); + self.inner.cell.unlink(&child); } let (mut signal_receiver, _) = actor_loop_receivers; - while self.cell.child_count() > 0 { + while self.inner.cell.child_count() > 0 { match RealClock .timeout(Duration::from_millis(500), signal_receiver.recv()) .await { Ok(signal) => { if let Signal::ChildStopped(pid) = signal? 
{ - assert!(self.cell.get_child(pid).is_none()); + assert!(self.inner.cell.get_child(pid).is_none()); } } Err(_) => { @@ -1316,7 +1344,7 @@ impl Instance { ); // No more waiting to receive messages. Unlink all remaining // children. - self.cell.unlink_all(); + self.inner.cell.unlink_all(); break; } } @@ -1403,7 +1431,7 @@ impl Instance { break 'messages; }, Signal::ChildStopped(pid) => { - assert!(self.cell.get_child(pid).is_none()); + assert!(self.inner.cell.get_child(pid).is_none()); }, } } @@ -1411,7 +1439,8 @@ impl Instance { self.handle_supervision_event(actor, supervision_event).await?; } } - self.cell + self.inner + .cell .inner .num_processed_messages .fetch_add(1, Ordering::SeqCst); @@ -1504,17 +1533,20 @@ impl Instance { actor.handle(&context, message).await } - // Spawn on child on this instance. Currently used only by cap::CanSpawn. + /// Spawn a child on this instance. Currently used only by cap::CanSpawn. pub(crate) async fn spawn( &self, params: C::Params, ) -> anyhow::Result> { - self.proc.spawn_child(self.cell.clone(), params).await + self.inner + .proc + .spawn_child(self.inner.cell.clone(), params) + .await } /// Create a new direct child instance. pub fn child(&self) -> anyhow::Result<(Instance<()>, ActorHandle<()>)> { - self.proc.child_instance(self.cell.clone()) + self.inner.proc.child_instance(self.inner.cell.clone()) } /// Return a handle port handle representing the actor's message @@ -1523,33 +1555,33 @@ impl Instance { where A: Handler, { - self.ports.get() + self.inner.ports.get() } /// The [`ActorHandle`] corresponding to this instance. pub fn handle(&self) -> ActorHandle { - ActorHandle::new(self.cell.clone(), Arc::clone(&self.ports)) + ActorHandle::new(self.inner.cell.clone(), Arc::clone(&self.inner.ports)) } /// The owning actor ref. pub fn bind>(&self) -> ActorRef { - self.cell.bind(self.ports.as_ref()) + self.inner.cell.bind(self.inner.ports.as_ref()) } // Temporary in order to support python bindings. 
#[doc(hidden)] pub fn mailbox_for_py(&self) -> &Mailbox { - &self.mailbox + &self.inner.mailbox } /// A reference to the proc's clock pub fn clock(&self) -> &(impl Clock + use) { - &self.proc.state().clock + &self.inner.proc.state().clock } /// The owning proc. pub fn proc(&self) -> &Proc { - &self.proc + &self.inner.proc } /// Clone this Instance to get an owned struct that can be @@ -1558,75 +1590,47 @@ impl Instance { #[doc(hidden)] pub fn clone_for_py(&self) -> Self { Self { - proc: self.proc.clone(), - cell: self.cell.clone(), - mailbox: self.mailbox.clone(), - ports: self.ports.clone(), - status_tx: self.status_tx.clone(), - sequencer: self.sequencer.clone(), - id: self.id, + inner: Arc::clone(&self.inner), } } /// Get the join handle associated with this actor. fn actor_task_handle(&self) -> Option<&JoinHandle<()>> { - self.cell.inner.actor_task_handle.get() + self.inner.cell.inner.actor_task_handle.get() } /// Return this instance's sequencer. pub fn sequencer(&self) -> &Sequencer { - &self.sequencer + &self.inner.sequencer } /// Return this instance's ID. 
pub fn instance_id(&self) -> Uuid { - self.id - } -} - -impl Drop for Instance { - fn drop(&mut self) { - self.status_tx.send_if_modified(|status| { - if status.is_terminal() { - false - } else { - tracing::info!( - name = "ActorStatus", - actor_id = %self.self_id(), - actor_name = self.self_id().name(), - status = "Stopped", - prev_status = status.arm().unwrap_or("unknown"), - caller = %Location::caller(), - "Instance is dropped", - ); - *status = ActorStatus::Stopped; - true - } - }); + self.inner.id } } impl context::Mailbox for Instance { fn mailbox(&self) -> &Mailbox { - &self.mailbox + &self.inner.mailbox } } impl context::Mailbox for Context<'_, A> { fn mailbox(&self) -> &Mailbox { - &self.mailbox + &self.instance.inner.mailbox } } impl context::Mailbox for &Instance { fn mailbox(&self) -> &Mailbox { - &self.mailbox + &self.inner.mailbox } } impl context::Mailbox for &Context<'_, A> { fn mailbox(&self) -> &Mailbox { - &self.mailbox + &self.instance.inner.mailbox } } @@ -1665,7 +1669,7 @@ impl Instance<()> { self.actor_task_handle().is_none(), "can only bind actor port on instance with no running actor task" ); - self.mailbox.bind_actor_port() + self.inner.mailbox.bind_actor_port() } } @@ -1692,11 +1696,11 @@ impl ActorType { /// InstanceCell is reference counted and cloneable. #[derive(Clone, Debug)] pub struct InstanceCell { - inner: Arc, + inner: Arc, } #[derive(Debug)] -struct InstanceState { +struct InstanceCellState { /// The actor's id. actor_id: ActorId, @@ -1736,7 +1740,7 @@ struct InstanceState { ports: Arc, } -impl InstanceState { +impl InstanceCellState { /// Unlink this instance from its parent, if it has one. If it was unlinked, /// the parent is returned. fn maybe_unlink_parent(&self) -> Option { @@ -1746,7 +1750,7 @@ impl InstanceState { } /// Unlink this instance from a child. 
- fn unlink(&self, child: &InstanceState) -> bool { + fn unlink(&self, child: &InstanceCellState) -> bool { assert_eq!(self.actor_id.proc_id(), child.actor_id.proc_id()); self.children.remove(&child.actor_id.pid()).is_some() } @@ -1766,7 +1770,7 @@ impl InstanceCell { ) -> Self { let _ais = actor_id.to_string(); let cell = Self { - inner: Arc::new(InstanceState { + inner: Arc::new(InstanceCellState { actor_id: actor_id.clone(), actor_type, proc: proc.clone(), @@ -1788,7 +1792,7 @@ impl InstanceCell { cell } - fn wrap(inner: Arc) -> Self { + fn wrap(inner: Arc) -> Self { Self { inner } } @@ -1940,7 +1944,7 @@ impl InstanceCell { } } -impl Drop for InstanceState { +impl Drop for InstanceCellState { fn drop(&mut self) { if let Some(parent) = self.maybe_unlink_parent() { tracing::debug!( @@ -1959,7 +1963,7 @@ impl Drop for InstanceState { /// linkage between actors without creating a strong reference cycle. #[derive(Debug, Clone)] pub struct WeakInstanceCell { - inner: Weak, + inner: Weak, } impl Default for WeakInstanceCell { From 191bf1b6609a315c2d72326743fca47541563710 Mon Sep 17 00:00:00 2001 From: Peng Zhang Date: Mon, 17 Nov 2025 07:30:33 -0800 Subject: [PATCH 2/2] Standardize supervision event logs (#1898) Summary: 1. Use `name = "SupervisionEvent"` so the supervision logs can be searched with this value; 2. Change log levels: `warn` for error and `debug` for others. 
Reviewed By: mariusae Differential Revision: D87094538 --- hyperactor/src/supervision.rs | 5 ++++ hyperactor_mesh/src/proc_mesh.rs | 9 ++++-- hyperactor_mesh/src/proc_mesh/mesh_agent.rs | 20 +++++++++---- monarch_hyperactor/src/v1/actor_mesh.rs | 33 ++++++++++++++------- 4 files changed, 49 insertions(+), 18 deletions(-) diff --git a/hyperactor/src/supervision.rs b/hyperactor/src/supervision.rs index ff1c141e4..3e4a15f92 100644 --- a/hyperactor/src/supervision.rs +++ b/hyperactor/src/supervision.rs @@ -77,6 +77,11 @@ impl ActorSupervisionEvent { } event } + + /// Whether this event represents a supervision error (its actor status is failed). + pub fn is_error(&self) -> bool { + self.actor_status.is_failed() + } } impl std::error::Error for ActorSupervisionEvent {} diff --git a/hyperactor_mesh/src/proc_mesh.rs b/hyperactor_mesh/src/proc_mesh.rs index e06023868..634d97dd5 100644 --- a/hyperactor_mesh/src/proc_mesh.rs +++ b/hyperactor_mesh/src/proc_mesh.rs @@ -846,7 +846,7 @@ impl ProcEvents { ActorStatus::generic_failure(format!("proc {} is stopped", proc_id)), None, ); - tracing::debug!(name = "ActorSupervisionEventDelivery", event = ?event); + tracing::debug!(name = "SupervisionEvent", %event); if entry.value().send(event.clone()).is_err() { tracing::warn!( name = SupervisionEventState::SupervisionEventTransmitFailed.as_ref(), @@ -856,7 +856,7 @@ impl ProcEvents { } let event = ProcEvent::Stopped(*rank, reason.clone()); - tracing::debug!(name = "delivering proc event", event = %event); + tracing::debug!(name = "SupervisionEvent", %event); break Some(ProcEvent::Stopped(*rank, reason)); } @@ -880,7 +880,10 @@ impl ProcEvents { status = %event.actor_status, "proc supervision: event received with {had_headers} headers" ); - tracing::debug!(?event, "proc supervision: full event"); + tracing::debug!( + name = "SupervisionEvent", + %event, + "proc supervision: full event"); // Normalize events that came via the comm tree. 
let event = update_event_actor_id(event); diff --git a/hyperactor_mesh/src/proc_mesh/mesh_agent.rs b/hyperactor_mesh/src/proc_mesh/mesh_agent.rs index a6a410637..e96a33caf 100644 --- a/hyperactor_mesh/src/proc_mesh/mesh_agent.rs +++ b/hyperactor_mesh/src/proc_mesh/mesh_agent.rs @@ -452,11 +452,21 @@ impl Handler for ProcMeshAgent { ) -> anyhow::Result<()> { let event = update_event_actor_id(event); if self.record_supervision_events { - tracing::info!( - proc_id = %self.proc.proc_id(), - %event, - "recording supervision event", - ); + if event.is_error() { + tracing::warn!( + name = "SupervisionEvent", + proc_id = %self.proc.proc_id(), + %event, + "recording supervision error", + ); + } else { + tracing::debug!( + name = "SupervisionEvent", + proc_id = %self.proc.proc_id(), + %event, + "recording non-error supervision event", + ); + } self.supervision_events .entry(event.actor_id.clone()) .or_default() diff --git a/monarch_hyperactor/src/v1/actor_mesh.rs b/monarch_hyperactor/src/v1/actor_mesh.rs index 698d7c5c1..e6a5e4317 100644 --- a/monarch_hyperactor/src/v1/actor_mesh.rs +++ b/monarch_hyperactor/src/v1/actor_mesh.rs @@ -401,11 +401,6 @@ fn send_state_change( ) where F: Fn(MeshFailure), { - tracing::info!( - "detected supervision event on monitored mesh: name={}, event={}", - mesh_name, - event, - ); let failure = MeshFailure::new(mesh_name, rank, event.clone()); // Any supervision event that is not a failure should not generate // call "unhandled". @@ -413,20 +408,38 @@ fn send_state_change( // user calls stop() on a proc or actor mesh. // It is not being terminated due to a failure. In this state, new messages // should not be sent, but we don't call unhandled when it is detected. 
- let is_failed = event.actor_status.is_failed(); + let is_failed = event.is_error(); + if is_failed { + tracing::warn!( + name = "SupervisionEvent", + %mesh_name, + %event, + "detected supervision error on monitored mesh: name={mesh_name}", + ); + } else { + tracing::debug!( + name = "SupervisionEvent", + %mesh_name, + %event, + "detected non-error supervision event on monitored mesh: name={mesh_name}", + ); + } // Send a notification to the owning actor of this mesh, if there is one. if let Some(owner) = owner { - if let Err(e) = owner.send(SupervisionFailureMessage { + if let Err(error) = owner.send(SupervisionFailureMessage { mesh_name: mesh_name.to_string(), rank, event: event.clone(), }) { tracing::warn!( - "failed to send supervision event {} to owner {}: {:?}. dropping event", - event, + name = "SupervisionEvent", + %mesh_name, + %event, + %error, + "failed to send supervision event to owner {}: {}. dropping event", owner.actor_id(), - e + error ); } } else if is_owned && is_failed {