Skip to content

Commit

Permalink
Adding handling if the actor fails during pre_start that it doesn't p…
Browse files Browse the repository at this point in the history
…oison the named and pid registries. (#243)

* Adding handling if the actor fails during pre_start that it doesn't poison the named and pid registries.

Resolves #240

* Expose job and worker options for custom routing and queuing implementations.

This allows people implementing their own traits for routing and queueing to have access to the necessary worker properties to route jobs, as well as necessary job properties.
  • Loading branch information
slawlor committed May 24, 2024
1 parent f43bd25 commit 6f04d98
Show file tree
Hide file tree
Showing 7 changed files with 90 additions and 15 deletions.
2 changes: 1 addition & 1 deletion ractor/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ractor"
version = "0.10.2"
version = "0.10.3"
authors = ["Sean Lawlor", "Evan Au", "Dillon George"]
description = "A actor framework for Rust"
documentation = "https://docs.rs/ractor"
Expand Down
35 changes: 30 additions & 5 deletions ractor/src/actor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -496,7 +496,12 @@ where
startup_args: TActor::Arguments,
) -> Result<(ActorRef<TActor::Msg>, JoinHandle<()>), SpawnErr> {
let (actor, ports) = Self::new(name, handler)?;
actor.start(ports, startup_args, None).await
let aref = actor.actor_ref.clone();
let result = actor.start(ports, startup_args, None).await;
if result.is_err() {
aref.set_status(ActorStatus::Stopped);
}
result
}

/// Spawn an actor with a supervisor, automatically starting the actor
Expand All @@ -517,7 +522,12 @@ where
supervisor: ActorCell,
) -> Result<(ActorRef<TActor::Msg>, JoinHandle<()>), SpawnErr> {
let (actor, ports) = Self::new(name, handler)?;
actor.start(ports, startup_args, Some(supervisor)).await
let aref = actor.actor_ref.clone();
let result = actor.start(ports, startup_args, Some(supervisor)).await;
if result.is_err() {
aref.set_status(ActorStatus::Stopped);
}
result
}

/// Spawn an actor instantly, not waiting on the actor's `pre_start` routine. This is helpful
Expand Down Expand Up @@ -550,8 +560,13 @@ where
> {
let (actor, ports) = Self::new(name.clone(), handler)?;
let actor_ref = actor.actor_ref.clone();
let actor_ref2 = actor_ref.clone();
let join_op = crate::concurrency::spawn_named(name.as_deref(), async move {
let (_, handle) = actor.start(ports, startup_args, None).await?;
let result = actor.start(ports, startup_args, None).await;
if result.is_err() {
actor_ref2.set_status(ActorStatus::Stopped);
}
let (_, handle) = result?;
Ok(handle)
});
Ok((actor_ref, join_op))
Expand Down Expand Up @@ -592,8 +607,13 @@ where
> {
let (actor, ports) = Self::new(name.clone(), handler)?;
let actor_ref = actor.actor_ref.clone();
let actor_ref2 = actor_ref.clone();
let join_op = crate::concurrency::spawn_named(name.as_deref(), async move {
let (_, handle) = actor.start(ports, startup_args, Some(supervisor)).await?;
let result = actor.start(ports, startup_args, Some(supervisor)).await;
if result.is_err() {
actor_ref2.set_status(ActorStatus::Stopped);
}
let (_, handle) = result?;
Ok(handle)
});
Ok((actor_ref, join_op))
Expand Down Expand Up @@ -627,6 +647,7 @@ where
let (actor_cell, ports) = actor_cell::ActorCell::new_remote::<TActor>(name, id)?;
let id = actor_cell.get_id();
let name = actor_cell.get_name();
let actor_cell2 = actor_cell.clone();
let (actor, ports) = (
Self {
actor_ref: actor_cell.into(),
Expand All @@ -636,7 +657,11 @@ where
},
ports,
);
actor.start(ports, startup_args, Some(supervisor)).await
let result = actor.start(ports, startup_args, Some(supervisor)).await;
if result.is_err() {
actor_cell2.set_status(ActorStatus::Stopped);
}
result
}
}

Expand Down
44 changes: 44 additions & 0 deletions ractor/src/actor/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -956,3 +956,47 @@ fn returns_actor_references() {
assert_eq!(event.actor_id().is_some(), want);
}
}

/// https://github.com/slawlor/ractor/issues/240
#[crate::concurrency::test]
#[tracing_test::traced_test]
async fn actor_failing_in_spawn_err_doesnt_poison_registries() {
struct Test;

#[crate::async_trait]
impl Actor for Test {
type Msg = ();
type State = ();
type Arguments = ();

async fn pre_start(&self, _: ActorRef<Self::Msg>, _: ()) -> Result<(), ActorProcessingErr> {
Err("something".into())
}
}

struct Test2;

#[crate::async_trait]
impl Actor for Test2 {
type Msg = ();
type State = ();
type Arguments = ();

async fn pre_start(&self, _: ActorRef<Self::Msg>, _: ()) -> Result<(), ActorProcessingErr> {
Ok(())
}
}

let a = Actor::spawn(Some("test".to_owned()), Test, ()).await;
assert!(a.is_err());
drop(a);

let (a, h) = Actor::spawn(Some("test".to_owned()), Test2, ())
.await
.expect("Failed to spawn second actor with name clash");

// startup ok, we were able to reuse the name

a.stop(None);
h.await.unwrap();
}
3 changes: 2 additions & 1 deletion ractor/src/factory/job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,8 @@ where
TKey: JobKey,
TMsg: Message,
{
pub(crate) fn is_expired(&self) -> bool {
/// Determine if this job is expired
pub fn is_expired(&self) -> bool {
if let Some(ttl) = self.options.ttl {
self.options.submit_time.elapsed().unwrap() > ttl
} else {
Expand Down
17 changes: 11 additions & 6 deletions ractor/src/factory/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,10 @@ where
self.actor.get_id() == pid
}

pub(crate) fn is_processing_key(&self, key: &TKey) -> bool {
/// Identifies if a worker is processing a specific job key
///
/// Returns true if the worker is currently processing the given key
pub fn is_processing_key(&self, key: &TKey) -> bool {
self.curr_jobs.contains_key(key)
}

Expand All @@ -219,16 +222,17 @@ where
Ok(())
}

pub(crate) fn is_available(&self) -> bool {
/// Identify if the worker is available for enqueueing work
pub fn is_available(&self) -> bool {
self.curr_jobs.is_empty()
}

pub(crate) fn is_working(&self) -> bool {
/// Identify if the worker is currently processing any requests
pub fn is_working(&self) -> bool {
!self.curr_jobs.is_empty()
}

/// Denotes if the worker is stuck (i.e. unable to complete it's current job)

pub(crate) fn is_stuck(&self, duration: Duration) -> bool {
if Instant::now() - self.stats.last_ping > duration {
let key_strings = self
Expand All @@ -244,8 +248,9 @@ where
}

/// Enqueue a new job to this worker. If the discard threshold has been exceeded
/// it will discard the oldest elements from the message queue
pub(crate) fn enqueue_job(
/// it will discard the oldest or newest elements from the message queue (based
/// on discard semantics)
pub fn enqueue_job(
&mut self,
mut job: Job<TKey, TMsg>,
) -> Result<(), MessagingErr<WorkerMessage<TKey, TMsg>>> {
Expand Down
2 changes: 1 addition & 1 deletion ractor_cluster/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ractor_cluster"
version = "0.10.2"
version = "0.10.3"
authors = ["Sean Lawlor", "Evan Au", "Dillon George"]
description = "Distributed cluster environment of Ractor actors"
documentation = "https://docs.rs/ractor"
Expand Down
2 changes: 1 addition & 1 deletion ractor_cluster_derive/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ractor_cluster_derive"
version = "0.10.2"
version = "0.10.3"
authors = ["Sean Lawlor <seanlawlor@fb.com>"]
description = "Derives for ractor_cluster"
license = "MIT"
Expand Down

0 comments on commit 6f04d98

Please sign in to comment.