diff --git a/hyperactor/src/actor/mod.rs b/hyperactor/src/actor/mod.rs index cea24f012..c4937afe3 100644 --- a/hyperactor/src/actor/mod.rs +++ b/hyperactor/src/actor/mod.rs @@ -19,7 +19,6 @@ use std::hash::Hash; use std::marker::PhantomData; use std::pin::Pin; use std::sync::Arc; -use std::sync::Weak; use std::time::SystemTime; use async_trait::async_trait; @@ -54,7 +53,6 @@ use crate::proc::Instance; use crate::proc::InstanceCell; use crate::proc::Ports; use crate::proc::Proc; -use crate::proc::WeakInstanceCell; use crate::reference::ActorId; use crate::reference::GangId; use crate::reference::Index; @@ -569,14 +567,6 @@ impl ActorHandle { pub fn bind>(&self) -> ActorRef { self.cell.bind(self.ports.as_ref()) } - - /// Downgrade this ActorHandle to a weak reference. - pub fn downgrade(&self) -> WeakActorHandle { - WeakActorHandle { - cell: self.cell.downgrade(), - ports: Arc::downgrade(&self.ports), - } - } } /// IntoFuture allows users to await the handle to join it. The future @@ -615,43 +605,6 @@ impl Clone for ActorHandle { } } -/// A weak reference to an [`ActorHandle`]. This allows holding references to actors -/// without preventing them from being garbage collected when all strong references -/// are dropped. -#[derive(Debug)] -pub struct WeakActorHandle { - cell: WeakInstanceCell, - ports: Weak>, -} - -impl WeakActorHandle { - /// Create a new weak actor handle that is never upgradeable. - pub fn new() -> Self { - Self { - cell: WeakInstanceCell::new(), - ports: Weak::new(), - } - } - - /// Upgrade this weak actor handle to a strong reference, if possible. - /// Returns `Some(ActorHandle)` if the actor is still alive, `None` otherwise. - pub fn upgrade(&self) -> Option> { - match (self.cell.upgrade(), self.ports.upgrade()) { - (Some(cell), Some(ports)) => Some(ActorHandle::new(cell, ports)), - _ => None, - } - } -} - -impl Clone for WeakActorHandle { - fn clone(&self) -> Self { - Self { - cell: self.cell.clone(), - ports: self.ports.clone(), - } - } -} - /// RemoteActor is a marker trait for types that can be used as /// remote actor references. All [`Actor`]s are thus referencable; /// but other types may also implement this in order to separately @@ -1046,4 +999,49 @@ mod tests { test.sync().await; assert_eq!(test.get_values(), (321u64, "bar".to_string())); } + + #[tokio::test] + async fn test_ref_alias() { + let test = MultiValuesTest::new().await; + + test.send(123u64); + test.send("foo".to_string()); + + hyperactor::alias!(MyActorAlias, u64, String); + + let myref: ActorRef = test.handle.bind(); + myref.port().send(&test.client, "biz".to_string()).unwrap(); + myref.port().send(&test.client, 999u64).unwrap(); + + test.sync().await; + assert_eq!(test.get_values(), (999u64, "biz".to_string())); + } + + #[tokio::test] + async fn test_actor_handle_downcast() { + #[derive(Debug)] + struct NothingActor; + + #[async_trait] + impl Actor for NothingActor { + type Params = (); + + async fn new(_: ()) -> Result { + Ok(Self) + } + } + + // Just test that we can round-trip the handle through a downcast. + + let proc = Proc::local(); + let handle = proc.spawn::("nothing", ()).await.unwrap(); + let cell = handle.cell(); + + // Invalid actor doesn't succeed. + assert!(cell.downcast_handle::().is_none()); + + let handle = cell.downcast_handle::().unwrap(); + handle.drain_and_stop().unwrap(); + handle.await; + } } diff --git a/hyperactor/src/cap.rs b/hyperactor/src/cap.rs index 0876795a2..46155e316 100644 --- a/hyperactor/src/cap.rs +++ b/hyperactor/src/cap.rs @@ -29,13 +29,20 @@ impl CanSplitPort for T {} pub trait CanSpawn: sealed::CanSpawn {} impl CanSpawn for T {} +/// CanResolveActorRef is a capability that confers the ability to resolve +/// an ActorRef to a local ActorHandle if the actor is available locally. +pub trait CanResolveActorRef: sealed::CanResolveActorRef {} +impl CanResolveActorRef for T {} + pub(crate) mod sealed { use async_trait::async_trait; + use crate::ActorRef; use crate::PortId; use crate::accum::ReducerSpec; use crate::actor::Actor; use crate::actor::ActorHandle; + use crate::actor::RemoteActor; use crate::attrs::Attrs; use crate::data::Serialized; use crate::mailbox::Mailbox; @@ -60,4 +67,11 @@ pub(crate) mod sealed { pub trait CanSpawn: Send + Sync { async fn spawn(&self, params: A::Params) -> anyhow::Result>; } + + pub trait CanResolveActorRef: Send + Sync { + fn resolve_actor_ref( + &self, + actor_ref: &ActorRef, + ) -> Option>; + } } diff --git a/hyperactor/src/proc.rs b/hyperactor/src/proc.rs index c3bc4ceee..13ca2c382 100644 --- a/hyperactor/src/proc.rs +++ b/hyperactor/src/proc.rs @@ -61,7 +61,6 @@ use crate::actor::Binds; use crate::actor::RemoteActor; use crate::actor::RemoteHandles; use crate::actor::Signal; -use crate::actor::WeakActorHandle; use crate::attrs::Attrs; use crate::cap; use crate::clock::Clock; @@ -123,9 +122,7 @@ struct ProcState { /// Keep track of all of the active actors in the proc. ledger: ActorLedger, - /// Registry of typed actor handles by name and type, for actor lookup. - /// Maps (actor_name, TypeId) to type-erased ActorHandle. - actor_registry: DashMap<(String, TypeId), Box>, + instances: DashMap, /// Used by root actors to send events to the actor coordinating /// supervision of root actors in this proc. @@ -330,7 +327,7 @@ impl Proc { forwarder, roots: DashMap::new(), ledger: ActorLedger::new(), - actor_registry: DashMap::new(), + instances: DashMap::new(), supervision_coordinator_port: OnceLock::new(), clock, })) @@ -465,16 +462,7 @@ impl Proc { .ledger .insert(actor_id.clone(), instance.cell.downgrade())?; - let handle = instance.start(actor).await?; - - // Register a weak reference to the actor handle in the registry for later lookup - let registry_key = (name.to_string(), TypeId::of::()); - let weak_handle = handle.downgrade(); - self.state() - .actor_registry - .insert(registry_key, Box::new(weak_handle)); - - Ok(handle) + instance.start(actor).await } /// Spawn a child actor from the provided parent on this proc. The parent actor @@ -804,10 +792,12 @@ impl Instance { let cell = InstanceCell::new( actor_id, actor_type, + proc.clone(), signal_port, supervision_port, status_rx, parent, + ports.clone(), ); let start = proc.clock().now(); @@ -922,12 +912,6 @@ impl Instance { Err(err) => ActorStatus::Failed(err.to_string()), }; - // Clean up the actor registry entry for root actors - if self.cell.pid() == 0 { - let registry_key = (self.cell.actor_id().name().to_string(), TypeId::of::()); - self.proc.state().actor_registry.remove(®istry_key); - } - let result = self.cell.maybe_unlink_parent(); if let Some(parent) = result { if let Err(err) = parent.signal(Signal::ChildStopped(self.cell.pid())) { @@ -1195,22 +1179,6 @@ impl Instance { &self.proc } - /// Look up a root actor by name and type. - /// Returns `Some(ActorHandle)` if a root actor with the given name exists and is of type T. - /// Returns `None` if no root actor with that name exists, if it's not of the expected type, - /// or if the actor has been dropped. - pub fn get(&self, name: &str) -> Option> { - let registry_key = (name.to_string(), TypeId::of::()); - - if let Some(entry) = self.proc.state().actor_registry.get(®istry_key) { - if let Some(weak_handle) = entry.downcast_ref::>() { - return weak_handle.upgrade(); - } - } - - None - } - /// Get the current message context, if there is one. pub fn ctx(&self) -> Option<&InstanceMessageContext> { self.message_context.as_ref() @@ -1243,6 +1211,20 @@ impl cap::sealed::CanSpawn for Instance { } } +impl cap::sealed::CanResolveActorRef for Instance { + fn resolve_actor_ref( + &self, + actor_ref: &ActorRef, + ) -> Option> { + self.proc + .0 + .instances + .get(actor_ref.actor_id())? + .upgrade()? + .downcast_handle() + } +} + #[derive(Debug)] enum ActorType { Named(&'static TypeInfo), @@ -1277,6 +1259,9 @@ struct InstanceState { /// Actor info contains the actor's type information. actor_type: ActorType, + /// The proc in which the actor is running. + proc: Proc, + /// The actor's signal port. This is used to send /// signals to the actor. signal: PortHandle, @@ -1306,6 +1291,28 @@ struct InstanceState { /// The log recording associated with this actor. It is used to /// store a 'flight record' of events while the actor is running. recording: Recording, + + /// A type-erased reference to Ports, which allows us to recover + /// an ActorHandle by downcasting. + ports: Arc, +} + +impl InstanceState { + /// Unlink this instance from its parent, if it has one. If it was unlinked, + /// the parent is returned. + fn maybe_unlink_parent(&self) -> Option { + let result = self.parent.upgrade(); + if let Some(parent) = &result { + parent.state.unlink(self); + } + result + } + + /// Unlink this instance from a child. + fn unlink(&self, child: &InstanceState) { + assert_eq!(self.actor_id.proc_id(), child.actor_id.proc_id()); + self.children.remove(&child.actor_id.pid()); + } } impl InstanceCell { @@ -1314,16 +1321,19 @@ impl InstanceCell { fn new( actor_id: ActorId, actor_type: ActorType, + proc: Proc, signal: PortHandle, supervision_port: PortHandle, status: watch::Receiver, parent: Option, + ports: Arc, ) -> Self { let _ais = actor_id.to_string(); let cell = Self { state: Arc::new(InstanceState { - actor_id, + actor_id: actor_id.clone(), actor_type, + proc: proc.clone(), signal, supervision_port, status, @@ -1333,9 +1343,11 @@ impl InstanceCell { exported_named_ports: DashMap::new(), num_processed_messages: AtomicU64::new(0), recording: hyperactor_telemetry::recorder().record(64), + ports, }), }; cell.maybe_link_parent(); + proc.0.instances.insert(actor_id, cell.downgrade()); cell } @@ -1418,11 +1430,7 @@ impl InstanceCell { /// Unlink this instance from its parent, if it has one. If it was unlinked, /// the parent is returned. fn maybe_unlink_parent(&self) -> Option { - let result = self.state.parent.upgrade(); - if let Some(parent) = &result { - parent.unlink(self); - } - result + self.state.maybe_unlink_parent() } /// Get parent instance cell, if it exists. @@ -1460,6 +1468,26 @@ impl InstanceCell { } ActorRef::attest(self.actor_id().clone()) } + + /// Attempt to downcast this cell to a concrete actor handle. + pub(crate) fn downcast_handle(&self) -> Option> { + let ports = Arc::clone(&self.state.ports).downcast::>().ok()?; + Some(ActorHandle::new(self.clone(), ports)) + } +} + +impl Drop for InstanceState { + fn drop(&mut self) { + if self.maybe_unlink_parent().is_some() { + tracing::error!( + "instance {} was dropped with parent still linked", + self.actor_id + ); + } + if self.proc.0.instances.remove(&self.actor_id).is_none() { + tracing::error!("instance {} was dropped but not in proc", self.actor_id); + } + } } /// A weak version of the InstanceCell. This is used to provide cyclical @@ -1599,6 +1627,7 @@ mod tests { use std::assert_matches::assert_matches; use std::sync::atomic::AtomicBool; + use hyperactor_macros::export; use maplit::hashmap; use serde_json::json; use tokio::sync::Barrier; @@ -1647,6 +1676,7 @@ mod tests { } #[derive(Debug)] + #[export] struct TestActor; #[derive(Handler, HandleClient, Debug)] @@ -1806,13 +1836,11 @@ mod tests { } #[derive(Debug)] - struct LookupTestActor { - found_actor: Option>, - } + struct LookupTestActor; #[derive(Handler, HandleClient, Debug)] enum LookupTestMessage { - ActorExists(String, #[reply] OncePortRef), + ActorExists(ActorRef, #[reply] OncePortRef), } #[async_trait] @@ -1820,7 +1848,7 @@ mod tests { type Params = (); async fn new(_params: ()) -> Result { - Ok(Self { found_actor: None }) + Ok(Self) } } @@ -1830,10 +1858,9 @@ mod tests { async fn actor_exists( &mut self, this: &Instance, - name: String, + actor_ref: ActorRef, ) -> Result { - self.found_actor = this.get::(&name); - Ok(self.found_actor.is_some()) + Ok(actor_ref.downcast_handle(this).is_some()) } } @@ -1843,18 +1870,30 @@ mod tests { let client = proc.attach("client").unwrap(); let target_actor = proc.spawn::("target", ()).await.unwrap(); + let target_actor_ref = target_actor.bind(); let lookup_actor = proc.spawn::("lookup", ()).await.unwrap(); assert!( lookup_actor - .actor_exists(&client, "target".to_string()) + .actor_exists(&client, target_actor_ref.clone()) .await .unwrap() ); + // Make up a child actor. It shouldn't exist. assert!( !lookup_actor - .actor_exists(&client, "nonexistent".to_string()) + .actor_exists( + &client, + ActorRef::attest(target_actor.actor_id().child_id(123).clone()) + ) + .await + .unwrap() + ); + // A wrongly-typed actor ref should also not obtain. + assert!( + !lookup_actor + .actor_exists(&client, ActorRef::attest(lookup_actor.actor_id().clone())) .await .unwrap() ); @@ -1864,7 +1903,7 @@ mod tests { assert!( !lookup_actor - .actor_exists(&client, "target".to_string()) + .actor_exists(&client, target_actor_ref) .await .unwrap() ); @@ -1873,32 +1912,6 @@ mod tests { lookup_actor.await; } - #[tokio::test] - async fn test_actor_registry_cleanup() { - let proc = Proc::local(); - - let initial_size = proc.state().actor_registry.len(); - - for i in 0..5 { - let actor_name = format!("temp_actor_{}", i); - let actor = proc.spawn::(&actor_name, ()).await.unwrap(); - - assert_eq!(proc.state().actor_registry.len(), initial_size + 1); - - actor.drain_and_stop().unwrap(); - actor.await; - - assert_eq!( - proc.state().actor_registry.len(), - initial_size, - "Registry should be cleaned up after actor {} stops", - actor_name - ); - } - - assert_eq!(proc.state().actor_registry.len(), initial_size); - } - fn validate_link(child: &InstanceCell, parent: &InstanceCell) { assert_eq!(child.actor_id().proc_id(), parent.actor_id().proc_id()); assert_eq!( diff --git a/hyperactor/src/reference.rs b/hyperactor/src/reference.rs index 78fa68e79..c81abcd88 100644 --- a/hyperactor/src/reference.rs +++ b/hyperactor/src/reference.rs @@ -39,6 +39,8 @@ use serde::Deserialize; use serde::Serialize; use crate as hyperactor; +use crate::Actor; +use crate::ActorHandle; use crate::Named; use crate::RemoteHandles; use crate::RemoteMessage; @@ -651,6 +653,16 @@ impl ActorRef { pub fn into_actor_id(self) -> ActorId { self.actor_id } + + /// Attempt to downcast this reference into a (local) actor handle. + /// This will only succeed when the referenced actor is in the same + /// proc as the caller. + pub fn downcast_handle(&self, cap: &impl cap::CanResolveActorRef) -> Option> + where + A: Actor, + { + cap.resolve_actor_ref(self) + } } impl fmt::Display for ActorRef {