diff --git a/benchmarks/src/main/java/org/questdb/PongMain.java b/benchmarks/src/main/java/org/questdb/PongMain.java index 59bccfce82aa..1d4528489e25 100644 --- a/benchmarks/src/main/java/org/questdb/PongMain.java +++ b/benchmarks/src/main/java/org/questdb/PongMain.java @@ -87,7 +87,7 @@ public void close() { LOG.info().$("closed").$(); } - public void receivePing() { + public void receivePing() throws PeerIsSlowToWriteException, PeerIsSlowToReadException, ServerDisconnectException { // expect "PING" int n = Net.recv(getFd(), buf, (int) (bufSize - (buf - bufStart))); if (n > 0) { @@ -97,7 +97,7 @@ public void receivePing() { // accrue protocol artefacts while they still make sense buf += n; // fair resource use - getDispatcher().registerChannel(this, IOOperation.READ); + throw registerDispatcherRead(); } else { // reset buffer this.buf = bufStart; @@ -105,57 +105,65 @@ public void receivePing() { LOG.info().$(flyweight).$(); Utf8s.strCpy(PONG, PONG.size(), bufStart); writtenLen = PONG.size(); - getDispatcher().registerChannel(this, IOOperation.WRITE); + throw registerDispatcherWrite(); } } else { - getDispatcher().disconnect(this, DISCONNECT_REASON_PROTOCOL_VIOLATION); + throw registerDispatcherDisconnect(DISCONNECT_REASON_PROTOCOL_VIOLATION); } } else { // handle peer disconnect - getDispatcher().disconnect(this, DISCONNECT_REASON_PEER_DISCONNECT_AT_RECV); + throw registerDispatcherDisconnect(DISCONNECT_REASON_PEER_DISCONNECT_AT_RECV); } } - public void sendPong() { + public void sendPong() throws PeerIsSlowToReadException, PeerIsSlowToWriteException, ServerDisconnectException { int n = Net.send(getFd(), buf, (int) (writtenLen - (buf - bufStart))); if (n > -1) { if (n > 0) { buf += n; if (buf - bufStart < writtenLen) { - getDispatcher().registerChannel(this, IOOperation.WRITE); + throw registerDispatcherWrite(); } else { flyweight.of(bufStart, bufStart + writtenLen); LOG.info().$(flyweight).$(); buf = bufStart; writtenLen = 0; - getDispatcher().registerChannel(this, IOOperation.READ); + throw registerDispatcherRead(); } } else { - getDispatcher().registerChannel(this, IOOperation.WRITE); + throw registerDispatcherWrite(); } } else { // handle peer disconnect - getDispatcher().disconnect(this, DISCONNECT_REASON_PEER_DISCONNECT_AT_SEND); + throw registerDispatcherDisconnect(DISCONNECT_REASON_PEER_DISCONNECT_AT_SEND); } } } private static class PongRequestProcessor implements IORequestProcessor { @Override - public boolean onRequest(int operation, PongConnectionContext context) { - switch (operation) { - case IOOperation.READ: - context.receivePing(); - break; - case IOOperation.WRITE: - context.sendPong(); - break; - case IOOperation.HEARTBEAT: - context.getDispatcher().registerChannel(context, IOOperation.HEARTBEAT); - return false; - default: - context.getDispatcher().disconnect(context, DISCONNECT_REASON_UNKNOWN_OPERATION); - break; + public boolean onRequest(int operation, PongConnectionContext context, IODispatcher dispatcher) { + try { + switch (operation) { + case IOOperation.READ: + context.receivePing(); + break; + case IOOperation.WRITE: + context.sendPong(); + break; + case IOOperation.HEARTBEAT: + dispatcher.registerChannel(context, IOOperation.HEARTBEAT); + return false; + default: + dispatcher.disconnect(context, DISCONNECT_REASON_UNKNOWN_OPERATION); + break; + } + } catch (PeerIsSlowToWriteException e) { + dispatcher.registerChannel(context, IOOperation.READ); + } catch (PeerIsSlowToReadException e) { + dispatcher.registerChannel(context, IOOperation.WRITE); + } catch (ServerDisconnectException e) { + dispatcher.disconnect(context, context.getDisconnectReason()); } return true; } diff --git a/core/src/main/java/io/questdb/cairo/TableNameRegistryStore.java b/core/src/main/java/io/questdb/cairo/TableNameRegistryStore.java index cb9f62a3e47b..866ea8292fe6 100644 --- a/core/src/main/java/io/questdb/cairo/TableNameRegistryStore.java +++ b/core/src/main/java/io/questdb/cairo/TableNameRegistryStore.java @@ -487,7 +487,7 @@ private void reloadFromTablesFile( tableNameToTableTokenMap.put(tableName, token); if (!Chars.startsWith(token.getDirName(), token.getTableName())) { // This table is renamed, log system to real table name mapping - LOG.info().$("table dir name does not match logical name [table=").utf8(tableName).$(", dirName=").utf8(dirName).I$(); + LOG.debug().$("table dir name does not match logical name [table=").utf8(tableName).$(", dirName=").utf8(dirName).I$(); } dirNameToTableTokenMap.put(token.getDirName(), ReverseTableMapItem.of(token)); } diff --git a/core/src/main/java/io/questdb/cutlass/http/HttpConnectionContext.java b/core/src/main/java/io/questdb/cutlass/http/HttpConnectionContext.java index 2f05bd173817..f30a7520014f 100644 --- a/core/src/main/java/io/questdb/cutlass/http/HttpConnectionContext.java +++ b/core/src/main/java/io/questdb/cutlass/http/HttpConnectionContext.java @@ -65,6 +65,7 @@ public class HttpConnectionContext extends IOContext impl private final MultipartParserState multipartParserState = new MultipartParserState(); private final NetworkFacade nf; private final int recvBufferSize; + private final RejectProcessor rejectProcessor = new RejectProcessor(); private final HttpResponseSink responseSink; private final RetryAttemptAttributes retryAttemptAttributes = new RetryAttemptAttributes(); private final RescheduleContext retryRescheduleContext = retry -> { @@ -181,7 +182,7 @@ public void close() { } @Override - public void fail(HttpRequestProcessorSelector selector, HttpException e) { + public void fail(HttpRequestProcessorSelector selector, HttpException e) throws PeerIsSlowToReadException, ServerDisconnectException, PeerDisconnectedException { LOG.info().$("failed to retry query [fd=").$(getFd()).I$(); HttpRequestProcessor processor = getHttpRequestProcessor(selector); failProcessor(processor, e, DISCONNECT_REASON_RETRY_FAILED); @@ -246,7 +247,9 @@ public long getTotalBytesSent() { return totalBytesSent; } - public boolean handleClientOperation(int operation, HttpRequestProcessorSelector selector, RescheduleContext rescheduleContext) { + public boolean handleClientOperation(int operation, HttpRequestProcessorSelector selector, RescheduleContext rescheduleContext) + throws HeartBeatException, PeerIsSlowToReadException, ServerDisconnectException, PeerIsSlowToWriteException + { boolean keepGoing; switch (operation) { case IOOperation.READ: @@ -256,12 +259,9 @@ public boolean handleClientOperation(int operation, HttpRequestProcessorSelector keepGoing = handleClientSend(); break; case IOOperation.HEARTBEAT: - dispatcher.registerChannel(this, IOOperation.HEARTBEAT); - return false; + throw registerDispatcherHeartBeat(); default: - dispatcher.disconnect(this, DISCONNECT_REASON_UNKNOWN_OPERATION); - keepGoing = false; - break; + throw registerDispatcherDisconnect(DISCONNECT_REASON_UNKNOWN_OPERATION); } boolean useful = keepGoing; @@ -271,7 +271,7 @@ public boolean handleClientOperation(int operation, HttpRequestProcessorSelector keepGoing = handleClientRecv(selector, rescheduleContext); } while (keepGoing); } else { - dispatcher.disconnect(this, DISCONNECT_REASON_KEEPALIVE_OFF); + throw registerDispatcherDisconnect(DISCONNECT_REASON_KEEPALIVE_OFF); } } return useful; @@ -302,12 +302,17 @@ public HttpConnectionContext of(int fd, @NotNull IODispatcher -1 && multipartProcessor) { + } else if (contentLength > -1) { + if (!multipartProcessor) { + processor = rejectRequest(HTTP_NOT_FOUND, "method (POST) not supported"); + } busyRecv = consumeContent(contentLength, socket, processor, headerEnd, read, newRequest); - } else if (multipartProcessor) { - // bad request - regular request for processor that expects multipart - busyRecv = rejectRequest("Bad request. Multipart POST expected."); } else { + if (multipartProcessor) { + // bad request - regular request for processor that expects multipart + rejectRequest(HTTP_BAD_REQUEST, "Bad request. Multipart POST expected."); + processor = rejectProcessor; + } + // Do not expect any more bytes to be sent to us before // we respond back to client. We will disconnect the client when // they abuse protocol. In addition, we will not call processor @@ -861,8 +867,7 @@ private boolean handleClientRecv(HttpRequestProcessorSelector selector, Reschedu dumpBuffer(recvBuffer, read); LOG.info().$("disconnect after request [fd=").$(getFd()).$(", read=").$(read).I$(); int reason = read > 0 ? DISCONNECT_REASON_KICKED_OUT_AT_EXTRA_BYTES : DISCONNECT_REASON_PEER_DISCONNECT_AT_RECV; - dispatcher.disconnect(this, reason); - busyRecv = false; + throw registerDispatcherDisconnect(reason); } else { processor.onHeadersReady(this); LOG.debug().$("good [fd=").$(getFd()).I$(); @@ -876,22 +881,14 @@ private boolean handleClientRecv(HttpRequestProcessorSelector selector, Reschedu scheduleRetry(processor, rescheduleContext); busyRecv = false; } catch (PeerDisconnectedException e) { - dispatcher.disconnect(this, DISCONNECT_REASON_PEER_DISCONNECT_AT_RECV); - processor.onConnectionClosed(this); - busyRecv = false; - } catch (ServerDisconnectException e) { - LOG.info().$("kicked out [fd=").$(getFd()).I$(); - dispatcher.disconnect(this, DISCONNECT_REASON_KICKED_OUT_AT_RECV); - processor.onConnectionClosed(this); - busyRecv = false; + return disconnectHttp(processor, DISCONNECT_REASON_PEER_DISCONNECT_AT_RECV); } catch (PeerIsSlowToReadException e) { LOG.debug().$("peer is slow reader [two]").$(); // it is important to assign resume processor before we fire // event off to dispatcher processor.parkRequest(this, false); resumeProcessor = processor; - dispatcher.registerChannel(this, IOOperation.WRITE); - busyRecv = false; + throw registerDispatcherWrite(); } catch (QueryPausedException e) { LOG.debug().$("partition is in cold storage").$(); // it is important to assign resume processor before we fire @@ -899,22 +896,25 @@ private boolean handleClientRecv(HttpRequestProcessorSelector selector, Reschedu processor.parkRequest(this, true); resumeProcessor = processor; suspendEvent = e.getEvent(); - dispatcher.registerChannel(this, IOOperation.WRITE); - busyRecv = false; + throw registerDispatcherWrite(); } + } catch (ServerDisconnectException | PeerIsSlowToReadException | PeerIsSlowToWriteException e) { + throw e; } catch (HttpException e) { LOG.error().$("http error [fd=").$(getFd()).$(", e=`").$(e.getFlyweightMessage()).$("`]").$(); - dispatcher.disconnect(this, DISCONNECT_REASON_PROTOCOL_VIOLATION); - busyRecv = false; + throw registerDispatcherDisconnect(DISCONNECT_REASON_PROTOCOL_VIOLATION); } catch (Throwable e) { LOG.error().$("internal error [fd=").$(getFd()).$(", e=`").$(e).$("`]").$(); - dispatcher.disconnect(this, DISCONNECT_REASON_SERVER_ERROR); - busyRecv = false; + throw registerDispatcherDisconnect( DISCONNECT_REASON_SERVER_ERROR); } return busyRecv; } - private boolean handleClientSend() { + private boolean isRequestBeingRejected() { + return rejectProcessor.rejectCode != 0; + } + + private boolean handleClientSend() throws PeerIsSlowToReadException, ServerDisconnectException { if (resumeProcessor != null) { try { resumeProcessor.resumeSend(this); @@ -923,17 +923,17 @@ private boolean handleClientSend() { } catch (PeerIsSlowToReadException ignore) { resumeProcessor.parkRequest(this, false); LOG.debug().$("peer is slow reader").$(); - dispatcher.registerChannel(this, IOOperation.WRITE); + throw registerDispatcherWrite(); } catch (QueryPausedException e) { resumeProcessor.parkRequest(this, true); suspendEvent = e.getEvent(); LOG.debug().$("partition is in cold storage").$(); - dispatcher.registerChannel(this, IOOperation.WRITE); + throw registerDispatcherWrite(); } catch (PeerDisconnectedException ignore) { - dispatcher.disconnect(this, DISCONNECT_REASON_PEER_DISCONNECT_AT_SEND); + throw registerDispatcherDisconnect( DISCONNECT_REASON_PEER_DISCONNECT_AT_SEND); } catch (ServerDisconnectException ignore) { LOG.info().$("kicked out [fd=").$(getFd()).I$(); - dispatcher.disconnect(this, DISCONNECT_REASON_KICKED_OUT_AT_SEND); + throw registerDispatcherDisconnect( DISCONNECT_REASON_KICKED_OUT_AT_SEND); } } else { LOG.error().$("spurious write request [fd=").$(getFd()).I$(); @@ -968,26 +968,53 @@ private boolean parseMultipartResult( return false; } - private boolean rejectForbiddenRequest(CharSequence userMessage) throws PeerDisconnectedException, PeerIsSlowToReadException { + private HttpRequestProcessor rejectForbiddenRequest(CharSequence userMessage) throws PeerDisconnectedException, PeerIsSlowToReadException { return rejectRequest(HTTP_FORBIDDEN, userMessage, null, null); } - private boolean rejectRequest(CharSequence userMessage) throws PeerDisconnectedException, PeerIsSlowToReadException { - return rejectRequest(HTTP_NOT_FOUND, userMessage, null, null); - } - - private boolean rejectUnauthenticatedRequest() throws PeerDisconnectedException, PeerIsSlowToReadException { - reset(); - LOG.error().$("rejecting unauthenticated request [fd=").$(getFd()).I$(); - simpleResponse().sendStatusWithHeader(HTTP_UNAUTHORIZED, "WWW-Authenticate: Basic realm=\"questdb\", charset=\"UTF-8\""); - dispatcher.registerChannel(this, IOOperation.READ); - return false; - } - private void shiftReceiveBufferUnprocessedBytes(long start, int receivedBytes) { // Shift to start this.receivedBytes = receivedBytes; Vect.memcpy(recvBuffer, start, receivedBytes); LOG.debug().$("peer is slow, waiting for bigger part to parse [multipart]").$(); } + + private class RejectProcessor implements HttpRequestProcessor, HttpMultipartContentListener { + private int rejectCode; + private CharSequence rejectCookieName = null; + private CharSequence rejectCookieValue = null; + private CharSequence rejectMessage = null; + + public void clear() { + rejectCode = 0; + rejectCookieName = null; + rejectCookieValue = null; + rejectMessage = null; + } + + @Override + public void onChunk(long lo, long hi) { + } + + @Override + public void onPartBegin(HttpRequestHeader partHeader) { + } + + @Override + public void onPartEnd() { + } + + @Override + public void onRequestComplete( + HttpConnectionContext context + ) throws PeerDisconnectedException, PeerIsSlowToReadException { + if (rejectCode == HTTP_UNAUTHORIZED) { + // Special case, include WWW-Authenticate header + context.simpleResponse().sendStatusTextContent(HTTP_UNAUTHORIZED, "WWW-Authenticate: Basic realm=\"questdb\", charset=\"UTF-8\""); + } else { + simpleResponse().sendStatusWithCookie(rejectCode, rejectMessage, rejectCookieName, rejectCookieValue); + } + reset(); + } + } } diff --git a/core/src/main/java/io/questdb/cutlass/http/HttpHeaderParser.java b/core/src/main/java/io/questdb/cutlass/http/HttpHeaderParser.java index 1cb0fd0208c8..ca4ad0068a94 100644 --- a/core/src/main/java/io/questdb/cutlass/http/HttpHeaderParser.java +++ b/core/src/main/java/io/questdb/cutlass/http/HttpHeaderParser.java @@ -104,6 +104,7 @@ public void clear() { this.isStatusCode = true; this.isStatusText = true; this.needProtocol = true; + this.contentLength = -1; // do not clear the pool // this.pool.clear(); } diff --git a/core/src/main/java/io/questdb/cutlass/http/HttpResponseSink.java b/core/src/main/java/io/questdb/cutlass/http/HttpResponseSink.java index 087e77f21ef9..2931c7be7c6e 100644 --- a/core/src/main/java/io/questdb/cutlass/http/HttpResponseSink.java +++ b/core/src/main/java/io/questdb/cutlass/http/HttpResponseSink.java @@ -595,8 +595,6 @@ public String status(CharSequence httpProtocolVersion, int code, CharSequence co } else { putAscii("Content-Length: ").put(contentLength).putEOL(); } - } - if (contentType != null) { putAscii("Content-Type: ").put(contentType).putEOL(); } @@ -747,15 +745,39 @@ private void putAsciiInternal(@Nullable CharSequence cs) { } public class SimpleResponseImpl { - public void sendStatus(int code, CharSequence message) throws PeerDisconnectedException, PeerIsSlowToReadException { - sendStatus(code, message, null); + public void sendStatusNoContent(int code) throws PeerDisconnectedException, PeerIsSlowToReadException { + sendStatusNoContent(code, null); } - public void sendStatus(int code, CharSequence message, CharSequence header) throws PeerDisconnectedException, PeerIsSlowToReadException { - sendStatus(code, message, header, null, null); + public void sendStatusNoContent(int code, @Nullable CharSequence header) throws PeerDisconnectedException, PeerIsSlowToReadException { + buffer.clearAndPrepareToWriteToBuffer(); + headerImpl.status(httpVersion, code, null, -2L); + if (header != null) { + headerImpl.put(header).put(Misc.EOL); + } + prepareHeaderSink(); + flushSingle(); } - public void sendStatus(int code, CharSequence message, CharSequence header, CharSequence cookieName, CharSequence cookieValue) throws PeerDisconnectedException, PeerIsSlowToReadException { + /** + * Sends "text/plain" content type response with customised message and + * optional additional header and cookie. + * + * @param code response code, has to be compatible with "text" response type + * @param message optional message, if not provided, a standard message for the response code will be used + * @param header optional header + * @param cookieName optional cookie name, when name is not null the value must be not-null too + * @param cookieValue optional cookie value + * @throws PeerDisconnectedException exception if HTTP client disconnects during us sending + * @throws PeerIsSlowToReadException exception if HTTP client does not keep up with us sending + */ + public void sendStatusTextContent( + int code, + @Nullable CharSequence message, + @Nullable CharSequence header, + @Nullable CharSequence cookieName, + @Nullable CharSequence cookieValue + ) throws PeerDisconnectedException, PeerIsSlowToReadException { buffer.clearAndPrepareToWriteToBuffer(); final String std = headerImpl.status(httpVersion, code, CONTENT_TYPE_TEXT, -1L); if (header != null) { @@ -772,23 +794,24 @@ public void sendStatus(int code, CharSequence message, CharSequence header, Char resumeSend(); } - public void sendStatus(int code) throws PeerDisconnectedException, PeerIsSlowToReadException { - buffer.clearAndPrepareToWriteToBuffer(); - headerImpl.status(httpVersion, code, CONTENT_TYPE_HTML, -2L); - prepareHeaderSink(); - flushSingle(); + public void sendStatusTextContent( + int code, + CharSequence message, + CharSequence header + ) throws PeerDisconnectedException, PeerIsSlowToReadException { + sendStatusTextContent(code, message, header, null, null); } public void sendStatusWithCookie(int code, CharSequence message, CharSequence cookieName, CharSequence cookieValue) throws PeerDisconnectedException, PeerIsSlowToReadException { - sendStatus(code, message, null, cookieName, cookieValue); + sendStatusTextContent(code, message, null, cookieName, cookieValue); } - public void sendStatusWithDefaultMessage(int code) throws PeerDisconnectedException, PeerIsSlowToReadException { - sendStatus(code, null); + public void sendStatusTextContent(int code) throws PeerDisconnectedException, PeerIsSlowToReadException { + sendStatusTextContent(code, null, null); } - public void sendStatusWithHeader(int code, CharSequence header) throws PeerDisconnectedException, PeerIsSlowToReadException { - sendStatus(code, null, header); + public void sendStatusTextContent(int code, CharSequence header) throws PeerDisconnectedException, PeerIsSlowToReadException { + sendStatusTextContent(code, null, header); } private void setCookie(CharSequence name, CharSequence value) { diff --git a/core/src/main/java/io/questdb/cutlass/http/HttpServer.java b/core/src/main/java/io/questdb/cutlass/http/HttpServer.java index 0d82ff3eb68b..b941f8e1e9c3 100644 --- a/core/src/main/java/io/questdb/cutlass/http/HttpServer.java +++ b/core/src/main/java/io/questdb/cutlass/http/HttpServer.java @@ -69,7 +69,7 @@ public HttpServer( this.httpContextFactory = new HttpContextFactory(configuration, metrics, socketFactory, cookieHandler, headerParserFactory); this.dispatcher = IODispatchers.create(configuration.getDispatcherConfiguration(), httpContextFactory); pool.assign(dispatcher); - this.rescheduleContext = new WaitProcessor(configuration.getWaitProcessorConfiguration()); + this.rescheduleContext = new WaitProcessor(configuration.getWaitProcessorConfiguration(), dispatcher); pool.assign(this.rescheduleContext); for (int i = 0; i < workerCount; i++) { @@ -78,7 +78,7 @@ public HttpServer( pool.assign(i, new Job() { private final HttpRequestProcessorSelector selector = selectors.getQuick(index); private final IORequestProcessor processor = - (operation, context) -> context.handleClientOperation(operation, selector, rescheduleContext); + (operation, context, dispatcher) -> handleClientOperation(context, operation, selector, rescheduleContext, dispatcher); @Override public boolean run(int workerId, @NotNull RunStatus runStatus) { @@ -94,6 +94,21 @@ public boolean run(int workerId, @NotNull RunStatus runStatus) { } } + private boolean handleClientOperation(HttpConnectionContext context, int operation, HttpRequestProcessorSelector selector, WaitProcessor rescheduleContext, IODispatcher dispatcher) { + try { + return context.handleClientOperation(operation, selector, rescheduleContext); + } catch (HeartBeatException e) { + dispatcher.registerChannel(context, IOOperation.HEARTBEAT); + } catch (PeerIsSlowToReadException e) { + dispatcher.registerChannel(context, IOOperation.WRITE); + } catch (ServerDisconnectException e) { + dispatcher.disconnect(context, context.getDisconnectReason()); + } catch (PeerIsSlowToWriteException e) { + dispatcher.registerChannel(context, IOOperation.READ); + } + return false; + } + public static void addDefaultEndpoints( HttpServer server, HttpServerConfiguration configuration, diff --git a/core/src/main/java/io/questdb/cutlass/http/Retry.java b/core/src/main/java/io/questdb/cutlass/http/Retry.java index 9307bb937ad0..55808701c252 100644 --- a/core/src/main/java/io/questdb/cutlass/http/Retry.java +++ b/core/src/main/java/io/questdb/cutlass/http/Retry.java @@ -24,6 +24,11 @@ package io.questdb.cutlass.http; +import io.questdb.network.PeerDisconnectedException; +import io.questdb.network.PeerIsSlowToReadException; +import io.questdb.network.PeerIsSlowToWriteException; +import io.questdb.network.ServerDisconnectException; + import java.io.Closeable; public interface Retry extends Closeable { @@ -33,7 +38,7 @@ public interface Retry extends Closeable { * @param selector processor selector * @param e exception information */ - void fail(HttpRequestProcessorSelector selector, HttpException e); + void fail(HttpRequestProcessorSelector selector, HttpException e) throws PeerIsSlowToReadException, ServerDisconnectException, PeerDisconnectedException; /** * Provides retry information @@ -49,5 +54,5 @@ public interface Retry extends Closeable { * @param rescheduleContext context to be retried * @return success indicator */ - boolean tryRerun(HttpRequestProcessorSelector selector, RescheduleContext rescheduleContext); + boolean tryRerun(HttpRequestProcessorSelector selector, RescheduleContext rescheduleContext) throws PeerIsSlowToReadException, PeerIsSlowToWriteException, ServerDisconnectException; } diff --git a/core/src/main/java/io/questdb/cutlass/http/WaitProcessor.java b/core/src/main/java/io/questdb/cutlass/http/WaitProcessor.java index 4677bb49241e..9bd31e9b9a66 100644 --- a/core/src/main/java/io/questdb/cutlass/http/WaitProcessor.java +++ b/core/src/main/java/io/questdb/cutlass/http/WaitProcessor.java @@ -26,6 +26,7 @@ import io.questdb.cutlass.http.ex.RetryFailedOperationException; import io.questdb.mp.*; +import io.questdb.network.*; import io.questdb.std.Misc; import io.questdb.std.Os; import io.questdb.std.datetime.millitime.MillisecondClock; @@ -43,15 +44,17 @@ public class WaitProcessor extends SynchronizedJob implements RescheduleContext, private final Sequence inSubSequence; private final long maxWaitCapMs; private final PriorityQueue nextRerun; + private final IODispatcher dispatcher; private final Sequence outPubSequence; private final RingQueue outQueue; private final Sequence outSubSequence; - public WaitProcessor(WaitProcessorConfiguration configuration) { + public WaitProcessor(WaitProcessorConfiguration configuration, IODispatcher dispatcher) { clock = configuration.getClock(); maxWaitCapMs = configuration.getMaxWaitCapMs(); exponentialWaitMultiplier = configuration.getExponentialWaitMultiplier(); nextRerun = new PriorityQueue<>(configuration.getInitialWaitQueueSize(), WaitProcessor::compareRetriesInQueue); + this.dispatcher = dispatcher; int retryQueueLength = configuration.getMaxProcessingQueueSize(); inQueue = new RingQueue<>(RetryHolder::new, retryQueueLength); @@ -87,19 +90,37 @@ public boolean runReruns(HttpRequestProcessorSelector selector) { Retry retry = getNextRerun(); if (retry != null) { useful = true; - if (!retry.tryRerun(selector, this)) { - try { - reschedule(retry, retry.getAttemptDetails().attempt + 1, retry.getAttemptDetails().waitStartTimestamp); - } catch (RetryFailedOperationException e) { - retry.fail(selector, e); - } - } + run(selector, retry); } else { return useful; } } } + private void run(HttpRequestProcessorSelector selector, Retry retry) { + try { + if (!retry.tryRerun(selector, this)) { + try { + reschedule(retry, retry.getAttemptDetails().attempt + 1, retry.getAttemptDetails().waitStartTimestamp); + } catch (RetryFailedOperationException e) { + retry.fail(selector, e); + } + } + } catch (PeerDisconnectedException e) { + HttpConnectionContext context = (HttpConnectionContext) retry; + dispatcher.disconnect((HttpConnectionContext) retry, IODispatcher.DISCONNECT_REASON_KICKED_OUT_AT_RECV); + } catch (PeerIsSlowToReadException e) { + HttpConnectionContext context = (HttpConnectionContext) retry; + dispatcher.registerChannel(context, IOOperation.WRITE); + } catch (PeerIsSlowToWriteException e) { + HttpConnectionContext context = (HttpConnectionContext) retry; + dispatcher.registerChannel(context, IOOperation.READ); + } catch (ServerDisconnectException e) { + HttpConnectionContext context = (HttpConnectionContext) retry; + dispatcher.disconnect((HttpConnectionContext) retry, context.getDisconnectReason()); + } + } + @Override public boolean runSerially() { return processInQueue() || sendToOutQueue(); diff --git a/core/src/main/java/io/questdb/cutlass/http/processors/LineHttpPingProcessor.java b/core/src/main/java/io/questdb/cutlass/http/processors/LineHttpPingProcessor.java index 6b460f2980aa..54a16cfb00ae 100644 --- a/core/src/main/java/io/questdb/cutlass/http/processors/LineHttpPingProcessor.java +++ b/core/src/main/java/io/questdb/cutlass/http/processors/LineHttpPingProcessor.java @@ -18,7 +18,7 @@ public LineHttpPingProcessor(CharSequence version) { public void onRequestComplete( HttpConnectionContext context ) throws PeerDisconnectedException, PeerIsSlowToReadException, ServerDisconnectException, QueryPausedException { - context.simpleResponse().sendStatusWithHeader(204, header); + context.simpleResponse().sendStatusNoContent(204, header); } @Override diff --git a/core/src/main/java/io/questdb/cutlass/http/processors/LineHttpProcessor.java b/core/src/main/java/io/questdb/cutlass/http/processors/LineHttpProcessor.java index 897b101184d1..7c5fc8b1468a 100644 --- a/core/src/main/java/io/questdb/cutlass/http/processors/LineHttpProcessor.java +++ b/core/src/main/java/io/questdb/cutlass/http/processors/LineHttpProcessor.java @@ -145,7 +145,7 @@ public void onRequestComplete(HttpConnectionContext context) throws PeerDisconne // Check state again, commit may have failed if (state.isOk()) { state.setSendStatus(SendStatus.HEADER); - context.simpleResponse().sendStatus(204); + context.simpleResponse().sendStatusNoContent(204); } else { state.setSendStatus(SendStatus.HEADER); sendErrorHeader(context); diff --git a/core/src/main/java/io/questdb/cutlass/http/processors/StaticContentProcessor.java b/core/src/main/java/io/questdb/cutlass/http/processors/StaticContentProcessor.java index 72d26cc3732c..7817727882a3 100644 --- a/core/src/main/java/io/questdb/cutlass/http/processors/StaticContentProcessor.java +++ b/core/src/main/java/io/questdb/cutlass/http/processors/StaticContentProcessor.java @@ -78,7 +78,7 @@ public void onRequestComplete(HttpConnectionContext context) throws PeerDisconne logInfoWithFd(context).$("incoming [url=").$(url).$(']').$(); if (Utf8s.containsAscii(url, "..")) { logInfoWithFd(context).$("URL abuse: ").$(url).$(); - sendStatusWithDefaultMessage(context, 404); + sendStatusTextContent(context, 404); } else { PrefixedPath path = prefixedPath.rewind(); @@ -94,7 +94,7 @@ public void onRequestComplete(HttpConnectionContext context) throws PeerDisconne send(context, path, headers.getUrlParam(URL_PARAM_ATTACHMENT) != null); } else { logInfoWithFd(context).$("not found [path=").$(path).$(']').$(); - sendStatusWithDefaultMessage(context, 404); + sendStatusTextContent(context, 404); } } } @@ -130,15 +130,15 @@ public void resumeSend(HttpConnectionContext context) throws PeerDisconnectedExc } } - private static void sendStatusWithDefaultMessage(HttpConnectionContext context, int code) throws PeerDisconnectedException, PeerIsSlowToReadException { - context.simpleResponse().sendStatusWithDefaultMessage(code); + private static void sendStatusTextContent(HttpConnectionContext context, int code) throws PeerDisconnectedException, PeerIsSlowToReadException { + context.simpleResponse().sendStatusTextContent(code); } private void send(HttpConnectionContext context, LPSZ path, boolean asAttachment) throws PeerDisconnectedException, PeerIsSlowToReadException { int n = Utf8s.lastIndexOfAscii(path, '.'); if (n == -1) { logInfoWithFd(context).$("missing extension [file=").$(path).$(']').$(); - sendStatusWithDefaultMessage(context, 404); + sendStatusTextContent(context, 404); return; } @@ -162,12 +162,12 @@ private void send(HttpConnectionContext context, LPSZ path, boolean asAttachment try { long that = Numbers.parseLong(val, 1, l - 1); if (that == ff.getLastModified(path)) { - context.simpleResponse().sendStatus(304); + context.simpleResponse().sendStatusNoContent(304); return; } } catch (NumericException e) { LOG.info().$("bad 'If-None-Match' [value=").$(val).$(']').$(); - sendStatusWithDefaultMessage(context, 400); + sendStatusTextContent(context, 400); return; } } @@ -191,7 +191,7 @@ private void sendRange( state.fd = ff.openRO(path); if (state.fd == -1) { LOG.info().$("Cannot open file: ").$(path).$(); - sendStatusWithDefaultMessage(context, 404); + sendStatusTextContent(context, 404); return; } @@ -201,7 +201,7 @@ private void sendRange( final long lo = rangeParser.getLo(); final long hi = rangeParser.getHi(); if (lo > length || (hi != Long.MAX_VALUE && hi > length) || lo > hi) { - sendStatusWithDefaultMessage(context, 416); + sendStatusTextContent(context, 416); } else { state.bytesSent = lo; state.sendMax = hi == Long.MAX_VALUE ? length : hi; @@ -221,7 +221,7 @@ private void sendRange( resumeSend(context); } } else { - sendStatusWithDefaultMessage(context, 416); + sendStatusTextContent(context, 416); } } @@ -234,7 +234,7 @@ private void sendVanilla( int fd = ff.openRO(path); if (fd == -1) { LOG.info().$("Cannot open file: ").$(path).$('(').$(ff.errno()).$(')').$(); - sendStatusWithDefaultMessage(context, 404); + sendStatusTextContent(context, 404); } else { StaticContentProcessorState h = LV.get(context); if (h == null) { diff --git a/core/src/main/java/io/questdb/cutlass/http/processors/TableStatusCheckProcessor.java b/core/src/main/java/io/questdb/cutlass/http/processors/TableStatusCheckProcessor.java index 4f46aa025e42..778d384cbc62 100644 --- a/core/src/main/java/io/questdb/cutlass/http/processors/TableStatusCheckProcessor.java +++ b/core/src/main/java/io/questdb/cutlass/http/processors/TableStatusCheckProcessor.java @@ -64,7 +64,7 @@ public void close() { public void onRequestComplete(HttpConnectionContext context) throws PeerDisconnectedException, PeerIsSlowToReadException { DirectUtf8Sequence tableName = context.getRequestHeader().getUrlParam(URL_PARAM_STATUS_TABLE_NAME); if (tableName == null) { - context.simpleResponse().sendStatus(200, "table name missing"); + context.simpleResponse().sendStatusTextContent(200, "table name missing", null); } else { int check = TableUtils.TABLE_DOES_NOT_EXIST; utf16Sink.clear(); @@ -81,7 +81,7 @@ public void onRequestComplete(HttpConnectionContext context) throws PeerDisconne response.put('{').putQuoted("status").put(':').putQuoted(toResponse(check)).put('}'); response.sendChunk(true); } else { - context.simpleResponse().sendStatus(200, toResponse(check)); + context.simpleResponse().sendStatusTextContent(200, toResponse(check), null); } } } diff --git a/core/src/main/java/io/questdb/cutlass/line/tcp/LineTcpNetworkIOJob.java b/core/src/main/java/io/questdb/cutlass/line/tcp/LineTcpNetworkIOJob.java index 1f60e74bc83c..5d284d23900d 100644 --- a/core/src/main/java/io/questdb/cutlass/line/tcp/LineTcpNetworkIOJob.java +++ b/core/src/main/java/io/questdb/cutlass/line/tcp/LineTcpNetworkIOJob.java @@ -26,9 +26,7 @@ import io.questdb.log.Log; import io.questdb.log.LogFactory; -import io.questdb.network.IODispatcher; -import io.questdb.network.IOOperation; -import io.questdb.network.IORequestProcessor; +import io.questdb.network.*; import io.questdb.std.*; import io.questdb.std.datetime.millitime.MillisecondClock; import io.questdb.std.str.DirectUtf8Sequence; @@ -76,7 +74,7 @@ public void addTableUpdateDetails(Utf8String tableNameUtf8, TableUpdateDetails t @Override public void close() { if (busyContext != null) { - busyContext.getDispatcher().disconnect(busyContext, DISCONNECT_REASON_RETRY_FAILED); + dispatcher.disconnect(busyContext, DISCONNECT_REASON_RETRY_FAILED); busyContext = null; } Misc.free(unusedSymbolCaches); @@ -107,7 +105,7 @@ public boolean run(int workerId, @NotNull RunStatus runStatus) { assert this.workerId == workerId; boolean busy = false; if (busyContext != null) { - if (handleIO(busyContext)) { + if (handleIO(busyContext, dispatcher)) { // queue is still full return true; } @@ -131,32 +129,32 @@ public boolean run(int workerId, @NotNull RunStatus runStatus) { return busy; } - private boolean handleIO(LineTcpConnectionContext context) { + private boolean handleIO(LineTcpConnectionContext context, IODispatcher dispatcher) { if (!context.invalid()) { switch (context.handleIO(this)) { case NEEDS_READ: - context.getDispatcher().registerChannel(context, IOOperation.READ); + dispatcher.registerChannel(context, IOOperation.READ); return false; case NEEDS_WRITE: - context.getDispatcher().registerChannel(context, IOOperation.WRITE); + dispatcher.registerChannel(context, IOOperation.WRITE); return false; case QUEUE_FULL: return true; case NEEDS_DISCONNECT: - context.getDispatcher().disconnect(context, DISCONNECT_REASON_UNKNOWN_OPERATION); + dispatcher.disconnect(context, DISCONNECT_REASON_UNKNOWN_OPERATION); return false; } } return false; } - private boolean onRequest(int operation, LineTcpConnectionContext context) { + private boolean onRequest(int operation, LineTcpConnectionContext context, IODispatcher dispatcher) { if (operation == IOOperation.HEARTBEAT) { context.doMaintenance(millisecondClock.getTicks()); - context.getDispatcher().registerChannel(context, IOOperation.HEARTBEAT); + dispatcher.registerChannel(context, IOOperation.HEARTBEAT); return false; } - if (handleIO(context)) { + if (handleIO(context, dispatcher)) { busyContext = context; LOG.debug().$("context is waiting on a full queue [fd=").$(context.getFd()).$(']').$(); return false; diff --git a/core/src/main/java/io/questdb/cutlass/pgwire/PGWireServer.java b/core/src/main/java/io/questdb/cutlass/pgwire/PGWireServer.java index a4c28cca9871..5d1000483d55 100644 --- a/core/src/main/java/io/questdb/cutlass/pgwire/PGWireServer.java +++ b/core/src/main/java/io/questdb/cutlass/pgwire/PGWireServer.java @@ -70,36 +70,36 @@ public PGWireServer( for (int i = 0, n = workerPool.getWorkerCount(); i < n; i++) { workerPool.assign(i, new Job() { - private final IORequestProcessor processor = (operation, context) -> { + private final IORequestProcessor processor = (operation, context, dispatcher) -> { try { if (operation == IOOperation.HEARTBEAT) { - context.getDispatcher().registerChannel(context, IOOperation.HEARTBEAT); + dispatcher.registerChannel(context, IOOperation.HEARTBEAT); return false; } context.handleClientOperation(operation); - context.getDispatcher().registerChannel(context, IOOperation.READ); + dispatcher.registerChannel(context, IOOperation.READ); return true; } catch (PeerIsSlowToWriteException e) { - context.getDispatcher().registerChannel(context, IOOperation.READ); + dispatcher.registerChannel(context, IOOperation.READ); } catch (PeerIsSlowToReadException e) { - context.getDispatcher().registerChannel(context, IOOperation.WRITE); + dispatcher.registerChannel(context, IOOperation.WRITE); } catch (QueryPausedException e) { context.setSuspendEvent(e.getEvent()); - context.getDispatcher().registerChannel(context, IOOperation.WRITE); + dispatcher.registerChannel(context, IOOperation.WRITE); } catch (PeerDisconnectedException e) { - context.getDispatcher().disconnect( + dispatcher.disconnect( context, operation == IOOperation.READ ? DISCONNECT_REASON_PEER_DISCONNECT_AT_RECV : DISCONNECT_REASON_PEER_DISCONNECT_AT_SEND ); } catch (BadProtocolException e) { - context.getDispatcher().disconnect(context, DISCONNECT_REASON_PROTOCOL_VIOLATION); + dispatcher.disconnect(context, DISCONNECT_REASON_PROTOCOL_VIOLATION); } catch (Throwable e) { // must remain last in catch list! LOG.critical().$("internal error [ex=").$(e).$(']').$(); // This is a critical error, so we treat it as an unhandled one. metrics.health().incrementUnhandledErrors(); - context.getDispatcher().disconnect(context, DISCONNECT_REASON_SERVER_ERROR); + dispatcher.disconnect(context, DISCONNECT_REASON_SERVER_ERROR); } return false; }; diff --git a/core/src/main/java/io/questdb/network/AbstractIODispatcher.java b/core/src/main/java/io/questdb/network/AbstractIODispatcher.java index c214310cf944..96c191ae6d57 100644 --- a/core/src/main/java/io/questdb/network/AbstractIODispatcher.java +++ b/core/src/main/java/io/questdb/network/AbstractIODispatcher.java @@ -196,7 +196,7 @@ public boolean processIOQueue(IORequestProcessor processor) { C connectionContext = event.context; final int operation = event.operation; ioEventSubSeq.done(cursor); - useful = processor.onRequest(operation, connectionContext); + useful = processor.onRequest(operation, connectionContext, this); } return useful; diff --git a/core/src/main/java/io/questdb/network/HeartBeatException.java b/core/src/main/java/io/questdb/network/HeartBeatException.java new file mode 100644 index 000000000000..cba5b6a76df5 --- /dev/null +++ b/core/src/main/java/io/questdb/network/HeartBeatException.java @@ -0,0 +1,32 @@ +/******************************************************************************* + * ___ _ ____ ____ + * / _ \ _ _ ___ ___| |_| _ \| __ ) + * | | | | | | |/ _ \/ __| __| | | | _ \ + * | |_| | |_| | __/\__ \ |_| |_| | |_) | + * \__\_\\__,_|\___||___/\__|____/|____/ + * + * Copyright (c) 2014-2019 Appsicle + * Copyright (c) 2019-2023 QuestDB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + ******************************************************************************/ + +package io.questdb.network; + +public class HeartBeatException extends Exception { + private HeartBeatException() { + } + + public static final HeartBeatException INSTANCE = new HeartBeatException(); +} diff --git a/core/src/main/java/io/questdb/network/IOContext.java b/core/src/main/java/io/questdb/network/IOContext.java index 71747e4a001b..feebf24b7d18 100644 --- a/core/src/main/java/io/questdb/network/IOContext.java +++ b/core/src/main/java/io/questdb/network/IOContext.java @@ -32,9 +32,11 @@ public abstract class IOContext> implements Mutable, QuietCloseable { protected final Socket socket; - protected IODispatcher dispatcher; + private final LongGauge connectionCountGauge; protected long heartbeatId = -1; - private LongGauge connectionCountGauge; + private int disconnectReason; + // keep dispatcher private to avoid context scheduling itself multiple times + private IODispatcher dispatcher; protected IOContext(SocketFactory socketFactory, NetworkFacade nf, Log log, LongGauge connectionCountGauge) { this.socket = socketFactory.newInstance(nf, log); @@ -55,14 +57,32 @@ public void close() { _clear(); } + public int getDisconnectReason() { + return disconnectReason; + } + + + public PeerIsSlowToReadException registerDispatcherWrite() { + return PeerIsSlowToReadException.INSTANCE; + } + + public HeartBeatException registerDispatcherHeartBeat() { + return HeartBeatException.INSTANCE; + } + + public ServerDisconnectException registerDispatcherDisconnect(int reason) { + disconnectReason = reason; + return ServerDisconnectException.INSTANCE; + } + public long getAndResetHeartbeatId() { long id = heartbeatId; heartbeatId = -1; return id; } - public IODispatcher getDispatcher() { - return dispatcher; + public PeerIsSlowToWriteException registerDispatcherRead() { + return PeerIsSlowToWriteException.INSTANCE; } public int getFd() { @@ -109,6 +129,7 @@ private void _clear() { heartbeatId = -1; socket.close(); dispatcher = null; + disconnectReason = -1; clearSuspendEvent(); } } diff --git a/core/src/main/java/io/questdb/network/IORequestProcessor.java b/core/src/main/java/io/questdb/network/IORequestProcessor.java index 999f375e3c19..6c2638d8ada7 100644 --- a/core/src/main/java/io/questdb/network/IORequestProcessor.java +++ b/core/src/main/java/io/questdb/network/IORequestProcessor.java @@ -25,6 +25,6 @@ package io.questdb.network; @FunctionalInterface -public interface IORequestProcessor> { - boolean onRequest(int operation, C context); +public interface IORequestProcessor> { + boolean onRequest(int operation, C context, IODispatcher dispatcher); } diff --git a/core/src/main/java/io/questdb/network/PeerDisconnectedException.java b/core/src/main/java/io/questdb/network/PeerDisconnectedException.java index 6c7efa8cc8f8..ae35606b30ef 100644 --- a/core/src/main/java/io/questdb/network/PeerDisconnectedException.java +++ b/core/src/main/java/io/questdb/network/PeerDisconnectedException.java @@ -26,4 +26,7 @@ public class PeerDisconnectedException extends Exception { public static final PeerDisconnectedException INSTANCE = new PeerDisconnectedException(); + + private PeerDisconnectedException() { + } } diff --git a/core/src/main/java/io/questdb/network/PeerIsSlowToReadException.java b/core/src/main/java/io/questdb/network/PeerIsSlowToReadException.java index 03be4d283523..47a2ab585ac3 100644 --- a/core/src/main/java/io/questdb/network/PeerIsSlowToReadException.java +++ b/core/src/main/java/io/questdb/network/PeerIsSlowToReadException.java @@ -26,4 +26,7 @@ public class PeerIsSlowToReadException extends Exception { public static final PeerIsSlowToReadException INSTANCE = new PeerIsSlowToReadException(); + + private PeerIsSlowToReadException() { + } } diff --git a/core/src/main/java/io/questdb/network/PeerIsSlowToWriteException.java b/core/src/main/java/io/questdb/network/PeerIsSlowToWriteException.java index c71f0f9a819e..be030f39bafe 100644 --- a/core/src/main/java/io/questdb/network/PeerIsSlowToWriteException.java +++ b/core/src/main/java/io/questdb/network/PeerIsSlowToWriteException.java @@ -26,4 +26,7 @@ public class PeerIsSlowToWriteException extends Exception { public static final PeerIsSlowToWriteException INSTANCE = new PeerIsSlowToWriteException(); + + private PeerIsSlowToWriteException() { + } } diff --git a/core/src/main/java/io/questdb/network/ServerDisconnectException.java b/core/src/main/java/io/questdb/network/ServerDisconnectException.java index 1ae27dd86e32..b046e1a13b6f 100644 --- a/core/src/main/java/io/questdb/network/ServerDisconnectException.java +++ b/core/src/main/java/io/questdb/network/ServerDisconnectException.java @@ -26,4 +26,7 @@ public class ServerDisconnectException extends Exception { public static final ServerDisconnectException INSTANCE = new ServerDisconnectException(); + + private ServerDisconnectException() { + } } diff --git a/core/src/test/java/io/questdb/test/cutlass/IODispatcherHeartbeatTest.java b/core/src/test/java/io/questdb/test/cutlass/IODispatcherHeartbeatTest.java index 5f1189dec8cc..8efffc9f2d2e 100644 --- a/core/src/test/java/io/questdb/test/cutlass/IODispatcherHeartbeatTest.java +++ b/core/src/test/java/io/questdb/test/cutlass/IODispatcherHeartbeatTest.java @@ -438,7 +438,7 @@ public SuspendingTestProcessor(TestClock clock, SuspendEvent suspendEvent) { } @Override - public boolean onRequest(int operation, TestContext context) { + public boolean onRequest(int operation, TestContext context, IODispatcher dispatcher) { context.checkInvariant(operation, clock.getTicks()); if (operation != IOOperation.HEARTBEAT && !alreadySuspended) { context.suspendEvent = suspendEvent; @@ -512,7 +512,6 @@ public void close() { super.close(); } - @Override public IODispatcher getDispatcher() { return dispatcher; } @@ -536,7 +535,7 @@ public TestProcessor(TestClock clock) { } @Override - public boolean onRequest(int operation, TestContext context) { + public boolean onRequest(int operation, TestContext context, IODispatcher dispatcher) { context.checkInvariant(operation, clock.getTicks()); context.getDispatcher().registerChannel(context, operation); return true; diff --git a/core/src/test/java/io/questdb/test/cutlass/http/IODispatcherTest.java b/core/src/test/java/io/questdb/test/cutlass/http/IODispatcherTest.java index 3b9b1e0e686f..f8b3f6b38c3b 100644 --- a/core/src/test/java/io/questdb/test/cutlass/http/IODispatcherTest.java +++ b/core/src/test/java/io/questdb/test/cutlass/http/IODispatcherTest.java @@ -191,10 +191,10 @@ public int getInitialBias() { while (serverRunning.get()) { dispatcher.run(0); dispatcher.processIOQueue( - (operation, context) -> { + (operation, context, dispatcher1) -> { if (operation == IOOperation.WRITE) { Assert.assertEquals(1024, Net.send(context.getFd(), context.buffer, 1024)); - context.getDispatcher().disconnect(context, IODispatcher.DISCONNECT_REASON_TEST); + dispatcher1.disconnect(context, IODispatcher.DISCONNECT_REASON_TEST); } return true; } @@ -411,7 +411,7 @@ public HttpRequestProcessor select(Utf8Sequence url) { while (serverRunning.get()) { dispatcher.run(0); dispatcher.processIOQueue( - (operation, context) -> context.handleClientOperation(operation, selector, EmptyRescheduleContext) + (operation, context, dispatcher1) -> handleClientOperation(context, operation, selector, EmptyRescheduleContext, dispatcher1) ); } } finally { @@ -1452,7 +1452,7 @@ public void testImportBadJson() throws Exception { @Test public void testImportBadRequestGet() throws Exception { testImport( - "HTTP/1.1 404 Not Found\r\n" + + "HTTP/1.1 400 Bad request\r\n" + "Server: questDB/1.0\r\n" + "Date: Thu, 1 Jan 1970 00:00:00 GMT\r\n" + "Transfer-Encoding: chunked\r\n" + @@ -1484,26 +1484,16 @@ public void testImportBadRequestGet() throws Exception { } @Test - public void testImportBadRequestNoBoundary() throws Exception { + public void testImportBadRequestNoBoundaryDisconnects() throws Exception { testImport( - "HTTP/1.1 404 Not Found\r\n" + - "Server: questDB/1.0\r\n" + - "Date: Thu, 1 Jan 1970 00:00:00 GMT\r\n" + - "Transfer-Encoding: chunked\r\n" + - "Content-Type: text/plain; charset=utf-8\r\n" + - "\r\n" + - "34\r\n" + - "Bad request. Form data in multipart POST expected.\r\n" + - "\r\n" + - "00\r\n" + - "\r\n", + "", "POST /upload?overwrite=true HTTP/1.1\r\n" + "Host: localhost:9000\r\n" + "Accept: */*\r\n" + "content-type: multipart/form-data\r\n" + "\r\n", NetworkFacadeImpl.INSTANCE, - false, + true, 1 ); } @@ -5481,7 +5471,7 @@ public HttpRequestProcessor select(Utf8Sequence url) { do { dispatcher.run(0); dispatcher.processIOQueue( - (operation, context) -> context.handleClientOperation(operation, selector, EmptyRescheduleContext) + (operation, context, dispatcher1) -> handleClientOperation(context, operation, selector, EmptyRescheduleContext, dispatcher1) ); } while (serverRunning.get()); } finally { @@ -5676,8 +5666,8 @@ public void testPostRequestToGetProcessor() throws Exception { "Transfer-Encoding: chunked\r\n" + "Content-Type: text/plain; charset=utf-8\r\n" + "\r\n" + - "2a\r\n" + - "Bad request. Non-multipart GET expected.\r\n" + + "27\r\n" + + "method (multipart POST) not supported\r\n" + "\r\n" + "00\r\n" + "\r\n", @@ -5891,8 +5881,6 @@ public HttpRequestProcessor newInstance() { try (Path path = new Path().of(baseDir).concat("questdb-temp.txt").$()) { try { Rnd rnd = new Rnd(); - final int diskBufferLen = 1024 * 1024; - writeRandomFile(path, rnd, 122222212222L); long sockAddr = Net.sockaddr("127.0.0.1", 9001); @@ -5922,14 +5910,7 @@ public HttpRequestProcessor newInstance() { "\r\n"; for (int j = 0; j < 10; j++) { - int fd = Net.socketTcp(true); - TestUtils.assertConnect(fd, sockAddr); - try { - sendRequest(request, fd, buffer); - assertDownloadResponse(fd, rnd, buffer, netBufferLen, diskBufferLen, expectedResponseHeader, 20971670); - } finally { - Net.close(fd); - } + sendAndReceive(request, expectedResponseHeader); } // send few requests to receive 304 @@ -5948,30 +5929,15 @@ public HttpRequestProcessor newInstance() { String expectedResponseHeader2 = "HTTP/1.1 304 Not Modified\r\n" + "Server: questDB/1.0\r\n" + "Date: Thu, 1 Jan 1970 00:00:00 GMT\r\n" + - "Content-Type: text/html; charset=utf-8\r\n" + "\r\n"; for (int i = 0; i < 3; i++) { - int fd = Net.socketTcp(true); - TestUtils.assertConnect(fd, sockAddr); - try { - sendRequest(request2, fd, buffer); - assertDownloadResponse(fd, rnd, buffer, netBufferLen, 0, expectedResponseHeader2, 126); - } finally { - Net.close(fd); - } + sendAndReceive(request2, expectedResponseHeader2); } // couple more full downloads after 304 for (int j = 0; j < 2; j++) { - int fd = Net.socketTcp(true); - TestUtils.assertConnect(fd, sockAddr); - try { - sendRequest(request, fd, buffer); - assertDownloadResponse(fd, rnd, buffer, netBufferLen, diskBufferLen, expectedResponseHeader, 20971670); - } finally { - Net.close(fd); - } + sendAndReceive(request, expectedResponseHeader); } // get a 404 now @@ -5998,10 +5964,9 @@ public HttpRequestProcessor newInstance() { "00\r\n" + "\r\n"; - - sendAndReceive(NetworkFacadeImpl.INSTANCE, request3, expectedResponseHeader3, 4, 0, false); + sendAndReceive(request3, expectedResponseHeader3); // and few more 304s - sendAndReceive(NetworkFacadeImpl.INSTANCE, request2, expectedResponseHeader2, 4, 0, false); + sendAndReceive(request2, expectedResponseHeader2); } finally { Unsafe.free(buffer, netBufferLen, MemoryTag.NATIVE_DEFAULT); } @@ -6099,12 +6064,10 @@ public HttpRequestProcessor newInstance() { String expectedResponseHeader2 = "HTTP/1.1 304 Not Modified\r\n" + "Server: questDB/1.0\r\n" + "Date: Thu, 1 Jan 1970 00:00:00 GMT\r\n" + - "Content-Type: text/html; charset=utf-8\r\n" + "\r\n"; for (int i = 0; i < 3; i++) { - sendRequest(request2, fd, buffer); - assertDownloadResponse(fd, rnd, buffer, netBufferLen, 0, expectedResponseHeader2, 126); + sendAndReceive(request2, expectedResponseHeader2); } // couple more full downloads after 304 @@ -6164,8 +6127,9 @@ public HttpRequestProcessor newInstance() { public void testSCPHttp10() throws Exception { assertMemoryLeak(() -> { final String baseDir = root; + NetworkFacade nf = NetworkFacadeImpl.INSTANCE; final DefaultHttpServerConfiguration httpConfiguration = createHttpServerConfiguration( - NetworkFacadeImpl.INSTANCE, + nf, baseDir, 16 * 1024, false, @@ -6226,24 +6190,7 @@ public HttpRequestProcessor newInstance() { "ETag: \"122222212222\"\r\n" + // this is last modified timestamp on the file, we set this value when we created file "\r\n"; - for (int j = 0; j < 1; j++) { - int fd = Net.socketTcp(true); - TestUtils.assertConnect(fd, sockAddr); - try { - sendRequest(request, fd, buffer); - assertDownloadResponse( - fd, - rnd, - buffer, - netBufferLen, - diskBufferLen, - expectedResponseHeader, - 20971670 - ); - } finally { - Net.close(fd); - } - } + sendAndReceive(nf, request, expectedResponseHeader, 1, 0, false); // send few requests to receive 304 final String request2 = "GET /questdb-temp.txt HTTP/1.1\r\n" + @@ -6261,31 +6208,16 @@ public HttpRequestProcessor newInstance() { String expectedResponseHeader2 = "HTTP/1.0 304 Not Modified\r\n" + "Server: questDB/1.0\r\n" + "Date: Thu, 1 Jan 1970 00:00:00 GMT\r\n" + - "Content-Type: text/html; charset=utf-8\r\n" + "Connection: close\r\n" + "\r\n"; for (int i = 0; i < 3; i++) { - int fd = Net.socketTcp(true); - TestUtils.assertConnect(fd, sockAddr); - try { - sendRequest(request2, fd, buffer); - assertDownloadResponse(fd, rnd, buffer, netBufferLen, 0, expectedResponseHeader2, 126); - } finally { - Net.close(fd); - } + sendAndReceive(nf, request2, expectedResponseHeader2, 1, 0, false); } // couple more full downloads after 304 - for (int j = 0; j < 2; j++) { - int fd = Net.socketTcp(true); - TestUtils.assertConnect(fd, sockAddr); - try { - sendRequest(request, fd, buffer); - assertDownloadResponse(fd, rnd, buffer, netBufferLen, diskBufferLen, expectedResponseHeader, 20971670); - } finally { - Net.close(fd); - } + for (int i = 0; i < 3; i++) { + sendAndReceive(nf, request, expectedResponseHeader, 1, 0, false); } // get a 404 now @@ -6313,29 +6245,11 @@ public HttpRequestProcessor newInstance() { "00\r\n" + "\r\n"; - - for (int i = 0; i < 4; i++) { - int fd = Net.socketTcp(true); - TestUtils.assertConnect(fd, sockAddr); - try { - sendRequest(request3, fd, buffer); - assertDownloadResponse(fd, rnd, buffer, netBufferLen, 0, expectedResponseHeader3, expectedResponseHeader3.length()); - } finally { - Net.close(fd); - } - } + sendAndReceive(nf, request3, expectedResponseHeader3, 1, 0, false); // and few more 304s - for (int i = 0; i < 3; i++) { - int fd = Net.socketTcp(true); - TestUtils.assertConnect(fd, sockAddr); - try { - sendRequest(request2, fd, buffer); - assertDownloadResponse(fd, rnd, buffer, netBufferLen, 0, expectedResponseHeader2, 126); - } finally { - Net.close(fd); - } + sendAndReceive(nf, request2, expectedResponseHeader2, 1, 0, false); } } finally { @@ -6451,7 +6365,7 @@ public void onHeadersReady(HttpConnectionContext context) { while (serverRunning.get()) { dispatcher.run(0); dispatcher.processIOQueue( - (operation, context) -> context.handleClientOperation(operation, selector, EmptyRescheduleContext) + (operation, context, dispatcher1) -> handleClientOperation(context, operation, selector, EmptyRescheduleContext, dispatcher1) ); } } finally { @@ -6603,7 +6517,7 @@ public void onHeadersReady(HttpConnectionContext context) { @Override public void onRequestComplete(HttpConnectionContext context) throws PeerDisconnectedException, PeerIsSlowToReadException { - context.simpleResponse().sendStatusWithDefaultMessage(200); + context.simpleResponse().sendStatusTextContent(200); } }; } @@ -6622,7 +6536,7 @@ public HttpRequestProcessor select(Utf8Sequence url) { while (serverRunning.get()) { dispatcher.run(0); dispatcher.processIOQueue( - (operation, context) -> context.handleClientOperation(operation, selector, EmptyRescheduleContext) + (operation, context, dispatcher1) -> handleClientOperation(context, operation, selector, EmptyRescheduleContext, dispatcher1) ); } } finally { @@ -6777,7 +6691,7 @@ public HttpRequestProcessor select(Utf8Sequence url) { while (serverRunning.get()) { dispatcher.run(0); dispatcher.processIOQueue( - (operation, context) -> context.handleClientOperation(operation, selector, EmptyRescheduleContext) + (operation, context, dispatcher1) -> handleClientOperation(context, operation, selector, EmptyRescheduleContext, dispatcher1) ); } } finally { @@ -7734,7 +7648,7 @@ public HttpRequestProcessor select(Utf8Sequence url) { while (serverRunning.get()) { dispatcher.run(0); dispatcher.processIOQueue( - (operation, context) -> context.handleClientOperation(operation, selector, EmptyRescheduleContext) + (operation, context, dispatcher1) -> handleClientOperation(context, operation, selector, EmptyRescheduleContext, dispatcher1) ); } } finally { @@ -7797,7 +7711,7 @@ public HttpRequestProcessor select(Utf8Sequence url) { serverHaltLatch.await(); } } catch (Throwable e) { - e.printStackTrace(); + LOG.critical().$(e).$(); throw e; } finally { finished.set(true); @@ -7889,7 +7803,7 @@ private static Thread createDelayThread(AtomicBoolean stopDelayThread, AtomicRef event.close(); totalEvents.incrementAndGet(); } catch (Exception e) { - e.printStackTrace(); + LOG.critical().$(e).$(); } } else { Os.pause(); @@ -7930,6 +7844,18 @@ private static void createTestTable(CairoEngine engine) { } } + private static void sendAndReceive(String request, CharSequence response) { + sendAndReceive( + NetworkFacadeImpl.INSTANCE, + request, + response, + 1, + 0, + false, + false + ); + } + private static void sendAndReceive( NetworkFacade nf, String request, @@ -8116,6 +8042,27 @@ private HttpQueryTestBuilder getSimpleTester() { .withTelemetry(false); } + private boolean handleClientOperation( + HttpConnectionContext context, + int operation, + HttpRequestProcessorSelector selector, + RescheduleContext rescheduleContext, + IODispatcher dispatcher + ) { + try { + return context.handleClientOperation(operation, selector, rescheduleContext); + } catch (HeartBeatException e) { + dispatcher.registerChannel(context, IOOperation.HEARTBEAT); + } catch (PeerIsSlowToReadException e) { + dispatcher.registerChannel(context, IOOperation.WRITE); + } catch (ServerDisconnectException e) { + dispatcher.disconnect(context, context.getDisconnectReason()); + } catch (PeerIsSlowToWriteException e) { + dispatcher.registerChannel(context, IOOperation.READ); + } + return false; + } + private void importWithO3MaxLagAndMaxUncommittedRowsTableExists( boolean overwrite, boolean syncCommitMode, @@ -8381,7 +8328,7 @@ private void testExecuteAndCancelSqlCommands(final String url) throws Exception String baseTable = "create table tab (b boolean, ts timestamp, sym symbol)"; String walTable = baseTable + " timestamp(ts) partition by DAY WAL"; - ObjList ddls = new ObjList( + ObjList ddls = new ObjList<>( baseTable, baseTable + " timestamp(ts)", baseTable + " timestamp(ts) partition by DAY BYPASS WAL", @@ -8405,15 +8352,15 @@ private void testExecuteAndCancelSqlCommands(final String url) throws Exception String updateWithJoin2 = "update tab t1 set b=sleep(120000) from tab t2 where t1.b = t2.b"; // add many symbols to slow down operation enough so that other thread can detect it in registry and cancel it - String addColumnsTmp = "alter table tab add column s1 symbol index"; + StringBuilder addColumnsTmp = new StringBuilder("alter table tab add column s1 symbol index"); for (int i = 2; i < 30; i++) { - addColumnsTmp += ", s" + i + " symbol index"; + addColumnsTmp.append(", s").append(i).append(" symbol index"); } - final String addColumns = addColumnsTmp; + final String addColumns = addColumnsTmp.toString(); - final ObjList commands; + final ObjList commands; if ("/query".equals(url)) { - commands = new ObjList( + commands = new ObjList<>( createAsSelect, select1, select2, @@ -8432,7 +8379,7 @@ private void testExecuteAndCancelSqlCommands(final String url) throws Exception addColumns ); } else { - commands = new ObjList( + commands = new ObjList<>( select1, select2, selectWithJoin); @@ -8459,7 +8406,7 @@ private void testExecuteAndCancelSqlCommands(final String url) throws Exception try (SqlExecutionContext executionContext = TestUtils.createSqlExecutionCtx(engine)) { for (int i = 0, n = ddls.size(); i < n; i++) { - final String ddl = (String) ddls.getQuick(i); + final String ddl = ddls.getQuick(i); boolean isWal = ddl.equals(walTable); engine.drop("drop table if exists tab", executionContext, null); @@ -8470,7 +8417,7 @@ private void testExecuteAndCancelSqlCommands(final String url) throws Exception } for (int j = 0, k = commands.size(); j < k; j++) { - final String command = (String) commands.getQuick(j); + final String command = commands.getQuick(j); if (isWal) { try (RecordCursorFactory factory = engine.select("select suspended, writerTxn, sequencerTxn from wal_tables() where name = 'tab'", executionContext)) { @@ -8537,7 +8484,7 @@ private void testExecuteAndCancelSqlCommands(final String url) throws Exception Thread walJob = new Thread(() -> { started.countDown(); - try (ApplyWal2TableJob walApplyJob = new ApplyWal2TableJob(engine, 1, 1);) { + try (ApplyWal2TableJob walApplyJob = new ApplyWal2TableJob(engine, 1, 1)) { while (queryError.get() == null) { walApplyJob.drain(0); new CheckWalTransactionsJob(engine).run(0); @@ -8561,6 +8508,7 @@ private void testExecuteAndCancelSqlCommands(final String url) throws Exception //wait until query appears in registry and get query id while (true) { Os.sleep(1); + //noinspection Convert2Diamond testHttpClient.assertGetRegexp( "/query", ".*dataset.*", @@ -8929,7 +8877,7 @@ public long getQueueTimeout() { @Override public void run() { long smem = Unsafe.malloc(1, MemoryTag.NATIVE_DEFAULT); - IORequestProcessor requestProcessor = (operation, context) -> { + IORequestProcessor requestProcessor = (operation, context, dispatcher) -> { int fd = context.getFd(); int rc; switch (operation) { @@ -9110,9 +9058,8 @@ private static class HelloContext extends IOContext { public HelloContext(int fd, SOCountDownLatch closeLatch, IODispatcher dispatcher) { super(PlainSocketFactory.INSTANCE, NetworkFacadeImpl.INSTANCE, LOG, NullLongGauge.INSTANCE); - socket.of(fd); + this.of(fd, dispatcher); this.closeLatch = closeLatch; - this.dispatcher = dispatcher; } @Override @@ -9154,14 +9101,14 @@ public void run() { try { requester.execute(requests[index][0], requests[index][1]); } catch (Throwable e) { - e.printStackTrace(); + LOG.critical().$(e).$(); System.out.println("erm: " + index + ", ts=" + Timestamps.toString(Os.currentTimeMicros())); throw e; } } }); } catch (Throwable e) { - e.printStackTrace(); + LOG.critical().$(e).$(); errorCounter.incrementAndGet(); } finally { latch.countDown(); diff --git a/core/src/test/java/io/questdb/test/cutlass/http/SendAndReceiveRequestBuilder.java b/core/src/test/java/io/questdb/test/cutlass/http/SendAndReceiveRequestBuilder.java index 089749d205fa..a044a438846a 100644 --- a/core/src/test/java/io/questdb/test/cutlass/http/SendAndReceiveRequestBuilder.java +++ b/core/src/test/java/io/questdb/test/cutlass/http/SendAndReceiveRequestBuilder.java @@ -60,7 +60,7 @@ public class SendAndReceiveRequestBuilder { private int compareLength = -1; private boolean expectReceiveDisconnect; private boolean expectSendDisconnect; - private int maxWaitTimeoutMs = 600_000; + private int maxWaitTimeoutMs = Runtime.getRuntime().availableProcessors() > 5 ? 5000 : 300_000; private NetworkFacade nf = NetworkFacadeImpl.INSTANCE; private long pauseBetweenSendAndReceive; private int port = 9001; diff --git a/core/src/test/java/io/questdb/test/cutlass/http/WaitProcessorTest.java b/core/src/test/java/io/questdb/test/cutlass/http/WaitProcessorTest.java index f167f7be523d..e5d357ae7528 100644 --- a/core/src/test/java/io/questdb/test/cutlass/http/WaitProcessorTest.java +++ b/core/src/test/java/io/questdb/test/cutlass/http/WaitProcessorTest.java @@ -175,7 +175,7 @@ public int getMaxProcessingQueueSize() { public long getMaxWaitCapMs() { return 1000; } - }); + }, null); } @NotNull diff --git a/core/src/test/java/io/questdb/test/cutlass/http/line/LineRawHttpTest.java b/core/src/test/java/io/questdb/test/cutlass/http/line/LineRawHttpFuzzTest.java similarity index 76% rename from core/src/test/java/io/questdb/test/cutlass/http/line/LineRawHttpTest.java rename to core/src/test/java/io/questdb/test/cutlass/http/line/LineRawHttpFuzzTest.java index a25fbdeb7322..323321bab10c 100644 --- a/core/src/test/java/io/questdb/test/cutlass/http/line/LineRawHttpTest.java +++ b/core/src/test/java/io/questdb/test/cutlass/http/line/LineRawHttpFuzzTest.java @@ -47,7 +47,7 @@ import static io.questdb.PropertyKey.*; import static io.questdb.test.cutlass.http.line.IlpHttpUtils.getHttpPort; -public class LineRawHttpTest extends AbstractBootstrapTest { +public class LineRawHttpFuzzTest extends AbstractBootstrapTest { @Before public void setUp() { super.setUp(); @@ -250,6 +250,82 @@ public void testMultipartFileIlpUpload() throws Exception { }); } + @Test + public void testValidRequestAfterInvalidWithKeepAlive() throws Exception { + TestUtils.assertMemoryLeak(() -> { + try (final TestServerMain serverMain = startWithEnvVariables( + )) { + serverMain.start(); + + Rnd rnd = TestUtils.generateRandom(LOG); + + int totalCount = 0; + try (HttpClient httpClient = HttpClientFactory.newInstance(new DefaultHttpClientConfiguration())) { + String line = "line,sym1=123 field1=123i 1234567890000000000\n"; + + for (int r = 0; r < 30; r++) { + if (r % 3 == 0) { + int count = 1 + rnd.nextInt(1000); + HttpClient.Request request = httpClient.newRequest(); + request.POST() + .url("/not_found_chunked ") + .withChunkedContent(); + String hexChunkLen = Integer.toHexString(line.length() * count); + hexChunkLen = hexChunkLen.toUpperCase(); + request.putAscii(hexChunkLen).putEOL(); + + for (int i = 0; i < count; i++) { + request.putAscii(line); + } + request.putEOL().putAscii("0").putEOL().putEOL(); + HttpClient.ResponseHeaders resp = request.send("localhost", getHttpPort(serverMain), 5000); + resp.await(); + } else if (r % 3 == 1) { + // Good request + int count = 1 + rnd.nextInt(100); + HttpClient.Request request = httpClient.newRequest(); + request.POST() + .url("/write ") + .withChunkedContent(); + + String hexChunkLen = Integer.toHexString(line.length() * count); + hexChunkLen = hexChunkLen.toLowerCase(); + request.putAscii(hexChunkLen).putEOL(); + + for (int i = 0; i < count; i++) { + request.putAscii(line); + } + + request.putEOL().putAscii("0").putEOL().putEOL(); + HttpClient.ResponseHeaders resp = request.send("localhost", getHttpPort(serverMain), 5000); + resp.await(); + totalCount += count; + } else { + // Good request + int count = 1 + rnd.nextInt(100); + HttpClient.Request request = httpClient.newRequest(); + request.POST() + .url("/not_found ") + .withContent(); + + for (int i = 0; i < count; i++) { + request.putAscii(line); + } + + request.putEOL().putAscii("0").putEOL().putEOL(); + HttpClient.ResponseHeaders resp = request.send("localhost", getHttpPort(serverMain), 5000); + resp.await(); + } + } + } + + serverMain.waitWalTxnApplied("line"); + serverMain.assertSql("select count() from line", "count\n" + + totalCount + "\n"); + } + }); + } + private RequestBody chunkedBody(final RequestBody body) { return new RequestBody() { @Override diff --git a/core/src/test/java/io/questdb/test/cutlass/line/tcp/BaseLineTcpContextTest.java b/core/src/test/java/io/questdb/test/cutlass/line/tcp/BaseLineTcpContextTest.java index dbc4089b882a..4a6902c2b38e 100644 --- a/core/src/test/java/io/questdb/test/cutlass/line/tcp/BaseLineTcpContextTest.java +++ b/core/src/test/java/io/questdb/test/cutlass/line/tcp/BaseLineTcpContextTest.java @@ -234,16 +234,10 @@ public boolean isSymbolAsFieldSupported() { protected boolean handleContextIO0() { switch (context.handleIO(noNetworkIOJob)) { - case NEEDS_READ: - context.getDispatcher().registerChannel(context, IOOperation.READ); - break; - case NEEDS_WRITE: - context.getDispatcher().registerChannel(context, IOOperation.WRITE); - break; case QUEUE_FULL: return true; case NEEDS_DISCONNECT: - context.getDispatcher().disconnect(context, IODispatcher.DISCONNECT_REASON_PROTOCOL_VIOLATION); + disconnected = true; break; } context.commitWalTables(Long.MAX_VALUE); @@ -317,7 +311,6 @@ protected NetworkIOJob createNetworkIOJob(IODispatcher }; noNetworkIOJob.setScheduler(scheduler); context = new LineTcpConnectionContext(lineTcpConfiguration, scheduler, metrics); - Assert.assertNull(context.getDispatcher()); context.of(FD, new IODispatcher() { @Override public void close() { diff --git a/core/src/test/java/io/questdb/test/griffin/engine/functions/regex/RegexpReplaceStrFunctionFactoryTest.java b/core/src/test/java/io/questdb/test/griffin/engine/functions/regex/RegexpReplaceStrFunctionFactoryTest.java index c0ebdbe7363b..b2e9e1fe8841 100644 --- a/core/src/test/java/io/questdb/test/griffin/engine/functions/regex/RegexpReplaceStrFunctionFactoryTest.java +++ b/core/src/test/java/io/questdb/test/griffin/engine/functions/regex/RegexpReplaceStrFunctionFactoryTest.java @@ -29,6 +29,7 @@ import io.questdb.test.AbstractCairoTest; import io.questdb.test.tools.TestUtils; import org.junit.Assert; +import org.junit.Ignore; import org.junit.Test; public class RegexpReplaceStrFunctionFactoryTest extends AbstractCairoTest { @@ -79,6 +80,7 @@ public void testSimple() throws Exception { } @Test + @Ignore("Flaky") public void testStackOverFlowError() throws Exception { assertFailure( "stack overflow error", diff --git a/core/src/test/java/io/questdb/test/std/str/DirectUtf8SinkTest.java b/core/src/test/java/io/questdb/test/std/str/DirectUtf8SinkTest.java index c49dcd40653e..4722c84b01c9 100644 --- a/core/src/test/java/io/questdb/test/std/str/DirectUtf8SinkTest.java +++ b/core/src/test/java/io/questdb/test/std/str/DirectUtf8SinkTest.java @@ -90,6 +90,7 @@ public void testBorrowNativeByteSink() { @Test public void testCreateEmpty() { + System.gc(); // This test is sensitive to GC deallocating GcUtf8String memory final long mallocCount0 = Unsafe.getMallocCount(); final long reallocCount0 = Unsafe.getReallocCount(); final long freeCount0 = Unsafe.getFreeCount(); @@ -104,28 +105,28 @@ public void testCreateEmpty() { Assert.assertEquals(32, sink.capacity()); final long ptr = sink.ptr(); Assert.assertNotEquals(0, ptr); - Assert.assertEquals(getMallocCount.get().longValue(), 1); - Assert.assertEquals(getReallocCount.get().longValue(), 0); - Assert.assertEquals(getFreeCount.get().longValue(), 0); - Assert.assertEquals(getMemUsed.get().longValue(), 32); + Assert.assertEquals(1, getMallocCount.get().longValue()); + Assert.assertEquals(0, getReallocCount.get().longValue()); + Assert.assertEquals(0, getFreeCount.get().longValue()); + Assert.assertEquals(32, getMemUsed.get().longValue()); sink.put((byte) 'a'); Assert.assertEquals(1, sink.size()); Assert.assertEquals(32, sink.capacity()); Assert.assertEquals(ptr, sink.ptr()); - Assert.assertEquals(getMallocCount.get().longValue(), 1); - Assert.assertEquals(getReallocCount.get().longValue(), 0); - Assert.assertEquals(getFreeCount.get().longValue(), 0); - Assert.assertEquals(getMemUsed.get().longValue(), 32); + Assert.assertEquals(1, getMallocCount.get().longValue()); + Assert.assertEquals(0, getReallocCount.get().longValue()); + Assert.assertEquals(0, getFreeCount.get().longValue()); + Assert.assertEquals(32, getMemUsed.get().longValue()); sink.clear(); Assert.assertEquals(0, sink.size()); Assert.assertEquals(32, sink.capacity()); Assert.assertEquals(ptr, sink.ptr()); - Assert.assertEquals(getMallocCount.get().longValue(), 1); - Assert.assertEquals(getReallocCount.get().longValue(), 0); - Assert.assertEquals(getFreeCount.get().longValue(), 0); - Assert.assertEquals(getMemUsed.get().longValue(), 32); + Assert.assertEquals(1, getMallocCount.get().longValue()); + Assert.assertEquals(0, getReallocCount.get().longValue()); + Assert.assertEquals(0, getFreeCount.get().longValue()); + Assert.assertEquals(32, getMemUsed.get().longValue()); Utf8StringSink onHeapSink = new Utf8StringSink(); onHeapSink.repeat("a", 40); @@ -133,16 +134,16 @@ public void testCreateEmpty() { sink.put(onHeapSink); Assert.assertEquals(40, sink.size()); Assert.assertEquals(64, sink.capacity()); - Assert.assertEquals(getMallocCount.get().longValue(), 1); - Assert.assertEquals(getReallocCount.get().longValue(), 1); - Assert.assertEquals(getFreeCount.get().longValue(), 0); - Assert.assertEquals(getMemUsed.get().longValue(), 64); + Assert.assertEquals(1, getMallocCount.get().longValue()); + Assert.assertEquals(1, getReallocCount.get().longValue()); + Assert.assertEquals(0, getFreeCount.get().longValue()); + Assert.assertEquals(64, getMemUsed.get().longValue()); } - Assert.assertEquals(getMallocCount.get().longValue(), 1); - Assert.assertEquals(getReallocCount.get().longValue(), 1); - Assert.assertEquals(getFreeCount.get().longValue(), 1); - Assert.assertEquals(getMemUsed.get().longValue(), 0); + Assert.assertEquals(1, getMallocCount.get().longValue()); + Assert.assertEquals(1, getReallocCount.get().longValue()); + Assert.assertEquals(1, getFreeCount.get().longValue()); + Assert.assertEquals(0, getMemUsed.get().longValue()); } @Test