Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 45 additions & 47 deletions hyperactor/src/actor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ use std::hash::Hash;
use std::marker::PhantomData;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::Weak;
use std::time::SystemTime;

use async_trait::async_trait;
Expand Down Expand Up @@ -54,7 +53,6 @@ use crate::proc::Instance;
use crate::proc::InstanceCell;
use crate::proc::Ports;
use crate::proc::Proc;
use crate::proc::WeakInstanceCell;
use crate::reference::ActorId;
use crate::reference::GangId;
use crate::reference::Index;
Expand Down Expand Up @@ -569,14 +567,6 @@ impl<A: Actor> ActorHandle<A> {
pub fn bind<R: Binds<A>>(&self) -> ActorRef<R> {
self.cell.bind(self.ports.as_ref())
}

/// Downgrade this ActorHandle to a weak reference.
pub fn downgrade(&self) -> WeakActorHandle<A> {
WeakActorHandle {
cell: self.cell.downgrade(),
ports: Arc::downgrade(&self.ports),
}
}
}

/// IntoFuture allows users to await the handle to join it. The future
Expand Down Expand Up @@ -615,43 +605,6 @@ impl<A: Actor> Clone for ActorHandle<A> {
}
}

/// A weak reference to an [`ActorHandle`]. This allows holding references to actors
/// without preventing them from being garbage collected when all strong references
/// are dropped.
#[derive(Debug)]
pub struct WeakActorHandle<A: Actor> {
cell: WeakInstanceCell,
ports: Weak<Ports<A>>,
}

impl<A: Actor> WeakActorHandle<A> {
/// Create a new weak actor handle that is never upgradeable.
pub fn new() -> Self {
Self {
cell: WeakInstanceCell::new(),
ports: Weak::new(),
}
}

/// Upgrade this weak actor handle to a strong reference, if possible.
/// Returns `Some(ActorHandle<A>)` if the actor is still alive, `None` otherwise.
pub fn upgrade(&self) -> Option<ActorHandle<A>> {
match (self.cell.upgrade(), self.ports.upgrade()) {
(Some(cell), Some(ports)) => Some(ActorHandle::new(cell, ports)),
_ => None,
}
}
}

impl<A: Actor> Clone for WeakActorHandle<A> {
fn clone(&self) -> Self {
Self {
cell: self.cell.clone(),
ports: self.ports.clone(),
}
}
}

/// RemoteActor is a marker trait for types that can be used as
/// remote actor references. All [`Actor`]s are thus referencable;
/// but other types may also implement this in order to separately
Expand Down Expand Up @@ -1046,4 +999,49 @@ mod tests {
test.sync().await;
assert_eq!(test.get_values(), (321u64, "bar".to_string()));
}

#[tokio::test]
async fn test_ref_alias() {
let test = MultiValuesTest::new().await;

test.send(123u64);
test.send("foo".to_string());

hyperactor::alias!(MyActorAlias, u64, String);

let myref: ActorRef<MyActorAlias> = test.handle.bind();
myref.port().send(&test.client, "biz".to_string()).unwrap();
myref.port().send(&test.client, 999u64).unwrap();

test.sync().await;
assert_eq!(test.get_values(), (999u64, "biz".to_string()));
}

#[tokio::test]
async fn test_actor_handle_downcast() {
#[derive(Debug)]
struct NothingActor;

#[async_trait]
impl Actor for NothingActor {
type Params = ();

async fn new(_: ()) -> Result<Self, anyhow::Error> {
Ok(Self)
}
}

// Just test that we can round-trip the handle through a downcast.

let proc = Proc::local();
let handle = proc.spawn::<NothingActor>("nothing", ()).await.unwrap();
let cell = handle.cell();

// Invalid actor doesn't succeed.
assert!(cell.downcast_handle::<EchoActor>().is_none());

let handle = cell.downcast_handle::<NothingActor>().unwrap();
handle.drain_and_stop().unwrap();
handle.await;
}
}
14 changes: 14 additions & 0 deletions hyperactor/src/cap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,20 @@ impl<T: sealed::CanSplitPort> CanSplitPort for T {}
pub trait CanSpawn: sealed::CanSpawn {}
impl<T: sealed::CanSpawn> CanSpawn for T {}

/// CanResolveActorRef is a capability that confers the ability to resolve
/// an ActorRef to a local ActorHandle if the actor is available locally.
pub trait CanResolveActorRef: sealed::CanResolveActorRef {}
impl<T: sealed::CanResolveActorRef> CanResolveActorRef for T {}

pub(crate) mod sealed {
use async_trait::async_trait;

use crate::ActorRef;
use crate::PortId;
use crate::accum::ReducerSpec;
use crate::actor::Actor;
use crate::actor::ActorHandle;
use crate::actor::RemoteActor;
use crate::attrs::Attrs;
use crate::data::Serialized;
use crate::mailbox::Mailbox;
Expand All @@ -60,4 +67,11 @@ pub(crate) mod sealed {
pub trait CanSpawn: Send + Sync {
async fn spawn<A: Actor>(&self, params: A::Params) -> anyhow::Result<ActorHandle<A>>;
}

pub trait CanResolveActorRef: Send + Sync {
fn resolve_actor_ref<A: RemoteActor + Actor>(
&self,
actor_ref: &ActorRef<A>,
) -> Option<ActorHandle<A>>;
}
}
Loading