feat(qwp): add deferred commit for transactional ingestion#7144
Merged
Conversation
Pulls in java-questdb-client #32 (Fix microbatch buffer stuck after cursor append failure) and updates every questdb.client.version property in the build to 1.3.1-SNAPSHOT so the parent build, the local-client profile, and downstream artifact metadata all reference the same release that carries the fix. Without the bump the parent would keep linking against the published 1.3.0 jar and the QWP WebSocket sender would still leave its microbatch buffer stuck in SENDING after a synchronous CursorSendEngine.appendBlocking() failure, surfacing as a 30 s "Timeout waiting for buffer to be recycled" on the next flush. Fixes #7143 Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
|
Important Review skippedAuto reviews are disabled on this repository. Please check the settings in the CodeRabbit UI or the ⚙️ Run configurationConfiguration used: Path: .coderabbit.yaml Review profile: CHILL Plan: Pro Run ID: You can disable this status message by setting the Use the checkbox below for a quick retry:
✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
Picks up submodule commit ae09d47 "Wake segment-manager worker on register", which closes a pre-existing race in SegmentManager: register-after-start could leave the worker parked for the full poll interval when the worker took `lock` first. Unrelated to the microbatch-buffer fix this branch is built around -- the race surfaced on this PR's CI run only because of mac-other + JaCoCo scheduling jitter, but it has been latent in SF code since commit edbdc3a (feat(ilp): QWiP store-and-forward client buffer). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Large uncommitted WAL transactions from deferred QWP batches can cause memory pressure when the WAL apply job sorts them. Add a configurable per-table limit (qwp.max.uncommitted.rows, default 1M) that forces a mid-batch commit when exceeded. The existing cumulative ACK protocol handles this transparently. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Add three tests covering gaps in deferred-commit E2E coverage: - testDeferredCommitMultipleCycles: two complete defer-commit cycles on the same connection, verifying state is clean between cycles - testDeferredCommitSchemaMismatchRollsBack: schema mismatch mid-deferred-sequence rolls back all accumulated WAL rows across tables, not just the current message - testDeferredCommitSymbolDictContinuity: SYMBOL columns with delta dictionary encoding work correctly across deferred message boundaries Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Points to 081d3ae which re-adds the public setDeferCommit API needed by QwpSenderE2ETest. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Add the new qwp.max.uncommitted.rows property to the expected output in testShowParameters. Bump the client submodule to pick up the close() fix that respects the deferCommit flag, which fixes testDeferredCommitConnectionDropRollsBack. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Contributor
[PR Coverage check]😍 pass : 49 / 64 (76.56%) file detail
|
nwoolmer
added a commit
that referenced
this pull request
May 27, 2026
The previous run failed only on mac-other with QwpUdpMalformedTest.testTooShortDatagram saying "table does not exist [table=short_test]". The test passed 44/44 locally on linux and passed in PR 7148's mac-other run after the same #7144 deferred-commit change was already in master. The QwpUdpReceiver.close() does call tudCache.commitAllBestEffort() before returning, and the test then drainWalQueue() before the assertion, so the failure looks like a mac-only timing flake. Retrigger CI to confirm before chasing the upstream test as a real regression. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Fixes #7143
Tandem with questdb/java-questdb-client#32.
Summary
Adds
FLAG_DEFER_COMMIT(0x01) to the QWP ingress protocol. When aclient sets this flag on a message, the server appends rows to WAL
writers but skips the commit. The commit fires on the next message
without the flag. This allows clients to split a logical batch across
multiple messages that individually fit within the server's recv buffer,
removing the hard coupling between
http.recv.buffer.sizeand maximumtransaction size.
Server changes
QwpConstants: addFLAG_DEFER_COMMIT = 0x01.QwpIngressProcessorState: addisDeferCommit()to read the flagfrom the message header, and
clearMessageState()to resetper-message parsing state without rolling back WAL rows.
QwpIngressUpgradeProcessor.handleBinaryMessage(): skipcommit()when
FLAG_DEFER_COMMITis set; callclearMessageState()insteadof
clear()in the finally block so accumulated WAL rows survive.On error, the full
clear()still fires to roll back all deferredrows.
Client changes (java-questdb-client submodule)
QwpConstants: addFLAG_DEFER_COMMIT = 0x01.QwpWebSocketEncoder: addsetDeferCommit()to set/clear the flag.QwpBufferWriter/ all implementations: addpatchByte().QwpWebSocketSender.flushPendingRows(boolean deferCommit): whendeferCommitis true, the encoded message carriesFLAG_DEFER_COMMIT.When the batch exceeds
serverMaxBatchSize,flushPendingRowsSplit()sends each table as a separate deferred message with the last one
committing (or deferring, if the caller requested defer).
QwpWebSocketSender.sendCommitMessage(): sends an empty QWP message(0 table blocks, no defer flag) to trigger the server-side commit
when all pending rows were already sent via deferred auto-flushes.
Senderbuilder: addtransaction=on|offconnect-string key andtransactional(boolean)builder method. When enabled, auto-flushsends with
FLAG_DEFER_COMMITand only explicitflush()triggersthe commit.
Protocol spec update
FLAG_DEFER_COMMIT).handling, backward compatibility, and implementation recommendations.
tableCount=0for deferredmessages.
Benchmark
QwpEquitiesL1Benchmark: 10M rows of equities L1 market data(500 symbols, 15 exchanges, bid/ask/last prices, volumes) ingested
with
transaction=on— all rows stream via deferred auto-flushes andcommit once at the end.
Backward compatibility
Old servers ignore the unknown flag bit (it occupies a previously
reserved position). Each message commits independently — more commits
than intended, but no data loss. The feature degrades gracefully.
Error handling
calls
clear(), rolling back all accumulated WAL rows from theentire deferred group. The client must re-send the full sequence.
close()freestudCache, rollingback uncommitted WAL rows. The client's store-and-forward replays
all unacknowledged messages on reconnect.
Test plan
cd java-questdb-client && mvn -pl core test-- full client suitepasses
mvn -pl core compile -DskipTests-- server compilesQwpEquitiesL1Benchmarkagainst a local server, verify10M rows committed atomically on
flush()