From 956734ddb2c99d1123d180936d6df747ba71792d Mon Sep 17 00:00:00 2001 From: Esteban Dimitroff Hodi Date: Thu, 14 May 2026 12:32:03 -0300 Subject: [PATCH 1/2] feat: add bidirectional links, trap_exit, and start_linked for actors --- concurrency/src/child_handle.rs | 93 +++++- concurrency/src/lib.rs | 3 + concurrency/src/link.rs | 215 +++++++++++++ concurrency/src/tasks/actor.rs | 501 ++++++++++++++++++++++++++++++- concurrency/src/threads/actor.rs | 287 +++++++++++++++++- examples/exit_reason/src/main.rs | 66 +++- 6 files changed, 1154 insertions(+), 11 deletions(-) create mode 100644 concurrency/src/link.rs diff --git a/concurrency/src/child_handle.rs b/concurrency/src/child_handle.rs index 6d4f820..d3a7f17 100644 --- a/concurrency/src/child_handle.rs +++ b/concurrency/src/child_handle.rs @@ -1,4 +1,5 @@ use crate::error::ExitReason; +use crate::link::{LinkTable, LinkedExitReason, SendExitFn, TrapExitFlag}; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::{Arc, Condvar, Mutex}; @@ -44,20 +45,27 @@ pub(crate) enum Completion { // --------------------------------------------------------------------------- /// Type-erased cancel function. Wraps the mode-specific cancellation token. -type CancelFn = Arc; +pub(crate) type CancelFn = Arc; /// Type-erased handle to a running actor. Provides lifecycle operations /// (stop, liveness check, exit reason) without knowing the actor's concrete type. /// /// Obtained via `ChildHandle::from(actor_ref)`. /// -/// Unlike `ActorRef`, a `ChildHandle` cannot send messages — it only -/// provides supervision-related operations (stop, wait, check liveness). +/// Unlike `ActorRef`, a `ChildHandle` cannot send messages directly — it +/// only provides supervision-related operations (stop, wait, check liveness, +/// link/monitor). #[derive(Clone)] pub struct ChildHandle { id: ActorId, cancel: CancelFn, completion: Completion, + // Link/trap_exit support — carried so `ctx.link()` can wire up signal + // propagation without knowing the underlying actor type. + trap_exit: TrapExitFlag, + links: LinkTable, + linked_reason: LinkedExitReason, + send_exit: SendExitFn, } impl std::fmt::Debug for ChildHandle { @@ -70,31 +78,66 @@ impl std::fmt::Debug for ChildHandle { impl ChildHandle { /// Create a ChildHandle for tasks mode. + #[allow(clippy::too_many_arguments)] pub(crate) fn from_tasks( id: ActorId, cancellation_token: spawned_rt::tasks::CancellationToken, completion_rx: spawned_rt::tasks::watch::Receiver>, + trap_exit: TrapExitFlag, + links: LinkTable, + linked_reason: LinkedExitReason, + send_exit: SendExitFn, ) -> Self { Self { id, cancel: Arc::new(move || cancellation_token.cancel()), completion: Completion::Tasks(completion_rx), + trap_exit, + links, + linked_reason, + send_exit, } } /// Create a ChildHandle for threads mode. + #[allow(clippy::too_many_arguments)] pub(crate) fn from_threads( id: ActorId, cancellation_token: spawned_rt::threads::CancellationToken, completion: Arc<(Mutex>, Condvar)>, + trap_exit: TrapExitFlag, + links: LinkTable, + linked_reason: LinkedExitReason, + send_exit: SendExitFn, ) -> Self { Self { id, cancel: Arc::new(move || cancellation_token.cancel()), completion: Completion::Threads(completion), + trap_exit, + links, + linked_reason, + send_exit, } } + /// Crate-internal accessors used by `ctx.link()` to wire up signal propagation. + pub(crate) fn trap_exit_flag(&self) -> &TrapExitFlag { + &self.trap_exit + } + pub(crate) fn links(&self) -> &LinkTable { + &self.links + } + pub(crate) fn linked_reason(&self) -> &LinkedExitReason { + &self.linked_reason + } + pub(crate) fn cancel_fn(&self) -> &CancelFn { + &self.cancel + } + pub(crate) fn send_exit_fn(&self) -> &SendExitFn { + &self.send_exit + } + /// The actor's unique identity. pub fn id(&self) -> ActorId { self.id @@ -250,6 +293,7 @@ impl std::hash::Hash for ChildHandle { #[cfg(test)] mod tests { use super::*; + use crate::link::{new_link_table, new_linked_exit_reason, new_trap_exit_flag}; // Shared tasks-mode fixture: an Idler actor that does nothing and never // stops on its own. Used by multiple tests below. @@ -259,6 +303,24 @@ mod tests { impl Actor for Idler {} } + /// Build a threads-mode ChildHandle for unit tests that exercise only the + /// ChildHandle surface (no real actor). + fn test_handle( + token: spawned_rt::threads::CancellationToken, + completion: Arc<(Mutex>, Condvar)>, + ) -> ChildHandle { + let no_op_send_exit: SendExitFn = Arc::new(|_| Ok(())); + ChildHandle::from_threads( + ActorId::next(), + token, + completion, + new_trap_exit_flag(), + new_link_table(), + new_linked_exit_reason(), + no_op_send_exit, + ) + } + #[test] fn actor_id_is_unique() { let a = ActorId::next(); @@ -277,7 +339,7 @@ mod tests { fn child_handle_from_threads_basics() { let completion = Arc::new((Mutex::new(None), Condvar::new())); let token = spawned_rt::threads::CancellationToken::new(); - let handle = ChildHandle::from_threads(ActorId::next(), token, completion.clone()); + let handle = test_handle(token, completion.clone()); assert!(handle.is_alive()); assert!(handle.exit_reason().is_none()); @@ -299,7 +361,7 @@ mod tests { let completion = Arc::new((Mutex::new(None), Condvar::new())); let token = spawned_rt::threads::CancellationToken::new(); assert!(!token.is_cancelled()); - let handle = ChildHandle::from_threads(ActorId::next(), token.clone(), completion); + let handle = test_handle(token.clone(), completion); handle.stop(); assert!(token.is_cancelled()); } @@ -309,8 +371,25 @@ mod tests { let completion = Arc::new((Mutex::new(None), Condvar::new())); let token = spawned_rt::threads::CancellationToken::new(); let id = ActorId::next(); - let h1 = ChildHandle::from_threads(id, token.clone(), completion.clone()); - let h2 = ChildHandle::from_threads(id, token, completion); + let no_op_send_exit: SendExitFn = Arc::new(|_| Ok(())); + let h1 = ChildHandle::from_threads( + id, + token.clone(), + completion.clone(), + new_trap_exit_flag(), + new_link_table(), + new_linked_exit_reason(), + no_op_send_exit.clone(), + ); + let h2 = ChildHandle::from_threads( + id, + token, + completion, + new_trap_exit_flag(), + new_link_table(), + new_linked_exit_reason(), + no_op_send_exit, + ); assert_eq!(h1, h2); } diff --git a/concurrency/src/lib.rs b/concurrency/src/lib.rs index 4e49352..34731e5 100644 --- a/concurrency/src/lib.rs +++ b/concurrency/src/lib.rs @@ -59,6 +59,7 @@ //! - [`message`] — `Message` trait for defining message types //! - [`child_handle`] — `ChildHandle` and `ActorId` for type-erased actor management //! - [`monitor`] — `MonitorRef` and `Down` for unidirectional death observation +//! - [`link`] — `Exit` and bidirectional links with `trap_exit` semantics //! //! # Choosing `tasks` vs `threads` //! @@ -75,6 +76,7 @@ pub mod child_handle; pub mod error; +pub mod link; pub mod message; pub mod monitor; pub mod registry; @@ -84,6 +86,7 @@ pub mod threads; pub use child_handle::{ActorId, ChildHandle}; pub use error::{ActorError, ExitReason}; +pub use link::Exit; pub use monitor::{Down, MonitorRef}; pub use response::Response; pub use spawned_macros::{actor, protocol}; diff --git a/concurrency/src/link.rs b/concurrency/src/link.rs new file mode 100644 index 0000000..0c58fba --- /dev/null +++ b/concurrency/src/link.rs @@ -0,0 +1,215 @@ +use crate::child_handle::ActorId; +use crate::error::{ActorError, ExitReason}; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::{Arc, Mutex}; + +// --------------------------------------------------------------------------- +// Exit message +// --------------------------------------------------------------------------- + +/// Notification delivered to an actor (via `exit_received`) when a linked actor +/// stops, if the receiver has called [`Context::trap_exit(true)`]. +/// +/// Without trapping, a linked actor's death cancels the receiver's actor +/// instead of delivering this message. +/// +/// [`Context::trap_exit(true)`]: crate::tasks::actor::Context::trap_exit +#[derive(Debug, Clone, PartialEq)] +pub struct Exit { + /// The actor that died. + pub from: ActorId, + /// Why it stopped. + pub reason: ExitReason, +} + +// --------------------------------------------------------------------------- +// Internal types +// --------------------------------------------------------------------------- + +/// Type-erased function that delivers an `Exit` message to a linked actor's +/// mailbox. Captures the peer's typed sender at link time. +pub(crate) type SendExitFn = Arc Result<(), ActorError> + Send + Sync>; + +/// Per-actor flag controlling how exit signals from linked actors are handled. +/// `false` (default): the receiver is cancelled. `true`: the receiver gets an +/// `Exit` message via `Actor::exit_received`. +pub type TrapExitFlag = Arc; + +/// Per-actor slot holding the exit reason of a linked actor whose death +/// triggered cancellation. When a non-trapping actor is cancelled by a link +/// signal, this slot is set so the actor's own exit reason propagates +/// transitively through further links. +pub type LinkedExitReason = Arc>>; + +/// Create a new empty linked-exit-reason slot. +pub(crate) fn new_linked_exit_reason() -> LinkedExitReason { + Arc::new(Mutex::new(None)) +} + +/// An entry in an actor's link table, representing one linked peer. +pub(crate) struct LinkEntry { + /// The peer's unique actor ID. + pub peer_id: ActorId, + /// Delivers an exit signal to the peer. + pub signal: ExitSignalFn, + /// Reference to the peer's link table, so we can remove ourselves on death. + pub peer_links: LinkTable, +} + +/// Type-erased function that signals exit to a peer. Decides whether to +/// cancel the peer or send an `Exit` message based on the peer's `trap_exit` +/// flag and the exit reason. +pub(crate) type ExitSignalFn = Arc; + +/// An actor's link table — a list of linked peers. +pub(crate) type LinkTable = Arc>>; + +/// Create a new empty link table. +pub(crate) fn new_link_table() -> LinkTable { + Arc::new(Mutex::new(Vec::new())) +} + +/// Create a new trap_exit flag (default: false). +pub(crate) fn new_trap_exit_flag() -> TrapExitFlag { + Arc::new(AtomicBool::new(false)) +} + +/// Build an `ExitSignalFn` that signals a peer based on its trap_exit flag +/// and a typed-erased send-exit function. +pub(crate) fn make_signal( + peer_trap_exit: TrapExitFlag, + peer_cancel: Arc, + peer_send_exit: SendExitFn, + peer_linked_reason: LinkedExitReason, +) -> ExitSignalFn { + Arc::new(move |sender_id, reason| { + // Kill is untrappable — always cancel (with Kill as the linked reason) + if matches!(reason, ExitReason::Kill) { + let mut slot = peer_linked_reason.lock().unwrap_or_else(|p| p.into_inner()); + if slot.is_none() { + *slot = Some(ExitReason::Kill); + } + drop(slot); + peer_cancel(); + return; + } + // Normal exit signals are silently dropped unless the peer is trapping + let trapping = peer_trap_exit.load(Ordering::Acquire); + if matches!(reason, ExitReason::Normal) && !trapping { + return; + } + if trapping { + // Send Exit message to the peer's mailbox + let exit = Exit { + from: sender_id, + reason, + }; + let _ = peer_send_exit(exit); // mailbox may be closed if peer just died + } else { + // Peer is not trapping: record the linked reason then cancel + let mut slot = peer_linked_reason.lock().unwrap_or_else(|p| p.into_inner()); + if slot.is_none() { + *slot = Some(reason); + } + drop(slot); + peer_cancel(); + } + }) +} + +/// Register a bidirectional link between two actors. +/// +/// If a link already exists between this pair (by `ActorId`), this is a no-op. +pub(crate) fn register_link( + own_id: ActorId, + own_links: &LinkTable, + own_signal: ExitSignalFn, + peer_id: ActorId, + peer_links: &LinkTable, + peer_signal: ExitSignalFn, +) { + { + let mut table = own_links.lock().unwrap_or_else(|p| p.into_inner()); + if !table.iter().any(|e| e.peer_id == peer_id) { + table.push(LinkEntry { + peer_id, + signal: peer_signal, + peer_links: peer_links.clone(), + }); + } + } + { + let mut table = peer_links.lock().unwrap_or_else(|p| p.into_inner()); + if !table.iter().any(|e| e.peer_id == own_id) { + table.push(LinkEntry { + peer_id: own_id, + signal: own_signal, + peer_links: own_links.clone(), + }); + } + } +} + +/// Remove a bidirectional link between two actors. +pub(crate) fn unregister_link( + own_id: ActorId, + own_links: &LinkTable, + peer_id: ActorId, + peer_links: &LinkTable, +) { + { + let mut table = own_links.lock().unwrap_or_else(|p| p.into_inner()); + table.retain(|e| e.peer_id != peer_id); + } + { + let mut table = peer_links.lock().unwrap_or_else(|p| p.into_inner()); + table.retain(|e| e.peer_id != own_id); + } +} + +/// Propagate exit signals to all linked actors when an actor dies. +/// Drains the link table, signals each peer, and removes self from each +/// peer's table. +pub(crate) fn propagate_exit(own_id: ActorId, own_links: &LinkTable, reason: &ExitReason) { + let entries: Vec = { + let mut table = own_links.lock().unwrap_or_else(|p| p.into_inner()); + std::mem::take(&mut *table) + }; + + for entry in &entries { + // Remove ourselves from the peer's link table so they don't try to + // signal us back (we're dead). + if let Ok(mut peer_table) = entry.peer_links.lock() { + peer_table.retain(|e| e.peer_id != own_id); + } + // Deliver the exit signal to the peer. + (entry.signal)(own_id, reason.clone()); + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn exit_is_clone_and_eq() { + let e1 = Exit { + from: ActorId::next(), + reason: ExitReason::Normal, + }; + let e2 = e1.clone(); + assert_eq!(e1, e2); + } + + #[test] + fn empty_table_is_default() { + let table = new_link_table(); + assert!(table.lock().unwrap().is_empty()); + } + + #[test] + fn trap_exit_flag_defaults_to_false() { + let flag = new_trap_exit_flag(); + assert!(!flag.load(Ordering::Acquire)); + } +} diff --git a/concurrency/src/tasks/actor.rs b/concurrency/src/tasks/actor.rs index acdab02..c029d74 100644 --- a/concurrency/src/tasks/actor.rs +++ b/concurrency/src/tasks/actor.rs @@ -1,5 +1,9 @@ use crate::child_handle::{ActorId, ChildHandle}; use crate::error::{panic_message, ActorError, ExitReason}; +use crate::link::{ + self, new_link_table, new_linked_exit_reason, new_trap_exit_flag, Exit, LinkTable, + LinkedExitReason, SendExitFn, TrapExitFlag, +}; use crate::message::Message; use crate::monitor::{Down, MonitorRef}; use core::pin::pin; @@ -63,6 +67,16 @@ pub trait Actor: Send + Sized + 'static { fn stopped(&mut self, _ctx: &Context) -> impl Future + Send { async {} } + + /// Called when a linked actor stops, if this actor has called + /// `ctx.trap_exit(true)`. Default impl ignores the signal. + fn exit_received( + &mut self, + _exit: Exit, + _ctx: &Context, + ) -> impl Future + Send { + async {} + } } // --------------------------------------------------------------------------- @@ -113,6 +127,26 @@ where } } +/// System envelope that dispatches an `Exit` notification to the actor's +/// `exit_received` callback. Unlike `MessageEnvelope`, this does not require +/// `Handler` on the actor — every actor can receive `Exit` via the +/// trait's default-no-op `exit_received` callback. +struct ExitEnvelope { + exit: Exit, +} + +impl Envelope for ExitEnvelope { + fn handle<'a>( + self: Box, + actor: &'a mut A, + ctx: &'a Context, + ) -> Pin + Send + 'a>> { + Box::pin(async move { + actor.exit_received(self.exit, ctx).await; + }) + } +} + // --------------------------------------------------------------------------- // Context // --------------------------------------------------------------------------- @@ -127,6 +161,9 @@ pub struct Context { cancellation_token: CancellationToken, completion_rx: watch::Receiver>, monitors: MonitorTable, + trap_exit: TrapExitFlag, + links: LinkTable, + linked_reason: LinkedExitReason, } impl Clone for Context { @@ -137,6 +174,9 @@ impl Clone for Context { cancellation_token: self.cancellation_token.clone(), completion_rx: self.completion_rx.clone(), monitors: self.monitors.clone(), + trap_exit: self.trap_exit.clone(), + links: self.links.clone(), + linked_reason: self.linked_reason.clone(), } } } @@ -157,6 +197,9 @@ impl Context { cancellation_token: actor_ref.cancellation_token.clone(), completion_rx: actor_ref.completion_rx.clone(), monitors: actor_ref.monitors.clone(), + trap_exit: actor_ref.trap_exit.clone(), + links: actor_ref.links.clone(), + linked_reason: actor_ref.linked_reason.clone(), } } @@ -243,6 +286,9 @@ impl Context { cancellation_token: self.cancellation_token.clone(), completion_rx: self.completion_rx.clone(), monitors: self.monitors.clone(), + trap_exit: self.trap_exit.clone(), + links: self.links.clone(), + linked_reason: self.linked_reason.clone(), } } @@ -311,6 +357,86 @@ impl Context { } } + /// Set up a bidirectional link with another actor. + /// + /// When either side dies abnormally, the other receives an exit signal. + /// By default the receiver is terminated by the signal. Call + /// [`trap_exit(true)`] to convert signals into `Exit` messages delivered + /// via [`Actor::exit_received`] instead. + /// + /// If the target is already dead with an abnormal reason, the exit + /// signal is delivered immediately. Calling `link` twice on the same + /// peer is a no-op (links are idempotent). + /// + /// [`trap_exit(true)`]: Self::trap_exit + /// [`Actor::exit_received`]: Actor::exit_received + pub fn link(&self, target: &ChildHandle) { + let own_signal = link::make_signal( + self.trap_exit.clone(), + self.own_cancel_fn(), + self.own_send_exit_fn(), + self.linked_reason.clone(), + ); + let peer_signal = link::make_signal( + target.trap_exit_flag().clone(), + target.cancel_fn().clone(), + target.send_exit_fn().clone(), + target.linked_reason().clone(), + ); + link::register_link( + self.id, + &self.links, + own_signal, + target.id(), + target.links(), + peer_signal, + ); + + // If the target is already dead, deliver the signal immediately to us. + if let Some(reason) = target.exit_reason() { + let signal = link::make_signal( + self.trap_exit.clone(), + self.own_cancel_fn(), + self.own_send_exit_fn(), + self.linked_reason.clone(), + ); + signal(target.id(), reason); + } + } + + /// Remove a previously-set bidirectional link. + pub fn unlink(&self, target: &ChildHandle) { + link::unregister_link(self.id, &self.links, target.id(), target.links()); + } + + /// Control how exit signals from linked actors are handled. + /// + /// - `false` (default): an abnormal exit signal from a linked actor cancels + /// this actor (propagating the death). + /// - `true`: the signal is converted to an `Exit` message and delivered to + /// [`Actor::exit_received`]. + /// + /// `Kill` is untrappable — it cancels the actor regardless of this flag. + pub fn trap_exit(&self, enabled: bool) { + self.trap_exit.store(enabled, Ordering::Release); + } + + /// Build a type-erased cancel closure for this actor. + fn own_cancel_fn(&self) -> Arc { + let token = self.cancellation_token.clone(); + Arc::new(move || token.cancel()) + } + + /// Build a type-erased `SendExitFn` that enqueues an `ExitEnvelope` + /// onto this actor's mailbox. + fn own_send_exit_fn(&self) -> SendExitFn { + let sender = self.sender.clone(); + Arc::new(move |exit: Exit| { + let envelope: Box + Send> = Box::new(ExitEnvelope { exit }); + sender.send(envelope).map_err(|_| ActorError::ActorStopped) + }) + } + pub(crate) fn cancellation_token(&self) -> CancellationToken { self.cancellation_token.clone() } @@ -378,6 +504,9 @@ pub struct ActorRef { cancellation_token: CancellationToken, completion_rx: watch::Receiver>, monitors: MonitorTable, + trap_exit: TrapExitFlag, + links: LinkTable, + linked_reason: LinkedExitReason, } impl Debug for ActorRef { @@ -394,6 +523,9 @@ impl Clone for ActorRef { cancellation_token: self.cancellation_token.clone(), completion_rx: self.completion_rx.clone(), monitors: self.monitors.clone(), + trap_exit: self.trap_exit.clone(), + links: self.links.clone(), + linked_reason: self.linked_reason.clone(), } } } @@ -503,10 +635,21 @@ impl ActorRef { impl From> for ChildHandle { fn from(actor_ref: ActorRef) -> Self { + // Build a type-erased SendExitFn that enqueues an ExitEnvelope onto + // this actor's mailbox. + let sender = actor_ref.sender.clone(); + let send_exit: SendExitFn = Arc::new(move |exit: Exit| { + let envelope: Box + Send> = Box::new(ExitEnvelope { exit }); + sender.send(envelope).map_err(|_| ActorError::ActorStopped) + }); ChildHandle::from_tasks( actor_ref.id, actor_ref.cancellation_token, actor_ref.completion_rx, + actor_ref.trap_exit, + actor_ref.links, + actor_ref.linked_reason, + send_exit, ) } } @@ -536,6 +679,9 @@ impl ActorRef { let cancellation_token = CancellationToken::new(); let (completion_tx, completion_rx) = watch::channel(None); let monitors: MonitorTable = Arc::new(Mutex::new(HashMap::new())); + let trap_exit = new_trap_exit_flag(); + let links = new_link_table(); + let linked_reason = new_linked_exit_reason(); let actor_ref = ActorRef { id: ActorId::next(), @@ -543,6 +689,9 @@ impl ActorRef { cancellation_token: cancellation_token.clone(), completion_rx, monitors: monitors.clone(), + trap_exit: trap_exit.clone(), + links: links.clone(), + linked_reason: linked_reason.clone(), }; let ctx = Context { @@ -551,10 +700,29 @@ impl ActorRef { cancellation_token: cancellation_token.clone(), completion_rx: actor_ref.completion_rx.clone(), monitors, + trap_exit, + links: links.clone(), + linked_reason: linked_reason.clone(), }; + let actor_id = actor_ref.id; let inner_future = async move { - let reason = run_actor(actor, ctx, rx, cancellation_token).await; + let mut reason = run_actor(actor, ctx, rx, cancellation_token).await; + // If the actor was cancelled by a link signal (rather than + // ctx.stop()), use the linked reason as our exit reason so the + // death propagates transitively through further links. + if matches!(reason, ExitReason::Normal) { + if let Some(linked) = linked_reason + .lock() + .unwrap_or_else(|p| p.into_inner()) + .take() + { + reason = linked; + } + } + // Propagate exit signal to linked actors before publishing the + // completion signal. + link::propagate_exit(actor_id, &links, &reason); let _ = completion_tx.send(Some(reason)); }; @@ -653,6 +821,17 @@ pub trait ActorStart: Actor { fn start_with_backend(self, backend: Backend) -> ActorRef { ActorRef::spawn(self, backend) } + + /// Atomically start the actor and link it to the caller's context. + /// + /// The link is established before the new actor processes any messages, + /// closing the race window where a child could die before the parent's + /// `link()` call completes. + fn start_linked(self, parent_ctx: &Context

) -> ActorRef { + let actor_ref = self.start(); + parent_ctx.link(&actor_ref.child_handle()); + actor_ref + } } impl ActorStart for A {} @@ -1600,4 +1779,324 @@ mod tests { // If we got here without panicking, the test passes. }); } + + // --- Link tests --- + + /// Trapping actor that records `Exit` notifications. + struct TrapActor { + exits: Arc>>, + trap: bool, + } + + struct GetExits; + impl Message for GetExits { + type Result = Vec; + } + struct LinkTo(crate::ChildHandle); + impl Message for LinkTo { + type Result = (); + } + + impl Actor for TrapActor { + async fn started(&mut self, ctx: &Context) { + ctx.trap_exit(self.trap); + } + async fn exit_received(&mut self, exit: Exit, _ctx: &Context) { + self.exits.lock().unwrap().push(exit); + } + } + + impl Handler for TrapActor { + async fn handle(&mut self, msg: LinkTo, ctx: &Context) { + ctx.link(&msg.0); + } + } + + impl Handler for TrapActor { + async fn handle(&mut self, _msg: GetExits, _ctx: &Context) -> Vec { + self.exits.lock().unwrap().clone() + } + } + + fn make_trapper(trap: bool) -> ActorRef { + TrapActor { + exits: Arc::new(Mutex::new(Vec::new())), + trap, + } + .start() + } + + #[test] + pub fn link_propagates_panic_to_non_trapping_peer() { + let runtime = rt::Runtime::new().unwrap(); + runtime.block_on(async move { + // Two non-trapping actors. When A panics, B should be terminated. + let a = make_trapper(false); + let b = make_trapper(false); + a.request(LinkTo(b.child_handle())).await.unwrap(); + + // Make A panic by dropping its only ActorRef while it's mid-handler... easier: + // make A panic via a panic message + struct Boom; + impl Message for Boom { + type Result = (); + } + impl Handler for TrapActor { + async fn handle(&mut self, _msg: Boom, _ctx: &Context) { + panic!("boom"); + } + } + + let _ = a.send(Boom); + // Both should die + let reason_a = a.wait_exit().await; + let reason_b = b.wait_exit().await; + assert!(matches!(reason_a, ExitReason::Panic(_))); + // B should propagate A's reason (transitive propagation) + assert!(matches!(reason_b, ExitReason::Panic(_))); + }); + } + + #[test] + pub fn link_delivers_exit_to_trapping_peer() { + let runtime = rt::Runtime::new().unwrap(); + runtime.block_on(async move { + let a = make_trapper(false); + let b = make_trapper(true); // B traps + b.request(LinkTo(a.child_handle())).await.unwrap(); + + struct Boom2; + impl Message for Boom2 { + type Result = (); + } + impl Handler for TrapActor { + async fn handle(&mut self, _msg: Boom2, _ctx: &Context) { + panic!("boom2"); + } + } + + let _ = a.send(Boom2); + a.wait_exit().await; + // Give B time to process the Exit message + rt::sleep(Duration::from_millis(50)).await; + + // B should still be alive and have received an Exit + assert!(b.exit_reason().is_none()); + let exits = b.request(GetExits).await.unwrap(); + assert_eq!(exits.len(), 1); + assert_eq!(exits[0].from, a.id()); + assert!(matches!(exits[0].reason, ExitReason::Panic(_))); + + // Clean up + let bh = b.child_handle(); + bh.stop(); + bh.wait_exit_async().await; + }); + } + + #[test] + pub fn link_normal_exit_not_propagated_to_non_trapping_peer() { + let runtime = rt::Runtime::new().unwrap(); + runtime.block_on(async move { + let a = Counter { count: 0 }.start(); + let b = make_trapper(false); + b.request(LinkTo(a.child_handle())).await.unwrap(); + + // A stops cleanly + a.request(StopCounter).await.unwrap(); + a.join().await; + rt::sleep(Duration::from_millis(50)).await; + + // B should still be alive + assert!(b.exit_reason().is_none()); + + let bh = b.child_handle(); + bh.stop(); + bh.wait_exit_async().await; + }); + } + + #[test] + pub fn link_to_already_dead_actor_delivers_signal() { + let runtime = rt::Runtime::new().unwrap(); + runtime.block_on(async move { + // Start and panic a target + struct Boom3; + impl Message for Boom3 { + type Result = (); + } + impl Handler for TrapActor { + async fn handle(&mut self, _msg: Boom3, _ctx: &Context) { + panic!("boom3"); + } + } + let target = make_trapper(false); + let _ = target.send(Boom3); + target.wait_exit().await; + + // Now link from a trapping observer — should receive Exit immediately + let observer = make_trapper(true); + observer + .request(LinkTo(target.child_handle())) + .await + .unwrap(); + + rt::sleep(Duration::from_millis(50)).await; + let exits = observer.request(GetExits).await.unwrap(); + assert_eq!(exits.len(), 1); + assert!(matches!(exits[0].reason, ExitReason::Panic(_))); + + let oh = observer.child_handle(); + oh.stop(); + oh.wait_exit_async().await; + }); + } + + #[test] + pub fn unlink_prevents_signal_delivery() { + let runtime = rt::Runtime::new().unwrap(); + runtime.block_on(async move { + let a = Counter { count: 0 }.start(); + let b = make_trapper(true); + b.request(LinkTo(a.child_handle())).await.unwrap(); + + // Now unlink + struct UnlinkFrom(crate::ChildHandle); + impl Message for UnlinkFrom { + type Result = (); + } + impl Handler for TrapActor { + async fn handle(&mut self, msg: UnlinkFrom, ctx: &Context) { + ctx.unlink(&msg.0); + } + } + b.request(UnlinkFrom(a.child_handle())).await.unwrap(); + + // A panics + struct Boom4; + impl Message for Boom4 { + type Result = u64; + } + impl Handler for Counter { + async fn handle(&mut self, _msg: Boom4, _ctx: &Context) -> u64 { + panic!("boom4"); + } + } + // Counter's Boom4 returns u64; just use send with no expectation of reply + let _ = a.send(Boom4); + a.wait_exit().await; + rt::sleep(Duration::from_millis(50)).await; + + // B should NOT have received an Exit + let exits = b.request(GetExits).await.unwrap(); + assert!(exits.is_empty()); + + let bh = b.child_handle(); + bh.stop(); + bh.wait_exit_async().await; + }); + } + + #[test] + pub fn duplicate_link_is_idempotent() { + let runtime = rt::Runtime::new().unwrap(); + runtime.block_on(async move { + let a = Counter { count: 0 }.start(); + let b = make_trapper(true); + b.request(LinkTo(a.child_handle())).await.unwrap(); + b.request(LinkTo(a.child_handle())).await.unwrap(); // duplicate + + struct Boom5; + impl Message for Boom5 { + type Result = u64; + } + impl Handler for Counter { + async fn handle(&mut self, _msg: Boom5, _ctx: &Context) -> u64 { + panic!("boom5"); + } + } + let _ = a.send(Boom5); + a.wait_exit().await; + rt::sleep(Duration::from_millis(50)).await; + + // Should receive only ONE Exit, not two + let exits = b.request(GetExits).await.unwrap(); + assert_eq!(exits.len(), 1); + + let bh = b.child_handle(); + bh.stop(); + bh.wait_exit_async().await; + }); + } + + #[test] + pub fn chain_propagation_through_non_trapping_middle() { + let runtime = rt::Runtime::new().unwrap(); + runtime.block_on(async move { + // A linked to B linked to C. A panics → B dies (not trapping) → + // B's death propagates to C. + let a = make_trapper(false); + let b = make_trapper(false); + let c = make_trapper(true); // C traps + + a.request(LinkTo(b.child_handle())).await.unwrap(); + c.request(LinkTo(b.child_handle())).await.unwrap(); + + struct Boom6; + impl Message for Boom6 { + type Result = (); + } + impl Handler for TrapActor { + async fn handle(&mut self, _msg: Boom6, _ctx: &Context) { + panic!("boom6"); + } + } + let _ = a.send(Boom6); + a.wait_exit().await; + b.wait_exit().await; + rt::sleep(Duration::from_millis(100)).await; + + // C should have received an Exit (B's death propagated) + let exits = c.request(GetExits).await.unwrap(); + assert_eq!(exits.len(), 1, "expected C to receive Exit from B's death"); + + let ch = c.child_handle(); + ch.stop(); + ch.wait_exit_async().await; + }); + } + + #[test] + pub fn start_linked_links_atomically() { + // Parent + child via start_linked. When parent stops via panic, + // child (not trapping) should die too. + struct Parent; + struct PanicParent; + impl Message for PanicParent { + type Result = (); + } + impl Actor for Parent {} + impl Handler for Parent { + async fn handle(&mut self, _msg: PanicParent, _ctx: &Context) { + panic!("parent boom"); + } + } + + struct Child; + impl Actor for Child {} + + let runtime = rt::Runtime::new().unwrap(); + runtime.block_on(async move { + let parent = Parent.start(); + let child = Child.start_linked(&parent.context()); + + // Parent panics — child should die too + let _ = parent.send(PanicParent); + parent.wait_exit().await; + // Child should also die (link propagation) + let reason = child.wait_exit().await; + // Reason should be the parent's panic + assert!(matches!(reason, ExitReason::Panic(_))); + }); + } } diff --git a/concurrency/src/threads/actor.rs b/concurrency/src/threads/actor.rs index 329d3e7..0d6e696 100644 --- a/concurrency/src/threads/actor.rs +++ b/concurrency/src/threads/actor.rs @@ -14,6 +14,10 @@ use std::{ use crate::child_handle::{ActorId, ChildHandle}; use crate::error::{panic_message, ActorError, ExitReason}; +use crate::link::{ + self, new_link_table, new_linked_exit_reason, new_trap_exit_flag, Exit, LinkTable, + LinkedExitReason, SendExitFn, TrapExitFlag, +}; use crate::message::Message; use crate::monitor::{Down, MonitorRef}; @@ -37,6 +41,10 @@ pub use crate::response::DEFAULT_REQUEST_TIMEOUT; pub trait Actor: Send + Sized + 'static { fn started(&mut self, _ctx: &Context) {} fn stopped(&mut self, _ctx: &Context) {} + + /// Called when a linked actor stops, if this actor has called + /// `ctx.trap_exit(true)`. Default impl ignores the signal. + fn exit_received(&mut self, _exit: Exit, _ctx: &Context) {} } // --------------------------------------------------------------------------- @@ -76,6 +84,18 @@ where } } +/// System envelope that dispatches an `Exit` notification to the actor's +/// `exit_received` callback. Does not require `Handler` on the actor. +struct ExitEnvelope { + exit: Exit, +} + +impl Envelope for ExitEnvelope { + fn handle(self: Box, actor: &mut A, ctx: &Context) { + actor.exit_received(self.exit, ctx); + } +} + // --------------------------------------------------------------------------- // Context // --------------------------------------------------------------------------- @@ -90,6 +110,9 @@ pub struct Context { cancellation_token: CancellationToken, completion: Arc<(Mutex>, Condvar)>, monitors: MonitorTable, + trap_exit: TrapExitFlag, + links: LinkTable, + linked_reason: LinkedExitReason, } impl Clone for Context { @@ -100,6 +123,9 @@ impl Clone for Context { cancellation_token: self.cancellation_token.clone(), completion: self.completion.clone(), monitors: self.monitors.clone(), + trap_exit: self.trap_exit.clone(), + links: self.links.clone(), + linked_reason: self.linked_reason.clone(), } } } @@ -120,6 +146,9 @@ impl Context { cancellation_token: actor_ref.cancellation_token.clone(), completion: actor_ref.completion.clone(), monitors: actor_ref.monitors.clone(), + trap_exit: actor_ref.trap_exit.clone(), + links: actor_ref.links.clone(), + linked_reason: actor_ref.linked_reason.clone(), } } @@ -205,6 +234,9 @@ impl Context { cancellation_token: self.cancellation_token.clone(), completion: self.completion.clone(), monitors: self.monitors.clone(), + trap_exit: self.trap_exit.clone(), + links: self.links.clone(), + linked_reason: self.linked_reason.clone(), } } @@ -278,6 +310,86 @@ impl Context { } } + /// Set up a bidirectional link with another actor. + /// + /// When either side dies abnormally, the other receives an exit signal. + /// By default the receiver is terminated by the signal. Call + /// [`trap_exit(true)`] to convert signals into `Exit` messages delivered + /// via [`Actor::exit_received`] instead. + /// + /// If the target is already dead with an abnormal reason, the exit + /// signal is delivered immediately. Calling `link` twice on the same + /// peer is a no-op (links are idempotent). + /// + /// [`trap_exit(true)`]: Self::trap_exit + /// [`Actor::exit_received`]: Actor::exit_received + pub fn link(&self, target: &ChildHandle) { + let own_signal = link::make_signal( + self.trap_exit.clone(), + self.own_cancel_fn(), + self.own_send_exit_fn(), + self.linked_reason.clone(), + ); + let peer_signal = link::make_signal( + target.trap_exit_flag().clone(), + target.cancel_fn().clone(), + target.send_exit_fn().clone(), + target.linked_reason().clone(), + ); + link::register_link( + self.id, + &self.links, + own_signal, + target.id(), + target.links(), + peer_signal, + ); + + // If the target is already dead, deliver the signal immediately. + if let Some(reason) = target.exit_reason() { + let signal = link::make_signal( + self.trap_exit.clone(), + self.own_cancel_fn(), + self.own_send_exit_fn(), + self.linked_reason.clone(), + ); + signal(target.id(), reason); + } + } + + /// Remove a previously-set bidirectional link. + pub fn unlink(&self, target: &ChildHandle) { + link::unregister_link(self.id, &self.links, target.id(), target.links()); + } + + /// Control how exit signals from linked actors are handled. + /// + /// - `false` (default): an abnormal exit signal from a linked actor cancels + /// this actor. + /// - `true`: the signal is converted to an `Exit` message and delivered to + /// [`Actor::exit_received`]. + /// + /// `Kill` is untrappable. + pub fn trap_exit(&self, enabled: bool) { + self.trap_exit.store(enabled, Ordering::Release); + } + + /// Build a type-erased cancel closure for this actor. + fn own_cancel_fn(&self) -> Arc { + let token = self.cancellation_token.clone(); + Arc::new(move || token.cancel()) + } + + /// Build a type-erased `SendExitFn` that enqueues an `ExitEnvelope` + /// onto this actor's mailbox. + fn own_send_exit_fn(&self) -> SendExitFn { + let sender = self.sender.clone(); + Arc::new(move |exit: Exit| { + let envelope: Box + Send> = Box::new(ExitEnvelope { exit }); + sender.send(envelope).map_err(|_| ActorError::ActorStopped) + }) + } + pub(crate) fn cancellation_token(&self) -> CancellationToken { self.cancellation_token.clone() } @@ -360,6 +472,9 @@ pub struct ActorRef { cancellation_token: CancellationToken, completion: Arc<(Mutex>, Condvar)>, monitors: MonitorTable, + trap_exit: TrapExitFlag, + links: LinkTable, + linked_reason: LinkedExitReason, } impl Debug for ActorRef { @@ -376,6 +491,9 @@ impl Clone for ActorRef { cancellation_token: self.cancellation_token.clone(), completion: self.completion.clone(), monitors: self.monitors.clone(), + trap_exit: self.trap_exit.clone(), + links: self.links.clone(), + linked_reason: self.linked_reason.clone(), } } } @@ -485,10 +603,19 @@ impl ActorRef { impl From> for ChildHandle { fn from(actor_ref: ActorRef) -> Self { + let sender = actor_ref.sender.clone(); + let send_exit: SendExitFn = Arc::new(move |exit: Exit| { + let envelope: Box + Send> = Box::new(ExitEnvelope { exit }); + sender.send(envelope).map_err(|_| ActorError::ActorStopped) + }); ChildHandle::from_threads( actor_ref.id, actor_ref.cancellation_token, actor_ref.completion, + actor_ref.trap_exit, + actor_ref.links, + actor_ref.linked_reason, + send_exit, ) } } @@ -519,6 +646,9 @@ impl ActorRef { let completion = Arc::new((Mutex::new(None), Condvar::new())); let id = ActorId::next(); let monitors: MonitorTable = Arc::new(Mutex::new(HashMap::new())); + let trap_exit = new_trap_exit_flag(); + let links = new_link_table(); + let linked_reason = new_linked_exit_reason(); let actor_ref = ActorRef { id, @@ -526,6 +656,9 @@ impl ActorRef { cancellation_token: cancellation_token.clone(), completion: completion.clone(), monitors: monitors.clone(), + trap_exit: trap_exit.clone(), + links: links.clone(), + linked_reason: linked_reason.clone(), }; let ctx = Context { @@ -534,14 +667,32 @@ impl ActorRef { cancellation_token: cancellation_token.clone(), completion: actor_ref.completion.clone(), monitors, + trap_exit, + links: links.clone(), + linked_reason: linked_reason.clone(), }; let _thread_handle = rt::spawn(move || { let mut guard = CompletionGuard { completion, - reason: None, // defaults to Kill if run_actor panics unexpectedly + reason: None, }; - guard.reason = Some(run_actor(actor, ctx, rx, cancellation_token)); + let mut reason = run_actor(actor, ctx, rx, cancellation_token); + // If the actor was cancelled by a link signal (rather than + // ctx.stop()), use the linked reason as our exit reason so the + // death propagates transitively through further links. + if matches!(reason, ExitReason::Normal) { + if let Some(linked) = linked_reason + .lock() + .unwrap_or_else(|p| p.into_inner()) + .take() + { + reason = linked; + } + } + // Propagate exit signal to linked actors before publishing completion. + link::propagate_exit(id, &links, &reason); + guard.reason = Some(reason); }); actor_ref @@ -626,6 +777,13 @@ pub trait ActorStart: Actor { fn start(self) -> ActorRef { ActorRef::spawn(self) } + + /// Atomically start the actor and link it to the caller's context. + fn start_linked(self, parent_ctx: &Context

) -> ActorRef { + let actor_ref = self.start(); + parent_ctx.link(&actor_ref.child_handle()); + actor_ref + } } impl ActorStart for A {} @@ -1077,4 +1235,129 @@ mod tests { assert_eq!(downs.len(), 1); assert!(matches!(downs[0].reason, ExitReason::Panic(_))); } + + // --- Link tests --- + + struct TrapActor { + exits: Arc>>, + trap: bool, + } + + struct GetExits; + impl Message for GetExits { + type Result = Vec; + } + struct LinkTo(crate::ChildHandle); + impl Message for LinkTo { + type Result = (); + } + + impl Actor for TrapActor { + fn started(&mut self, ctx: &Context) { + ctx.trap_exit(self.trap); + } + fn exit_received(&mut self, exit: Exit, _ctx: &Context) { + self.exits.lock().unwrap().push(exit); + } + } + + impl Handler for TrapActor { + fn handle(&mut self, msg: LinkTo, ctx: &Context) { + ctx.link(&msg.0); + } + } + + impl Handler for TrapActor { + fn handle(&mut self, _msg: GetExits, _ctx: &Context) -> Vec { + self.exits.lock().unwrap().clone() + } + } + + fn make_trapper(trap: bool) -> ActorRef { + TrapActor { + exits: Arc::new(Mutex::new(Vec::new())), + trap, + } + .start() + } + + #[test] + fn link_propagates_panic_to_non_trapping_peer_threads() { + struct Boom; + impl Message for Boom { + type Result = (); + } + impl Handler for TrapActor { + fn handle(&mut self, _msg: Boom, _ctx: &Context) { + panic!("boom"); + } + } + + let a = make_trapper(false); + let b = make_trapper(false); + a.request(LinkTo(b.child_handle())).unwrap(); + + let _ = a.send(Boom); + let reason_a = a.wait_exit(); + let reason_b = b.wait_exit(); + assert!(matches!(reason_a, ExitReason::Panic(_))); + assert!(matches!(reason_b, ExitReason::Panic(_))); + } + + #[test] + fn link_delivers_exit_to_trapping_peer_threads() { + struct Boom; + impl Message for Boom { + type Result = (); + } + impl Handler for TrapActor { + fn handle(&mut self, _msg: Boom, _ctx: &Context) { + panic!("boom"); + } + } + + let a = make_trapper(false); + let b = make_trapper(true); + b.request(LinkTo(a.child_handle())).unwrap(); + + let _ = a.send(Boom); + a.wait_exit(); + rt::sleep(Duration::from_millis(150)); + + assert!(b.exit_reason().is_none()); + let exits = b.request(GetExits).unwrap(); + assert_eq!(exits.len(), 1); + assert_eq!(exits[0].from, a.id()); + assert!(matches!(exits[0].reason, ExitReason::Panic(_))); + + let bh = b.child_handle(); + bh.stop(); + bh.wait_exit_blocking(); + } + + #[test] + fn start_linked_threads() { + struct Parent; + struct PanicParent; + impl Message for PanicParent { + type Result = (); + } + impl Actor for Parent {} + impl Handler for Parent { + fn handle(&mut self, _msg: PanicParent, _ctx: &Context) { + panic!("parent boom"); + } + } + + struct Child; + impl Actor for Child {} + + let parent = Parent.start(); + let child = Child.start_linked(&parent.context()); + + let _ = parent.send(PanicParent); + parent.wait_exit(); + let reason = child.wait_exit(); + assert!(matches!(reason, ExitReason::Panic(_))); + } } diff --git a/examples/exit_reason/src/main.rs b/examples/exit_reason/src/main.rs index 6d06215..e5e8f93 100644 --- a/examples/exit_reason/src/main.rs +++ b/examples/exit_reason/src/main.rs @@ -1,6 +1,6 @@ use spawned_concurrency::protocol; use spawned_concurrency::tasks::{Actor, ActorStart, Context, Handler}; -use spawned_concurrency::{ChildHandle, Down, MonitorRef, Response}; +use spawned_concurrency::{ChildHandle, Down, Exit, MonitorRef, Response}; use spawned_rt::tasks as rt; use std::sync::{Arc, Mutex}; use std::time::Duration; @@ -236,6 +236,70 @@ fn main() { observer.child_handle().stop(); observer.join().await; + // 9. Links and trap_exit + println!("\n--- Scenario 9: Link with trap_exit (Erlang-style supervision) ---"); + + struct Supervisor { + log: Arc>>, + } + struct LinkChild(ChildHandle); + impl spawned_concurrency::message::Message for LinkChild { + type Result = (); + } + impl Actor for Supervisor { + async fn started(&mut self, ctx: &Context) { + // Supervisors trap exits so they observe child deaths as messages + // instead of dying along with their children. + ctx.trap_exit(true); + tracing::info!("[supervisor] trap_exit enabled"); + } + async fn exit_received(&mut self, exit: Exit, _ctx: &Context) { + tracing::info!("[supervisor] child {} died: {}", exit.from, exit.reason); + self.log.lock().unwrap().push(exit); + } + } + impl Handler for Supervisor { + async fn handle(&mut self, msg: LinkChild, ctx: &Context) { + ctx.link(&msg.0); + } + } + + let sup_log = Arc::new(Mutex::new(Vec::new())); + let supervisor = Supervisor { + log: sup_log.clone(), + } + .start(); + + let worker_x = Worker::new("worker-x").start(); + let worker_y = Worker::new("worker-y").start(); + supervisor + .request(LinkChild(worker_x.child_handle())) + .await + .unwrap(); + supervisor + .request(LinkChild(worker_y.child_handle())) + .await + .unwrap(); + println!(" Supervisor linked to worker-x and worker-y, trap_exit=true"); + + worker_x.stop().await.unwrap(); // normal exit — supervisor still notified (trap_exit=true) + let _ = worker_y.panic_now().await; + rt::sleep(Duration::from_millis(100)).await; + + let snapshot = sup_log.lock().unwrap().clone(); + println!(" Supervisor received {} Exit messages:", snapshot.len()); + for e in &snapshot { + println!(" from {} — {}", e.from, e.reason); + } + + // Supervisor is still alive thanks to trap_exit + println!( + " Supervisor still running: {}", + supervisor.exit_reason().is_none() + ); + supervisor.child_handle().stop(); + supervisor.join().await; + // Give tracing a moment to flush rt::sleep(Duration::from_millis(50)).await; println!("\n=== Done ==="); From 0f48d9d5fa950be9e9400f6c4159037ce35c6b04 Mon Sep 17 00:00:00 2001 From: Esteban Dimitroff Hodi Date: Thu, 14 May 2026 16:00:24 -0300 Subject: [PATCH 2/2] fix: address bot review feedback on links PR --- concurrency/src/link.rs | 46 +++++++++++++++++--- concurrency/src/tasks/actor.rs | 75 +++++++++++++++++++++++++++----- concurrency/src/threads/actor.rs | 32 ++++++++++---- 3 files changed, 127 insertions(+), 26 deletions(-) diff --git a/concurrency/src/link.rs b/concurrency/src/link.rs index 0c58fba..7dd37a3 100644 --- a/concurrency/src/link.rs +++ b/concurrency/src/link.rs @@ -33,13 +33,13 @@ pub(crate) type SendExitFn = Arc Result<(), ActorError> + Send + /// Per-actor flag controlling how exit signals from linked actors are handled. /// `false` (default): the receiver is cancelled. `true`: the receiver gets an /// `Exit` message via `Actor::exit_received`. -pub type TrapExitFlag = Arc; +pub(crate) type TrapExitFlag = Arc; /// Per-actor slot holding the exit reason of a linked actor whose death /// triggered cancellation. When a non-trapping actor is cancelled by a link /// signal, this slot is set so the actor's own exit reason propagates /// transitively through further links. -pub type LinkedExitReason = Arc>>; +pub(crate) type LinkedExitReason = Arc>>; /// Create a new empty linked-exit-reason slot. pub(crate) fn new_linked_exit_reason() -> LinkedExitReason { @@ -179,14 +179,24 @@ pub(crate) fn propagate_exit(own_id: ActorId, own_links: &LinkTable, reason: &Ex for entry in &entries { // Remove ourselves from the peer's link table so they don't try to // signal us back (we're dead). - if let Ok(mut peer_table) = entry.peer_links.lock() { - peer_table.retain(|e| e.peer_id != own_id); - } + let mut peer_table = entry.peer_links.lock().unwrap_or_else(|p| p.into_inner()); + peer_table.retain(|e| e.peer_id != own_id); + drop(peer_table); // Deliver the exit signal to the peer. (entry.signal)(own_id, reason.clone()); } } +/// Atomically remove `own_id` from `peer_links`. Returns `true` if an entry +/// was actually removed. Used by `ctx.link()` to detect whether the peer's +/// `propagate_exit` has already drained the table. +pub(crate) fn take_self_from_peer_table(own_id: ActorId, peer_links: &LinkTable) -> bool { + let mut table = peer_links.lock().unwrap_or_else(|p| p.into_inner()); + let len_before = table.len(); + table.retain(|e| e.peer_id != own_id); + len_before != table.len() +} + #[cfg(test)] mod tests { use super::*; @@ -212,4 +222,30 @@ mod tests { let flag = new_trap_exit_flag(); assert!(!flag.load(Ordering::Acquire)); } + + #[test] + fn take_self_from_peer_table_returns_true_when_present() { + let peer_links = new_link_table(); + let own_id = ActorId::next(); + // Insert a fake entry for own_id + let dummy_signal: ExitSignalFn = Arc::new(|_, _| {}); + peer_links.lock().unwrap().push(LinkEntry { + peer_id: own_id, + signal: dummy_signal, + peer_links: new_link_table(), + }); + // First call: present, returns true and removes + assert!(take_self_from_peer_table(own_id, &peer_links)); + // Second call: gone, returns false + assert!(!take_self_from_peer_table(own_id, &peer_links)); + assert!(peer_links.lock().unwrap().is_empty()); + } + + #[test] + fn take_self_from_peer_table_returns_false_when_absent() { + let peer_links = new_link_table(); + let own_id = ActorId::next(); + // Empty table — no entry for own_id + assert!(!take_self_from_peer_table(own_id, &peer_links)); + } } diff --git a/concurrency/src/tasks/actor.rs b/concurrency/src/tasks/actor.rs index c029d74..a3a041c 100644 --- a/concurrency/src/tasks/actor.rs +++ b/concurrency/src/tasks/actor.rs @@ -392,15 +392,22 @@ impl Context { peer_signal, ); - // If the target is already dead, deliver the signal immediately to us. + // If the target is already dead, we need to deliver the signal + // ourselves — but only if the target's own `propagate_exit` hasn't + // already done so. Atomically remove ourselves from the target's link + // table: if we removed an entry, the target hasn't drained yet (or + // hadn't yet seen our entry), so we deliver. If nothing was removed, + // the target already signaled us — don't double-deliver. if let Some(reason) = target.exit_reason() { - let signal = link::make_signal( - self.trap_exit.clone(), - self.own_cancel_fn(), - self.own_send_exit_fn(), - self.linked_reason.clone(), - ); - signal(target.id(), reason); + if link::take_self_from_peer_table(self.id, target.links()) { + let signal = link::make_signal( + self.trap_exit.clone(), + self.own_cancel_fn(), + self.own_send_exit_fn(), + self.linked_reason.clone(), + ); + signal(target.id(), reason); + } } } @@ -822,11 +829,13 @@ pub trait ActorStart: Actor { ActorRef::spawn(self, backend) } - /// Atomically start the actor and link it to the caller's context. + /// Start the actor and link it to the caller's context. /// - /// The link is established before the new actor processes any messages, - /// closing the race window where a child could die before the parent's - /// `link()` call completes. + /// The link is registered immediately after the actor is spawned. This is + /// **not strictly atomic** — the child may begin executing `started()` and + /// process messages before the link is established. However, if the child + /// dies in that window, [`Context::link`] detects the dead target and + /// delivers the exit signal as a fallback, so no signal is lost. fn start_linked(self, parent_ctx: &Context

) -> ActorRef { let actor_ref = self.start(); parent_ctx.link(&actor_ref.child_handle()); @@ -2066,6 +2075,48 @@ mod tests { }); } + #[test] + pub fn link_to_already_dead_delivers_exactly_once_to_trapping_peer() { + // Regression: previously, `ctx.link()` would deliver a duplicate Exit + // to trapping actors when the target died concurrently — once from + // propagate_exit (since register_link inserted self in target's table + // before target's drain ran) and once from the fallback exit_reason() + // check. + struct Boom7; + impl Message for Boom7 { + type Result = (); + } + impl Handler for TrapActor { + async fn handle(&mut self, _msg: Boom7, _ctx: &Context) { + panic!("boom7"); + } + } + + let runtime = rt::Runtime::new().unwrap(); + runtime.block_on(async move { + let target = make_trapper(false); + // Kick off the panic + let _ = target.send(Boom7); + // Wait for target to fully die so exit_reason() is Some + target.wait_exit().await; + + // Now link from a trapping observer — should receive Exit EXACTLY ONCE + let observer = make_trapper(true); + observer + .request(LinkTo(target.child_handle())) + .await + .unwrap(); + + rt::sleep(Duration::from_millis(100)).await; + let exits = observer.request(GetExits).await.unwrap(); + assert_eq!(exits.len(), 1, "expected exactly one Exit, got {:?}", exits); + + let oh = observer.child_handle(); + oh.stop(); + oh.wait_exit_async().await; + }); + } + #[test] pub fn start_linked_links_atomically() { // Parent + child via start_linked. When parent stops via panic, diff --git a/concurrency/src/threads/actor.rs b/concurrency/src/threads/actor.rs index 0d6e696..20788d4 100644 --- a/concurrency/src/threads/actor.rs +++ b/concurrency/src/threads/actor.rs @@ -345,15 +345,19 @@ impl Context { peer_signal, ); - // If the target is already dead, deliver the signal immediately. + // If the target is already dead, we need to deliver the signal + // ourselves — but only if the target's own `propagate_exit` hasn't + // already done so. See the tasks-mode counterpart for the rationale. if let Some(reason) = target.exit_reason() { - let signal = link::make_signal( - self.trap_exit.clone(), - self.own_cancel_fn(), - self.own_send_exit_fn(), - self.linked_reason.clone(), - ); - signal(target.id(), reason); + if link::take_self_from_peer_table(self.id, target.links()) { + let signal = link::make_signal( + self.trap_exit.clone(), + self.own_cancel_fn(), + self.own_send_exit_fn(), + self.linked_reason.clone(), + ); + signal(target.id(), reason); + } } } @@ -675,6 +679,10 @@ impl ActorRef { let _thread_handle = rt::spawn(move || { let mut guard = CompletionGuard { completion, + // If run_actor panics at the thread boundary (escaping its + // internal catch_unwind), the guard's Drop fires with + // reason=None, which the guard converts to an abnormal exit + // reason. See CompletionGuard::drop. reason: None, }; let mut reason = run_actor(actor, ctx, rx, cancellation_token); @@ -778,7 +786,13 @@ pub trait ActorStart: Actor { ActorRef::spawn(self) } - /// Atomically start the actor and link it to the caller's context. + /// Start the actor and link it to the caller's context. + /// + /// The link is registered immediately after the actor is spawned. This is + /// **not strictly atomic** — the child may begin executing `started()` and + /// process messages before the link is established. However, if the child + /// dies in that window, [`Context::link`] detects the dead target and + /// delivers the exit signal as a fallback, so no signal is lost. fn start_linked(self, parent_ctx: &Context

) -> ActorRef { let actor_ref = self.start(); parent_ctx.link(&actor_ref.child_handle());