Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(runtime)!: make actors abortable from init #279

Merged
merged 5 commits into from Jun 20, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion bin/inx-chronicle/src/api/mod.rs
Expand Up @@ -107,7 +107,7 @@ impl Actor for ApiWorker {
.with_graceful_shutdown(shutdown_signal(receiver))
.await;
// If the Axum server shuts down, we should also shutdown the API actor
api_handle.shutdown();
api_handle.shutdown().await;
res
});
self.server_handle = Some((join_handle, sender));
Expand Down
40 changes: 16 additions & 24 deletions bin/inx-chronicle/src/launcher.rs
Expand Up @@ -28,15 +28,9 @@ pub enum LauncherError {
/// Supervisor actor
pub struct Launcher;

pub struct LauncherState {
config: ChronicleConfig,
#[allow(dead_code)]
db: MongoDb,
}

#[async_trait]
impl Actor for Launcher {
type State = LauncherState;
type State = ChronicleConfig;
type Error = LauncherError;

async fn init(&mut self, cx: &mut ActorContext<Self>) -> Result<Self::State, Self::Error> {
Expand Down Expand Up @@ -70,7 +64,7 @@ impl Actor for Launcher {
cx.spawn_child(super::metrics::MetricsWorker::new(&db, &config.metrics))
.await;

Ok(LauncherState { config, db })
Ok(config)
}

fn name(&self) -> std::borrow::Cow<'static, str> {
Expand All @@ -85,25 +79,15 @@ impl HandleEvent<Report<super::stardust_inx::InxWorker>> for Launcher {
&mut self,
cx: &mut ActorContext<Self>,
event: Report<super::stardust_inx::InxWorker>,
LauncherState { config, db }: &mut Self::State,
config: &mut Self::State,
) -> Result<(), Self::Error> {
use chronicle::runtime::SpawnActor;

use super::stardust_inx::InxError;
match event {
Report::Success(_) => {
cx.abort().await;
}
Report::Error(report) => match report.error {
ActorError::Result(e) => match e {
InxError::ConnectionError => {
log::warn!(
"INX connection failed. Retrying in {}s.",
config.inx.connection_retry_interval.as_secs()
);
let inx_worker = super::stardust_inx::InxWorker::new(db, &config.inx);
cx.delay(SpawnActor::new(inx_worker), config.inx.connection_retry_interval)?;
}
InxError::MongoDb(e) => match e.kind.as_ref() {
// Only a few possible errors we could potentially recover from
mongodb::error::ErrorKind::Io(_)
Expand All @@ -116,9 +100,17 @@ impl HandleEvent<Report<super::stardust_inx::InxWorker>> for Launcher {
cx.abort().await;
}
},
InxError::Read(_) => {
cx.spawn_child(report.actor).await;
}
InxError::Read(e) => match e.code() {
inx::tonic::Code::DeadlineExceeded
| inx::tonic::Code::ResourceExhausted
| inx::tonic::Code::Aborted
| inx::tonic::Code::Unavailable => {
cx.spawn_child(report.actor).await;
}
_ => {
cx.abort().await;
}
},
_ => {
cx.abort().await;
}
Expand All @@ -139,7 +131,7 @@ impl HandleEvent<Report<super::api::ApiWorker>> for Launcher {
&mut self,
cx: &mut ActorContext<Self>,
event: Report<super::api::ApiWorker>,
LauncherState { config, .. }: &mut Self::State,
config: &mut Self::State,
) -> Result<(), Self::Error> {
match event {
Report::Success(_) => {
Expand All @@ -166,7 +158,7 @@ impl HandleEvent<Report<super::metrics::MetricsWorker>> for Launcher {
&mut self,
cx: &mut ActorContext<Self>,
event: Report<super::metrics::MetricsWorker>,
LauncherState { config, .. }: &mut Self::State,
config: &mut Self::State,
) -> Result<(), Self::Error> {
match event {
Report::Success(_) => {
Expand Down
2 changes: 1 addition & 1 deletion bin/inx-chronicle/src/metrics.rs
Expand Up @@ -115,7 +115,7 @@ impl Actor for MetricsWorker {
};

// Stop the actor if the server stops.
metrics_handle.shutdown();
metrics_handle.shutdown().await;

res.unwrap()
})
Expand Down
7 changes: 5 additions & 2 deletions bin/inx-chronicle/src/stardust_inx/config.rs
Expand Up @@ -6,8 +6,7 @@ use std::time::Duration;
use chronicle::types::tangle::MilestoneIndex;
use serde::{Deserialize, Serialize};

/// A builder to establish a connection to INX.
#[must_use]
DaughterOfMars marked this conversation as resolved.
Show resolved Hide resolved
/// Configuration for an INX connection.
#[derive(Clone, PartialEq, Eq, Debug, Serialize, Deserialize)]
#[serde(default)]
pub struct InxConfig {
Expand All @@ -16,6 +15,9 @@ pub struct InxConfig {
/// The time that has to pass until a new connection attempt is made.
#[serde(with = "humantime_serde")]
pub connection_retry_interval: Duration,
/// The number of retries when connecting fails.
pub connection_retry_count: usize,
/// The milestone at which synchronization should begin.
pub sync_start_milestone: MilestoneIndex,
}

Expand All @@ -24,6 +26,7 @@ impl Default for InxConfig {
Self {
connect_url: "http://localhost:9029".into(),
connection_retry_interval: Duration::from_secs(5),
connection_retry_count: 5,
sync_start_milestone: 1.into(),
}
}
Expand Down
17 changes: 14 additions & 3 deletions bin/inx-chronicle/src/stardust_inx/mod.rs
Expand Up @@ -46,9 +46,20 @@ impl InxWorker {
return Err(InxError::InvalidAddress(inx_config.connect_url.clone()));
}

InxClient::connect(inx_config.connect_url.clone())
.await
.map_err(|_| InxError::ConnectionError)
for i in 0..inx_config.connection_retry_count {
match InxClient::connect(inx_config.connect_url.clone()).await {
Ok(inx_client) => return Ok(inx_client),
Err(_) => {
log::warn!(
"INX connection failed. Retrying in {}s. {} retries remaining.",
inx_config.connection_retry_interval.as_secs(),
inx_config.connection_retry_count - i
);
tokio::time::sleep(inx_config.connection_retry_interval).await;
}
}
}
Err(InxError::ConnectionError)
}

async fn spawn_syncer(
Expand Down
2 changes: 1 addition & 1 deletion bin/inx-chronicle/src/stardust_inx/syncer.rs
Expand Up @@ -120,7 +120,7 @@ impl HandleEvent<SyncNext> for Syncer {
.await;
} else {
log::info!("Successfully finished synchronization with node.");
cx.shutdown();
cx.shutdown().await;
}
Ok(())
}
Expand Down
4 changes: 2 additions & 2 deletions src/runtime/actor/addr.rs
Expand Up @@ -41,8 +41,8 @@ impl<A: Actor> Addr<A> {
}

/// Shuts down the actor. Use with care!
pub fn shutdown(&self) {
self.scope.shutdown();
pub async fn shutdown(&self) {
self.scope.shutdown().await;
}

/// Aborts the actor. Use with care!
Expand Down
9 changes: 8 additions & 1 deletion src/runtime/actor/context.rs
Expand Up @@ -21,7 +21,11 @@ use super::{
Actor,
};
use crate::runtime::{
config::SpawnConfig, error::RuntimeError, scope::RuntimeScope, shutdown::ShutdownStream, Sender, Task, TaskReport,
config::SpawnConfig,
error::RuntimeError,
scope::RuntimeScope,
shutdown::{ShutdownHandle, ShutdownStream},
Sender, Task, TaskReport,
};

type Receiver<A> = ShutdownStream<EnvelopeStream<A>>;
Expand Down Expand Up @@ -95,10 +99,13 @@ impl<A: Actor> ActorContext<A> {
actor: &mut A,
actor_state: &mut Option<A::State>,
abort_reg: AbortRegistration,
shutdown_handle: ShutdownHandle,
) -> Result<Result<Result<(), A::Error>, Box<dyn Any + Send>>, Aborted> {
let res = Abortable::new(
AssertUnwindSafe(async {
let mut state = actor.init(self).await?;
// Set the shutdown handle before starting the event loop.
self.scope.0.set_shutdown_handle(shutdown_handle).await;
// Call handle events until shutdown
let res = actor.run(self, &mut state).await;
let res = actor.shutdown(self, &mut state, res).await;
Expand Down
28 changes: 14 additions & 14 deletions src/runtime/registry.rs
Expand Up @@ -45,8 +45,8 @@ pub(crate) struct Scope {
pub(crate) struct ScopeInner {
pub(crate) id: ScopeId,
address_registry: RwLock<AddressRegistry>,
shutdown_handle: Option<ShutdownHandle>,
abort_handle: Option<AbortHandle>,
shutdown_handle: RwLock<Option<ShutdownHandle>>,
abort_handle: AbortHandle,
parent: Option<Scope>,
children: RwLock<HashMap<ScopeId, Scope>>,
}
Expand All @@ -58,7 +58,7 @@ impl Scope {
id: ROOT_SCOPE,
address_registry: Default::default(),
shutdown_handle: Default::default(),
abort_handle: Some(abort_handle),
abort_handle,
parent: None,
children: Default::default(),
}),
Expand All @@ -68,19 +68,15 @@ impl Scope {
}
}

pub(crate) async fn child(
&self,
shutdown_handle: Option<ShutdownHandle>,
abort_handle: Option<AbortHandle>,
) -> Self {
pub(crate) async fn child(&self, abort_handle: AbortHandle) -> Self {
log::trace!("Adding child to {:x}", self.id.as_fields().0);
let id = Uuid::new_v4();
let parent = self.clone();
let child = Scope {
inner: Arc::new(ScopeInner {
id,
address_registry: Default::default(),
shutdown_handle,
shutdown_handle: Default::default(),
abort_handle,
parent: Some(parent),
children: Default::default(),
Expand All @@ -94,6 +90,10 @@ impl Scope {
child
}

pub(crate) async fn set_shutdown_handle(&self, handle: ShutdownHandle) {
self.inner.shutdown_handle.write().await.replace(handle);
}

/// Finds a scope by id.
pub(crate) fn find(&self, id: ScopeId) -> Option<&Scope> {
if id == self.id {
Expand Down Expand Up @@ -136,13 +136,13 @@ impl Scope {
log::trace!("Dropped scope {:x}", self.id.as_fields().0);
}

pub(crate) fn shutdown(&self) {
pub(crate) async fn shutdown(&self) {
log::trace!("Shutting down scope {:x}", self.id.as_fields().0);
self.valid.store(false, Ordering::Release);
if let Some(handle) = self.shutdown_handle.as_ref() {
if let Some(handle) = self.shutdown_handle.read().await.as_ref() {
handle.shutdown();
} else if let Some(abort) = self.abort_handle.as_ref() {
abort.abort();
} else {
self.abort_handle.abort();
}
log::trace!("Shut down scope {:x}", self.id.as_fields().0);
}
Expand All @@ -155,7 +155,7 @@ impl Scope {
for child_scope in children {
child_scope.abort().await;
}
self.shutdown();
self.shutdown().await;
log::trace!("Aborted scope {:x}", self.id.as_fields().0);
}

Expand Down
30 changes: 13 additions & 17 deletions src/runtime/scope.rs
Expand Up @@ -87,8 +87,8 @@ impl ScopeView {
}

/// Shuts down the scope.
pub fn shutdown(&self) {
self.0.shutdown();
pub async fn shutdown(&self) {
self.0.shutdown().await;
}

/// Aborts the tasks in this runtime's scope. This will shutdown tasks that have
Expand All @@ -115,13 +115,9 @@ impl RuntimeScope {
}
}

pub(crate) async fn child(
&self,
shutdown_handle: Option<ShutdownHandle>,
abort_handle: Option<AbortHandle>,
) -> Self {
pub(crate) async fn child(&self, abort_handle: AbortHandle) -> Self {
Self {
scope: ScopeView(self.scope.0.child(shutdown_handle, abort_handle).await),
scope: ScopeView(self.scope.0.child(abort_handle).await),
join_handles: Default::default(),
}
}
Expand All @@ -140,7 +136,7 @@ impl RuntimeScope {
F: Future<Output = Result<O, Box<dyn Error + Send + Sync>>>,
{
let (abort_handle, abort_registration) = AbortHandle::new_pair();
let mut child_scope = self.child(None, Some(abort_handle)).await;
let mut child_scope = self.child(abort_handle).await;
let res = Abortable::new(f(&mut child_scope), abort_registration).await;
if let Ok(Err(_)) = res {
child_scope.abort().await;
Expand Down Expand Up @@ -168,7 +164,7 @@ impl RuntimeScope {
stream,
add_to_registry,
}: SpawnConfigInner<A>,
) -> (Addr<A>, ActorContext<A>, AbortRegistration)
) -> (Addr<A>, ActorContext<A>, AbortRegistration, ShutdownHandle)
where
A: 'static + Actor,
{
Expand Down Expand Up @@ -208,14 +204,14 @@ impl RuntimeScope {
let (receiver, shutdown_handle) = ShutdownStream::new(Box::new(receiver) as _);
(receiver, shutdown_handle)
};
let scope = self.child(Some(shutdown_handle), Some(abort_handle)).await;
let scope = self.child(abort_handle).await;
let handle = Addr::new(scope.scope.clone(), sender);
if add_to_registry {
self.scope.0.insert_addr(handle.clone()).await;
}
let cx = ActorContext::new(scope, handle.clone(), receiver);
log::debug!("Initializing {}", actor.name());
(handle, cx, abort_reg)
(handle, cx, abort_reg, shutdown_handle)
}

/// Spawns a new, plain task.
Expand All @@ -225,7 +221,7 @@ impl RuntimeScope {
Sup: 'static + HandleEvent<TaskReport<T>>,
{
let (abort_handle, abort_registration) = AbortHandle::new_pair();
let mut child_scope = self.child(None, Some(abort_handle.clone())).await;
let mut child_scope = self.child(abort_handle.clone()).await;
let child_task = spawn_task(task.name().as_ref(), async move {
let fut = task.run();
let res = Abortable::new(AssertUnwindSafe(fut).catch_unwind(), abort_registration).await;
Expand Down Expand Up @@ -269,10 +265,10 @@ impl RuntimeScope {
Cfg: Into<SpawnConfig<A>>,
{
let SpawnConfig { mut actor, config } = actor.into();
let (handle, mut cx, abort_reg) = self.common_spawn(&actor, config).await;
let (handle, mut cx, abort_reg, shutdown_handle) = self.common_spawn(&actor, config).await;
let child_task = spawn_task(actor.name().as_ref(), async move {
let mut data = None;
let res = cx.start(&mut actor, &mut data, abort_reg).await;
let res = cx.start(&mut actor, &mut data, abort_reg, shutdown_handle).await;
match res {
Ok(res) => match res {
Ok(res) => match res {
Expand Down Expand Up @@ -313,10 +309,10 @@ impl RuntimeScope {
Cfg: Into<SpawnConfig<A>>,
{
let SpawnConfig { mut actor, config } = actor.into();
let (handle, mut cx, abort_reg) = self.common_spawn(&actor, config).await;
let (handle, mut cx, abort_reg, shutdown_handle) = self.common_spawn(&actor, config).await;
let child_task = spawn_task(actor.name().as_ref(), async move {
let mut data = None;
let res = cx.start(&mut actor, &mut data, abort_reg).await;
let res = cx.start(&mut actor, &mut data, abort_reg, shutdown_handle).await;
match res {
Ok(res) => match res {
Ok(res) => match res {
Expand Down