Implement interleaved queue parity tests and ExecPlan living doc#468
Implement interleaved queue parity tests and ExecPlan living doc#468
Conversation
Reviewer's GuideImplements roadmap item 10.3.2 by adding an ExecPlan living document plus new unit and BDD tests that exercise interleaved high/low push queue fairness and shared rate limiting via the client streaming surface, wiring them through existing streaming fixtures and documenting the parity guarantees across the design and user docs. Sequence diagram for interleaved queue parity validation via client streamingsequenceDiagram
participant UnitTest as UnitTest_streaming_parity
participant BDD as BDD_client_streaming
participant Fixture as ClientStreamingFixture
participant Conn as ConnectionActor
participant Fair as FairnessConfig
participant PQHigh as PushQueueHigh
participant PQLow as PushQueueLow
participant RL as RateLimiter
participant RS as ResponseStream
UnitTest->>Fixture: configure_interleaved_scenario(high_load, low_load, shared_budget)
BDD->>Fixture: configure_interleaved_scenario(high_load, low_load, shared_budget)
Fixture->>Fair: build_fairness_config(thresholds)
Fixture->>Conn: start_with_fairness(Fair)
loop enqueue_interleaved_pushes
Fixture->>PQHigh: enqueue_high_priority_push(frame)
Fixture->>PQLow: enqueue_low_priority_push(frame)
end
Conn->>RL: request_budget_for_high(batch)
RL-->>Conn: grant_or_delay_high
Conn->>PQHigh: dequeue_under_budget
PQHigh-->>Conn: high_frame
Conn->>RL: request_budget_for_low(batch)
RL-->>Conn: grant_or_delay_low
Conn->>PQLow: dequeue_under_budget
PQLow-->>Conn: low_frame
Conn->>RS: emit_frame_to_response_stream(high_frame_or_low_frame)
RS-->>UnitTest: yield_next_frame()
RS-->>BDD: yield_next_frame()
UnitTest->>UnitTest: assert_fairness_parity(interleaved_progress_low_under_burst)
UnitTest->>UnitTest: assert_rate_limit_symmetry(shared_budget_across_priorities)
BDD->>BDD: assert_user_visible_progress(low_priority_not_starved)
BDD->>BDD: assert_user_visible_rate_limit_effects(shared_budget_marker_frame)
File-Level Changes
Tips and commandsInteracting with Sourcery
Customizing Your ExperienceAccess your dashboard to:
Getting Help
|
|
No actionable comments were generated in the recent review. 🎉 ℹ️ Recent review infoConfiguration used: Organization UI Review profile: ASSERTIVE Plan: Pro 📒 Files selected for processing (14)
Summary by CodeRabbitRelease Notes
WalkthroughThis pull request implements end-to-end validation of interleaved high-/low-priority push queue fairness and shared rate-limit symmetry through the client streaming surface, including comprehensive unit tests, behaviour-driven development scenarios, and documentation updates without changing public APIs. Changes
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~25 minutes Possibly related PRs
Poem
🚥 Pre-merge checks | ✅ 3✅ Passed checks (3 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches
🧪 Generate unit tests (beta)
Tip Issue Planner is now in beta. Read the docs and try it out! Share your feedback on Discord. Comment |
|
@coderabbitai Please suggest a fix for this issue and supply a prompt for an AI coding agent to enable it to apply the fix: tests/fixtures/client_streaming/server.rs Comment on file pub(crate) async fn build_interleaved_priority_frames(
cid: CorrelationId,
) -> TestResult<Vec<StreamTestEnvelope>> {
let (queues, handle) = PushQueues::<StreamTestEnvelope>::builder()
.high_capacity(8)
.low_capacity(8)
.unlimited()
.build()?;
let push_handle = handle.clone();
let mut actor = ConnectionActor::with_hooks(
ConnectionChannels::new(queues, handle),
None,
CancellationToken::new(),
hooks_with_stream_end(),
);
actor.set_fairness(FairnessConfig {
max_high_before_low: 1,
time_slice: None,
});
push_handle
.push_high_priority(StreamTestEnvelope::data(
MessageId::new(1),
cid,
Payload::new(vec![1]),
))
.await?;
push_handle
.push_high_priority(StreamTestEnvelope::data(
MessageId::new(3),
cid,
Payload::new(vec![3]),
))
.await?;
push_handle
.push_low_priority(StreamTestEnvelope::data(
MessageId::new(2),
cid,
Payload::new(vec![2]),
))
.await?;
push_handle
.push_low_priority(StreamTestEnvelope::data(
MessageId::new(4),
cid,
Payload::new(vec![4]),
))
.await?;
let (tx, rx) = mpsc::channel(4);
tx.send(StreamTestEnvelope::data(
MessageId::new(10),
cid,
Payload::new(vec![10]),
))
.await?;
tx.send(StreamTestEnvelope::data(
MessageId::new(11),
cid,
Payload::new(vec![11]),
))
.await?;
drop(tx);
actor
.set_multi_packet_with_correlation(Some(rx), Some(cid.get()))
.map_err(|e| format!("set_multi_packet_with_correlation failed: {e}"))?;
drop(push_handle);
let mut out = Vec::new();
actor
.run(&mut out)
.await
.map_err(|e| format!("connection actor run failed: {e:?}"))?;
Ok(out)
}❌ New issue: Large Method |
|
@coderabbitai Please suggest a fix for this issue and supply a prompt for an AI coding agent to enable it to apply the fix: src/client/tests/streaming_parity.rs Comment on file async fn collect_interleaved_fairness_frames(
correlation: CorrelationId,
) -> TestResult<Vec<TestStreamEnvelope>> {
let (queues, handle) = PushQueues::<TestStreamEnvelope>::builder()
.high_capacity(8)
.low_capacity(8)
.unlimited()
.build()?;
let push_handle = handle.clone();
let mut actor = ConnectionActor::with_hooks(
ConnectionChannels::new(queues, handle),
None,
CancellationToken::new(),
hooks_with_stream_end(),
);
actor.set_fairness(FairnessConfig {
max_high_before_low: 1,
time_slice: None,
});
// High-priority burst with low-priority frames queued to prove fairness.
push_handle
.push_high_priority(TestStreamEnvelope::data(
MessageId::new(1),
correlation,
Payload::new(vec![1]),
))
.await?;
push_handle
.push_high_priority(TestStreamEnvelope::data(
MessageId::new(3),
correlation,
Payload::new(vec![3]),
))
.await?;
push_handle
.push_low_priority(TestStreamEnvelope::data(
MessageId::new(2),
correlation,
Payload::new(vec![2]),
))
.await?;
push_handle
.push_low_priority(TestStreamEnvelope::data(
MessageId::new(4),
correlation,
Payload::new(vec![4]),
))
.await?;
let (tx, rx) = mpsc::channel(4);
tx.send(TestStreamEnvelope::data(
MessageId::new(10),
correlation,
Payload::new(vec![10]),
))
.await?;
tx.send(TestStreamEnvelope::data(
MessageId::new(11),
correlation,
Payload::new(vec![11]),
))
.await?;
drop(tx);
actor
.set_multi_packet_with_correlation(Some(rx), Some(correlation.get()))
.map_err(|e| format!("failed to set multi-packet source: {e}"))?;
// Allow the actor to observe queue closure once all buffered frames drain.
drop(push_handle);
let mut out = Vec::new();
actor
.run(&mut out)
.await
.map_err(|e| format!("connection actor run failed: {e:?}"))?;
Ok(out)
}❌ New issue: Large Method |
This comment was marked as resolved.
This comment was marked as resolved.
This comment was marked as resolved.
This comment was marked as resolved.
There was a problem hiding this comment.
Hey - I've found 2 issues, and left some high level feedback:
- The rate-limiting tests use
tokio::time::pause()/resume()directly and can early-return on errors without resuming time; consider wrapping pause/resume in a small RAII guard so the runtime clock is always restored even on failure paths. - There is quite a bit of duplicated test harness logic between
tests/fixtures/client_streaming/server.rsandsrc/client/tests/streaming_parity.rs(e.g.,stream_terminator,hooks_with_stream_end, interleaved/rate-limited frame builders); it would be cleaner and easier to maintain if these helpers were shared in a common test module. - The shared rate-limit BDD fixture uses magic marker payload values
98/99to signal contention; extracting these into named constants or a small helper would make the intent clearer and reduce the chance of accidental collisions with other test payloads.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- The rate-limiting tests use `tokio::time::pause()`/`resume()` directly and can early-return on errors without resuming time; consider wrapping pause/resume in a small RAII guard so the runtime clock is always restored even on failure paths.
- There is quite a bit of duplicated test harness logic between `tests/fixtures/client_streaming/server.rs` and `src/client/tests/streaming_parity.rs` (e.g., `stream_terminator`, `hooks_with_stream_end`, interleaved/rate-limited frame builders); it would be cleaner and easier to maintain if these helpers were shared in a common test module.
- The shared rate-limit BDD fixture uses magic marker payload values `98`/`99` to signal contention; extracting these into named constants or a small helper would make the intent clearer and reduce the chance of accidental collisions with other test payloads.
## Individual Comments
### Comment 1
<location> `docs/execplans/10-3-2-exercise-interleaved-queues.md:3` </location>
<code_context>
+# Exercise Interleaved Push Queue Fairness and Rate-Limit Symmetry
+
+This ExecPlan is a living document. The sections `Constraints`, `Tolerances`,
+`Risks`, `Progress`, `Surprises & Discoveries`, `Decision Log`, and
+`Outcomes & Retrospective` must be kept up to date as work proceeds.
</code_context>
<issue_to_address>
**suggestion (review_instructions):** ExecPlan is introduced as an acronym without being defined on first use.
Consider expanding ExecPlan when it first appears, for example: "This Execution Plan (ExecPlan) is a living document." This would satisfy the requirement to define uncommon acronyms on first use.
<details>
<summary>Review instructions:</summary>
**Path patterns:** `**/*.md`
**Instructions:**
Define uncommon acronyms on first use.
</details>
</issue_to_address>
### Comment 2
<location> `docs/execplans/10-3-2-exercise-interleaved-queues.md:137` </location>
<code_context>
+ parallel feature namespace. Rationale: keeps behavioural coverage cohesive
+ and lowers maintenance cost. Date/Author: 2026-02-19 / Codex.
+
+- Decision: use virtual time in timing-sensitive unit tests and outcome-based
+ assertions in BDD scenarios. Rationale: deterministic CI behaviour without
+ over-coupling BDD tests to scheduler details. Date/Author: 2026-02-19 / Codex.
</code_context>
<issue_to_address>
**suggestion (review_instructions):** CI is used later in the document without being expanded on first use.
Later in the document the phrase "deterministic CI behaviour" is used, but CI is not expanded anywhere. To comply with the acronym guidance, consider changing that first occurrence to "continuous integration (CI)".
<details>
<summary>Review instructions:</summary>
**Path patterns:** `**/*.md`
**Instructions:**
Define uncommon acronyms on first use.
</details>
</issue_to_address>Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: a7157dc47e
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
|
@coderabbitai review |
✅ Actions performedReview triggered.
|
There was a problem hiding this comment.
Actionable comments posted: 5
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
tests/fixtures/client_streaming.rs (1)
418-424:⚠️ Potential issue | 🟠 MajorSplit this module to stay under 400 lines.
Extract the new streaming-mode helpers or verification logic into a sibling module (for example
tests/fixtures/client_streaming/modes.rs) and re-export to keep this file below the size cap.As per coding guidelines: "Files must not exceed 400 lines in length".
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/fixtures/client_streaming.rs` around lines 418 - 424, This file exceeds the 400-line limit; extract the streaming-mode helpers and verification logic into a sibling module (e.g. create tests/fixtures/client_streaming/modes.rs), move all streaming-specific functions/types (including helpers that reference server or abort_server behavior) out of client_streaming.rs, then in client_streaming.rs add a mod modes; and pub use modes::*; to re-export them so existing tests keep working; update any internal references to use the moved symbols (e.g., ensure abort_server-related code still references the same identifiers) and adjust imports in tests if needed.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@docs/execplans/10-3-2-exercise-interleaved-queues.md`:
- Line 1: Change the headings to sentence case: replace the H1 "Exercise
Interleaved Push Queue Fairness and Rate-Limit Symmetry" with "Exercise
interleaved push queue fairness and rate-limit symmetry" and update the
subheading "Surprises & Discoveries" to "Surprises & discoveries" so both comply
with the "Write headings in sentence case" guideline.
- Around line 148-151: The document uses the British-Oxford guideline requiring
-ize suffixes: find and replace the British spelling "synthesising" with the
-ize form "synthesizing" in the "Outcomes & retrospective" section (and any
other occurrences in this markdown), ensuring the change is applied to the text
and any inline code/quotes if present (search for the token "synthesising" and
update to "synthesizing").
- Around line 292-314: Replace the indented command block that starts with "set
-o pipefail" and includes the lines like "make fmt", "make check-fmt", "make
lint", "make test-bdd", "make test", "make markdownlint", and "make nixie" with
a fenced code block using a language identifier (use "shell"); wrap the exact
commands between ```shell and ``` and remove any leading indentation or plus
signs so Markdown tooling can validate the shell block correctly.
In `@src/client/tests/streaming_parity.rs`:
- Around line 156-224: In collect_rate_limited_frames fix handling of the
low-priority push future result: when inspecting
blocked_low_future.as_mut().now_or_never(), treat Some(Err(e)) as a real push
error and propagate/return it instead of treating it as "not blocked" and
discarding it; only treat None as the blocked case and proceed to time::advance
and await the future (checking its Result when awaited); ensure references to
push_low_priority, blocked_low/blocked_low_future, and now_or_never are updated
so any Err returned is surfaced as an error from collect_rate_limited_frames.
In `@tests/fixtures/client_streaming/server.rs`:
- Around line 206-270: build_rate_limited_priority_frames currently treats Any
Some(_) from blocked_low_future.as_mut().now_or_never() as "not blocked" and
drops potential Err values; change the logic to inspect the Option<Result<_, _>>
returned by now_or_never() from blocked_low_future: if Some(Err(e)) return Err
with that error (propagate test failure), if Some(Ok(_)) treat as not-blocked,
and only when None treat as blocked and advance time then await the future;
adjust handling of blocked_low/blocked_low_future accordingly so
push_low_priority errors are not swallowed.
---
Outside diff comments:
In `@tests/fixtures/client_streaming.rs`:
- Around line 418-424: This file exceeds the 400-line limit; extract the
streaming-mode helpers and verification logic into a sibling module (e.g. create
tests/fixtures/client_streaming/modes.rs), move all streaming-specific
functions/types (including helpers that reference server or abort_server
behavior) out of client_streaming.rs, then in client_streaming.rs add a mod
modes; and pub use modes::*; to re-export them so existing tests keep working;
update any internal references to use the moved symbols (e.g., ensure
abort_server-related code still references the same identifiers) and adjust
imports in tests if needed.
ℹ️ Review info
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
📒 Files selected for processing (12)
docs/execplans/10-3-2-exercise-interleaved-queues.mddocs/multi-packet-and-streaming-responses-design.mddocs/roadmap.mddocs/users-guide.mddocs/wireframe-client-design.mdsrc/client/tests/mod.rssrc/client/tests/streaming_parity.rstests/features/client_streaming.featuretests/fixtures/client_streaming.rstests/fixtures/client_streaming/server.rstests/scenarios/client_streaming_scenarios.rstests/steps/client_streaming_steps.rs
…y fairness and rate limiting - Implement end-to-end parity validation for roadmap item 10.3.2. - Add unit tests in src/client/tests/streaming_parity.rs to verify client observes server-side fairness and shared rate limiting. - Extend behaviour-driven tests with new scenarios and fixtures covering interleaved high- and low-priority push queues. - Update client streaming test fixtures and test server helpers to support interleaved priority and shared rate limiting modes. - Document parity validation rationale and design decisions in multiple design and user guide docs. - Mark roadmap item 10.3.2 as complete. This work confirms fairness and symmetric rate limiting behaviour without changing the public WireframeClient API. Co-authored-by: devboxerhub[bot] <devboxerhub[bot]@users.noreply.github.com>
- Extracted fairness actor setup into `setup_fairness_actor`. - Moved multi-packet channel setup to separate async functions. - Simplified `collect_interleaved_fairness_frames` by using helper functions. - In client_streaming server tests, extracted push logic into a helper and improved multi-packet channel setup. - Overall improved code organization and reusability in streaming parity tests. Co-authored-by: devboxerhub[bot] <devboxerhub[bot]@users.noreply.github.com>
- Introduced PausedTimeGuard struct to encapsulate time::pause and time::resume calls. - Replaced manual time::pause() and time::resume() in streaming parity and client streaming tests with PausedTimeGuard for safer and cleaner time control. - Defined named constants for shared rate limit contention markers for clarity. - Minor docs wording improvements in execution plan markdown. Co-authored-by: devboxerhub[bot] <devboxerhub[bot]@users.noreply.github.com>
…o modes.rs Extract streaming server mode definitions, implementations, and verification helpers from the client_streaming fixture module into a new dedicated modes.rs file to improve code organization and maintainability. This includes moving enum StreamingServerMode and related async functions like run_streaming_mode, as well as associated verification methods on ClientStreamingWorld. Also, fix subtle bug in build_rate_limited_priority_frames where now_or_never was previously only checked for None but now properly accounts for Some(Ok) and Some(Err) cases. Co-authored-by: devboxerhub[bot] <devboxerhub[bot]@users.noreply.github.com>
Handle results explicitly in interleaved queue tests so\n no longer fails on .\n\nAlso fix serializer imports after splitting client streaming helpers\ninto .
9b8bbe5 to
f7ea261
Compare
Summary
Changes
-/docs/users-guide.md: note parity validation observed via client streaming surface
Motivation
Plan of work
Stage A: Lock the parity contract and test matrix
Stage B: Extend unit tests (rstest) for interleaved parity
Stage C: Extend behavioural tests (rstest-bdd 0.5.0)
Stage D: Apply minimal implementation fixes if required
Stage E: Update design docs, users guide, and roadmap
Stage F: Run full quality gates with captured logs
Acceptance criteria
Validation and acceptance
Context and orientation
Notes for implementers
📎 Task: https://www.devboxer.com/task/c4610f93-377f-41b4-ab9a-63cd88cecee6
Summary by Sourcery
Add parity validation for interleaved push queue fairness and shared rate limiting via client streaming paths, and document completion of roadmap item 10.3.2.
New Features:
Enhancements:
Documentation: