Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(core): allow server to perform graceful shutdown under heavy ILP load #3361

Merged
merged 7 commits on May 18, 2023
108 changes: 94 additions & 14 deletions core/src/main/java/io/questdb/cairo/O3CopyJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -498,20 +498,29 @@ private static void copyTail(

final int commitMode = tableWriter.getConfiguration().getCommitMode();
if (commitMode != CommitMode.NOSYNC) {
boolean async = commitMode == CommitMode.ASYNC;
if (dstFixAddr != 0) {
ff.msync(dstFixAddr, Math.abs(dstFixSize), async);
// sync FD in case we wrote data not via mmap
if (dstFixFd != -1 && dstFixFd != 0) {
ff.fsync(Math.abs(dstFixFd));
}
}
if (dstVarAddr != 0) {
ff.msync(dstVarAddr, Math.abs(dstVarSize), async);
if (dstVarFd != -1 && dstVarFd != 0) {
ff.fsync(Math.abs(dstVarFd));
}
}
syncColumns(
columnCounter,
timestampMergeIndexAddr,
timestampMergeIndexSize,
srcDataFixFd,
srcDataFixAddr,
srcDataFixSize,
srcDataVarFd,
srcDataVarAddr,
srcDataVarSize,
dstFixFd,
dstFixAddr,
dstFixSize,
dstVarFd,
dstVarAddr,
dstVarSize,
srcTimestampFd,
srcTimestampAddr,
srcTimestampSize,
tableWriter,
ff,
commitMode
);
}

// unmap memory
Expand Down Expand Up @@ -684,6 +693,77 @@ private static void mergeCopy(
}
}

/**
 * Flushes the destination mappings of one column to disk.
 * <p>
 * For the fixed-size destination and then for the variable-size destination:
 * when the mapping address is non-zero and the size is positive, the mapped
 * range is flushed with {@code ff.msync}; after that, if the matching file
 * descriptor is neither {@code -1} nor {@code 0}, {@code ff.fsync} is called
 * on its absolute value.
 * <p>
 * The flush is requested as asynchronous only when {@code commitMode} equals
 * {@link CommitMode#ASYNC}.
 * <p>
 * If anything is thrown while syncing, the error is logged, the writer's O3
 * error count is bumped, {@code copyIdleQuick} is called with this column's
 * descriptors, and the original throwable is rethrown unchanged.
 *
 * @param columnCounter           forwarded to {@code copyIdleQuick} on failure
 * @param timestampMergeIndexAddr forwarded to {@code copyIdleQuick} on failure
 * @param timestampMergeIndexSize forwarded to {@code copyIdleQuick} on failure
 * @param srcDataFixFd            forwarded to {@code copyIdleQuick} on failure
 * @param srcDataFixAddr          forwarded to {@code copyIdleQuick} on failure
 * @param srcDataFixSize          forwarded to {@code copyIdleQuick} on failure
 * @param srcDataVarFd            forwarded to {@code copyIdleQuick} on failure
 * @param srcDataVarAddr          forwarded to {@code copyIdleQuick} on failure
 * @param srcDataVarSize          forwarded to {@code copyIdleQuick} on failure
 * @param dstFixFd                file descriptor of the fixed-size destination
 * @param dstFixAddr              mapping address of the fixed-size destination, 0 to skip
 * @param dstFixSize              size passed to msync; non-positive values skip the sync
 * @param dstVarFd                file descriptor of the variable-size destination
 * @param dstVarAddr              mapping address of the variable-size destination, 0 to skip
 * @param dstVarSize              size passed to msync; non-positive values skip the sync
 * @param srcTimestampFd          forwarded to {@code copyIdleQuick} on failure
 * @param srcTimestampAddr        forwarded to {@code copyIdleQuick} on failure
 * @param srcTimestampSize        forwarded to {@code copyIdleQuick} on failure
 * @param tableWriter             writer used for the log entry and error accounting
 * @param ff                      files facade performing msync/fsync
 * @param commitMode              one of the {@link CommitMode} constants
 */
private static void syncColumns(
        AtomicInteger columnCounter,
        long timestampMergeIndexAddr,
        long timestampMergeIndexSize,
        int srcDataFixFd,
        long srcDataFixAddr,
        long srcDataFixSize,
        int srcDataVarFd,
        long srcDataVarAddr,
        long srcDataVarSize,
        int dstFixFd,
        long dstFixAddr,
        long dstFixSize,
        int dstVarFd,
        long dstVarAddr,
        long dstVarSize,
        int srcTimestampFd,
        long srcTimestampAddr,
        long srcTimestampSize,
        TableWriter tableWriter,
        FilesFacade ff,
        int commitMode
) {
    try {
        final boolean asyncFlush = (commitMode == CommitMode.ASYNC);

        // Fixed-size destination first.
        if (dstFixSize > 0 && dstFixAddr != 0) {
            ff.msync(dstFixAddr, dstFixSize, asyncFlush);
            // The descriptor is synced too, in case data was written without going through the mapping.
            final boolean fixFdUsable = !(dstFixFd == -1 || dstFixFd == 0);
            if (fixFdUsable) {
                ff.fsync(Math.abs(dstFixFd));
            }
        }

        // Variable-size destination second, same procedure.
        if (dstVarSize > 0 && dstVarAddr != 0) {
            ff.msync(dstVarAddr, dstVarSize, asyncFlush);
            final boolean varFdUsable = !(dstVarFd == -1 || dstVarFd == 0);
            if (varFdUsable) {
                ff.fsync(Math.abs(dstVarFd));
            }
        }
    } catch (Throwable th) {
        LOG.error()
                .$("sync error [table=")
                .utf8(tableWriter.getTableToken().getTableName())
                .$(", e=")
                .$(th)
                .I$();

        tableWriter.o3BumpErrorCount();

        // NOTE: copyIdleQuick takes the timestamp triple BEFORE the destination
        // descriptors, which differs from this method's own parameter order.
        // The two zero arguments are passed as-is; their meaning is defined by
        // copyIdleQuick, which is not visible here.
        copyIdleQuick(
                columnCounter,
                timestampMergeIndexAddr,
                timestampMergeIndexSize,
                srcDataFixFd,
                srcDataFixAddr,
                srcDataFixSize,
                srcDataVarFd,
                srcDataVarAddr,
                srcDataVarSize,
                srcTimestampFd,
                srcTimestampAddr,
                srcTimestampSize,
                dstFixFd,
                dstFixAddr,
                dstFixSize,
                dstVarFd,
                dstVarAddr,
                dstVarSize,
                0,
                0,
                tableWriter
        );

        // Propagate the original failure to the caller untouched.
        throw th;
    }
}

private static void updateIndex(
AtomicInteger columnCounter,
long timestampMergeIndexAddr,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public class LineTcpConnectionContext extends IOContext<LineTcpConnectionContext
private static final Log LOG = LogFactory.getLog(LineTcpConnectionContext.class);
private static final long QUEUE_FULL_LOG_HYSTERESIS_IN_MS = 10_000;
protected final NetworkFacade nf;
private final Authenticator authenticator;
private final DirectByteCharSequence byteCharSequence = new DirectByteCharSequence();
private final long checkIdleInterval;
private final long commitInterval;
Expand All @@ -67,7 +68,6 @@ public class LineTcpConnectionContext extends IOContext<LineTcpConnectionContext
private long lastQueueFullLogMillis = 0;
private long nextCheckIdleTime;
private long nextCommitTime;
private final Authenticator authenticator;

public LineTcpConnectionContext(LineTcpReceiverConfiguration configuration, LineTcpMeasurementScheduler scheduler, Metrics metrics) {
this.configuration = configuration;
Expand Down Expand Up @@ -333,13 +333,10 @@ protected final IOContextResult parseMeasurements(NetworkIOJob netIoJob) {
return IOContextResult.NEEDS_DISCONNECT;
}

if (!read()) {
if (peerDisconnected) {
return IOContextResult.NEEDS_DISCONNECT;
}
return IOContextResult.NEEDS_READ;
if (peerDisconnected) {
return IOContextResult.NEEDS_DISCONNECT;
}
break;
return IOContextResult.NEEDS_READ;
}
}
} catch (CairoException ex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ public boolean readOnlySecurityContext() {
};
}

protected boolean handleContextIO() {
protected boolean handleContextIO0() {
switch (context.handleIO(noNetworkIOJob)) {
case NEEDS_READ:
context.getDispatcher().registerChannel(context, IOOperation.READ);
Expand Down Expand Up @@ -368,7 +368,7 @@ protected void waitForIOCompletion() {
// Guard against slow writers on disconnect
int maxIterations = 2000;
while (maxIterations-- > 0) {
if (!handleContextIO()) {
if (!handleContextIO0()) {
break;
}
LockSupport.parkNanos(1_000_000);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ protected void assertGeoHash(int columnBits,
Assert.assertTrue(isWalTable(tableName));
}
recvBuffer = inboundLines;
handleContextIO();
handleContextIO0();
waitForIOCompletion();
closeContext();
mayDrainWalQueue();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,16 +106,16 @@ public void testDisconnectedOnChallenge1() throws Exception {
runInAuthContext(() -> {
maxSendBytes = 0;
recvBuffer = AUTH_KEY_ID1 + "\n";
handleContextIO();
handleContextIO0();
Assert.assertFalse(disconnected);
handleContextIO();
handleContextIO0();
Assert.assertFalse(disconnected);
Assert.assertNull(sentBytes);
handleContextIO();
handleContextIO0();
Assert.assertFalse(disconnected);
Assert.assertNull(sentBytes);
maxSendBytes = -1;
handleContextIO();
handleContextIO0();
Assert.assertNull(sentBytes);
Assert.assertTrue(disconnected);
});
Expand All @@ -126,18 +126,18 @@ public void testDisconnectedOnChallenge2() throws Exception {
runInAuthContext(() -> {
maxSendBytes = 5;
recvBuffer = AUTH_KEY_ID1 + "\n";
handleContextIO();
handleContextIO0();
Assert.assertFalse(disconnected);
handleContextIO();
handleContextIO0();
Assert.assertEquals(maxSendBytes, sentBytes.length);
sentBytes = null;
Assert.assertFalse(disconnected);
handleContextIO();
handleContextIO0();
Assert.assertEquals(maxSendBytes, sentBytes.length);
sentBytes = null;
Assert.assertFalse(disconnected);
maxSendBytes = -1;
handleContextIO();
handleContextIO0();
Assert.assertNull(sentBytes);
Assert.assertTrue(disconnected);
});
Expand All @@ -158,7 +158,7 @@ public void testGoodAuthentication() throws Exception {
}
Assert.assertFalse(disconnected);
recvBuffer = "weather,location=us-midwest temperature=82 1465839830100400200\n";
handleContextIO();
handleContextIO0();
Assert.assertFalse(disconnected);
waitForIOCompletion();
closeContext();
Expand All @@ -175,7 +175,7 @@ public void testGoodAuthenticationFragmented1() throws Exception {
Assert.assertTrue(authSequenceCompleted);
Assert.assertFalse(disconnected);
recvBuffer = "weather,location=us-midwest temperature=82 1465839830100400200\n";
handleContextIO();
handleContextIO0();
Assert.assertFalse(disconnected);
waitForIOCompletion();
closeContext();
Expand All @@ -192,7 +192,7 @@ public void testGoodAuthenticationFragmented2() throws Exception {
Assert.assertTrue(authSequenceCompleted);
Assert.assertFalse(disconnected);
recvBuffer = "weather,location=us-midwest temperature=82 1465839830100400200\n";
handleContextIO();
handleContextIO0();
Assert.assertFalse(disconnected);
waitForIOCompletion();
closeContext();
Expand All @@ -209,7 +209,7 @@ public void testGoodAuthenticationFragmented3() throws Exception {
Assert.assertTrue(authSequenceCompleted);
Assert.assertFalse(disconnected);
recvBuffer = "weather,location=us-midwest temperature=82 1465839830100400200\n";
handleContextIO();
handleContextIO0();
Assert.assertFalse(disconnected);
waitForIOCompletion();
closeContext();
Expand All @@ -226,7 +226,7 @@ public void testGoodAuthenticationFragmented4() throws Exception {
Assert.assertTrue(authSequenceCompleted);
Assert.assertFalse(disconnected);
recvBuffer = "weather,location=us\\ midwest temperature=82 1465839830100400200\n";
handleContextIO();
handleContextIO0();
Assert.assertFalse(disconnected);
waitForIOCompletion();
closeContext();
Expand All @@ -243,7 +243,7 @@ public void testGoodAuthenticationFragmented5() throws Exception {
Assert.assertTrue(authSequenceCompleted);
Assert.assertFalse(disconnected);
recvBuffer = "weather,location=us\\ midwest temperature=82 1465839830100400200\n";
handleContextIO();
handleContextIO0();
Assert.assertFalse(disconnected);
waitForIOCompletion();
closeContext();
Expand All @@ -260,7 +260,7 @@ public void testGoodAuthenticationFragmented6() throws Exception {
Assert.assertTrue(authSequenceCompleted);
Assert.assertFalse(disconnected);
recvBuffer = "weather,location=us\\ midwest temperature=82 1465839830100400200\n";
handleContextIO();
handleContextIO0();
Assert.assertFalse(disconnected);
waitForIOCompletion();
closeContext();
Expand All @@ -286,7 +286,7 @@ public void testGoodAuthenticationFragmented7() throws Exception {

Assert.assertFalse(disconnected);
recvBuffer = "weather,location=us\\ midwest temperature=82 1465839830100400200\n";
handleContextIO();
handleContextIO0();
Assert.assertFalse(disconnected);
waitForIOCompletion();
closeContext();
Expand Down Expand Up @@ -316,7 +316,7 @@ public void testGoodAuthenticationP1363() throws Exception {
}
Assert.assertFalse(disconnected);
recvBuffer = "weather,location=us-midwest temperature=82 1465839830100400200\n";
handleContextIO();
handleContextIO0();
Assert.assertFalse(disconnected);
waitForIOCompletion();
closeContext();
Expand Down Expand Up @@ -354,7 +354,7 @@ public void testIncorrectConfig() throws Exception {
try {
runInAuthContext(() -> {
recvBuffer = "weather,location=us-midwest temperature=82 1465839830100400200\n";
handleContextIO();
handleContextIO0();
Assert.assertFalse(disconnected);
waitForIOCompletion();
closeContext();
Expand Down Expand Up @@ -397,13 +397,13 @@ public void testJunkSignature() throws Exception {
public void testTruncatedKeyId() throws Exception {
runInAuthContext(() -> {
recvBuffer = "test";
handleContextIO();
handleContextIO0();
Assert.assertFalse(disconnected);
recvBuffer = "Key";
handleContextIO();
handleContextIO0();
Assert.assertFalse(disconnected);
recvBuffer = null;
handleContextIO();
handleContextIO0();
Assert.assertTrue(disconnected);
});
}
Expand Down Expand Up @@ -458,7 +458,7 @@ private boolean authenticate(String authKeyId,
}
byte[] signature = Base64.getEncoder().encode(rawSignature);
send(new String(signature, Files.UTF_8) + "\n" + extraData, fragmentSignature);
handleContextIO();
handleContextIO0();
} catch (Exception ex) {
throw new RuntimeException(ex);
}
Expand All @@ -476,7 +476,7 @@ private byte[] readChallenge(boolean fragment) {
if (fragment) {
maxSendBytes = rnd.nextInt(10) + 1;
}
handleContextIO();
handleContextIO0();
if (null != sentBytes) {
if (null == challengeBytes) {
challengeBytes = sentBytes;
Expand Down Expand Up @@ -511,11 +511,11 @@ private void send(String sendStr, boolean fragmented) {
recvBuffer = sendStr.substring(nSent, nSent + n);
}
nSent += n;
handleContextIO();
handleContextIO0();
} while (nSent < sendStr.length());
} else {
recvBuffer = sendStr;
handleContextIO();
handleContextIO0();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ private void testBrokenUTF8Encoding(boolean disconnectOnError) throws Exception
table + ",location=us-eastcoast temperature=80,hőmérséklet=25" + nonPrintable + ",hőmérséklet=23 1465839830102400200\n" +
table + ",location=us-westcost temperature=82 1465839830102500200\n";

handleContextIO();
handleContextIO0();
Assert.assertEquals(disconnectOnError, disconnected);
closeContext();
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ private void testInvalidSymbol(boolean disconnectOnError) throws Exception {
table + ",ip_address=Invalid IP address. cpu=13 1465839830101400200\n" +
table + ",ip_address=192.168.0.1 cpu=42 1465839830100500200\n";

handleContextIO();
handleContextIO0();
Assert.assertEquals(disconnectOnError, disconnected);
closeContext();

Expand Down