feat(http): add QWP egress for streaming SQL query results over WebSocket#6991
Conversation
|
Important Review skippedAuto reviews are disabled on this repository. Please check the settings in the CodeRabbit UI or the ⚙️ Run configurationConfiguration used: Path: .coderabbit.yaml Review profile: CHILL Plan: Pro Run ID: You can disable this status message by setting the Use the checkbox below for a quick retry:
WalkthroughComprehensive implementation of QWP egress protocol for WebSocket-based query result streaming. Adds frame serialization, schema management, compression negotiation, result buffering, request decoding, HTTP/WebSocket processing infrastructure, and native zstd compression bindings via JNI. Removes TYPE_STRING constant, consolidating string wire representation to TYPE_VARCHAR. Changes
Estimated code review effort🎯 4 (Complex) | ⏱️ ~75 minutes Poem
✨ Finishing Touches🧪 Generate unit tests (beta)
|
…e; move SERVER_INFO to 0x18 # Conflicts: # core/src/main/java/io/questdb/cutlass/qwp/codec/QwpEgressMsgKind.java # core/src/main/java/io/questdb/cutlass/qwp/server/egress/QwpEgressUpgradeProcessor.java # docs/QWP_EGRESS_EXTENSION.md # java-questdb-client
The Javadoc block on testNoResetMidStream was at column 0 instead of being aligned to the 4-space method indent. IntelliJ's formatter rewrites it, so the JDK17 CI lint step fails on the resulting diff. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
# Conflicts: # THIRD_PARTY_LICENSES.txt # core/src/main/resources/io/questdb/bin/darwin-aarch64/libquestdbr.dylib # core/src/main/resources/io/questdb/bin/darwin-x86-64/libquestdbr.dylib # core/src/main/resources/io/questdb/bin/linux-aarch64/libquestdbr.so # core/src/main/resources/io/questdb/bin/linux-x86-64/libquestdbr.so # core/src/main/resources/io/questdb/bin/windows-x86-64/questdbr.dll
Bumps the java-questdb-client submodule to pick up the QWP client-review Tier 1 fixes: decoder bounds and cap fixes, bind encoder NULL framing and per-width scale checks, geohash value masking, QwpBatchBuffer capacity-growth fix, idempotent client close, releaseBuffer/closePool race guard, and per-generation terminalFailure latches. The NULL framing change aligns the wire format with what the server's bind parser already expects, so DECIMAL64/128/256 and GEOHASH NULLs now decode without misframing the rest of the batch. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
# Conflicts: # core/src/main/java/io/questdb/cutlass/qwp/websocket/WebSocketFrameWriter.java
# Conflicts: # benchmarks/pom.xml # core/pom.xml # core/src/main/java/io/questdb/cutlass/qwp/protocol/QwpConstants.java # core/src/main/resources/io/questdb/bin/windows-x86-64/questdbr.dll # core/src/test/java/io/questdb/test/cutlass/qwp/QwpStringDecoderTest.java # java-questdb-client
[PR Coverage check]😍 pass : 2471 / 2867 (86.19%) file detail
|
Resolved conflicts: - core/src/test/java/io/questdb/test/cairo/CreateTableTest.java: Took the union of imports added on both sides (PropertyKey, Rnd from vi_idx; CairoException, LPSZ from master). All four are used post-merge. - core/src/main/resources/io/questdb/bin/windows-x86-64/libquestdb.dll: Kept the vi_idx version. The master-side change (PR #6991, QWP egress) only rebuilt the DLL without C++ source changes, while vi_idx's DLL contains real native code changes for the posting index work. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Brings in 14 upstream commits (master..errand merge-base 5037278 through b6b3b15). Notable upstream changes: * JDK 25 migration (#6980) — drops Java 8/11 reflection helpers (isJava8Or11, is32BitJVM, getOrdinaryObjectPointersCompressionStatus, AccessibleObject_override_fieldOffset, OVERRIDE constant) from Unsafe.java. Our errand branch had inherited them; they are dead code in our tree too (the only call site was the dropped helper itself). Followed upstream and removed them. * feat(http): QWP egress / WebSocket SQL streaming (#6991, #7004) * perf(sql): faster GROUP BY / hash join finalizer (#6997) * fix(sql): nested window inside aggregate (#6943, #6955) * feat(sql): configurable parquet export encodings (#6949) Conflict resolutions: * core/src/main/java/io/questdb/std/Unsafe.java Both sides added different methods between setRssMemLimit() and checkAllocLimit(). Kept all three: storeFence (upstream), chargeExternalRss (ours), and dropped the dead AccessibleObject_override_fieldOffset() helper per upstream's JDK 25 cleanup. * core/src/main/java/io/questdb/std/str/GcUtf8String.java Our errand branch redesigned this class for lazy native allocation (data byte[] field, allocateNative() on first ptr() call) instead of upstream's eager DirectByteBuffer + reflection-fetched address. Upstream's only post-divergence change was a tiny refactor replacing Unsafe.getUnsafe().X with Unsafe.X — irrelevant to our redesign. Took ours. * core/src/main/resources/io/questdb/bin/linux-x86-64/libquestdbr.so Built artifact — both sides changed Rust source. Took ours as placeholder; will refresh via `mvn package -pl core -am -DskipTests` immediately after this commit.
Tandem with questdb/questdb-enterprise#991 — must merge together; the Enterprise PR adds integration tests that exercise QWP egress auth and TLS through
QwpQueryClientonce this branch lands.Summary
/read/v1(also/api/v1/read) that streams SQL query results to clients in QWP's binary columnar wire format. SELECT in, columnar batches out.QUERY_REQUEST,RESULT_BATCH,RESULT_END,QUERY_ERROR,CANCEL,CREDIT,EXEC_DONE,CACHE_RESET).docs/QWP_EGRESS_EXTENSION.md.QwpQueryClient) lives in thejava-questdb-clientsubmodule; this PR bumps the submodule pointer and the Maven dependency to1.1.1-SNAPSHOT. See companion submodule PR.RESULT_BATCHbodies, negotiated at upgrade time viaX-QWP-Accept-Encoding. Server compresses in Rust (zstd reused from parquet2); client decompresses in C against libzstd v1.5.7 vendored as a git submodule.CACHE_RESETcontrol frame (see Connection-scope caps section) so long-lived connections cannot monotonically accumulate dict / schema state.qwp_egress_*namespace (see Metrics section).Wire format
QWP egress is one new HTTP endpoint, one new set of WebSocket message kinds, and the same column-data encoding the ingestion side uses. A typical exchange:
Schema-reference mode kicks in from batch 1 onwards so wide schemas don't repeat per batch. All 24 QuestDB column types are wire-format supported (
BOOLEAN,BYTE,SHORT,INT,LONG,FLOAT,DOUBLE,STRING,SYMBOL,TIMESTAMP,TIMESTAMP_NANOS,DATE,UUID,LONG256,GEOHASH(all four storage widths),VARCHAR,IPv4,DECIMAL64,DECIMAL128,DECIMAL256,DOUBLE_ARRAY,LONG_ARRAY,CHAR,BINARY).NULL semantics inherit QuestDB's existing sentinel conventions (documented in spec §11.5). Notably:
NaNis the FLOAT/DOUBLE NULL sentinel,0(i.e.0.0.0.0) is the IPv4 NULL sentinel, and-1is the universal GEOHASH NULL sentinel across all storage widths.Flow control and control-plane frames
CANCEL— client asks the server to abort an in-flight query. Server flags the streaming state;streamResultsobserves the flag between batches and aborts withQUERY_ERRORcarryingSTATUS_CANCELLED.QwpQueryClient.cancel()is thread-safe and callable from any thread.CREDIT— byte-budgeted flow control. Client setsinitial_creditinQUERY_REQUEST; server streams at most that many result-payload bytes before parking. Client auto-replenishes by the size of each batch after the user's handler releases it. Zero credit means unbounded (the default — no CREDIT bookkeeping on either side). When credit is exhausted the server yields cooperatively (no thread park) and resumes on the next CREDIT frame.EXEC_DONE— ack for non-SELECT statements. DDL, INSERT, UPDATE, ALTER, DROP, CREATE TABLE, CREATE MAT VIEW, TRUNCATE, and every parse-time-executed statement all run through egress and return{op_type, rows_affected}instead of opening a result stream.CACHE_RESET— server-to-client control frame, emitted at a query boundary when the connection's SYMBOL dict or schema-fingerprint cache crosses its soft cap. Body is a singlereset_maskbyte (bit 0 = dict, bit 1 = schemas). Recipient flushes the indicated caches; the nextRESULT_BATCHdelta section starts atdeltaStart=0and, if the schemas bit was set, the next schema is shipped in full mode with a fresh id. See Connection-scope caps.On-wire compression
FLAG_GORILLA— TIMESTAMP / TIMESTAMP_NANOS / DATE columns carry a 1-byte encoding discriminator (0x00raw /0x01Gorilla). Server auto-picks Gorilla when the column has ≥ 3 non-null values and the delta-of-delta bitstream beats rawnonNull × 8. Unordered or jumpy columns fall back to raw. Saves ~80× on periodic timestamps (e.g. 10 ms tick data).FLAG_DELTA_SYMBOL_DICT— SYMBOL values are dedup'd into a connection-scoped dictionary. Each batch ships only the new entries in a per-message delta section; per-row payload is a varintconnId. Per-columnIntIntHashMap(native-key → connId) on eachQwpColumnScratchkeeps the per-row hot path to a single int probe; a bytes-keyed hedge inside the connection dict catches cross-column / cross-query duplicates (first-sight probe only, not per-row).FLAG_ZSTD— whole-batch zstd compression of the region after the prelude (msg_kind + request_id + batch_seq stay raw so the client I/O thread can dispatch on them without paying the decompress cost). Negotiated once at upgrade time viaX-QWP-Accept-Encoding: zstd;level=N, raw; the server echoes its choice inX-QWP-Content-Encoding. Level is a client hint; server clamps to[1, 9]because zstd levels 10+ drop to<20 MB/scompress speed and would let a slow/malicious client pin a worker thread. A specific batch ships raw when its compressed form isn't actually smaller — avoids zstd's header overhead dominating tiny batches. Connection-string keys:compression={zstd|raw|auto}(defaultauto= advertisezstd,raw),compression_level=N(default 3).Server side
core/.../cutlass/qwp/codec/— outbound codec: frame writer, schema writer, message-kind constants, type mapper, per-column native scratch, batch buffer, connection-scoped SYMBOL dict, Gorilla encoder.core/.../cutlass/qwp/server/egress/— HTTP handler, upgrade processor, per-connection processor state, request decoder, compression negotiator, metrics.HttpServer.addDefaultEndpointsnext to the existing/write/v4ingest route. Same HTTP worker pool, same TLS / auth surface, sameLocalValue<State>pattern.JsonQueryProcessorandExportQueryProcessoruse). Page-frame parallel execution underfactory.getCursor()continues to dispatch to QuestDB's shared work pool transparently — the egress encoder is the consumer of that already-parallel pipeline.executeNonSelectsynchronously awaits the operation's future, mapping the compiled-query type into theEXEC_DONEresponse.qdbrRust crate asqwp_zstd.rs. Reuses thezstdcrate already transitively linked via parquet2, so no new native dependency on the server side.ZSTD_CCtxis allocated lazily per connection on first compressed batch and reused across every batch; freed inQwpEgressProcessorState.close().resumeSendunconditionally flushes deferred bytes before checking streaming state — coversQUERY_ERRORframes parked afterendStreaming.streamingBatchSeqCommittedflag enforces "seq incremented before send commits bytes" as a runtime invariant (not just an assertion).onHeadersReady(write bytes to sink) +onRequestComplete(commit viarawSocket.send, may park) +resumeSend(finalise protocol switch after flush). Fixes a pre-existing partial-write bug under smallDEBUG_HTTP_FORCE_SEND_FRAGMENTATION_CHUNK_SIZE.mapErrorStatusdistinguishesCairoException.isCancellation→STATUS_CANCELLED,isInterruption/isOutOfMemory→STATUS_LIMIT_EXCEEDED,isAuthorizationError→STATUS_SECURITY_ERROR,SqlException→STATUS_PARSE_ERROR, elseSTATUS_INTERNAL_ERROR.SYMBOLbind type code and routes it throughSTRING.batchBuffer.beginBatchand the final send (cursor exception, scratch-grow OOM, transient encode failure), the server rolls the connection SYMBOL dict back to its pre-batch size viaQwpResultBatchBuffer.rollbackCurrentBatch()before the catch(Throwable) shipsQUERY_ERROR. Without the rollback, partially-committed dict entries would never reach the wire, and a subsequent query that dedup'd against the same bytes would emit row payload referencing an id the client was never taught.Connection-scope caps and
CACHE_RESETLong-lived connections accumulate two forms of connection-scoped state: the SYMBOL delta-dict (heap bytes + entry count) and the schema-fingerprint cache. Under adversarial or just high-cardinality workloads, both can grow large enough to matter for fleet memory budgets. The server enforces soft caps and signals the client when it flushes.
DEFAULT_MAX_EGRESS_DICT_ENTRIES = 100_000— entry count soft cap.DEFAULT_MAX_EGRESS_DICT_HEAP_BYTES = 8 MiB— UTF-8 heap soft cap.DEFAULT_MAX_EGRESS_SCHEMAS_PER_CONNECTION = 4_096— schema-fingerprint cap (tighter than the inherited ingress cap at65_535because egress state is fully server-owned and reusable).At every query boundary (just after decoding the next
QUERY_REQUEST), the server computes a reset mask from the current cache sizes; if non-zero, it emits aCACHE_RESETframe and callsstate.applyCacheReset(mask). The client'sQwpEgressIoThreadreceives the frame and callsQwpResultBatchDecoder.applyCacheReset(mask), clearingconnDictSize/connDictHeapPosand / or the schema registry so the followingRESULT_BATCHdelta section starts atdeltaStart=0. Because the reset happens between queries, no in-flightRESULT_BATCHreferences an id that survives the flush.Per-query scratch buffers on
QwpColumnScratchalso shrink on query boundary: the scratch tracks the peak observed footprint across the query's batches, and atresetForNewQuerytrims any buffer whose capacity has outgrown the peak by more than 4x. A one-off wide batch no longer permanently retains its peak-sized native allocation on a long-lived connection.QwpEgressProcessorStateexposes test-only static overrides for all three caps so tests can trip resets at tiny entry counts without stuffing the connection with millions of rows. Production code never touches them.Metrics
Prometheus-scrapable counters and gauges are exposed under the
qwp_egress_*namespace (same pattern asjson_queries_*,pgwire_*). All counters are no-ops whenmetrics.enabled=false(the default); setQDB_METRICS_ENABLED=trueto turn them on.qwp_egress_connectionsfinalizeHandshake, decrements inonConnectionClosed.qwp_egress_queries_startedQUERY_REQUESTsuccessfully decoded.qwp_egress_queries_erroredQUERY_ERRORfor any reason other than explicit cancellation.qwp_egress_queries_cancelledQUERY_ERRORwithSTATUS_CANCELLED(either clientCANCELor server-side cancel).qwp_egress_batches_sentRESULT_BATCHframes committed to the wire.qwp_egress_bytes_sentRESULT_BATCHframes (post-compression whenFLAG_ZSTDis set, pre-WebSocket-framing in all cases).qwp_egress_bytes_zstd_savedFLAG_ZSTDactually shipped.qwp_egress_rows_streamedRESULT_BATCH.qwp_egress_cache_reset_dictCACHE_RESETframes emitted with the dict bit set.qwp_egress_cache_reset_schemasCACHE_RESETframes emitted with the schemas bit set.All metrics piggyback on the existing
GET /metricsPrometheus endpoint and are exported viaMetrics.scrapeIntoPrometheus. Dashboard entry points:rate(qwp_egress_rows_streamed[1m]),rate(qwp_egress_bytes_sent[1m]).rate(qwp_egress_queries_errored[5m])/rate(qwp_egress_queries_started[5m]).qwp_egress_connections(current),rate(qwp_egress_cache_reset_dict[1h])(per-hour cap trips as a leading indicator of long-lived connections accumulating dict state).rate(qwp_egress_queries_cancelled[5m]).Client side
java-questdb-client/core/src/main/c/share/zstd. CMake walks the upstreamlib/common/+lib/decompress/subset only (compress isn't linked — the client decodes).zstd_jni.csits alongside (not inside) the submodule so upstream resets don't touch the JNI glue.ZSTD_DISABLE_ASMis set on non-x86 platforms; x86_64 links the hand-tuned Huffman decoder.ZSTD_DCtxallocated lazily perQwpResultBatchDecoderinstance (one per I/O thread) and reused across every batch. Decompression scratch grows on demand up to a 64 MiB cap — the cap prevents a hostile/corrupted frame advertising a huge content size from forcing unbounded allocation.QwpQueryClient.connect()— if the user didn't setcompression=raw, the client allocates + frees aZSTD_DCtxbefore starting the I/O thread. Catches mismatched client jars (new server, old client built without the zstd submodule) with a clear error on the user thread rather than surfacing theUnsatisfiedLinkErrormid-stream through the batch handler's error callback. The decoder carries a second guard that translates the same mismatch to aQwpDecodeExceptionif aFLAG_ZSTDframe ever reaches it outside the probe's reach.CACHE_RESEThandler on the I/O thread flushes the connection-scoped dict and / or schema registry when the server signals a reset. No user-visible event — the reset frame is always followed by a freshRESULT_BATCHwhosedeltaStartaligns with the post-resetconnDictSize.Allocation footprint
The codec is structured so per-batch allocations amortise to zero after warmup:
QwpResultBatchBufferaccumulates rows column-by-column into per-columnQwpColumnScratchobjects backed by native memory (one per column, grown to the max observed batch size and reused).Long/Doubleper cell.Decimal128/Decimal256sinks are reused per column, not allocated per row.IntIntHashMapkeyed on the native symbol-table int; on first sight per-connection per column, oneStringallocation and oneUtf8Stringkey into the shared connection dict. Amortised: hundreds of bytes per unique symbol value per connection, zero per row.QwpEgressColumnDef.of(...)and reused for both schema-size estimation and emit.QwpEgressRequestDecoderreuses aDirectUtf8Stringview +StringSinkscratch for bind parameters.(offset, length)entries — a single 64-bit load + two int extractions per lookup, noDirectUtf8Stringper entry and noObjList.getQuickon the hot path.sendQueryErrorUTF-8 encodes the message directly into the wire buffer from aCharSequence, bypassing the intermediateString+byte[].QwpColumnScratch.resetForNewQuerytrims any buffer whose capacity is >4x the per-query peak, back to max(INITIAL_BYTES, 2x peak). A spike batch no longer permanently retains megabytes of native memory.Performance characteristics
/exec. Page-frame parallelism continues to fan out underneath. Credit-suspended streams cooperatively yield the worker back to the pool; no thread is blocked while a stream is parked waiting for CREDIT.benchmarks/QwpEgressReadBenchmark.java(narrow: ts/id/price/sym/note — 5 cols) andQwpEgressReadBenchmarkWide.java(wide: 15 cols including five 100k-cardinality symbols and five extra doubles) each ingest N rows then read them back via QWP egress, PostgreSQL wire, and HTTP/exec(JSON), printing rows/sec and MiB/sec for each. Results included in the PR discussion.Limitations / Phase 2 backlog
Documented in
docs/QWP_EGRESS_PHASE2_BACKLOG.md. Remaining items:streamResultscheck, status mapping, client API) is complete, but the IO dispatcher registers each fd for a single operation (READ xor WRITE). While the server is parked on WRITE during a streaming query, kernel-buffered CANCEL frames are dispatched only after the write unblocks — by then the query has often completed. Fixing this requires registering for both read and write during streaming, a dispatcher-level change.QwpResultCursorrow-iterator wrapper is a stretch goal; the column-batch handler API covers all functional cases today.Test plan
Nineteen test classes cover the egress surface. Breakdown:
QwpEgressBootstrapTest— end-to-end handshake, SELECT over every wire type, multi-batch streaming, malformed connect strings, syntax errors, table-not-found, etc. Includes a regression test for the empty-result + reused-schema case (review finding Ignore unsupported field types #2).QwpEgressTypesExhaustiveTest— min/max/zero/null boundary coverage for every wire type.QwpEgressFuzzTest— property-based fuzz: random schema, random per-row values generated in Java, four random query shapes (full scan / projection reorder / id-range filter / reverse limit), row-by-row verification. Each client connect picks a random compression mode (auto / raw / zstd at a random level in [1, 9]) so the handshake negotiation paths get hit across the run.QwpEgressFragmentationFuzzTest— stresses the state machines with both send- and recv-sideDEBUG_HTTP_FORCE_{SEND,RECV}_FRAGMENTATION_CHUNK_SIZEcapped at small random values. Includes a dedicated regression test for the handshake partial-write fix atchunk=5bytes.QwpEgressRequestDecoderTest— unit-level decoder coverage: no-binds, all scalar bind types, null, mixed.QwpEgressPageFrameTest— PageFrameCursor streaming, multi-frame, column-top handling.QwpEgressTimestampGorillaTest— ordered / unordered / DATE / TIMESTAMP_NANOS Gorilla coverage, per-column encoding-byte round-trips, batch-size boundaries (0/1/2/3 rows).QwpEgressSymbolEdgeCaseTest— Unicode, emoji, long values, empty strings, NULL, dict growth.QwpEgressDeltaSymbolDictTest— cross-batch and cross-query delta dict correctness, wire-byte savings checks.QwpEgressDdlExecTest— every non-SELECT path (CREATE, INSERT, UPDATE, ALTER, DROP, TRUNCATE, parse-time-executed).QwpEgressCancelTest— CANCEL frame plumbing +mapErrorStatusunit coverage for cancellation / interruption / OOM / auth / SQL / generic error paths.QwpEgressCreditFlowTest— credit-flow correctness (unbounded back-compat, tiny credit forcing many suspend/resume cycles, large result under 4 KiB budget, mixed credit across queries on one connection).QwpEgressErrorCoverageTest— runtime errors, type mismatches, peer disconnects mid-stream, handshake rejections.QwpEgressCompressionTest— zstd round-trip at level 1, default level, and clamped level 22; raw fallback; auto default over multi-batch streaming; compression +chunk=23fragmentation combined.ZstdTest— native JNI round-trip: highly-compressible runs, random bytes,CCtx/DCtxreuse across 100 buffers, level clamp.QwpQueryClientCloseTimeoutTest— clean-close path forwasLastCloseTimedOut()plus a reflection-injected timeout test that hijacks the I/O thread handle and shortensshutdownJoinMsto hit the leak-rather-than-SIGSEGV branch in ~150 ms.QwpEgressConnSymbolDictTest— unit coverage for the connection-scoped symbol dict: addEntry dedup, rollback semantics for mid-batch failure (review finding Event Appender #1), rollback boundary cases.QwpEgressProcessorStateCacheResetTest— unit coverage forcomputeCacheResetMaskandapplyCacheReset: each cap in isolation, exact-boundary triggers, no-op when under caps, unknown mask bits ignored, independence of dict vs schema caches.QwpEgressCacheResetWireTest— end-to-end coverage for theCACHE_RESETframe: dict-entry cap, dict-heap cap, schema cap; no-reset-under-defaults baseline; client dict stays consistent after a reset; in-flight streams are not disrupted by caps crossed mid-batch.QwpEgressMetricsScrapeTest— end-to-end Prometheus metric coverage: connection gauge open/close, query-started / errored / exec-done counters, batch/row/byte counters, scrape output format, disabled-metrics no-op path.Existing 358 QWP ingress unit tests remain green (no regressions on the ingestion side).
Reference benchmarks build clean against the local submodule (
mvn -pl benchmarks -P local-client compile); require a running QuestDB to execute end-to-end.🤖 Generated with Claude Code