fix: resolve flaky incomplete-call errors in sequential quorum calls#292
Guard requeuePendingMsgs so only the goroutine that actually clears the current stream can requeue pending requests. This prevents a stale receiver from observing an error on an old stream, stealing requests already registered on a newer stream, and making their real replies unroutable.
Add TestSystemLocalDispatchContention to stress sequential quorum calls where an early-returning call is immediately followed by an All call on the same configuration. Add TestSystemLocalDispatchContentionSlowReplica to verify that a delayed local replica does not prevent a new quorum call from making progress on remote replies and eventually completing once the local reply arrives.
Replace bare channel sends with a double-select delivery pattern that attempts a non-blocking send first, falling back to a blocking send-or-cancel. This prevents responses from being silently dropped when a request's context is canceled around the same time a response arrives. Affected call sites: RouteResponse, replyError, dispatchLocalRequest.
Replace the separate detachStream method with a closure returned by attachStream that uses CompareAndSwap to detach only if the channel is still the one that was attached. This prevents a stale cleanup goroutine from an older stream from detaching a replacement channel that a reconnecting peer has already installed.
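A minimal sketch of the CAS-guarded detach closure, assuming the node keeps its channel in an `atomic.Pointer`. The `node` and `channel` types here are hypothetical stand-ins, and the real `attachStream` signature may differ.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// channel and node are hypothetical stand-ins for the real types.
type channel struct{ name string }

type node struct {
	ch atomic.Pointer[channel]
}

// attachStream installs ch and returns a detach function that clears the slot
// only if ch is still the installed channel. A cleanup goroutine left over
// from an older stream therefore cannot remove a replacement channel.
func (n *node) attachStream(ch *channel) (detach func()) {
	n.ch.Store(ch)
	return func() { n.ch.CompareAndSwap(ch, nil) }
}

func main() {
	var n node
	detachOld := n.attachStream(&channel{name: "old"})
	detachNew := n.attachStream(&channel{name: "new"}) // peer reconnected

	detachOld()                   // stale cleanup: the CAS fails, nothing changes
	fmt.Println(n.ch.Load().name) // new

	detachNew()                     // current owner: the CAS succeeds
	fmt.Println(n.ch.Load() == nil) // true
}
```

Returning the closure from `attachStream` ties each detach to the exact channel it attached, which a separate `detachStream` method taking no argument cannot do.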
When a per-request cancel watcher fires after Send returns, clearStream may succeed on a stream that already has newer pending requests. Without requeueing, those newer requests are stranded with no response. Fix by calling requeuePendingMsgs when clearStream returns true, ensuring requests queued after the cancelled send are requeued rather than lost. Add TestChannelLateCancelWatcherRequeuesPending with a deterministic fake context (lateAfterFuncContext) and stream (lateCancelStream) to reproduce the race precisely and verify the fix.
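The guard shared by the receiver and the cancel watcher can be sketched like this. The types, fields, and the `onStreamError` helper are assumptions made for illustration; only the names `clearStream` and `requeuePendingMsgs` come from the PR, and their real implementations are not shown here.

```go
package main

import (
	"fmt"
	"sync"
)

// stream and channel are simplified, hypothetical stand-ins.
type stream struct{ id int }

type channel struct {
	mu      sync.Mutex
	current *stream
	pending []string // requests registered on the current stream
	queue   []string // requests waiting to be sent again
}

// clearStream clears the current stream only if it is still s, and reports
// whether this caller was the one that cleared it.
func (c *channel) clearStream(s *stream) bool {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.current != s {
		return false // a newer stream is installed; s is stale
	}
	c.current = nil
	return true
}

// requeuePendingMsgs moves all pending requests back onto the send queue.
func (c *channel) requeuePendingMsgs() {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.queue = append(c.queue, c.pending...)
	c.pending = nil
}

// onStreamError is what a receiver or cancel watcher would run after seeing
// a failure on stream s: requeue only if this goroutine cleared the stream.
func (c *channel) onStreamError(s *stream) {
	if c.clearStream(s) {
		c.requeuePendingMsgs()
	}
}

func main() {
	old, cur := &stream{id: 1}, &stream{id: 2}
	c := &channel{current: cur, pending: []string{"req-7"}}

	c.onStreamError(old)                      // stale goroutine: no effect
	fmt.Println(len(c.queue), len(c.pending)) // 0 1

	c.onStreamError(cur)                 // owner of the current stream
	fmt.Println(c.queue, len(c.pending)) // [req-7] 0
}
```

The boolean return makes stream ownership explicit: a stale goroutine learns that it lost the race and leaves the newer stream's requests alone, while the goroutine that did clear the stream is responsible for requeueing.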
This moves server-side sequence number helpers to the stream package and adds helper functions for checking sequence number direction and generating new sequence numbers for the server-side.
Add RouteMessage to MessageRouter as the single entry point for all messages received on the bidirectional stream. It handles three cases internally:
- Server-initiated calls are short-circuited before acquiring the pending lock. HandleRequest is called in a new goroutine with a send closure that enqueues replies through the channel.
- Client-initiated calls that match a pending call deliver the response. The response allocation is deferred until after the pending map lookup confirms a matching call.
- Unmatched client-initiated calls (stale responses from cancelled calls) are silently dropped.

Remove the variadic dispatch parameter from RouteResponse, collapsing its server-initiated branch to return isServerSequenceNumber(msgID). Remove Channel.dispatchRequest and simplify receiver() to a plain RouteMessage call statement, dropping the now-meaningless bool return and the surrounding if/continue scaffolding. Update tests to reflect the simplified API and rename subtests to describe observable behaviour rather than return values.
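The three routing cases can be sketched as below. Everything here is illustrative: the `router` type, the string payloads, and in particular the encoding used by `isServerSequenceNumber` (high bit set) are assumptions, since the PR text does not say how server-initiated sequence numbers are distinguished.

```go
package main

import (
	"fmt"
	"sync"
)

// serverBit is an ASSUMED encoding for this sketch: sequence numbers with the
// high bit set are treated as server-initiated.
const serverBit = uint64(1) << 63

func isServerSequenceNumber(id uint64) bool { return id&serverBit != 0 }

// router is a hypothetical, simplified stand-in for MessageRouter.
type router struct {
	mu      sync.Mutex
	pending map[uint64]chan string          // client-initiated calls awaiting replies
	handle  func(id uint64, payload string) // handler for server-initiated calls
}

// routeMessage is the single entry point for inbound messages.
func (r *router) routeMessage(id uint64, payload string) {
	// Case 1: server-initiated calls never touch the pending lock.
	if isServerSequenceNumber(id) {
		go r.handle(id, payload)
		return
	}
	r.mu.Lock()
	ch, ok := r.pending[id]
	r.mu.Unlock()
	if !ok {
		return // Case 3: stale response for a cancelled call; drop it.
	}
	ch <- payload // Case 2: deliver to the matching pending call.
}

func main() {
	handled := make(chan string, 1)
	reply := make(chan string, 1)
	r := &router{
		pending: map[uint64]chan string{1: reply},
		handle:  func(id uint64, payload string) { handled <- payload },
	}
	r.routeMessage(1, "reply")             // matches a pending call
	r.routeMessage(2, "stale")             // no pending call: dropped
	r.routeMessage(serverBit|5, "request") // server-initiated: handler runs
	fmt.Println(<-reply, <-handled)        // reply request
}
```

In a real router the final send would use a context-aware delivery rather than a bare channel send; the bare send is kept here only to keep the sketch short.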
Replace the old Channel.replyError helper with a new one scoped to the Request: Request.replyError(nodeID, err).
Move the in-process local dispatch logic from Channel.dispatchLocalRequest to a new method MessageRouter.DispatchLocalRequest, passing local nodeID as an argument from the call site in Channel.Enqueue. Move localMu from Channel to MessageRouter, where it now lives alongside the handler it serializes. Remove the RequestHandler() accessor method, which is no longer needed since DispatchLocalRequest has direct field access. Remove the corresponding TestRouterRequestHandler test.
Code Review Summary
| Analyzer | Status | Updated (UTC) | Details |
|---|---|---|---|
| Go | | Mar 21, 2026 4:31 p.m. | Review ↗ |
| Shell | | Mar 21, 2026 4:31 p.m. | Review ↗ |
Pull request overview
This PR addresses the flaky incomplete call (errors: 0) failures seen in sequential quorum calls by fixing multiple races in the internal stream/channel routing and cleanup logic, and by adding targeted regression/stress tests to ensure sequential quorum-call FIFO behavior remains correct under contention and cancellation.
Changes:
- Hardened stream/channel lifecycle handling to prevent stale goroutines (receiver / cancel watcher / detach cleanup) from disrupting current streams and pending requests.
- Centralized and simplified message routing (`RouteMessage`), moved local dispatch serialization into `MessageRouter`, and made response delivery context-aware to avoid deadlocks/dropped responses.
- Added regression and stress tests to reproduce prior races and validate sequential quorum calls under local dispatch contention.
Reviewed changes
Copilot reviewed 11 out of 11 changed files in this pull request and generated 1 comment.
| File | Description |
|---|---|
| `internal/stream/channel.go` | Adds request-scoped delivery helpers, makes clearStream CAS-like (bool return), updates cancellation/requeue behavior, and routes all inbound messages through RouteMessage. |
| `internal/stream/channel_test.go` | Adds regression tests for stale receiver requeueing, late cancel watcher behavior, and correct dispatch/dropping rules for unknown messages. |
| `internal/stream/router.go` | Introduces RouteMessage, moves local dispatch + serialization (localMu) into MessageRouter, and uses context-aware delivery. |
| `internal/stream/router_test.go` | Expands router tests for server-initiated ID handling and cancellation-safe delivery behavior. |
| `internal/stream/message_id.go` | Centralizes server-initiated sequence-number helpers. |
| `internal/stream/server.go` | Updates server stream loop commentary/behavior around routing server-initiated responses. |
| `node.go` | Changes attachStream to return a CAS-guarded detach closure to prevent stale cleanup detaching replacement channels. |
| `inbound_manager.go` | Uses stream.ServerSequenceNumber and integrates the new detach closure semantics to avoid stale cleanup effects. |
| `inbound_manager_test.go` | Adds a regression test ensuring stale cleanup does not detach a replacement channel. |
| `system_test.go` | Adds stress/regression tests for sequential quorum calls under local dispatch contention and slow local replica behavior. |
| `.vscode/gorums.txt` | Adds subtest to workspace dictionary. |
    // handler is called in a goroutine; give it a moment.
    time.Sleep(10 * time.Millisecond)
    if !handler.called {
        t.Error("handler was not called for server-initiated message")
    }
handler.called is written from the handler goroutine and read here without synchronization, which is a data race under -race. Also, the time.Sleep makes the test timing-dependent. Prefer a deterministic signal (e.g., a channel in the handler, or atomic.Bool/mutex + eventually wait) so the test can wait for the handler to run without sleeping and without races.
Summary: resolve flaky incomplete-call errors in sequential quorum calls
Fixes #291.
Repeated sequential quorum calls with `.All()` on a configuration could intermittently fail with `incomplete call (errors: 0)`. That is, none of the nodes in the configuration returned an error response, but the caller's context expired before all responses were delivered: two responses were always received, but the third never arrived, indicating a deadlock in the stream/channel subsystem. The failure is non-deterministic and required thousands of iterations to reproduce.

Root cause analysis identified several independent races and hazards in the stream/channel subsystem:
- A per-request cancel watcher firing after `Send` returned could call `clearStream` on a stream that already had newer pending requests queued, stranding those requests with no response.
- … `Channel`, causing a race between the outbound sender and an incoming local reply when the first call had already been cancelled by the caller.

Changes
Bug fixes
- `fix(stream): avoid stale receiver requeueing current requests`
  Guard `requeuePendingMsgs` so only the goroutine that actually clears the current stream can requeue pending requests.
- `fix(channel): late cancel watcher requeues pending requests`
  `clearStream` now returns a bool indicating whether it was the one that actually cleared the stream. The cancel watcher calls `requeuePendingMsgs` when `clearStream` returns `true`, preventing requests queued after the cancelled send from being stranded.
- `fix(node): return CAS-guarded detach closure from attachStream`
  Replace the separate `detachStream` method with a closure returned by `attachStream` that uses `CompareAndSwap` to detach only if the channel is still the one that was attached.
- `fix(stream): context-aware response delivery`
  Replace bare channel sends with a double-select delivery pattern (non-blocking attempt first, then blocking send-or-cancel) at all three delivery sites: `RouteResponse`, `replyError`, and `dispatchLocalRequest`.
- `fix(router): RouteResponse to handle server-initiated calls directly`
  Server-initiated (back-channel) responses are handled before acquiring the pending-map lock, preventing contention on inbound server calls.
- `fix(router): return on server-initiated messages in RouteMessage`
  `RouteMessage` short-circuits on server-initiated sequence numbers before doing any pending-map work.
Refactoring
- `refactor(stream): replace Channel.replyError w/ Request.replyError`
  Error reply is now a method on `Request`, scoped to the request rather than the channel, and uses the new context-aware delivery pattern.
- `refactor(stream): centralize back-channel dispatch in MessageRouter`
  `RouteMessage` is the single entry point for all messages received on the bidirectional stream. The variadic dispatch parameter is removed from `RouteResponse`. `Channel.dispatchRequest` is removed; `receiver()` is simplified to a plain `RouteMessage` call.
- `refactor(stream): move DispatchLocalRequest to MessageRouter`
  The in-process local dispatch logic moves from `Channel.dispatchLocalRequest` to `MessageRouter.DispatchLocalRequest`, and `localMu` moves with it. The `RequestHandler()` accessor is removed.
- `chore(stream): move server sequence number handling to stream`
  Server-side sequence number helpers (`isServerSequenceNumber`, `newServerSequenceNumber`) are consolidated in the `stream` package.

Tests
- `test: add local dispatch contention regression tests`
  `TestSystemLocalDispatchContention` stress-tests sequential quorum calls (`Majority→All` and `First→All`) with 500 iterations, including a `GOMAXPROCS(1)` subcase to amplify scheduling hazards. `TestSystemLocalDispatchContentionSlowReplica` verifies that a delayed local replica does not prevent a subsequent `All` call from making progress on remote replies.
- `fix(stream)` commits add unit tests: `TestChannelLateCancelWatcherRequeuesPending` uses a deterministic fake context and stream to reproduce the late-cancel race precisely.
- New `router_test.go` subtests cover the simplified `RouteMessage` API and describe observable behaviour rather than return values.
Files changed
| File | Changes |
|---|---|
| `internal/stream/channel.go` | … `localMu`; added `Request.deliver`/`replyError`; `clearStream` returns bool; context-aware delivery |
| `internal/stream/channel_test.go` | |
| `internal/stream/router.go` | … `RouteMessage`; moved `DispatchLocalRequest`; removed variadic dispatch |
| `internal/stream/router_test.go` | |
| `internal/stream/message_id.go` | |
| `internal/stream/response.go` | |
| `internal/stream/server.go` | |
| `node.go` | … `attachStream` |
| `inbound_manager.go` | … `MessageRouter` API |
| `inbound_manager_test.go` | |
| `system_test.go` | |