fix(qwp): fix ingestion stalls after append failures and on sender startup #32
Merged
When CursorSendEngine.appendBlocking() throws inside sealAndSwapBuffer(), the catch block now puts the sealed buffer back into a state isInUse() reports as false: markRecycled() when it is SENDING, rollbackSealForRetry() when it is still SEALED. Without this, the buffer stayed in SENDING forever. No I/O thread ever recycles a buffer the engine never accepted, so the next flush would wait the 30 s recycle timeout and throw "Timeout waiting for buffer to be recycled".

The encoded payload is dropped, but flushPendingRows bails out of its post-enqueue state updates after sealAndSwapBuffer throws, so the source rows and the sent-schema watermark stay intact and the next batch re-emits the same rows along with the full schema and symbol-dict delta.

Adds testFlushAppendFailureDoesNotLeaveMicrobatchBufferInUse, the reproducer from questdb/questdb#7143. A memory-only CursorSendEngine configured to fail every append lets the test confirm both microbatch buffers leave the SENDING and SEALED states after a flush failure.

Refs questdb/questdb#7143
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
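The catch-block recovery described in this commit can be sketched with a toy state machine. This is a minimal illustration, not the actual QuestDB client code: `ToyBuffer`, `ToyEngine`, `sealAndEnqueue`, the `RECYCLED` state, and the choice that `rollbackSealForRetry()` returns the buffer to `FILLING` are all assumptions made for the example. Only the method names `markRecycled()`, `rollbackSealForRetry()`, `isInUse()` and the `SENDING`/`SEALED`/`FILLING` states come from the description above.

```java
final class BufferReleaseSketch {
    // RECYCLED is an assumed name for "released by the I/O side".
    enum State { FILLING, SEALED, SENDING, RECYCLED }

    // Toy stand-in for a microbatch buffer; not the real MicrobatchBuffer.
    static final class ToyBuffer {
        State state = State.FILLING;

        void seal() { state = State.SEALED; }
        void markSending() { state = State.SENDING; }
        void markRecycled() { state = State.RECYCLED; }
        // Assumption: rolling back a seal makes the buffer fillable again.
        void rollbackSealForRetry() { state = State.FILLING; }
        // Assumption: a buffer counts as "in use" while sealed or sending.
        boolean isInUse() { return state == State.SEALED || state == State.SENDING; }
    }

    // Hypothetical engine hook; it may throw before or after marking SENDING.
    interface ToyEngine { void appendBlocking(ToyBuffer buffer); }

    static void sealAndEnqueue(ToyBuffer buffer, ToyEngine engine) {
        buffer.seal();
        try {
            engine.appendBlocking(buffer);
        } catch (RuntimeException e) {
            // The engine never accepted the buffer, so nobody else will
            // release it. Put it back into a state where isInUse() is false.
            if (buffer.state == State.SENDING) {
                buffer.markRecycled();
            } else if (buffer.state == State.SEALED) {
                buffer.rollbackSealForRetry();
            }
            throw e; // the caller still sees the original failure
        }
    }

    // Runs one failing enqueue and returns the buffer's final state.
    static State stateAfterFailure(boolean failAfterMarkSending) {
        ToyBuffer buffer = new ToyBuffer();
        ToyEngine failing = b -> {
            if (failAfterMarkSending) {
                b.markSending();
            }
            throw new IllegalStateException("append rejected");
        };
        try {
            sealAndEnqueue(buffer, failing);
        } catch (IllegalStateException expected) {
            // Only the buffer state is repaired; the error is not swallowed
            // by sealAndEnqueue itself.
        }
        return buffer.state;
    }

    public static void main(String[] args) {
        System.out.println(stateAfterFailure(true));  // prints RECYCLED
        System.out.println(stateAfterFailure(false)); // prints FILLING
    }
}
```

In both branches the buffer ends in a state where `isInUse()` is false, so a later flush in this toy model would not have to wait for a recycle that never comes.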
SegmentManager.register() now unparks the worker thread after publishing the new ring. Without this, register-after-start has a race: start() schedules the worker, and if that thread reaches workerLoop and takes `lock` before register() does, it snapshots an empty `rings`, services nothing, and parks for the full poll interval. A ring whose first append does not cross the high-water mark fires no producer-side wakeup either, so the spare never lands until the poll expires.

testFirstSpareLandsBeforeFirstPoll fails on CI under JaCoCo on the mac-other runner whenever the worker wins the lock first; the prior fix (commit 19c5c65) only widened the budget to 2s, but the poll interval is 5s so no budget below 5s can rescue that ordering.

The LockSupport.unpark is cheap, no-ops when the worker has not been started, and grants a permit that the next parkNanos consumes immediately, so it covers both interleavings.

Adds testRegisterAfterWorkerParkedWakesWorker as a deterministic regression test: sleeps 250ms between start() and register() so the worker is guaranteed to have parked, then asserts the spare lands within 2s. Without the wakeWorker() call in register() this test fails reliably; with it, all 9 SegmentManagerTest cases pass.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
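The register-after-start ordering can be reproduced with a small stand-alone model. The sketch below is an assumption-laden toy, not the real `SegmentManager`: `ToyManager`, `ToyRing`, `servicedInTime`, and the `wakeOnRegister` switch are hypothetical names, and "servicing" a ring is reduced to opening a latch. It reuses the figures from the description (250 ms sleep, 5 s poll, 2 s budget) and relies only on the documented behaviour of `LockSupport.unpark` and `LockSupport.parkNanos`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;

final class RegisterWakeupSketch {
    // Toy ring: the latch opens the first time the worker services it.
    static final class ToyRing {
        final CountDownLatch serviced = new CountDownLatch(1);
    }

    // Toy manager; a stand-in for illustration only.
    static final class ToyManager {
        private final Object lock = new Object();
        private final List<ToyRing> rings = new ArrayList<>();
        private final long pollNanos;
        private final boolean wakeOnRegister;
        private volatile boolean running = true;
        private volatile Thread worker;

        ToyManager(long pollMillis, boolean wakeOnRegister) {
            this.pollNanos = TimeUnit.MILLISECONDS.toNanos(pollMillis);
            this.wakeOnRegister = wakeOnRegister;
        }

        void start() {
            Thread t = new Thread(this::workerLoop, "toy-segment-worker");
            t.setDaemon(true);
            worker = t;
            t.start();
        }

        void register(ToyRing ring) {
            synchronized (lock) {
                rings.add(ring); // publish the new ring first
            }
            if (wakeOnRegister) {
                // unpark(null) has no effect, so this is safe before start().
                // If the worker is not parked yet, the permit makes its next
                // parkNanos return immediately.
                LockSupport.unpark(worker);
            }
        }

        void close() {
            running = false;
            Thread t = worker;
            if (t != null) {
                LockSupport.unpark(t);
                try {
                    t.join();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }

        private void workerLoop() {
            while (running) {
                List<ToyRing> snapshot;
                synchronized (lock) {
                    snapshot = new ArrayList<>(rings);
                }
                for (ToyRing ring : snapshot) {
                    ring.serviced.countDown();
                }
                LockSupport.parkNanos(this, pollNanos);
            }
        }
    }

    // True if a ring registered 250 ms after start() is serviced within waitMillis.
    static boolean servicedInTime(boolean wakeOnRegister, long pollMillis, long waitMillis) {
        ToyManager manager = new ToyManager(pollMillis, wakeOnRegister);
        manager.start();
        try {
            Thread.sleep(250); // let the worker park on an empty snapshot
            ToyRing ring = new ToyRing();
            manager.register(ring);
            return ring.serviced.await(waitMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            manager.close();
        }
    }

    public static void main(String[] args) {
        System.out.println("with wakeup:    " + servicedInTime(true, 5_000, 2_000));
        System.out.println("without wakeup: " + servicedInTime(false, 5_000, 2_000));
    }
}
```

With the wakeup enabled the ring should be serviced well inside the 2 s budget. With it disabled, the worker stays parked until the 5 s poll expires, so the second call is expected to report `false` unless `parkNanos` happens to return spuriously, which the Javadoc permits.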
[PR Coverage check]😍 pass : 3 / 5 (60.00%) file detail
Refs questdb/questdb#7143
This PR bundles two QWP fixes for the upcoming 1.3.1 client release. Both are small, localized, and validated by their own regression tests.
Microbatch buffer release on append failure
`QwpWebSocketSender.sealAndSwapBuffer()` left the microbatch buffer in `SENDING` state when `CursorSendEngine.appendBlocking()` threw. No I/O thread ever recycles a buffer the engine never accepted, so the next flush would wait 30 s on the recycle latch and then throw `Timeout waiting for buffer to be recycled`.
- The catch block now releases the sealed buffer: `markRecycled()` from `SENDING`, `rollbackSealForRetry()` from `SEALED`.
- `flushPendingRows()` aborts its post-enqueue state updates after `sealAndSwapBuffer()` throws, so the source rows in `tableBuffers` and the sent-schema watermark stay intact. The next batch re-emits the same rows together with the full schema and symbol-dict delta, matching the existing "self-sufficient frames" invariant the cursor SF pipeline already relies on.

Segment manager wakeup on register
`SegmentManager.register()` now unparks the worker after publishing the new ring. Without the wakeup, if the worker thread acquires the manager lock before `register()` does, it snapshots an empty `rings`, services nothing, and parks for the full poll interval. A ring whose first append does not cross the high-water mark fires no producer-side wakeup either, so the spare never lands until the poll expires.
- `register()` now calls `wakeWorker()` after `setManagerWakeup()`.
- `LockSupport.unpark` is cheap, is a no-op when the worker has not been started, and grants a permit that the next `parkNanos` consumes immediately, so both interleavings are covered.

Tradeoffs
- `close()`-after-failed-flush retries the drain. If the underlying engine is still wedged (e.g. permanent backpressure, like the deliberately-undersized engine in the new test), that retry fails again and surfaces as a separate `LineSenderException`. The test acknowledges this and swallows the close-time rethrow. Real-world transient backpressure is expected to clear between the user's flush call and `close()`, in which case the drain succeeds.
- `MicrobatchBuffer.markRecycled()` / `rollbackSealForRetry()` and `SegmentManager.wakeWorker()` already existed.

Test plan
- `QwpWebSocketSenderTest.testFlushAppendFailureDoesNotLeaveMicrobatchBufferInUse` (the reproducer from questdb/questdb#7143, "QWP WS: cursor append failure leaves microbatch buffer stuck in SENDING"). Fails on `main` with `buffer0=SENDING, buffer1=FILLING` and the 30 s recycle-timeout suppressed exception; passes in ~80 ms with this change.
- `SegmentManagerTest.testRegisterAfterWorkerParkedWakesWorker`. Sleeps 250 ms between `start()` and `register()` to guarantee the worker has parked on an empty `rings` snapshot, then asserts the spare lands within 2 s. Without the `wakeWorker()` call this test fails reliably; with it, all 9 `SegmentManagerTest` cases pass.
- `QwpWebSocketSenderTest` and the broader QWP client suite still pass.

🤖 Generated with Claude Code