Skip to content

Commit

Permalink
Add SpawnActor event wrapper for spawning actors (#88)
Browse files Browse the repository at this point in the history
* Add SpawnActor event wrapper for spawning actors, which can be used in combination with DelayEvent to spawn after a delay.

Co-authored-by: /alex/ <alexander.schmidt@iota.org>

Co-authored-by: /alex/ <alexander.schmidt@iota.org>
  • Loading branch information
DaughterOfMars and Alex6323 committed Apr 26, 2022
1 parent a79127c commit 01d5d8e
Show file tree
Hide file tree
Showing 6 changed files with 56 additions and 32 deletions.
24 changes: 12 additions & 12 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion bin/inx-chronicle/src/inx_listener.rs
Expand Up @@ -187,7 +187,7 @@ where
event: Result<E, Status>,
_state: &mut Self::State,
) -> Result<(), Self::Error> {
self.broker_addr.send(event?).map_err(RuntimeError::SendError)?;
self.broker_addr.send(event?)?;
Ok(())
}
}
Expand Down
19 changes: 6 additions & 13 deletions bin/inx-chronicle/src/main.rs
Expand Up @@ -18,19 +18,13 @@ use std::{error::Error, ops::Deref, time::Duration};
use api::ApiWorker;
use async_trait::async_trait;
use broker::{Broker, BrokerError};
use chronicle::runtime::actor::util::SpawnActor;
#[cfg(feature = "stardust")]
use chronicle::{
db::MongoDbError,
inx::InxError,
runtime::{
actor::{
addr::{Addr, SendError},
context::ActorContext,
error::ActorError,
event::HandleEvent,
report::Report,
Actor,
},
actor::{addr::Addr, context::ActorContext, error::ActorError, event::HandleEvent, report::Report, Actor},
error::RuntimeError,
scope::RuntimeScope,
Runtime,
Expand All @@ -53,8 +47,6 @@ pub enum LauncherError {
MongoDb(#[from] MongoDbError),
#[error(transparent)]
Runtime(#[from] RuntimeError),
#[error(transparent)]
Send(#[from] SendError),
}

#[derive(Debug)]
Expand Down Expand Up @@ -156,9 +148,10 @@ impl HandleEvent<Report<InxListener>> for Launcher {
InxError::ConnectionError(_) => {
let wait_interval = self.inx_connection_retry_interval;
log::info!("Retrying INX connection in {} seconds.", wait_interval.as_secs_f32());
tokio::time::sleep(wait_interval).await;
cx.spawn_actor_supervised(InxListener::new(config.inx.clone(), broker_addr.clone()))
.await;
cx.delay(
SpawnActor::new(InxListener::new(config.inx.clone(), broker_addr.clone())),
wait_interval,
)?;
}
InxError::InvalidAddress(_) => {
cx.shutdown();
Expand Down
6 changes: 3 additions & 3 deletions src/runtime/actor/addr.rs
Expand Up @@ -8,7 +8,7 @@ use super::{
event::{DynEvent, Envelope},
Actor,
};
use crate::runtime::{registry::ScopeId, scope::ScopeView};
use crate::runtime::{error::RuntimeError, registry::ScopeId, scope::ScopeView};

/// Error sending a message to an actor
#[derive(Error, Debug)]
Expand Down Expand Up @@ -56,13 +56,13 @@ impl<A: Actor> Addr<A> {
}

/// Sends a message to the actor
pub fn send<E: 'static + DynEvent<A> + Send + Sync>(&self, event: E) -> Result<(), SendError>
pub fn send<E: 'static + DynEvent<A> + Send + Sync>(&self, event: E) -> Result<(), RuntimeError>
where
Self: Sized,
{
self.sender
.send(Box::new(event))
.map_err(|_| "Failed to send event".into())
.map_err(|_| RuntimeError::SendError("Failed to send event".into()))
}

/// Returns whether the actor's event channel is closed.
Expand Down
6 changes: 3 additions & 3 deletions src/runtime/actor/context.rs
Expand Up @@ -15,13 +15,13 @@ use futures::{
};

use super::{
addr::{Addr, SendError},
addr::Addr,
event::{DynEvent, EnvelopeStream, HandleEvent},
report::Report,
util::DelayedEvent,
Actor,
};
use crate::runtime::{config::SpawnConfig, scope::RuntimeScope, shutdown::ShutdownStream};
use crate::runtime::{config::SpawnConfig, error::RuntimeError, scope::RuntimeScope, shutdown::ShutdownStream};

type Receiver<A> = ShutdownStream<EnvelopeStream<A>>;

Expand Down Expand Up @@ -69,7 +69,7 @@ impl<A: Actor> ActorContext<A> {
&self,
event: E,
delay: impl Into<Option<Duration>>,
) -> Result<(), SendError>
) -> Result<(), RuntimeError>
where
A: 'static,
{
Expand Down
31 changes: 31 additions & 0 deletions src/runtime/actor/util.rs
Expand Up @@ -8,6 +8,7 @@ use async_trait::async_trait;
use super::{
context::ActorContext,
event::{DynEvent, HandleEvent},
report::Report,
Actor,
};

Expand Down Expand Up @@ -47,3 +48,33 @@ where
Ok(())
}
}

/// An event which will spawn a supervised actor.
#[derive(Debug)]
pub struct SpawnActor<A: Actor> {
actor: A,
}

impl<A: Actor> SpawnActor<A> {
/// Creates a new [`SpawnActor`] event.
pub fn new(actor: A) -> Self {
Self { actor }
}
}

#[async_trait]
impl<T, A> HandleEvent<SpawnActor<A>> for T
where
T: 'static + Actor + HandleEvent<Report<A>>,
A: 'static + Actor + Debug + Send + Sync,
{
async fn handle_event(
&mut self,
cx: &mut ActorContext<Self>,
event: SpawnActor<A>,
_state: &mut Self::State,
) -> Result<(), Self::Error> {
cx.spawn_actor_supervised(event.actor).await;
Ok(())
}
}

0 comments on commit 01d5d8e

Please sign in to comment.