refactor(pool): per-connection Arc<Mutex> to unblock concurrent sessions #183
Open
ruan330 wants to merge 1 commit into openabdev:main from
Conversation
`SessionPool::with_connection` currently holds the pool's write lock
for the entire callback duration. Because `stream_prompt` in discord.rs
runs inside that callback and can take many seconds (or minutes) to
drain an ACP turn, every other Discord thread is blocked from touching
the pool while one session streams — even for `get_or_create` on a
completely unrelated thread_id, which only needs the read lock.
The fix: wrap each `AcpConnection` in `Arc<Mutex<_>>`. `with_connection`
now takes only the pool's read lock long enough to clone the Arc, then
locks that specific connection's mutex for the callback. The pool lock
is released immediately, so:
- Other sessions can still stream concurrently.
- `get_or_create` on unrelated thread_ids proceeds without waiting.
- Rebuilds still take the write lock briefly (correct — structural
change to the HashMap).
`cleanup_idle` uses a snapshot-then-probe pattern so the same rule
holds on the cleanup path: clone the Arcs under the read lock, release
it, then `try_lock` each connection individually. A busy connection
is, by definition, not idle — `try_lock` lets us skip it without ever
awaiting on a per-connection mutex while holding the pool lock. The
write lock is only re-acquired if there are stale entries to remove.
This addresses the P1 review comment left by the Codex bot on the
original openabdev#77, which noted that awaiting `conn_arc.lock()` from inside
a held pool write lock would re-introduce the very starvation this
refactor is meant to eliminate.
This matches the architecture discussed in openabdev#78 §2b and closes openabdev#58
(pool write lock deadlock during long-running notification loops).
Supersedes openabdev#59 and openabdev#77. Scoped to just the locking change so it can
be reviewed in isolation — notification-loop resilience and alive
check will follow as separate PRs.
No call-site changes: the `with_connection` signature is unchanged.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
93f8f6b to f823131
Problem
`SessionPool::with_connection` currently holds the pool's write lock for the entire callback duration. Because `stream_prompt` in `discord.rs` runs inside that callback and can take many seconds — or minutes — to drain an ACP turn, every other Discord thread is blocked from touching the pool while one session is streaming, even for `get_or_create` on a completely unrelated `thread_id` (which only needs the read lock).

In production on our fork (which runs a few dozen concurrent Discord threads against one broker), this manifests as:
- `get_or_create` on a fresh thread can block for the duration of an unrelated streaming turn.

Fix
Wrap each `AcpConnection` in `Arc<Mutex<_>>`:

- `with_connection` takes the pool's read lock only long enough to clone the Arc for the target `thread_id`, then releases it.
- `get_or_create` on unrelated `thread_id`s is no longer blocked.
- Rebuilds still take the write lock briefly (correct — structural change to the `HashMap`).

`cleanup_idle` is updated to follow the same rule on the cleanup path: snapshot the Arcs under the read lock, release it, then `try_lock` each connection individually. A connection that's currently in use is by definition not idle, so `try_lock` lets us skip it without ever awaiting on a per-connection mutex while holding the pool lock. The write lock is only re-acquired if there are stale entries to remove.

The `with_connection` signature is unchanged, so no call sites need to be updated — the fix is entirely internal to `pool.rs`. Diff is +51 / -21 in a single file.

Why this approach
A few alternatives we considered:
- Pool-level `RwLock`, converting callers to the read lock. Doesn't work — callers need `&mut AcpConnection` to drive `session_prompt`, and a read lock can't hand out mutable refs.
- `DashMap<String, AcpConnection>`. `DashMap`'s shard locks are synchronous, so we'd still need per-entry async coordination to let a callback hold exclusive access across `.await` points. Doesn't avoid the underlying need for a per-connection async mutex.
- Per-connection `Arc<Mutex<_>>` (this PR). Minimal change, preserves the existing `with_connection` API, fixes the root cause (pool lock held during streaming). This matches the architecture discussed in "RFC: Session Management — Design Proposal" (#78) §2b.

Scope
This PR is only the locking change. It was previously bundled in #77 alongside notification-loop resilience, an alive-check safety net, and a startup cleanup routine — on reflection, that was too much for a single review. #77 is closed and split into three focused PRs; this is the first.
Next PRs (to follow in order, from separate branches off `main`):

- Notification-loop resilience: `end_turn` can arrive before the final `agent_message_chunk` via `tokio::select!` racing; the fix is a small drain window + empty-response fallback. Fixes #76 ("fix: notification loop assumes ordered events, bounded prompts, and managed session lifecycle — none hold in production").
- Alive check in `notification_loop` (30s alive check / 30min hard ceiling).

Supersedes #59 and #77. Closes #58.
Testing
- Rebased on `main` (0588893, current tip).
- `cargo build --release` — no warnings or errors.
- Verified `get_or_create` latency: unrelated threads are no longer blocked behind a streaming session.

Happy to add a focused test if you'd like — the behavioral change is subtle (a session that's streaming no longer blocks unrelated `get_or_create` calls), and a concurrent-access test with two tasks would make it visible.