diff --git a/CHANGELOG.md b/CHANGELOG.md index f5642d921..f3f89c72b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,9 @@ ### Features 1. [#327](https://github.com/influxdata/influxdb-client-csharp/pull/327): Add interfaces to client's APIs +### Bug Fixes +1. [#329](https://github.com/influxdata/influxdb-client-csharp/pull/329): Writing data to multiple destination by the background writer + ### Dependencies 1. [#326](https://github.com/influxdata/influxdb-client-csharp/pull/326): Update dependencies: diff --git a/Client.Core.Test/AbstractTest.cs b/Client.Core.Test/AbstractTest.cs index 7f7e82bb9..ecb9c92a8 100644 --- a/Client.Core.Test/AbstractTest.cs +++ b/Client.Core.Test/AbstractTest.cs @@ -23,6 +23,7 @@ public void SetUp() if (!Trace.Listeners.Contains(ConsoleOutListener)) { + Console.SetOut(TestContext.Progress); Trace.Listeners.Add(ConsoleOutListener); } } diff --git a/Client.Test/ItWriteApiRaceTest.cs b/Client.Test/ItWriteApiRaceTest.cs index 3aa2babad..fc793ee9a 100644 --- a/Client.Test/ItWriteApiRaceTest.cs +++ b/Client.Test/ItWriteApiRaceTest.cs @@ -1,4 +1,7 @@ +using System; using System.Collections.Generic; +using System.Diagnostics; +using System.Linq; using System.Threading; using System.Threading.Tasks; using InfluxDB.Client.Api.Domain; @@ -10,47 +13,52 @@ namespace InfluxDB.Client.Test [TestFixture] public class ItWriteApiRaceTest : AbstractItClientTest { - [SetUp] - public new async Task SetUp() + private async Task> CreateBuckets(int count = 1) { - _organization = await FindMyOrg(); + var organization = await FindMyOrg(); - var retention = new BucketRetentionRules(BucketRetentionRules.TypeEnum.Expire, 3600); + var loggedUser = await Client.GetUsersApi().MeAsync(); + Assert.IsNotNull(loggedUser); - _bucket = await Client.GetBucketsApi() - .CreateBucketAsync(GenerateName("h2o"), retention, _organization); + var buckets = new List(); + var permissions = new List(); - // - // Add Permissions to read and write to the Bucket - // - var resource = - new PermissionResource(PermissionResource.TypeBuckets, _bucket.Id, null, _organization.Id); + for (var i = 1; i <= count; i++) + { + var retention = new BucketRetentionRules(BucketRetentionRules.TypeEnum.Expire, 0); - var readBucket = new Permission(Permission.ActionEnum.Read, resource); - var writeBucket = new Permission(Permission.ActionEnum.Write, resource); + var bucket = await Client.GetBucketsApi() + .CreateBucketAsync(GenerateName($"race{i}"), retention, organization); - var loggedUser = await Client.GetUsersApi().MeAsync(); - Assert.IsNotNull(loggedUser); + buckets.Add(bucket); + // + // Add Permissions to read and write to the Bucket + // + var resource = new PermissionResource( + PermissionResource.TypeBuckets, bucket.Id, null, organization.Id); - var authorization = await Client.GetAuthorizationsApi() - .CreateAuthorizationAsync(await FindMyOrg(), new List { readBucket, writeBucket }); + var readBucket = new Permission(Permission.ActionEnum.Read, resource); + var writeBucket = new Permission(Permission.ActionEnum.Write, resource); + + permissions.Add(readBucket); + permissions.Add(writeBucket); + } - _token = authorization.Token; + var authorization = await Client.GetAuthorizationsApi() + .CreateAuthorizationAsync(await FindMyOrg(), permissions); Client.Dispose(); - var options = new InfluxDBClientOptions.Builder().Url(InfluxDbUrl).AuthenticateToken(_token) - .Org(_organization.Id).Bucket(_bucket.Id).Build(); + var options = new InfluxDBClientOptions.Builder().Url(InfluxDbUrl).AuthenticateToken(authorization.Token) + .Org(organization.Id).Bucket(buckets[0].Id).Build(); Client = InfluxDBClientFactory.Create(options); - } - - private Bucket _bucket; - private Organization _organization; - private string _token; + return buckets; + } [Test] - public void Race() + public async Task Race() { + await CreateBuckets(); var point = PointData.Measurement("race-test") .Tag("test", "stress") .Field("value", 1); @@ -81,5 +89,133 @@ public void Race() gateEnd.Wait(); } } + + [Test] + public async Task BatchConsistency() + { + var options = WriteOptions.CreateNew().BatchSize(1_555).FlushInterval(10_000).Build(); + + var batches = new List(); + await StressfulWriteAndValidate(1, 5, options, (sender, eventArgs) => + { + if (eventArgs is WriteSuccessEvent successEvent) + { + batches.Add(successEvent); + } + }); + + foreach (var batch in batches) + { + var length = batch.LineProtocol.Split("\n").Length; + + Trace.WriteLine($"Count: {length} {batch.Bucket}"); + + // last element flush the rest + if (batches.Last() != batch) + { + Assert.AreEqual(1_555, length); + } + } + } + + [Test] + public async Task MultipleBuckets() + { + await StressfulWriteAndValidate(); + } + + [Test] + public async Task MultipleBucketsWithFlush() + { + var writeOptions = WriteOptions.CreateNew().FlushInterval(100).Build(); + + await StressfulWriteAndValidate(writeOptions: writeOptions); + } + + private async Task StressfulWriteAndValidate(int writerCount = 4, int secondsCount = 5, + WriteOptions writeOptions = null, EventHandler eventHandler = null) + { + var buckets = await CreateBuckets(writerCount); + + using var countdownEvent = new CountdownEvent(1); + using var writeApi = Client + .GetWriteApi(writeOptions ?? WriteOptions.CreateNew().FlushInterval(20_000).Build()); + writeApi.EventHandler += eventHandler; + + var writers = new List(); + for (var i = 1; i <= writerCount; i++) + { + var writer = new Writer(i, writeApi, countdownEvent, buckets[i - 1]); + writers.Add(writer); + var thread = new Thread(writer.Do); + thread.Start(); + } + + // wait + Thread.Sleep(secondsCount * 1_000); + + // stop + countdownEvent.Signal(); + + // wait to finish + Console.WriteLine("Wait to finish the writer..."); + writeApi.ReleaseAndClose(180_000); + Console.WriteLine("Finished"); + + // check successfully written + foreach (var writer in writers) await writer.Check(Client.GetQueryApi()); + } + } + + internal class Writer + { + private int Identifier { get; } + private IWriteApi WriteApi { get; } + private CountdownEvent CountdownEvent { get; } + private Bucket Bucket { get; } + private long _time; + + public Writer(int identifier, IWriteApi writeApi, CountdownEvent countdownEvent, Bucket bucket) + { + Identifier = identifier; + WriteApi = writeApi ?? throw new ArgumentNullException(nameof(writeApi)); + CountdownEvent = countdownEvent ?? throw new ArgumentNullException(nameof(countdownEvent)); + Bucket = bucket ?? throw new ArgumentNullException(nameof(bucket)); + } + + internal void Do() + { + while (!CountdownEvent.IsSet) + { + _time++; + + var point = PointData.Measurement($"writer-{Identifier}") + .Tag("test", "stress") + .Field("value", _time) + .Timestamp(_time, WritePrecision.Ns); + + WriteApi.WritePoint(point, Bucket.Id); + + if (Identifier == 1 && _time % 100_000 == 0) + { + Console.WriteLine($"Generated point: {point.ToLineProtocol()}, bucket: {Bucket.Name}"); + } + } + + if (Identifier == 1) + { + Console.WriteLine($"Generated points: {_time}"); + } + } + + public async Task Check(QueryApi queryApi) + { + var query = $"from(bucket: \"{Bucket.Name}\") |> range(start: 0) |> count()"; + var value = (long)(await queryApi.QueryAsync(query))[0].Records[0].GetValue(); + + Console.WriteLine($"Written count [{Identifier}]: {value}"); + + Assert.GreaterOrEqual(_time, value); + } } } \ No newline at end of file diff --git a/Client.Test/WriteApiTest.cs b/Client.Test/WriteApiTest.cs index dcae19fed..cd9014943 100644 --- a/Client.Test/WriteApiTest.cs +++ b/Client.Test/WriteApiTest.cs @@ -502,6 +502,73 @@ public void RequiredOrgBucketWriteApi() "Expecting a non-empty string for 'bucket' parameter. Please specify the bucket as a method parameter or use default configuration at 'InfluxDBClientOptions.Bucket'.", ae.Message); } + + [Test] + public void WritesToDifferentBuckets() + { + var listener = new EventListener(_writeApi); + + MockServer + .Given(Request.Create().WithPath("/api/v2/write").UsingPost()) + .RespondWith(CreateResponse("{}")); + + var entryA = PointData.Measurement("myData") + .Tag("id", 54836.ToString()) + .Field("valueA", 12) + .Timestamp(DateTime.UtcNow.AddSeconds(-10), WritePrecision.Ns); + + _writeApi.WritePoint(entryA, "my-bucket-1", "my-org"); + + var entryB = PointData.Measurement("myData") + .Field("valueB", 42) + .Timestamp(DateTime.UtcNow, WritePrecision.Ns); + + _writeApi.WritePoint(entryB, "my-bucket-2", "my-org"); + + listener.Get(); + listener.Get(); + + Assert.AreEqual(2, MockServer.LogEntries.Count()); + Assert.AreEqual("my-bucket-1", + MockServer.LogEntries.ToArray()[0].RequestMessage.Query["bucket"].ToString()); + Assert.AreEqual("my-bucket-2", + MockServer.LogEntries.ToArray()[1].RequestMessage.Query["bucket"].ToString()); + } + + [Test] + public void WritesToDifferentBucketsJitter() + { + _writeApi.Dispose(); + _writeApi = _influxDbClient.GetWriteApi(WriteOptions.CreateNew().JitterInterval(1_000).Build()); + + var listener = new EventListener(_writeApi); + + MockServer + .Given(Request.Create().WithPath("/api/v2/write").UsingPost()) + .RespondWith(CreateResponse("{}")); + + var entryA = PointData.Measurement("myData") + .Tag("id", 54836.ToString()) + .Field("valueA", 12) + .Timestamp(DateTime.UtcNow.AddSeconds(-10), WritePrecision.Ns); + + _writeApi.WritePoint(entryA, "my-bucket-1", "my-org"); + + var entryB = PointData.Measurement("myData") + .Field("valueB", 42) + .Timestamp(DateTime.UtcNow, WritePrecision.Ns); + + _writeApi.WritePoint(entryB, "my-bucket-2", "my-org"); + + listener.Get(); + listener.Get(); + + Assert.AreEqual(2, MockServer.LogEntries.Count()); + Assert.AreEqual("my-bucket-1", + MockServer.LogEntries.ToArray()[0].RequestMessage.Query["bucket"].ToString()); + Assert.AreEqual("my-bucket-2", + MockServer.LogEntries.ToArray()[1].RequestMessage.Query["bucket"].ToString()); + } } [Measurement("m")] diff --git a/Client/Internal/RetryAttempt.cs b/Client/Internal/RetryAttempt.cs index 1cd68a080..d274ee9da 100644 --- a/Client/Internal/RetryAttempt.cs +++ b/Client/Internal/RetryAttempt.cs @@ -114,7 +114,7 @@ internal long GetRetryInterval() // from header if (Error is HttpException httpException && httpException.RetryAfter.HasValue) { - return httpException.RetryAfter.Value * 1000 + JitterDelay(_writeOptions); + return httpException.RetryAfter.Value * 1000; } // from configuration diff --git a/Client/WriteApi.cs b/Client/WriteApi.cs index 8fc14323b..944e2cf81 100644 --- a/Client/WriteApi.cs +++ b/Client/WriteApi.cs @@ -188,7 +188,7 @@ protected internal WriteApi( // // Create Write Point = bucket, org, ... + data // - .Select(grouped => + .SelectMany(grouped => { var aggregate = grouped .Aggregate(StringBuilderPool.Get(), (builder, batchWrite) => @@ -218,22 +218,7 @@ protected internal WriteApi( .Where(batchWriteItem => !string.IsNullOrEmpty(batchWriteItem.ToLineProtocol())); }); - if (writeOptions.JitterInterval > 0) - { - batches = batches - // - // Jitter - // - .Select(source => - { - return source.Delay(_ => - Observable.Timer(TimeSpan.FromMilliseconds(RetryAttempt.JitterDelay(writeOptions)), - writeOptions.WriteScheduler)); - }); - } - var unused = batches - .Concat() // // Map to Async request // @@ -246,10 +231,22 @@ protected internal WriteApi( return Observable .Defer(() => - service.PostWriteAsyncWithIRestResponse(org, bucket, + { + var observable = service.PostWriteAsyncWithIRestResponse(org, bucket, Encoding.UTF8.GetBytes(lineProtocol), null, "identity", "text/plain; charset=utf-8", null, "application/json", null, precision) - .ToObservable()) + .ToObservable(); + + if (writeOptions.JitterInterval > 0) + { + observable = observable + .Delay(_ => Observable.Timer( + TimeSpan.FromMilliseconds(RetryAttempt.JitterDelay(writeOptions)), + writeOptions.WriteScheduler)); + } + + return observable; + }) .RetryWhen(f => f .Zip(Observable.Range(1, writeOptions.MaxRetries + 1), (exception, count) => new RetryAttempt(exception, count, writeOptions)) @@ -328,6 +325,15 @@ protected internal WriteApi( } public void Dispose() + { + ReleaseAndClose(); + } + + /// + /// Release all resources and flush remaining data into database. + /// + /// How much milliseconds wait to flush data. + internal void ReleaseAndClose(int millis = 30000) { _unsubscribeDisposeCommand.Dispose(); // avoid duplicate call to dispose @@ -346,7 +352,7 @@ public void Dispose() _subject.Dispose(); _flush.Dispose(); - WaitToCondition(() => _disposed, 30000); + WaitToCondition(() => _disposed, millis); } public bool Disposed => _disposed; diff --git a/Client/WriteOptions.cs b/Client/WriteOptions.cs index 5e3b98af2..4b88e1ba5 100644 --- a/Client/WriteOptions.cs +++ b/Client/WriteOptions.cs @@ -113,7 +113,7 @@ public sealed class Builder internal int MaxRetriesBuilder = DefaultMaxRetries; internal int MaxRetryDelayBuilder = DefaultMaxRetryDelay; internal int ExponentialBaseBuilder = DefaultExponentialBase; - internal IScheduler WriteSchedulerBuilder = NewThreadScheduler.Default; + internal IScheduler WriteSchedulerBuilder = ThreadPoolScheduler.Instance; /// /// Set the number of data point to collect in batch.