Skip to content

feat(qwp): add QuestDB facade with pooled Sender and Query APIs#28

Merged
bluestreak01 merged 14 commits into
mainfrom
vi_api
May 23, 2026
Merged

feat(qwp): add QuestDB facade with pooled Sender and Query APIs#28
bluestreak01 merged 14 commits into
mainfrom
vi_api

Conversation

@bluestreak01
Copy link
Copy Markdown
Member

@bluestreak01 bluestreak01 commented May 21, 2026

Summary

  • New QuestDB handle pools both ingest (Sender) and query connections behind one configuration string. Construct once, share across threads.
  • Query API: db.executeSql(sql, handler) for no-bind one-shots; db.query().sql(...).binds(...).handler(...).submit() for parameterised queries returning a Completion you can await, await(timeout, unit), cancel, or isDone.
  • Sender API: db.borrowSender() (try-with-resources, close flushes and returns to pool) and db.sender() (thread-affine, zero borrow overhead for hot producer threads).
  • Pools are elastic with min / max / idleTimeoutMillis / maxLifetimeMillis. A single daemon housekeeper sweeps both pools.
  • Connect strings can carry pool tuning: sender_pool_min, sender_pool_max, query_pool_min, query_pool_max, acquire_timeout_ms, idle_timeout_ms, max_lifetime_ms. Explicit builder calls after fromConfig still win.
  • A unified connect string supports schema translation: http:: maps to ws:: on the query side and vice versa, so the common single-port deployment is one line.

Backwards compatibility

  • The existing Sender API is unchanged. Code using Sender.fromConfig(...) directly is not affected.
  • The new surface is opt-in via QuestDB.connect(...) or QuestDB.builder().

Tradeoffs

  • Each QuestDB handle owns one daemon housekeeper thread plus one dispatch thread per pooled query client. Defaults are min=1, max=4 per pool, so at idle one handle holds two server-facing connections.
  • db.query() is single-flight per thread by design (zero-GC steady state). One thread that needs multiple in-flight queries calls db.newQuery(), which allocates per call.

Test plan

  • mvn -pl core test
    • ConfigStringTranslatorTest -- schema translation, pool-key extraction, error paths.
    • SenderPoolTest -- borrow / return / exhaustion / thread affinity / elastic growth / idle reaping / availableSize invariants.
    • QuestDBBuilderTest -- validation, partial-build cleanup, connect-string pool keys.
    • QueryWorkerTest -- client() getter coverage.
  • End-to-end coverage (10 tests against an embedded server, with randomised send/recv fragmentation per run) lives in the companion questdb PR as QuestDBFacadeE2ETest.
  • Example usage in QuestDBExamples.java covers the common patterns.

@bluestreak01 bluestreak01 changed the title chore(qwp): new pooling API for the java QWP client feat(client): add QuestDB facade with pooled Sender and Query APIs May 21, 2026
bluestreak01 and others added 6 commits May 21, 2026 04:42
QwpWebSocketSenderTest reached into QwpWebSocketSender via
getDeclaredField/getDeclaredMethod for five members. Restoring
totalBufferedBytes after an accidental delete made the reflection
visible -- removing it altogether is the better fix.

QwpWebSocketSender changes:
- applyServerBatchSizeLimit promoted to @testonly public (production
  callers still use it; the annotation flags it as a test seam too).
- getPendingBytes promoted to @testonly public, deduped against the
  former private getter (the one internal caller at the auto-flush
  guard now binds to the public version).
- New @testonly public getters: getEffectiveAutoFlushBytes,
  getServerMaxBatchSize.
- New @testonly public setConnectedForTest(boolean) so tests can fake
  the post-handshake state without reflection.
- totalBufferedBytes restored as @testonly public (kept by the test
  and QwpTotalBufferedBytesBenchmark; otherwise dead).

QwpWebSocketSenderTest:
- Five reflection-based static helpers (~50 LOC) deleted.
- Three java.lang.reflect imports removed.
- All call sites now go through direct method invocation.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
testLargeSegmentCountReopensInOrder used to assert wall-clock elapsed
< 5s as a proxy for "sortByBaseSeq stayed O(N log N)". The bound was
generous for any modern machine but tight enough to flake on loaded
Windows CI runners doing 2048 mmap-creates + 2048 mmap-opens.

Replace the timing assertion with a comparison-count assertion:
- SegmentRing tracks the total baseSeq comparisons performed by
  sortByBaseSeq in a static counter (one += per partition pass,
  bumping by 3 + (hi - lo - 1) to cover median-of-three plus the
  partition loop).
- @testonly public accessors getSortComparisons() / resetSortComparisons()
  let the test reset before the run and read after.
- testLargeSegmentCountReopensInOrder now asserts comparisons <
  5 * N * log2(N), which sits about 30x below the O(N^2) regression
  value and 1.5x above the expected O(N log N) count. Detects a true
  regression on first commit, deterministic across hardware.

Production cost is one volatile-free += per partition pass, dwarfed
by the mmap I/O the same path does on every segment.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Three independent fixes to the new QWP pooling layer surfaced by code
review against the design/egress-api-ergonomics-review notes.

SenderPool.close() left ThreadLocal references on every thread that
had previously called pinToCurrentThread(). ThreadLocal#remove only
clears the calling thread's slot, so a subsequent db.sender() on a
different thread took the cached-pin early-return and handed back a
wrapper whose delegate had just been closed by close(). The next
operation on it dereferenced a closed native handle. Add an
invalidated flag on PooledSender that close() flips on every wrapper
under the lock; pinToCurrentThread() and releaseCurrentThread() check
the flag and drop the stale ThreadLocal slot so borrow() reports
"QuestDB handle is closed" instead.

PooledSender.close() returned broken wrappers to the pool when
delegate.flush() threw. The Sender contract intentionally does NOT
clear its buffer on flush failure (to permit retry), so the next
borrower inherited the failed rows; on WebSocket transport the
delegate itself was terminally broken. Route the failure path through
a new SenderPool.discardBroken() that marks the wrapper invalidated,
removes it from the pool under the lock, signals one waiter, and
closes the delegate outside the lock. The invalidated flag does
double duty: a broken thread-pinned slot is also rejected by
pinToCurrentThread() so the next db.sender() re-borrows a fresh slot.

QwpQueryClient.cancel() between submit() returning and executeOnce()
writing currentRequestId was a silent no-op: cancel() read -1 and
skipped the wire-send. The window covers worker-wakeup latency, bind
encoding (user code), and every failover retry's backoff. Latch a
pendingCancel flag in cancel() before reading currentRequestId;
execute() clears it once at the outermost entry; executeOnce() reads
it immediately after the currentRequestId assignment and re-issues
io.requestCancel() if set. The latch is intentionally not cleared
inside executeOnce(), so failover retries also honor the cancel.

Tests:
- SenderPoolTest: testPinAfterCloseRejectsStaleEntry,
  testReleaseAfterCloseIsSafe, testBrokenSenderIsNotReturnedToPool.
- QwpQueryClientUnitTest:
  testCancelDuringDispatchWindowLatchesPendingIntent,
  testExecuteEntryClearsStalePendingCancel, backed by a new
  @testonly isPendingCancelForTest seam.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
drain(timeoutMillis) flushes every buffered row and blocks until the
server has acknowledged the resulting frame, or until the caller's
timeout elapses. The shape mirrors the implicit drain that close()
already runs -- target = flushAndGetSequence(), then
awaitAckedFsn(target, timeoutMillis) -- but with the timeout chosen
at the call site instead of at builder time via
close_flush_timeout_millis.

Useful for callers that want different drain budgets in different
shutdown scenarios (long pre-batch checkpoint vs. tight per-iteration
cleanup), or that want to observe drain progress without closing the
sender. Pairs cleanly with close()'s implicit drain: drain(short)
returns false on timeout without throwing, the caller can take an
action (log, retry, switch to a different shutdown path), and a
subsequent close() still runs its own bounded drain through
close_flush_timeout_millis.

Default implementation lives on the Sender interface so every
transport picks it up:
- WebSocket/QWP: target = real FSN; awaitAckedFsn blocks until acked.
- HTTP / TCP / UDP (no FSN tracking): flushAndGetSequence returns -1,
  awaitAckedFsn(-1, _) returns true immediately. drain becomes "flush
  and report success" on these transports, which is the most honest
  thing a non-FSN transport can do.

PooledSender forwards to its delegate so pool callers see the
underlying Sender's semantics unchanged.

Tests (CloseDrainTest):
- testDrainBlocksUntilAckArrivesAndReturnsTrue: delayed-ACK server,
  drain(5000) must wait, return true; the subsequent close() is
  near-instant because the drain consumed the wait.
- testDrainReturnsFalseOnTimeoutAndSenderStillUsable: silent server,
  drain(200) returns false (not throws). Sender continues to accept
  writes after.
- testDrainNonZeroTimeoutOnFastServerReturnsImmediately: fast server,
  drain(5000) returns promptly -- the budget is an upper bound, not a
  floor.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The old 5s default was a "tight" budget that worked for benchmark-shape
workloads (small batches, fast loopback, fresh server) but routinely
fell short on real ones: slow consumers, catch-up replicas, chunky
payloads against small server send buffers, GC pauses, network
weather. When the budget runs out, close() throws and silently drops
acked-but-unverified rows in memory mode -- which is a much worse
failure mode than "close took a few seconds longer than I expected".
The recent CI fragmentation work surfaced this concretely: a 25M-row
async-mode test hit the close drain timeout because the server
couldn't ACK the backlog within 5s; pinning the value to 60s gives the
backlog enough wall-clock to drain even under the adversarial chunk
sizes the fuzzer picks.

60s was chosen as the upper end of what feels "still bounded" without
being open-ended -- humans typing Ctrl-C will tolerate up to about a
minute of "shutdown in progress" before reaching for kill -9. The
existing escape hatches stay in place: set close_flush_timeout_millis=0
or -1 for fast close (data lost in memory mode, recovered by the next
sender in SF mode); set close_flush_timeout_millis to any positive
value for a custom budget; or, new in fd3f8bb, call
Sender.drain(timeoutMillis) explicitly before close() to choose the
wait per call-site instead of per-builder.

Test note: CloseDrainTest had two stale comments referring to the 5s
default by name; updated to 60s. Behaviour-wise the tests still rely on
"non-zero default, server takes ~800-1500 ms to ACK, close() must
wait at least half that" -- a wider default does not invalidate any
of those assertions.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
SenderPool.close()'s teardown loop iterated `all` outside the
ReentrantLock with the comment "Snapshot of underlying Senders to
close" -- but the code iterated the live ArrayList, not a snapshot.
Concurrently, discardBroken acquired the lock and called
all.remove(s) regardless of whether the pool was already shutting
down. ArrayList is not thread-safe; close()'s
for (int i = 0; i < all.size(); i++) loop reads `size` and indexed
elements without synchronisation while discardBroken's
fastRemove updates size and shifts elements down.

Reachable interleaving:

1. Application calls db.close() -> pool.close() on thread A.
2. Thread B is in a try-with-resources block holding a borrowed
   PooledSender. The block exits, PooledSender.close() runs.
3. A acquires the lock, sets closed=true, marks every wrapper
   invalidated, releases the lock, starts the teardown loop, closes
   B's delegate.
4. B's delegate.flush() throws (closed socket); broken=true routes
   to discardBroken.
5. discardBroken acquires the lock (A no longer holds it), removes
   B's wrapper from `all`, releases the lock, calls delegate.close()
   on B's already-closed delegate.

Outcomes range from IndexOutOfBoundsException out of close()
(propagating up the shutdown path), through native-handle leaks
when the iteration cursor and the remove shift miss each other,
through two threads simultaneously inside delegate.close() on the
same Sender (whose teardown frees native scratch and joins a
daemon -- not safe to enter from concurrent threads).

Fix has two parts:

- close() takes a snapshot under the lock
  (PooledSender[] snapshot = all.toArray(...)) and iterates the
  snapshot outside the lock. The teardown loop is now immune to
  concurrent mutation of `all`, regardless of whether mutators
  cooperate.

- discardBroken bails on `closed`. Once the pool is shutting down,
  close()'s snapshot loop owns the delegate close; mutating `all`
  here would race that iteration (with the snapshot in place that
  is no longer a correctness issue, but it is still wasted work)
  and the delegate.close() below would be a double-close. Add an
  if (closed) return; immediately under the lock; the read is
  serialised against close()'s closed=true write because both
  happen under `lock`.

Red test (testDiscardBrokenAfterCloseDoesNotMutatePool) drives the
scenario deterministically on a single thread: borrow a sender,
write a row so flush has something to send, pool.close() (which
closes the delegate), then sender.close() (whose flush throws,
broken=true, route to discardBroken). Before the fix
pool.totalSize() drops from 1 to 0 because discardBroken removed
the wrapper from `all` post-close; after the fix it stays at 1.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@bluestreak01 bluestreak01 changed the title feat(client): add QuestDB facade with pooled Sender and Query APIs feat(qwp): add QuestDB facade with pooled Sender and Query APIs May 21, 2026
bluestreak01 and others added 7 commits May 22, 2026 13:55
QueryClientPool.createUnlocked() constructs a QwpQueryClient via
QwpQueryClient.fromConfig() and then calls client.connect(). The
construction step field-initialises QwpBindValues, which allocates
8192 NATIVE_DEFAULT bytes through NativeBufferWriter. When connect()
throws -- server unreachable, role mismatch (target=primary against
a REPLICA), TLS handshake error, etc. -- the client reference
escapes: the catch in acquire() (lines 126-133) only has handles
to inFlightCreations and the lock, and the pre-warm cleanup in the
constructor (lines 94-102) iterates `for (i < built)` over `all`,
which never received the failed worker. Every connect failure
during pool growth leaks one bindValues buffer; a flapping endpoint
accumulates the leak per attempt.

createUnlocked() now catches RuntimeException from connect(), calls
client.close(), and rethrows. QwpQueryClient.close() guards against
pre-connect state via closedFlag and runs bindValues.close() in a
finally block, so closing a never-connected client releases the
NativeBufferWriter scratch without touching the still-null ioThread
or webSocketClient.

QueryClientPoolLeakTest covers both the pre-warm and lazy-create
paths. It points a QwpQueryClient at a FakeStatusServer that
returns HTTP 421 and X-QuestDB-Role: REPLICA while the client
requests target=primary -- the endpoint walk rejects the only
option and connect() throws HttpClientException
deterministically. The test asserts that
Unsafe.getMemUsedByTag(NATIVE_DEFAULT) returns to the pre-failure
baseline. Before the fix both tests fail with an 8192-byte delta;
after the fix the delta is zero.

The QueryClientPool constructor and acquire() become public to
match SenderPool's testability surface (its constructor and
borrow() are already public). QueryWorker was already public.
pinToCurrentThread() caches the borrowed PooledSender in a
ThreadLocal (threadAffine) so subsequent db.sender() calls on the
same thread skip the borrow path. PooledSender.close() -- the
public Sender#close() the user reaches for in
try (Sender s = db.sender()) { ... } -- flushed and returned the
wrapper to the pool, but never cleared the caller's
ThreadLocal entry. A subsequent db.sender() on the same thread
would re-read the cached entry, see isInvalidated() = false, and
return the wrapper even though another consumer had since
borrowed the slot. Both consumers then wrote to the same
underlying Sender; ILP rows interleaved on the wire.

Reachable interleaving:

1. Thread A: db.sender() -> S, inUse=true, A's TL=S.
2. Thread A: S.close() -> inUse=false, S returned to pool;
   A's TL still references S.
3. Thread B: db.sender() -> borrow() polls S from `available`,
   markInUse=true, returns S to B.
4. Thread A: db.sender() -> TL.get() = S, !invalidated -> returns
   S. A and B now share S.

The earlier defensive idea of "also check pinned.isInUse() in
pinToCurrentThread()" only narrows the window: it catches the
case where A re-pins before B borrows, but once B's borrow has
set inUse=true the guard passes and the bug repeats.

Fix is structural: clear the pin in PooledSender.close() before
the slot becomes borrowable again.

- New package-private SenderPool.clearPinIfCurrent(s) removes the
  current thread's TL entry iff it currently references s.
- PooledSender.close() calls it in the finally block, BEFORE
  pool.giveBack(this) / pool.discardBroken(this). The order
  matters: if we cleared after giveBack, a concurrent borrower
  could grab the slot while the TL still pointed at the wrapper,
  and a re-pin on this thread would hand the (now in-use)
  wrapper back -- the same race this clear is meant to close.

clearPinIfCurrent is also called on the discardBroken path. There
it is redundant -- markInvalidated() means the existing
isInvalidated() check in pinToCurrentThread() would already clear
the TL on the next pin -- but uniformly clearing is simpler and
makes the close() contract symmetric across the broken/healthy
branches.

Behaviour change: db.sender() after s.close() on the same thread
no longer returns the same wrapper instance. It goes through
borrow() again and may return a different slot, or block on an
empty pool. This is the correct semantics -- close() releases
the pin; releaseSender() remains the documented release path
for code that wants to keep using the same wrapper without
flushing.

Red tests (in SenderPoolTest):

- testPinAfterUserCloseDoesNotShareWrapper: same-thread
  reproducer, pool size 1 so the in-between borrow
  deterministically receives the just-returned slot. Bug
  exposed at the wrapper-identity level without timing. Before
  fix, pinToCurrentThread() returns the (now in-use) wrapper;
  after fix, it tries to borrow from an empty pool and trips
  the 100ms acquireTimeout, throwing LineSenderException.

- testPinAfterUserCloseDoesNotShareWrapperCrossThread: mirrors
  the originally reported A/B scenario with two threads sequenced
  via CountDownLatch. Same identity assertion. Same green/red
  flip across the fix.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The parent questdb repo's PR title validator already accepts qwp as a
subtype, but the client repo's validator did not. Add it here so QWP-
scoped PRs can be titled consistently across the two repos.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@mtopolnik
Copy link
Copy Markdown
Contributor

[PR Coverage check]

😍 pass : 531 / 1043 (50.91%)

file detail

path covered line new line coverage
🔵 io/questdb/client/impl/PoolHousekeeper.java 0 38 00.00%
🔵 io/questdb/client/cutlass/qwp/client/ColumnView.java 0 3 00.00%
🔵 io/questdb/client/cutlass/qwp/client/QwpColumnBatch.java 0 23 00.00%
🔵 io/questdb/client/cutlass/qwp/client/RowView.java 0 3 00.00%
🔵 io/questdb/client/cutlass/qwp/client/QwpColumnBatchHandler.java 0 8 00.00%
🔵 io/questdb/client/cutlass/qwp/client/QwpQueryClient.java 3 34 08.82%
🔵 io/questdb/client/cutlass/qwp/client/QwpEgressIoThread.java 1 9 11.11%
🔵 io/questdb/client/QuestDB.java 1 6 16.67%
🔵 io/questdb/client/Sender.java 1 5 20.00%
🔵 io/questdb/client/impl/PooledSender.java 33 115 28.70%
🔵 io/questdb/client/impl/QuestDBImpl.java 11 38 28.95%
🔵 io/questdb/client/cutlass/qwp/client/QwpWebSocketSender.java 11 34 32.35%
🔵 io/questdb/client/impl/QueryWorker.java 22 63 34.92%
🔵 io/questdb/client/impl/QueryImpl.java 41 115 35.65%
🔵 io/questdb/client/impl/QueryClientPool.java 47 122 38.52%
🔵 io/questdb/client/cutlass/http/client/WebSocketSendBuffer.java 1 2 50.00%
🔵 io/questdb/client/impl/SenderPool.java 128 156 82.05%
🔵 io/questdb/client/QueryException.java 6 7 85.71%
🔵 io/questdb/client/impl/ConfigStringTranslator.java 149 174 85.63%
🔵 io/questdb/client/QuestDBBuilder.java 70 82 85.37%
🔵 io/questdb/client/cutlass/qwp/client/sf/cursor/SegmentRing.java 4 4 100.00%
🔵 io/questdb/client/cutlass/qwp/client/NativeBufferWriter.java 2 2 100.00%

@bluestreak01 bluestreak01 merged commit c4cda49 into main May 23, 2026
12 checks passed
@bluestreak01 bluestreak01 deleted the vi_api branch May 23, 2026 00:00
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants