diff --git a/compat/src/test/java/io/questdb/compat/InfluxDBClientTest.java b/compat/src/test/java/io/questdb/compat/InfluxDBClientTest.java index 439b13038408..85faa20837d1 100644 --- a/compat/src/test/java/io/questdb/compat/InfluxDBClientTest.java +++ b/compat/src/test/java/io/questdb/compat/InfluxDBClientTest.java @@ -477,8 +477,10 @@ public void testNoErrorLastLineNoLineBreak() throws Exception { @Test public void testPing() { + LOG.info().$("=== send fragmentation=").$(5).$(); try (final ServerMain serverMain = ServerMain.create(root, new HashMap() {{ put(PropertyKey.LINE_HTTP_PING_VERSION.getEnvVarName(), "v2.2.2"); + put(PropertyKey.DEBUG_FORCE_SEND_FRAGMENTATION_CHUNK_SIZE.getEnvVarName(), "5"); }})) { serverMain.start(); try (final InfluxDB influxDB = InfluxDBUtils.getConnection(serverMain)) { diff --git a/core/src/main/java/io/questdb/cutlass/http/HttpConnectionContext.java b/core/src/main/java/io/questdb/cutlass/http/HttpConnectionContext.java index 2dfeb5678f35..31d95f050ccd 100644 --- a/core/src/main/java/io/questdb/cutlass/http/HttpConnectionContext.java +++ b/core/src/main/java/io/questdb/cutlass/http/HttpConnectionContext.java @@ -777,6 +777,42 @@ private HttpRequestProcessor getHttpRequestProcessor(HttpRequestProcessorSelecto return processor; } + private HttpRequestProcessor checkProcessorValidForRequest( + Utf8Sequence method, + HttpRequestProcessor processor, + boolean chunked, + boolean multipartRequest, + long contentLength, + boolean multipartProcessor + ) { + if (Utf8s.equalsAscii("POST", method) || Utf8s.equalsAscii("PUT", method)) { + if (!multipartProcessor) { + if (multipartRequest) { + return rejectRequest(HTTP_NOT_FOUND, "Method (multipart POST) not supported"); + } else { + return rejectRequest(HTTP_NOT_FOUND, "Method not supported"); + } + } + if (chunked && contentLength > 0) { + return rejectRequest(HTTP_BAD_REQUEST, "Invalid chunked request; content-length specified"); + } + if (!chunked && !multipartRequest && contentLength < 0) { + return rejectRequest(HTTP_BAD_REQUEST, "Content-length not specified for POST/PUT request"); + } + } else if (Utf8s.equalsAscii("GET", method)) { + if (chunked || multipartRequest || contentLength > 0) { + return rejectRequest(HTTP_BAD_REQUEST, "GET request method cannot have content"); + } + if (multipartProcessor) { + return rejectRequest(HTTP_NOT_FOUND, "Method GET not supported"); + } + } else { + return rejectRequest(HTTP_BAD_REQUEST, "Method not supported"); + } + + return processor; + } + private boolean handleClientRecv(HttpRequestProcessorSelector selector, RescheduleContext rescheduleContext) throws PeerIsSlowToReadException, PeerIsSlowToWriteException, ServerDisconnectException { boolean busyRecv = true; try { @@ -844,29 +880,22 @@ private boolean handleClientRecv(HttpRequestProcessorSelector selector, Reschedu } } + processor = checkProcessorValidForRequest( + headerParser.getMethod(), + processor, + chunked, + multipartRequest, + contentLength, + multipartProcessor + ); + if (chunked) { - if (!multipartProcessor) { - // bad request - regular request for processor that expects multipart - processor = rejectRequest(HTTP_NOT_FOUND, "method (chunked POST) not supported"); - } busyRecv = consumeChunked(processor, headerEnd, read, newRequest); } else if (multipartRequest) { - if (!multipartProcessor) { - // bad request - multipart request for processor that doesn't expect multipart - processor = rejectRequest(HTTP_NOT_FOUND, "method (multipart POST) not supported"); - } busyRecv = consumeMultipart(socket, processor, headerEnd, read, newRequest, rescheduleContext); - } else if (contentLength > -1) { - if (!multipartProcessor) { - processor = rejectRequest(HTTP_NOT_FOUND, "method (POST) not supported"); - } + } else if (contentLength > 0) { busyRecv = consumeContent(contentLength, socket, processor, headerEnd, read, newRequest); } else { - if (multipartProcessor) { - // bad request - regular request for processor that expects multipart - processor = rejectRequest(HTTP_BAD_REQUEST, "Bad request. Multipart POST expected."); - } - // Do not expect any more bytes to be sent to us before // we respond back to client. We will disconnect the client when // they abuse protocol. In addition, we will not call processor @@ -1019,5 +1048,12 @@ public void onRequestComplete(HttpConnectionContext context) throws PeerDisconne } reset(); } + + @Override + public void resumeSend( + HttpConnectionContext context + ) throws PeerDisconnectedException, PeerIsSlowToReadException, ServerDisconnectException, QueryPausedException { + onRequestComplete(context); + } } } diff --git a/core/src/main/java/io/questdb/cutlass/http/HttpResponseSink.java b/core/src/main/java/io/questdb/cutlass/http/HttpResponseSink.java index 6162fe2ef633..590ae5ff9aee 100644 --- a/core/src/main/java/io/questdb/cutlass/http/HttpResponseSink.java +++ b/core/src/main/java/io/questdb/cutlass/http/HttpResponseSink.java @@ -91,6 +91,7 @@ public void clear() { totalBytesSent = 0; headersSent = false; chunkedRequestDone = false; + simple.clear(); resetZip(); } @@ -748,6 +749,14 @@ private void putAsciiInternal(@Nullable CharSequence cs) { } public class SimpleResponseImpl { + private boolean contentSent = false; + private boolean headerSent = false; + + public void clear() { + contentSent = false; + headerSent = false; + } + public void sendStatusJsonContent( int code ) throws PeerDisconnectedException, PeerIsSlowToReadException { @@ -772,12 +781,15 @@ public void sendStatusJsonContent( } public void sendStatusNoContent(int code, @Nullable CharSequence header) throws PeerDisconnectedException, PeerIsSlowToReadException { - buffer.clearAndPrepareToWriteToBuffer(); - headerImpl.status(httpVersion, code, null, -2L); - if (header != null) { - headerImpl.put(header).put(Misc.EOL); + if (!headerSent) { + buffer.clearAndPrepareToWriteToBuffer(); + headerImpl.status(httpVersion, code, null, -2L); + if (header != null) { + headerImpl.put(header).put(Misc.EOL); + } + prepareHeaderSink(); + headerSent = true; } - prepareHeaderSink(); flushSingle(); } @@ -836,28 +848,35 @@ private void sendStatusWithContent( @Nullable CharSequence cookieValue, long contentLength ) throws PeerDisconnectedException, PeerIsSlowToReadException { - buffer.clearAndPrepareToWriteToBuffer(); - final String std = headerImpl.status(httpVersion, code, contentType, contentLength); - if (header != null) { - headerImpl.put(header).put(Misc.EOL); + if (!headerSent) { + buffer.clearAndPrepareToWriteToBuffer(); + headerImpl.status(httpVersion, code, contentType, contentLength); + if (header != null) { + headerImpl.put(header).put(Misc.EOL); + } + if (cookieName != null) { + setCookie(cookieName, cookieValue); + } + prepareHeaderSink(); + headerSent = true; } - if (cookieName != null) { - setCookie(cookieName, cookieValue); + + if (!contentSent) { + flushSingle(); + buffer.clearAndPrepareToWriteToBuffer(); + if (message == null) { + sink.put(httpStatusMap.get(code)).putEOL(); + } else { + // this is ugly, add a putUtf16() method to the response sink? + final Utf8StringSink utf8Sink = tlSink.get(); + utf8Sink.clear(); + utf8Sink.put(message); + sink.put(utf8Sink).putEOL(); + } + final boolean chunked = headerImpl.isChunked(); + buffer.prepareToReadFromBuffer(chunked, chunked); + contentSent = true; } - prepareHeaderSink(); - flushSingle(); - buffer.clearAndPrepareToWriteToBuffer(); - if (message == null) { - sink.put(std).putEOL(); - } else { - // this is ugly, add a putUtf16() method to the response sink? - final Utf8StringSink utf8Sink = tlSink.get(); - utf8Sink.clear(); - utf8Sink.put(message); - sink.put(utf8Sink).putEOL(); - } - final boolean chunked = headerImpl.isChunked(); - buffer.prepareToReadFromBuffer(chunked, chunked); resumeSend(); } diff --git a/core/src/main/java/io/questdb/cutlass/http/client/HttpClient.java b/core/src/main/java/io/questdb/cutlass/http/client/HttpClient.java index 26f0ec976447..c264bf90fff9 100644 --- a/core/src/main/java/io/questdb/cutlass/http/client/HttpClient.java +++ b/core/src/main/java/io/questdb/cutlass/http/client/HttpClient.java @@ -252,6 +252,12 @@ public class Request implements Utf8Sink { private int state; private boolean urlEncode = false; + public Request DELETE() { + assert state == STATE_REQUEST; + state = STATE_URL; + return put("DELETE "); + } + public Request GET() { assert state == STATE_REQUEST; state = STATE_URL; diff --git a/core/src/main/java/io/questdb/cutlass/http/processors/LineHttpPingProcessor.java b/core/src/main/java/io/questdb/cutlass/http/processors/LineHttpPingProcessor.java index def8d54b2580..7e18f799344f 100644 --- a/core/src/main/java/io/questdb/cutlass/http/processors/LineHttpPingProcessor.java +++ b/core/src/main/java/io/questdb/cutlass/http/processors/LineHttpPingProcessor.java @@ -26,4 +26,9 @@ public void onRequestComplete( ) throws PeerDisconnectedException, PeerIsSlowToReadException, ServerDisconnectException, QueryPausedException { context.simpleResponse().sendStatusNoContent(204, header); } + + @Override + public void resumeSend(HttpConnectionContext context) throws PeerIsSlowToReadException, PeerDisconnectedException { + context.simpleResponse().sendStatusNoContent(204, header); + } } diff --git a/core/src/main/java/io/questdb/cutlass/http/processors/TextImportProcessor.java b/core/src/main/java/io/questdb/cutlass/http/processors/TextImportProcessor.java index 6cc12dd7e680..b3eb55277ad7 100644 --- a/core/src/main/java/io/questdb/cutlass/http/processors/TextImportProcessor.java +++ b/core/src/main/java/io/questdb/cutlass/http/processors/TextImportProcessor.java @@ -223,7 +223,9 @@ public void onPartEnd() throws PeerDisconnectedException, PeerIsSlowToReadExcept @Override public void onRequestComplete(HttpConnectionContext context) { - transientState.clear(); + if (transientState != null) { + transientState.clear(); + } } @Override diff --git a/core/src/test/java/io/questdb/test/cutlass/http/IODispatcherTest.java b/core/src/test/java/io/questdb/test/cutlass/http/IODispatcherTest.java index 76e7cbeec553..d392799c735f 100644 --- a/core/src/test/java/io/questdb/test/cutlass/http/IODispatcherTest.java +++ b/core/src/test/java/io/questdb/test/cutlass/http/IODispatcherTest.java @@ -54,7 +54,6 @@ import io.questdb.test.AbstractTest; import io.questdb.test.CreateTableTestUtils; import io.questdb.test.cairo.DefaultTestCairoConfiguration; -import io.questdb.cairo.CursorPrinter; import io.questdb.test.cairo.TableModel; import io.questdb.test.cairo.TestRecord; import io.questdb.test.cutlass.NetUtils; @@ -1461,14 +1460,14 @@ public void testImportBadJson() throws Exception { @Test public void testImportBadRequestGet() throws Exception { testImport( - "HTTP/1.1 400 Bad request\r\n" + + "HTTP/1.1 404 Not Found\r\n" + "Server: questDB/1.0\r\n" + "Date: Thu, 1 Jan 1970 00:00:00 GMT\r\n" + "Transfer-Encoding: chunked\r\n" + "Content-Type: text/plain; charset=utf-8\r\n" + "\r\n" + - "27\r\n" + - "Bad request. Multipart POST expected.\r\n" + + "1a\r\n" + + "Method GET not supported\r\n" + "\r\n" + "00\r\n" + "\r\n", @@ -5676,7 +5675,7 @@ public void testPostRequestToGetProcessor() throws Exception { "Content-Type: text/plain; charset=utf-8\r\n" + "\r\n" + "27\r\n" + - "method (multipart POST) not supported\r\n" + + "Method (multipart POST) not supported\r\n" + "\r\n" + "00\r\n" + "\r\n", diff --git a/core/src/test/java/io/questdb/test/cutlass/http/line/LineHttpFailureTest.java b/core/src/test/java/io/questdb/test/cutlass/http/line/LineHttpFailureTest.java index 8575f5531744..3f6764be95d5 100644 --- a/core/src/test/java/io/questdb/test/cutlass/http/line/LineHttpFailureTest.java +++ b/core/src/test/java/io/questdb/test/cutlass/http/line/LineHttpFailureTest.java @@ -27,6 +27,7 @@ import io.questdb.Bootstrap; import io.questdb.DefaultBootstrapConfiguration; import io.questdb.DefaultHttpClientConfiguration; +import io.questdb.ServerMain; import io.questdb.cairo.TableToken; import io.questdb.cairo.pool.PoolListener; import io.questdb.cutlass.http.client.HttpClient; @@ -48,6 +49,7 @@ import org.junit.Before; import org.junit.Test; +import java.util.HashMap; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -357,9 +359,10 @@ public void testClientDisconnectsMidRequest() throws Exception { @Test public void testPutAndGetAreNotSupported() throws Exception { TestUtils.assertMemoryLeak(() -> { - try (final TestServerMain serverMain = startWithEnvVariables( - DEBUG_FORCE_SEND_FRAGMENTATION_CHUNK_SIZE.getEnvVarName(), "5" - )) { + try (final ServerMain serverMain = ServerMain.create(root, new HashMap() {{ + put(DEBUG_FORCE_SEND_FRAGMENTATION_CHUNK_SIZE.getEnvVarName(), "5"); + }}) + ) { serverMain.start(); String line = "line,sym1=123 field1=123i 1234567890000000000\n"; @@ -389,7 +392,36 @@ public void testPutAndGetAreNotSupported() throws Exception { .send() ) { resp.await(); - TestUtils.assertEquals("404", resp.getStatusCode()); + TestUtils.assertEquals("400", resp.getStatusCode()); + } + } + + try (HttpClient httpClient = HttpClientFactory.newPlainTextInstance(new DefaultHttpClientConfiguration())) { + HttpClient.Request request = httpClient.newRequest("localhost", serverMain.getHttpServerPort()); + try ( + HttpClient.ResponseHeaders resp = request.DELETE() + .url("/write ") + .withContent() + .putAscii(line) + .putAscii(line) + .send() + ) { + resp.await(); + TestUtils.assertEquals("400", resp.getStatusCode()); + } + } + + try (HttpClient httpClient = HttpClientFactory.newPlainTextInstance(new DefaultHttpClientConfiguration())) { + HttpClient.Request request = httpClient.newRequest("localhost", serverMain.getHttpServerPort()); + try ( + HttpClient.ResponseHeaders resp = request.POST() + .url("/write ") + .putAscii(line) + .putAscii(line) + .send() + ) { + resp.await(); + TestUtils.assertEquals("400", resp.getStatusCode()); } } }