From ad26e6879a4c723bda655897da0c93abc5ca3388 Mon Sep 17 00:00:00 2001 From: Marius Eriksen Date: Thu, 26 Jun 2025 08:14:31 -0700 Subject: [PATCH 1/3] [hyperactor] general in-proc ActorRef -> ActorHandle downcasting Provide a general purpose mechanism for downcasting any local (in-proc) ActorRef to its corresponding ActorHandle. This allows these references to be coordinated externally, but for the actors to communicate locally (including using handlers that aren't exported, e.g., because they may not contain serializable messages). The intent is to use this in the tensor engine, to allow the controller to instruct actors to coordinate locally, passing objects like tensor references directly. This required building a general instance registration mechanism, which we can use to replace the ledger in a follow-up change. Differential Revision: [D77377382](https://our.internmc.facebook.com/intern/diff/D77377382/) **NOTE FOR REVIEWERS**: This PR has internal Meta-specific changes or comments, please review them on [Phabricator](https://our.internmc.facebook.com/intern/diff/D77377382/)! [ghstack-poisoned] --- hyperactor/src/actor/mod.rs | 73 ++++++---------- hyperactor/src/cap.rs | 14 +++ hyperactor/src/proc.rs | 167 ++++++++++++++++++++---------------- hyperactor/src/reference.rs | 12 +++ 4 files changed, 145 insertions(+), 121 deletions(-) diff --git a/hyperactor/src/actor/mod.rs b/hyperactor/src/actor/mod.rs index 5fc042403..917249a5a 100644 --- a/hyperactor/src/actor/mod.rs +++ b/hyperactor/src/actor/mod.rs @@ -569,14 +569,6 @@ impl ActorHandle { pub fn bind>(&self) -> ActorRef { self.cell.bind(self.ports.as_ref()) } - - /// Downgrade this ActorHandle to a weak reference. - pub fn downgrade(&self) -> WeakActorHandle { - WeakActorHandle { - cell: self.cell.downgrade(), - ports: Arc::downgrade(&self.ports), - } - } } /// IntoFuture allows users to await the handle to join it. The future @@ -615,43 +607,6 @@ impl Clone for ActorHandle { } } -/// A weak reference to an [`ActorHandle`]. This allows holding references to actors -/// without preventing them from being garbage collected when all strong references -/// are dropped. -#[derive(Debug)] -pub struct WeakActorHandle { - cell: WeakInstanceCell, - ports: Weak>, -} - -impl WeakActorHandle { - /// Create a new weak actor handle that is never upgradeable. - pub fn new() -> Self { - Self { - cell: WeakInstanceCell::new(), - ports: Weak::new(), - } - } - - /// Upgrade this weak actor handle to a strong reference, if possible. - /// Returns `Some(ActorHandle)` if the actor is still alive, `None` otherwise. - pub fn upgrade(&self) -> Option> { - match (self.cell.upgrade(), self.ports.upgrade()) { - (Some(cell), Some(ports)) => Some(ActorHandle::new(cell, ports)), - _ => None, - } - } -} - -impl Clone for WeakActorHandle { - fn clone(&self) -> Self { - Self { - cell: self.cell.clone(), - ports: self.ports.clone(), - } - } -} - /// RemoteActor is a marker trait for types that can be used as /// remote actor references. All [`Actor`]s are thus referencable; /// but other types may also implement this in order to separately @@ -1091,4 +1046,32 @@ mod tests { test.sync().await; assert_eq!(test.get_values(), (999u64, "biz".to_string())); } + + #[tokio::test] + async fn test_actor_handle_downcast() { + #[derive(Debug)] + struct NothingActor; + + #[async_trait] + impl Actor for NothingActor { + type Params = (); + + async fn new(_: ()) -> Result { + Ok(Self) + } + } + + // Just test that we can round-trip the handle through a downcast. + + let proc = Proc::local(); + let handle = proc.spawn::("nothing", ()).await.unwrap(); + let cell = handle.cell(); + + // Invalid actor doesn't succeed. + assert!(cell.downcast_handle::().is_none()); + + let handle = cell.downcast_handle::().unwrap(); + handle.drain_and_stop().unwrap(); + handle.await; + } } diff --git a/hyperactor/src/cap.rs b/hyperactor/src/cap.rs index 0876795a2..46155e316 100644 --- a/hyperactor/src/cap.rs +++ b/hyperactor/src/cap.rs @@ -29,13 +29,20 @@ impl CanSplitPort for T {} pub trait CanSpawn: sealed::CanSpawn {} impl CanSpawn for T {} +/// CanResolveActorRef is a capability that confers the ability to resolve +/// an ActorRef to a local ActorHandle if the actor is available locally. +pub trait CanResolveActorRef: sealed::CanResolveActorRef {} +impl CanResolveActorRef for T {} + pub(crate) mod sealed { use async_trait::async_trait; + use crate::ActorRef; use crate::PortId; use crate::accum::ReducerSpec; use crate::actor::Actor; use crate::actor::ActorHandle; + use crate::actor::RemoteActor; use crate::attrs::Attrs; use crate::data::Serialized; use crate::mailbox::Mailbox; @@ -60,4 +67,11 @@ pub(crate) mod sealed { pub trait CanSpawn: Send + Sync { async fn spawn(&self, params: A::Params) -> anyhow::Result>; } + + pub trait CanResolveActorRef: Send + Sync { + fn resolve_actor_ref( + &self, + actor_ref: &ActorRef, + ) -> Option>; + } } diff --git a/hyperactor/src/proc.rs b/hyperactor/src/proc.rs index c3bc4ceee..4cb1bb4e7 100644 --- a/hyperactor/src/proc.rs +++ b/hyperactor/src/proc.rs @@ -61,7 +61,6 @@ use crate::actor::Binds; use crate::actor::RemoteActor; use crate::actor::RemoteHandles; use crate::actor::Signal; -use crate::actor::WeakActorHandle; use crate::attrs::Attrs; use crate::cap; use crate::clock::Clock; @@ -123,9 +122,7 @@ struct ProcState { /// Keep track of all of the active actors in the proc. ledger: ActorLedger, - /// Registry of typed actor handles by name and type, for actor lookup. - /// Maps (actor_name, TypeId) to type-erased ActorHandle. - actor_registry: DashMap<(String, TypeId), Box>, + instances: DashMap, /// Used by root actors to send events to the actor coordinating /// supervision of root actors in this proc. @@ -330,7 +327,7 @@ impl Proc { forwarder, roots: DashMap::new(), ledger: ActorLedger::new(), - actor_registry: DashMap::new(), + instances: DashMap::new(), supervision_coordinator_port: OnceLock::new(), clock, })) @@ -465,16 +462,7 @@ impl Proc { .ledger .insert(actor_id.clone(), instance.cell.downgrade())?; - let handle = instance.start(actor).await?; - - // Register a weak reference to the actor handle in the registry for later lookup - let registry_key = (name.to_string(), TypeId::of::()); - let weak_handle = handle.downgrade(); - self.state() - .actor_registry - .insert(registry_key, Box::new(weak_handle)); - - Ok(handle) + instance.start(actor).await } /// Spawn a child actor from the provided parent on this proc. The parent actor @@ -804,10 +792,12 @@ impl Instance { let cell = InstanceCell::new( actor_id, actor_type, + proc.clone(), signal_port, supervision_port, status_rx, parent, + ports.clone(), ); let start = proc.clock().now(); @@ -922,12 +912,6 @@ impl Instance { Err(err) => ActorStatus::Failed(err.to_string()), }; - // Clean up the actor registry entry for root actors - if self.cell.pid() == 0 { - let registry_key = (self.cell.actor_id().name().to_string(), TypeId::of::()); - self.proc.state().actor_registry.remove(®istry_key); - } - let result = self.cell.maybe_unlink_parent(); if let Some(parent) = result { if let Err(err) = parent.signal(Signal::ChildStopped(self.cell.pid())) { @@ -1195,22 +1179,6 @@ impl Instance { &self.proc } - /// Look up a root actor by name and type. - /// Returns `Some(ActorHandle)` if a root actor with the given name exists and is of type T. - /// Returns `None` if no root actor with that name exists, if it's not of the expected type, - /// or if the actor has been dropped. - pub fn get(&self, name: &str) -> Option> { - let registry_key = (name.to_string(), TypeId::of::()); - - if let Some(entry) = self.proc.state().actor_registry.get(®istry_key) { - if let Some(weak_handle) = entry.downcast_ref::>() { - return weak_handle.upgrade(); - } - } - - None - } - /// Get the current message context, if there is one. pub fn ctx(&self) -> Option<&InstanceMessageContext> { self.message_context.as_ref() @@ -1243,6 +1211,20 @@ impl cap::sealed::CanSpawn for Instance { } } +impl cap::sealed::CanResolveActorRef for Instance { + fn resolve_actor_ref( + &self, + actor_ref: &ActorRef, + ) -> Option> { + self.proc + .0 + .instances + .get(actor_ref.actor_id())? + .upgrade()? + .downcast_handle() + } +} + #[derive(Debug)] enum ActorType { Named(&'static TypeInfo), @@ -1277,6 +1259,9 @@ struct InstanceState { /// Actor info contains the actor's type information. actor_type: ActorType, + /// The proc in which the actor is running. + proc: Proc, + /// The actor's signal port. This is used to send /// signals to the actor. signal: PortHandle, @@ -1306,6 +1291,28 @@ struct InstanceState { /// The log recording associated with this actor. It is used to /// store a 'flight record' of events while the actor is running. recording: Recording, + + /// A type-erased reference to Ports, which allows us to recover + /// an ActorHandle by downcasting. + ports: Arc, +} + +impl InstanceState { + /// Unlink this instance from its parent, if it has one. If it was unlinked, + /// the parent is returned. + fn maybe_unlink_parent(&self) -> Option { + let result = self.parent.upgrade(); + if let Some(parent) = &result { + parent.state.unlink(self); + } + result + } + + /// Unlink this instance from a child. + fn unlink(&self, child: &InstanceState) { + assert_eq!(self.actor_id.proc_id(), child.actor_id.proc_id()); + self.children.remove(&child.actor_id.pid()); + } } impl InstanceCell { @@ -1314,16 +1321,19 @@ impl InstanceCell { fn new( actor_id: ActorId, actor_type: ActorType, + proc: Proc, signal: PortHandle, supervision_port: PortHandle, status: watch::Receiver, parent: Option, + ports: Arc, ) -> Self { let _ais = actor_id.to_string(); let cell = Self { state: Arc::new(InstanceState { - actor_id, + actor_id: actor_id.clone(), actor_type, + proc: proc.clone(), signal, supervision_port, status, @@ -1333,9 +1343,11 @@ impl InstanceCell { exported_named_ports: DashMap::new(), num_processed_messages: AtomicU64::new(0), recording: hyperactor_telemetry::recorder().record(64), + ports, }), }; cell.maybe_link_parent(); + proc.0.instances.insert(actor_id, cell.downgrade()); cell } @@ -1418,11 +1430,7 @@ impl InstanceCell { /// Unlink this instance from its parent, if it has one. If it was unlinked, /// the parent is returned. fn maybe_unlink_parent(&self) -> Option { - let result = self.state.parent.upgrade(); - if let Some(parent) = &result { - parent.unlink(self); - } - result + self.state.maybe_unlink_parent() } /// Get parent instance cell, if it exists. @@ -1460,6 +1468,26 @@ impl InstanceCell { } ActorRef::attest(self.actor_id().clone()) } + + /// Attempt to downcast this cell to a concrete actor handle. + pub(crate) fn downcast_handle(&self) -> Option> { + let ports = Arc::clone(&self.state.ports).downcast::>().ok()?; + Some(ActorHandle::new(self.clone(), ports)) + } +} + +impl Drop for InstanceState { + fn drop(&mut self) { + if self.maybe_unlink_parent().is_some() { + tracing::error!( + "instance {} was dropped with parent still linked", + self.actor_id + ); + } + if self.proc.0.instances.remove(&self.actor_id).is_none() { + tracing::error!("instance {} was dropped but not in proc", self.actor_id); + } + } } /// A weak version of the InstanceCell. This is used to provide cyclical @@ -1599,6 +1627,7 @@ mod tests { use std::assert_matches::assert_matches; use std::sync::atomic::AtomicBool; + use hyperactor_macros::export; use maplit::hashmap; use serde_json::json; use tokio::sync::Barrier; @@ -1647,6 +1676,7 @@ mod tests { } #[derive(Debug)] + #[export] struct TestActor; #[derive(Handler, HandleClient, Debug)] @@ -1812,7 +1842,7 @@ mod tests { #[derive(Handler, HandleClient, Debug)] enum LookupTestMessage { - ActorExists(String, #[reply] OncePortRef), + ActorExists(ActorRef, #[reply] OncePortRef), } #[async_trait] @@ -1830,10 +1860,9 @@ mod tests { async fn actor_exists( &mut self, this: &Instance, - name: String, + actor_ref: ActorRef, ) -> Result { - self.found_actor = this.get::(&name); - Ok(self.found_actor.is_some()) + Ok(actor_ref.downcast_handle(this).is_some()) } } @@ -1843,18 +1872,30 @@ mod tests { let client = proc.attach("client").unwrap(); let target_actor = proc.spawn::("target", ()).await.unwrap(); + let target_actor_ref = target_actor.bind(); let lookup_actor = proc.spawn::("lookup", ()).await.unwrap(); assert!( lookup_actor - .actor_exists(&client, "target".to_string()) + .actor_exists(&client, target_actor_ref.clone()) .await .unwrap() ); + // Make up a child actor. It shouldn't exist. + assert!( + !lookup_actor + .actor_exists( + &client, + ActorRef::attest(target_actor.actor_id().child_id(123).clone()) + ) + .await + .unwrap() + ); + // A wrongly-typed actor ref should also not obtain. assert!( !lookup_actor - .actor_exists(&client, "nonexistent".to_string()) + .actor_exists(&client, ActorRef::attest(lookup_actor.actor_id().clone())) .await .unwrap() ); @@ -1864,7 +1905,7 @@ mod tests { assert!( !lookup_actor - .actor_exists(&client, "target".to_string()) + .actor_exists(&client, target_actor_ref) .await .unwrap() ); @@ -1873,32 +1914,6 @@ mod tests { lookup_actor.await; } - #[tokio::test] - async fn test_actor_registry_cleanup() { - let proc = Proc::local(); - - let initial_size = proc.state().actor_registry.len(); - - for i in 0..5 { - let actor_name = format!("temp_actor_{}", i); - let actor = proc.spawn::(&actor_name, ()).await.unwrap(); - - assert_eq!(proc.state().actor_registry.len(), initial_size + 1); - - actor.drain_and_stop().unwrap(); - actor.await; - - assert_eq!( - proc.state().actor_registry.len(), - initial_size, - "Registry should be cleaned up after actor {} stops", - actor_name - ); - } - - assert_eq!(proc.state().actor_registry.len(), initial_size); - } - fn validate_link(child: &InstanceCell, parent: &InstanceCell) { assert_eq!(child.actor_id().proc_id(), parent.actor_id().proc_id()); assert_eq!( diff --git a/hyperactor/src/reference.rs b/hyperactor/src/reference.rs index 78fa68e79..c81abcd88 100644 --- a/hyperactor/src/reference.rs +++ b/hyperactor/src/reference.rs @@ -39,6 +39,8 @@ use serde::Deserialize; use serde::Serialize; use crate as hyperactor; +use crate::Actor; +use crate::ActorHandle; use crate::Named; use crate::RemoteHandles; use crate::RemoteMessage; @@ -651,6 +653,16 @@ impl ActorRef { pub fn into_actor_id(self) -> ActorId { self.actor_id } + + /// Attempt to downcast this reference into a (local) actor handle. + /// This will only succeed when the referenced actor is in the same + /// proc as the caller. + pub fn downcast_handle(&self, cap: &impl cap::CanResolveActorRef) -> Option> + where + A: Actor, + { + cap.resolve_actor_ref(self) + } } impl fmt::Display for ActorRef { From ec34dc3565254a4efd5f6fb4411774ca4b4c3d43 Mon Sep 17 00:00:00 2001 From: Marius Eriksen Date: Thu, 26 Jun 2025 12:33:18 -0700 Subject: [PATCH 2/3] Update on "[hyperactor] general in-proc ActorRef -> ActorHandle downcasting" Provide a general purpose mechanism for downcasting any local (in-proc) ActorRef to its corresponding ActorHandle. This allows these references to be coordinated externally, but for the actors to communicate locally (including using handlers that aren't exported, e.g., because they may not contain serializable messages). The intent is to use this in the tensor engine, to allow the controller to instruct actors to coordinate locally, passing objects like tensor references directly. This required building a general instance registration mechanism, which we can use to replace the ledger in a follow-up change. Differential Revision: [D77377382](https://our.internmc.facebook.com/intern/diff/D77377382/) **NOTE FOR REVIEWERS**: This PR has internal Meta-specific changes or comments, please review them on [Phabricator](https://our.internmc.facebook.com/intern/diff/D77377382/)! [ghstack-poisoned] --- hyperactor/src/actor/mod.rs | 4 ++-- hyperactor/src/proc.rs | 6 ++---- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/hyperactor/src/actor/mod.rs b/hyperactor/src/actor/mod.rs index 917249a5a..913aa3154 100644 --- a/hyperactor/src/actor/mod.rs +++ b/hyperactor/src/actor/mod.rs @@ -19,7 +19,7 @@ use std::hash::Hash; use std::marker::PhantomData; use std::pin::Pin; use std::sync::Arc; -use std::sync::Weak; + use std::time::SystemTime; use async_trait::async_trait; @@ -54,7 +54,7 @@ use crate::proc::Instance; use crate::proc::InstanceCell; use crate::proc::Ports; use crate::proc::Proc; -use crate::proc::WeakInstanceCell; + use crate::reference::ActorId; use crate::reference::GangId; use crate::reference::Index; diff --git a/hyperactor/src/proc.rs b/hyperactor/src/proc.rs index 4cb1bb4e7..13ca2c382 100644 --- a/hyperactor/src/proc.rs +++ b/hyperactor/src/proc.rs @@ -1836,9 +1836,7 @@ mod tests { } #[derive(Debug)] - struct LookupTestActor { - found_actor: Option>, - } + struct LookupTestActor; #[derive(Handler, HandleClient, Debug)] enum LookupTestMessage { @@ -1850,7 +1848,7 @@ mod tests { type Params = (); async fn new(_params: ()) -> Result { - Ok(Self { found_actor: None }) + Ok(Self) } } From 7e7cca1d69f6a54e39944132171c2e1a1372d606 Mon Sep 17 00:00:00 2001 From: Marius Eriksen Date: Thu, 26 Jun 2025 12:34:01 -0700 Subject: [PATCH 3/3] Update on "[hyperactor] general in-proc ActorRef -> ActorHandle downcasting" Provide a general purpose mechanism for downcasting any local (in-proc) ActorRef to its corresponding ActorHandle. This allows these references to be coordinated externally, but for the actors to communicate locally (including using handlers that aren't exported, e.g., because they may not contain serializable messages). The intent is to use this in the tensor engine, to allow the controller to instruct actors to coordinate locally, passing objects like tensor references directly. This required building a general instance registration mechanism, which we can use to replace the ledger in a follow-up change. Differential Revision: [D77377382](https://our.internmc.facebook.com/intern/diff/D77377382/) **NOTE FOR REVIEWERS**: This PR has internal Meta-specific changes or comments, please review them on [Phabricator](https://our.internmc.facebook.com/intern/diff/D77377382/)! [ghstack-poisoned] --- hyperactor/src/actor/mod.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/hyperactor/src/actor/mod.rs b/hyperactor/src/actor/mod.rs index 913aa3154..4b2455f33 100644 --- a/hyperactor/src/actor/mod.rs +++ b/hyperactor/src/actor/mod.rs @@ -19,7 +19,6 @@ use std::hash::Hash; use std::marker::PhantomData; use std::pin::Pin; use std::sync::Arc; - use std::time::SystemTime; use async_trait::async_trait; @@ -54,7 +53,6 @@ use crate::proc::Instance; use crate::proc::InstanceCell; use crate::proc::Ports; use crate::proc::Proc; - use crate::reference::ActorId; use crate::reference::GangId; use crate::reference::Index;