From df4469763e50312166f3c1b9d6832ef82875925b Mon Sep 17 00:00:00 2001 From: Jakub Bednar Date: Fri, 17 Jun 2022 13:05:31 +0200 Subject: [PATCH 01/19] fix: writing data to multiple destination --- Client.Test/WriteApiTest.cs | 29 +++++++++++++++++++++++++++++ Client/WriteApi.cs | 17 ++++++----------- 2 files changed, 35 insertions(+), 11 deletions(-) diff --git a/Client.Test/WriteApiTest.cs b/Client.Test/WriteApiTest.cs index dcae19fed..be40f5f31 100644 --- a/Client.Test/WriteApiTest.cs +++ b/Client.Test/WriteApiTest.cs @@ -502,6 +502,35 @@ public void RequiredOrgBucketWriteApi() "Expecting a non-empty string for 'bucket' parameter. Please specify the bucket as a method parameter or use default configuration at 'InfluxDBClientOptions.Bucket'.", ae.Message); } + + [Test] + public void WritesToDifferentBuckets() + { + var listener = new EventListener(_writeApi); + + MockServer + .Given(Request.Create().WithPath("/api/v2/write").UsingPost()) + .RespondWith(CreateResponse("{}")); + + var entryA = PointData.Measurement("myData") + .Tag("id", 54836.ToString()) + .Field("valueA", 12) + .Timestamp(DateTime.UtcNow.AddSeconds(-10), WritePrecision.Ns); + + _writeApi.WritePoint(entryA, "my-bucket-1", "my-org"); + + var entryB = PointData.Measurement("myData") + .Field("valueB", 42) + .Timestamp(DateTime.UtcNow, WritePrecision.Ns); + + _writeApi.WritePoint(entryB, "my-bucket-2", "my-org"); + + listener.Get(); + listener.Get(); + + Assert.AreEqual("my-bucket-1", MockServer.LogEntries.ToArray()[0].RequestMessage.Query["bucket"].ToString()); + Assert.AreEqual("my-bucket-2", MockServer.LogEntries.ToArray()[1].RequestMessage.Query["bucket"].ToString()); + } } [Measurement("m")] diff --git a/Client/WriteApi.cs b/Client/WriteApi.cs index 8fc14323b..8eb152c02 100644 --- a/Client/WriteApi.cs +++ b/Client/WriteApi.cs @@ -188,7 +188,7 @@ protected internal WriteApi( // // Create Write Point = bucket, org, ... + data // - .Select(grouped => + .SelectMany(grouped => { var aggregate = grouped .Aggregate(StringBuilderPool.Get(), (builder, batchWrite) => @@ -220,20 +220,15 @@ protected internal WriteApi( if (writeOptions.JitterInterval > 0) { + // + // Jitter + // batches = batches - // - // Jitter - // - .Select(source => - { - return source.Delay(_ => - Observable.Timer(TimeSpan.FromMilliseconds(RetryAttempt.JitterDelay(writeOptions)), - writeOptions.WriteScheduler)); - }); + .Delay(_ => Observable.Timer(TimeSpan.FromMilliseconds(RetryAttempt.JitterDelay(writeOptions)), + writeOptions.WriteScheduler)); } var unused = batches - .Concat() // // Map to Async request // From f9e44c18a126897a9a4090c7e88f16fdbc1ef1d6 Mon Sep 17 00:00:00 2001 From: Jakub Bednar Date: Fri, 17 Jun 2022 13:17:20 +0200 Subject: [PATCH 02/19] docs: update CHANGELOG.md --- CHANGELOG.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index f5642d921..f3f89c72b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,9 @@ ### Features 1. [#327](https://github.com/influxdata/influxdb-client-csharp/pull/327): Add interfaces to client's APIs +### Bug Fixes +1. [#329](https://github.com/influxdata/influxdb-client-csharp/pull/329): Writing data to multiple destination by the background writer + ### Dependencies 1. [#326](https://github.com/influxdata/influxdb-client-csharp/pull/326): Update dependencies: From 361c536073fe690045fcb5e4554916dcd696fdbd Mon Sep 17 00:00:00 2001 From: Jakub Bednar Date: Fri, 17 Jun 2022 13:20:41 +0200 Subject: [PATCH 03/19] fix: formatting --- Client.Test/WriteApiTest.cs | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/Client.Test/WriteApiTest.cs b/Client.Test/WriteApiTest.cs index be40f5f31..8939df411 100644 --- a/Client.Test/WriteApiTest.cs +++ b/Client.Test/WriteApiTest.cs @@ -502,12 +502,12 @@ public void RequiredOrgBucketWriteApi() "Expecting a non-empty string for 'bucket' parameter. Please specify the bucket as a method parameter or use default configuration at 'InfluxDBClientOptions.Bucket'.", ae.Message); } - + [Test] public void WritesToDifferentBuckets() { var listener = new EventListener(_writeApi); - + MockServer .Given(Request.Create().WithPath("/api/v2/write").UsingPost()) .RespondWith(CreateResponse("{}")); @@ -528,8 +528,10 @@ public void WritesToDifferentBuckets() listener.Get(); listener.Get(); - Assert.AreEqual("my-bucket-1", MockServer.LogEntries.ToArray()[0].RequestMessage.Query["bucket"].ToString()); - Assert.AreEqual("my-bucket-2", MockServer.LogEntries.ToArray()[1].RequestMessage.Query["bucket"].ToString()); + Assert.AreEqual("my-bucket-1", + MockServer.LogEntries.ToArray()[0].RequestMessage.Query["bucket"].ToString()); + Assert.AreEqual("my-bucket-2", + MockServer.LogEntries.ToArray()[1].RequestMessage.Query["bucket"].ToString()); } } From 4407dcdc14849b4313460b070d3389d572419dd4 Mon Sep 17 00:00:00 2001 From: Jakub Bednar Date: Fri, 17 Jun 2022 14:00:37 +0200 Subject: [PATCH 04/19] fix: jitter --- Client.Test/WriteApiTest.cs | 36 +++++++++++++++++++++++++++++++++ Client/Internal/RetryAttempt.cs | 2 +- Client/WriteApi.cs | 25 ++++++++++++----------- 3 files changed, 50 insertions(+), 13 deletions(-) diff --git a/Client.Test/WriteApiTest.cs b/Client.Test/WriteApiTest.cs index 8939df411..e09e18ac2 100644 --- a/Client.Test/WriteApiTest.cs +++ b/Client.Test/WriteApiTest.cs @@ -528,6 +528,42 @@ public void WritesToDifferentBuckets() listener.Get(); listener.Get(); + Assert.AreEqual(2, MockServer.LogEntries.Count()); + Assert.AreEqual("my-bucket-1", + MockServer.LogEntries.ToArray()[0].RequestMessage.Query["bucket"].ToString()); + Assert.AreEqual("my-bucket-2", + MockServer.LogEntries.ToArray()[1].RequestMessage.Query["bucket"].ToString()); + } + + [Test] + public void WritesToDifferentBucketsJitter() + { + _writeApi.Dispose(); + _writeApi = _influxDbClient.GetWriteApi(WriteOptions.CreateNew().JitterInterval(1_000).Build()); + + var listener = new EventListener(_writeApi); + + MockServer + .Given(Request.Create().WithPath("/api/v2/write").UsingPost()) + .RespondWith(CreateResponse("{}")); + + var entryA = PointData.Measurement("myData") + .Tag("id", 54836.ToString()) + .Field("valueA", 12) + .Timestamp(DateTime.UtcNow.AddSeconds(-10), WritePrecision.Ns); + + _writeApi.WritePoint(entryA, "my-bucket-1", "my-org"); + + var entryB = PointData.Measurement("myData") + .Field("valueB", 42) + .Timestamp(DateTime.UtcNow, WritePrecision.Ns); + + _writeApi.WritePoint(entryB, "my-bucket-2", "my-org"); + + listener.Get(); + listener.Get(); + + Assert.AreEqual(2, MockServer.LogEntries.Count()); Assert.AreEqual("my-bucket-1", MockServer.LogEntries.ToArray()[0].RequestMessage.Query["bucket"].ToString()); Assert.AreEqual("my-bucket-2", diff --git a/Client/Internal/RetryAttempt.cs b/Client/Internal/RetryAttempt.cs index 1cd68a080..d274ee9da 100644 --- a/Client/Internal/RetryAttempt.cs +++ b/Client/Internal/RetryAttempt.cs @@ -114,7 +114,7 @@ internal long GetRetryInterval() // from header if (Error is HttpException httpException && httpException.RetryAfter.HasValue) { - return httpException.RetryAfter.Value * 1000 + JitterDelay(_writeOptions); + return httpException.RetryAfter.Value * 1000; } // from configuration diff --git a/Client/WriteApi.cs b/Client/WriteApi.cs index 8eb152c02..10bd237be 100644 --- a/Client/WriteApi.cs +++ b/Client/WriteApi.cs @@ -218,16 +218,6 @@ protected internal WriteApi( .Where(batchWriteItem => !string.IsNullOrEmpty(batchWriteItem.ToLineProtocol())); }); - if (writeOptions.JitterInterval > 0) - { - // - // Jitter - // - batches = batches - .Delay(_ => Observable.Timer(TimeSpan.FromMilliseconds(RetryAttempt.JitterDelay(writeOptions)), - writeOptions.WriteScheduler)); - } - var unused = batches // // Map to Async request @@ -241,10 +231,21 @@ protected internal WriteApi( return Observable .Defer(() => - service.PostWriteAsyncWithIRestResponse(org, bucket, + { + var observable = service.PostWriteAsyncWithIRestResponse(org, bucket, Encoding.UTF8.GetBytes(lineProtocol), null, "identity", "text/plain; charset=utf-8", null, "application/json", null, precision) - .ToObservable()) + .ToObservable(); + + if (writeOptions.JitterInterval > 0) + { + observable = observable + .Delay(_ => Observable.Timer(TimeSpan.FromMilliseconds(RetryAttempt.JitterDelay(writeOptions)), + writeOptions.WriteScheduler)); + } + + return observable; + }) .RetryWhen(f => f .Zip(Observable.Range(1, writeOptions.MaxRetries + 1), (exception, count) => new RetryAttempt(exception, count, writeOptions)) From 191418deb679b11ca4bfeae9292d868c58773db9 Mon Sep 17 00:00:00 2001 From: Jakub Bednar Date: Fri, 17 Jun 2022 14:05:24 +0200 Subject: [PATCH 05/19] fix: code style --- Client.Test/WriteApiTest.cs | 4 ++-- Client/WriteApi.cs | 3 ++- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/Client.Test/WriteApiTest.cs b/Client.Test/WriteApiTest.cs index e09e18ac2..cd9014943 100644 --- a/Client.Test/WriteApiTest.cs +++ b/Client.Test/WriteApiTest.cs @@ -534,13 +534,13 @@ public void WritesToDifferentBuckets() Assert.AreEqual("my-bucket-2", MockServer.LogEntries.ToArray()[1].RequestMessage.Query["bucket"].ToString()); } - + [Test] public void WritesToDifferentBucketsJitter() { _writeApi.Dispose(); _writeApi = _influxDbClient.GetWriteApi(WriteOptions.CreateNew().JitterInterval(1_000).Build()); - + var listener = new EventListener(_writeApi); MockServer diff --git a/Client/WriteApi.cs b/Client/WriteApi.cs index 10bd237be..7f1c2869f 100644 --- a/Client/WriteApi.cs +++ b/Client/WriteApi.cs @@ -240,7 +240,8 @@ protected internal WriteApi( if (writeOptions.JitterInterval > 0) { observable = observable - .Delay(_ => Observable.Timer(TimeSpan.FromMilliseconds(RetryAttempt.JitterDelay(writeOptions)), + .Delay(_ => Observable.Timer( + TimeSpan.FromMilliseconds(RetryAttempt.JitterDelay(writeOptions)), writeOptions.WriteScheduler)); } From bf1732d50191e0bb9a2819ea14acf7e2123631f8 Mon Sep 17 00:00:00 2001 From: Jakub Bednar Date: Mon, 20 Jun 2022 10:22:52 +0200 Subject: [PATCH 06/19] chore: add test to buffer consistency --- Client.Test/ItWriteApiRaceTest.cs | 147 ++++++++++++++++++++++++------ 1 file changed, 121 insertions(+), 26 deletions(-) diff --git a/Client.Test/ItWriteApiRaceTest.cs b/Client.Test/ItWriteApiRaceTest.cs index 3aa2babad..9d8188b2d 100644 --- a/Client.Test/ItWriteApiRaceTest.cs +++ b/Client.Test/ItWriteApiRaceTest.cs @@ -1,4 +1,6 @@ +using System; using System.Collections.Generic; +using System.Diagnostics; using System.Threading; using System.Threading.Tasks; using InfluxDB.Client.Api.Domain; @@ -10,47 +12,52 @@ namespace InfluxDB.Client.Test [TestFixture] public class ItWriteApiRaceTest : AbstractItClientTest { - [SetUp] - public new async Task SetUp() + private async Task> CreateBuckets(int count = 1) { - _organization = await FindMyOrg(); + var organization = await FindMyOrg(); - var retention = new BucketRetentionRules(BucketRetentionRules.TypeEnum.Expire, 3600); + var loggedUser = await Client.GetUsersApi().MeAsync(); + Assert.IsNotNull(loggedUser); - _bucket = await Client.GetBucketsApi() - .CreateBucketAsync(GenerateName("h2o"), retention, _organization); + var buckets = new List(); + var permissions = new List(); - // - // Add Permissions to read and write to the Bucket - // - var resource = - new PermissionResource(PermissionResource.TypeBuckets, _bucket.Id, null, _organization.Id); + for (var i = 1; i <= count; i++) + { + var retention = new BucketRetentionRules(BucketRetentionRules.TypeEnum.Expire, 0); - var readBucket = new Permission(Permission.ActionEnum.Read, resource); - var writeBucket = new Permission(Permission.ActionEnum.Write, resource); + var bucket = await Client.GetBucketsApi() + .CreateBucketAsync(GenerateName($"race{i}"), retention, organization); - var loggedUser = await Client.GetUsersApi().MeAsync(); - Assert.IsNotNull(loggedUser); + buckets.Add(bucket); + // + // Add Permissions to read and write to the Bucket + // + var resource = new PermissionResource( + PermissionResource.TypeBuckets, bucket.Id, null, organization.Id); - var authorization = await Client.GetAuthorizationsApi() - .CreateAuthorizationAsync(await FindMyOrg(), new List { readBucket, writeBucket }); + var readBucket = new Permission(Permission.ActionEnum.Read, resource); + var writeBucket = new Permission(Permission.ActionEnum.Write, resource); - _token = authorization.Token; + permissions.Add(readBucket); + permissions.Add(writeBucket); + } + + var authorization = await Client.GetAuthorizationsApi() + .CreateAuthorizationAsync(await FindMyOrg(), permissions); Client.Dispose(); - var options = new InfluxDBClientOptions.Builder().Url(InfluxDbUrl).AuthenticateToken(_token) - .Org(_organization.Id).Bucket(_bucket.Id).Build(); + var options = new InfluxDBClientOptions.Builder().Url(InfluxDbUrl).AuthenticateToken(authorization.Token) + .Org(organization.Id).Bucket(buckets[0].Id).Build(); Client = InfluxDBClientFactory.Create(options); - } - - private Bucket _bucket; - private Organization _organization; - private string _token; + return buckets; + } [Test] - public void Race() + public async Task Race() { + await CreateBuckets(); var point = PointData.Measurement("race-test") .Tag("test", "stress") .Field("value", 1); @@ -81,5 +88,93 @@ public void Race() gateEnd.Wait(); } } + + [Test] + public async Task BufferConsistency() + { + // Configuration + const int secondsCount = 5; + const int writerCount = 4; + var writeOptions = WriteOptions.CreateNew().FlushInterval(1_000_000).Build(); + var buckets = await CreateBuckets(writerCount); + + using var countdownEvent = new CountdownEvent(1); + using var writeApi = Client.GetWriteApi(writeOptions); + + var writers = new List(); + for (var i = 1; i <= writerCount; i++) + { + var writer = new Writer(i, writeApi, countdownEvent, buckets[i - 1]); + writers.Add(writer); + var thread = new Thread(writer.Do); + thread.Start(); + } + + // wait + Thread.Sleep(secondsCount * 1_000); + + // stop + countdownEvent.Signal(); + + // wait to finish + Trace.WriteLine("Wait to finish the writer..."); + writeApi.Dispose(); + Trace.WriteLine("Finished"); + + // check successfully written + foreach (var writer in writers) await writer.Check(Client.GetQueryApi()); + } + } + + internal class Writer + { + private int Identifier { get; } + private IWriteApi WriteApi { get; } + private CountdownEvent CountdownEvent { get; } + private Bucket Bucket { get; } + private long _time; + + public Writer(int identifier, IWriteApi writeApi, CountdownEvent countdownEvent, Bucket bucket) + { + Identifier = identifier; + WriteApi = writeApi ?? throw new ArgumentNullException(nameof(writeApi)); + CountdownEvent = countdownEvent ?? throw new ArgumentNullException(nameof(countdownEvent)); + Bucket = bucket ?? throw new ArgumentNullException(nameof(bucket)); + } + + internal void Do() + { + while (!CountdownEvent.IsSet) + { + _time++; + + var point = PointData.Measurement($"writer-{Identifier}") + .Tag("test", "stress") + .Field("value", _time) + .Timestamp(_time, WritePrecision.Ns); + + WriteApi.WritePoint(point, Bucket.Id); + + if (Identifier == 1 && _time % 50_000 == 0) + { + Trace.WriteLine($"Generated point: {point.ToLineProtocol()}, bucket: {Bucket.Name}"); + } + } + + if (Identifier == 1) + { + Trace.WriteLine($"Generated points: {_time}"); + } + } + + public async Task Check(QueryApi queryApi) + { + var query = $"from(bucket: \"{Bucket.Name}\") |> range(start: 0) |> count()"; + var value = (await queryApi.QueryAsync(query))[0].Records[0].GetValue(); + + Trace.WriteLine($"Written count [{Identifier}]: {value}"); + + Assert.AreEqual(value, _time); + } } } \ No newline at end of file From 462e833117867f8a4f3cbd0600c1e5578e9c9feb Mon Sep 17 00:00:00 2001 From: Jakub Bednar Date: Mon, 20 Jun 2022 13:17:47 +0200 Subject: [PATCH 07/19] chore: add test to buffer consistency --- Client.Test/ItWriteApiRaceTest.cs | 33 +++++++++++++++++++++++++------ 1 file changed, 27 insertions(+), 6 deletions(-) diff --git a/Client.Test/ItWriteApiRaceTest.cs b/Client.Test/ItWriteApiRaceTest.cs index 9d8188b2d..4d743960e 100644 --- a/Client.Test/ItWriteApiRaceTest.cs +++ b/Client.Test/ItWriteApiRaceTest.cs @@ -90,16 +90,37 @@ public async Task Race() } [Test] - public async Task BufferConsistency() + public async Task BatchConsistency() + { + var options = WriteOptions.CreateNew().BatchSize(1_555).FlushInterval(1_000_000).Build(); + + await StressfulWriteAndValidate(1, 5, options, (sender, eventArgs) => + { + if (eventArgs is WriteSuccessEvent successEvent) + { + var length = successEvent.LineProtocol.Split("\n").Length; + + Trace.WriteLine($"Count: {length} {successEvent.Bucket}"); + Assert.AreEqual(1_555, length); + } + }); + } + + [Test] + public async Task MultipleBuckets() + { + await StressfulWriteAndValidate(4, 5); + } + + private async Task StressfulWriteAndValidate(int writerCount, int secondsCount, + WriteOptions writeOptions = null, EventHandler eventHandler = null) { - // Configuration - const int secondsCount = 5; - const int writerCount = 4; - var writeOptions = WriteOptions.CreateNew().FlushInterval(1_000_000).Build(); var buckets = await CreateBuckets(writerCount); using var countdownEvent = new CountdownEvent(1); - using var writeApi = Client.GetWriteApi(writeOptions); + using var writeApi = Client + .GetWriteApi(writeOptions ?? WriteOptions.CreateNew().FlushInterval(1_000_000).Build()); + writeApi.EventHandler += eventHandler; var writers = new List(); for (var i = 1; i <= writerCount; i++) From ae6c783cfe7b0d578b20c8f106cb44a9422fbedb Mon Sep 17 00:00:00 2001 From: Jakub Bednar Date: Tue, 21 Jun 2022 08:13:05 +0200 Subject: [PATCH 08/19] fix: tests --- Client.Test/ItWriteApiRaceTest.cs | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/Client.Test/ItWriteApiRaceTest.cs b/Client.Test/ItWriteApiRaceTest.cs index 4d743960e..20ba8e805 100644 --- a/Client.Test/ItWriteApiRaceTest.cs +++ b/Client.Test/ItWriteApiRaceTest.cs @@ -1,6 +1,7 @@ using System; using System.Collections.Generic; using System.Diagnostics; +using System.Linq; using System.Threading; using System.Threading.Tasks; using InfluxDB.Client.Api.Domain; @@ -94,16 +95,27 @@ public async Task BatchConsistency() { var options = WriteOptions.CreateNew().BatchSize(1_555).FlushInterval(1_000_000).Build(); + var batches = new List(); await StressfulWriteAndValidate(1, 5, options, (sender, eventArgs) => { if (eventArgs is WriteSuccessEvent successEvent) { - var length = successEvent.LineProtocol.Split("\n").Length; + batches.Add(successEvent); + } + }); + + foreach (var batch in batches) + { + var length = batch.LineProtocol.Split("\n").Length; - Trace.WriteLine($"Count: {length} {successEvent.Bucket}"); + Trace.WriteLine($"Count: {length} {batch.Bucket}"); + + // last element flush the rest + if (batches.Last() != batch) + { Assert.AreEqual(1_555, length); } - }); + } } [Test] From b598da07b730f6c893d887d30d5cb1e2f6572283 Mon Sep 17 00:00:00 2001 From: Jakub Bednar Date: Tue, 21 Jun 2022 08:33:21 +0200 Subject: [PATCH 09/19] chore: add test with flush --- Client.Test/ItWriteApiRaceTest.cs | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/Client.Test/ItWriteApiRaceTest.cs b/Client.Test/ItWriteApiRaceTest.cs index 20ba8e805..0226c6b99 100644 --- a/Client.Test/ItWriteApiRaceTest.cs +++ b/Client.Test/ItWriteApiRaceTest.cs @@ -93,7 +93,7 @@ public async Task Race() [Test] public async Task BatchConsistency() { - var options = WriteOptions.CreateNew().BatchSize(1_555).FlushInterval(1_000_000).Build(); + var options = WriteOptions.CreateNew().BatchSize(1_555).FlushInterval(10_000).Build(); var batches = new List(); await StressfulWriteAndValidate(1, 5, options, (sender, eventArgs) => @@ -124,6 +124,14 @@ public async Task MultipleBuckets() await StressfulWriteAndValidate(4, 5); } + [Test] + public async Task MultipleBucketsWithFlush() + { + var writeOptions = WriteOptions.CreateNew().FlushInterval(1_000).Build(); + + await StressfulWriteAndValidate(4, 5, writeOptions); + } + private async Task StressfulWriteAndValidate(int writerCount, int secondsCount, WriteOptions writeOptions = null, EventHandler eventHandler = null) { @@ -131,7 +139,7 @@ private async Task StressfulWriteAndValidate(int writerCount, int secondsCount, using var countdownEvent = new CountdownEvent(1); using var writeApi = Client - .GetWriteApi(writeOptions ?? WriteOptions.CreateNew().FlushInterval(1_000_000).Build()); + .GetWriteApi(writeOptions ?? WriteOptions.CreateNew().FlushInterval(10_000).Build()); writeApi.EventHandler += eventHandler; var writers = new List(); From 6e39c24c9618f28d93590fd60a424c9ebbe448ae Mon Sep 17 00:00:00 2001 From: Jakub Bednar Date: Tue, 21 Jun 2022 08:42:47 +0200 Subject: [PATCH 10/19] chore: add test with flush --- Client.Test/ItWriteApiRaceTest.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Client.Test/ItWriteApiRaceTest.cs b/Client.Test/ItWriteApiRaceTest.cs index 0226c6b99..a9bbefa75 100644 --- a/Client.Test/ItWriteApiRaceTest.cs +++ b/Client.Test/ItWriteApiRaceTest.cs @@ -128,7 +128,7 @@ public async Task MultipleBuckets() public async Task MultipleBucketsWithFlush() { var writeOptions = WriteOptions.CreateNew().FlushInterval(1_000).Build(); - + await StressfulWriteAndValidate(4, 5, writeOptions); } From b7d100ee859e033bdf73634954072bb0b452d703 Mon Sep 17 00:00:00 2001 From: Jakub Bednar Date: Tue, 21 Jun 2022 09:02:05 +0200 Subject: [PATCH 11/19] fix: tests --- Client.Test/ItWriteApiRaceTest.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Client.Test/ItWriteApiRaceTest.cs b/Client.Test/ItWriteApiRaceTest.cs index a9bbefa75..4d1041910 100644 --- a/Client.Test/ItWriteApiRaceTest.cs +++ b/Client.Test/ItWriteApiRaceTest.cs @@ -139,7 +139,7 @@ private async Task StressfulWriteAndValidate(int writerCount, int secondsCount, using var countdownEvent = new CountdownEvent(1); using var writeApi = Client - .GetWriteApi(writeOptions ?? WriteOptions.CreateNew().FlushInterval(10_000).Build()); + .GetWriteApi(writeOptions ?? WriteOptions.CreateNew().FlushInterval(20_000).Build()); writeApi.EventHandler += eventHandler; var writers = new List(); From 8594a976a3872eecadfa0c76bbeb99fd6ab1b00c Mon Sep 17 00:00:00 2001 From: Jakub Bednar Date: Tue, 21 Jun 2022 10:59:51 +0200 Subject: [PATCH 12/19] chore: stream console output --- Client.Core.Test/AbstractTest.cs | 1 + 1 file changed, 1 insertion(+) diff --git a/Client.Core.Test/AbstractTest.cs b/Client.Core.Test/AbstractTest.cs index 7f7e82bb9..ecb9c92a8 100644 --- a/Client.Core.Test/AbstractTest.cs +++ b/Client.Core.Test/AbstractTest.cs @@ -23,6 +23,7 @@ public void SetUp() if (!Trace.Listeners.Contains(ConsoleOutListener)) { + Console.SetOut(TestContext.Progress); Trace.Listeners.Add(ConsoleOutListener); } } From baca79638ed200bc11390e2f69458b4fa144cdc8 Mon Sep 17 00:00:00 2001 From: Jakub Bednar Date: Tue, 21 Jun 2022 11:00:19 +0200 Subject: [PATCH 13/19] feat: use ThreadPool to optimize thread allocation --- Client/WriteOptions.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Client/WriteOptions.cs b/Client/WriteOptions.cs index 5e3b98af2..4b88e1ba5 100644 --- a/Client/WriteOptions.cs +++ b/Client/WriteOptions.cs @@ -113,7 +113,7 @@ public sealed class Builder internal int MaxRetriesBuilder = DefaultMaxRetries; internal int MaxRetryDelayBuilder = DefaultMaxRetryDelay; internal int ExponentialBaseBuilder = DefaultExponentialBase; - internal IScheduler WriteSchedulerBuilder = NewThreadScheduler.Default; + internal IScheduler WriteSchedulerBuilder = ThreadPoolScheduler.Instance; /// /// Set the number of data point to collect in batch. From b970a191b2b799c95f5265bc98e22a035a8c6a48 Mon Sep 17 00:00:00 2001 From: Jakub Bednar Date: Tue, 21 Jun 2022 11:00:49 +0200 Subject: [PATCH 14/19] chore: add possibility to specify amount of time to wait --- Client/WriteApi.cs | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/Client/WriteApi.cs b/Client/WriteApi.cs index 7f1c2869f..944e2cf81 100644 --- a/Client/WriteApi.cs +++ b/Client/WriteApi.cs @@ -325,6 +325,15 @@ protected internal WriteApi( } public void Dispose() + { + ReleaseAndClose(); + } + + /// + /// Release all resources and flush remaining data into database. + /// + /// How much milliseconds wait to flush data. + internal void ReleaseAndClose(int millis = 30000) { _unsubscribeDisposeCommand.Dispose(); // avoid duplicate call to dispose @@ -343,7 +352,7 @@ public void Dispose() _subject.Dispose(); _flush.Dispose(); - WaitToCondition(() => _disposed, 30000); + WaitToCondition(() => _disposed, millis); } public bool Disposed => _disposed; From e52c392e2558f5299d74255a5170ecc2f2c0833e Mon Sep 17 00:00:00 2001 From: Jakub Bednar Date: Tue, 21 Jun 2022 11:01:16 +0200 Subject: [PATCH 15/19] chore: better logging --- Client.Test/ItWriteApiRaceTest.cs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/Client.Test/ItWriteApiRaceTest.cs b/Client.Test/ItWriteApiRaceTest.cs index 4d1041910..e1cf5d0a6 100644 --- a/Client.Test/ItWriteApiRaceTest.cs +++ b/Client.Test/ItWriteApiRaceTest.cs @@ -158,9 +158,9 @@ private async Task StressfulWriteAndValidate(int writerCount, int secondsCount, countdownEvent.Signal(); // wait to finish - Trace.WriteLine("Wait to finish the writer..."); - writeApi.Dispose(); - Trace.WriteLine("Finished"); + Console.WriteLine("Wait to finish the writer..."); + writeApi.ReleaseAndClose(millis: 180_000); + Console.WriteLine("Finished"); // check successfully written foreach (var writer in writers) await writer.Check(Client.GetQueryApi()); @@ -196,15 +196,15 @@ internal void Do() WriteApi.WritePoint(point, Bucket.Id); - if (Identifier == 1 && _time % 50_000 == 0) + if (Identifier == 1 && _time % 100_000 == 0) { - Trace.WriteLine($"Generated point: {point.ToLineProtocol()}, bucket: {Bucket.Name}"); + Console.WriteLine($"Generated point: {point.ToLineProtocol()}, bucket: {Bucket.Name}"); } } if (Identifier == 1) { - Trace.WriteLine($"Generated points: {_time}"); + Console.WriteLine($"Generated points: {_time}"); } } @@ -213,7 +213,7 @@ public async Task Check(QueryApi queryApi) var query = $"from(bucket: \"{Bucket.Name}\") |> range(start: 0) |> count()"; var value = (await queryApi.QueryAsync(query))[0].Records[0].GetValue(); - Trace.WriteLine($"Written count [{Identifier}]: {value}"); + Console.WriteLine($"Written count [{Identifier}]: {value}"); Assert.AreEqual(value, _time); } From 5c741e929a112495d560721c4490bde4c654352b Mon Sep 17 00:00:00 2001 From: Jakub Bednar Date: Tue, 21 Jun 2022 11:13:55 +0200 Subject: [PATCH 16/19] fix: code style --- Client.Test/ItWriteApiRaceTest.cs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/Client.Test/ItWriteApiRaceTest.cs b/Client.Test/ItWriteApiRaceTest.cs index e1cf5d0a6..41313144c 100644 --- a/Client.Test/ItWriteApiRaceTest.cs +++ b/Client.Test/ItWriteApiRaceTest.cs @@ -121,18 +121,18 @@ await StressfulWriteAndValidate(1, 5, options, (sender, eventArgs) => [Test] public async Task MultipleBuckets() { - await StressfulWriteAndValidate(4, 5); + await StressfulWriteAndValidate(); } [Test] public async Task MultipleBucketsWithFlush() { - var writeOptions = WriteOptions.CreateNew().FlushInterval(1_000).Build(); + var writeOptions = WriteOptions.CreateNew().FlushInterval(100).Build(); - await StressfulWriteAndValidate(4, 5, writeOptions); + await StressfulWriteAndValidate(writeOptions: writeOptions); } - private async Task StressfulWriteAndValidate(int writerCount, int secondsCount, + private async Task StressfulWriteAndValidate(int writerCount = 4, int secondsCount = 5, WriteOptions writeOptions = null, EventHandler eventHandler = null) { var buckets = await CreateBuckets(writerCount); @@ -159,7 +159,7 @@ private async Task StressfulWriteAndValidate(int writerCount, int secondsCount, // wait to finish Console.WriteLine("Wait to finish the writer..."); - writeApi.ReleaseAndClose(millis: 180_000); + writeApi.ReleaseAndClose(180_000); Console.WriteLine("Finished"); // check successfully written From 0dfa54081e3e0abd1db59e5c8b17f76b727a2358 Mon Sep 17 00:00:00 2001 From: Jakub Bednar Date: Tue, 21 Jun 2022 13:45:10 +0200 Subject: [PATCH 17/19] chore: try to use preview version --- Client/Client.csproj | 2 +- NuGet.config | 7 +++++++ 2 files changed, 8 insertions(+), 1 deletion(-) create mode 100644 NuGet.config diff --git a/Client/Client.csproj b/Client/Client.csproj index acb199a11..b98fde656 100644 --- a/Client/Client.csproj +++ b/Client/Client.csproj @@ -38,7 +38,7 @@ - + diff --git a/NuGet.config b/NuGet.config new file mode 100644 index 000000000..cb01ed005 --- /dev/null +++ b/NuGet.config @@ -0,0 +1,7 @@ + + + + + + + \ No newline at end of file From 9deba30f4d14252264bfc78540f6bac5adc1cfdf Mon Sep 17 00:00:00 2001 From: Jakub Bednar Date: Tue, 21 Jun 2022 14:08:49 +0200 Subject: [PATCH 18/19] chore: revert new NuGet source --- Client.Test/ItWriteApiRaceTest.cs | 4 ++-- Client/Client.csproj | 2 +- NuGet.config | 7 ------- 3 files changed, 3 insertions(+), 10 deletions(-) delete mode 100644 NuGet.config diff --git a/Client.Test/ItWriteApiRaceTest.cs b/Client.Test/ItWriteApiRaceTest.cs index 41313144c..1107fceee 100644 --- a/Client.Test/ItWriteApiRaceTest.cs +++ b/Client.Test/ItWriteApiRaceTest.cs @@ -211,11 +211,11 @@ internal void Do() public async Task Check(QueryApi queryApi) { var query = $"from(bucket: \"{Bucket.Name}\") |> range(start: 0) |> count()"; - var value = (await queryApi.QueryAsync(query))[0].Records[0].GetValue(); + var value = (long)(await queryApi.QueryAsync(query))[0].Records[0].GetValue(); Console.WriteLine($"Written count [{Identifier}]: {value}"); - Assert.AreEqual(value, _time); + Assert.GreaterOrEqual(value, _time); } } } \ No newline at end of file diff --git a/Client/Client.csproj b/Client/Client.csproj index b98fde656..acb199a11 100644 --- a/Client/Client.csproj +++ b/Client/Client.csproj @@ -38,7 +38,7 @@ - + diff --git a/NuGet.config b/NuGet.config deleted file mode 100644 index cb01ed005..000000000 --- a/NuGet.config +++ /dev/null @@ -1,7 +0,0 @@ - - - - - - - \ No newline at end of file From 45b0b0eb2f59f02b86a40a23ddcf4548624ca3d3 Mon Sep 17 00:00:00 2001 From: Jakub Bednar Date: Tue, 21 Jun 2022 14:26:44 +0200 Subject: [PATCH 19/19] chore: revert new NuGet source --- Client.Test/ItWriteApiRaceTest.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Client.Test/ItWriteApiRaceTest.cs b/Client.Test/ItWriteApiRaceTest.cs index 1107fceee..fc793ee9a 100644 --- a/Client.Test/ItWriteApiRaceTest.cs +++ b/Client.Test/ItWriteApiRaceTest.cs @@ -215,7 +215,7 @@ public async Task Check(QueryApi queryApi) Console.WriteLine($"Written count [{Identifier}]: {value}"); - Assert.GreaterOrEqual(value, _time); + Assert.GreaterOrEqual(_time, value); } } } \ No newline at end of file