fix: per-connection locking to prevent cross-session deadlock during long prompts#59
ruan330 wants to merge 1 commit into openabdev:main
Conversation
fix: per-connection locking to prevent cross-session deadlock during long prompts
pool.with_connection() holds a global write lock on the connections
HashMap for the entire duration of stream_prompt(), which can run for
minutes during long tool calls. One busy session blocks ALL other
sessions, cleanup tasks, and new session creation.
Root cause: HashMap::get_mut requires &mut self, forcing a write lock
on the entire map just to access one connection.
Fix: replace HashMap<String, AcpConnection> with
HashMap<String, Arc<Mutex<AcpConnection>>>. The pool RwLock is now
only held briefly for lookups (read lock, milliseconds). Each
connection has its own Mutex, so sessions operate independently.
Additionally, replace the bare `while let rx.recv().await` loop with
a `tokio::select!` that checks process liveness every 30 seconds.
Long tool calls (e.g. flutter build, test suites) send no ACP
notifications for minutes — that's fine as long as the process is
alive. If the process dies, break immediately instead of hanging
forever.
Changes:
- pool.rs: Arc<Mutex<AcpConnection>> storage; get_connection() returns an Arc clone via a brief read lock; remove with_connection()
- discord.rs: stream_prompt() uses get_connection() + the per-connection lock, an alive check loop, and an explicit drop(conn) after prompt_done()
Closes openabdev#58
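The pool change described above can be sketched as follows. This is a minimal illustration, not the PR's actual code: the struct fields are hypothetical, and `std::sync` locks stand in for tokio's async `RwLock`/`Mutex`.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex, RwLock};

// Illustrative stand-in for the real AcpConnection (field hypothetical).
struct AcpConnection {
    thread_id: String,
}

struct SessionPool {
    // The outer RwLock guards only the map structure; each connection
    // sits behind its own Mutex, so sessions lock independently.
    connections: RwLock<HashMap<String, Arc<Mutex<AcpConnection>>>>,
}

impl SessionPool {
    // Brief read lock: clone the Arc and release the pool lock immediately.
    // The caller then locks just that connection's Mutex, so a long-running
    // prompt on one session never holds the pool-wide lock.
    fn get_connection(&self, thread_id: &str) -> Option<Arc<Mutex<AcpConnection>>> {
        self.connections.read().unwrap().get(thread_id).cloned()
    }
}
```

With this shape, holding one connection's guard does not prevent lookups or locking of any other connection.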
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 7fd8a48342
```diff
     let mut conns = self.connections.write().await;
-    let stale: Vec<String> = conns
-        .iter()
-        .filter(|(_, c)| c.last_active < cutoff || !c.alive())
-        .map(|(k, _)| k.clone())
-        .collect();
+    let mut stale = Vec::new();
+    for (key, conn_arc) in conns.iter() {
+        let conn = conn_arc.lock().await;
```
Avoid blocking on connection mutex under pool write lock
cleanup_idle now acquires the global pool write lock and then awaits conn_arc.lock(). During any long-running prompt, that mutex is held for minutes, so the cleanup task can end up holding the write lock for the same duration. While this happens, all get_or_create/get_connection calls are blocked, which reintroduces the cross-session stall this change was meant to fix (the cleanup loop in main.rs runs every 60s, so this can happen regularly under load).
```diff
     let conns = self.connections.read().await;
-    if let Some(conn) = conns.get(thread_id) {
+    if let Some(conn_arc) = conns.get(thread_id) {
+        let conn = conn_arc.lock().await;
         if conn.alive() {
```
Release pool read lock before awaiting session mutex
In get_or_create, the read lock on connections is held while waiting on conn_arc.lock(). If a second message arrives for a thread that is already processing a long prompt, this read lock can be held for minutes; writers are then blocked, so creating/rebuilding sessions for other threads is delayed behind that one busy thread. This creates a cross-session bottleneck whenever same-thread requests overlap.
…drain window, and startup cleanup

The notification loop in stream_prompt assumes ordered ACP events, bounded prompts, and a self-managing session lifecycle. None of these hold in production with Claude Code:

1. The end_turn response arrives before the final agent_message_chunk
2. Long tool calls (flutter build, test suites) block forever
3. After restart, stale Discord threads create zombie sessions

This commit addresses all three as a single cohesive fix:

Per-connection locking (pool.rs): Replace HashMap<String, AcpConnection> with HashMap<String, Arc<Mutex<AcpConnection>>>. The pool RwLock is held only for brief lookups. Sessions operate independently — one busy session no longer blocks all others.

Alive check + hard timeout (discord.rs): Replace the bare while-let-recv with a tokio::select! that checks process liveness every 30s. Alive → keep waiting. Dead → break immediately. A hard 30-min timeout serves as a safety net against infinite tool calls.

Drain window (discord.rs): After receiving end_turn, drain the notification channel for 200ms to capture late-arriving text chunks. Prevents empty responses caused by ACP event ordering violations.

Empty response fallback (discord.rs): If text_buf is empty after draining but tool activity was recorded, compose a fallback from the tool lines. Users always see something.

Startup thread cleanup (discord.rs, ready handler): On startup, archive all active threads in allowed channels created by this bot. Prevents stale threads from spawning zombie sessions.

Supersedes openabdev#59. Closes openabdev#58. Ref openabdev#76.
thepagent left a comment
Thanks for tackling #58 — the Arc<Mutex<AcpConnection>> approach is the right pattern and the alive-check loop is a solid addition.
Two things to address before merging (Codex flagged both as well):
1. cleanup_idle holds the write lock while awaiting the per-connection Mutex (P1)

```rust
let mut conns = self.connections.write().await; // write lock held
for (key, conn_arc) in conns.iter() {
    let conn = conn_arc.lock().await; // ← blocks if session is streaming
}
```

If any session is mid-prompt, this holds the write lock for minutes — re-introducing the cross-session stall. Fix: collect stale keys under a read lock first (clone the Arc, lock outside the pool lock), then take the write lock only for removal.
2. get_or_create read lock held during alive check (P2)

Same pattern — the read lock is held while awaiting conn_arc.lock(). If a thread is busy, this blocks all writers. Fix: clone the Arc, drop the read lock, then lock the connection:

```rust
let conn_arc = {
    let conns = self.connections.read().await;
    conns.get(thread_id).cloned()
};
if let Some(arc) = conn_arc {
    if arc.lock().await.alive() {
        return Ok(());
    }
}
```

The core change (per-connection locking + get_connection) is good. Just need to fix the lock ordering in these two spots.
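The cleanup fix from point 1 can be sketched as a snapshot-then-probe pass. This is a hedged illustration, not the repository's code: the fields are hypothetical, and `std::sync` locks stand in for tokio's async ones (where `try_lock` also exists).

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex, RwLock};
use std::time::Instant;

// Hypothetical connection shape for illustration.
struct AcpConnection {
    last_active: Instant,
    alive: bool,
}

struct SessionPool {
    connections: RwLock<HashMap<String, Arc<Mutex<AcpConnection>>>>,
}

impl SessionPool {
    fn cleanup_idle(&self, cutoff: Instant) {
        // 1. Snapshot: clone the Arcs under a brief read lock, then release it.
        let snapshot: Vec<(String, Arc<Mutex<AcpConnection>>)> = self
            .connections
            .read()
            .unwrap()
            .iter()
            .map(|(k, v)| (k.clone(), Arc::clone(v)))
            .collect();

        // 2. Probe: try_lock each connection with no pool lock held.
        //    A busy (locked) connection is by definition not idle — skip it.
        let stale: Vec<String> = snapshot
            .into_iter()
            .filter_map(|(key, conn_arc)| match conn_arc.try_lock() {
                Ok(conn) if conn.last_active < cutoff || !conn.alive => Some(key),
                _ => None,
            })
            .collect();

        // 3. Remove: re-acquire the write lock only if there is work to do.
        if !stale.is_empty() {
            let mut conns = self.connections.write().unwrap();
            for key in &stale {
                conns.remove(key);
            }
        }
    }
}
```

At no point is a per-connection mutex awaited while a pool lock is held, so a session mid-prompt can never stall lookups or session creation.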
Closing — superseded by #77 which includes this fix along with alive check, drain window, and startup cleanup.
`SessionPool::with_connection` currently holds the pool's write lock
for the entire callback duration. Because `stream_prompt` in discord.rs
runs inside that callback and can take many seconds (or minutes) to
drain an ACP turn, every other Discord thread is blocked from touching
the pool while one session streams — even for `get_or_create` on a
completely unrelated thread_id, which only needs the read lock.
The fix: wrap each `AcpConnection` in `Arc<Mutex<_>>`. `with_connection`
now takes only the pool's read lock long enough to clone the Arc, then
locks that specific connection's mutex for the callback. The pool lock
is released immediately, so:
- Other sessions can still stream concurrently.
- `get_or_create` on unrelated thread_ids proceeds without waiting.
- Rebuilds still take the write lock briefly (correct — structural
change to the HashMap).
`cleanup_idle` uses a snapshot-then-probe pattern so the same rule
holds on the cleanup path: clone the Arcs under the read lock, release
it, then `try_lock` each connection individually. A busy connection
is, by definition, not idle — `try_lock` lets us skip it without ever
awaiting on a per-connection mutex while holding the pool lock. The
write lock is only re-acquired if there are stale entries to remove.
This addresses the P1 review comment left by the Codex bot on the
original openabdev#77, which noted that awaiting `conn_arc.lock()` from inside
a held pool write lock would re-introduce the very starvation this
refactor is meant to eliminate.
This matches the architecture discussed in openabdev#78 §2b and closes openabdev#58
(pool write lock deadlock during long-running notification loops).
Supersedes openabdev#59 and openabdev#77. Scoped to just the locking change so it can
be reviewed in isolation — notification-loop resilience and alive
check will follow as separate PRs.
No call-site changes: the `with_connection` signature is unchanged.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Problem
`SessionPool::with_connection()` acquires a global write lock on the connections HashMap and holds it for the entire duration of `stream_prompt()` — which can run for minutes during long tool calls (flutter build, test suites, git operations). One busy session blocks all other sessions, cleanup tasks, and new session creation. The entire broker becomes unresponsive.
Reproduction
One thread triggers a long tool call (`flutter build linux`); any other thread's `get_or_create` waits for the write lock until it finishes.

Root cause
`HashMap::get_mut` requires `&mut self`, forcing a write lock on the entire map to access one connection. The lock is held inside `with_connection` for the full streaming duration.

Solution
Per-connection locking
Replace `HashMap<String, AcpConnection>` with `HashMap<String, Arc<Mutex<AcpConnection>>>`. The pool `RwLock` is now only held briefly for lookups (read lock, milliseconds). Each connection has its own `Mutex`, so sessions operate independently.

Alive check instead of unbounded blocking
The notification loop (`while let rx.recv().await`) blocks forever if the prompt response is never received. This is dangerous because the per-connection lock is held during this time.

Replace it with a `tokio::select!` that checks process liveness every 30 seconds. This correctly handles long tool calls that send no ACP notifications for minutes (process alive → keep waiting) and a dead process (break immediately instead of hanging forever).
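A minimal sketch of this loop's shape. It is an illustration, not the PR's code: `std`'s `recv_timeout` stands in for the async `tokio::select!` + interval, and the `alive` flag is a hypothetical stand-in for the real process-liveness check.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::sync::Arc;
use std::time::Duration;

// Collect notification text until the turn ends or the process dies.
// `alive` stands in for "is the ACP child process still running?"; the
// production loop would use tokio::select! with a 30-second liveness tick.
fn notification_loop(rx: Receiver<String>, alive: Arc<AtomicBool>) -> Vec<String> {
    let mut chunks = Vec::new();
    loop {
        match rx.recv_timeout(Duration::from_millis(30)) { // 30s in real code
            Ok(chunk) => chunks.push(chunk),
            Err(RecvTimeoutError::Timeout) => {
                // No notifications for a while — fine during a long tool call,
                // as long as the process is alive. If it died, stop waiting.
                if !alive.load(Ordering::SeqCst) {
                    break;
                }
            }
            // Sender dropped: the turn is over.
            Err(RecvTimeoutError::Disconnected) => break,
        }
    }
    chunks
}
```

The key property is that silence alone never ends the loop; only a dead process or a closed channel does.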
Why Arc<Mutex> over alternatives

An alternative was to keep `with_connection` but use an `RwLock` per connection; access to a connection is exclusive, so a `Mutex` is the correct primitive. `Arc<Mutex<AcpConnection>>` also exposes `try_lock()`.

The `Arc<Mutex>` approach also naturally supports future enhancements: callers can use `try_lock()` to detect busy connections and queue prompts via ACP's `promptQueueing` capability instead of blocking.

Changes
- `pool.rs`: `Arc<Mutex<AcpConnection>>` storage; `get_connection()` returns an `Arc` clone via a brief read lock; remove `with_connection()`
- `discord.rs`: `stream_prompt()` uses `get_connection()` + the per-connection lock; `tokio::select!` alive check loop; explicit `drop(conn)` after `prompt_done()`

Testing
Tested with the `claude-agent-acp` backend, two concurrent sessions:

- `flutter build linux` (5+ minutes) in one thread — Thread B responds normally ✅
- `Agent` subagent task in one thread — Thread B creates a new session and responds ✅

Closes #58