Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 86 additions & 7 deletions concurrency/src/child_handle.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::error::ExitReason;
use crate::link::{LinkTable, LinkedExitReason, SendExitFn, TrapExitFlag};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Condvar, Mutex};

Expand Down Expand Up @@ -44,20 +45,27 @@ pub(crate) enum Completion {
// ---------------------------------------------------------------------------

/// Type-erased cancel function. Wraps the mode-specific cancellation token.
type CancelFn = Arc<dyn Fn() + Send + Sync>;
pub(crate) type CancelFn = Arc<dyn Fn() + Send + Sync>;

/// Type-erased handle to a running actor. Provides lifecycle operations
/// (stop, liveness check, exit reason) without knowing the actor's concrete type.
///
/// Obtained via `ChildHandle::from(actor_ref)`.
///
/// Unlike `ActorRef<A>`, a `ChildHandle` cannot send messages — it only
/// provides supervision-related operations (stop, wait, check liveness).
/// Unlike `ActorRef<A>`, a `ChildHandle` cannot send messages directly — it
/// only provides supervision-related operations (stop, wait, check liveness,
/// link/monitor).
#[derive(Clone)]
pub struct ChildHandle {
id: ActorId,
cancel: CancelFn,
completion: Completion,
// Link/trap_exit support — carried so `ctx.link()` can wire up signal
// propagation without knowing the underlying actor type.
trap_exit: TrapExitFlag,
links: LinkTable,
linked_reason: LinkedExitReason,
send_exit: SendExitFn,
}

impl std::fmt::Debug for ChildHandle {
Expand All @@ -70,31 +78,66 @@ impl std::fmt::Debug for ChildHandle {

impl ChildHandle {
/// Create a ChildHandle for tasks mode.
#[allow(clippy::too_many_arguments)]
pub(crate) fn from_tasks(
id: ActorId,
cancellation_token: spawned_rt::tasks::CancellationToken,
completion_rx: spawned_rt::tasks::watch::Receiver<Option<ExitReason>>,
trap_exit: TrapExitFlag,
links: LinkTable,
linked_reason: LinkedExitReason,
send_exit: SendExitFn,
) -> Self {
Self {
id,
cancel: Arc::new(move || cancellation_token.cancel()),
completion: Completion::Tasks(completion_rx),
trap_exit,
links,
linked_reason,
send_exit,
}
}

/// Create a ChildHandle for threads mode.
#[allow(clippy::too_many_arguments)]
pub(crate) fn from_threads(
id: ActorId,
cancellation_token: spawned_rt::threads::CancellationToken,
completion: Arc<(Mutex<Option<ExitReason>>, Condvar)>,
trap_exit: TrapExitFlag,
links: LinkTable,
linked_reason: LinkedExitReason,
send_exit: SendExitFn,
) -> Self {
Self {
id,
cancel: Arc::new(move || cancellation_token.cancel()),
completion: Completion::Threads(completion),
trap_exit,
links,
linked_reason,
send_exit,
}
}

/// Crate-internal accessors used by `ctx.link()` to wire up signal propagation.
pub(crate) fn trap_exit_flag(&self) -> &TrapExitFlag {
&self.trap_exit
}
pub(crate) fn links(&self) -> &LinkTable {
&self.links
}
pub(crate) fn linked_reason(&self) -> &LinkedExitReason {
&self.linked_reason
}
pub(crate) fn cancel_fn(&self) -> &CancelFn {
&self.cancel
}
pub(crate) fn send_exit_fn(&self) -> &SendExitFn {
&self.send_exit
}

/// The actor's unique identity.
pub fn id(&self) -> ActorId {
self.id
Expand Down Expand Up @@ -250,6 +293,7 @@ impl std::hash::Hash for ChildHandle {
#[cfg(test)]
mod tests {
use super::*;
use crate::link::{new_link_table, new_linked_exit_reason, new_trap_exit_flag};

// Shared tasks-mode fixture: an Idler actor that does nothing and never
// stops on its own. Used by multiple tests below.
Expand All @@ -259,6 +303,24 @@ mod tests {
impl Actor for Idler {}
}

/// Build a threads-mode ChildHandle for unit tests that exercise only the
/// ChildHandle surface (no real actor).
fn test_handle(
token: spawned_rt::threads::CancellationToken,
completion: Arc<(Mutex<Option<ExitReason>>, Condvar)>,
) -> ChildHandle {
let no_op_send_exit: SendExitFn = Arc::new(|_| Ok(()));
ChildHandle::from_threads(
ActorId::next(),
token,
completion,
new_trap_exit_flag(),
new_link_table(),
new_linked_exit_reason(),
no_op_send_exit,
)
}

#[test]
fn actor_id_is_unique() {
let a = ActorId::next();
Expand All @@ -277,7 +339,7 @@ mod tests {
fn child_handle_from_threads_basics() {
let completion = Arc::new((Mutex::new(None), Condvar::new()));
let token = spawned_rt::threads::CancellationToken::new();
let handle = ChildHandle::from_threads(ActorId::next(), token, completion.clone());
let handle = test_handle(token, completion.clone());

assert!(handle.is_alive());
assert!(handle.exit_reason().is_none());
Expand All @@ -299,7 +361,7 @@ mod tests {
let completion = Arc::new((Mutex::new(None), Condvar::new()));
let token = spawned_rt::threads::CancellationToken::new();
assert!(!token.is_cancelled());
let handle = ChildHandle::from_threads(ActorId::next(), token.clone(), completion);
let handle = test_handle(token.clone(), completion);
handle.stop();
assert!(token.is_cancelled());
}
Expand All @@ -309,8 +371,25 @@ mod tests {
let completion = Arc::new((Mutex::new(None), Condvar::new()));
let token = spawned_rt::threads::CancellationToken::new();
let id = ActorId::next();
let h1 = ChildHandle::from_threads(id, token.clone(), completion.clone());
let h2 = ChildHandle::from_threads(id, token, completion);
let no_op_send_exit: SendExitFn = Arc::new(|_| Ok(()));
let h1 = ChildHandle::from_threads(
id,
token.clone(),
completion.clone(),
new_trap_exit_flag(),
new_link_table(),
new_linked_exit_reason(),
no_op_send_exit.clone(),
);
let h2 = ChildHandle::from_threads(
id,
token,
completion,
new_trap_exit_flag(),
new_link_table(),
new_linked_exit_reason(),
no_op_send_exit,
);
assert_eq!(h1, h2);
}

Expand Down
3 changes: 3 additions & 0 deletions concurrency/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
//! - [`message`] — `Message` trait for defining message types
//! - [`child_handle`] — `ChildHandle` and `ActorId` for type-erased actor management
//! - [`monitor`] — `MonitorRef` and `Down` for unidirectional death observation
//! - [`link`] — `Exit` and bidirectional links with `trap_exit` semantics
//!
//! # Choosing `tasks` vs `threads`
//!
Expand All @@ -75,6 +76,7 @@

pub mod child_handle;
pub mod error;
pub mod link;
pub mod message;
pub mod monitor;
pub mod registry;
Expand All @@ -84,6 +86,7 @@ pub mod threads;

pub use child_handle::{ActorId, ChildHandle};
pub use error::{ActorError, ExitReason};
pub use link::Exit;
pub use monitor::{Down, MonitorRef};
pub use response::Response;
pub use spawned_macros::{actor, protocol};
Loading
Loading