
fix: robust notification loop — per-connection locking, alive check, drain window, and startup cleanup#77

Closed
ruan330 wants to merge 1 commit into openabdev:main from ruan330:fix/robust-notification-loop

Conversation

@ruan330 ruan330 commented Apr 6, 2026

Problem

The notification loop in stream_prompt is built on three assumptions that don't hold in production with long-running ACP backends like Claude Code (#76):

  1. ACP events arrive in order — end_turn sometimes arrives before the final agent_message_chunk, causing empty responses
  2. Prompts always complete — long tool calls (build commands, test suites) block the loop forever, and with the global pool write lock, freeze the entire broker
  3. Session lifecycle is self-managing — after restart, stale Discord threads create zombie sessions that consume pool slots

These are three symptoms of one root cause: the notification loop has no resilience against real-world conditions.

Solution

A single cohesive fix addressing all three, not three separate patches:

Per-connection locking (pool.rs)

with_connection() holds a global write lock for the entire streaming duration. One busy session blocks all others (#58).

Replace HashMap<String, AcpConnection> with HashMap<String, Arc<Mutex<AcpConnection>>>. Pool RwLock is now only held for brief lookups. Each connection has its own Mutex.

// Before: global write lock held for minutes
pool.with_connection(thread_key, |conn| { /* streaming */ })

// After: brief read lock for lookup, per-connection lock for use
let conn_arc = pool.get_connection(thread_key).await?;
let mut conn = conn_arc.lock().await;  // only locks THIS connection
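The pool-side shape of this change can be sketched as follows. This is illustrative only: it uses std's synchronous RwLock/Mutex in place of tokio's async equivalents, and AcpConnection is reduced to a placeholder.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex, RwLock};

// Placeholder for the real connection type.
struct AcpConnection {
    session_id: String,
}

struct SessionPool {
    // The pool lock guards only the map; each connection carries its own Mutex.
    connections: RwLock<HashMap<String, Arc<Mutex<AcpConnection>>>>,
}

impl SessionPool {
    // Hold the pool's read lock just long enough to clone the Arc, then
    // release it; callers lock the individual connection afterwards.
    fn get_connection(&self, thread_key: &str) -> Option<Arc<Mutex<AcpConnection>>> {
        self.connections.read().unwrap().get(thread_key).cloned()
    }
}
```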

Alive check + hard timeout (discord.rs)

Replace bare while let rx.recv() with tokio::select!:

  • Every 30s: check if agent process is alive. Dead → break immediately
  • After 30 minutes: hard timeout safety net. Alive but stuck → break

Correctly handles long tool calls (process alive → wait) without false timeouts, while preventing infinite blocking.
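The shape of that loop can be sketched with std's mpsc standing in for the async channel; the real code uses tokio::select! over the notification receiver, and recv_loop, is_alive, and the duration parameters here are illustrative names, not the PR's actual identifiers.

```rust
use std::sync::mpsc;
use std::time::{Duration, Instant};

// Sketch: receive events until the channel closes, probing liveness on
// every receive timeout and enforcing a hard overall deadline.
fn recv_loop<F: Fn() -> bool>(
    rx: mpsc::Receiver<String>,
    is_alive: F,
    probe_every: Duration,   // stands in for the 30s alive-check interval
    hard_deadline: Duration, // stands in for the 30-min safety net
) -> Vec<String> {
    let start = Instant::now();
    let mut events = Vec::new();
    loop {
        // Safety net: agent alive but stuck past the hard deadline.
        if start.elapsed() >= hard_deadline {
            break;
        }
        match rx.recv_timeout(probe_every) {
            Ok(ev) => events.push(ev),
            Err(mpsc::RecvTimeoutError::Timeout) => {
                // No event this interval: check the agent process.
                if !is_alive() {
                    break; // dead agent: stop waiting immediately
                }
            }
            Err(mpsc::RecvTimeoutError::Disconnected) => break,
        }
    }
    events
}
```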

Drain window (discord.rs)

After receiving end_turn, drain the notification channel for 200ms to capture late-arriving text chunks:

if notification.id.is_some() {
    let drain_until = Instant::now() + Duration::from_millis(200);
    while let Ok(Some(n)) = timeout_at(drain_until, rx.recv()).await {
        if let Some(AcpEvent::Text(t)) = classify_notification(&n) {
            text_buf.push_str(&t);
        }
    }
    break;
}

Empty response fallback (discord.rs)

If text_buf is empty after draining but tool activity was recorded, compose a fallback from tool lines instead of showing "(no response)".
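A sketch of that fallback logic, with text_buf and an assumed tool_lines buffer as inputs; the exact fallback wording below is invented for illustration.

```rust
// Compose the final message: prefer streamed text, fall back to tool
// activity, and only show "(no response)" when there is truly nothing.
fn final_text(text_buf: &str, tool_lines: &[String]) -> String {
    if !text_buf.trim().is_empty() {
        text_buf.to_string()
    } else if !tool_lines.is_empty() {
        // Assumed fallback format; the PR doesn't specify the exact wording.
        format!("(agent used tools)\n{}", tool_lines.join("\n"))
    } else {
        "(no response)".to_string()
    }
}
```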

Startup thread cleanup (discord.rs, ready handler)

On startup, archive all active threads in allowed channels created by this bot. Prevents stale threads from spawning zombie sessions after restart.

Tradeoffs

  • 200ms drain window — adds 200ms to every prompt completion; a small price to avoid losing entire responses
  • 30-min hard timeout — very long tasks get interrupted; safety net that should not trigger in normal use
  • Auto-archive on startup — can't resume old threads; old threads have no session context anyway
  • Arc<Mutex> vs checkout pattern — slightly more memory per connection; correct abstraction that enables future prompt queueing via try_lock()

Testing

Tested with claude-agent-acp backend, concurrent sessions:

  • Two threads running simultaneously — no cross-session blocking ✅
  • Long tool call (flutter build, 5+ min) — other sessions respond normally ✅
  • Process crash — detected within 30s, session released ✅
  • Stale threads after restart — auto-archived on startup ✅
  • ACP event ordering violation — text captured via drain window ✅

Changes

  • pool.rs (+54/−43) — Arc<Mutex<AcpConnection>>, get_connection(), remove with_connection()
  • discord.rs (+145/−90) — per-connection lock, alive check, hard timeout, drain window, fallback, startup cleanup

Supersedes #59. Closes #58. Ref #76.

…drain window, and startup cleanup

The notification loop in stream_prompt assumes ordered ACP events,
bounded prompts, and self-managing session lifecycle. None of these
hold in production with Claude Code:

1. end_turn response arrives before final agent_message_chunk
2. Long tool calls (flutter build, test suites) block forever
3. After restart, stale Discord threads create zombie sessions

This commit addresses all three as a single cohesive fix:

Per-connection locking (pool.rs):
  Replace HashMap<String, AcpConnection> with
  HashMap<String, Arc<Mutex<AcpConnection>>>. Pool RwLock held only
  for brief lookups. Sessions operate independently — one busy
  session no longer blocks all others.

Alive check + hard timeout (discord.rs):
  Replace bare while-let-recv with tokio::select that checks process
  liveness every 30s. Alive → keep waiting. Dead → break immediately.
  Hard 30-min timeout as safety net against infinite tool calls.

Drain window (discord.rs):
  After receiving end_turn, drain the notification channel for 200ms
  to capture late-arriving text chunks. Prevents empty responses
  caused by ACP event ordering violations.

Empty response fallback (discord.rs):
  If text_buf is empty after draining but tool activity was recorded,
  compose a fallback from tool lines. Users always see something.

Startup thread cleanup (discord.rs, ready handler):
  On startup, archive all active threads in allowed channels created
  by this bot. Prevents stale threads from spawning zombie sessions.

Supersedes openabdev#59. Closes openabdev#58. Ref openabdev#76.

@chatgpt-codex-connector chatgpt-codex-connector bot left a comment


💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: f991c783d6


Comment on lines 89 to +92
  let mut conns = self.connections.write().await;
- let stale: Vec<String> = conns
-     .iter()
-     .filter(|(_, c)| c.last_active < cutoff || !c.alive())
-     .map(|(k, _)| k.clone())
-     .collect();
+ let mut stale = Vec::new();
+ for (key, conn_arc) in conns.iter() {
+     let conn = conn_arc.lock().await;

P1: Don't hold pool write lock while waiting on connection mutex

cleanup_idle now grabs self.connections.write() and then awaits each conn_arc.lock(). If any session is actively streaming (its connection mutex is held in stream_prompt), the cleanup task blocks on that await while still holding the pool write lock, which prevents get_or_create/get_connection for unrelated threads. Because main.rs runs this cleanup every 60s, one long prompt can periodically stall the whole broker again.


Comment on lines +184 to +186
let is_ours = thread.parent_id
.map_or(false, |pid| self.allowed_channels.contains(&pid.get()));
if !is_ours { continue; }
P2: Treat empty allowed_channels as wildcard in startup cleanup

The message path treats an empty allowlist as "all channels allowed", but the startup cleanup path requires self.allowed_channels.contains(parent_id), so when allowed_channels is empty (the default), is_ours is always false and no stale threads are archived. That makes the new zombie-thread cleanup feature silently ineffective in default deployments.
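A minimal sketch of the wildcard-aware check the comment suggests, with channel ids reduced to plain u64s and the serenity types left out; is_ours and allowed_channels mirror the names in the review excerpt.

```rust
use std::collections::HashSet;

// Ownership check for a thread's parent channel. An empty allowlist
// means "all channels allowed", matching the message-handling path.
fn is_ours(parent_id: Option<u64>, allowed_channels: &HashSet<u64>) -> bool {
    match parent_id {
        Some(pid) => allowed_channels.is_empty() || allowed_channels.contains(&pid),
        None => false, // no parent channel: not a thread we manage
    }
}
```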



ruan330 commented Apr 10, 2026

Closing this in favor of 3 smaller, focused PRs — on reflection this one bundled too many concerns for a clean review. Sorry for the churn.

The split plan:

  1. Per-connection Arc<Mutex<AcpConnection>> — just the pool.rs architecture change + discord.rs call sites. This implements the locking approach discussed in RFC #78 §2b, closes #58, and supersedes #59.
  2. Notification loop resilience — drain window + empty-response fallback for end_turn arriving before the last agent_message_chunk. Fixes #76, which we have a production repro for. ~20-30 lines.
  3. Alive check + hard timeout — defensive safety net: 30s alive check + 30-min hard timeout via tokio::select!.

Dropped from original scope: startup thread cleanup. We reversed that decision in our fork — archiving active threads on restart loses user context, and with per-thread state persistence (related to RFC #78 Phase 2) there's no longer a correctness reason to do it.

I'll send them in order, starting with #1 soon. Happy to discuss any of these beforehand if you'd prefer a different approach.

@ruan330 ruan330 closed this Apr 10, 2026
ruan330 added a commit to ruan330/openab that referenced this pull request Apr 10, 2026
`SessionPool::with_connection` currently holds the pool's write lock
for the entire callback duration. Because `stream_prompt` in discord.rs
runs inside that callback and can take many seconds (or minutes) to
drain an ACP turn, every other Discord thread is blocked from touching
the pool while one session streams — even for `get_or_create` on a
completely unrelated thread_id, which only needs the read lock.

The fix: wrap each `AcpConnection` in `Arc<Mutex<_>>`. `with_connection`
now takes only the pool's read lock long enough to clone the Arc, then
locks that specific connection's mutex for the callback. The pool lock
is released immediately, so:

  - Other sessions can still stream concurrently.
  - `get_or_create` on unrelated thread_ids proceeds without waiting.
  - Rebuilds still take the write lock briefly (correct — structural
    change to the HashMap).

`cleanup_idle` uses a snapshot-then-probe pattern so the same rule
holds on the cleanup path: clone the Arcs under the read lock, release
it, then `try_lock` each connection individually. A busy connection
is, by definition, not idle — `try_lock` lets us skip it without ever
awaiting on a per-connection mutex while holding the pool lock. The
write lock is only re-acquired if there are stale entries to remove.
This addresses the P1 review comment left by the Codex bot on the
original openabdev#77, which noted that awaiting `conn_arc.lock()` from inside
a held pool write lock would re-introduce the very starvation this
refactor is meant to eliminate.

This matches the architecture discussed in openabdev#78 §2b and closes openabdev#58
(pool write lock deadlock during long-running notification loops).

Supersedes openabdev#59 and openabdev#77. Scoped to just the locking change so it can
be reviewed in isolation — notification-loop resilience and alive
check will follow as separate PRs.

No call-site changes: the `with_connection` signature is unchanged.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
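The snapshot-then-probe pattern described in that commit message can be sketched with std primitives standing in for tokio's async locks; Conn and last_active are simplified stand-ins for the real connection type.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex, RwLock};
use std::time::Instant;

struct Conn {
    last_active: Instant,
}

fn cleanup_idle(pool: &RwLock<HashMap<String, Arc<Mutex<Conn>>>>, cutoff: Instant) {
    // 1. Snapshot the Arcs under the read lock, then release it.
    let snapshot: Vec<(String, Arc<Mutex<Conn>>)> = pool
        .read()
        .unwrap()
        .iter()
        .map(|(k, v)| (k.clone(), Arc::clone(v)))
        .collect();

    // 2. Probe each connection with try_lock. A busy connection is, by
    //    definition, not idle, so skip it rather than wait on its mutex.
    let stale: Vec<String> = snapshot
        .into_iter()
        .filter(|(_, arc)| match arc.try_lock() {
            Ok(conn) => conn.last_active < cutoff,
            Err(_) => false, // locked means in use, hence not idle
        })
        .map(|(k, _)| k)
        .collect();

    // 3. Re-acquire the write lock only if there is something to remove.
    if !stale.is_empty() {
        let mut map = pool.write().unwrap();
        for key in &stale {
            map.remove(key);
        }
    }
}
```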