diff --git a/src/dummy-http-server/IlpEndpoint.cs b/src/dummy-http-server/IlpEndpoint.cs index fc4e5fe..7218245 100644 --- a/src/dummy-http-server/IlpEndpoint.cs +++ b/src/dummy-http-server/IlpEndpoint.cs @@ -25,6 +25,7 @@ using System.Diagnostics.CodeAnalysis; +using System.IO.Compression; using System.Text; using FastEndpoints; @@ -61,6 +62,15 @@ public async ValueTask BindAsync(BinderContext ctx, CancellationToken c // populate and return a request dto object however you please... var ms = new MemoryStream(); await ctx.HttpContext.Request.Body.CopyToAsync(ms, ct); + var encoding = ctx.HttpContext.Request.Headers.ContentEncoding.FirstOrDefault(); + if (encoding != null && encoding == "gzip") + { + ms.Seek(0, SeekOrigin.Begin); + using var gzipStream = new GZipStream(ms, CompressionMode.Decompress); + var outStream = new MemoryStream(); + await gzipStream.CopyToAsync(outStream, ct); + ms = outStream; + } return new Request { ByteContent = ms.ToArray(), diff --git a/src/net-questdb-client-tests/HttpTests.cs b/src/net-questdb-client-tests/HttpTests.cs index b2e14c2..91ecaca 100644 --- a/src/net-questdb-client-tests/HttpTests.cs +++ b/src/net-questdb-client-tests/HttpTests.cs @@ -47,33 +47,33 @@ public async Task BasicArrayDouble() Sender.New( $"http::addr={Host}:{HttpPort};username=asdasdada;password=asdadad;tls_verify=unsafe_off;auto_flush=off;"); await sender.Table("metrics") - .Symbol("tag", "value") - .Column("number", 10) - .Column("string", "abc") - .Column("array", new[] - { - 1.2, 2.6, - 3.1, - }) - .AtAsync(new DateTime(1970, 01, 01, 0, 0, 1)); + .Symbol("tag", "value") + .Column("number", 10) + .Column("string", "abc") + .Column("array", new[] + { + 1.2, 2.6, + 3.1, + }) + .AtAsync(new DateTime(1970, 01, 01, 0, 0, 1)); await sender.Table("metrics") - .Symbol("tag", "value") - .Column("number", 10) - .Column("string", "abc") - .Column("array", (ReadOnlySpan)new[] - { - 1.5, 2.1, - 3.1, - }) - .AtAsync(new DateTime(1970, 01, 01, 0, 0, 2)); + .Symbol("tag", "value") + .Column("number", 10) + .Column("string", "abc") + .Column("array", (ReadOnlySpan)new[] + { + 1.5, 2.1, + 3.1, + }) + .AtAsync(new DateTime(1970, 01, 01, 0, 0, 2)); await sender.Table("metrics") - .Symbol("tag", "value") - .Column("number", 10) - .Column("string", "abc") - .Column("array", (ReadOnlySpan)Array.Empty()) - .AtAsync(new DateTime(1970, 01, 01, 0, 0, 3)); + .Symbol("tag", "value") + .Column("number", 10) + .Column("string", "abc") + .Column("array", (ReadOnlySpan)Array.Empty()) + .AtAsync(new DateTime(1970, 01, 01, 0, 0, 3)); await sender.SendAsync(); Assert.That( @@ -98,13 +98,13 @@ await server.StartAsync(HttpPort, new[] $"http::addr={Host}:{HttpPort};protocol_version=3;tls_verify=unsafe_off;auto_flush=off;"); await sender.Table("metrics") - .Symbol("tag", "value") - .Column("dec_pos", 123.45m) - .Column("dec_neg", -123.45m) - .Column("dec_null", (decimal?)null) - .Column("dec_max", decimal.MaxValue) - .Column("dec_min", decimal.MinValue) - .AtAsync(new DateTime(1970, 01, 01, 0, 0, 1)); + .Symbol("tag", "value") + .Column("dec_pos", 123.45m) + .Column("dec_neg", -123.45m) + .Column("dec_null", (decimal?)null) + .Column("dec_max", decimal.MaxValue) + .Column("dec_min", decimal.MinValue) + .AtAsync(new DateTime(1970, 01, 01, 0, 0, 1)); await sender.SendAsync(); @@ -147,12 +147,12 @@ public async Task SendLongArrayAsSpan() $"http::addr={Host}:{HttpPort};init_buf_size=256;username=asdasdada;password=asdadad;tls_verify=unsafe_off;auto_flush=off;"); sender.Table("metrics") - .Symbol("tag", "value") - .Column("number", 10) - .Column("string", "abc"); + .Symbol("tag", "value") + .Column("number", 10) + .Column("string", "abc"); - var arrayLen = (1024 - sender.Length) / 8 + 1; - var aray = new double[arrayLen]; + var arrayLen = (1024 - sender.Length) / 8 + 1; + var aray = new double[arrayLen]; var expectedArray = new StringBuilder(); for (var i = 0; i < arrayLen; i++) { @@ -166,7 +166,7 @@ public async Task SendLongArrayAsSpan() } await sender.Column("array", (ReadOnlySpan)aray.AsSpan()) - .AtAsync(new DateTime(1970, 01, 01, 0, 0, 1)); + .AtAsync(new DateTime(1970, 01, 01, 0, 0, 1)); await sender.SendAsync(); Assert.That( @@ -190,9 +190,9 @@ await server.StartAsync(HttpPort, new[] $"http::addr={Host}:{HttpPort};username=asdasdada;password=asdadad;tls_verify=unsafe_off;auto_flush=off;"); sender.Table("metrics") - .Symbol("tag", "value") - .Column("number", 10) - .Column("string", "abc"); + .Symbol("tag", "value") + .Column("number", 10) + .Column("string", "abc"); Assert.That( () => sender.Column("array", new[] @@ -215,9 +215,9 @@ await server.StartAsync(HttpPort, new[] $"http::addr={Host}:{HttpPort};username=asdasdada;password=asdadad;tls_verify=unsafe_off;auto_flush=off;"); sender.Table("metrics") - .Symbol("tag", "value") - .Column("number", 10) - .Column("string", "abc"); + .Symbol("tag", "value") + .Column("number", 10) + .Column("string", "abc"); Assert.That( () => sender.Column("array", new[] @@ -249,15 +249,15 @@ public async Task ArrayNegotiationConnectionIsRetried() await delayedStart; await sender.Table("metrics") - .Symbol("tag", "value") - .Column("number", 10) - .Column("string", "abc") - .Column("array", new[] - { - 1.2, 2.6, - 3.1, - }) - .AtAsync(new DateTime(1970, 01, 01, 0, 0, 1)); + .Symbol("tag", "value") + .Column("number", 10) + .Column("string", "abc") + .Column("array", new[] + { + 1.2, 2.6, + 3.1, + }) + .AtAsync(new DateTime(1970, 01, 01, 0, 0, 1)); await sender.SendAsync(); @@ -279,9 +279,9 @@ public async Task BasicBinaryDouble() Sender.New( $"http::addr={Host}:{HttpPort};username=asdasdada;password=asdadad;tls_verify=unsafe_off;auto_flush=off;"); await sender.Table("metrics") - .Symbol("tag", "value") - .Column("number", 12.2) - .AtAsync(new DateTime(1970, 01, 01, 0, 0, 1)); + .Symbol("tag", "value") + .Column("number", 12.2) + .AtAsync(new DateTime(1970, 01, 01, 0, 0, 1)); await sender.SendAsync(); Assert.That( @@ -299,18 +299,18 @@ public async Task BasicShapedEnumerableDouble() Sender.New( $"http::addr={Host}:{HttpPort};username=asdasdada;password=asdadad;tls_verify=unsafe_off;auto_flush=off;"); await sender.Table("metrics") - .Symbol("tag", "value") - .Column("number", 10) - .Column("string", "abc") - .Column("array", new[] - { - 1.2, 2.6, - 3.1, 4.6, - }.AsEnumerable(), new[] - { - 2, 2, - }.AsEnumerable()) - .AtAsync(new DateTime(1970, 01, 01, 0, 0, 1)); + .Symbol("tag", "value") + .Column("number", 10) + .Column("string", "abc") + .Column("array", new[] + { + 1.2, 2.6, + 3.1, 4.6, + }.AsEnumerable(), new[] + { + 2, 2, + }.AsEnumerable()) + .AtAsync(new DateTime(1970, 01, 01, 0, 0, 1)); await sender.SendAsync(); Assert.That( @@ -327,9 +327,9 @@ public void InvalidShapedEnumerableDouble() $"http::addr={Host}:{HttpPort};username=asdasdada;password=asdadad;tls_verify=unsafe_off;auto_flush=off;protocol_version=2;"); sender.Table("metrics") - .Symbol("tag", "value") - .Column("number", 10) - .Column("string", "abc"); + .Symbol("tag", "value") + .Column("number", 10) + .Column("string", "abc"); Assert.That( () => sender.Column("array", new[] @@ -365,15 +365,15 @@ public async Task BasicFlatArray() Sender.New( $"http::addr={Host}:{HttpPort};username=asdasdada;password=asdadad;tls_verify=unsafe_off;auto_flush=off;"); await sender.Table("metrics") - .Symbol("tag", "value") - .Column("number", 10) - .Column("string", "abc") - .Column("array", new[] - { - 1.2, 2.6, - 3.1, 4.6, - }) - .AtAsync(new DateTime(1970, 01, 01, 0, 0, 1)); + .Symbol("tag", "value") + .Column("number", 10) + .Column("string", "abc") + .Column("array", new[] + { + 1.2, 2.6, + 3.1, 4.6, + }) + .AtAsync(new DateTime(1970, 01, 01, 0, 0, 1)); await sender.SendAsync(); Assert.That( @@ -391,7 +391,9 @@ public async Task BasicMultidimensionalArrayDouble() for (var i = 0; i < 2; i++) for (var j = 0; j < 3; j++) for (var k = 0; k < 3; k++) + { arr[i, j, k] = (i + 1) * (j + 1) * (k + 1); + } using var server = new DummyHttpServer(withBasicAuth: false); await server.StartAsync(HttpPort); @@ -399,11 +401,11 @@ public async Task BasicMultidimensionalArrayDouble() Sender.New( $"http::addr={Host}:{HttpPort};username=asdasdada;password=asdadad;tls_verify=unsafe_off;auto_flush=off;protocol_version=2;"); await sender.Table("metrics") - .Symbol("tag", "value") - .Column("number", 10) - .Column("string", "abc") - .Column("array", arr) - .AtAsync(new DateTime(1970, 01, 01, 0, 0, 1)); + .Symbol("tag", "value") + .Column("number", 10) + .Column("string", "abc") + .Column("array", arr) + .AtAsync(new DateTime(1970, 01, 01, 0, 0, 1)); await sender.SendAsync(); Assert.That( @@ -423,10 +425,10 @@ public async Task AuthBasicFailed() Sender.New( $"https::addr={Host}:{HttpsPort};username=asdasdada;password=asdadad;tls_verify=unsafe_off;auto_flush=off;"); await sender.Table("metrics") - .Symbol("tag", "value") - .Column("number", 10) - .Column("string", "abc") - .AtAsync(new DateTime(1970, 01, 01, 0, 0, 1)); + .Symbol("tag", "value") + .Column("number", 10) + .Column("string", "abc") + .AtAsync(new DateTime(1970, 01, 01, 0, 0, 1)); Assert.That( async () => await sender.SendAsync(), @@ -443,10 +445,10 @@ public async Task AuthBasicSuccess() Sender.New( $"https::addr={Host}:{HttpsPort};username=admin;password=quest;tls_verify=unsafe_off;auto_flush=off;"); await sender.Table("metrics") - .Symbol("tag", "value") - .Column("number", 10) - .Column("string", "abc") - .AtAsync(new DateTime(1970, 01, 01, 0, 0, 1)); + .Symbol("tag", "value") + .Column("number", 10) + .Column("string", "abc") + .AtAsync(new DateTime(1970, 01, 01, 0, 0, 1)); await sender.SendAsync(); } @@ -462,11 +464,13 @@ public async Task AuthTokenFailed() $"https::addr={Host}:{HttpsPort};token=askldaklds;tls_verify=unsafe_off;auto_flush=off;"); for (var i = 0; i < 100; i++) + { await sender - .Table("test") - .Symbol("foo", "bah") - .Column("num", i) - .AtAsync(DateTime.UtcNow); + .Table("test") + .Symbol("foo", "bah") + .Column("num", i) + .AtAsync(DateTime.UtcNow); + } Assert.That( async () => await sender.SendAsync(), @@ -487,11 +491,13 @@ public async Task AuthTokenSuccess() $"https::addr={Host}:{HttpsPort};token={token};tls_verify=unsafe_off;auto_flush=off;"); for (var i = 0; i < 100; i++) + { await sender - .Table("test") - .Symbol("foo", "bah") - .Column("num", i) - .AtAsync(DateTime.UtcNow); + .Table("test") + .Symbol("foo", "bah") + .Column("num", i) + .AtAsync(DateTime.UtcNow); + } await sender.SendAsync(); } @@ -503,10 +509,10 @@ public async Task BasicSend() using var server = new DummyHttpServer(); await server.StartAsync(HttpPort); var sender = Sender.New($"http::addr={Host}:{HttpPort};auto_flush=off;"); - var ts = DateTime.UtcNow; + var ts = DateTime.UtcNow; await sender.Table("name") - .Column("ts", ts) - .AtAsync(ts); + .Column("ts", ts) + .AtAsync(ts); await sender.SendAsync(); Console.WriteLine(server.GetReceiveBuffer().ToString()); await server.StopAsync(); @@ -520,7 +526,7 @@ public void SendBadSymbol() { var sender = Sender.New($"http::addr={Host}:{HttpPort};auto_flush=off;protocol_version=1;"); sender.Table("metric name") - .Symbol("t ,a g", "v alu, e"); + .Symbol("t ,a g", "v alu, e"); }, Throws.TypeOf().With.Message.Contains("Column names") ); @@ -534,7 +540,7 @@ public void SendBadColumn() { var sender = Sender.New($"http::addr={Host}:{HttpPort};auto_flush=off;protocol_version=1;"); sender.Table("metric name") - .Column("t a, g", "v alu e"); + .Column("t a, g", "v alu e"); }, Throws.TypeOf().With.Message.Contains("Column names") ); @@ -547,10 +553,10 @@ public async Task SendLine() await server.StartAsync(HttpPort); var sender = Sender.New($"http::addr={Host}:{HttpPort};auto_flush=off;"); await sender.Table("metrics") - .Symbol("tag", "value") - .Column("number", 10) - .Column("string", "abc") - .AtAsync(new DateTime(1970, 01, 01, 0, 0, 1)); + .Symbol("tag", "value") + .Column("number", 10) + .Column("string", "abc") + .AtAsync(new DateTime(1970, 01, 01, 0, 0, 1)); await sender.SendAsync(); Assert.That( @@ -575,12 +581,12 @@ public async Task SendLineExceedsBuffer() for (var i = 0; i < lineCount; i++) { await sender.Table("table name") - .Symbol("t a g", "v alu, e") - .Column("number", 10) - .Column("db l", 123.12) - .Column("string", " -=\"") - .Column("при вед", "медвед") - .AtAsync(new DateTime(1970, 01, 01, 0, 0, 1)); + .Symbol("t a g", "v alu, e") + .Column("number", 10) + .Column("db l", 123.12) + .Column("string", " -=\"") + .Column("при вед", "медвед") + .AtAsync(new DateTime(1970, 01, 01, 0, 0, 1)); totalExpectedSb.Append(expected); } @@ -599,17 +605,19 @@ public async Task SendLineExceedsBufferLimit() Sender.New($"http::addr={Host}:{HttpPort};init_buf_size=1024;max_buf_size=2048;auto_flush=off;"); Assert.That(async () => - { - for (var i = 0; i < 500; i++) - await sender.Table("table name") - .Symbol("t a g", "v alu, e") - .Column("number", 10) - .Column("db l", 123.12) - .Column("string", " -=\"") - .Column("при вед", "медвед") - .AtAsync(new DateTime(1970, 01, 01, 0, 0, 1)); - }, - Throws.Exception.With.Message.Contains("maximum buffer size")); + { + for (var i = 0; i < 500; i++) + { + await sender.Table("table name") + .Symbol("t a g", "v alu, e") + .Column("number", 10) + .Column("db l", 123.12) + .Column("string", " -=\"") + .Column("при вед", "медвед") + .AtAsync(new DateTime(1970, 01, 01, 0, 0, 1)); + } + }, + Throws.Exception.With.Message.Contains("maximum buffer size")); } [Test] @@ -627,12 +635,12 @@ public async Task SendLineReusesBuffer() for (var i = 0; i < lineCount; i++) { await sender.Table("table name") - .Symbol("t a g", "v alu, e") - .Column("number", 10) - .Column("db l", 123.12) - .Column("string", " -=\"") - .Column("при вед", "медвед") - .AtAsync(new DateTime(1970, 01, 01, 0, 0, 1)); + .Symbol("t a g", "v alu, e") + .Column("number", 10) + .Column("db l", 123.12) + .Column("string", " -=\"") + .Column("при вед", "медвед") + .AtAsync(new DateTime(1970, 01, 01, 0, 0, 1)); totalExpectedSb.Append(expected); } @@ -641,12 +649,12 @@ await sender.Table("table name") for (var i = 0; i < lineCount; i++) { await sender.Table("table name") - .Symbol("t a g", "v alu, e") - .Column("number", 10) - .Column("db l", 123.12) - .Column("string", " -=\"") - .Column("при вед", "медвед") - .AtAsync(new DateTime(1970, 01, 01, 0, 0, 1)); + .Symbol("t a g", "v alu, e") + .Column("number", 10) + .Column("db l", 123.12) + .Column("string", " -=\"") + .Column("при вед", "медвед") + .AtAsync(new DateTime(1970, 01, 01, 0, 0, 1)); totalExpectedSb.Append(expected); } @@ -670,12 +678,12 @@ public async Task SendLineTrimsBuffers() for (var i = 0; i < lineCount; i++) { await sender.Table("table name") - .Symbol("t a g", "v alu, e") - .Column("number", 10) - .Column("db l", 123.12) - .Column("string", " -=\"") - .Column("при вед", "медвед") - .AtAsync(new DateTime(1970, 01, 01, 0, 0, 1)); + .Symbol("t a g", "v alu, e") + .Column("number", 10) + .Column("db l", 123.12) + .Column("string", " -=\"") + .Column("при вед", "медвед") + .AtAsync(new DateTime(1970, 01, 01, 0, 0, 1)); totalExpectedSb.Append(expected); } @@ -685,12 +693,12 @@ await sender.Table("table name") for (var i = 0; i < lineCount; i++) { await sender.Table("table name") - .Symbol("t a g", "v alu, e") - .Column("number", 10) - .Column("db l", 123.12) - .Column("string", " -=\"") - .Column("при вед", "медвед") - .AtAsync(new DateTime(1970, 01, 01, 0, 0, 1)); + .Symbol("t a g", "v alu, e") + .Column("number", 10) + .Column("db l", 123.12) + .Column("string", " -=\"") + .Column("при вед", "медвед") + .AtAsync(new DateTime(1970, 01, 01, 0, 0, 1)); totalExpectedSb.Append(expected); } @@ -715,18 +723,18 @@ public async Task ServerDisconnects() for (var i = 0; i < lineCount; i++) { await sender.Table("table name") - .Symbol("t a g", "v alu, e") - .Column("number", 10) - .Column("db l", 123.12) - .Column("string", " -=\"") - .Column("при вед", "медвед") - .AtAsync(new DateTime(1970, 01, 01, 0, 0, 1)); + .Symbol("t a g", "v alu, e") + .Column("number", 10) + .Column("db l", 123.12) + .Column("string", " -=\"") + .Column("при вед", "медвед") + .AtAsync(new DateTime(1970, 01, 01, 0, 0, 1)); totalExpectedSb.Append(expected); if (i > 1) { Assert.That(async () => await sender.SendAsync(), - Throws.TypeOf()); + Throws.TypeOf()); break; } @@ -747,11 +755,11 @@ public async Task SendNegativeLongAndDouble() using var ls = Sender.New($"http::addr={Host}:{HttpPort};auto_flush=off;"); await ls.Table("neg name") - .Column("number1", long.MinValue + 1) - .Column("number2", long.MaxValue) - .Column("number3", double.MinValue) - .Column("number4", double.MaxValue) - .AtAsync(86400000000000); + .Column("number1", long.MinValue + 1) + .Column("number2", long.MaxValue) + .Column("number3", double.MinValue) + .Column("number4", double.MaxValue) + .AtAsync(86400000000000); await ls.SendAsync(); var expected = @@ -769,15 +777,15 @@ public async Task SerialiseDoublesV2() using var ls = Sender.New($"http::addr={Host}:{HttpPort};auto_flush=off;"); await ls.Table("doubles") - .Column("d0", 0.0) - .Column("dm0", -0.0) - .Column("d1", 1.0) - .Column("dE100", 1E100) - .Column("d0000001", 0.000001) - .Column("dNaN", double.NaN) - .Column("dInf", double.PositiveInfinity) - .Column("dNInf", double.NegativeInfinity) - .AtAsync(86400000000000); + .Column("d0", 0.0) + .Column("dm0", -0.0) + .Column("d1", 1.0) + .Column("dE100", 1E100) + .Column("d0000001", 0.000001) + .Column("dNaN", double.NaN) + .Column("dInf", double.PositiveInfinity) + .Column("dNInf", double.NegativeInfinity) + .AtAsync(86400000000000); await ls.SendAsync(); var expected = @@ -794,15 +802,15 @@ public async Task SerialiseDoublesV1() using var ls = Sender.New($"http::addr={Host}:{HttpPort};auto_flush=off;protocol_version=1;"); await ls.Table("doubles") - .Column("d0", 0.0) - .Column("dm0", -0.0) - .Column("d1", 1.0) - .Column("dE100", 1E100) - .Column("d0000001", 0.000001) - .Column("dNaN", double.NaN) - .Column("dInf", double.PositiveInfinity) - .Column("dNInf", double.NegativeInfinity) - .AtAsync(86400000000000); + .Column("d0", 0.0) + .Column("dm0", -0.0) + .Column("d1", 1.0) + .Column("dE100", 1E100) + .Column("d0000001", 0.000001) + .Column("dNaN", double.NaN) + .Column("dInf", double.PositiveInfinity) + .Column("dNInf", double.NegativeInfinity) + .AtAsync(86400000000000); await ls.SendAsync(); var expected = @@ -819,8 +827,8 @@ public async Task SendTimestampColumn() var ts = new DateTime(2022, 2, 24); await sender.Table("name") - .Column("ts", ts) - .AtAsync(ts); + .Column("ts", ts) + .AtAsync(ts); await sender.SendAsync(); @@ -838,8 +846,8 @@ public async Task SendColumnNanos() const long timestampNanos = 1645660800123456789L; await sender.Table("name") - .ColumnNanos("ts", timestampNanos) - .AtAsync(timestampNanos); + .ColumnNanos("ts", timestampNanos) + .AtAsync(timestampNanos); await sender.SendAsync(); @@ -857,8 +865,8 @@ public async Task SendAtNanos() const long timestampNanos = 1645660800987654321L; await sender.Table("name") - .Column("value", 42) - .AtNanosAsync(timestampNanos); + .Column("value", 42) + .AtNanosAsync(timestampNanos); await sender.SendAsync(); @@ -872,8 +880,8 @@ public async Task InvalidState() { using var srv = new DummyHttpServer(); await srv.StartAsync(HttpPort); - using var sender = Sender.New($"http::addr={Host}:{HttpPort};auto_flush=off;"); - string? nullString = null; + using var sender = Sender.New($"http::addr={Host}:{HttpPort};auto_flush=off;"); + string? nullString = null; Assert.That( () => sender.Table(nullString), @@ -973,8 +981,8 @@ public async Task InvalidTableName() { using var srv = new DummyHttpServer(); await srv.StartAsync(HttpPort); - using var sender = Sender.New($"http::addr={Host}:{HttpPort};auto_flush=off;"); - string? nullString = null; + using var sender = Sender.New($"http::addr={Host}:{HttpPort};auto_flush=off;"); + string? nullString = null; Assert.Throws(() => sender.Table(nullString)); Assert.Throws(() => sender.Column("abc", 123)); @@ -1001,19 +1009,19 @@ public async Task SendMillionAsyncExplicit() $"http::addr={Host}:{HttpPort};init_buf_size={256 * 1024};auto_flush=off;request_timeout=30000;"); var nowMillisecond = DateTime.Now.Millisecond; - var metric = "metric_name" + nowMillisecond; + var metric = "metric_name" + nowMillisecond; Assert.True(await srv.Healthcheck()); for (var i = 0; i < 1E6; i++) { await sender.Table(metric) - .Symbol("nopoint", "tag" + i % 100) - .Column("counter", i * 1111.1) - .Column("int", i) - .Column("привед", "мед вед") - .AtAsync(new DateTime(2021, 1, 1, i / 360 / 1000 % 60, i / 60 / 1000 % 60, i / 1000 % 60, - i % 1000)); + .Symbol("nopoint", "tag" + i % 100) + .Column("counter", i * 1111.1) + .Column("int", i) + .Column("привед", "мед вед") + .AtAsync(new DateTime(2021, 1, 1, i / 360 / 1000 % 60, i / 60 / 1000 % 60, i / 1000 % 60, + i % 1000)); if (i % 100 == 0) { @@ -1031,7 +1039,7 @@ public async Task SendMillionFixedBuffer() await srv.StartAsync(HttpPort); var nowMillisecond = DateTime.Now.Millisecond; - var metric = "metric_name" + nowMillisecond; + var metric = "metric_name" + nowMillisecond; Assert.True(await srv.Healthcheck()); @@ -1040,13 +1048,15 @@ public async Task SendMillionFixedBuffer() $"http::addr={Host}:{HttpPort};init_buf_size={1024 * 1024};auto_flush=on;auto_flush_bytes={1024 * 1024};request_timeout=30000;"); for (var i = 0; i < 1E6; i++) + { await sender.Table(metric) - .Symbol("nopoint", "tag" + i % 100) - .Column("counter", i * 1111.1) - .Column("int", i) - .Column("привед", "мед вед") - .AtAsync(new DateTime(2021, 1, 1, i / 360 / 1000 % 60, i / 60 / 1000 % 60, i / 1000 % 60, - i % 1000)); + .Symbol("nopoint", "tag" + i % 100) + .Column("counter", i * 1111.1) + .Column("int", i) + .Column("привед", "мед вед") + .AtAsync(new DateTime(2021, 1, 1, i / 360 / 1000 % 60, i / 60 / 1000 % 60, i / 1000 % 60, + i % 1000)); + } await sender.SendAsync(); } @@ -1062,8 +1072,8 @@ public async Task SendNegativeLongMin() $"http::addr={Host}:{HttpPort};auto_flush=off;"); Assert.That( () => sender.Table("name") - .Column("number1", long.MinValue) - .AtAsync(DateTime.UtcNow), + .Column("number1", long.MinValue) + .AtAsync(DateTime.UtcNow), Throws.TypeOf().With.Message.Contains("Special case") ); } @@ -1078,8 +1088,8 @@ public async Task SendSpecialStrings() Sender.New( $"http::addr={Host}:{HttpPort};auto_flush=off;"); await sender.Table("neg name") - .Column("привед", " мед\rве\n д") - .AtAsync(86400000000000); + .Column("привед", " мед\rве\n д") + .AtAsync(86400000000000); await sender.SendAsync(); var expected = "neg\\ name привед=\" мед\\\rве\\\n д\" 86400000000000\n"; @@ -1097,9 +1107,9 @@ public async Task SendTagAfterField() $"http::addr={Host}:{HttpPort};auto_flush=off;"); Assert.That( async () => await sender.Table("name") - .Column("number1", 123) - .Symbol("nand", "asdfa") - .AtAsync(DateTime.UtcNow), + .Column("number1", 123) + .Symbol("nand", "asdfa") + .AtAsync(DateTime.UtcNow), Throws.TypeOf() ); } @@ -1117,9 +1127,9 @@ public async Task SendMetricOnce() Assert.That( async () => await sender.Table("name") - .Column("number1", 123) - .Table("nand") - .AtAsync(DateTime.UtcNow), + .Column("number1", 123) + .Table("nand") + .AtAsync(DateTime.UtcNow), Throws.TypeOf() ); } @@ -1273,7 +1283,7 @@ public async Task TransactionMultipleTypes() await sender.Column("foo", 123d).AtAsync(86400000000000); await sender.Column("foo", new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc)).AtAsync(86400000000000); await sender.Column("foo", new DateTimeOffset(new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc))) - .AtAsync(86400000000000); + .AtAsync(86400000000000); await sender.Column("foo", false).AtAsync(86400000000000); await sender.CommitAsync(); @@ -1340,10 +1350,12 @@ public async Task TransactionShouldNotBeAutoFlushed() sender.Transaction("tableName"); for (var i = 0; i < 100; i++) + { await sender - .Symbol("foo", "bah") - .Column("num", i) - .AtAsync(DateTime.UtcNow); + .Symbol("foo", "bah") + .Column("num", i) + .AtAsync(DateTime.UtcNow); + } Assert.That(sender.RowCount == 100); Assert.That(sender.WithinTransaction); @@ -1366,10 +1378,12 @@ public async Task TransactionRequiresCommitToComplete() sender.Transaction("tableName"); for (var i = 0; i < 100; i++) + { await sender - .Symbol("foo", "bah") - .Column("num", i) - .AtAsync(DateTime.UtcNow); + .Symbol("foo", "bah") + .Column("num", i) + .AtAsync(DateTime.UtcNow); + } Assert.That( async () => await sender.SendAsync(), @@ -1385,10 +1399,12 @@ await sender sender.Transaction("tableName"); for (var i = 0; i < 100; i++) + { await sender - .Symbol("foo", "bah") - .Column("num", i) - .AtAsync(DateTime.UtcNow); + .Symbol("foo", "bah") + .Column("num", i) + .AtAsync(DateTime.UtcNow); + } sender.Commit(); } @@ -1554,8 +1570,8 @@ public async Task SendTimestampColumns() using var sender = Sender.New($"http::addr={Host}:{HttpPort};auto_flush=off;"); sender.Table("foo") - .Symbol("bah", "baz") - .Column("ts1", DateTime.UtcNow).Column("ts2", DateTimeOffset.UtcNow); + .Symbol("bah", "baz") + .Column("ts1", DateTime.UtcNow).Column("ts2", DateTimeOffset.UtcNow); await sender.SendAsync(); } @@ -1568,36 +1584,36 @@ public async Task SendVariousAts() using var sender = Sender.New($"http::addr={Host}:{HttpPort};auto_flush=off;"); await sender.Table("foo") - .Symbol("bah", "baz") - .AtAsync(DateTime.UtcNow); + .Symbol("bah", "baz") + .AtAsync(DateTime.UtcNow); await sender.Table("foo") - .Symbol("bah", "baz") - .AtAsync(DateTime.UtcNow); + .Symbol("bah", "baz") + .AtAsync(DateTime.UtcNow); await sender.Table("foo") - .Symbol("bah", "baz") - .AtAsync(DateTimeOffset.UtcNow); + .Symbol("bah", "baz") + .AtAsync(DateTimeOffset.UtcNow); await sender.Table("foo") - .Symbol("bah", "baz") - .AtAsync(DateTime.UtcNow.Ticks / 100); + .Symbol("bah", "baz") + .AtAsync(DateTime.UtcNow.Ticks / 100); sender.Table("foo") - .Symbol("bah", "baz") - .At(DateTime.UtcNow); + .Symbol("bah", "baz") + .At(DateTime.UtcNow); sender.Table("foo") - .Symbol("bah", "baz") - .At(DateTime.UtcNow); + .Symbol("bah", "baz") + .At(DateTime.UtcNow); sender.Table("foo") - .Symbol("bah", "baz") - .At(DateTimeOffset.UtcNow); + .Symbol("bah", "baz") + .At(DateTimeOffset.UtcNow); sender.Table("foo") - .Symbol("bah", "baz") - .At(DateTime.UtcNow.Ticks / 100); + .Symbol("bah", "baz") + .At(DateTime.UtcNow.Ticks / 100); await sender.SendAsync(); } @@ -1653,16 +1669,19 @@ public async Task SendManyRequests() for (var i = 0; i < lineCount; i++) { await sender.Table("table name") - .Symbol("t a g", "v alu, e") - .Column("number", i) - .Column("db l", 123.12) - .Column("string", " -=\"") - .Column("при вед", "медвед") - .AtAsync(DateTime.UtcNow); + .Symbol("t a g", "v alu, e") + .Column("number", i) + .Column("db l", 123.12) + .Column("string", " -=\"") + .Column("при вед", "медвед") + .AtAsync(DateTime.UtcNow); var request = sender.SendAsync(); - while (request.Status == TaskStatus.WaitingToRun) await Task.Delay(10); + while (request.Status == TaskStatus.WaitingToRun) + { + await Task.Delay(10); + } if (i == 0) { @@ -1693,13 +1712,13 @@ public async Task SendWithCert() using var server = new DummyHttpServer(requireClientCert: true); await server.StartAsync(HttpsPort); using var sender = Sender.Configure($"https::addr=localhost:{HttpsPort};tls_verify=unsafe_off;") - .WithClientCert(cert) - .Build(); + .WithClientCert(cert) + .Build(); await sender.Table("metrics") - .Symbol("tag", "value") - .Column("number", 12.2) - .AtAsync(new DateTime(1970, 01, 01, 0, 0, 1)); + .Symbol("tag", "value") + .Column("number", 12.2) + .AtAsync(new DateTime(1970, 01, 01, 0, 0, 1)); await sender.SendAsync(); Assert.That( @@ -1721,4 +1740,76 @@ public async Task FailsWhenExpectingCert() await server.StopAsync(); } + + [Test] + public async Task GzipCompressionEnabled() + { + using var server = new DummyHttpServer(); + await server.StartAsync(HttpPort); + + using var sender = Sender.New($"http::addr={Host}:{HttpPort};auto_flush=off;gzip=true;"); + var ts = DateTime.UtcNow; + await sender.Table("metrics") + .Symbol("tag", "value") + .Column("number", 42) + .AtAsync(ts); + + await sender.SendAsync(); + + // When gzip is enabled, the received data is compressed (binary gzip data) + var receivedBytes = server.GetReceivedBytes(); + Assert.That(receivedBytes.Count, Is.GreaterThan(0), "Should have received data"); + + // Verify the data is gzip compressed (gzip magic number is 0x1f 0x8b) + Assert.That(server.PrintBuffer(), Does.Contain("metrics")); + Assert.That(server.PrintBuffer(), Does.Contain("tag=value")); + Assert.That(server.PrintBuffer(), Does.Contain("number=42")); + + await server.StopAsync(); + } + + [Test] + public async Task GzipCompressionDisabled() + { + using var server = new DummyHttpServer(); + await server.StartAsync(HttpPort); + + using var sender = Sender.New($"http::addr={Host}:{HttpPort};auto_flush=off;gzip=false;"); + var ts = DateTime.UtcNow; + await sender.Table("metrics") + .Symbol("tag", "value") + .Column("number", 42) + .AtAsync(ts); + + await sender.SendAsync(); + + // Verify that data was received uncompressed + Assert.That(server.PrintBuffer(), Does.Contain("metrics")); + Assert.That(server.PrintBuffer(), Does.Contain("tag=value")); + Assert.That(server.PrintBuffer(), Does.Contain("number=42")); + + await server.StopAsync(); + } + + [Test] + public async Task GzipCompressionDefault() + { + using var server = new DummyHttpServer(); + await server.StartAsync(HttpPort); + + // Default should be gzip=false + using var sender = Sender.New($"http::addr={Host}:{HttpPort};auto_flush=off;"); + var ts = DateTime.UtcNow; + await sender.Table("metrics") + .Symbol("tag", "value") + .Column("number", 42) + .AtAsync(ts); + + await sender.SendAsync(); + + // Verify that data was received uncompressed + Assert.That(server.PrintBuffer(), Does.Contain("metrics")); + + await server.StopAsync(); + } } \ No newline at end of file diff --git a/src/net-questdb-client-tests/SenderOptionsTests.cs b/src/net-questdb-client-tests/SenderOptionsTests.cs index cb71839..7db5289 100644 --- a/src/net-questdb-client-tests/SenderOptionsTests.cs +++ b/src/net-questdb-client-tests/SenderOptionsTests.cs @@ -73,7 +73,7 @@ public void DefaultConfig() { Assert.That( new SenderOptions("http::addr=localhost:9000;").ToString() - , Is.EqualTo("http::addr=localhost:9000;auth_timeout=15000;auto_flush=on;auto_flush_bytes=2147483647;auto_flush_interval=1000;auto_flush_rows=75000;init_buf_size=65536;max_buf_size=104857600;max_name_len=127;pool_timeout=120000;protocol_version=Auto;request_min_throughput=102400;request_timeout=10000;retry_timeout=10000;tls_verify=on;")); + , Is.EqualTo("http::addr=localhost:9000;auth_timeout=15000;auto_flush=on;auto_flush_bytes=2147483647;auto_flush_interval=1000;auto_flush_rows=75000;gzip=False;init_buf_size=65536;max_buf_size=104857600;max_name_len=127;pool_timeout=120000;protocol_version=Auto;request_min_throughput=102400;request_timeout=10000;retry_timeout=10000;tls_verify=on;")); } [Test] @@ -105,6 +105,34 @@ public void UseOffInAutoFlushSettings() Assert.That(senderOptions.ToString(), Is.EqualTo( - "http::addr=localhost:9000;auth_timeout=15000;auto_flush=on;auto_flush_bytes=-1;auto_flush_interval=-1;auto_flush_rows=-1;init_buf_size=65536;max_buf_size=104857600;max_name_len=127;pool_timeout=120000;protocol_version=Auto;request_min_throughput=102400;request_timeout=10000;retry_timeout=10000;tls_verify=on;")); + "http::addr=localhost:9000;auth_timeout=15000;auto_flush=on;auto_flush_bytes=-1;auto_flush_interval=-1;auto_flush_rows=-1;gzip=False;init_buf_size=65536;max_buf_size=104857600;max_name_len=127;pool_timeout=120000;protocol_version=Auto;request_min_throughput=102400;request_timeout=10000;retry_timeout=10000;tls_verify=on;")); + } + + [Test] + public void GzipDefaultFalse() + { + var senderOptions = new SenderOptions("http::addr=localhost:9000;"); + Assert.That(senderOptions.gzip, Is.EqualTo(false)); + } + + [Test] + public void GzipTrue() + { + var senderOptions = new SenderOptions("http::addr=localhost:9000;gzip=true;"); + Assert.That(senderOptions.gzip, Is.EqualTo(true)); + } + + [Test] + public void GzipFalse() + { + var senderOptions = new SenderOptions("http::addr=localhost:9000;gzip=false;"); + Assert.That(senderOptions.gzip, Is.EqualTo(false)); + } + + [Test] + public void GzipInToString() + { + var senderOptions = new SenderOptions("http::addr=localhost:9000;gzip=true;"); + Assert.That(senderOptions.ToString(), Does.Contain("gzip=True")); } } \ No newline at end of file diff --git a/src/net-questdb-client/Buffers/BufferStreamContent.cs b/src/net-questdb-client/Buffers/BufferStreamContent.cs index 7dc2bad..cd76100 100644 --- a/src/net-questdb-client/Buffers/BufferStreamContent.cs +++ b/src/net-questdb-client/Buffers/BufferStreamContent.cs @@ -23,6 +23,7 @@ * ******************************************************************************/ +using System.IO.Compression; using System.Net; namespace QuestDB.Buffers; @@ -36,28 +37,54 @@ internal class BufferStreamContent : HttpContent /// Initializes a new instance of the class. /// /// The buffer to wrap for HTTP streaming. - public BufferStreamContent(IBuffer buffer) + /// Whether to gzip compress the content. + public BufferStreamContent(IBuffer buffer, bool gzip = false) { Buffer = buffer; + Gzip = gzip; } private IBuffer Buffer { get; } + private bool Gzip { get; } /// protected override async Task SerializeToStreamAsync(Stream stream, TransportContext? context) { - await Buffer.WriteToStreamAsync(stream); + if (Gzip) + { + using var gzipStream = new GZipStream(stream, CompressionMode.Compress, leaveOpen: true); + await Buffer.WriteToStreamAsync(gzipStream); + } + else + { + await Buffer.WriteToStreamAsync(stream); + } } /// protected override async Task SerializeToStreamAsync(Stream stream, TransportContext? context, CancellationToken ct) { - await Buffer.WriteToStreamAsync(stream, ct); + if (Gzip) + { + using var gzipStream = new GZipStream(stream, CompressionMode.Compress, leaveOpen: true); + await Buffer.WriteToStreamAsync(gzipStream, ct); + } + else + { + await Buffer.WriteToStreamAsync(stream, ct); + } } /// protected override bool TryComputeLength(out long length) { + // Cannot compute length when gzipping since we don't know the compressed size + if (Gzip) + { + length = -1; + return false; + } + length = Buffer.Length; return true; } @@ -67,6 +94,7 @@ protected override async Task CreateContentReadStreamAsync() { var stream = new MemoryStream(); await SerializeToStreamAsync(stream, null, default); + stream.Seek(0, SeekOrigin.Begin); return stream; } @@ -74,6 +102,14 @@ protected override async Task CreateContentReadStreamAsync() protected override void SerializeToStream(Stream stream, TransportContext? context, CancellationToken ct) { ct.ThrowIfCancellationRequested(); - Buffer.WriteToStream(stream, ct); + if (Gzip) + { + using var gzipStream = new GZipStream(stream, CompressionMode.Compress, leaveOpen: true); + Buffer.WriteToStream(gzipStream, ct); + } + else + { + Buffer.WriteToStream(stream, ct); + } } } \ No newline at end of file diff --git a/src/net-questdb-client/Senders/HttpSender.cs b/src/net-questdb-client/Senders/HttpSender.cs index 12ac3ea..5553332 100644 --- a/src/net-questdb-client/Senders/HttpSender.cs +++ b/src/net-questdb-client/Senders/HttpSender.cs @@ -235,13 +235,25 @@ private CancellationTokenSource GenerateRequestCts(CancellationToken ct = defaul /// /// Create an HTTP POST request targeting "/write" with the sender's buffer as the request body. /// - /// An configured with the buffer as the request body, Content-Type set to "text/plain" with charset "utf-8", and Content-Length set to the buffer length. + /// An configured with the buffer as the request body, Content-Type set to "text/plain" with charset "utf-8", and optionally gzip-compressed. private HttpRequestMessage GenerateRequest() { var request = new HttpRequestMessage(HttpMethod.Post, "/write") - { Content = new BufferStreamContent(Buffer), }; + { Content = new BufferStreamContent(Buffer, Options.gzip), }; request.Content.Headers.ContentType = new MediaTypeHeaderValue("text/plain") { CharSet = "utf-8", }; - request.Content.Headers.ContentLength = Buffer.Length; + + // Only set Content-Length if not gzipping (we can compute the length) + if (!Options.gzip) + { + request.Content.Headers.ContentLength = Buffer.Length; + } + + // Add Content-Encoding header if gzipping + if (Options.gzip) + { + request.Content.Headers.ContentEncoding.Add("gzip"); + } + return request; } diff --git a/src/net-questdb-client/Utils/SenderOptions.cs b/src/net-questdb-client/Utils/SenderOptions.cs index 38dcc88..e8c6b87 100644 --- a/src/net-questdb-client/Utils/SenderOptions.cs +++ b/src/net-questdb-client/Utils/SenderOptions.cs @@ -53,7 +53,7 @@ public record SenderOptions "protocol", "protocol_version", "addr", "auto_flush", "auto_flush_rows", "auto_flush_bytes", "auto_flush_interval", "init_buf_size", "max_buf_size", "max_name_len", "username", "password", "token", "request_min_throughput", "auth_timeout", "request_timeout", "retry_timeout", - "pool_timeout", "tls_verify", "tls_roots", "tls_roots_password", "own_socket", + "pool_timeout", "tls_verify", "tls_roots", "tls_roots_password", "own_socket", "gzip", }; private string _addr = "localhost:9000"; @@ -63,6 +63,7 @@ public record SenderOptions private TimeSpan _autoFlushInterval = TimeSpan.FromMilliseconds(1000); private int _autoFlushRows = 75000; private DbConnectionStringBuilder _connectionStringBuilder = null!; + private bool _gzip = false; private int _initBufSize = 65536; private int _maxBufSize = 104857600; private int _maxNameLen = 127; @@ -105,6 +106,7 @@ public SenderOptions(string confStr) ParseIntThatMayBeOff(nameof(auto_flush_rows), IsHttp() ? "75000" : "600", out _autoFlushRows); ParseIntThatMayBeOff(nameof(auto_flush_bytes), int.MaxValue.ToString(), out _autoFlushBytes); ParseMillisecondsThatMayBeOff(nameof(auto_flush_interval), "1000", out _autoFlushInterval); + ParseBoolWithDefault(nameof(gzip), "false", out _gzip); ParseIntWithDefault(nameof(init_buf_size), "65536", out _initBufSize); ParseIntWithDefault(nameof(max_buf_size), "104857600", out _maxBufSize); ParseIntWithDefault(nameof(max_name_len), "127", out _maxNameLen); @@ -215,6 +217,20 @@ public TimeSpan auto_flush_interval public string bind_interface => throw new IngressError(ErrorCode.ConfigError, "Not supported!", new NotImplementedException()); + /// + /// Enables or disables gzip compression for HTTP requests. + /// Defaults to false. + /// + /// + /// This option only applies to HTTP/HTTPS transports (ILP/HTTP). + /// When enabled, the request body will be gzip compressed before being sent. + /// + public bool gzip + { + get => _gzip; + set => _gzip = value; + } + /// /// Initial buffer size for the ILP rows in bytes. /// Defaults to 64 KiB.