feat(client-pool): expose PoolHandle API with fair scheduling and graceful shutdown #498
Conversation
Introduce a comprehensive 457-line execution plan document laying out the design, constraints, risks, progress, decision log, and API plan for roadmap item 11.2.2. This plan covers exposing a new PoolHandle API to provide fair pooled acquisition for logical sessions, clarifying fairness policies, scheduler design, testing requirements, and documentation guidance to improve the Wireframe client pool subsystem. Co-authored-by: devboxerhub[bot] <devboxerhub[bot]@users.noreply.github.com>
Reviewer's Guide

Implements a fairness-aware PoolHandle API for WireframeClientPool, introduces configurable PoolFairnessPolicy and a PoolScheduler, refactors the pool internals around a shared ClientPoolInner used by both pool and handles, and adds focused unit/BDD tests and docs; pool_handle tests are refactored to use a shared build_preamble_pool helper to set up preamble-aware pools.

Sequence diagram for PoolHandle-based fair lease acquisition

```mermaid
sequenceDiagram
actor Client
participant WireframeClientPool
participant PoolHandle
participant ClientPoolInner
participant PoolScheduler
participant PoolSlot
participant PooledClientLease
Client->>WireframeClientPool: connect_pool(...)
WireframeClientPool-->>Client: WireframeClientPool
Client->>WireframeClientPool: handle()
WireframeClientPool->>ClientPoolInner: scheduler.register_handle()
ClientPoolInner->>PoolScheduler: register_handle()
PoolScheduler-->>ClientPoolInner: handle_id
ClientPoolInner-->>WireframeClientPool: handle_id
WireframeClientPool-->>Client: PoolHandle
Client->>PoolHandle: acquire()
PoolHandle->>ClientPoolInner: scheduler.acquire_for_handle(inner, handle_id)
ClientPoolInner->>PoolScheduler: acquire_for_handle(inner, handle_id)
alt immediate_capacity
PoolScheduler->>ClientPoolInner: inner.try_acquire_immediately()
ClientPoolInner->>ClientPoolInner: ordered_slots()
ClientPoolInner->>PoolSlot: try_acquire_permit()
PoolSlot-->>ClientPoolInner: OwnedSemaphorePermit
ClientPoolInner->>PooledClientLease: new(slot, permit, None)
PooledClientLease-->>ClientPoolInner: lease
ClientPoolInner-->>PoolScheduler: lease
PoolScheduler-->>PoolHandle: lease
PoolHandle-->>Client: PooledClientLease
else contended_capacity
PoolScheduler->>PoolScheduler: enqueue_waiter(handle_id)
PoolScheduler-->>PoolHandle: pending future
note over PoolScheduler,ClientPoolInner: later, when capacity returns
loop service_waiters
PoolScheduler->>ClientPoolInner: acquire_slot_permit()
ClientPoolInner->>ClientPoolInner: ordered_slots()
ClientPoolInner->>PoolSlot: acquire_permit()
PoolSlot-->>ClientPoolInner: OwnedSemaphorePermit
ClientPoolInner-->>PoolScheduler: (slot, permit)
PoolScheduler->>PooledClientLease: new(slot, permit, Some(inner))
PooledClientLease-->>PoolScheduler: lease
PoolScheduler-->>PoolHandle: lease
end
PoolHandle-->>Client: PooledClientLease
end
Client->>PooledClientLease: call(request)
PooledClientLease-->>Client: response
```

Sequence diagram for PooledClientLease drop notifying PoolScheduler

```mermaid
sequenceDiagram
actor Client
participant PoolHandle
participant PooledClientLease
participant ClientPoolInner
participant PoolScheduler
Client->>PoolHandle: acquire()
PoolHandle-->>Client: PooledClientLease
Client->>PooledClientLease: use lease
Client-->>PooledClientLease: drop(lease)
PooledClientLease->>PooledClientLease: Drop::drop()
alt release_inner_present
PooledClientLease->>ClientPoolInner: inner = release_inner
PooledClientLease->>PoolScheduler: notify_capacity_available(inner)
PoolScheduler->>PoolScheduler: kick(inner)
PoolScheduler->>ClientPoolInner: acquire_slot_permit()
ClientPoolInner-->>PoolScheduler: (slot, permit) or error
PoolScheduler-->>waiting_handle: Result<PooledClientLease>
else no_release_inner
PooledClientLease-->>PooledClientLease: no scheduler notification
end
```

Class diagram for fairness-aware WireframeClientPool and PoolHandle API

```mermaid
classDiagram
class WireframeClientPool {
+handle() PoolHandle
+acquire() PooledClientLease
+close() void
+new(addr, pool_config, parts) Result
}
class ClientPoolInner {
+slots Arc~[Arc~PoolSlot~~]~
+next_slot AtomicUsize
+scheduler Arc~PoolScheduler~
+try_acquire_immediately() Option~PooledClientLease~
+acquire_slot_permit() Result~(Arc~PoolSlot~, OwnedSemaphorePermit)~
+ordered_slots() Vec~Arc~PoolSlot~~
}
class PoolHandle {
-inner Arc~ClientPoolInner~
-handle_id u64
+new(inner, handle_id) PoolHandle
+acquire() PooledClientLease
+call(request) Resp
}
class PooledClientLease {
-slot Arc~PoolSlot~
-_permit OwnedSemaphorePermit
-release_inner Option~Arc~ClientPoolInner~~
+new(slot, permit, release_inner) PooledClientLease
+send(request) Result
+receive() Result
+call(request) Result
+call_correlated(request) Result
<<Drop>>
}
class PoolScheduler {
-fairness_policy PoolFairnessPolicy
-next_handle_id AtomicU64
-is_servicing AtomicBool
-state Mutex~SchedulerState~
+new(fairness_policy) PoolScheduler
+register_handle() u64
+deregister_handle(handle_id) void
+acquire_for_handle(inner, handle_id) Result~PooledClientLease~
+notify_capacity_available(inner) void
-kick(inner) void
-restart_if_waiters() bool
-take_next_waiter_or_stop() Option~WaiterSender~
-service_waiters(inner) void
}
class SchedulerState {
-waiters HashMap~u64, WaiterSender~
-fifo_waiters VecDeque~u64~
-round_robin_handles VecDeque~u64~
+new() SchedulerState
+register_handle(handle_id) void
+deregister_handle(handle_id) void
+enqueue_waiter(handle_id, sender) void
+has_waiters() bool
+take_next_waiter(policy) Option~WaiterSender~
+take_next_fifo_waiter() Option~WaiterSender~
+take_next_round_robin_waiter() Option~WaiterSender~
}
class ClientPoolConfig {
-pool_size usize
-max_in_flight_per_socket usize
-idle_timeout Duration
-fairness_policy PoolFairnessPolicy
+default() ClientPoolConfig
+pool_size(value) ClientPoolConfig
+max_in_flight_per_socket(value) ClientPoolConfig
+idle_timeout(value) ClientPoolConfig
+fairness_policy(fairness_policy) ClientPoolConfig
+pool_size_value() usize
+max_in_flight_per_socket_value() usize
+idle_timeout_value() Duration
+fairness_policy_value() PoolFairnessPolicy
}
class PoolFairnessPolicy {
<<enum>>
RoundRobin
Fifo
}
class PoolSlot {
}
class ClientError {
}
WireframeClientPool --> ClientPoolInner : inner
ClientPoolInner --> PoolScheduler : scheduler
ClientPoolInner --> PoolSlot : slots
PoolHandle --> ClientPoolInner : inner
PooledClientLease --> ClientPoolInner : release_inner
PoolScheduler --> SchedulerState : state
PoolScheduler --> PoolFairnessPolicy : fairness_policy
ClientPoolConfig --> PoolFairnessPolicy : fairness_policy
PoolScheduler --> PooledClientLease : constructs
PoolScheduler --> ClientError : returns
WireframeClientPool ..> PoolHandle : handle()
WireframeClientPool ..> PooledClientLease : acquire()
PoolHandle ..> PooledClientLease : acquire()
PooledClientLease ..> ClientError : results
ClientPoolInner ..> ClientError : results
PoolScheduler ..> ClientPoolInner : uses
```
File-Level Changes
Tips and commandsInteracting with Sourcery
Customizing Your ExperienceAccess your dashboard to:
Getting Help
|
Walkthrough

Adds a fairness-oriented PoolHandle API with PoolFairnessPolicy (RoundRobin, Fifo); refactors pool internals into ClientPoolInner with a PoolScheduler; implements handle-based acquire/call and close paths; extends lease drop semantics; and adds tests and docs to validate fairness, back‑pressure and reuse behaviours.

Changes
Sequence Diagram(s)

```mermaid
sequenceDiagram
participant Client
participant PoolHandle as Pool Handle
participant PoolScheduler as Pool Scheduler
participant PoolInner as Pool Inner
participant PoolSlot as Pool Slot
Client->>PoolHandle: acquire()
PoolHandle->>PoolScheduler: acquire_for_handle(inner, handle_id)
alt Immediate availability
PoolScheduler->>PoolInner: try_acquire_immediately()
PoolInner->>PoolSlot: check availability
PoolSlot-->>PoolInner: available
PoolInner-->>PoolScheduler: AcquirePermit
PoolScheduler-->>PoolHandle: PooledClientLease
PoolHandle-->>Client: Ok(lease)
else Enqueue and wait
PoolScheduler->>PoolScheduler: enqueue_waiter(handle_id) / kick()
PoolScheduler->>PoolScheduler: service loop spawns
loop Service loop
PoolScheduler->>PoolInner: acquire_slot_permit()
PoolInner->>PoolSlot: acquire permit (rotated)
PoolSlot-->>PoolInner: permit acquired
PoolInner-->>PoolScheduler: AcquirePermit
PoolScheduler->>Waiter: send lease via oneshot
end
PoolHandle->>PoolScheduler: await oneshot
PoolScheduler-->>PoolHandle: PooledClientLease
PoolHandle-->>Client: Ok(lease)
end
Client->>PoolHandle: drop(lease)
PoolHandle->>PoolScheduler: notify_capacity_available()
PoolScheduler->>PoolScheduler: service next waiter
```

Estimated Code Review Effort: 🎯 4 (Complex) | ⏱️ ~50 minutes
🚥 Pre-merge checks: ✅ Passed checks (3 passed)
- Introduce PoolHandle as a logical-session identity for fair and orderly pooled lease acquisition.
- Implement PoolScheduler to queue blocked handles and grant leases according to fairness policies.
- Add PoolFairnessPolicy enum with RoundRobin (default) and Fifo variants.
- Update WireframeClientPool to expose handle() for creating PoolHandle instances.
- Modify lease and pool inner types to cooperate with scheduler for fairness and back-pressure.
- Include comprehensive unit, BDD scenario, and integration tests for PoolHandle fairness behaviors.
- Update docs and examples to showcase PoolHandle usage and fairness configurations.

This enhancement enables clients to multiplex logical sessions fairly over pooled connections without compromising back-pressure or warm socket reuse.

Co-authored-by: devboxerhub[bot] <devboxerhub[bot]@users.noreply.github.com>
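A hedged usage sketch of the handle-based flow described above; it mirrors the test style seen later in this thread, and `build_handle_pool`, `Ping`, `Pong`, `ClientPoolConfig` and `TestResult` are the test-support names referenced there, with imports elided because the helpers live inside the crate's test modules.

```rust
#[tokio::test(flavor = "current_thread")]
async fn two_handles_share_the_pool_fairly() -> TestResult {
    // One pooled socket forces the two logical sessions to take turns.
    let (_server, pool) = build_handle_pool(ClientPoolConfig::default().pool_size(1)).await?;

    // Each logical session holds its own handle so the scheduler can apply the
    // configured PoolFairnessPolicy across sessions rather than per call.
    let mut session_a = pool.handle();
    let mut session_b = pool.handle();

    // call() acquires a lease (queuing fairly under contention), performs the
    // round trip, and returns capacity to the scheduler when the lease drops.
    let first: Pong = session_a.call(&Ping(1)).await?;
    let second: Pong = session_b.call(&Ping(2)).await?;
    assert_eq!((first, second), (Pong(1), Pong(2)));
    Ok(())
}
```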
@coderabbitai Please suggest a fix for this issue and supply a prompt for an AI coding agent to enable it to apply the fix: src/client/tests/pool_handle.rs

Comment on file:

```rust
async fn handle_path_recycles_after_idle_timeout(
client_pool_config: ClientPoolConfig,
) -> TestResult {
let preamble_callback_count = Arc::new(AtomicUsize::new(0));
let server = PoolTestServer::start().await?;
let idle_timeout = Duration::from_millis(50);
let pool = build_pooled_client(
server.addr,
client_pool_config.pool_size(1).idle_timeout(idle_timeout),
preamble_callback_count.clone(),
)
.await?;
let mut handle = pool.handle();
let first: Pong = handle.call(&Ping(1)).await?;
assert_eq!(first, Pong(1));
advance(idle_timeout + idle_timeout).await;
tokio::task::yield_now().await;
let second: Pong = handle.call(&Ping(2)).await?;
assert_eq!(second, Pong(2));
assert_eq!(preamble_callback_count.load(Ordering::SeqCst), 2);
assert_eq!(server.preamble_count(), 2);
assert_eq!(server.connection_count(), 2);
Ok(())
}
```

❌ New issue: Code Duplication
@coderabbitai Please suggest a fix for this issue and supply a prompt for an AI coding agent to enable it to apply the fix: tests/fixtures/client_pool_handle.rs

Comment on file:

```rust
pub async fn run_warm_reuse_then_idle_recycle_scenario(&mut self) -> TestResult {
tokio::time::pause();
self.start_server().await?;
let idle_timeout = Duration::from_millis(50);
self.connect_pool(
ClientPoolConfig::default()
.pool_size(1)
.idle_timeout(idle_timeout),
)
.await?;
let pool = self.pool.as_ref().ok_or("pool missing")?;
let mut handle = pool.handle();
let first: Pong = handle.call(&Ping(1)).await?;
let second: Pong = handle.call(&Ping(2)).await?;
if first != Pong(1) || second != Pong(2) {
return Err("unexpected warm reuse response sequence".into());
}
let warm_reuse_preserved = self.preamble_callback_count.load(Ordering::SeqCst) == 1
&& self.server.as_ref().is_some_and(|server| {
server.preamble_count() == 1 && server.connection_count() == 1
});
tokio::time::advance(idle_timeout + idle_timeout).await;
tokio::task::yield_now().await;
let third: Pong = handle.call(&Ping(3)).await?;
self.warm_reuse_then_recycle = third == Pong(3)
&& warm_reuse_preserved
&& self.preamble_callback_count.load(Ordering::SeqCst) == 2
&& self.server.as_ref().is_some_and(|server| {
server.preamble_count() == 2 && server.connection_count() == 2
});
Ok(())
}
```

❌ New issue: Complex Method
…ount tracking Refactored pool_handle tests to introduce a helper function build_preamble_pool that constructs the test pool along with the preamble callback counter. Updated existing tests handle_path_preserves_warm_reuse_and_preamble and handle_path_recycles_after_idle_timeout to use the new helper. Added helper methods in client_pool_handle fixture to simplify assertions on preamble callback counts and server state. Also fixed minor markdown formatting in related docs. Co-authored-by: devboxerhub[bot] <devboxerhub[bot]@users.noreply.github.com>
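For reference, a minimal sketch of what such a helper might look like; the signature, helper names (`build_pooled_client`, `TestClientPool`) and error type are assumptions inferred from the surrounding tests, not the actual code in this commit.

```rust
use std::{
    net::SocketAddr,
    sync::{atomic::AtomicUsize, Arc},
};

// Hypothetical helper shape: build a preamble-aware pool plus the shared
// callback counter used by the warm-reuse assertions. The test server is
// still started by the caller in this sketch.
async fn build_preamble_pool(
    server_addr: SocketAddr,
    config: ClientPoolConfig,
) -> Result<(TestClientPool, Arc<AtomicUsize>), ClientError> {
    let preamble_callback_count = Arc::new(AtomicUsize::new(0));
    let pool = build_pooled_client(server_addr, config, preamble_callback_count.clone()).await?;
    Ok((pool, preamble_callback_count))
}
```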
Hey - I've found 4 issues and left some high-level feedback:
- The PR title/summary mention only refactoring tests, but the diff introduces a new public `PoolHandle` API, scheduler, and config surface; consider updating the PR metadata so reviewers immediately see the API/behavioural scope being changed.
- The helper logic for building handle-aware pools and repeatedly acquiring/recording grants is duplicated between `tests/fixtures/client_pool_handle.rs` and `src/client/tests/pool_handle.rs`; consider consolidating these into a shared test helper to keep the scenarios and unit tests in sync.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- The PR title/summary mention only refactoring tests, but the diff introduces a new public `PoolHandle` API, scheduler, and config surface; consider updating the PR metadata so reviewers immediately see the API/behavioural scope being changed.
- The helper logic for building handle-aware pools and repeatedly acquiring/recording grants is duplicated between `tests/fixtures/client_pool_handle.rs` and `src/client/tests/pool_handle.rs`; consider consolidating these into a shared test helper to keep the scenarios and unit tests in sync.
## Individual Comments
### Comment 1
<location path="src/client/pool/scheduler.rs" line_range="143-144" />
<code_context>
+ inner: Arc<ClientPoolInner<S, P, C>>,
+ handle_id: u64,
+ ) -> Result<PooledClientLease<S, P, C>, ClientError> {
+ if let Some(lease) = inner.try_acquire_immediately() {
+ return Ok(lease);
+ }
+
</code_context>
<issue_to_address>
**question (bug_risk):** Immediate acquisition here can bypass fairness for already-queued waiters.
In `acquire_for_handle`, the fast path calls `inner.try_acquire_immediately()` before considering `state.waiters`. When waiters already exist, this lets a new handle take a free permit without going through `SchedulerState::take_next_waiter` or honoring `PoolFairnessPolicy`, so newer callers can effectively jump the queue under contention. If strict fairness is required whenever `has_waiters()` is true, consider only using this immediate path when there are no recorded waiters, or clearly document that fairness is only guaranteed when using a long-lived `PoolHandle` and not per-call `acquire()`.
</issue_to_address>
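One possible shape for the guarded fast path discussed in this comment (illustrative only; `enqueue_and_wait` stands in for the existing oneshot-based slow path, and the real scheduler's lock handling may differ):

```rust
// Sketch: skip the immediate path whenever waiters are already queued, so a
// newly arriving handle cannot jump ahead of blocked ones.
pub async fn acquire_for_handle(
    &self,
    inner: Arc<ClientPoolInner<S, P, C>>,
    handle_id: u64,
) -> Result<PooledClientLease<S, P, C>, ClientError> {
    let no_waiters = {
        let state = self.state.lock().expect("scheduler state poisoned");
        !state.has_waiters()
    };
    if no_waiters {
        if let Some(lease) = inner.try_acquire_immediately() {
            return Ok(lease);
        }
    }
    // Contended (or waiters already queued): enqueue under the fairness
    // policy and wait for the service loop to grant a lease.
    self.enqueue_and_wait(inner, handle_id).await
}
```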
### Comment 2
<location path="src/client/tests/pool_handle.rs" line_range="138-153" />
<code_context>
+
+#[rstest]
+#[tokio::test(flavor = "current_thread")]
+async fn handle_acquire_respects_back_pressure(client_pool_config: ClientPoolConfig) -> TestResult {
+ let (_server, pool) = build_handle_pool(client_pool_config.pool_size(1)).await?;
+ let mut first = pool.handle();
</code_context>
<issue_to_address>
**suggestion (testing):** Add a test for the case where a waiting handle is dropped before it acquires a lease, to cover the scheduler’s cancellation path.
The scheduler has explicit logic for when `sender.send(result)` fails and a lease is dropped, to avoid leaking capacity, but this path isn’t currently exercised because all `acquire_and_record` tasks complete successfully. Please add a test where a `PoolHandle` (or its task) is dropped while blocked in `acquire()`, then verify that capacity isn’t leaked (a subsequent `acquire()` still succeeds) and that remaining waiters are still serviced. For example, spawn two waiters, drop one before it acquires, and assert the other can still acquire and the pool remains usable, thereby covering the `sender.send(..)` error branch in `PoolScheduler::service_waiters`.
```suggestion
#[rstest]
#[tokio::test(flavor = "current_thread")]
async fn handle_acquire_respects_back_pressure(client_pool_config: ClientPoolConfig) -> TestResult {
let (_server, pool) = build_handle_pool(client_pool_config.pool_size(1)).await?;
let mut first = pool.handle();
let mut second = pool.handle();
let held_lease = first.acquire().await?;
let blocked = timeout(Duration::from_millis(25), second.acquire()).await;
assert!(blocked.is_err(), "second handle should stay blocked");
drop(held_lease);
let recovered = timeout(Duration::from_millis(100), second.acquire()).await?;
let _recovered = recovered?;
Ok(())
}
#[rstest]
#[tokio::test(flavor = "current_thread")]
async fn handle_acquire_dropped_waiter_does_not_leak_capacity(
client_pool_config: ClientPoolConfig,
) -> TestResult {
let (_server, pool) = build_handle_pool(client_pool_config.pool_size(1)).await?;
// First handle acquires the only available lease, causing subsequent acquires to block.
let mut holder = pool.handle();
let mut waiter1 = pool.handle();
let mut waiter2 = pool.handle();
let held_lease = holder.acquire().await?;
// Spawn two waiters that will block on acquire().
let waiter1_task = tokio::spawn(async move { waiter1.acquire().await });
let waiter2_task = tokio::spawn(async move { waiter2.acquire().await });
// Give the scheduler a moment to register both waiters.
tokio::time::sleep(Duration::from_millis(25)).await;
// Drop the first waiter before any lease is released to exercise the sender.send(..) error path.
drop(waiter1_task);
// Releasing the held lease should not leak capacity and should service remaining waiters.
drop(held_lease);
// The remaining waiter should still be able to acquire a lease.
let lease2 = timeout(Duration::from_millis(100), waiter2_task).await???;
drop(lease2);
// The pool should remain usable: a new acquire should succeed.
let mut later = pool.handle();
let _lease3 = timeout(Duration::from_millis(100), later.acquire()).await??;
Ok(())
}
```
</issue_to_address>
### Comment 3
<location path="src/client/pool/scheduler.rs" line_range="27" />
<code_context>
+
+type WaiterSender<S, P, C> = oneshot::Sender<Result<PooledClientLease<S, P, C>, ClientError>>;
+
+struct SchedulerState<S, P, C>
+where
+ S: Serializer + Clone + Send + Sync + 'static,
</code_context>
<issue_to_address>
**issue (complexity):** Consider restructuring the scheduler state to use per-handle waiter queues plus a single FIFO queue instead of three shared collections keyed by handle IDs.
The main complexity comes from tracking handle IDs in three places and the round‑robin logic having to keep those structures consistent. You can keep the same behaviour while simplifying the state shape and `take_next_*` logic.
### 1. Unify state: per‑handle queues
Instead of `waiters: HashMap<u64, WaiterSender<…>> + fifo_waiters: VecDeque<u64> + round_robin_handles: VecDeque<u64>`, you can:
- Keep FIFO as a simple queue of senders.
- Keep round‑robin as a queue of handles, each with its own waiter queue.
- Stop storing `handle_id` in multiple collections and doing `retain` scans.
Example sketch:
```rust
struct HandleQueue<S, P, C> {
waiters: VecDeque<WaiterSender<S, P, C>>,
}
struct SchedulerState<S, P, C> {
// All handles that participate in round-robin, in rotation order.
round_robin_handles: VecDeque<u64>,
// Per-handle queues (only used for round-robin).
handle_queues: HashMap<u64, HandleQueue<S, P, C>>,
// FIFO queue (policy = Fifo).
fifo: VecDeque<WaiterSender<S, P, C>>,
}
impl<S, P, C> SchedulerState<S, P, C> {
fn register_handle(&mut self, handle_id: u64) {
self.round_robin_handles.push_back(handle_id);
self.handle_queues.entry(handle_id).or_insert_with(|| HandleQueue {
waiters: VecDeque::new(),
});
}
fn deregister_handle(&mut self, handle_id: u64) {
self.handle_queues.remove(&handle_id);
self.round_robin_handles
.retain(|queued_id| *queued_id != handle_id);
// nothing to do for FIFO; waiters are anonymous
}
fn enqueue_waiter(
&mut self,
handle_id: u64,
sender: WaiterSender<S, P, C>,
policy: PoolFairnessPolicy,
) {
match policy {
PoolFairnessPolicy::Fifo => self.fifo.push_back(sender),
PoolFairnessPolicy::RoundRobin => {
let queue = self.handle_queues
.entry(handle_id)
.or_insert_with(|| HandleQueue { waiters: VecDeque::new() });
queue.waiters.push_back(sender);
}
}
}
fn has_waiters(&self) -> bool {
!self.fifo.is_empty()
|| self.handle_queues.values().any(|h| !h.waiters.is_empty())
}
}
```
### 2. Simplify round‑robin selection
With the above state, `take_next_round_robin_waiter` no longer needs to:
- Look up a sender in a global map, **and**
- Prune `fifo_waiters` with `retain`.
It can simply rotate over handles and pop from per‑handle queues:
```rust
impl<S, P, C> SchedulerState<S, P, C> {
fn take_next_waiter(&mut self, policy: PoolFairnessPolicy)
-> Option<WaiterSender<S, P, C>>
{
match policy {
PoolFairnessPolicy::Fifo => self.take_next_fifo_waiter(),
PoolFairnessPolicy::RoundRobin => self.take_next_round_robin_waiter(),
}
}
fn take_next_fifo_waiter(&mut self) -> Option<WaiterSender<S, P, C>> {
self.fifo.pop_front()
}
fn take_next_round_robin_waiter(&mut self) -> Option<WaiterSender<S, P, C>> {
let len = self.round_robin_handles.len();
for _ in 0..len {
let handle_id = self.round_robin_handles.pop_front()?;
self.round_robin_handles.push_back(handle_id);
if let Some(queue) = self.handle_queues.get_mut(&handle_id) {
if let Some(sender) = queue.waiters.pop_front() {
return Some(sender);
}
}
}
None
}
}
```
This removes:
- The `waiters: HashMap<u64, WaiterSender<…>>`.
- The `fifo_waiters: VecDeque<u64>` and its repeated `retain` calls.
- Cross‑collection consistency concerns (no need to synchronize a global map with two queues of IDs).
All functionality is preserved: FIFO still gets strict queueing, and round‑robin still rotates over handles, but the scheduling logic is easier to follow and cheaper to maintain.
</issue_to_address>
### Comment 4
<location path="docs/users-guide.md" line_range="1431" />
<code_context>
admission budget, not as a guarantee of parallel writes on one TCP stream.
+Create a `PoolHandle` when one logical session needs repeated pooled access and
+you want that session to participate in the configured fairness policy over
+time:
+
</code_context>
<issue_to_address>
**suggestion (review_instructions):** This sentence uses the 2nd-person pronoun "you", which the documentation style guide forbids.
To comply with the style guide, rephrase this line in the third person, for example: "when one logical session needs repeated pooled access and the session should participate in the configured fairness policy over time" or similar.
<details>
<summary>Review instructions:</summary>
**Path patterns:** `**/*.md`
**Instructions:**
Avoid 2nd person or 1st person pronouns ("I", "you", "we").
</details>
</issue_to_address>
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 13a635c326
Actionable comments posted: 5
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@docs/roadmap.md`:
- Line 505: Update the checklist item text "11.2.2. Expose a `PoolHandle` API
with fairness policies so callers can" to insert a comma before the conjunction
so, changing it to "11.2.2. Expose a `PoolHandle` API with fairness policies, so
callers can" to improve readability and satisfy prose linting.
In `@src/client/pool/client_pool.rs`:
- Around line 130-133: The close() implementation currently just drops the
wrapper allowing existing PoolHandle instances to keep ClientPoolInner alive and
continue to acquire sockets; modify ClientPoolInner to hold a shutdown flag
(e.g., AtomicBool or Mutex<bool>) and update close() to set that flag,
notify/await any background tasks, and actively tear down slot state (close warm
sockets and wake pending waiters). Ensure handle() and PoolHandle::acquire()
check the shutdown flag and return an error immediately if set, and make close()
signal any waiting/acquiring futures so they fail fast; update any methods
manipulating slots to short-circuit when ClientPoolInner is closed.
- Around line 143-147: try_acquire_immediately currently constructs
PooledClientLease with release_inner = None, so immediately-acquired leases
won't call notify_capacity_available on drop; change the method signature to
take self: &Arc<Self> and, when creating the lease in try_acquire_immediately,
pass Some(Arc::clone(self)) to PooledClientLease::new (instead of None) so the
drop path (lease.rs drop calling notify_capacity_available()) will wake the next
queued handle—this mirrors the service_waiters/acquire_for_handle behavior in
scheduler.rs.
In `@src/client/pool/scheduler.rs`:
- Around line 143-151: The fast-path early return using try_acquire_immediately
bypasses queued waiters and breaks FIFO fairness; remove the early return and
always route acquisition through the waiter queue by first creating the oneshot,
calling recover_mutex(&self.state).enqueue_waiter(handle_id, sender), then
attempt try_acquire_immediately and, if it succeeds, fulfill the just-enqueued
waiter (send the lease on the oneshot) so the queued order is preserved;
otherwise call self.kick(inner) and await the receiver as before (use the
existing symbols try_acquire_immediately, enqueue_waiter, kick, sender/receiver,
handle_id, inner, state).
In `@src/client/tests/pool_handle.rs`:
- Around line 88-90: The test currently asserts a fixed ordering for the grants
vector (observed) which is order-dependent; instead, change the assertion in the
pool handle test to verify alternation and correct counts: confirm
observed.len() is 6, that counts for "a" and "b" are each 3, and that for each
adjacent pair observed[i] != observed[i - 1] to ensure strict alternation;
update the assertions around the grants.lock().await.clone() check in the test
(the observed variable) to use these checks rather than assert_eq! with a fixed
vector.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: 97794d0b-3933-4ea6-82cb-7a13f3400901
📒 Files selected for processing (20)
docs/execplans/11-2-2-expose-pool-handle-api.md
docs/roadmap.md
docs/users-guide.md
docs/wireframe-client-design.md
src/client/mod.rs
src/client/pool/client_pool.rs
src/client/pool/config.rs
src/client/pool/handle.rs
src/client/pool/lease.rs
src/client/pool/mod.rs
src/client/pool/policy.rs
src/client/pool/scheduler.rs
src/client/tests/mod.rs
src/client/tests/pool_handle.rs
tests/bdd_pool/fixtures.rs
tests/bdd_pool/scenarios.rs
tests/features/client_pool_handle.feature
tests/fixtures/client_pool_handle.rs
tests/scenarios/client_pool_handle_scenarios.rs
tests/steps/client_pool_handle_steps.rs
This commit introduces a shutdown mechanism to the client connection pool to prevent new leases from being acquired after shutdown. It adds an AtomicBool flag to track pool shutdown state, modifies acquisition logic to respect shutdown status, and enhances the scheduler to notify all waiting tasks with disconnection errors during shutdown. Additionally, new tests were added to verify that dropped waiters do not leak capacity and that shutdown behavior is correctly handled. Minor documentation improvements and test helper refactors accompany these changes. Co-authored-by: devboxerhub[bot] <devboxerhub[bot]@users.noreply.github.com>
Actionable comments posted: 3
♻️ Duplicate comments (2)
src/test_helpers/pool_client.rs (1)
139-150: 🛠️ Refactor suggestion | 🟠 Major: Finish centralizing the preamble-pool setup.
This helper still requires a pre-started `PoolTestServer`, so the warm-reuse tests continue to repeat server startup boilerplate. The Rustdoc also says server startup can fail, but this function never starts one. Move `PoolTestServer::start()` into this helper and return the server alongside the pool and counter.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/test_helpers/pool_client.rs` around lines 139 - 150, The helper build_preamble_pool currently expects a pre-started PoolTestServer but the doc and tests expect it to start the server itself; change build_preamble_pool to call PoolTestServer::start() inside the function (handle and propagate any start errors), construct the preamble_callback_count, call build_pooled_client with the started server.addr, and return the started PoolTestServer along with the TestClientPool and Arc<AtomicUsize> (i.e. change the Result return type to Result<(PoolTestServer, TestClientPool, Arc<AtomicUsize>), ClientError> and update all call sites accordingly).

tests/fixtures/client_pool_handle.rs (1)
189-201: ⚠️ Potential issue | 🟠 Major: Assert alternation, not a fixed first grant.
Either session can win the first slot while round-robin is still correct. Hard-coding `session-a` as the first grant makes the behavioural scenario flap on task start order.
🛠️ Proposed fix
```diff
-    pub fn sessions_alternate_fairly(&self) -> bool {
-        self.grant_order
-            == [
-                "session-a",
-                "session-b",
-                "session-a",
-                "session-b",
-                "session-a",
-                "session-b",
-            ]
-    }
+    pub fn sessions_alternate_fairly(&self) -> bool {
+        self.grant_order.len() == 6
+            && self
+                .grant_order
+                .iter()
+                .filter(|label| label.as_str() == "session-a")
+                .count()
+                == 3
+            && self
+                .grant_order
+                .iter()
+                .filter(|label| label.as_str() == "session-b")
+                .count()
+                == 3
+            && self.grant_order.windows(2).all(|pair| pair[0] != pair[1])
+    }
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/fixtures/client_pool_handle.rs` around lines 189 - 201, The test sessions_alternate_fairly currently asserts a fixed start ("session-a") which makes the test flaky; change it to verify alternation generically by checking that grant_order has the expected length (6), that grant_order[0] != grant_order[1], and that for every even index i grant_order[i] == grant_order[0] and for every odd index i grant_order[i] == grant_order[1]; update the sessions_alternate_fairly function to implement this alternating pattern check against self.grant_order instead of the hard-coded array.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/client/pool/scheduler.rs`:
- Around line 216-225: The loop in service_waiters removes the waiter early with
take_next_waiter_or_stop() then awaits inner.acquire_slot_permit(), which can be
missed by shutdown notifications; fix by racing the permit acquisition with the
pool shutdown notifier: use a select (or futures::select / tokio::select)
between inner.acquire_slot_permit() and inner.shutdown_notified() (or
notify_shutdown()) so if shutdown fires you send an Err (or cancellation) to the
removed sender and stop/continue appropriately, and if the permit wins you
construct the PooledClientLease::new(slot, permit, Some(Arc::clone(&inner))) and
send it as before; reference service_waiters, take_next_waiter_or_stop,
inner.acquire_slot_permit, ClientPoolInner and PooledClientLease in the change.
- Around line 151-156: The code currently calls std::mem::forget(receiver) in
the success path of try_acquire_immediately which leaks the oneshot receiver and
can strand a lease when service_waiters() races and later sender.send(Ok(lease))
succeeds; replace the mem::forget with dropping the receiver (e.g.,
drop(receiver)) so the oneshot is closed and any later sender.send will return
Err(lease), returning the lease to the sender instead of permanently consuming
pool capacity; update the block inside the if let Some(lease) =
inner.try_acquire_immediately() (the branch that calls
take_next_waiter_or_stop()) to explicitly drop the receiver rather than
forgetting it.
In `@src/client/tests/pool_handle.rs`:
- Around line 159-168: The test currently drops the JoinHandle waiter1_task
which detaches the Tokio task and leaves waiter1.acquire() queued; instead abort
the task so the waiter is cancelled: call waiter1_task.abort() and then await
the handle (e.g., let _ = waiter1_task.await) before dropping held_lease so the
pool capacity will be assigned to waiter2_task; update the test to replace
drop(waiter1_task) with abort + await to ensure waiter1 is actually cancelled
and the timeout on waiter2_task exercises the intended path.
---
Duplicate comments:
In `@src/test_helpers/pool_client.rs`:
- Around line 139-150: The helper build_preamble_pool currently expects a
pre-started PoolTestServer but the doc and tests expect it to start the server
itself; change build_preamble_pool to call PoolTestServer::start() inside the
function (handle and propagate any start errors), construct the
preamble_callback_count, call build_pooled_client with the started server.addr,
and return the started PoolTestServer along with the TestClientPool and
Arc<AtomicUsize> (i.e. change the Result return type to Result<(PoolTestServer,
TestClientPool, Arc<AtomicUsize>), ClientError> and update all call sites
accordingly).
In `@tests/fixtures/client_pool_handle.rs`:
- Around line 189-201: The test sessions_alternate_fairly currently asserts a
fixed start ("session-a") which makes the test flaky; change it to verify
alternation generically by checking that grant_order has the expected length
(6), that grant_order[0] != grant_order[1], and that for every even index i
grant_order[i] == grant_order[0] and for every odd index i grant_order[i] ==
grant_order[1]; update the sessions_alternate_fairly function to implement this
alternating pattern check against self.grant_order instead of the hard-coded
array.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: 5dde50f4-268e-4353-9f56-3d484cbe3bcd
📒 Files selected for processing (8)
docs/roadmap.md
docs/users-guide.md
src/client/pool/client_pool.rs
src/client/pool/scheduler.rs
src/client/tests/pool_handle.rs
src/test_helpers.rs
src/test_helpers/pool_client.rs
tests/fixtures/client_pool_handle.rs
🧩 Analysis chain

I'll inspect the current state of the relevant files to determine what has and hasn't been resolved.
Overall Comments
Individual Comments
Two items remain open: the PR title (Overall Comment 1) and the
…uler

- Introduce a Tokio Notify to signal shutdown state in ClientPool.
- Modify PoolScheduler to asynchronously respond to shutdown via notification.
- Prevent waiter starvation by servicing waiters one at a time asynchronously.
- Improve resource cleanup on waiter cancellations.
- Update tests to handle aborted waiter tasks correctly.
- Refactor client pool test helpers and fixtures for correctness.

These changes enable more graceful and responsive shutdown handling in the connection pool implementation.

Co-authored-by: devboxerhub[bot] <devboxerhub[bot]@users.noreply.github.com>
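A self-contained sketch of the select-based shutdown race described above, using plain `tokio::sync` primitives (`Semaphore` for slot capacity, `Notify` for the shutdown signal) as stand-ins for the pool's internals; it is illustrative, not the crate's implementation, and assumes tokio with its full feature set.

```rust
use std::{sync::Arc, time::Duration};

use tokio::sync::{Notify, OwnedSemaphorePermit, Semaphore};

/// Race a permit acquisition against a shutdown notification so a blocked
/// waiter fails fast instead of hanging once the pool closes.
async fn acquire_or_shutdown(
    slots: Arc<Semaphore>,
    shutdown: Arc<Notify>,
) -> Result<OwnedSemaphorePermit, &'static str> {
    tokio::select! {
        permit = slots.acquire_owned() => permit.map_err(|_| "pool closed"),
        _ = shutdown.notified() => Err("pool shut down while waiting"),
    }
}

#[tokio::main]
async fn main() {
    let slots = Arc::new(Semaphore::new(1));
    let shutdown = Arc::new(Notify::new());

    // Hold the only permit so the next acquisition has to wait.
    let held = slots.clone().acquire_owned().await.expect("semaphore open");

    let waiter = tokio::spawn(acquire_or_shutdown(slots.clone(), shutdown.clone()));

    // Let the waiter reach the select!, then signal shutdown instead of
    // releasing capacity; the waiter should observe the shutdown promptly.
    tokio::time::sleep(Duration::from_millis(10)).await;
    shutdown.notify_waiters();

    let outcome = waiter.await.expect("waiter task panicked");
    assert!(outcome.is_err(), "waiter should observe shutdown");
    drop(held);
}
```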
Here is the full status assessment against the current file contents:

Overall Comments
Individual Comments
The sole remaining open item is the
@coderabbitai Have the following now been resolved? Annul any requirements that violate the en-GB-oxendict spelling (-ize / -yse / -our) conventions (for example a request to replace "normalize" with "normalise" or "artefact" with "artifact"), or where the requirement unnecessarily increases cyclomatic complexity.
Nothing can be annulled under the stated criteria: none of the three individual comments request en-GB-oxendict spelling changes, and none of the suggestions increase cyclomatic complexity (all are purely additive test cases). To get a proper resolution assessment, please share the link or number of the PR where
@coderabbitai please create a new GitHub issue for the following:

Consider restructuring the scheduler state to use per-handle waiter queues plus a single FIFO queue instead of three shared collections keyed by handle IDs. The main complexity comes from tracking handle IDs in three places and the round‑robin logic having to keep those structures consistent. You can keep the same behaviour while simplifying the state shape and `take_next_*` logic.

### 1. Unify state: per‑handle queues
Instead of `waiters: HashMap<u64, WaiterSender<…>> + fifo_waiters: VecDeque<u64> + round_robin_handles: VecDeque<u64>`, you can:
- Keep FIFO as a simple queue of senders.
- Keep round‑robin as a queue of handles, each with its own waiter queue.
- Stop storing `handle_id` in multiple collections and doing `retain` scans.
Example sketch:
```rust
struct HandleQueue<S, P, C> {
waiters: VecDeque<WaiterSender<S, P, C>>,
}
struct SchedulerState<S, P, C> {
// All handles that participate in round-robin, in rotation order.
round_robin_handles: VecDeque<u64>,
// Per-handle queues (only used for round-robin).
handle_queues: HashMap<u64, HandleQueue<S, P, C>>,
// FIFO queue (policy = Fifo).
fifo: VecDeque<WaiterSender<S, P, C>>,
}
impl<S, P, C> SchedulerState<S, P, C> {
fn register_handle(&mut self, handle_id: u64) {
self.round_robin_handles.push_back(handle_id);
self.handle_queues.entry(handle_id).or_insert_with(|| HandleQueue {
waiters: VecDeque::new(),
});
}
fn deregister_handle(&mut self, handle_id: u64) {
self.handle_queues.remove(&handle_id);
self.round_robin_handles
.retain(|queued_id| *queued_id != handle_id);
// nothing to do for FIFO; waiters are anonymous
}
fn enqueue_waiter(
&mut self,
handle_id: u64,
sender: WaiterSender<S, P, C>,
policy: PoolFairnessPolicy,
) {
match policy {
PoolFairnessPolicy::Fifo => self.fifo.push_back(sender),
PoolFairnessPolicy::RoundRobin => {
let queue = self.handle_queues
.entry(handle_id)
.or_insert_with(|| HandleQueue { waiters: VecDeque::new() });
queue.waiters.push_back(sender);
}
}
}
fn has_waiters(&self) -> bool {
!self.fifo.is_empty()
|| self.handle_queues.values().any(|h| !h.waiters.is_empty())
}
}
```
### 2. Simplify round‑robin selection
With the above state, `take_next_round_robin_waiter` no longer needs to:
- Look up a sender in a global map, **and**
- Prune `fifo_waiters` with `retain`.
It can simply rotate over handles and pop from per‑handle queues:
```rust
impl<S, P, C> SchedulerState<S, P, C> {
fn take_next_waiter(&mut self, policy: PoolFairnessPolicy)
-> Option<WaiterSender<S, P, C>>
{
match policy {
PoolFairnessPolicy::Fifo => self.take_next_fifo_waiter(),
PoolFairnessPolicy::RoundRobin => self.take_next_round_robin_waiter(),
}
}
fn take_next_fifo_waiter(&mut self) -> Option<WaiterSender<S, P, C>> {
self.fifo.pop_front()
}
fn take_next_round_robin_waiter(&mut self) -> Option<WaiterSender<S, P, C>> {
let len = self.round_robin_handles.len();
for _ in 0..len {
let handle_id = self.round_robin_handles.pop_front()?;
self.round_robin_handles.push_back(handle_id);
if let Some(queue) = self.handle_queues.get_mut(&handle_id) {
if let Some(sender) = queue.waiters.pop_front() {
return Some(sender);
}
}
}
None
}
}
```
This removes:
- The `waiters: HashMap<u64, WaiterSender<…>>`.
- The `fifo_waiters: VecDeque<u64>` and its repeated `retain` calls.
- Cross‑collection consistency concerns (no need to synchronize a global map with two queues of IDs).
All functionality is preserved: FIFO still gets strict queueing, and round‑robin still rotates over handles, but the scheduling logic is easier to follow and cheaper to maintain.

This is marked as a suggestion for architectural improvement, not a required fix. The reviewer's comment indicates this would simplify the code but acknowledges the current implementation "preserves all functionality." Let me evaluate whether this should be implemented now:

Current state: The existing three-collection design works correctly with the fairness fixes applied:
Proposed state: Per-handle queues would be cleaner architecturally but represents a significant refactor that:
Given that:
Recommendation: This architectural improvement should be considered for a future refactoring task, not as part of this PR which has already grown in scope significantly. The current implementation is correct, tested, and maintainable.
Summary
Changes
Public API
Internal
Tests
Documentation
Validation and evidence
Notes
📎 Task: https://www.devboxer.com/task/c4b1e5d1-4af6-49be-9248-337cdb6e12c4