From 3c60a7170fa13f3fecd9c658d082471292f6d2f5 Mon Sep 17 00:00:00 2001 From: Marius Eriksen Date: Thu, 20 Nov 2025 15:18:25 -0800 Subject: [PATCH 1/2] [hyperactor] remove `new` from actor We shouldn't mandate how an actor is actually instantiated. Rather, 'spawn' should just take over ownership of an `Actor` object directly, and run its loop from there. This helps to separate concerns and simplify the implementation. In this change: 1) We remote `new` from `Actor`, and only require it in `RemoteSpawn` (renamed from `RemotableActor`), in order to enable remote spawning 2) Change the spawn APIs to take ownership over an actor object 3) Remote the `derive(Actor)` macro, this can now be implemented with a simple `impl Actor for Foo{}` marker trait when no additonal behavior needs to be customized. 4) Simplify a bunch of actor construction, esp. in tests, where we like to just use simple objects. Using this, in the next change, we will make `spawn` fully synchronous. This leaves the #[export] attribute macro somewhat schizophrenic: `spawn = true` now only registers the actor in the remote registry. We should think about how to simplify this, too. Differential Revision: [D87575629](https://our.internmc.facebook.com/intern/diff/D87575629/) **NOTE FOR REVIEWERS**: This PR has internal Meta-specific changes or comments, please review them on [Phabricator](https://our.internmc.facebook.com/intern/diff/D87575629/)! [ghstack-poisoned] --- .../books/hyperactor-book/src/actors/index.md | 2 +- .../src/actors/remotable_actor.md | 53 ++---- .../hyperactor-book/src/macros/export.md | 4 +- hyperactor/example/derive.rs | 11 +- hyperactor/example/stream.rs | 28 ++- hyperactor/src/actor.rs | 168 ++++++------------ hyperactor/src/actor/remote.rs | 10 +- hyperactor/src/host.rs | 13 +- hyperactor/src/lib.rs | 3 +- hyperactor/src/mailbox.rs | 6 +- hyperactor/src/proc.rs | 165 ++++++++++------- hyperactor/src/test_utils/pingpong.rs | 54 +++--- hyperactor/src/test_utils/proc_supervison.rs | 11 +- hyperactor/test/host_bootstrap.rs | 3 +- hyperactor_macros/src/lib.rs | 90 ---------- hyperactor_mesh/src/actor_mesh.rs | 86 ++++++--- hyperactor_mesh/src/alloc.rs | 14 +- hyperactor_mesh/src/alloc/remoteprocess.rs | 2 +- hyperactor_mesh/src/bootstrap.rs | 22 ++- hyperactor_mesh/src/comm.rs | 30 ++-- hyperactor_mesh/src/connect.rs | 7 +- hyperactor_mesh/src/logging.rs | 78 ++++---- hyperactor_mesh/src/proc_mesh.rs | 10 +- hyperactor_mesh/src/proc_mesh/mesh_agent.rs | 6 - hyperactor_mesh/src/reference.rs | 5 +- hyperactor_mesh/src/resource/mesh.rs | 4 +- hyperactor_mesh/src/test_utils.rs | 16 +- hyperactor_mesh/src/v1/host_mesh.rs | 7 +- .../src/v1/host_mesh/mesh_agent.rs | 78 ++++---- hyperactor_mesh/src/v1/proc_mesh.rs | 8 +- hyperactor_mesh/src/v1/testactor.rs | 34 +++- hyperactor_multiprocess/src/ping_pong.rs | 4 +- hyperactor_multiprocess/src/proc_actor.rs | 99 ++++++----- hyperactor_multiprocess/src/system_actor.rs | 40 +++-- monarch_hyperactor/src/actor.rs | 42 +++-- .../src/code_sync/auto_reload.rs | 13 +- .../src/code_sync/conda_sync.rs | 7 +- monarch_hyperactor/src/code_sync/manager.rs | 14 +- monarch_hyperactor/src/code_sync/rsync.rs | 7 +- monarch_hyperactor/src/local_state_broker.rs | 14 +- monarch_hyperactor/src/logging.rs | 4 +- monarch_hyperactor/src/proc.rs | 14 +- monarch_hyperactor/src/proc_mesh.rs | 10 +- monarch_hyperactor/src/v1/actor_mesh.rs | 4 +- monarch_hyperactor/src/v1/logging.rs | 5 +- 45 files changed, 676 insertions(+), 629 deletions(-) diff --git a/docs/source/books/hyperactor-book/src/actors/index.md b/docs/source/books/hyperactor-book/src/actors/index.md index 3e732e0e9..1da0317fb 100644 --- a/docs/source/books/hyperactor-book/src/actors/index.md +++ b/docs/source/books/hyperactor-book/src/actors/index.md @@ -8,7 +8,7 @@ This chapter introduces the actor system in hyperactor. We'll cover: - The [`Actor`](./actor.md) trait and its lifecycle hooks - The [`Handler`](./handler.md) trait for defining message-handling behavior -- The [`RemotableActor`](./remotable_actor.md) trait for enabling remote spawning +- The [`RemoteSpawn`](./remotable_actor.md) trait for enabling remote spawning - The [`Checkpointable`](./checkpointable.md) trait for supporting actor persistence and recovery - The [`Referable`](./remote_actor.md) marker trait for remotely referencable types - The [`Binds`](./binds.md) trait for wiring exported ports to reference types diff --git a/docs/source/books/hyperactor-book/src/actors/remotable_actor.md b/docs/source/books/hyperactor-book/src/actors/remotable_actor.md index d060cfb45..5bd151393 100644 --- a/docs/source/books/hyperactor-book/src/actors/remotable_actor.md +++ b/docs/source/books/hyperactor-book/src/actors/remotable_actor.md @@ -1,26 +1,30 @@ # The `RemoteableActor` Trait ```rust -pub trait RemotableActor: Actor -where - Self::Params: RemoteMessage, -{ +pub trait RemoteSpawn: Actor + Referable + Binds { + /// The type of parameters used to instantiate the actor remotely. + type Params: RemoteMessage; + + /// Creates a new actor instance given its instantiation parameters. + async fn new(params: Self::Params) -> anyhow::Result; + fn gspawn( proc: &Proc, name: &str, serialized_params: Data, - ) -> Pin> + Send>>; + ) -> Pin> + Send>> { /* default impl. */} fn get_type_id() -> TypeId { TypeId::of::() } } ``` -The `RemotableActor` trait marks an actor type as spawnable across process boundaries. It enables hyperactor's remote spawning and registration system, allowing actors to be created from serialized parameters in a different `Proc`. +The `RemoteSpawn` trait marks an actor type as spawnable across process boundaries. It enables hyperactor's remote spawning and registration system, allowing actors to be created from serialized parameters in a different `Proc`. ## Requirements - The actor type must also implement `Actor`. -- Its `Params` type (used in `Actor::new`) must implement `RemoteMessage`, so it can be serialized and transmitted over the network. +- Its `Params` type (used in `RemoteSpawn::new`) must implement `RemoteMessage`, so it can be serialized and transmitted over the network. +- `new` creates a new instance of the actor given its parameters ## `gspawn` ```rust @@ -39,41 +43,8 @@ The method deserializes the parameters, creates the actor, and returns its `Acto This is used internally by hyperactor's remote actor registry and `spawn` services. Ordinary users generally don't call this directly. -> **Note:** This is not an `async fn` because `RemotableActor` must be object-safe. +> **Note:** This is not an `async fn` because `RemoteSpawn` must be object-safe. ## `get_type_id` Returns a stable `TypeId` for the actor type. Used to identify actor types at runtime—e.g., in registration tables or type-based routing logic. - -## Blanket Implementation - -The RemotableActor trait is automatically implemented for any actor type `A` that: -- implements `Actor` and `Referable`, -- and whose `Params` type implements `RemoteMessage`. - -This allows `A` to be remotely registered and instantiated from serialized data, typically via the runtime's registration mechanism. - -```rust -impl RemotableActor for A -where - A: Actor + Referable, - A: Binds, - A::Params: RemoteMessage, -{ - fn gspawn( - proc: &Proc, - name: &str, - serialized_params: Data, - ) -> Pin> + Send>> { - let proc = proc.clone(); - let name = name.to_string(); - Box::pin(async move { - let handle = proc - .spawn::(&name, bincode::deserialize(&serialized_params)?) - .await?; - Ok(handle.bind::().actor_id) - }) - } -} -``` -Note the `Binds` bound: this trait specifies how an actor's ports are wired determining which message types the actor can receive remotely. The resulting `ActorId` corresponds to a port-bound, remotely callable version of the actor. diff --git a/docs/source/books/hyperactor-book/src/macros/export.md b/docs/source/books/hyperactor-book/src/macros/export.md index 4b5260937..73f6045b2 100644 --- a/docs/source/books/hyperactor-book/src/macros/export.md +++ b/docs/source/books/hyperactor-book/src/macros/export.md @@ -18,7 +18,7 @@ The macro expands to include: - A `Binds` implementation that registers supported message types - Implementations of `RemoteHandles` for each type in the `handlers = [...]` list - A `Referable` marker implementation - - If `spawn = true`, a `RemotableActor` implementation and an inventory registration of the `spawn` function. + - If `spawn = true`, the actor's `RemoteSpawn` implementation is registered in the remote actor inventory. This enables the actor to be: - Spawned dynamically by name @@ -46,7 +46,7 @@ impl Named for ShoppingListActor { ``` If `spawn = true`, the macro also emits: ```rust -impl RemotableActor for ShoppingListActor {} +impl RemoteSpawn for ShoppingListActor {} ``` This enables remote spawning via the default `gspawn` provided by a blanket implementation. diff --git a/hyperactor/example/derive.rs b/hyperactor/example/derive.rs index e1a8d6cc6..03d56102e 100644 --- a/hyperactor/example/derive.rs +++ b/hyperactor/example/derive.rs @@ -50,14 +50,7 @@ struct GetItemCount { // Define an actor. #[derive(Debug, Actor, Default)] -#[hyperactor::export( - spawn = true, - handlers = [ - ShoppingList, - ClearList, - GetItemCount, - ], -)] +hyperactor::proc::Ports::bind() struct ShoppingListActor(HashSet); // ShoppingListHandler is the trait generated by derive(Handler) above. @@ -140,7 +133,7 @@ async fn main() -> Result<(), anyhow::Error> { // Spawn our actor, and get a handle for rank 0. let shopping_list_actor: hyperactor::ActorHandle = - proc.spawn("shopping", ()).await?; + proc.spawn("shopping", ShoppingListActor::default()).await?; let shopping_api: hyperactor::ActorRef = shopping_list_actor.bind(); // We join the system, so that we can send messages to actors. let (client, _) = proc.instance("client").unwrap(); diff --git a/hyperactor/example/stream.rs b/hyperactor/example/stream.rs index 11bdd39c8..9c8a80987 100644 --- a/hyperactor/example/stream.rs +++ b/hyperactor/example/stream.rs @@ -18,6 +18,7 @@ use hyperactor::Handler; use hyperactor::Instance; use hyperactor::Named; use hyperactor::PortRef; +use hyperactor::RemoteSpawn; use hyperactor::proc::Proc; use serde::Deserialize; use serde::Serialize; @@ -54,13 +55,6 @@ struct CountClient { #[async_trait] impl Actor for CountClient { - // Where to send subscribe messages. - type Params = PortRef; - - async fn new(counter: PortRef) -> Result { - Ok(Self { counter }) - } - async fn init(&mut self, this: &Instance) -> Result<(), anyhow::Error> { // Subscribe to the counter on initialization. We give it our u64 port to report // messages back to. @@ -69,6 +63,16 @@ impl Actor for CountClient { } } +#[async_trait] +impl RemoteableActor for CountClient { + // Where to send subscribe messages. + type Params = PortRef; + + async fn new(counter: PortRef) -> Result { + Ok(Self { counter }) + } +} + #[async_trait] impl Handler for CountClient { async fn handle(&mut self, cx: &Context, count: u64) -> Result<(), anyhow::Error> { @@ -81,13 +85,19 @@ impl Handler for CountClient { async fn main() { let proc = Proc::local(); - let counter_actor: ActorHandle = proc.spawn("counter", ()).await.unwrap(); + let counter_actor: ActorHandle = proc + .spawn("counter", CounterActor::default()) + .await + .unwrap(); for i in 0..10 { // Spawn new "countees". Every time each subscribes, the counter broadcasts // the count to everyone. let _countee_actor: ActorHandle = proc - .spawn(&format!("countee_{}", i), counter_actor.port().bind()) + .spawn( + &format!("countee_{}", i), + CountClient::new(counter_actor.port().bind()).unwrap(), + ) .await .unwrap(); #[allow(clippy::disallowed_methods)] diff --git a/hyperactor/src/actor.rs b/hyperactor/src/actor.rs index 745641a68..4996ab538 100644 --- a/hyperactor/src/actor.rs +++ b/hyperactor/src/actor.rs @@ -68,12 +68,6 @@ pub mod remote; /// actor is determined by the set (and order) of messages it receives. #[async_trait] pub trait Actor: Sized + Send + Debug + 'static { - /// The type of initialization parameters accepted by this actor. - type Params: Send + 'static; - - /// Creates a new actor instance given its instantiation parameters. - async fn new(params: Self::Params) -> Result; - /// Initialize the actor, after the runtime has been fully initialized. /// Init thus provides a mechanism by which an actor can reliably and always /// receive some initial event that can be used to kick off further @@ -104,11 +98,8 @@ pub trait Actor: Sized + Send + Debug + 'static { /// Spawn a child actor, given a spawning capability (usually given by [`Instance`]). /// The spawned actor will be supervised by the parent (spawning) actor. - async fn spawn( - cx: &impl context::Actor, - params: Self::Params, - ) -> anyhow::Result> { - cx.instance().spawn(params).await + async fn spawn(self, cx: &impl context::Actor) -> anyhow::Result> { + cx.instance().spawn(self).await } /// Spawns this actor in a detached state, handling its messages @@ -117,8 +108,8 @@ pub trait Actor: Sized + Send + Debug + 'static { /// /// Actors spawned through `spawn_detached` are not attached to a supervision /// hierarchy, and not managed by a [`Proc`]. - async fn spawn_detached(params: Self::Params) -> Result, anyhow::Error> { - Proc::local().spawn("anon", params).await + async fn spawn_detached(self) -> Result, anyhow::Error> { + Proc::local().spawn("anon", self).await } /// This method is used by the runtime to spawn the actor server. It can be @@ -175,13 +166,7 @@ pub fn handle_undeliverable_message( /// An actor that does nothing. It is used to represent "client only" actors, /// returned by [`Proc::instance`]. #[async_trait] -impl Actor for () { - type Params = (); - - async fn new(params: Self::Params) -> Result { - Ok(params) - } -} +impl Actor for () {} impl Referable for () {} @@ -240,57 +225,31 @@ where /// An `Actor` that can be spawned remotely. /// -/// Blanket-implemented for actors that opt in to remote spawn by also -/// implementing `Referable` and `Binds`, with serializable -/// params: -/// -/// ```rust,ignore -/// impl RemotableActor for A -/// where -/// A: Actor + Referable + Binds, -/// A::Params: RemoteMessage, -/// {} -/// ``` -/// /// Bounds explained: +/// - `Actor`: only actors may be remotely spawned. /// - `Referable`: marks the type as eligible for typed remote /// references (`ActorRef`); required because remote spawn /// ultimately hands back an `ActorId` that higher-level APIs may /// re-type as `ActorRef`. -/// - `Binds`: lets the runtime wire this actor's message ports -/// when it is spawned (the blanket impl calls `handle.bind::()`). -/// - `A::Params: RemoteMessage`: constructor params must be -/// (de)serializable to cross a process boundary. +/// - `Binds`: lets the runtime wire this actor's message ports +/// when it is spawned (the blanket impl calls `handle.bind::()`). /// /// `gspawn` is a type-erased entry point used by the remote /// spawn/registry machinery. It takes serialized params and returns -/// the new actor’s `ActorId`; application code shouldn’t call it +/// the new actor's `ActorId`; application code shouldn't call it /// directly. -pub trait RemotableActor: Actor -where - Self::Params: RemoteMessage, -{ +#[async_trait] +pub trait RemoteSpawn: Actor + Referable + Binds { + /// The type of parameters used to instantiate the actor remotely. + type Params: RemoteMessage; + + /// Creates a new actor instance given its instantiation parameters. + async fn new(params: Self::Params) -> anyhow::Result; + /// A type-erased entry point to spawn this actor. This is /// primarily used by hyperactor's remote actor registration /// mechanism. // TODO: consider making this 'private' -- by moving it into a non-public trait as in [`cap`]. - fn gspawn( - proc: &Proc, - name: &str, - serialized_params: Data, - ) -> Pin> + Send>>; - - /// The type ID of this actor. - fn get_type_id() -> TypeId { - TypeId::of::() - } -} - -impl RemotableActor for A -where - A: Actor + Referable + Binds, - A::Params: RemoteMessage, -{ fn gspawn( proc: &Proc, name: &str, @@ -299,9 +258,9 @@ where let proc = proc.clone(); let name = name.to_string(); Box::pin(async move { - let handle = proc - .spawn::(&name, bincode::deserialize(&serialized_params)?) - .await?; + let params = bincode::deserialize(&serialized_params)?; + let actor = Self::new(params).await?; + let handle = proc.spawn(&name, actor).await?; // We return only the ActorId, not a typed ActorRef. // Callers that hold this ID can interact with the actor // only via the serialized/opaque messaging path, which @@ -313,9 +272,14 @@ where // // This will be replaced by a proper export/registry // mechanism. - Ok(handle.bind::().actor_id) + Ok(handle.bind::().actor_id) }) } + + /// The type ID of this actor. + fn get_type_id() -> TypeId { + TypeId::of::() + } } #[async_trait] @@ -722,9 +686,9 @@ impl Clone for ActorHandle { /// - and can be carried in [`ActorRef`] values across process /// boundaries. /// -/// In contrast, [`RemotableActor`] is the trait that marks *actors* +/// In contrast, [`RemoteSpawn`] is the trait that marks *actors* /// that can actually be **spawned remotely**. A behavior may be a -/// `Referable` but is never a `RemotableActor`. +/// `Referable` but is never a `RemoteSpawn`. pub trait Referable: Named + Send + Sync {} /// Binds determines how an actor's ports are bound to a specific @@ -744,13 +708,16 @@ pub trait RemoteHandles: Referable {} /// # use serde::Serialize; /// # use serde::Deserialize; /// # use hyperactor::Named; +/// # use hyperactor::Actor; /// /// // First, define a behavior, based on handling a single message type `()`. /// hyperactor::behavior!(UnitBehavior, ()); /// -/// #[derive(hyperactor::Actor, Debug, Default)] +/// #[derive(Debug, Default)] /// struct MyActor; /// +/// impl Actor for MyActor {} +/// /// #[async_trait::async_trait] /// impl hyperactor::Handler<()> for MyActor { /// async fn handle( @@ -790,7 +757,6 @@ mod tests { use crate::checkpoint::CheckpointError; use crate::checkpoint::Checkpointable; use crate::test_utils::pingpong::PingPongActor; - use crate::test_utils::pingpong::PingPongActorParams; use crate::test_utils::pingpong::PingPongMessage; use crate::test_utils::proc_supervison::ProcSupervisionCoordinator; // for macros @@ -798,13 +764,7 @@ mod tests { struct EchoActor(PortRef); #[async_trait] - impl Actor for EchoActor { - type Params = PortRef; - - async fn new(params: PortRef) -> Result { - Ok(Self(params)) - } - } + impl Actor for EchoActor {} #[async_trait] impl Handler for EchoActor { @@ -820,7 +780,8 @@ mod tests { let proc = Proc::local(); let client = proc.attach("client").unwrap(); let (tx, mut rx) = client.open_port(); - let handle = proc.spawn::("echo", tx.bind()).await.unwrap(); + let actor = EchoActor(tx.bind()); + let handle = proc.spawn::("echo", actor).await.unwrap(); handle.send(123u64).unwrap(); handle.drain_and_stop().unwrap(); handle.await; @@ -834,14 +795,14 @@ mod tests { let client = proc.attach("client").unwrap(); let (undeliverable_msg_tx, _) = client.open_port(); - let ping_pong_actor_params = - PingPongActorParams::new(Some(undeliverable_msg_tx.bind()), None); + let ping_actor = PingPongActor::new(Some(undeliverable_msg_tx.bind()), None, None); + let pong_actor = PingPongActor::new(Some(undeliverable_msg_tx.bind()), None, None); let ping_handle = proc - .spawn::("ping", ping_pong_actor_params.clone()) + .spawn::("ping", ping_actor) .await .unwrap(); let pong_handle = proc - .spawn::("pong", ping_pong_actor_params) + .spawn::("pong", pong_actor) .await .unwrap(); @@ -865,14 +826,17 @@ mod tests { ProcSupervisionCoordinator::set(&proc).await.unwrap(); let error_ttl = 66; - let ping_pong_actor_params = - PingPongActorParams::new(Some(undeliverable_msg_tx.bind()), Some(error_ttl)); + + let ping_actor = + PingPongActor::new(Some(undeliverable_msg_tx.bind()), Some(error_ttl), None); + let pong_actor = + PingPongActor::new(Some(undeliverable_msg_tx.bind()), Some(error_ttl), None); let ping_handle = proc - .spawn::("ping", ping_pong_actor_params.clone()) + .spawn::("ping", ping_actor) .await .unwrap(); let pong_handle = proc - .spawn::("pong", ping_pong_actor_params) + .spawn::("pong", pong_actor) .await .unwrap(); @@ -898,12 +862,6 @@ mod tests { #[async_trait] impl Actor for InitActor { - type Params = (); - - async fn new(_params: ()) -> Result { - Ok(Self(false)) - } - async fn init(&mut self, _this: &Instance) -> Result<(), anyhow::Error> { self.0 = true; Ok(()) @@ -925,7 +883,8 @@ mod tests { #[tokio::test] async fn test_init() { let proc = Proc::local(); - let handle = proc.spawn::("init", ()).await.unwrap(); + let actor = InitActor(false); + let handle = proc.spawn::("init", actor).await.unwrap(); let client = proc.attach("client").unwrap(); let (port, receiver) = client.open_once_port(); @@ -944,16 +903,7 @@ mod tests { } #[async_trait] - impl Actor for CheckpointActor { - type Params = PortRef; - - async fn new(params: PortRef) -> Result { - Ok(Self { - sum: 0, - port: params, - }) - } - } + impl Actor for CheckpointActor {} #[async_trait] impl Handler for CheckpointActor { @@ -992,10 +942,8 @@ mod tests { async fn new() -> Self { let proc = Proc::local(); let values: MultiValues = Arc::new(Mutex::new((0, "".to_string()))); - let handle = proc - .spawn::("myactor", values.clone()) - .await - .unwrap(); + let actor = MultiActor(values.clone()); + let handle = proc.spawn::("myactor", actor).await.unwrap(); let (client, client_handle) = proc.instance("client").unwrap(); Self { proc, @@ -1030,13 +978,7 @@ mod tests { struct MultiActor(MultiValues); #[async_trait] - impl Actor for MultiActor { - type Params = MultiValues; - - async fn new(init: Self::Params) -> Result { - Ok(Self(init)) - } - } + impl Actor for MultiActor {} #[async_trait] impl Handler for MultiActor { @@ -1111,13 +1053,15 @@ mod tests { #[tokio::test] async fn test_actor_handle_downcast() { - #[derive(Debug, Default, Actor)] + #[derive(Debug, Default)] struct NothingActor; + impl Actor for NothingActor {} + // Just test that we can round-trip the handle through a downcast. let proc = Proc::local(); - let handle = proc.spawn::("nothing", ()).await.unwrap(); + let handle = proc.spawn("nothing", NothingActor).await.unwrap(); let cell = handle.cell(); // Invalid actor doesn't succeed. diff --git a/hyperactor/src/actor/remote.rs b/hyperactor/src/actor/remote.rs index 09b7e5dcb..0a4fe7fff 100644 --- a/hyperactor/src/actor/remote.rs +++ b/hyperactor/src/actor/remote.rs @@ -42,8 +42,8 @@ macro_rules! remote { $crate::submit! { $crate::actor::remote::SpawnableActor { name: &[<$actor:snake:upper _NAME>], - gspawn: <$actor as $crate::actor::RemotableActor>::gspawn, - get_type_id: <$actor as $crate::actor::RemotableActor>::get_type_id, + gspawn: <$actor as $crate::actor::RemoteSpawn>::gspawn, + get_type_id: <$actor as $crate::actor::RemoteSpawn>::get_type_id, } } } @@ -141,13 +141,17 @@ mod tests { use crate as hyperactor; // for macros use crate::Context; use crate::Handler; + use crate::RemoteSpawn; #[derive(Debug)] #[hyperactor::export(handlers = [()])] struct MyActor; #[async_trait] - impl Actor for MyActor { + impl Actor for MyActor {} + + #[async_trait] + impl RemoteSpawn for MyActor { type Params = bool; async fn new(params: bool) -> Result { diff --git a/hyperactor/src/host.rs b/hyperactor/src/host.rs index b95c1749a..e2ccf2c93 100644 --- a/hyperactor/src/host.rs +++ b/hyperactor/src/host.rs @@ -1182,13 +1182,24 @@ pub mod testing { use crate::Context; use crate::Handler; use crate::OncePortRef; + use crate::RemoteSpawn; /// Just a simple actor, available in both the bootstrap binary as well as /// hyperactor tests. - #[derive(Debug, Default, Actor)] + #[derive(Debug, Default)] #[hyperactor::export(handlers = [OncePortRef])] pub struct EchoActor; + impl Actor for EchoActor {} + + #[async_trait] + impl RemoteSpawn for EchoActor { + type Params = (); + async fn new(_: ()) -> anyhow::Result { + Ok(Self) + } + } + #[async_trait] impl Handler> for EchoActor { async fn handle( diff --git a/hyperactor/src/lib.rs b/hyperactor/src/lib.rs index 4e9c4596d..905005152 100644 --- a/hyperactor/src/lib.rs +++ b/hyperactor/src/lib.rs @@ -99,6 +99,7 @@ pub use actor::Actor; pub use actor::ActorHandle; pub use actor::Handler; pub use actor::RemoteHandles; +pub use actor::RemoteSpawn; // Re-export public dependencies of hyperactor_macros codegen. #[doc(hidden)] pub use anyhow; @@ -111,8 +112,6 @@ pub use cityhasher; #[doc(hidden)] pub use dashmap; // For intern_typename! pub use data::Named; -#[doc(hidden)] -pub use hyperactor_macros::Actor; #[doc(inline)] pub use hyperactor_macros::AttrValue; #[doc(inline)] diff --git a/hyperactor/src/mailbox.rs b/hyperactor/src/mailbox.rs index 0c36fda11..fb0f011f6 100644 --- a/hyperactor/src/mailbox.rs +++ b/hyperactor/src/mailbox.rs @@ -3104,9 +3104,11 @@ mod tests { ); } - #[derive(Debug, Default, Actor)] + #[derive(Debug, Default)] struct Foo; + impl Actor for Foo {} + // Test that a message delivery failure causes the sending actor // to stop running. #[tokio::test] @@ -3123,7 +3125,7 @@ mod tests { let mut proc = Proc::new(proc_id.clone(), proc_forwarder); ProcSupervisionCoordinator::set(&proc).await.unwrap(); - let foo = proc.spawn::("foo", ()).await.unwrap(); + let foo = proc.spawn("foo", Foo).await.unwrap(); let return_handle = foo.port::>(); let message = MessageEnvelope::new( foo.actor_id().clone(), diff --git a/hyperactor/src/proc.rs b/hyperactor/src/proc.rs index c272f61bb..a9cf2f5c1 100644 --- a/hyperactor/src/proc.rs +++ b/hyperactor/src/proc.rs @@ -511,7 +511,7 @@ impl Proc { pub async fn spawn( &self, name: &str, - params: A::Params, + actor: A, ) -> Result, anyhow::Error> { let actor_id = self.allocate_root_id(name)?; let span = tracing::span!( @@ -525,7 +525,6 @@ impl Proc { let _guard = span.clone().entered(); Instance::new(self.clone(), actor_id.clone(), false, None) }; - let actor = A::new(params).instrument(span.clone()).await?; // Add this actor to the proc's actor ledger. We do not actively remove // inactive actors from ledger, because the actor's state can be inferred // from its weak cell. @@ -588,12 +587,11 @@ impl Proc { async fn spawn_child( &self, parent: InstanceCell, - params: A::Params, + actor: A, ) -> Result, anyhow::Error> { let actor_id = self.allocate_child_id(parent.actor_id())?; let (instance, mut actor_loop_receivers, work_rx) = Instance::new(self.clone(), actor_id, false, Some(parent.clone())); - let actor = A::new(params).await?; Ok(instance .start(actor, actor_loop_receivers.take().unwrap(), work_rx) .await) @@ -1542,13 +1540,10 @@ impl Instance { } /// Spawn on child on this instance. Currently used only by cap::CanSpawn. - pub(crate) async fn spawn( - &self, - params: C::Params, - ) -> anyhow::Result> { + pub(crate) async fn spawn(&self, actor: C) -> anyhow::Result> { self.inner .proc - .spawn_child(self.inner.cell.clone(), params) + .spawn_child(self.inner.cell.clone(), actor) .await } @@ -2285,7 +2280,7 @@ mod tests { cx: &crate::Context, reply: oneshot::Sender>, ) -> Result<(), anyhow::Error> { - let handle = ::spawn(cx, ()).await?; + let handle = TestActor::default().spawn(cx).await?; reply.send(handle).unwrap(); Ok(()) } @@ -2295,7 +2290,10 @@ mod tests { #[async_timed_test(timeout_secs = 30)] async fn test_spawn_actor() { let proc = Proc::local(); - let handle = proc.spawn::("test", ()).await.unwrap(); + let handle = proc + .spawn::("test", TestActor::default()) + .await + .unwrap(); // Check on the join handle. assert!(logs_contain( @@ -2337,15 +2335,21 @@ mod tests { .unwrap(); handle.drain_and_stop().unwrap(); - handle.await; + handle; assert_matches!(*state.borrow(), ActorStatus::Stopped); } #[async_timed_test(timeout_secs = 30)] async fn test_proc_actors_messaging() { let proc = Proc::local(); - let first = proc.spawn::("first", ()).await.unwrap(); - let second = proc.spawn::("second", ()).await.unwrap(); + let first = proc + .spawn::("first", TestActor::default()) + .await + .unwrap(); + let second = proc + .spawn::("second", TestActor::default()) + .await + .unwrap(); let (tx, rx) = oneshot::channel::<()>(); let reply_message = TestActorMessage::Reply(tx); first @@ -2354,9 +2358,11 @@ mod tests { rx.await.unwrap(); } - #[derive(Debug, Default, Actor)] + #[derive(Debug, Default)] struct LookupTestActor; + impl Actor for LookupTestActor {} + #[derive(Handler, HandleClient, Debug)] enum LookupTestMessage { ActorExists(ActorRef, #[reply] OncePortRef), @@ -2379,9 +2385,15 @@ mod tests { let proc = Proc::local(); let (client, _handle) = proc.instance("client").unwrap(); - let target_actor = proc.spawn::("target", ()).await.unwrap(); - let target_actor_ref = target_actor.bind(); - let lookup_actor = proc.spawn::("lookup", ()).await.unwrap(); + let target_actor = proc + .spawn::("target", TestActor::default()) + .await + .unwrap(); + let target_actor_ref = actor::ActorHandle::bind(); + let lookup_actor = proc + .spawn::("lookup", LookupTestActor::default()) + .await + .unwrap(); assert!( lookup_actor @@ -2409,7 +2421,7 @@ mod tests { ); target_actor.drain_and_stop().unwrap(); - target_actor.await; + target_actor; assert!( !lookup_actor @@ -2439,7 +2451,10 @@ mod tests { async fn test_spawn_child() { let proc = Proc::local(); - let first = proc.spawn::("first", ()).await.unwrap(); + let first = proc + .spawn::("first", TestActor::default()) + .await + .unwrap(); let second = TestActor::spawn_child(&first).await; let third = TestActor::spawn_child(&second).await; @@ -2486,20 +2501,20 @@ mod tests { // Once each actor is stopped, it should have no linked children. let third_cell = third.cell().clone(); third.drain_and_stop().unwrap(); - third.await; + third; assert!(third_cell.inner.children.is_empty()); drop(third_cell); validate_link(second.cell(), first.cell()); let second_cell = second.cell().clone(); second.drain_and_stop().unwrap(); - second.await; + second; assert!(second_cell.inner.children.is_empty()); drop(second_cell); let first_cell = first.cell().clone(); first.drain_and_stop().unwrap(); - first.await; + first; assert!(first_cell.inner.children.is_empty()); } @@ -2507,17 +2522,20 @@ mod tests { async fn test_child_lifecycle() { let proc = Proc::local(); - let root = proc.spawn::("root", ()).await.unwrap(); + let root = proc + .spawn::("root", TestActor::default()) + .await + .unwrap(); let root_1 = TestActor::spawn_child(&root).await; let root_2 = TestActor::spawn_child(&root).await; let root_2_1 = TestActor::spawn_child(&root_2).await; root.drain_and_stop().unwrap(); - root.await; + root; for actor in [root_1, root_2, root_2_1] { assert!(actor.send(TestActorMessage::Noop()).is_err()); - assert_matches!(actor.await, ActorStatus::Stopped); + assert_matches!(actor, ActorStatus::Stopped); } } @@ -2528,7 +2546,10 @@ mod tests { // be actor failure(s) in this test which trigger supervision. ProcSupervisionCoordinator::set(&proc).await.unwrap(); - let root = proc.spawn::("root", ()).await.unwrap(); + let root = proc + .spawn::("root", TestActor::default()) + .await + .unwrap(); let root_1 = TestActor::spawn_child(&root).await; let root_2 = TestActor::spawn_child(&root).await; let root_2_1 = TestActor::spawn_child(&root_2).await; @@ -2540,7 +2561,7 @@ mod tests { .unwrap(); let _root_2_actor_id = root_2.actor_id().clone(); assert_matches!( - root_2.await, + root_2, ActorStatus::Failed(err) if err.to_string() == "some random failure" ); @@ -2548,11 +2569,11 @@ mod tests { // stopped by a parent failure? // Currently the parent fails with an error related to the child's failure. assert_matches!( - root.await, + root, ActorStatus::Failed(err) if err.to_string().contains("some random failure") ); - assert_eq!(root_2_1.await, ActorStatus::Stopped); - assert_eq!(root_1.await, ActorStatus::Stopped); + assert_eq!(root_2_1, ActorStatus::Stopped); + assert_eq!(root_1, ActorStatus::Stopped); } #[async_timed_test(timeout_secs = 30)] @@ -2568,7 +2589,10 @@ mod tests { let proc = Proc::local(); // Add the 1st root. This root will remain active until the end of the test. - let root: ActorHandle = proc.spawn::("root", ()).await.unwrap(); + let root: ActorHandle = proc + .spawn::("root", TestActor::default()) + .await + .unwrap(); wait_until_idle(&root).await; { let snapshot = proc.state().ledger.snapshot(); @@ -2582,8 +2606,10 @@ mod tests { } // Add the 2nd root. - let another_root: ActorHandle = - proc.spawn::("another_root", ()).await.unwrap(); + let another_root: ActorHandle = proc + .spawn::("another_root", TestActor::default()) + .await + .unwrap(); wait_until_idle(&another_root).await; { let snapshot = proc.state().ledger.snapshot(); @@ -2601,7 +2627,7 @@ mod tests { // Stop the 2nd root. It should be excluded from the snapshot after it // is stopped. another_root.drain_and_stop().unwrap(); - another_root.await; + another_root; { let snapshot = proc.state().ledger.snapshot(); assert_eq!( @@ -2739,7 +2765,7 @@ mod tests { // Stop root_1. This should remove it, and its child, from snapshot. root_1.drain_and_stop().unwrap(); - root_1.await; + root_1; { let snapshot = proc.state().ledger.snapshot(); assert_eq!( @@ -2773,7 +2799,7 @@ mod tests { // Finally stop root. No roots should be left in snapshot. root.drain_and_stop().unwrap(); - root.await; + root; { let snapshot = proc.state().ledger.snapshot(); assert_eq!(snapshot.roots, hashmap! {}); @@ -2789,13 +2815,7 @@ mod tests { struct TestActor(Arc); #[async_trait] - impl Actor for TestActor { - type Params = Arc; - - async fn new(param: Arc) -> Result { - Ok(Self(param)) - } - } + impl Actor for TestActor {} #[async_trait] impl Handler>> for TestActor { @@ -2823,10 +2843,8 @@ mod tests { let proc = Proc::local(); let state = Arc::new(AtomicUsize::new(0)); - let handle = proc - .spawn::("test", state.clone()) - .await - .unwrap(); + let actor = TestActor(state.clone()); + let handle = proc.spawn::("test", actor).await.unwrap(); let client = proc.attach("client").unwrap(); let (tx, rx) = client.open_once_port(); handle.send(tx).unwrap(); @@ -2850,12 +2868,15 @@ mod tests { ProcSupervisionCoordinator::set(&proc).await.unwrap(); let (client, _handle) = proc.instance("client").unwrap(); - let actor_handle = proc.spawn::("test", ()).await.unwrap(); + let actor_handle = proc + .spawn::("test", TestActor::default()) + .await + .unwrap(); actor_handle .panic(&client, "some random failure".to_string()) .await .unwrap(); - let actor_status = actor_handle.await; + let actor_status = actor_handle; // Note: even when the test passes, the panic stacktrace will still be // printed to stderr because that is the behavior controlled by the panic @@ -2879,12 +2900,6 @@ mod tests { #[async_trait] impl Actor for TestActor { - type Params = (Arc, bool); - - async fn new(param: (Arc, bool)) -> Result { - Ok(Self(param.0, param.1)) - } - async fn handle_supervision_event( &mut self, _this: &Instance, @@ -2927,13 +2942,13 @@ mod tests { let root_2_1_state = Arc::new(AtomicBool::new(false)); let root = proc - .spawn::("root", (root_state.clone(), false)) + .spawn::("root", TestActor(root_state.clone(), false)) .await .unwrap(); let root_1 = proc .spawn_child::( root.cell().clone(), - ( + TestActor( root_1_state.clone(), true, /* set true so children's event stops here */ ), @@ -2941,19 +2956,28 @@ mod tests { .await .unwrap(); let root_1_1 = proc - .spawn_child::(root_1.cell().clone(), (root_1_1_state.clone(), false)) + .spawn_child::( + root_1.cell().clone(), + TestActor(root_1_1_state.clone(), false), + ) .await .unwrap(); let root_1_1_1 = proc - .spawn_child::(root_1_1.cell().clone(), (root_1_1_1_state.clone(), false)) + .spawn_child::( + root_1_1.cell().clone(), + TestActor(root_1_1_1_state.clone(), false), + ) .await .unwrap(); let root_2 = proc - .spawn_child::(root.cell().clone(), (root_2_state.clone(), false)) + .spawn_child::(root.cell().clone(), TestActor(root_2_state.clone(), false)) .await .unwrap(); let root_2_1 = proc - .spawn_child::(root_2.cell().clone(), (root_2_1_state.clone(), false)) + .spawn_child::( + root_2.cell().clone(), + TestActor(root_2_1_state.clone(), false), + ) .await .unwrap(); @@ -2985,9 +3009,11 @@ mod tests { #[async_timed_test(timeout_secs = 30)] async fn test_instance() { - #[derive(Debug, Default, Actor)] + #[derive(Debug, Default)] struct TestActor; + impl Actor for TestActor {} + #[async_trait] impl Handler<(String, PortRef)> for TestActor { async fn handle( @@ -3004,7 +3030,7 @@ mod tests { let (instance, handle) = proc.instance("my_test_actor").unwrap(); - let child_actor = TestActor::spawn(&instance, ()).await.unwrap(); + let child_actor = TestActor::default().spawn(&instance).await.unwrap(); let (port, mut receiver) = instance.open_port(); child_actor @@ -3035,7 +3061,10 @@ mod tests { // Intentionally not setting a proc supervison coordinator. This // should cause the process to terminate. // ProcSupervisionCoordinator::set(&proc).await.unwrap(); - let root = proc.spawn::("root", ()).await.unwrap(); + let root = proc + .spawn::("root", TestActor::default()) + .await + .unwrap(); let (client, _handle) = proc.instance("client").unwrap(); root.fail(&client, anyhow::anyhow!("some random failure")) .await @@ -3065,9 +3094,11 @@ mod tests { #[ignore = "until trace recording is turned back on"] #[test] fn test_handler_logging() { - #[derive(Debug, Default, Actor)] + #[derive(Debug, Default)] struct LoggingActor; + impl Actor for LoggingActor {} + impl LoggingActor { async fn wait(handle: &ActorHandle) { let barrier = Arc::new(Barrier::new(2)); @@ -3128,7 +3159,7 @@ mod tests { } trace_and_block(async { - let handle = LoggingActor::spawn_detached(()).await.unwrap(); + let handle = LoggingActor::default().spawn_detached().await.unwrap(); handle.send("hello world".to_string()).unwrap(); handle.send("hello world again".to_string()).unwrap(); handle.send(123u64).unwrap(); diff --git a/hyperactor/src/test_utils/pingpong.rs b/hyperactor/src/test_utils/pingpong.rs index da50eace7..bce19ca37 100644 --- a/hyperactor/src/test_utils/pingpong.rs +++ b/hyperactor/src/test_utils/pingpong.rs @@ -21,6 +21,7 @@ use crate::Instance; use crate::Named; use crate::OncePortRef; use crate::PortRef; +use crate::RemoteSpawn; use crate::clock::Clock; use crate::clock::RealClock; use crate::mailbox::MessageEnvelope; @@ -34,9 +35,10 @@ use crate::mailbox::UndeliverableMessageError; #[derive(Serialize, Deserialize, Debug, Named)] pub struct PingPongMessage(pub u64, pub ActorRef, pub OncePortRef); -/// Initialization parameters for `PingPongActor`s. -#[derive(Debug, Named, Serialize, Deserialize, Clone)] -pub struct PingPongActorParams { +/// A PingPong actor that can play the PingPong game by sending messages around. +#[derive(Debug)] +#[hyperactor::export(spawn = true, handlers = [PingPongMessage])] +pub struct PingPongActor { /// A port to send undeliverable messages to. undeliverable_port_ref: Option>>, /// The TTL at which the actor will exit with error. @@ -45,40 +47,40 @@ pub struct PingPongActorParams { delay: Option, } -impl PingPongActorParams { - /// Create a new set of initialization parameters. +impl PingPongActor { + /// Create a new ping pong actor with the following parameters: + /// + /// - `undeliverable_port_ref`: A port to send undeliverable messages to. + /// - `error_ttl`: The TTL at which the actor will exit with error. + /// - `delay`: Manual delay before sending handling the message. pub fn new( undeliverable_port_ref: Option>>, error_ttl: Option, + delay: Option, ) -> Self { Self { undeliverable_port_ref, error_ttl, - delay: None, + delay, } } - - /// Set the delay - pub fn set_delay(&mut self, delay: Duration) { - self.delay = Some(delay); - } -} - -/// A PingPong actor that can play the PingPong game by sending messages around. -#[derive(Debug)] -#[hyperactor::export(handlers = [PingPongMessage])] -pub struct PingPongActor { - params: PingPongActorParams, } #[async_trait] -impl Actor for PingPongActor { - type Params = PingPongActorParams; +impl RemoteSpawn for PingPongActor { + type Params = ( + Option>>, + Option, + Option, + ); - async fn new(params: Self::Params) -> Result { - Ok(Self { params }) + async fn new((undeliverable_port_ref, error_ttl, delay): Self::Params) -> anyhow::Result { + Ok(Self::new(undeliverable_port_ref, error_ttl, delay)) } +} +#[async_trait] +impl Actor for PingPongActor { // This is an override of the default actor behavior. It is used // for testing the mechanism for returning undeliverable messages to // their senders. @@ -87,7 +89,7 @@ impl Actor for PingPongActor { cx: &Instance, undelivered: crate::mailbox::Undeliverable, ) -> Result<(), anyhow::Error> { - match &self.params.undeliverable_port_ref { + match &self.undeliverable_port_ref { Some(port) => port.send(cx, undelivered).unwrap(), None => { let Undeliverable(envelope) = undelivered; @@ -111,13 +113,13 @@ impl Handler for PingPongActor { ) -> anyhow::Result<()> { // PingPongActor sends the messages back and forth. When it's ttl = 0, it will stop. // User can set a preconfigured TTL that can cause mocked problem: such as an error. - if Some(ttl) == self.params.error_ttl { + if Some(ttl) == self.error_ttl { anyhow::bail!("PingPong handler encountered an Error"); } if ttl == 0 { done_port.send(cx, true)?; } else { - if let Some(delay) = self.params.delay { + if let Some(delay) = self.delay { RealClock.sleep(delay).await; } let next_message = PingPongMessage(ttl - 1, cx.bind(), done_port); @@ -126,5 +128,3 @@ impl Handler for PingPongActor { Ok(()) } } - -hyperactor::remote!(PingPongActor); diff --git a/hyperactor/src/test_utils/proc_supervison.rs b/hyperactor/src/test_utils/proc_supervison.rs index 6e40309c6..e63a1b883 100644 --- a/hyperactor/src/test_utils/proc_supervison.rs +++ b/hyperactor/src/test_utils/proc_supervison.rs @@ -42,8 +42,9 @@ impl ProcSupervisionCoordinator { /// proc. pub async fn set(proc: &Proc) -> Result { let state = ReportedEvent::new(); + let actor = ProcSupervisionCoordinator(state.clone()); let coordinator = proc - .spawn::("coordinator", state.clone()) + .spawn::("coordinator", actor) .await?; proc.set_supervision_coordinator(coordinator.port())?; Ok(state) @@ -69,13 +70,7 @@ impl ReportedEvent { } #[async_trait] -impl Actor for ProcSupervisionCoordinator { - type Params = ReportedEvent; - - async fn new(param: ReportedEvent) -> Result { - Ok(Self(param)) - } -} +impl Actor for ProcSupervisionCoordinator {} #[async_trait] impl Handler for ProcSupervisionCoordinator { diff --git a/hyperactor/test/host_bootstrap.rs b/hyperactor/test/host_bootstrap.rs index 85643e6d2..d15dfef78 100644 --- a/hyperactor/test/host_bootstrap.rs +++ b/hyperactor/test/host_bootstrap.rs @@ -18,7 +18,8 @@ async fn main() { let proc = ProcessProcManager::::boot_proc(|proc| async move { - proc.spawn("echo", ()).await + proc.spawn("echo", hyperactor::host::testing::EchoActor) + .await }) .await .unwrap(); diff --git a/hyperactor_macros/src/lib.rs b/hyperactor_macros/src/lib.rs index 3718d2051..678a4a47e 100644 --- a/hyperactor_macros/src/lib.rs +++ b/hyperactor_macros/src/lib.rs @@ -2141,96 +2141,6 @@ pub fn derive_unbind(input: TokenStream) -> TokenStream { TokenStream::from(expand) } -/// Derives the `Actor` trait for a struct. By default, generates an implementation -/// with no params (`type Params = ()` and `async fn new(_params: ()) -> Result`). -/// This requires that the Actor implements [`Default`]. -/// -/// If the `#[actor(passthrough)]` attribute is specified, generates an implementation -/// with where the parameter type is `Self` -/// (`type Params = Self` and `async fn new(instance: Self) -> Result`). -/// -/// # Examples -/// -/// Default behavior: -/// ``` -/// #[derive(Actor, Default)] -/// struct MyActor(u64); -/// ``` -/// -/// Generates: -/// ```ignore -/// #[async_trait] -/// impl Actor for MyActor { -/// type Params = (); -/// -/// async fn new(_params: ()) -> Result { -/// Ok(Default::default()) -/// } -/// } -/// ``` -/// -/// Passthrough behavior: -/// ``` -/// #[derive(Actor, Default)] -/// #[actor(passthrough)] -/// struct MyActor(u64); -/// ``` -/// -/// Generates: -/// ```ignore -/// #[async_trait] -/// impl Actor for MyActor { -/// type Params = Self; -/// -/// async fn new(instance: Self) -> Result { -/// Ok(instance) -/// } -/// } -/// ``` -#[proc_macro_derive(Actor, attributes(actor))] -pub fn derive_actor(input: TokenStream) -> TokenStream { - let input = parse_macro_input!(input as DeriveInput); - let name = &input.ident; - let (impl_generics, ty_generics, where_clause) = input.generics.split_for_impl(); - - let is_passthrough = input.attrs.iter().any(|attr| { - if attr.path().is_ident("actor") { - if let Ok(meta) = attr.parse_args_with( - syn::punctuated::Punctuated::::parse_terminated, - ) { - return meta.iter().any(|ident| ident == "passthrough"); - } - } - false - }); - - let expanded = if is_passthrough { - quote! { - #[hyperactor::async_trait::async_trait] - impl #impl_generics hyperactor::Actor for #name #ty_generics #where_clause { - type Params = Self; - - async fn new(instance: Self) -> Result { - Ok(instance) - } - } - } - } else { - quote! { - #[hyperactor::async_trait::async_trait] - impl #impl_generics hyperactor::Actor for #name #ty_generics #where_clause { - type Params = (); - - async fn new(_params: ()) -> Result { - Ok(Default::default()) - } - } - } - }; - - TokenStream::from(expanded) -} - // Helper function for common parsing and validation fn parse_observe_function( attr: TokenStream, diff --git a/hyperactor_mesh/src/actor_mesh.rs b/hyperactor_mesh/src/actor_mesh.rs index 03b9306c9..9e5b6bdee 100644 --- a/hyperactor_mesh/src/actor_mesh.rs +++ b/hyperactor_mesh/src/actor_mesh.rs @@ -628,6 +628,7 @@ pub(crate) mod test_util { use hyperactor::Handler; use hyperactor::Instance; use hyperactor::PortRef; + use hyperactor::RemoteSpawn; use ndslice::extent; use super::*; @@ -637,7 +638,7 @@ pub(crate) mod test_util { // be an entry in the spawnable actor registry in the executable // 'hyperactor_mesh_test_bootstrap' for the `tests::process` actor // mesh test suite. - #[derive(Debug, Default, Actor)] + #[derive(Debug, Default)] #[hyperactor::export( spawn = true, handlers = [ @@ -650,6 +651,17 @@ pub(crate) mod test_util { )] pub struct TestActor; + impl Actor for TestActor {} + + #[async_trait] + impl RemoteSpawn for TestActor { + type Params = (); + + async fn new(_: ()) -> anyhow::Result { + Ok(Self) + } + } + /// Request message to retrieve the actor's rank. /// /// The `bool` in the tuple controls the outcome of the handler: @@ -764,6 +776,14 @@ pub(crate) mod test_util { #[async_trait] impl Actor for ProxyActor { + async fn init(&mut self, this: &Instance) -> Result<(), anyhow::Error> { + self.actor_mesh = Some(self.proc_mesh.spawn(this, "echo", &()).await?); + Ok(()) + } + } + + #[async_trait] + impl RemoteSpawn for ProxyActor { type Params = (); async fn new(_params: Self::Params) -> Result { @@ -776,8 +796,7 @@ pub(crate) mod test_util { use crate::alloc::Allocator; use crate::alloc::LocalAllocator; - let mut allocator = LocalAllocator; - let alloc = allocator + let alloc = LocalAllocator .allocate(AllocSpec { extent: extent! { replica = 1 }, constraints: Default::default(), @@ -787,6 +806,7 @@ pub(crate) mod test_util { }) .await .unwrap(); + let proc_mesh = Arc::new(ProcMesh::allocate(alloc).await.unwrap()); let leaked: &'static Arc = Box::leak(Box::new(proc_mesh)); Ok(Self { @@ -794,11 +814,6 @@ pub(crate) mod test_util { actor_mesh: None, }) } - - async fn init(&mut self, this: &Instance) -> Result<(), anyhow::Error> { - self.actor_mesh = Some(self.proc_mesh.spawn(this, "echo", &()).await?); - Ok(()) - } } #[async_trait] @@ -847,6 +862,7 @@ mod tests { use hyperactor::ActorId; use hyperactor::PortRef; use hyperactor::ProcId; + use hyperactor::RemoteSpawn; use hyperactor::WorldId; use hyperactor::attrs::Attrs; use hyperactor::data::Encoding; @@ -937,7 +953,6 @@ mod tests { async fn test_ping_pong() { use hyperactor::test_utils::pingpong::PingPongActor; use hyperactor::test_utils::pingpong::PingPongMessage; - use hyperactor::test_utils::pingpong::PingPongActorParams; let alloc = $allocator .allocate(AllocSpec { @@ -953,9 +968,8 @@ mod tests { let mesh = ProcMesh::allocate(alloc).await.unwrap(); let (undeliverable_msg_tx, _) = mesh.client().open_port(); - let ping_pong_actor_params = PingPongActorParams::new(Some(undeliverable_msg_tx.bind()), None); let actor_mesh: RootActorMesh = mesh - .spawn::(&instance, "ping-pong", &ping_pong_actor_params) + .spawn::(&instance, "ping-pong", &(Some(undeliverable_msg_tx.bind()), None, None)) .await .unwrap(); @@ -970,7 +984,6 @@ mod tests { #[tokio::test] async fn test_pingpong_full_mesh() { use hyperactor::test_utils::pingpong::PingPongActor; - use hyperactor::test_utils::pingpong::PingPongActorParams; use hyperactor::test_utils::pingpong::PingPongMessage; use futures::future::join_all; @@ -992,8 +1005,7 @@ mod tests { let instance = $crate::v1::testing::instance().await; let proc_mesh = ProcMesh::allocate(alloc).await.unwrap(); let (undeliverable_tx, _undeliverable_rx) = proc_mesh.client().open_port(); - let params = PingPongActorParams::new(Some(undeliverable_tx.bind()), None); - let actor_mesh = proc_mesh.spawn::(&instance, "pingpong", ¶ms).await.unwrap(); + let actor_mesh = proc_mesh.spawn::(&instance, "pingpong", &(Some(undeliverable_tx.bind()), None, None)).await.unwrap(); let slice = actor_mesh.shape().slice(); let mut futures = Vec::new(); @@ -1275,7 +1287,6 @@ mod tests { hyperactor_telemetry::initialize_logging(hyperactor::clock::ClockKind::default()); use hyperactor::test_utils::pingpong::PingPongActor; - use hyperactor::test_utils::pingpong::PingPongActorParams; use hyperactor::test_utils::pingpong::PingPongMessage; use crate::alloc::ProcStopReason; @@ -1302,12 +1313,16 @@ mod tests { let mut mesh = ProcMesh::allocate(alloc).await.unwrap(); let mut events = mesh.events().unwrap(); - let ping_pong_actor_params = PingPongActorParams::new( - Some(PortRef::attest_message_port(mesh.client().self_id())), - None, - ); let actor_mesh: RootActorMesh = mesh - .spawn::(&instance, "ping-pong", &ping_pong_actor_params) + .spawn::( + &instance, + "ping-pong", + &( + Some(PortRef::attest_message_port(mesh.client().self_id())), + None, + None, + ), + ) .await .unwrap(); @@ -1419,7 +1434,6 @@ mod tests { #[tokio::test] async fn test_stop_actor_mesh() { use hyperactor::test_utils::pingpong::PingPongActor; - use hyperactor::test_utils::pingpong::PingPongActorParams; use hyperactor::test_utils::pingpong::PingPongMessage; let config = hyperactor::config::global::lock(); @@ -1441,17 +1455,29 @@ mod tests { let instance = crate::v1::testing::instance().await; let mesh = ProcMesh::allocate(alloc).await.unwrap(); - let ping_pong_actor_params = PingPongActorParams::new( - Some(PortRef::attest_message_port(mesh.client().self_id())), - None, - ); let mesh_one: RootActorMesh = mesh - .spawn::(&instance, "mesh_one", &ping_pong_actor_params) + .spawn::( + &instance, + "mesh_one", + &( + Some(PortRef::attest_message_port(mesh.client().self_id())), + None, + None, + ), + ) .await .unwrap(); let mesh_two: RootActorMesh = mesh - .spawn::(&instance, "mesh_two", &ping_pong_actor_params) + .spawn::( + &instance, + "mesh_two", + &( + Some(PortRef::attest_message_port(mesh.client().self_id())), + None, + None, + ), + ) .await .unwrap(); @@ -1688,6 +1714,7 @@ mod tests { use hyperactor::Actor; use hyperactor::Context; use hyperactor::Handler; + use hyperactor::RemoteSpawn; use hyperactor::channel::ChannelAddr; use hyperactor::channel::ChannelTransport; use hyperactor::channel::ChannelTx; @@ -1716,7 +1743,10 @@ mod tests { struct EchoActor(ChannelTx); #[async_trait] - impl Actor for EchoActor { + impl Actor for EchoActor {} + + #[async_trait] + impl RemoteSpawn for EchoActor { type Params = ChannelAddr; async fn new(params: ChannelAddr) -> Result { diff --git a/hyperactor_mesh/src/alloc.rs b/hyperactor_mesh/src/alloc.rs index 439c7b276..c0b10d1b5 100644 --- a/hyperactor_mesh/src/alloc.rs +++ b/hyperactor_mesh/src/alloc.rs @@ -671,6 +671,7 @@ pub mod test_utils { use hyperactor::Context; use hyperactor::Handler; use hyperactor::Named; + use hyperactor::RemoteSpawn; use libc::atexit; use tokio::sync::broadcast::Receiver; use tokio::sync::broadcast::Sender; @@ -688,7 +689,7 @@ pub mod test_utils { // be an entry in the spawnable actor registry in the executable // 'hyperactor_mesh_test_bootstrap' for the `tests::process` actor // mesh test suite. - #[derive(Debug, Default, Actor)] + #[derive(Debug, Default)] #[hyperactor::export( spawn = true, handlers = [ @@ -697,6 +698,17 @@ pub mod test_utils { )] pub struct TestActor; + impl Actor for TestActor {} + + #[async_trait] + impl RemoteSpawn for TestActor { + type Params = (); + + async fn new(_: ()) -> anyhow::Result { + Ok(Self) + } + } + #[derive(Debug, Serialize, Deserialize, Named, Clone)] pub struct Wait; diff --git a/hyperactor_mesh/src/alloc/remoteprocess.rs b/hyperactor_mesh/src/alloc/remoteprocess.rs index a83cf6be4..679d269ff 100644 --- a/hyperactor_mesh/src/alloc/remoteprocess.rs +++ b/hyperactor_mesh/src/alloc/remoteprocess.rs @@ -2593,10 +2593,10 @@ mod test_alloc { task2_allocator_handle.await.unwrap(); } - #[tracing_test::traced_test] #[async_timed_test(timeout_secs = 60)] #[cfg(fbcode_build)] async fn test_remote_process_alloc_signal_handler() { + hyperactor_telemetry::initialize_logging_for_test(); let num_proc_meshes = 5; let hosts_per_proc_mesh = 5; diff --git a/hyperactor_mesh/src/bootstrap.rs b/hyperactor_mesh/src/bootstrap.rs index df8f45383..35969f177 100644 --- a/hyperactor_mesh/src/bootstrap.rs +++ b/hyperactor_mesh/src/bootstrap.rs @@ -489,10 +489,12 @@ impl Bootstrap { let (host, _handle) = ok!(Host::serve(manager, addr).await); let addr = host.addr().clone(); - let host_mesh_agent = ok!(host - .system_proc() - .clone() - .spawn::("agent", HostAgentMode::Process(host)) + let system_proc = host.system_proc().clone(); + let host_mesh_agent = ok!(system_proc + .spawn::( + "agent", + HostMeshAgent::new(HostAgentMode::Process(host)), + ) .await); tracing::info!( @@ -2346,6 +2348,7 @@ mod tests { use hyperactor::ActorId; use hyperactor::ActorRef; use hyperactor::ProcId; + use hyperactor::RemoteSpawn; use hyperactor::WorldId; use hyperactor::channel::ChannelAddr; use hyperactor::channel::ChannelTransport; @@ -2608,14 +2611,19 @@ mod tests { // Spawn the log client and disable aggregation (immediate // print + tap push). - let log_client: ActorRef = - proc.spawn("log_client", ()).await.unwrap().bind(); + let log_client_actor = LogClientActor::new(()).await.unwrap(); + let log_client: ActorRef = proc + .spawn("log_client", log_client_actor) + .await + .unwrap() + .bind(); log_client.set_aggregate(&client, None).await.unwrap(); // Spawn the forwarder in this proc (it will serve // BOOTSTRAP_LOG_CHANNEL). + let log_forwarder_actor = LogForwardActor::new(log_client.clone()).await.unwrap(); let _log_forwarder: ActorRef = proc - .spawn("log_forwarder", log_client.clone()) + .spawn("log_forwarder", log_forwarder_actor) .await .unwrap() .bind(); diff --git a/hyperactor_mesh/src/comm.rs b/hyperactor_mesh/src/comm.rs index bbbc63f4d..b1660b559 100644 --- a/hyperactor_mesh/src/comm.rs +++ b/hyperactor_mesh/src/comm.rs @@ -75,7 +75,7 @@ struct ReceiveState { /// This is the comm actor used for efficient and scalable message multicasting /// and result accumulation. -#[derive(Debug)] +#[derive(Debug, Default)] #[hyperactor::export( spawn = true, handlers = [ @@ -162,16 +162,6 @@ impl CommActorMode { #[async_trait] impl Actor for CommActor { - type Params = CommActorParams; - - async fn new(_params: Self::Params) -> Result { - Ok(Self { - send_seq: HashMap::new(), - recv_state: HashMap::new(), - mode: Default::default(), - }) - } - // This is an override of the default actor behavior. async fn handle_undeliverable_message( &mut self, @@ -242,6 +232,19 @@ impl Actor for CommActor { } } +#[async_trait] +impl hyperactor::RemoteSpawn for CommActor { + type Params = CommActorParams; + + async fn new(_params: Self::Params) -> Result { + Ok(Self { + send_seq: HashMap::new(), + recv_state: HashMap::new(), + mode: Default::default(), + }) + } +} + impl CommActor { /// Forward the message to the comm actor on the given peer rank. fn forward( @@ -522,7 +525,10 @@ pub mod test_utils { } #[async_trait] - impl Actor for TestActor { + impl Actor for TestActor {} + + #[async_trait] + impl hyperactor::RemoteSpawn for TestActor { type Params = TestActorParams; async fn new(params: Self::Params) -> Result { diff --git a/hyperactor_mesh/src/connect.rs b/hyperactor_mesh/src/connect.rs index 455502229..62f3ec987 100644 --- a/hyperactor_mesh/src/connect.rs +++ b/hyperactor_mesh/src/connect.rs @@ -397,15 +397,18 @@ mod tests { use hyperactor::Actor; use hyperactor::Context; use hyperactor::Handler; + use hyperactor::RemoteSpawn; use hyperactor::proc::Proc; use tokio::io::AsyncReadExt; use tokio::io::AsyncWriteExt; use super::*; - #[derive(Debug, Default, Actor)] + #[derive(Debug, Default)] struct EchoActor {} + impl Actor for EchoActor {} + #[async_trait] impl Handler for EchoActor { async fn handle( @@ -427,7 +430,7 @@ mod tests { let proc = Proc::local(); let (client, _client_handle) = proc.instance("client")?; let (connect, completer) = Connect::allocate(client.self_id().clone(), client); - let actor = proc.spawn::("actor", ()).await?; + let actor = proc.spawn("actor", EchoActor {}).await?; actor.send(connect)?; let (mut rd, mut wr) = completer.complete().await?.into_split(); let send = [3u8, 4u8, 5u8, 6u8]; diff --git a/hyperactor_mesh/src/logging.rs b/hyperactor_mesh/src/logging.rs index 2e1b817c5..fa8e3a2b4 100644 --- a/hyperactor_mesh/src/logging.rs +++ b/hyperactor_mesh/src/logging.rs @@ -998,6 +998,21 @@ pub struct LogForwardActor { #[async_trait] impl Actor for LogForwardActor { + async fn init(&mut self, this: &Instance) -> Result<(), anyhow::Error> { + this.self_message_with_delay(LogForwardMessage::Forward {}, Duration::from_secs(0))?; + + // Make sure we start the flush loop periodically so the log channel will not deadlock. + self.flush_tx + .lock() + .await + .send(LogMessage::Flush { sync_version: None }) + .await?; + Ok(()) + } +} + +#[async_trait] +impl hyperactor::RemoteSpawn for LogForwardActor { type Params = ActorRef; async fn new(logging_client_ref: Self::Params) -> Result { @@ -1045,18 +1060,6 @@ impl Actor for LogForwardActor { stream_to_client: true, }) } - - async fn init(&mut self, this: &Instance) -> Result<(), anyhow::Error> { - this.self_message_with_delay(LogForwardMessage::Forward {}, Duration::from_secs(0))?; - - // Make sure we start the flush loop periodically so the log channel will not deadlock. - self.flush_tx - .lock() - .await - .send(LogMessage::Flush { sync_version: None }) - .await?; - Ok(()) - } } #[async_trait] @@ -1182,6 +1185,25 @@ pub struct LogClientActor { current_unflushed_procs: usize, } +impl Default for LogClientActor { + fn default() -> Self { + // Initialize aggregators + let mut aggregators = HashMap::new(); + aggregators.insert(OutputTarget::Stderr, Aggregator::new()); + aggregators.insert(OutputTarget::Stdout, Aggregator::new()); + + Self { + aggregate_window_sec: Some(DEFAULT_AGGREGATE_WINDOW_SEC), + aggregators, + last_flush_time: RealClock.system_time_now(), + next_flush_deadline: None, + current_flush_version: 0, + current_flush_port: None, + current_unflushed_procs: 0, + } + } +} + impl LogClientActor { fn print_aggregators(&mut self) { for (output_target, aggregator) in self.aggregators.iter_mut() { @@ -1222,25 +1244,15 @@ impl LogClientActor { } #[async_trait] -impl Actor for LogClientActor { +impl Actor for LogClientActor {} + +#[async_trait] +impl hyperactor::RemoteSpawn for LogClientActor { /// The aggregation window in seconds. type Params = (); async fn new(_: ()) -> Result { - // Initialize aggregators - let mut aggregators = HashMap::new(); - aggregators.insert(OutputTarget::Stderr, Aggregator::new()); - aggregators.insert(OutputTarget::Stdout, Aggregator::new()); - - Ok(Self { - aggregate_window_sec: Some(DEFAULT_AGGREGATE_WINDOW_SEC), - aggregators, - last_flush_time: RealClock.system_time_now(), - next_flush_deadline: None, - current_flush_version: 0, - current_flush_port: None, - current_unflushed_procs: 0, - }) + Ok(LogClientActor::default()) } } @@ -1458,6 +1470,7 @@ mod tests { use std::sync::Arc; use std::sync::Mutex; + use hyperactor::RemoteSpawn; use hyperactor::channel; use hyperactor::channel::ChannelAddr; use hyperactor::channel::ChannelTx; @@ -1599,10 +1612,15 @@ mod tests { unsafe { std::env::set_var(BOOTSTRAP_LOG_CHANNEL, log_channel.to_string()); } - let log_client: ActorRef = - proc.spawn("log_client", ()).await.unwrap().bind(); + let log_client_actor = LogClientActor::new(()).await.unwrap(); + let log_client: ActorRef = proc + .spawn("log_client", log_client_actor) + .await + .unwrap() + .bind(); + let log_forwarder_actor = LogForwardActor::new(log_client.clone()).await.unwrap(); let log_forwarder: ActorRef = proc - .spawn("log_forwarder", log_client) + .spawn("log_forwarder", log_forwarder_actor) .await .unwrap() .bind(); diff --git a/hyperactor_mesh/src/proc_mesh.rs b/hyperactor_mesh/src/proc_mesh.rs index 634d97dd5..b6cc4d2df 100644 --- a/hyperactor_mesh/src/proc_mesh.rs +++ b/hyperactor_mesh/src/proc_mesh.rs @@ -23,9 +23,9 @@ use hyperactor::ActorId; use hyperactor::ActorRef; use hyperactor::Instance; use hyperactor::RemoteMessage; +use hyperactor::RemoteSpawn; use hyperactor::WorldId; use hyperactor::actor::ActorStatus; -use hyperactor::actor::Referable; use hyperactor::actor::remote::Remote; use hyperactor::channel; use hyperactor::channel::ChannelAddr; @@ -501,7 +501,7 @@ impl ProcMesh { /// Referable`. /// - `A::Params: RemoteMessage` - params must serialize for /// cross-proc spawn. - async fn spawn_on_procs( + async fn spawn_on_procs( cx: &impl context::Actor, agents: impl IntoIterator> + '_, actor_name: &str, @@ -608,7 +608,7 @@ impl ProcMesh { /// Referable`. /// - `A::Params: RemoteMessage` — params must be serializable to /// cross proc boundaries when launching each actor. - pub async fn spawn( + pub async fn spawn( &self, cx: &impl context::Actor, actor_name: &str, @@ -958,7 +958,7 @@ impl ProcEvents { pub trait SharedSpawnable { // `Actor`: the type actually runs in the mesh; // `Referable`: so we can hand back ActorRef in RootActorMesh - async fn spawn( + async fn spawn( self, cx: &impl context::Actor, actor_name: &str, @@ -972,7 +972,7 @@ pub trait SharedSpawnable { impl + Send + Sync + 'static> SharedSpawnable for D { // `Actor`: the type actually runs in the mesh; // `Referable`: so we can hand back ActorRef in RootActorMesh - async fn spawn( + async fn spawn( self, cx: &impl context::Actor, actor_name: &str, diff --git a/hyperactor_mesh/src/proc_mesh/mesh_agent.rs b/hyperactor_mesh/src/proc_mesh/mesh_agent.rs index 8ef942c78..07a527ee8 100644 --- a/hyperactor_mesh/src/proc_mesh/mesh_agent.rs +++ b/hyperactor_mesh/src/proc_mesh/mesh_agent.rs @@ -287,12 +287,6 @@ impl ProcMeshAgent { #[async_trait] impl Actor for ProcMeshAgent { - type Params = Self; - - async fn new(params: Self::Params) -> Result { - Ok(params) - } - async fn init(&mut self, this: &Instance) -> Result<(), anyhow::Error> { self.proc.set_supervision_coordinator(this.port())?; Ok(()) diff --git a/hyperactor_mesh/src/reference.rs b/hyperactor_mesh/src/reference.rs index a1b3b8d10..8f8c67bb0 100644 --- a/hyperactor_mesh/src/reference.rs +++ b/hyperactor_mesh/src/reference.rs @@ -289,7 +289,10 @@ mod tests { } #[async_trait] - impl Actor for MeshPingPongActor { + impl Actor for MeshPingPongActor {} + + #[async_trait] + impl hyperactor::RemoteSpawn for MeshPingPongActor { type Params = MeshPingPongActorParams; async fn new(params: Self::Params) -> Result { diff --git a/hyperactor_mesh/src/resource/mesh.rs b/hyperactor_mesh/src/resource/mesh.rs index 3f463ce54..2084d14d4 100644 --- a/hyperactor_mesh/src/resource/mesh.rs +++ b/hyperactor_mesh/src/resource/mesh.rs @@ -101,9 +101,11 @@ mod test { type State = (); } - #[derive(Actor, Debug, Default, Named, Serialize, Deserialize)] + #[derive(Debug, Default, Named, Serialize, Deserialize)] struct TestMeshController; + impl Actor for TestMeshController {} + // Ensure that TestMeshController conforms to the Controller behavior for TestMesh. handler! { TestMeshController, diff --git a/hyperactor_mesh/src/test_utils.rs b/hyperactor_mesh/src/test_utils.rs index f21be6a1f..930ded4b8 100644 --- a/hyperactor_mesh/src/test_utils.rs +++ b/hyperactor_mesh/src/test_utils.rs @@ -12,6 +12,7 @@ use hyperactor::Bind; use hyperactor::Context; use hyperactor::Handler; use hyperactor::Named; +use hyperactor::RemoteSpawn; use hyperactor::Unbind; use serde::Deserialize; use serde::Serialize; @@ -20,18 +21,29 @@ use serde::Serialize; #[derive(Serialize, Deserialize, Debug, Named, Clone, Bind, Unbind)] pub struct EmptyMessage(); -#[derive(Debug, PartialEq, Default, Actor)] +#[derive(Debug, PartialEq, Default)] #[hyperactor::export( + spawn = true, handlers = [ EmptyMessage { cast = true }, ], )] pub struct EmptyActor(); +#[async_trait] +impl RemoteSpawn for EmptyActor { + type Params = (); + + async fn new(_: ()) -> anyhow::Result { + Ok(EmptyActor::default()) + } +} + +impl Actor for EmptyActor {} + #[async_trait] impl Handler for EmptyActor { async fn handle(&mut self, _: &Context, _: EmptyMessage) -> Result<(), anyhow::Error> { Ok(()) } } -hyperactor::remote!(EmptyActor); diff --git a/hyperactor_mesh/src/v1/host_mesh.rs b/hyperactor_mesh/src/v1/host_mesh.rs index c56570b26..7872049d0 100644 --- a/hyperactor_mesh/src/v1/host_mesh.rs +++ b/hyperactor_mesh/src/v1/host_mesh.rs @@ -259,10 +259,9 @@ impl HostMesh { let manager = BootstrapProcManager::new(bootstrap_cmd)?; let (host, _handle) = Host::serve(manager, addr).await?; let addr = host.addr().clone(); - let host_mesh_agent = host - .system_proc() - .clone() - .spawn::("agent", HostAgentMode::Process(host)) + let system_proc = host.system_proc().clone(); + let host_mesh_agent = system_proc + .spawn::("agent", HostMeshAgent::new(HostAgentMode::Process(host))) .await .map_err(v1::Error::SingletonActorSpawnError)?; host_mesh_agent.bind::(); diff --git a/hyperactor_mesh/src/v1/host_mesh/mesh_agent.rs b/hyperactor_mesh/src/v1/host_mesh/mesh_agent.rs index 9d89d64e0..0e5ac388d 100644 --- a/hyperactor_mesh/src/v1/host_mesh/mesh_agent.rs +++ b/hyperactor_mesh/src/v1/host_mesh/mesh_agent.rs @@ -106,11 +106,22 @@ struct ProcCreationState { ShutdownHost ] )] +#[derive(Default)] pub struct HostMeshAgent { host: Option, created: HashMap, } +impl HostMeshAgent { + /// Create a new host mesh agent running in the provided mode. + pub fn new(mode: HostAgentMode) -> Self { + Self { + host: Some(mode), + created: HashMap::new(), + } + } +} + impl fmt::Debug for HostMeshAgent { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("HostMeshAgent") @@ -121,27 +132,7 @@ impl fmt::Debug for HostMeshAgent { } #[async_trait] -impl Actor for HostMeshAgent { - type Params = HostAgentMode; - - async fn new(host: HostAgentMode) -> anyhow::Result { - if let HostAgentMode::Process(_) = host { - let (directory, file) = hyperactor_telemetry::log_file_path( - hyperactor_telemetry::env::Env::current(), - None, - ) - .unwrap(); - eprintln!( - "Monarch internal logs are being written to {}/{}.log", - directory, file - ); - } - Ok(Self { - host: Some(host), - created: HashMap::new(), - }) - } -} +impl Actor for HostMeshAgent {} #[async_trait] impl Handler> for HostMeshAgent { @@ -438,6 +429,14 @@ pub(crate) struct HostMeshAgentProcMeshTrampoline { #[async_trait] impl Actor for HostMeshAgentProcMeshTrampoline { + async fn init(&mut self, this: &Instance) -> anyhow::Result<()> { + self.reply_port.send(this, self.host_mesh_agent.bind())?; + Ok(()) + } +} + +#[async_trait] +impl hyperactor::RemoteSpawn for HostMeshAgentProcMeshTrampoline { type Params = ( ChannelTransport, PortRef>, @@ -462,22 +461,31 @@ impl Actor for HostMeshAgentProcMeshTrampoline { HostAgentMode::Process(host) }; - let host_mesh_agent = host - .system_proc() - .clone() - .spawn::("agent", host) - .await?; + if let HostAgentMode::Process(_) = &host { + let (directory, file) = hyperactor_telemetry::log_file_path( + hyperactor_telemetry::env::Env::current(), + None, + ) + .unwrap(); + eprintln!( + "Monarch internal logs are being written to {}/{}.log", + directory, file + ); + } + + let system_proc = host.system_proc().clone(); + let actor = HostMeshAgent { + host: Some(host), + created: HashMap::new(), + }; + + let host_mesh_agent = system_proc.spawn::("agent", actor).await?; Ok(Self { host_mesh_agent, reply_port, }) } - - async fn init(&mut self, this: &Instance) -> anyhow::Result<()> { - self.reply_port.send(this, self.host_mesh_agent.bind())?; - Ok(()) - } } #[derive(Serialize, Deserialize, Debug, Named, Handler, RefClient)] @@ -525,7 +533,13 @@ mod tests { let host_addr = host.addr().clone(); let system_proc = host.system_proc().clone(); let host_agent = system_proc - .spawn::("agent", HostAgentMode::Process(host)) + .spawn::( + "agent", + HostMeshAgent { + host: Some(HostAgentMode::Process(host)), + created: HashMap::new(), + }, + ) .await .unwrap(); diff --git a/hyperactor_mesh/src/v1/proc_mesh.rs b/hyperactor_mesh/src/v1/proc_mesh.rs index 6665a33d1..d200decc7 100644 --- a/hyperactor_mesh/src/v1/proc_mesh.rs +++ b/hyperactor_mesh/src/v1/proc_mesh.rs @@ -18,12 +18,12 @@ use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering; use std::time::Duration; -use hyperactor::Actor; use hyperactor::ActorId; use hyperactor::ActorRef; use hyperactor::Named; use hyperactor::ProcId; use hyperactor::RemoteMessage; +use hyperactor::RemoteSpawn; use hyperactor::accum::ReducerOpts; use hyperactor::actor::ActorStatus; use hyperactor::actor::Referable; @@ -822,7 +822,7 @@ impl ProcMeshRef { /// inside the `ActorMesh`. /// - `A::Params: RemoteMessage` - spawn parameters must be /// serializable and routable. - pub async fn spawn( + pub async fn spawn( &self, cx: &impl context::Actor, name: &str, @@ -841,7 +841,7 @@ impl ProcMeshRef { /// /// Note: avoid using service actors if possible; the mechanism will /// be replaced by an actor registry. - pub async fn spawn_service( + pub async fn spawn_service( &self, cx: &impl context::Actor, name: &str, @@ -867,7 +867,7 @@ impl ProcMeshRef { /// inside the `ActorMesh`. /// - `A::Params: RemoteMessage` - spawn parameters must be /// serializable and routable. - pub(crate) async fn spawn_with_name( + pub(crate) async fn spawn_with_name( &self, cx: &impl context::Actor, name: Name, diff --git a/hyperactor_mesh/src/v1/testactor.rs b/hyperactor_mesh/src/v1/testactor.rs index df56443d4..5f6baf095 100644 --- a/hyperactor_mesh/src/v1/testactor.rs +++ b/hyperactor_mesh/src/v1/testactor.rs @@ -28,6 +28,7 @@ use hyperactor::Instance; use hyperactor::Named; use hyperactor::PortRef; use hyperactor::RefClient; +use hyperactor::RemoteSpawn; use hyperactor::Unbind; #[cfg(test)] use hyperactor::clock::Clock as _; @@ -53,7 +54,7 @@ use crate::v1::ActorMeshRef; use crate::v1::testing; /// A simple test actor used by various unit tests. -#[derive(Actor, Default, Debug)] +#[derive(Default, Debug)] #[hyperactor::export( spawn = true, handlers = [ @@ -67,6 +68,17 @@ use crate::v1::testing; )] pub struct TestActor; +impl Actor for TestActor {} + +#[async_trait] +impl RemoteSpawn for TestActor { + type Params = (); + + async fn new(_params: Self::Params) -> Result { + Ok(Self) + } +} + /// A message that returns the recipient actor's id. #[derive(Debug, Clone, Named, Bind, Unbind, Serialize, Deserialize)] pub struct GetActorId(#[binding(include)] pub PortRef); @@ -131,12 +143,6 @@ pub struct TestActorWithSupervisionHandling; #[async_trait] impl Actor for TestActorWithSupervisionHandling { - type Params = (); - - async fn new(_params: Self::Params) -> Result { - Ok(Self {}) - } - async fn handle_supervision_event( &mut self, _this: &Instance, @@ -148,6 +154,15 @@ impl Actor for TestActorWithSupervisionHandling { } } +#[async_trait] +impl RemoteSpawn for TestActorWithSupervisionHandling { + type Params = (); + + async fn new(_params: Self::Params) -> Result { + Ok(Self) + } +} + #[async_trait] impl Handler for TestActorWithSupervisionHandling { async fn handle( @@ -224,7 +239,10 @@ impl Handler for TestActor { pub struct FailingCreateTestActor; #[async_trait] -impl Actor for FailingCreateTestActor { +impl Actor for FailingCreateTestActor {} + +#[async_trait] +impl hyperactor::RemoteSpawn for FailingCreateTestActor { type Params = (); async fn new(_params: Self::Params) -> Result { diff --git a/hyperactor_multiprocess/src/ping_pong.rs b/hyperactor_multiprocess/src/ping_pong.rs index 694a1397b..7d8f57235 100644 --- a/hyperactor_multiprocess/src/ping_pong.rs +++ b/hyperactor_multiprocess/src/ping_pong.rs @@ -21,7 +21,6 @@ mod tests { use hyperactor::reference::WorldId; use hyperactor::simnet; use hyperactor::test_utils::pingpong::PingPongActor; - use hyperactor::test_utils::pingpong::PingPongActorParams; use hyperactor::test_utils::pingpong::PingPongMessage; use crate::System; @@ -104,12 +103,11 @@ mod tests { .await .unwrap(); - let params = PingPongActorParams::new(None, None); spawn::( cx, &bootstrap.proc_actor.bind(), actor_index.to_string().as_str(), - ¶ms, + &(None, None, None), ) .await .unwrap() diff --git a/hyperactor_multiprocess/src/proc_actor.rs b/hyperactor_multiprocess/src/proc_actor.rs index 26b6b607a..082c6f634 100644 --- a/hyperactor_multiprocess/src/proc_actor.rs +++ b/hyperactor_multiprocess/src/proc_actor.rs @@ -27,12 +27,11 @@ use hyperactor::Named; use hyperactor::OncePortRef; use hyperactor::PortRef; use hyperactor::RefClient; -use hyperactor::RemoteMessage; +use hyperactor::RemoteSpawn; use hyperactor::WorldId; use hyperactor::actor::ActorErrorKind; use hyperactor::actor::ActorHandle; use hyperactor::actor::ActorStatus; -use hyperactor::actor::Referable; use hyperactor::actor::remote::Remote; use hyperactor::channel; use hyperactor::channel::ChannelAddr; @@ -425,9 +424,9 @@ impl ProcActor { let handle = match proc .clone() - .spawn::( + .spawn( "proc", - ProcActorParams { + ProcActor::new(ProcActorParams { proc: proc.clone(), world_id: world_id.clone(), system_actor_ref: SYSTEM_ACTOR_REF.clone(), @@ -438,7 +437,7 @@ impl ProcActor { supervision_update_interval, labels, lifecycle_mode, - }, + }), ) .await { @@ -449,11 +448,7 @@ impl ProcActor { } }; - let comm_actor = match proc - .clone() - .spawn::("comm", Default::default()) - .await - { + let comm_actor = match proc.clone().spawn("comm", CommActor::default()).await { Ok(handle) => handle, Err(e) => { Self::failed_proc_bootstrap_cleanup(mailbox_handle).await; @@ -502,18 +497,6 @@ impl ProcActor { #[async_trait] impl Actor for ProcActor { - type Params = ProcActorParams; - - async fn new(params: ProcActorParams) -> Result { - let last_successful_supervision_update = params.proc.clock().system_time_now(); - Ok(Self { - params, - state: ProcState::AwaitingJoin, - remote: Remote::collect(), - last_successful_supervision_update, - }) - } - async fn init(&mut self, this: &Instance) -> anyhow::Result<()> { // Bind ports early so that when the proc actor joins, it can serve. this.bind::(); @@ -550,6 +533,16 @@ impl Actor for ProcActor { } impl ProcActor { + fn new(params: ProcActorParams) -> Self { + let last_successful_supervision_update = params.proc.clock().system_time_now(); + Self { + params, + state: ProcState::AwaitingJoin, + remote: Remote::collect(), + last_successful_supervision_update, + } + } + /// This proc's rank in the world. fn rank(&self) -> Index { self.params @@ -837,15 +830,12 @@ impl Handler for ProcActor { /// Convenience utility to spawn an actor on a proc. Spawn returns /// with the new ActorRef on success. -pub async fn spawn( +pub async fn spawn( cx: &impl context::Actor, proc_actor: &ActorRef, actor_name: &str, params: &A::Params, -) -> Result, anyhow::Error> -where - A::Params: RemoteMessage, -{ +) -> Result, anyhow::Error> { let remote = Remote::collect(); let (spawned_port, mut spawned_receiver) = open_port(cx); let ActorId(proc_id, _, _) = (*proc_actor).clone().into(); @@ -880,6 +870,7 @@ mod tests { use std::collections::HashSet; use std::time::Duration; + use hyperactor::RemoteSpawn; use hyperactor::actor::ActorStatus; use hyperactor::channel; use hyperactor::channel::ChannelAddr; @@ -891,7 +882,6 @@ mod tests { use hyperactor::id; use hyperactor::reference::ActorRef; use hyperactor::test_utils::pingpong::PingPongActor; - use hyperactor::test_utils::pingpong::PingPongActorParams; use hyperactor::test_utils::pingpong::PingPongMessage; use maplit::hashset; use rand::Rng; @@ -970,7 +960,7 @@ mod tests { server_handle.await; } - #[derive(Debug, Default, Actor)] + #[derive(Debug, Default)] #[hyperactor::export( spawn = true, handlers = [ @@ -979,6 +969,17 @@ mod tests { )] struct TestActor; + impl Actor for TestActor {} + + #[async_trait] + impl RemoteSpawn for TestActor { + type Params = (); + + async fn new(_: ()) -> anyhow::Result { + Ok(Self) + } + } + #[derive(Handler, HandleClient, RefClient, Serialize, Deserialize, Debug, Named)] enum TestActorMessage { Increment(u64, #[reply] OncePortRef), @@ -1031,7 +1032,7 @@ mod tests { } // Sleep - #[derive(Debug, Default, Actor)] + #[derive(Debug, Default)] #[hyperactor::export( spawn = true, handlers = [ @@ -1040,6 +1041,17 @@ mod tests { )] struct SleepActor {} + impl Actor for SleepActor {} + + #[async_trait] + impl RemoteSpawn for SleepActor { + type Params = (); + + async fn new(_: ()) -> anyhow::Result { + Ok(Default::default()) + } + } + #[async_trait] impl Handler for SleepActor { async fn handle(&mut self, _cx: &Context, message: u64) -> anyhow::Result<()> { @@ -1444,17 +1456,21 @@ mod tests { let proc_1_client = proc_1.attach("client").unwrap(); let (proc_1_undeliverable_tx, mut _proc_1_undeliverable_rx) = proc_1_client.open_port(); - let ping_params = PingPongActorParams::new(Some(proc_0_undeliverable_tx.bind()), None); // Spawn two actors 'ping' and 'pong' where 'ping' runs on // 'world[0]' and 'pong' on 'world[1]' (that is, not on the // same proc). let ping_handle = proc_0 - .spawn::("ping", ping_params) + .spawn( + "ping", + PingPongActor::new(Some(proc_0_undeliverable_tx.bind()), None, None), + ) .await .unwrap(); - let pong_params = PingPongActorParams::new(Some(proc_1_undeliverable_tx.bind()), None); let pong_handle = proc_1 - .spawn::("pong", pong_params) + .spawn( + "pong", + PingPongActor::new(Some(proc_1_undeliverable_tx.bind()), None, None), + ) .await .unwrap(); @@ -1571,14 +1587,18 @@ mod tests { // Spawn two actors 'ping' and 'pong' where 'ping' runs on // 'world[0]' and 'pong' on 'world[1]' (that is, not on the // same proc). - let ping_params = PingPongActorParams::new(Some(proc_0_undeliverable_tx.bind()), None); let ping_handle = proc_0 - .spawn::("ping", ping_params) + .spawn( + "ping", + PingPongActor::new(Some(proc_0_undeliverable_tx.bind()), None, None), + ) .await .unwrap(); - let pong_params = PingPongActorParams::new(Some(proc_1_undeliverable_tx.bind()), None); let pong_handle = proc_1 - .spawn::("pong", pong_params) + .spawn( + "pong", + PingPongActor::new(Some(proc_1_undeliverable_tx.bind()), None, None), + ) .await .unwrap(); @@ -1733,12 +1753,11 @@ mod tests { .await .unwrap(); let (undeliverable_msg_tx, _) = cx.mailbox().open_port(); - let params = PingPongActorParams::new(Some(undeliverable_msg_tx.bind()), None); let actor_ref = spawn::( cx, &bootstrap.proc_actor.bind(), &actor_id.to_string(), - ¶ms, + &(Some(undeliverable_msg_tx.bind()), None, None), ) .await .unwrap(); diff --git a/hyperactor_multiprocess/src/system_actor.rs b/hyperactor_multiprocess/src/system_actor.rs index 2712a3950..c790042aa 100644 --- a/hyperactor_multiprocess/src/system_actor.rs +++ b/hyperactor_multiprocess/src/system_actor.rs @@ -35,6 +35,7 @@ use hyperactor::PortHandle; use hyperactor::PortRef; use hyperactor::ProcId; use hyperactor::RefClient; +use hyperactor::RemoteSpawn; use hyperactor::WorldId; use hyperactor::actor::Handler; use hyperactor::channel::ChannelAddr; @@ -1135,6 +1136,17 @@ pub static SYSTEM_ACTOR_REF: LazyLock> = LazyLock::new(|| ActorRef::attest(id!(system[0].root))); impl SystemActor { + fn new(params: SystemActorParams) -> Self { + let supervision_update_timeout = params.supervision_update_timeout.clone(); + Self { + params, + supervision_state: SystemSupervisionState::new(supervision_update_timeout), + worlds: HashMap::new(), + worlds_to_stop: HashMap::new(), + shutting_down: false, + } + } + /// Adds a new world that's awaiting creation to the worlds. fn add_new_world(&mut self, world_id: WorldId) -> Result<(), anyhow::Error> { let world_state = WorldState { @@ -1180,7 +1192,7 @@ impl SystemActor { clock, ); let actor_handle = system_proc - .spawn::(SYSTEM_ACTOR_ID.name(), params) + .spawn(SYSTEM_ACTOR_ID.name(), SystemActor::new(params)) .await?; Ok((actor_handle, system_proc)) @@ -1200,19 +1212,6 @@ impl SystemActor { #[async_trait] impl Actor for SystemActor { - type Params = SystemActorParams; - - async fn new(params: SystemActorParams) -> Result { - let supervision_update_timeout = params.supervision_update_timeout.clone(); - Ok(Self { - params, - supervision_state: SystemSupervisionState::new(supervision_update_timeout), - worlds: HashMap::new(), - worlds_to_stop: HashMap::new(), - shutting_down: false, - }) - } - async fn init(&mut self, cx: &Instance) -> Result<(), anyhow::Error> { // Start to periodically check the unhealthy worlds. cx.self_message_with_delay(MaintainWorldHealth {}, Duration::from_secs(0))?; @@ -1859,7 +1858,6 @@ mod tests { use hyperactor::mailbox::PortHandle; use hyperactor::mailbox::PortReceiver; use hyperactor::simnet; - use hyperactor::test_utils::pingpong::PingPongActorParams; use super::*; use crate::System; @@ -2291,14 +2289,18 @@ mod tests { // Spawn two actors 'ping' and 'pong' where 'ping' runs on // 'world[0]' and 'pong' on 'world[1]' (that is, not on the // same proc). - let ping_params = PingPongActorParams::new(Some(proc_0_undeliverable_tx.bind()), None); let ping_handle = proc_0 - .spawn::("ping", ping_params) + .spawn( + "ping", + PingPongActor::new(Some(proc_0_undeliverable_tx.bind()), None, None), + ) .await .unwrap(); - let pong_params = PingPongActorParams::new(Some(proc_1_undeliverable_tx.bind()), None); let pong_handle = proc_1 - .spawn::("pong", pong_params) + .spawn( + "pong", + PingPongActor::new(Some(proc_1_undeliverable_tx.bind()), None, None), + ) .await .unwrap(); diff --git a/monarch_hyperactor/src/actor.rs b/monarch_hyperactor/src/actor.rs index dfd25603a..20215a7a3 100644 --- a/monarch_hyperactor/src/actor.rs +++ b/monarch_hyperactor/src/actor.rs @@ -22,6 +22,7 @@ use hyperactor::Named; use hyperactor::OncePortHandle; use hyperactor::PortHandle; use hyperactor::ProcId; +use hyperactor::RemoteSpawn; use hyperactor::actor::ActorError; use hyperactor::actor::ActorErrorKind; use hyperactor::actor::ActorStatus; @@ -556,25 +557,6 @@ fn update_undeliverable_envelope_for_casting( #[async_trait] impl Actor for PythonActor { - type Params = PickledPyObject; - - async fn new(actor_type: PickledPyObject) -> Result { - Ok(Python::with_gil(|py| -> Result { - let unpickled = actor_type.unpickle(py)?; - let class_type: &Bound<'_, PyType> = unpickled.downcast()?; - let actor: PyObject = class_type.call0()?.into_py_any(py)?; - - // Only create per-actor TaskLocals if not using shared runtime - let task_locals = (!hyperactor::config::global::get(SHARED_ASYNCIO_RUNTIME)) - .then(|| Python::allow_threads(py, create_task_locals)); - Ok(Self { - actor, - task_locals, - instance: None, - }) - })?) - } - async fn cleanup( &mut self, this: &Instance, @@ -709,6 +691,28 @@ impl Actor for PythonActor { } } +#[async_trait] +impl RemoteSpawn for PythonActor { + type Params = PickledPyObject; + + async fn new(actor_type: PickledPyObject) -> Result { + Ok(Python::with_gil(|py| -> Result { + let unpickled = actor_type.unpickle(py)?; + let class_type: &Bound<'_, PyType> = unpickled.downcast()?; + let actor: PyObject = class_type.call0()?.into_py_any(py)?; + + // Only create per-actor TaskLocals if not using shared runtime + let task_locals = (!hyperactor::config::global::get(SHARED_ASYNCIO_RUNTIME)) + .then(|| Python::allow_threads(py, create_task_locals)); + Ok(Self { + actor, + task_locals, + instance: None, + }) + })?) + } +} + /// Create a new TaskLocals with its own asyncio event loop in a dedicated thread. fn create_task_locals() -> pyo3_async_runtimes::TaskLocals { Python::with_gil(|py| { diff --git a/monarch_hyperactor/src/code_sync/auto_reload.rs b/monarch_hyperactor/src/code_sync/auto_reload.rs index bc8e34536..dceb66781 100644 --- a/monarch_hyperactor/src/code_sync/auto_reload.rs +++ b/monarch_hyperactor/src/code_sync/auto_reload.rs @@ -15,6 +15,7 @@ use hyperactor::Context; use hyperactor::Handler; use hyperactor::Named; use hyperactor::PortRef; +use hyperactor::RemoteSpawn; use monarch_types::SerializablePyErr; use pyo3::prelude::*; use serde::Deserialize; @@ -37,11 +38,19 @@ pub struct AutoReloadActor { state: Result<(Arc, PyObject), SerializablePyErr>, } +impl Actor for AutoReloadActor {} + #[async_trait] -impl Actor for AutoReloadActor { +impl RemoteSpawn for AutoReloadActor { type Params = AutoReloadParams; async fn new(Self::Params {}: Self::Params) -> Result { + AutoReloadActor::new().await + } +} + +impl AutoReloadActor { + pub(crate) async fn new() -> Result { Ok(Self { state: tokio::task::spawn_blocking(move || { Python::with_gil(|py| { @@ -51,9 +60,7 @@ impl Actor for AutoReloadActor { .await?, }) } -} -impl AutoReloadActor { fn create_state(py: Python) -> PyResult<(Arc, PyObject)> { // Import the Python AutoReloader class let auto_reload_module = py.import("monarch._src.actor.code_sync.auto_reload")?; diff --git a/monarch_hyperactor/src/code_sync/conda_sync.rs b/monarch_hyperactor/src/code_sync/conda_sync.rs index 079e3163f..19de5f458 100644 --- a/monarch_hyperactor/src/code_sync/conda_sync.rs +++ b/monarch_hyperactor/src/code_sync/conda_sync.rs @@ -19,6 +19,7 @@ use hyperactor::Bind; use hyperactor::Handler; use hyperactor::Named; use hyperactor::PortRef; +use hyperactor::RemoteSpawn; use hyperactor::Unbind; use hyperactor_mesh::actor_mesh::ActorMesh; use hyperactor_mesh::connect::Connect; @@ -61,12 +62,14 @@ pub struct CondaSyncMessage { #[derive(Debug, Named, Serialize, Deserialize)] pub struct CondaSyncParams {} -#[derive(Debug)] +#[derive(Debug, Default)] #[hyperactor::export(spawn = true, handlers = [CondaSyncMessage { cast = true }])] pub struct CondaSyncActor {} +impl Actor for CondaSyncActor {} + #[async_trait] -impl Actor for CondaSyncActor { +impl RemoteSpawn for CondaSyncActor { type Params = CondaSyncParams; async fn new(CondaSyncParams {}: Self::Params) -> Result { diff --git a/monarch_hyperactor/src/code_sync/manager.rs b/monarch_hyperactor/src/code_sync/manager.rs index bcad94185..4f14a3d62 100644 --- a/monarch_hyperactor/src/code_sync/manager.rs +++ b/monarch_hyperactor/src/code_sync/manager.rs @@ -30,6 +30,7 @@ use hyperactor::Context; use hyperactor::Handler; use hyperactor::Named; use hyperactor::PortRef; +use hyperactor::RemoteSpawn; use hyperactor::Unbind; use hyperactor::context; use hyperactor::forward; @@ -62,15 +63,12 @@ use tokio::net::TcpStream; use crate::code_sync::WorkspaceLocation; use crate::code_sync::auto_reload::AutoReloadActor; use crate::code_sync::auto_reload::AutoReloadMessage; -use crate::code_sync::auto_reload::AutoReloadParams; use crate::code_sync::conda_sync::CondaSyncActor; use crate::code_sync::conda_sync::CondaSyncMessage; -use crate::code_sync::conda_sync::CondaSyncParams; use crate::code_sync::conda_sync::CondaSyncResult; use crate::code_sync::rsync::RsyncActor; use crate::code_sync::rsync::RsyncDaemon; use crate::code_sync::rsync::RsyncMessage; -use crate::code_sync::rsync::RsyncParams; use crate::code_sync::rsync::RsyncResult; #[derive(Clone, Serialize, Deserialize, Debug)] @@ -212,8 +210,10 @@ pub struct CodeSyncManager { rank: once_cell::sync::OnceCell, } +impl Actor for CodeSyncManager {} + #[async_trait] -impl Actor for CodeSyncManager { +impl RemoteSpawn for CodeSyncManager { type Params = CodeSyncManagerParams; async fn new(CodeSyncManagerParams {}: Self::Params) -> Result { @@ -233,7 +233,7 @@ impl CodeSyncManager { cx: &Context<'a, Self>, ) -> Result<&'a ActorHandle> { self.rsync - .get_or_try_init(RsyncActor::spawn(cx, RsyncParams {})) + .get_or_try_init(RsyncActor::default().spawn(cx)) .await } @@ -242,7 +242,7 @@ impl CodeSyncManager { cx: &Context<'a, Self>, ) -> Result<&'a ActorHandle> { self.auto_reload - .get_or_try_init(AutoReloadActor::spawn(cx, AutoReloadParams {})) + .get_or_try_init(async move { AutoReloadActor::new().await?.spawn(cx).await }) .await } @@ -251,7 +251,7 @@ impl CodeSyncManager { cx: &Context<'a, Self>, ) -> Result<&'a ActorHandle> { self.conda_sync - .get_or_try_init(CondaSyncActor::spawn(cx, CondaSyncParams {})) + .get_or_try_init(CondaSyncActor::default().spawn(cx)) .await } } diff --git a/monarch_hyperactor/src/code_sync/rsync.rs b/monarch_hyperactor/src/code_sync/rsync.rs index 93f8c27e4..b1c304236 100644 --- a/monarch_hyperactor/src/code_sync/rsync.rs +++ b/monarch_hyperactor/src/code_sync/rsync.rs @@ -28,6 +28,7 @@ use hyperactor::Bind; use hyperactor::Handler; use hyperactor::Named; use hyperactor::PortRef; +use hyperactor::RemoteSpawn; use hyperactor::Unbind; use hyperactor::clock::Clock; use hyperactor::clock::RealClock; @@ -341,14 +342,16 @@ pub struct RsyncParams { //pub workspace: WorkspaceLocation, } -#[derive(Debug)] +#[derive(Debug, Default)] #[hyperactor::export(spawn = true, handlers = [RsyncMessage { cast = true }])] pub struct RsyncActor { //workspace: WorkspaceLocation, } +impl Actor for RsyncActor {} + #[async_trait] -impl Actor for RsyncActor { +impl RemoteSpawn for RsyncActor { type Params = RsyncParams; async fn new(RsyncParams {}: Self::Params) -> Result { diff --git a/monarch_hyperactor/src/local_state_broker.rs b/monarch_hyperactor/src/local_state_broker.rs index 6225a0fc9..cdb0368ce 100644 --- a/monarch_hyperactor/src/local_state_broker.rs +++ b/monarch_hyperactor/src/local_state_broker.rs @@ -16,6 +16,7 @@ use hyperactor::ActorRef; use hyperactor::Context; use hyperactor::Handler; use hyperactor::OncePortHandle; +use hyperactor::RemoteSpawn; use pyo3::prelude::*; #[derive(Debug)] @@ -30,13 +31,24 @@ pub enum LocalStateBrokerMessage { Get(usize, OncePortHandle), } -#[derive(Debug, Default, Actor)] +#[derive(Debug, Default)] #[hyperactor::export(spawn = true)] pub struct LocalStateBrokerActor { states: HashMap, ports: HashMap>, } +impl Actor for LocalStateBrokerActor {} + +#[async_trait] +impl RemoteSpawn for LocalStateBrokerActor { + type Params = (); + + async fn new(_: ()) -> anyhow::Result { + Ok(Default::default()) + } +} + #[async_trait] impl Handler for LocalStateBrokerActor { async fn handle( diff --git a/monarch_hyperactor/src/logging.rs b/monarch_hyperactor/src/logging.rs index 8b99c09b8..5a9dd795c 100644 --- a/monarch_hyperactor/src/logging.rs +++ b/monarch_hyperactor/src/logging.rs @@ -19,6 +19,7 @@ use hyperactor::HandleClient; use hyperactor::Handler; use hyperactor::Named; use hyperactor::RefClient; +use hyperactor::RemoteSpawn; use hyperactor::Unbind; use monarch_types::SerializablePyErr; use pyo3::prelude::*; @@ -63,9 +64,10 @@ impl LoggerRuntimeActor { Ok(()) } } +impl Actor for LoggerRuntimeActor {} #[async_trait] -impl Actor for LoggerRuntimeActor { +impl RemoteSpawn for LoggerRuntimeActor { type Params = (); async fn new(_: ()) -> Result { diff --git a/monarch_hyperactor/src/proc.rs b/monarch_hyperactor/src/proc.rs index 86bac31b7..cc2cb2f32 100644 --- a/monarch_hyperactor/src/proc.rs +++ b/monarch_hyperactor/src/proc.rs @@ -26,6 +26,7 @@ use std::time::SystemTime; use anyhow::Result; use hyperactor::ActorRef; use hyperactor::RemoteMessage; +use hyperactor::RemoteSpawn; use hyperactor::actor::Signal; use hyperactor::channel; use hyperactor::channel::ChannelAddr; @@ -61,6 +62,7 @@ use pyo3::types::PyType; use tokio::sync::OnceCell; use tokio::sync::watch; +use crate::actor::PythonActor; use crate::actor::PythonActorHandle; use crate::mailbox::PyMailbox; use crate::runtime::get_tokio_runtime; @@ -146,7 +148,10 @@ impl PyProc { crate::runtime::future_into_py(py, async move { Ok(PythonActorHandle { inner: proc - .spawn(name.as_deref().unwrap_or("anon"), pickled_type) + .spawn( + name.as_deref().unwrap_or("anon"), + PythonActor::new(pickled_type).await?, + ) .await?, }) }) @@ -163,8 +168,11 @@ impl PyProc { let pickled_type = PickledPyObject::pickle(actor.as_any())?; Ok(PythonActorHandle { inner: signal_safe_block_on(py, async move { - proc.spawn(name.as_deref().unwrap_or("anon"), pickled_type) - .await + proc.spawn( + name.as_deref().unwrap_or("anon"), + PythonActor::new(pickled_type).await?, + ) + .await }) .map_err(|e| PyRuntimeError::new_err(e.to_string()))??, }) diff --git a/monarch_hyperactor/src/proc_mesh.rs b/monarch_hyperactor/src/proc_mesh.rs index 938495f62..97da5ec11 100644 --- a/monarch_hyperactor/src/proc_mesh.rs +++ b/monarch_hyperactor/src/proc_mesh.rs @@ -12,9 +12,8 @@ use std::sync::Arc; use std::sync::atomic::AtomicBool; use hyperactor::Actor; -use hyperactor::RemoteMessage; +use hyperactor::RemoteSpawn; use hyperactor::WorldId; -use hyperactor::actor::Referable; use hyperactor::context; use hyperactor::context::Mailbox as _; use hyperactor::proc::Instance; @@ -89,15 +88,12 @@ impl From for TrackedProcMesh { } impl TrackedProcMesh { - pub async fn spawn( + pub async fn spawn( &self, cx: &impl context::Actor, actor_name: &str, params: &A::Params, - ) -> Result>, anyhow::Error> - where - A::Params: RemoteMessage, - { + ) -> Result>, anyhow::Error> { let mesh = self.cell.borrow()?; let actor = mesh.spawn(cx, actor_name, params).await?; Ok(self.children.insert(actor)) diff --git a/monarch_hyperactor/src/v1/actor_mesh.rs b/monarch_hyperactor/src/v1/actor_mesh.rs index 84bbf37da..8c2857a12 100644 --- a/monarch_hyperactor/src/v1/actor_mesh.rs +++ b/monarch_hyperactor/src/v1/actor_mesh.rs @@ -18,7 +18,7 @@ use hyperactor::RemoteMessage; use hyperactor::actor::ActorErrorKind; use hyperactor::actor::ActorStatus; use hyperactor::actor::Referable; -use hyperactor::actor::RemotableActor; +use hyperactor::actor::RemoteSpawn; use hyperactor::clock::Clock; use hyperactor::clock::RealClock; use hyperactor::context; @@ -509,7 +509,7 @@ async fn actor_states_monitor( canceled: CancellationToken, supervision_display_name: String, ) where - A: Actor + RemotableActor + Referable, + A: Actor + RemoteSpawn + Referable, A::Params: RemoteMessage, F: Fn(MeshFailure), { diff --git a/monarch_hyperactor/src/v1/logging.rs b/monarch_hyperactor/src/v1/logging.rs index 747fe7c66..56661a602 100644 --- a/monarch_hyperactor/src/v1/logging.rs +++ b/monarch_hyperactor/src/v1/logging.rs @@ -204,7 +204,10 @@ impl LoggingMeshClient { instance_dispatch!(instance, async move |cx_instance| { cx_instance .proc() - .spawn(&Name::new("log_client").to_string(), ()) + .spawn( + &Name::new("log_client").to_string(), + LogClientActor::default(), + ) .await })?; let client_actor_ref = client_actor.bind(); From 00539b92df1ac42e5fef015d2ad54d296e1274ec Mon Sep 17 00:00:00 2001 From: Marius Eriksen Date: Thu, 20 Nov 2025 19:02:43 -0800 Subject: [PATCH 2/2] Update on "[hyperactor] remove `new` from actor" We shouldn't mandate how an actor is actually instantiated. Rather, 'spawn' should just take over ownership of an `Actor` object directly, and run its loop from there. This helps to separate concerns and simplify the implementation. In this change: 1) We remote `new` from `Actor`, and only require it in `RemoteSpawn` (renamed from `RemotableActor`), in order to enable remote spawning 2) Change the spawn APIs to take ownership over an actor object 3) Remote the `derive(Actor)` macro, this can now be implemented with a simple `impl Actor for Foo{}` marker trait when no additonal behavior needs to be customized. 4) Simplify a bunch of actor construction, esp. in tests, where we like to just use simple objects. Using this, in the next change, we will make `spawn` fully synchronous. This leaves the #[export] attribute macro somewhat schizophrenic: `spawn = true` now only registers the actor in the remote registry. We should think about how to simplify this, too. Differential Revision: [D87575629](https://our.internmc.facebook.com/intern/diff/D87575629/) **NOTE FOR REVIEWERS**: This PR has internal Meta-specific changes or comments, please review them on [Phabricator](https://our.internmc.facebook.com/intern/diff/D87575629/)! [ghstack-poisoned] --- hyperactor/src/actor.rs | 11 +++++ hyperactor/src/host.rs | 8 ---- hyperactor/src/proc.rs | 42 +++++++++---------- hyperactor_mesh/src/actor_mesh.rs | 9 ---- hyperactor_mesh/src/alloc.rs | 9 ---- hyperactor_mesh/src/comm.rs | 13 ------ hyperactor_mesh/src/logging.rs | 10 ----- hyperactor_mesh/src/test_utils.rs | 9 ---- .../src/v1/host_mesh/mesh_agent.rs | 5 +-- hyperactor_mesh/src/v1/testactor.rs | 20 +-------- .../src/code_sync/conda_sync.rs | 9 ---- monarch_hyperactor/src/code_sync/rsync.rs | 19 +-------- monarch_hyperactor/src/local_state_broker.rs | 9 ---- monarch_hyperactor/src/proc_mesh.rs | 1 - 14 files changed, 35 insertions(+), 139 deletions(-) diff --git a/hyperactor/src/actor.rs b/hyperactor/src/actor.rs index 4996ab538..0c03753f5 100644 --- a/hyperactor/src/actor.rs +++ b/hyperactor/src/actor.rs @@ -282,6 +282,17 @@ pub trait RemoteSpawn: Actor + Referable + Binds { } } +/// If an actor implements Default, we use this as the +/// `RemoteSpawn` implementation, too. +#[async_trait] +impl + Default> RemoteSpawn for A { + type Params = (); + + async fn new(_params: Self::Params) -> anyhow::Result { + Ok(Default::default()) + } +} + #[async_trait] impl Checkpointable for T where diff --git a/hyperactor/src/host.rs b/hyperactor/src/host.rs index e2ccf2c93..8e1c4766e 100644 --- a/hyperactor/src/host.rs +++ b/hyperactor/src/host.rs @@ -1192,14 +1192,6 @@ pub mod testing { impl Actor for EchoActor {} - #[async_trait] - impl RemoteSpawn for EchoActor { - type Params = (); - async fn new(_: ()) -> anyhow::Result { - Ok(Self) - } - } - #[async_trait] impl Handler> for EchoActor { async fn handle( diff --git a/hyperactor/src/proc.rs b/hyperactor/src/proc.rs index a9cf2f5c1..93e6cf6e5 100644 --- a/hyperactor/src/proc.rs +++ b/hyperactor/src/proc.rs @@ -2198,10 +2198,12 @@ mod tests { } } - #[derive(Debug, Default, Actor)] + #[derive(Debug, Default)] #[export] struct TestActor; + impl Actor for TestActor {} + #[derive(Handler, HandleClient, Debug)] enum TestActorMessage { Reply(oneshot::Sender<()>), @@ -2290,10 +2292,7 @@ mod tests { #[async_timed_test(timeout_secs = 30)] async fn test_spawn_actor() { let proc = Proc::local(); - let handle = proc - .spawn::("test", TestActor::default()) - .await - .unwrap(); + let handle = proc.spawn("test", TestActor::default()).await.unwrap(); // Check on the join handle. assert!(logs_contain( @@ -2335,7 +2334,7 @@ mod tests { .unwrap(); handle.drain_and_stop().unwrap(); - handle; + handle.await; assert_matches!(*state.borrow(), ActorStatus::Stopped); } @@ -2359,6 +2358,7 @@ mod tests { } #[derive(Debug, Default)] + #[export] struct LookupTestActor; impl Actor for LookupTestActor {} @@ -2389,7 +2389,7 @@ mod tests { .spawn::("target", TestActor::default()) .await .unwrap(); - let target_actor_ref = actor::ActorHandle::bind(); + let target_actor_ref = target_actor.bind(); let lookup_actor = proc .spawn::("lookup", LookupTestActor::default()) .await @@ -2421,7 +2421,7 @@ mod tests { ); target_actor.drain_and_stop().unwrap(); - target_actor; + target_actor.await; assert!( !lookup_actor @@ -2501,20 +2501,20 @@ mod tests { // Once each actor is stopped, it should have no linked children. let third_cell = third.cell().clone(); third.drain_and_stop().unwrap(); - third; + third.await; assert!(third_cell.inner.children.is_empty()); drop(third_cell); validate_link(second.cell(), first.cell()); let second_cell = second.cell().clone(); second.drain_and_stop().unwrap(); - second; + second.await; assert!(second_cell.inner.children.is_empty()); drop(second_cell); let first_cell = first.cell().clone(); first.drain_and_stop().unwrap(); - first; + first.await; assert!(first_cell.inner.children.is_empty()); } @@ -2531,11 +2531,11 @@ mod tests { let root_2_1 = TestActor::spawn_child(&root_2).await; root.drain_and_stop().unwrap(); - root; + root.await; for actor in [root_1, root_2, root_2_1] { assert!(actor.send(TestActorMessage::Noop()).is_err()); - assert_matches!(actor, ActorStatus::Stopped); + assert_matches!(actor.await, ActorStatus::Stopped); } } @@ -2561,7 +2561,7 @@ mod tests { .unwrap(); let _root_2_actor_id = root_2.actor_id().clone(); assert_matches!( - root_2, + root_2.await, ActorStatus::Failed(err) if err.to_string() == "some random failure" ); @@ -2569,11 +2569,11 @@ mod tests { // stopped by a parent failure? // Currently the parent fails with an error related to the child's failure. assert_matches!( - root, + root.await, ActorStatus::Failed(err) if err.to_string().contains("some random failure") ); - assert_eq!(root_2_1, ActorStatus::Stopped); - assert_eq!(root_1, ActorStatus::Stopped); + assert_eq!(root_2_1.await, ActorStatus::Stopped); + assert_eq!(root_1.await, ActorStatus::Stopped); } #[async_timed_test(timeout_secs = 30)] @@ -2627,7 +2627,7 @@ mod tests { // Stop the 2nd root. It should be excluded from the snapshot after it // is stopped. another_root.drain_and_stop().unwrap(); - another_root; + another_root.await; { let snapshot = proc.state().ledger.snapshot(); assert_eq!( @@ -2765,7 +2765,7 @@ mod tests { // Stop root_1. This should remove it, and its child, from snapshot. root_1.drain_and_stop().unwrap(); - root_1; + root_1.await; { let snapshot = proc.state().ledger.snapshot(); assert_eq!( @@ -2799,7 +2799,7 @@ mod tests { // Finally stop root. No roots should be left in snapshot. root.drain_and_stop().unwrap(); - root; + root.await; { let snapshot = proc.state().ledger.snapshot(); assert_eq!(snapshot.roots, hashmap! {}); @@ -2876,7 +2876,7 @@ mod tests { .panic(&client, "some random failure".to_string()) .await .unwrap(); - let actor_status = actor_handle; + let actor_status = actor_handle.await; // Note: even when the test passes, the panic stacktrace will still be // printed to stderr because that is the behavior controlled by the panic diff --git a/hyperactor_mesh/src/actor_mesh.rs b/hyperactor_mesh/src/actor_mesh.rs index 9e5b6bdee..eaef8e7ba 100644 --- a/hyperactor_mesh/src/actor_mesh.rs +++ b/hyperactor_mesh/src/actor_mesh.rs @@ -653,15 +653,6 @@ pub(crate) mod test_util { impl Actor for TestActor {} - #[async_trait] - impl RemoteSpawn for TestActor { - type Params = (); - - async fn new(_: ()) -> anyhow::Result { - Ok(Self) - } - } - /// Request message to retrieve the actor's rank. /// /// The `bool` in the tuple controls the outcome of the handler: diff --git a/hyperactor_mesh/src/alloc.rs b/hyperactor_mesh/src/alloc.rs index c0b10d1b5..c597cce5d 100644 --- a/hyperactor_mesh/src/alloc.rs +++ b/hyperactor_mesh/src/alloc.rs @@ -700,15 +700,6 @@ pub mod test_utils { impl Actor for TestActor {} - #[async_trait] - impl RemoteSpawn for TestActor { - type Params = (); - - async fn new(_: ()) -> anyhow::Result { - Ok(Self) - } - } - #[derive(Debug, Serialize, Deserialize, Named, Clone)] pub struct Wait; diff --git a/hyperactor_mesh/src/comm.rs b/hyperactor_mesh/src/comm.rs index b1660b559..18c0b15f9 100644 --- a/hyperactor_mesh/src/comm.rs +++ b/hyperactor_mesh/src/comm.rs @@ -232,19 +232,6 @@ impl Actor for CommActor { } } -#[async_trait] -impl hyperactor::RemoteSpawn for CommActor { - type Params = CommActorParams; - - async fn new(_params: Self::Params) -> Result { - Ok(Self { - send_seq: HashMap::new(), - recv_state: HashMap::new(), - mode: Default::default(), - }) - } -} - impl CommActor { /// Forward the message to the comm actor on the given peer rank. fn forward( diff --git a/hyperactor_mesh/src/logging.rs b/hyperactor_mesh/src/logging.rs index fa8e3a2b4..e1865df35 100644 --- a/hyperactor_mesh/src/logging.rs +++ b/hyperactor_mesh/src/logging.rs @@ -1246,16 +1246,6 @@ impl LogClientActor { #[async_trait] impl Actor for LogClientActor {} -#[async_trait] -impl hyperactor::RemoteSpawn for LogClientActor { - /// The aggregation window in seconds. - type Params = (); - - async fn new(_: ()) -> Result { - Ok(LogClientActor::default()) - } -} - impl Drop for LogClientActor { fn drop(&mut self) { // Flush the remaining logs before shutting down diff --git a/hyperactor_mesh/src/test_utils.rs b/hyperactor_mesh/src/test_utils.rs index 930ded4b8..78a30db82 100644 --- a/hyperactor_mesh/src/test_utils.rs +++ b/hyperactor_mesh/src/test_utils.rs @@ -30,15 +30,6 @@ pub struct EmptyMessage(); )] pub struct EmptyActor(); -#[async_trait] -impl RemoteSpawn for EmptyActor { - type Params = (); - - async fn new(_: ()) -> anyhow::Result { - Ok(EmptyActor::default()) - } -} - impl Actor for EmptyActor {} #[async_trait] diff --git a/hyperactor_mesh/src/v1/host_mesh/mesh_agent.rs b/hyperactor_mesh/src/v1/host_mesh/mesh_agent.rs index 0e5ac388d..d316b9369 100644 --- a/hyperactor_mesh/src/v1/host_mesh/mesh_agent.rs +++ b/hyperactor_mesh/src/v1/host_mesh/mesh_agent.rs @@ -474,10 +474,7 @@ impl hyperactor::RemoteSpawn for HostMeshAgentProcMeshTrampoline { } let system_proc = host.system_proc().clone(); - let actor = HostMeshAgent { - host: Some(host), - created: HashMap::new(), - }; + let actor = HostMeshAgent::new(host); let host_mesh_agent = system_proc.spawn::("agent", actor).await?; diff --git a/hyperactor_mesh/src/v1/testactor.rs b/hyperactor_mesh/src/v1/testactor.rs index 5f6baf095..512fcac81 100644 --- a/hyperactor_mesh/src/v1/testactor.rs +++ b/hyperactor_mesh/src/v1/testactor.rs @@ -70,15 +70,6 @@ pub struct TestActor; impl Actor for TestActor {} -#[async_trait] -impl RemoteSpawn for TestActor { - type Params = (); - - async fn new(_params: Self::Params) -> Result { - Ok(Self) - } -} - /// A message that returns the recipient actor's id. #[derive(Debug, Clone, Named, Bind, Unbind, Serialize, Deserialize)] pub struct GetActorId(#[binding(include)] pub PortRef); @@ -154,15 +145,6 @@ impl Actor for TestActorWithSupervisionHandling { } } -#[async_trait] -impl RemoteSpawn for TestActorWithSupervisionHandling { - type Params = (); - - async fn new(_params: Self::Params) -> Result { - Ok(Self) - } -} - #[async_trait] impl Handler for TestActorWithSupervisionHandling { async fn handle( @@ -234,7 +216,7 @@ impl Handler for TestActor { } } -#[derive(Default, Debug)] +#[derive(Debug)] #[hyperactor::export(spawn = true)] pub struct FailingCreateTestActor; diff --git a/monarch_hyperactor/src/code_sync/conda_sync.rs b/monarch_hyperactor/src/code_sync/conda_sync.rs index 19de5f458..517f27716 100644 --- a/monarch_hyperactor/src/code_sync/conda_sync.rs +++ b/monarch_hyperactor/src/code_sync/conda_sync.rs @@ -68,15 +68,6 @@ pub struct CondaSyncActor {} impl Actor for CondaSyncActor {} -#[async_trait] -impl RemoteSpawn for CondaSyncActor { - type Params = CondaSyncParams; - - async fn new(CondaSyncParams {}: Self::Params) -> Result { - Ok(Self {}) - } -} - #[async_trait] impl Handler for CondaSyncActor { async fn handle( diff --git a/monarch_hyperactor/src/code_sync/rsync.rs b/monarch_hyperactor/src/code_sync/rsync.rs index b1c304236..eb36f9a74 100644 --- a/monarch_hyperactor/src/code_sync/rsync.rs +++ b/monarch_hyperactor/src/code_sync/rsync.rs @@ -337,11 +337,6 @@ pub struct RsyncMessage { pub workspace: WorkspaceLocation, } -#[derive(Debug, Named, Serialize, Deserialize)] -pub struct RsyncParams { - //pub workspace: WorkspaceLocation, -} - #[derive(Debug, Default)] #[hyperactor::export(spawn = true, handlers = [RsyncMessage { cast = true }])] pub struct RsyncActor { @@ -350,15 +345,6 @@ pub struct RsyncActor { impl Actor for RsyncActor {} -#[async_trait] -impl RemoteSpawn for RsyncActor { - type Params = RsyncParams; - - async fn new(RsyncParams {}: Self::Params) -> Result { - Ok(Self {}) - } -} - #[async_trait] impl Handler for RsyncActor { async fn handle( @@ -518,16 +504,13 @@ mod tests { let proc_mesh = ProcMesh::allocate(alloc).await?; - // Create RsyncParams - all actors will use the same target workspace for this test - let params = RsyncParams {}; - // TODO: thread through context, or access the actual python context; // for now this is basically equivalent (arguably better) to using the proc mesh client. let instance = global_root_client(); // Spawn actor mesh with RsyncActors let actor_mesh = proc_mesh - .spawn::(&instance, "rsync_test", ¶ms) + .spawn::(&instance, "rsync_test", &()) .await?; // Test rsync_mesh function - this coordinates rsync operations across the mesh diff --git a/monarch_hyperactor/src/local_state_broker.rs b/monarch_hyperactor/src/local_state_broker.rs index cdb0368ce..c0b37fb99 100644 --- a/monarch_hyperactor/src/local_state_broker.rs +++ b/monarch_hyperactor/src/local_state_broker.rs @@ -40,15 +40,6 @@ pub struct LocalStateBrokerActor { impl Actor for LocalStateBrokerActor {} -#[async_trait] -impl RemoteSpawn for LocalStateBrokerActor { - type Params = (); - - async fn new(_: ()) -> anyhow::Result { - Ok(Default::default()) - } -} - #[async_trait] impl Handler for LocalStateBrokerActor { async fn handle( diff --git a/monarch_hyperactor/src/proc_mesh.rs b/monarch_hyperactor/src/proc_mesh.rs index 97da5ec11..5198ee2c9 100644 --- a/monarch_hyperactor/src/proc_mesh.rs +++ b/monarch_hyperactor/src/proc_mesh.rs @@ -11,7 +11,6 @@ use std::fmt::Display; use std::sync::Arc; use std::sync::atomic::AtomicBool; -use hyperactor::Actor; use hyperactor::RemoteSpawn; use hyperactor::WorldId; use hyperactor::context;