Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
### Features
1. [#327](https://github.com/influxdata/influxdb-client-csharp/pull/327): Add interfaces to client's APIs

### Bug Fixes
1. [#329](https://github.com/influxdata/influxdb-client-csharp/pull/329): Writing data to multiple destination by the background writer

### Dependencies
1. [#326](https://github.com/influxdata/influxdb-client-csharp/pull/326): Update dependencies:

Expand Down
1 change: 1 addition & 0 deletions Client.Core.Test/AbstractTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ public void SetUp()

if (!Trace.Listeners.Contains(ConsoleOutListener))
{
Console.SetOut(TestContext.Progress);
Trace.Listeners.Add(ConsoleOutListener);
}
}
Expand Down
188 changes: 162 additions & 26 deletions Client.Test/ItWriteApiRaceTest.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using InfluxDB.Client.Api.Domain;
Expand All @@ -10,47 +13,52 @@ namespace InfluxDB.Client.Test
[TestFixture]
public class ItWriteApiRaceTest : AbstractItClientTest
{
[SetUp]
public new async Task SetUp()
private async Task<List<Bucket>> CreateBuckets(int count = 1)
{
_organization = await FindMyOrg();
var organization = await FindMyOrg();

var retention = new BucketRetentionRules(BucketRetentionRules.TypeEnum.Expire, 3600);
var loggedUser = await Client.GetUsersApi().MeAsync();
Assert.IsNotNull(loggedUser);

_bucket = await Client.GetBucketsApi()
.CreateBucketAsync(GenerateName("h2o"), retention, _organization);
var buckets = new List<Bucket>();
var permissions = new List<Permission>();

//
// Add Permissions to read and write to the Bucket
//
var resource =
new PermissionResource(PermissionResource.TypeBuckets, _bucket.Id, null, _organization.Id);
for (var i = 1; i <= count; i++)
{
var retention = new BucketRetentionRules(BucketRetentionRules.TypeEnum.Expire, 0);

var readBucket = new Permission(Permission.ActionEnum.Read, resource);
var writeBucket = new Permission(Permission.ActionEnum.Write, resource);
var bucket = await Client.GetBucketsApi()
.CreateBucketAsync(GenerateName($"race{i}"), retention, organization);

var loggedUser = await Client.GetUsersApi().MeAsync();
Assert.IsNotNull(loggedUser);
buckets.Add(bucket);
//
// Add Permissions to read and write to the Bucket
//
var resource = new PermissionResource(
PermissionResource.TypeBuckets, bucket.Id, null, organization.Id);

var authorization = await Client.GetAuthorizationsApi()
.CreateAuthorizationAsync(await FindMyOrg(), new List<Permission> { readBucket, writeBucket });
var readBucket = new Permission(Permission.ActionEnum.Read, resource);
var writeBucket = new Permission(Permission.ActionEnum.Write, resource);

permissions.Add(readBucket);
permissions.Add(writeBucket);
}

_token = authorization.Token;
var authorization = await Client.GetAuthorizationsApi()
.CreateAuthorizationAsync(await FindMyOrg(), permissions);

Client.Dispose();
var options = new InfluxDBClientOptions.Builder().Url(InfluxDbUrl).AuthenticateToken(_token)
.Org(_organization.Id).Bucket(_bucket.Id).Build();
var options = new InfluxDBClientOptions.Builder().Url(InfluxDbUrl).AuthenticateToken(authorization.Token)
.Org(organization.Id).Bucket(buckets[0].Id).Build();
Client = InfluxDBClientFactory.Create(options);
}

private Bucket _bucket;
private Organization _organization;
private string _token;

return buckets;
}

[Test]
public void Race()
public async Task Race()
{
await CreateBuckets();
var point = PointData.Measurement("race-test")
.Tag("test", "stress")
.Field("value", 1);
Expand Down Expand Up @@ -81,5 +89,133 @@ public void Race()
gateEnd.Wait();
}
}

[Test]
public async Task BatchConsistency()
{
var options = WriteOptions.CreateNew().BatchSize(1_555).FlushInterval(10_000).Build();

var batches = new List<WriteSuccessEvent>();
await StressfulWriteAndValidate(1, 5, options, (sender, eventArgs) =>
{
if (eventArgs is WriteSuccessEvent successEvent)
{
batches.Add(successEvent);
}
});

foreach (var batch in batches)
{
var length = batch.LineProtocol.Split("\n").Length;

Trace.WriteLine($"Count: {length} {batch.Bucket}");

// last element flush the rest
if (batches.Last() != batch)
{
Assert.AreEqual(1_555, length);
}
}
}

[Test]
public async Task MultipleBuckets()
{
await StressfulWriteAndValidate();
}

[Test]
public async Task MultipleBucketsWithFlush()
{
var writeOptions = WriteOptions.CreateNew().FlushInterval(100).Build();

await StressfulWriteAndValidate(writeOptions: writeOptions);
}

private async Task StressfulWriteAndValidate(int writerCount = 4, int secondsCount = 5,
WriteOptions writeOptions = null, EventHandler eventHandler = null)
{
var buckets = await CreateBuckets(writerCount);

using var countdownEvent = new CountdownEvent(1);
using var writeApi = Client
.GetWriteApi(writeOptions ?? WriteOptions.CreateNew().FlushInterval(20_000).Build());
writeApi.EventHandler += eventHandler;

var writers = new List<Writer>();
for (var i = 1; i <= writerCount; i++)
{
var writer = new Writer(i, writeApi, countdownEvent, buckets[i - 1]);
writers.Add(writer);
var thread = new Thread(writer.Do);
thread.Start();
}

// wait
Thread.Sleep(secondsCount * 1_000);

// stop
countdownEvent.Signal();

// wait to finish
Console.WriteLine("Wait to finish the writer...");
writeApi.ReleaseAndClose(180_000);
Console.WriteLine("Finished");

// check successfully written
foreach (var writer in writers) await writer.Check(Client.GetQueryApi());
}
}

internal class Writer
{
private int Identifier { get; }
private IWriteApi WriteApi { get; }
private CountdownEvent CountdownEvent { get; }
private Bucket Bucket { get; }
private long _time;

public Writer(int identifier, IWriteApi writeApi, CountdownEvent countdownEvent, Bucket bucket)
{
Identifier = identifier;
WriteApi = writeApi ?? throw new ArgumentNullException(nameof(writeApi));
CountdownEvent = countdownEvent ?? throw new ArgumentNullException(nameof(countdownEvent));
Bucket = bucket ?? throw new ArgumentNullException(nameof(bucket));
}

internal void Do()
{
while (!CountdownEvent.IsSet)
{
_time++;

var point = PointData.Measurement($"writer-{Identifier}")
.Tag("test", "stress")
.Field("value", _time)
.Timestamp(_time, WritePrecision.Ns);

WriteApi.WritePoint(point, Bucket.Id);

if (Identifier == 1 && _time % 100_000 == 0)
{
Console.WriteLine($"Generated point: {point.ToLineProtocol()}, bucket: {Bucket.Name}");
}
}

if (Identifier == 1)
{
Console.WriteLine($"Generated points: {_time}");
}
}

public async Task Check(QueryApi queryApi)
{
var query = $"from(bucket: \"{Bucket.Name}\") |> range(start: 0) |> count()";
var value = (long)(await queryApi.QueryAsync(query))[0].Records[0].GetValue();

Console.WriteLine($"Written count [{Identifier}]: {value}");

Assert.GreaterOrEqual(_time, value);
}
}
}
67 changes: 67 additions & 0 deletions Client.Test/WriteApiTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -502,6 +502,73 @@ public void RequiredOrgBucketWriteApi()
"Expecting a non-empty string for 'bucket' parameter. Please specify the bucket as a method parameter or use default configuration at 'InfluxDBClientOptions.Bucket'.",
ae.Message);
}

[Test]
public void WritesToDifferentBuckets()
{
var listener = new EventListener(_writeApi);

MockServer
.Given(Request.Create().WithPath("/api/v2/write").UsingPost())
.RespondWith(CreateResponse("{}"));

var entryA = PointData.Measurement("myData")
.Tag("id", 54836.ToString())
.Field("valueA", 12)
.Timestamp(DateTime.UtcNow.AddSeconds(-10), WritePrecision.Ns);

_writeApi.WritePoint(entryA, "my-bucket-1", "my-org");

var entryB = PointData.Measurement("myData")
.Field("valueB", 42)
.Timestamp(DateTime.UtcNow, WritePrecision.Ns);

_writeApi.WritePoint(entryB, "my-bucket-2", "my-org");

listener.Get<WriteSuccessEvent>();
listener.Get<WriteSuccessEvent>();

Assert.AreEqual(2, MockServer.LogEntries.Count());
Assert.AreEqual("my-bucket-1",
MockServer.LogEntries.ToArray()[0].RequestMessage.Query["bucket"].ToString());
Assert.AreEqual("my-bucket-2",
MockServer.LogEntries.ToArray()[1].RequestMessage.Query["bucket"].ToString());
}

[Test]
public void WritesToDifferentBucketsJitter()
{
_writeApi.Dispose();
_writeApi = _influxDbClient.GetWriteApi(WriteOptions.CreateNew().JitterInterval(1_000).Build());

var listener = new EventListener(_writeApi);

MockServer
.Given(Request.Create().WithPath("/api/v2/write").UsingPost())
.RespondWith(CreateResponse("{}"));

var entryA = PointData.Measurement("myData")
.Tag("id", 54836.ToString())
.Field("valueA", 12)
.Timestamp(DateTime.UtcNow.AddSeconds(-10), WritePrecision.Ns);

_writeApi.WritePoint(entryA, "my-bucket-1", "my-org");

var entryB = PointData.Measurement("myData")
.Field("valueB", 42)
.Timestamp(DateTime.UtcNow, WritePrecision.Ns);

_writeApi.WritePoint(entryB, "my-bucket-2", "my-org");

listener.Get<WriteSuccessEvent>();
listener.Get<WriteSuccessEvent>();

Assert.AreEqual(2, MockServer.LogEntries.Count());
Assert.AreEqual("my-bucket-1",
MockServer.LogEntries.ToArray()[0].RequestMessage.Query["bucket"].ToString());
Assert.AreEqual("my-bucket-2",
MockServer.LogEntries.ToArray()[1].RequestMessage.Query["bucket"].ToString());
}
}

[Measurement("m")]
Expand Down
2 changes: 1 addition & 1 deletion Client/Internal/RetryAttempt.cs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ internal long GetRetryInterval()
// from header
if (Error is HttpException httpException && httpException.RetryAfter.HasValue)
{
return httpException.RetryAfter.Value * 1000 + JitterDelay(_writeOptions);
return httpException.RetryAfter.Value * 1000;
}

// from configuration
Expand Down
Loading