feat: add monitor/demonitor for unidirectional actor death observation #165

Open · ElFantasma wants to merge 4 commits into main from feat/monitors
Conversation

@ElFantasma
Collaborator

Summary

  • Adds MonitorRef and Down types in new concurrency/src/monitor.rs
  • Context::monitor(&child_handle) -> MonitorRef spawns a watcher that delivers Down { monitor_ref, reason } to the monitoring actor when the target stops
  • Context::demonitor(monitor_ref) cancels the monitor (flag-based: watcher checks before delivering)
  • Implemented in both tasks and threads execution modes
  • Already-dead targets deliver Down immediately
  • Multiple independent monitors allowed on the same target — each gets a unique MonitorRef
  • Monitors are unidirectional: the target is unaware
  • Extends exit_reason example with Scenario 8 demonstrating an Observer monitoring two workers

Phase 3c of the supervision-trees roadmap. Building block alongside ChildHandle (#164) and ExitReason (#163) for the supervisor work to come.

Design notes

  • Per-actor MonitorTable (Arc<Mutex<HashMap<MonitorRef, Arc<AtomicBool>>>>) shared across Context and ActorRef clones
  • The watcher (tokio task in tasks mode, OS thread in threads mode) awaits ChildHandle::wait_exit_*, then atomically checks the active flag before sending Down
  • demonitor removes the entry from the table and clears the flag — the watcher still runs to completion but won't deliver

Why flag-based cancellation

Tokio tasks and OS threads cannot be cleanly cancelled from outside while waiting on a blocking operation (the threads-mode wait_exit_blocking blocks on a Condvar). Using a "deliver if active" flag keeps the implementation symmetric across modes and avoids the runtime-flavor pitfalls we hit in PR #164. The watcher resource is held until the target eventually dies, but this is acceptable for typical supervision patterns where children eventually exit.

Test plan

  • 12 new unit tests (5 per execution mode + 2 type-level): monitor running actor, already-dead actor, demonitor, multiple monitors, panic observation
  • All 96 tests pass (no regressions)
  • Runnable example: cargo run -p exit_reason (Scenario 8 demonstrates monitor)
  • Clippy clean

@github-actions

github-actions Bot commented May 7, 2026

🤖 Kimi Code Review

Overall this is a solid implementation of Erlang-style monitors with proper compile-time safety via the Handler<Down> bound. However, there are memory management and concurrency issues that need addressing.

Critical Issues

1. Memory Leak in Monitor Table (Both Async and Thread)

Files: concurrency/src/tasks/actor.rs, concurrency/src/threads/actor.rs

Monitor table entries are never cleaned up when the target actor exits. They are only removed on explicit demonitor() calls. If an actor monitors many others over its lifetime, the HashMap grows unboundedly.

Fix: The spawned watcher task should remove its entry after delivering (or skipping) the Down message.

// In tasks/actor.rs around line 270-282
rt::spawn(async move {
    let reason = target.wait_exit_async().await;
    let should_send = active.load(Ordering::Acquire);
    
    // Clean up the entry
    actor_ref.monitors.lock()
        .unwrap_or_else(|p| p.into_inner())
        .remove(&monitor_ref);
    
    if should_send {
        let _ = actor_ref.send(Down { monitor_ref, reason });
    }
});

2. Race Condition Window (Documented but Worsened by Missing Cleanup)

Files: concurrency/src/tasks/actor.rs:276, concurrency/src/threads/actor.rs:240

While the doc comment acknowledges "best-effort" cancellation, the current implementation widens the race window because the active flag is never cleared from memory, leaving a dangling Arc<AtomicBool> for every monitor ever created.

Recommendation: Combine the cleanup with the atomic check using Arc::try_unwrap or move the flag check and removal into a single locked section if strict ordering is required (though this adds contention).

Minor Issues

3. Unused last_ref in Async Tests

File: concurrency/src/tasks/actor.rs:1260-1261, 1268

The Watcher struct in async tests has a last_ref field that is never actually used in assertions (unlike the field usage pattern in the thread tests). Remove to simplify:

struct Watcher {
    downs: Arc<Mutex<Vec<crate::monitor::Down>>>,
    // Remove: last_ref: Arc<Mutex<Option<crate::monitor::MonitorRef>>>,
}

4. Poisoned Mutex Recovery Consistency

Files: Both actor implementations use unwrap_or_else(|p| p.into_inner()) on mutex poisoning.

While this recovers from poisoned locks, it may hide bugs where a panic in one monitor callback leaves the table in an inconsistent state. Consider whether poisoned locks should be propagated (panic) rather than silently recovered, given this is an internal implementation detail.

5. Ordering in ID Generation

File: concurrency/src/monitor.rs:13

AtomicU64::fetch_add uses Ordering::Relaxed. While acceptable for unique ID generation, if these IDs are ever used for cross-thread synchronization (e.g., ordering guarantees), consider Ordering::SeqCst. For current usage, Relaxed is correct.

Positive Aspects

  • Type Safety: The where A: Handler<Down> bound ensures compile-time verification that the monitoring actor can receive Down messages.
  • Proper Atomic Ordering: Uses Acquire/Release correctly for the active flag synchronization between demonitor and watcher tasks.
  • Backend Parity: Both async and thread implementations are consistent in their approach.
  • Documentation: Clear documentation of the "best-effort" cancellation semantics.

Suggested Refactoring

Consider extracting the monitor management logic into a shared internal module to reduce duplication between tasks/actor.rs and threads/actor.rs, though this may be complicated by the different runtime primitives (spawn vs spawn_blocking).

Verdict: Address the memory leak (Issue #1) before merging. The race condition is acceptable given documented semantics, but the resource leak is not.


Automated review by Kimi (Moonshot AI) · kimi-k2.5 · custom prompt

@github-actions

github-actions Bot commented May 7, 2026

🤖 Claude Code Review

PR #165: monitor/demonitor for unidirectional actor death observation

Overview

Adds Erlang-style monitor/demonitor to both tasks and threads execution modes. A MonitorRef is returned from ctx.monitor(&child_handle) and a Down { monitor_ref, reason } message is delivered to the monitoring actor when the target stops. Clean design overall — the API is idiomatic and the implementation is symmetric across backends.


Correctness Issues

1. Memory leak in MonitorTable (significant)

concurrency/src/tasks/actor.rs and threads/actor.rs, monitor() impl

After a watcher fires (target dies), the (monitor_ref, Arc<AtomicBool>) entry is never removed from the MonitorTable. Only demonitor removes entries. For a long-lived actor that monitors many transient children, the HashMap grows without bound.

The fix is to have the watcher own a reference to the table and remove its own entry atomically. This also eliminates the narrow race window between the active.load and send:

let monitors_ref = Arc::clone(&self.monitors);
rt::spawn(async move {
    let reason = target.wait_exit_async().await;
    let was_active = {
        let mut table = monitors_ref.lock().unwrap_or_else(|p| p.into_inner());
        table.remove(&monitor_ref)
            .map(|a| a.load(Ordering::Acquire))
            .unwrap_or(false)
    };
    if was_active {
        let _ = actor_ref.send(Down { monitor_ref, reason });
    }
});

With this approach:

  • If the watcher locks first: removes entry, sees active=true, delivers Down, table is clean.
  • If demonitor locks first: removes entry, sets active=false; watcher then finds nothing → was_active=false, no Down.
  • In both paths the entry is gone after the watcher runs.

Same fix applies to the threads-mode implementation.

2. Ordering::Relaxed for MonitorRef::next() — acceptable

monitor.rs:10: Relaxed is correct here; uniqueness only requires atomicity, not ordering.


Concurrency / Resource Concerns

3. OS thread per watcher in threads mode (acknowledged, worth a call-out)

threads/actor.rs, monitor() impl

Each monitor spawns a raw OS thread that blocks on wait_exit_blocking() until the target dies. Unlike tokio tasks, OS threads are heavyweight (~8 MB stack each by default). If an actor monitors dozens of long-lived targets simultaneously, this could exhaust available threads or memory.

This is called out in the PR description as an accepted trade-off, but it is worth documenting in the monitor doc-comment for the threads-mode Context so callers are aware of the cost.

4. Watcher outlives the monitoring actor — benign but unclean

If the monitoring actor stops before its targets die, the watcher task/thread still runs and eventually calls actor_ref.send(Down {...}), which fails silently. The entry also remains in the (now orphaned) monitors HashMap. No crash, but a live watcher task/thread and a growing HashMap with no owner consuming it is sloppy resource management.

After the monitoring actor stops, the monitors Arc is the only remaining owner of the table, and no one will ever drain it. If the fix in point 1 is applied (watcher removes its own entry), this is automatically handled.


Test Reliability

5. Sleep-based synchronization is flaky

tasks/actor.rs:1403, 1418, 1449, 1467, 1497 and threads counterparts

rt::sleep(Duration::from_millis(50)).await;
let downs = watcher.request(GetDowns).await.unwrap();

Using a fixed sleep to wait for asynchronous delivery is fragile in CI under load. A better pattern is to poll with a timeout:

let deadline = Instant::now() + Duration::from_secs(1);
loop {
    let downs = watcher.request(GetDowns).await.unwrap();
    if downs.len() == expected { break; }
    assert!(Instant::now() < deadline, "timed out waiting for Down");
    rt::sleep(Duration::from_millis(10)).await;
}

Or expose a wait_for_down test helper. The threads tests use 150 ms/200 ms timeouts, which suggests the delays have already been bumped once to reduce flakiness; a deterministic approach is more robust.


Minor Points

6. Down not removed from monitors on demonitor after delivery

If demonitor is called after the watcher has already delivered Down (entry still in table due to the leak), demonitor finds the entry, sets active=false, and discards it. Functionally harmless but misleading. Again, fixing point 1 resolves this too.

7. pub use crate::monitor::{Down, MonitorRef} — public without MonitorTable

lib.rs:85: MonitorTable is correctly kept private (the type alias is pub(crate), visible only where it's defined). Good.

8. Down derives Clone, which requires ExitReason: Clone

monitor.rs:44: Down derives Clone, so ExitReason must also be Clone. That's fine since ExitReason already derives Clone.


What Looks Good

  • The acquire/release pairing (store(false, Release) in demonitor, load(Acquire) in the watcher) is correct for the flag-based cancellation model.
  • MonitorRef as an opaque Copy type with AtomicU64 for ID generation is clean.
  • Symmetric implementation across both execution modes.
  • The example in exit_reason/src/main.rs demonstrates the API well.
  • 12 tests covering the core scenarios; panic observation is a good edge case.

Summary

The main actionable issue is the MonitorTable memory leak (point 1) — every fired monitor leaves a dead entry in the HashMap. Fixing it by having the watcher remove its own entry under the lock also makes the cancellation check atomic, which is a cleaner design. The sleep-based tests (point 5) are a reliability risk worth addressing before this lands.


Automated review by Claude (Anthropic) · sonnet · custom prompt

@greptile-apps
Contributor

greptile-apps Bot commented May 7, 2026

Greptile Summary

This PR introduces unidirectional actor monitoring (monitor/demonitor) as a Phase 3c supervision-trees building block. A new MonitorRef/Down type pair is added in concurrency/src/monitor.rs, and both the tasks-mode and threads-mode Context gain monitor/demonitor methods backed by a shared MonitorTable (Arc<Mutex<HashMap<MonitorRef, Arc<AtomicBool>>>>).

  • Context::monitor spawns a watcher (tokio task or OS thread) that awaits the target's exit and delivers a Down message to the monitoring actor's mailbox, unless the active flag has been cleared.
  • Context::demonitor removes the table entry and clears the flag, providing best-effort cancellation before the Down is delivered.
  • Both modes share the same flag-based cancellation design, and the example (exit_reason Scenario 8) demonstrates one clean stop and one panic being observed concurrently.

Confidence Score: 3/5

The core design is sound, but the MonitorTable grows without bound in any actor that monitors more than a handful of targets over its lifetime — a direct fit for the intended supervision use case.

After a watcher delivers (or skips) a Down notification its entry is never removed from the monitors HashMap; only an explicit demonitor call removes entries. A supervisor monitoring repeatedly-restarting children will accumulate one dead entry per monitored lifetime, which compounds quickly in the exact pattern this feature is designed for.

Both concurrency/src/tasks/actor.rs and concurrency/src/threads/actor.rs carry the leak. The fix is symmetric in both files: capture a clone of the MonitorTable inside the watcher closure and call .remove(&monitor_ref) after the delivery check.

Important Files Changed

  • concurrency/src/monitor.rs: New file defining MonitorRef (global atomic u64 counter) and the Down message. Clean, minimal, correct.
  • concurrency/src/tasks/actor.rs: Adds monitor/demonitor to tasks-mode Context. MonitorTable entries are leaked after a monitor fires — only demonitor removes them.
  • concurrency/src/threads/actor.rs: Adds monitor/demonitor to threads-mode Context. Same MonitorTable leak as tasks mode; additionally spawns one OS thread per monitor.
  • concurrency/src/lib.rs: Re-exports Down and MonitorRef from the new monitor module.
  • examples/exit_reason/src/main.rs: Adds Scenario 8 demonstrating monitor on two workers (clean stop + panic). Straightforward and correct.

Sequence Diagram

sequenceDiagram
    participant S as Supervisor (monitoring actor)
    participant C as Context
    participant MT as MonitorTable
    participant W as Watcher task/thread
    participant T as Target actor

    S->>C: "ctx.monitor(&target_handle)"
    C->>MT: "insert(MonitorRef, active=true)"
    C->>W: spawn(wait for target + deliver Down)
    C-->>S: MonitorRef

    alt target dies naturally
        T-->>W: exit reason available
        W->>MT: (entry NOT removed after firing)
        W->>C: "check active.load() == true"
        W->>S: "send Down { monitor_ref, reason }"
    else demonitor called first
        S->>C: ctx.demonitor(monitor_ref)
        C->>MT: remove(monitor_ref), active.store(false)
        T-->>W: exit reason available
        W->>C: "check active.load() == false"
        note over W: Down suppressed
    end
Prompt To Fix All With AI
Fix the following 3 code review issues. Work through them one at a time, proposing concise fixes.

---

### Issue 1 of 3
concurrency/src/tasks/actor.rs:278-291
**MonitorTable entries never removed after a monitor fires**

After the watcher task completes — whether it delivered `Down` or was skipped because `active` was false — the `MonitorRef → Arc<AtomicBool>` entry stays in the `monitors` HashMap forever. Only an explicit `demonitor` call removes entries. For a supervisor that monitors hundreds of short-lived children across restarts, this produces unbounded growth of the table. The fix is to capture an `Arc<Mutex<HashMap<…>>>` clone in the watcher and call `.remove(&monitor_ref)` after the delivery check, mirroring the cleanup that `demonitor` performs.

### Issue 2 of 3
concurrency/src/threads/actor.rs:240-252
**Same MonitorTable leak applies to threads mode**

The threads-mode watcher thread has the same issue: the `MonitorRef` entry is never removed from the `monitors` table after the watcher finishes (regardless of whether it delivered `Down`). The leak affects both execution modes identically.

### Issue 3 of 3
concurrency/src/threads/actor.rs:233-246
**One unbounded OS thread per monitor**

Each `monitor()` call in threads mode spawns a dedicated OS thread (via `rt::spawn`) that blocks indefinitely on `wait_exit_blocking()`. OS threads carry a fixed stack reservation (typically 2–8 MB on Linux). A supervisor monitoring 50 long-lived children simultaneously holds 50 blocked OS threads. The PR description calls this acceptable for "typical supervision patterns," but this constraint is not expressed in any API-level documentation or enforced at runtime. At minimum, the doc-comment on `monitor` in threads mode should note that each active monitor occupies one OS thread for the duration of the target's lifetime.


Comment on lines +278 to +291
    let reason = target.wait_exit_async().await;
    if active.load(Ordering::Acquire) {
        let _ = actor_ref.send(Down {
            monitor_ref,
            reason,
        });
    }
});

monitor_ref
}

/// Cancel a previously-set monitor.
///

P1 MonitorTable entries never removed after a monitor fires

After the watcher task completes — whether it delivered Down or was skipped because active was false — the MonitorRef → Arc<AtomicBool> entry stays in the monitors HashMap forever. Only an explicit demonitor call removes entries. For a supervisor that monitors hundreds of short-lived children across restarts, this produces unbounded growth of the table. The fix is to capture an Arc<Mutex<HashMap<…>>> clone in the watcher and call .remove(&monitor_ref) after the delivery check, mirroring the cleanup that demonitor performs.


Comment on lines +240 to +252
    let reason = target.wait_exit_blocking();
    if active.load(Ordering::Acquire) {
        let _ = actor_ref.send(Down {
            monitor_ref,
            reason,
        });
    }
});

monitor_ref
}

/// Cancel a previously-set monitor.

P1 Same MonitorTable leak applies to threads mode

The threads-mode watcher thread has the same issue: the MonitorRef entry is never removed from the monitors table after the watcher finishes (regardless of whether it delivered Down). The leak affects both execution modes identically.


Comment on lines +233 to +246
    .unwrap_or_else(|p| p.into_inner())
    .insert(monitor_ref, active.clone());

let target = target.clone();
let actor_ref = self.actor_ref();

rt::spawn(move || {
    let reason = target.wait_exit_blocking();
    if active.load(Ordering::Acquire) {
        let _ = actor_ref.send(Down {
            monitor_ref,
            reason,
        });
    }

P2 One unbounded OS thread per monitor

Each monitor() call in threads mode spawns a dedicated OS thread (via rt::spawn) that blocks indefinitely on wait_exit_blocking(). OS threads carry a fixed stack reservation (typically 2–8 MB on Linux). A supervisor monitoring 50 long-lived children simultaneously holds 50 blocked OS threads. The PR description calls this acceptable for "typical supervision patterns," but this constraint is not expressed in any API-level documentation or enforced at runtime. At minimum, the doc-comment on monitor in threads mode should note that each active monitor occupies one OS thread for the duration of the target's lifetime.


@github-actions

github-actions Bot commented May 7, 2026

🤖 Codex Code Review

Findings

  1. High: monitor() creates a detached waiter that is never tied to the observing actor’s lifecycle. See concurrency/src/tasks/actor.rs and concurrency/src/threads/actor.rs. If the watcher exits before the target, that waiter stays alive until the target dies. In tasks mode this can also pin a blocking-pool thread when the target is thread-backed (concurrency/src/child_handle.rs); in threads mode it leaks a full OS thread per monitor because rt::spawn is std::thread::spawn (rt/src/threads/mod.rs). The waiter should also observe the watcher’s cancellation/completion and exit early.

  2. Medium: successful monitors never clean themselves out of self.monitors. Entries are inserted at concurrency/src/tasks/actor.rs and concurrency/src/threads/actor.rs, but only explicit demonitor() removes them at concurrency/src/tasks/actor.rs and concurrency/src/threads/actor.rs. After a normal Down, long-lived supervisors will accumulate stale MonitorRef entries unless callers manually demonitor after every notification. The waiter path should remove the entry when it wins the race to emit Down.

The new tests only cover target-exits-first. I’d add a regression where the watcher stops before the target, because that is where issue 1 shows up.

I couldn’t run cargo test locally because the workspace needs crates that aren’t available in the offline Cargo cache (tokio resolution failed).


Automated review by OpenAI Codex · gpt-5.4 · custom prompt

@ElFantasma
Collaborator Author

Bot Review Responses

Most findings already addressed in ee7ebc5; remaining items addressed in 4067f09.


Already fixed (ee7ebc5)

Memory leak in MonitorTable — Kimi #1, Claude #1, Greptile, Codex #2

Fixed. Watcher now removes its entry from the monitors HashMap after the target dies, regardless of whether Down was delivered. Cloned the Arc<Mutex<HashMap>> into the watcher closure to enable this. Added monitor_table_is_cleaned_up_after_target_dies test.

Down should derive PartialEq — Claude implicit

Fixed. Added #[derive(PartialEq)].


Now fixed (4067f09)

OS thread per watcher in threads mode — Claude #3, Codex #1, Greptile Issue 3

Fixed. Added doc note on threads-mode Context::monitor:

Resource cost (threads mode): each active monitor occupies one OS thread for the duration of the target's lifetime, blocked on the target's completion signal. For supervisors with many long-lived children, consider using tasks mode instead.

Unused last_ref field in tasks Watcher test — Kimi #3

Fixed. Removed the dead field.

Add regression test for "monitoring actor stops before target" — Codex

Fixed. Added monitoring_actor_stops_before_target_does_not_panic test that verifies the watcher cleans up gracefully when the monitoring actor stops first and the target dies later.


Discussed, deliberately not changing

Watcher outlives monitoring actor (orphaned watcher) — Claude #4, Codex #1

After discussion, decided against fixing this. The cost is minimal (one tokio task or OS thread holding a few Arc clones), the watcher does eventually clean up when the target dies, and a tokio::select! fix is asymmetric — threads mode can't cleanly cancel a blocking wait without bigger ChildHandle API changes. The fix would add complexity for marginal benefit. The new regression test verifies this path doesn't crash.

Sleep-based test synchronization is flaky — Claude #5

Acknowledged. Consistent with existing test patterns in this codebase. Could be improved with a polling helper but not blocking for this PR.

Code duplication between tasks and threads — Kimi suggestion

Consistent with the rest of the codebase. The spawn/wait primitives differ between modes, making meaningful sharing awkward.

Mutex poison handling — Kimi #4

unwrap_or_else(|p| p.into_inner()) is the consistent pattern across this codebase.

Ordering::Relaxed for ID generation — both bots agreed correct.
