Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow for fully async actors #74

Closed
wants to merge 12 commits into from

Update tests to use new await

  • Loading branch information...
mrjoe7 committed Aug 28, 2019
commit be3df8be0145874c1fed42cbefca721069952ef9
@@ -1,6 +1,6 @@
[package]
name = "riker"
version = "0.3.2"
version = "0.4.0"
authors = ["Lee Smith <lee@riker.rs>"]
edition = "2018"
description = "Easily build fast, highly concurrent and resilient applications. An Actor Framework for Rust."
@@ -28,4 +28,5 @@ pin-utils = "0.1.0-alpha.4"


[dev-dependencies]
riker-testkit = "0.1.0"
riker-testkit = { git = "https://github.com/mrjoe7/riker-testkit.git" }
futures-timer = "0.3.0"
@@ -83,7 +83,7 @@ trait Actor: Send + 'static {
async fn post_start(&mut self, _ctx: &Context<Self::Msg>) {}

/// Invoked after an actor has been stopped.
fn post_stop(&mut self) {}
async fn post_stop(&mut self) {}

/// Invoked when an actor receives a message
///
@@ -1,5 +1,5 @@
#![allow(unused_variables)]

use async_trait::async_trait;
use crate::{
actor::{
actor_cell::Context,
@@ -9,6 +9,7 @@ use crate::{
Message,
};

#[async_trait]
pub trait Actor: Send + 'static {
type Msg: Message;

@@ -19,18 +20,18 @@ pub trait Actor: Send + 'static {
///
/// Panics in `pre_start` do not invoke the
/// supervision strategy and the actor will be terminated.
fn pre_start(&mut self, ctx: &Context<Self::Msg>) {}
async fn pre_start(&mut self, ctx: &Context<Self::Msg>) {}

/// Invoked after an actor has started.
///
/// Any post initialization can be performed here, such as writing
/// to a log file, emmitting metrics.
///
/// Panics in `post_start` follow the supervision strategy.
fn post_start(&mut self, ctx: &Context<Self::Msg>) {}
async fn post_start(&mut self, ctx: &Context<Self::Msg>) {}

/// Invoked after an actor has been stopped.
fn post_stop(&mut self) {}
async fn post_stop(&mut self) {}

/// Return a supervisor strategy that will be used when handling failed child actors.
fn supervisor_strategy(&self) -> Strategy {
@@ -41,45 +42,46 @@ pub trait Actor: Send + 'static {
///
/// It is guaranteed that only one message in the actor's mailbox is processed
/// at any one time, including `recv` and `sys_recv`.
fn sys_recv(&mut self, ctx: &Context<Self::Msg>, msg: SystemMsg, sender: Sender) {}
async fn sys_recv(&mut self, ctx: &Context<Self::Msg>, msg: SystemMsg, sender: Sender) {}

/// Invoked when an actor receives a message
///
/// It is guaranteed that only one message in the actor's mailbox is processed
/// at any one time, including `recv` and `sys_recv`.
fn recv(&mut self, ctx: &Context<Self::Msg>, msg: Self::Msg, sender: Sender);
async fn recv(&mut self, ctx: &Context<Self::Msg>, msg: Self::Msg, sender: Sender);
}

#[async_trait]
impl<A: Actor + ?Sized> Actor for Box<A> {
type Msg = A::Msg;

fn pre_start(&mut self, ctx: &Context<Self::Msg>) {
(**self).pre_start(ctx);
async fn pre_start(&mut self, ctx: &Context<Self::Msg>) {
(**self).pre_start(ctx).await;
}

fn post_start(&mut self, ctx: &Context<Self::Msg>) {
(**self).post_start(ctx)
async fn post_start(&mut self, ctx: &Context<Self::Msg>) {
(**self).post_start(ctx).await
}

fn post_stop(&mut self) {
(**self).post_stop()
async fn post_stop(&mut self) {
(**self).post_stop().await
}

fn sys_recv(
async fn sys_recv(
&mut self,
ctx: &Context<Self::Msg>,
msg: SystemMsg,
sender: Option<BasicActorRef>,
) {
(**self).sys_recv(ctx, msg, sender)
(**self).sys_recv(ctx, msg, sender).await
}

fn supervisor_strategy(&self) -> Strategy {
(**self).supervisor_strategy()
}

fn recv(&mut self, ctx: &Context<Self::Msg>, msg: Self::Msg, sender: Option<BasicActorRef>) {
(**self).recv(ctx, msg, sender)
async fn recv(&mut self, ctx: &Context<Self::Msg>, msg: Self::Msg, sender: Option<BasicActorRef>) {
(**self).recv(ctx, msg, sender).await
}
}

@@ -91,7 +93,8 @@ impl<A: Actor + ?Sized> Actor for Box<A> {
/// # Examples
///
/// ```
/// # use riker::actors::*;
/// use riker::actors::*;
/// use async_trait::async_trait;
///
/// #[derive(Clone, Debug)]
/// struct Foo;
@@ -100,14 +103,15 @@ impl<A: Actor + ?Sized> Actor for Box<A> {
/// #[actor(Foo, Bar)] // <-- set our actor to receive Foo and Bar types
/// struct MyActor;
///
/// #[async_trait]
/// impl Actor for MyActor {
/// type Msg = MyActorMsg; // <-- MyActorMsg is provided for us
///
/// fn recv(&mut self,
/// async fn recv(&mut self,
/// ctx: &Context<Self::Msg>,
/// msg: Self::Msg,
/// sender: Sender) {
/// self.receive(ctx, msg, sender); // <-- call the respective implementation
/// self.receive(ctx, msg, sender).await; // <-- call the respective implementation
/// }
/// }
///
@@ -121,21 +125,23 @@ impl<A: Actor + ?Sized> Actor for Box<A> {
/// }
/// }
///
/// #[async_trait]
/// impl Receive<Foo> for MyActor {
/// type Msg = MyActorMsg;
///
/// fn receive(&mut self,
/// async fn receive(&mut self,
/// ctx: &Context<Self::Msg>,
/// msg: Foo, // <-- receive Foo
/// sender: Sender) {
/// println!("Received a Foo");
/// }
/// }
///
/// #[async_trait]
/// impl Receive<Bar> for MyActor {
/// type Msg = MyActorMsg;
///
/// fn receive(&mut self,
/// async fn receive(&mut self,
/// ctx: &Context<Self::Msg>,
/// msg: Bar, // <-- receive Bar
/// sender: Sender) {
@@ -150,14 +156,15 @@ impl<A: Actor + ?Sized> Actor for Box<A> {
/// actor.tell(Foo, None);
/// actor.tell(Bar, None);
/// ```
#[async_trait]
pub trait Receive<Msg: Message> {
type Msg: Message;

/// Invoked when an actor receives a message
///
/// It is guaranteed that only one message in the actor's mailbox is processed
/// at any one time, including `receive`, `other_receive` and `system_receive`.
fn receive(&mut self, ctx: &Context<Self::Msg>, msg: Msg, sender: Option<BasicActorRef>);
async fn receive(&mut self, ctx: &Context<Self::Msg>, msg: Msg, sender: Sender);
}

/// The actor trait object
@@ -476,7 +476,7 @@ fn post_stop<A: Actor>(actor: &mut Option<A>) {
/// Operations performed are in most cases done so from the
/// actor's perspective. For example, creating a child actor
/// using `ctx.actor_of` will create the child under the current
/// actor within the heirarchy. In a similar manner, persistence
/// actor within the hierarchy. In a similar manner, persistence
/// operations such as `persist_event` use the current actor's
/// persistence configuration.
///
@@ -515,8 +515,11 @@ impl<Msg: Message> ActorRefFactory for Context<Msg> {
.create_actor(props, name, &parent, &self.system).await
}

fn stop(&self, actor: impl ActorReference) {
actor.sys_tell(SystemCmd::Stop.into());
async fn stop<A>(&self, actor: A)
where
A: ActorReference
{
actor.sys_tell(SystemCmd::Stop.into()).await;
}
}

@@ -10,7 +10,8 @@ use crate::{
AnyMessage, Envelope, Message,
};

pub trait ActorReference {
#[async_trait]
pub trait ActorReference: Send + Sync {
/// Actor name.
///
/// Unique among siblings.
@@ -51,30 +52,33 @@ pub trait ActorReference {
fn children<'a>(&'a self) -> Box<dyn Iterator<Item = BasicActorRef> + 'a>;

/// Send a system message to this actor
fn sys_tell(&self, msg: SystemMsg);
async fn sys_tell(&self, msg: SystemMsg);
}

pub type BoxedTell<T> = Box<dyn Tell<T> + Send + 'static>;

#[async_trait]
pub trait Tell<T>: ActorReference + Send + 'static {
fn tell(&self, msg: T, sender: Option<BasicActorRef>);
async fn tell(&self, msg: T, sender: Sender);
fn box_clone(&self) -> BoxedTell<T>;
}

#[async_trait]
impl<T, M> Tell<T> for ActorRef<M>
where
T: Message + Into<M>,
M: Message,
{
fn tell(&self, msg: T, sender: Sender) {
self.send_msg(msg.into(), sender);
async fn tell(&self, msg: T, sender: Sender) {
self.send_msg(msg.into(), sender).await;
}

fn box_clone(&self) -> BoxedTell<T> {
Box::new((*self).clone())
}
}

#[async_trait]
impl<T> ActorReference for BoxedTell<T>
where
T: Message,
@@ -115,8 +119,8 @@ where
(**self).children()
}

fn sys_tell(&self, msg: SystemMsg) {
(**self).sys_tell(msg)
async fn sys_tell(&self, msg: SystemMsg) {
(**self).sys_tell(msg).await
}
}

@@ -201,6 +205,7 @@ impl BasicActorRef {
}
}

#[async_trait]
impl ActorReference for BasicActorRef {
fn name(&self) -> &str {
self.cell.uri().name.as_str()
@@ -238,12 +243,13 @@ impl ActorReference for BasicActorRef {
self.cell.children()
}

fn sys_tell(&self, msg: SystemMsg) {
async fn sys_tell(&self, msg: SystemMsg) {
let envelope = Envelope { msg, sender: None };
let _ = self.cell.send_sys_msg(envelope);
let _ = self.cell.send_sys_msg(envelope).await;
}
}

#[async_trait]
impl ActorReference for &BasicActorRef {
fn name(&self) -> &str {
self.cell.uri().name.as_str()
@@ -281,9 +287,9 @@ impl ActorReference for &BasicActorRef {
self.cell.children()
}

fn sys_tell(&self, msg: SystemMsg) {
async fn sys_tell(&self, msg: SystemMsg) {
let envelope = Envelope { msg, sender: None };
let _ = self.cell.send_sys_msg(envelope);
let _ = self.cell.send_sys_msg(envelope).await;
}
}

@@ -354,16 +360,17 @@ impl<Msg: Message> ActorRef<Msg> {
ActorRef { cell }
}

pub fn send_msg(&self, msg: Msg, sender: impl Into<Option<BasicActorRef>>) {
pub async fn send_msg(&self, msg: Msg, sender: impl Into<Option<BasicActorRef>>) {
let envelope = Envelope {
msg,
sender: sender.into(),
};
// consume the result (we don't return it to user)
let _ = self.cell.send_msg(envelope);
let _ = self.cell.send_msg(envelope).await;
}
}

#[async_trait]
impl<Msg: Message> ActorReference for ActorRef<Msg> {
fn name(&self) -> &str {
self.cell.uri().name.as_str()
@@ -401,12 +408,13 @@ impl<Msg: Message> ActorReference for ActorRef<Msg> {
self.cell.children()
}

fn sys_tell(&self, msg: SystemMsg) {
async fn sys_tell(&self, msg: SystemMsg) {
let envelope = Envelope { msg, sender: None };
let _ = self.cell.send_sys_msg(envelope);
let _ = self.cell.send_sys_msg(envelope).await;
}
}

#[async_trait]
impl<Msg: Message> ActorReference for &ActorRef<Msg> {
fn name(&self) -> &str {
self.cell.uri().name.as_str()
@@ -444,9 +452,9 @@ impl<Msg: Message> ActorReference for &ActorRef<Msg> {
self.cell.children()
}

fn sys_tell(&self, msg: SystemMsg) {
async fn sys_tell(&self, msg: SystemMsg) {
let envelope = Envelope { msg, sender: None };
let _ = self.cell.send_sys_msg(envelope);
let _ = self.cell.send_sys_msg(envelope).await;
}
}

@@ -484,7 +492,9 @@ pub trait ActorRefFactory {
where
A: Actor;

fn stop(&self, actor: impl ActorReference);
async fn stop<A>(&self, actor: A)
where
A: ActorReference;
}

/// Produces `ActorRef`s under the `temp` guardian actor.
ProTip! Use n and p to navigate between commits in a pull request.
You can’t perform that action at this time.