Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3072,6 +3072,18 @@ private void sealAndSwapBuffer() {
cursorEngine.appendBlocking(toSend.getBufferPtr(), toSend.getBufferPos());
toSend.markRecycled();
} catch (Throwable t) {
// appendBlocking failed synchronously on the user thread — the
// payload never reached the engine, so no I/O thread will
// recycle toSend. Recycle it here so a later flush can swap
// back to it; flushPendingRows aborts its post-enqueue state
// updates after this throw, so the source rows and the
// sent-schema watermark stay intact and the next batch re-emits
// the same rows along with the full schema + symbol-dict delta.
if (toSend.isSending()) {
toSend.markRecycled();
} else if (toSend.isSealed()) {
toSend.rollbackSealForRetry();
}
// Surface any I/O thread error first — appendBlocking itself only
// throws on PAYLOAD_TOO_LARGE / backpressure deadline, but the
// I/O loop can have failed independently.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,17 @@ public void register(SegmentRing ring, String dir, AckWatermark watermark) {
}
}
ring.setManagerWakeup(this::wakeWorker);
// Nudge the worker so it picks up the new ring on its very next
// iteration. Without this, register-after-start has a race window:
// start() schedules the worker thread, and if that thread reaches
// workerLoop and takes `lock` before this method does, it observes
// an empty `rings` snapshot, services nothing, then parkNanos
// (potentially seconds). A new ring whose first append does not
// cross the high-water mark fires no producer-side wakeup either,
// leaving the ring without a spare for the full poll interval.
// wakeWorker is cheap (a single LockSupport.unpark) and a no-op
// when the worker has not been started yet.
wakeWorker();
}

public synchronized void start() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,22 @@
import io.questdb.client.cutlass.line.LineSenderException;
import io.questdb.client.cutlass.line.array.DoubleArray;
import io.questdb.client.cutlass.line.array.LongArray;
import io.questdb.client.cutlass.qwp.client.MicrobatchBuffer;
import io.questdb.client.cutlass.qwp.client.QwpWebSocketSender;
import io.questdb.client.cutlass.qwp.client.sf.cursor.CursorSendEngine;
import io.questdb.client.cutlass.qwp.protocol.QwpTableBuffer;
import io.questdb.client.std.Decimal128;
import io.questdb.client.std.Decimal256;
import io.questdb.client.std.Decimal64;
import io.questdb.client.std.bytes.DirectByteSlice;
import io.questdb.client.test.cutlass.qwp.websocket.TestWebSocketServer;
import org.junit.Assert;
import org.junit.Test;

import java.lang.reflect.Field;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.concurrent.TimeUnit;

import static io.questdb.client.test.tools.TestUtils.assertMemoryLeak;

Expand Down Expand Up @@ -326,6 +331,54 @@ public void testDoubleColumnAfterCloseThrows() throws Exception {
});
}

@Test
public void testFlushAppendFailureDoesNotLeaveMicrobatchBufferInUse() throws Exception {
assertMemoryLeak(() -> {
int port = TestPorts.findUnusedPort();
try (TestWebSocketServer server = new TestWebSocketServer(port, new TestWebSocketServer.WebSocketServerHandler() {
})) {
server.start();
Assert.assertTrue(server.awaitStart(5, TimeUnit.SECONDS));

// Memory-only engine with a 33-byte budget and a 1 ns append
// deadline guarantees every appendBlocking() call trips the
// backpressure deadline and throws.
CursorSendEngine engine = new CursorSendEngine(null, 33, 33, 1L);
QwpWebSocketSender sender = QwpWebSocketSender.connect(
"localhost", port, null, Integer.MAX_VALUE, 0, 0L, null,
QwpWebSocketSender.DEFAULT_MAX_SCHEMAS_PER_CONNECTION, false, engine, 0L);
try {
sender.table("t").longColumn("v", 1L).atNow();

try {
sender.flushAndGetSequence();
Assert.fail("Expected LineSenderException");
} catch (LineSenderException e) {
Assert.assertTrue(e.getMessage().contains("cursor SF append failed"));
}

MicrobatchBuffer buffer0 = getMicrobatchBuffer(sender, "buffer0");
MicrobatchBuffer buffer1 = getMicrobatchBuffer(sender, "buffer1");
Assert.assertFalse(
"failed append must not leave any buffer in use [buffer0="
+ MicrobatchBuffer.stateName(buffer0.getState())
+ ", buffer1=" + MicrobatchBuffer.stateName(buffer1.getState()) + "]",
buffer0.isInUse() || buffer1.isInUse());
} finally {
// close() drains pending rows, which appendBlocking still
// rejects because the engine is permanently wedged in this
// test. The bug under test is about microbatch buffer
// state, not about close() being lenient toward residual
// unflushed rows — swallow the predictable rethrow here.
try {
sender.close();
} catch (LineSenderException ignored) {
}
}
}
});
}

@Test
public void testGeoHashColumnLongAfterCloseThrows() throws Exception {
assertMemoryLeak(() -> {
Expand Down Expand Up @@ -705,6 +758,12 @@ private static void assertClosed(Runnable r) {
}
}

private static MicrobatchBuffer getMicrobatchBuffer(QwpWebSocketSender sender, String fieldName) throws Exception {
Field field = QwpWebSocketSender.class.getDeclaredField(fieldName);
field.setAccessible(true);
return (MicrobatchBuffer) field.get(sender);
}

/**
* Creates a sender without connecting.
* For unit tests that don't need actual connectivity.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,18 +218,20 @@ public void testFirstSpareLandsBeforeFirstPoll() throws Exception {
TestUtils.assertMemoryLeak(() -> {
// pollNanos is intentionally long enough that the 5s park can be
// ruled out as the mechanism by which the first spare arrives.
// The worker thread enters workerLoop on start(), takes the lock,
// sees the just-registered ring with needsHotSpare()==true, and
// provisions the spare BEFORE parking. The spare must therefore
// land within seconds of register(), not minutes -- the 5s park is
// never reached on the first iteration.
// register() unparks the worker after publishing the new ring,
// so the worker re-iterates and provisions the spare even when
// its first loop snapshot ran before register() acquired `lock`.
// The spare must therefore land within seconds of register(),
// not minutes -- the 5s park is never reached.
//
// The append below is incidental to the contract under test; it
// does NOT cross the SegmentRing high-water mark for this 4-frame
// segment (HEADER_SIZE 24 + FRAME_HEADER_SIZE 8 + 16 = 48 vs
// signalAtBytes = (120 >> 2) * 3 = 90), so no producer-side wakeup
// fires. The rotation/high-water wakeup paths are covered by
// testRotationWakeupTriggersImmediateSparePrep.
// testRotationWakeupTriggersImmediateSparePrep, and the
// deterministic register-after-park case is covered by
// testRegisterAfterWorkerParkedWakesWorker.
long pollNanos = 5_000_000_000L; // 5 seconds
long segSize = MmapSegment.HEADER_SIZE
+ 4 * (MmapSegment.FRAME_HEADER_SIZE + 16);
Expand All @@ -256,6 +258,40 @@ public void testFirstSpareLandsBeforeFirstPoll() throws Exception {
});
}

@Test
public void testRegisterAfterWorkerParkedWakesWorker() throws Exception {
TestUtils.assertMemoryLeak(() -> {
// Deterministic version of testFirstSpareLandsBeforeFirstPoll:
// sleep between start() and register() long enough for the worker
// to definitely complete its first (empty) iteration and enter
// parkNanos. Without register()'s wakeWorker() the spare would
// not land for the full 5s poll interval; with it the spare lands
// promptly because register() unparks the worker out of its park.
// No append at all, so no producer-side wakeup can mask a missing
// register-side wakeup.
long pollNanos = 5_000_000_000L; // 5 seconds
long segSize = MmapSegment.HEADER_SIZE
+ 4 * (MmapSegment.FRAME_HEADER_SIZE + 16);
MmapSegment seg0 = MmapSegment.create(tmpDir + "/0000000000000000.sfa", 0, segSize);
try (SegmentRing ring = new SegmentRing(seg0, segSize);
SegmentManager mgr = new SegmentManager(segSize, pollNanos)) {
mgr.start();
// Give the worker plenty of time to enter workerLoop, snapshot
// an empty rings list, and reach parkNanos. 250ms is far more
// than the OS scheduling + thread startup cost on any sane
// CI runner, and still well below the 5s poll interval.
Thread.sleep(250);
long t0 = System.nanoTime();
mgr.register(ring, tmpDir);
assertTrue("register must wake a worker that has already parked",
waitFor(() -> !ring.needsHotSpare(), 2000));
long elapsedMs = (System.nanoTime() - t0) / 1_000_000L;
assertTrue("spare arrived in " + elapsedMs + "ms -- should be <<5000ms",
elapsedMs < 4000);
}
});
}

@Test
public void testRotationWakeupTriggersImmediateSparePrep() throws Exception {
TestUtils.assertMemoryLeak(() -> {
Expand Down
Loading