Skip to content

Commit

Permalink
Release/0.7.2 (#55)
Browse files Browse the repository at this point in the history
Run code cleanup to fix usings and bump to 0.7.2
  • Loading branch information
peter-quix committed May 24, 2024
1 parent 1e318a6 commit 9f2dc31
Show file tree
Hide file tree
Showing 90 changed files with 326 additions and 363 deletions.
4 changes: 2 additions & 2 deletions builds/csharp/nuget/build_nugets.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
from typing import List

version = "0.7.2.0"
informal_version = "0.7.2.0-dev2"
nuget_version = "0.7.2.0-dev2"
informal_version = "0.7.2.0"
nuget_version = "0.7.2.0"


def updatecsproj(projfilepath):
Expand Down
4 changes: 2 additions & 2 deletions src/Quix.TestBase/Extensions/TestOutputHelperExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public TestLogger(ITestOutputHelper helper)
if (exception.StackTrace != null) this.helper.WriteLine(exception.StackTrace);
}
}
catch (System.InvalidOperationException)
catch (InvalidOperationException)
{
// I think this might happen with async tasks, but stop failing agent tests for this
}
Expand Down Expand Up @@ -95,7 +95,7 @@ public TestLogger(ITestOutputHelper helper, string categoryName)
if (exception.StackTrace != null) this.helper.WriteLine(exception.StackTrace);
}
}
catch (System.InvalidOperationException)
catch (InvalidOperationException)
{
// I think this might happen with async tasks, but stop failing agent tests for this
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using System.Diagnostics;
using System.Runtime.InteropServices;
using Confluent.Kafka;
using Ductus.FluentDocker.Builders;
using Ductus.FluentDocker.Extensions;
Expand Down Expand Up @@ -28,11 +29,11 @@ public KafkaDockerTestFixtureBase()
{
var builder = new Builder().UseContainer();

if (System.Runtime.InteropServices.RuntimeInformation.OSArchitecture == System.Runtime.InteropServices.Architecture.Arm64)
if (RuntimeInformation.OSArchitecture == Architecture.Arm64)
{
builder = builder.UseImage("dougdonohoe/fast-data-dev:latest");
}
else if (System.Runtime.InteropServices.RuntimeInformation.OSArchitecture == System.Runtime.InteropServices.Architecture.X64)
else if (RuntimeInformation.OSArchitecture == Architecture.X64)
{
builder = builder.UseImage("lensesio/fast-data-dev:3.3.1");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public KafkaStreamingClientIntegrationTests(ITestOutputHelper output, KafkaDocke
{
this.output = output;
this.kafkaDockerTestFixture = kafkaDockerTestFixture;
QuixStreams.Logging.Factory = output.CreateLoggerFactory();
Logging.Factory = output.CreateLoggerFactory();
output.WriteLine($"Created client with brokerlist '{kafkaDockerTestFixture.BrokerList}'");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using QuixStreams.Kafka.Transport.SerDes;
using Timer = System.Timers.Timer;

namespace QuixStreams.Kafka.Transport.Samples.Samples
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,7 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;
using QuixStreams.Kafka.Transport.SerDes;
using QuixStreams.Kafka.Transport.SerDes.Codecs;
using QuixStreams.Kafka.Transport.SerDes.Codecs.DefaultCodecs;
using Timer = System.Timers.Timer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;
using QuixStreams.Kafka.Transport.SerDes;
using QuixStreams.Kafka.Transport.SerDes.Codecs;
using QuixStreams.Kafka.Transport.SerDes.Codecs.DefaultCodecs;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public class CommitModifierShould

public CommitModifierShould(ITestOutputHelper helper)
{
QuixStreams.Logging.Factory = helper.CreateLoggerFactory();
Logging.Factory = helper.CreateLoggerFactory();
this.helper = helper;
}

Expand Down
5 changes: 3 additions & 2 deletions src/QuixStreams.Kafka.Transport.Tests/Helpers/TestModel.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System;
using System.Linq;
using System.Text;
using QuixStreams.Kafka.Transport.SerDes.Codecs;
using QuixStreams.Kafka.Transport.SerDes.Codecs.DefaultCodecs;

namespace QuixStreams.Kafka.Transport.Tests.Helpers
Expand All @@ -9,8 +10,8 @@ public class TestModel : IEquatable<TestModel>
{
static TestModel()
{
QuixStreams.Kafka.Transport.SerDes.Codecs.CodecRegistry.RegisterCodec(typeof(TestModel), new DefaultJsonCodec<TestModel>());
QuixStreams.Kafka.Transport.SerDes.Codecs.CodecRegistry.RegisterCodec(typeof(TestModel[]), new DefaultJsonCodec<TestModel[]>());
CodecRegistry.RegisterCodec(typeof(TestModel), new DefaultJsonCodec<TestModel>());
CodecRegistry.RegisterCodec(typeof(TestModel[]), new DefaultJsonCodec<TestModel[]>());
}

public string StringProp { get; set; }
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,8 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Net;
using System.Threading.Tasks;
using FluentAssertions;
using NSubstitute;
using QuixStreams.Kafka.Transport.SerDes;
using Xunit;

Expand Down Expand Up @@ -50,7 +47,7 @@ public async Task Modify_SplitPackageInterweavedWithOtherPackages_ShouldRaiseInE
// then the outcome is [Message1_merged] [Message2_merged] [Message3]

// Arrange
QuixStreams.Kafka.Transport.SerDes.PackageSerializationSettings.Mode = mode;
PackageSerializationSettings.Mode = mode;
var merger = new KafkaMessageMerger(new KafkaMessageBuffer());

var message1Segments = this.CreateSplitMessage(2, out var message1);
Expand Down Expand Up @@ -98,7 +95,7 @@ public async Task Modify_SplitPackageInterweavedWithOtherAndSplitPackageExpires_
// then the outcome is [Message2], [Message3], as segment 2 should be discarded

// Arrange
QuixStreams.Kafka.Transport.SerDes.PackageSerializationSettings.Mode = mode;
PackageSerializationSettings.Mode = mode;
var buffer = new KafkaMessageBuffer();
var merger = new KafkaMessageMerger(buffer);
var message1Segments = this.CreateSplitMessage(2, out var message1);
Expand Down Expand Up @@ -141,7 +138,7 @@ public async Task Modify_SplitPackageInterweavedWithOtherAndSplitPackageExpires_
public void Modify_MergeReturnsNull_ShouldNotRaisePackageAndReturnCompletedTask(PackageSerializationMode mode)
{
// Arrange
QuixStreams.Kafka.Transport.SerDes.PackageSerializationSettings.Mode = mode;
PackageSerializationSettings.Mode = mode;
var buffer = new KafkaMessageBuffer();
var merger = new KafkaMessageMerger(buffer);

Expand Down Expand Up @@ -171,7 +168,7 @@ public async Task Modify_SplitPackageInterweavedWithOtherAndSplitPackageRevoked_
// disappear then should raise [Message2] and [Message3]

// Arrange
QuixStreams.Kafka.Transport.SerDes.PackageSerializationSettings.Mode = mode;
PackageSerializationSettings.Mode = mode;
var buffer = new KafkaMessageBuffer();
var merger = new KafkaMessageMerger(buffer);
var message1Segments = this.CreateSplitMessage(3, out var message1);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.Serialization;
using System.Text;
using FluentAssertions;
using Newtonsoft.Json;
using QuixStreams.Kafka.Transport.SerDes;
using QuixStreams.Kafka.Transport.SerDes.Legacy;
using Xunit;
Expand All @@ -16,7 +16,7 @@ public class KafkaMessageSplitterShould
public void Split_WithDataOutsideAbsoluteMaxSize_ShouldThrowSerializationException()
{
// Arrange
QuixStreams.Kafka.Transport.SerDes.PackageSerializationSettings.Mode = PackageSerializationMode.LegacyValue;
PackageSerializationSettings.Mode = PackageSerializationMode.LegacyValue;
const int maxMsgLength = 50;
var splitter = new KafkaMessageSplitter(maxMsgLength);
var length = maxMsgLength * byte.MaxValue + 10;
Expand All @@ -37,7 +37,7 @@ public void Split_WithDataOutsideAbsoluteMaxSize_ShouldThrowSerializationExcepti
public void SplitLegacy_WithDataOutsideAllowedMessageSizeButWithinAbsoluteMaxSize_ShouldReturnSplitBytes()
{
// Arrange
QuixStreams.Kafka.Transport.SerDes.PackageSerializationSettings.Mode = PackageSerializationMode.LegacyValue;
PackageSerializationSettings.Mode = PackageSerializationMode.LegacyValue;
const int maxMsgLength = 50;
var splitter = new KafkaMessageSplitter(maxMsgLength);
var length = 10199; // just a bit less than max (10200), but greater because of split info.
Expand Down Expand Up @@ -88,7 +88,7 @@ public void SplitLegacy_WithDataOutsideAllowedMessageSizeButWithinAbsoluteMaxSiz
public void Split_GreaterThanSupportedByLegacy_ShouldReturnSplitBytes()
{
// Arrange
QuixStreams.Kafka.Transport.SerDes.PackageSerializationSettings.Mode = PackageSerializationMode.Header;
PackageSerializationSettings.Mode = PackageSerializationMode.Header;
int maxMsgLength = KafkaMessageSplitter.ExpectedHeaderSplitInfoSize + 50;
var splitter = new KafkaMessageSplitter(maxMsgLength);
var length = 40000; // legacy would die around 10200 in this config
Expand Down Expand Up @@ -142,7 +142,7 @@ public void Split_GreaterThanSupportedByLegacy_ShouldReturnSplitBytes()
public void Split_WithDataWithinAllowedMessageSize_ShouldReturnSameBytes(PackageSerializationMode mode)
{
// Arrange
QuixStreams.Kafka.Transport.SerDes.PackageSerializationSettings.Mode = mode;
PackageSerializationSettings.Mode = mode;
var splitter = new KafkaMessageSplitter(50);
var data = new byte[50];
var random = new Random();
Expand All @@ -163,7 +163,7 @@ public void Split_WithDataWithinAllowedMessageSize_ShouldReturnSameBytes(Package
public void Split_WithDataWithinAllowedMessageSizeUsingHeaderAndKey_ShouldReturnSameBytes(PackageSerializationMode mode)
{
// Arrange
QuixStreams.Kafka.Transport.SerDes.PackageSerializationSettings.Mode = mode;
PackageSerializationSettings.Mode = mode;
var splitter = new KafkaMessageSplitter(50);
var data = new byte[5];
var random = new Random();
Expand Down Expand Up @@ -194,7 +194,7 @@ public void Split_UsingHeaderProtocolWithExcessiveValueSize_ShouldUseCompression
data[ii] = $"Some_value_that_should_get_compressed_{ii}";
}

var dataBytes = Encoding.UTF8.GetBytes(Newtonsoft.Json.JsonConvert.SerializeObject(data));
var dataBytes = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(data));

var key = Encoding.UTF8.GetBytes("My super key");
var testCodecId = "Stuff";
Expand Down Expand Up @@ -232,7 +232,7 @@ public void Split_UsingHeaderProtocolNoCodecIdWithExcessiveValueSize_ShouldUseCo
data[ii] = $"Some_value_that_should_get_compressed_{ii}";
}

var dataBytes = Encoding.UTF8.GetBytes(Newtonsoft.Json.JsonConvert.SerializeObject(data));
var dataBytes = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(data));

var key = Encoding.UTF8.GetBytes("My super key");
var message = new KafkaMessage(key, dataBytes);
Expand Down
2 changes: 1 addition & 1 deletion src/QuixStreams.Kafka.Transport/AutoCommitter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ namespace QuixStreams.Kafka.Transport
internal sealed class AutoCommitter
{
private readonly Action<ICollection<TopicPartitionOffset>, Action> wrappingCommitCallback;
private readonly ILogger logger = QuixStreams.Logging.CreateLogger<AutoCommitter>();
private readonly ILogger logger = Logging.CreateLogger<AutoCommitter>();
private readonly Func<TransportPackage, CancellationToken, Task> onPublish = null;
private Action closeAction = null;
private bool closed = false;
Expand Down
2 changes: 1 addition & 1 deletion src/QuixStreams.Kafka.Transport/SerDes/Codecs/Codec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public abstract class Codec<TContent> : ICodec<TContent>
/// </summary>
protected Codec()
{
logger = QuixStreams.Logging.CreateLogger(this.GetType());
logger = Logging.CreateLogger(this.GetType());
}

/// <inheritdoc />
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System;
using System.IO;
using Newtonsoft.Json;
using Newtonsoft.Json.Converters;

namespace QuixStreams.Kafka.Transport.SerDes.Codecs.DefaultCodecs
{
Expand All @@ -21,7 +22,7 @@ public class DefaultJsonCodec<TContent> : Codec<TContent>
public DefaultJsonCodec()
{
serializer = JsonSerializer.Create();
serializer.Converters.Add(new Newtonsoft.Json.Converters.StringEnumConverter());
serializer.Converters.Add(new StringEnumConverter());
serializer.DefaultValueHandling = DefaultValueHandling.Ignore;
}

Expand Down Expand Up @@ -74,7 +75,7 @@ public class DefaultJsonCodec : ICodec
private DefaultJsonCodec()
{
serializer = JsonSerializer.Create();
serializer.Converters.Add(new Newtonsoft.Json.Converters.StringEnumConverter());
serializer.Converters.Add(new StringEnumConverter());
serializer.DefaultValueHandling = DefaultValueHandling.Ignore;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public KafkaMessageBuffer(TimeSpan timeToLive, int bufferPerMessageGroupKey = 50
if (bufferPerMessageGroupKey < 1) throw new ArgumentOutOfRangeException(nameof(bufferPerMessageGroupKey), "Value must be at least 1");
this.timeToLive = timeToLive;
this.bufferPerMessageGroupKey = bufferPerMessageGroupKey;
this.logger = QuixStreams.Logging.CreateLogger(typeof(KafkaMessageBuffer));
this.logger = Logging.CreateLogger(typeof(KafkaMessageBuffer));
}

/// <summary>
Expand Down
3 changes: 1 addition & 2 deletions src/QuixStreams.Kafka.Transport/SerDes/KafkaMessageMerger.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using QuixStreams.Kafka.Transport.SerDes.Legacy;

namespace QuixStreams.Kafka.Transport.SerDes
{
Expand All @@ -18,7 +17,7 @@ namespace QuixStreams.Kafka.Transport.SerDes
/// </summary>
public class KafkaMessageMerger
{
private readonly ILogger logger = QuixStreams.Logging.CreateLogger<KafkaMessageMerger>();
private readonly ILogger logger = Logging.CreateLogger<KafkaMessageMerger>();

private long bufferCounter = 0;
private readonly ConcurrentDictionary<MergerBufferId, PendingKafkaMessage> pendingMessages = new ConcurrentDictionary<MergerBufferId, PendingKafkaMessage>(); // Messages that are queued up
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using QuixStreams.Kafka.Transport.SerDes.Legacy;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public class KafkaMessageSplitter : IKafkaMessageSplitter
/// </summary>
private static int MovingWarnAboveSize = 0;

private static readonly ILogger logger = QuixStreams.Logging.CreateLogger(typeof(KafkaMessageSplitter));
private static readonly ILogger logger = Logging.CreateLogger(typeof(KafkaMessageSplitter));

/// <summary>
/// The expected size of the the details to describe the split info
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
using System;
using System.IO;
using System.IO;
using System.Runtime.Serialization;
using QuixStreams.Kafka.Transport.SerDes.Codecs;
using QuixStreams.Kafka.Transport.SerDes.Codecs.DefaultCodecs;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
using System;
using System.Linq;
using System.Linq;
using System.Runtime.Serialization;
using QuixStreams.Kafka.Transport.SerDes.Codecs;
using QuixStreams.Kafka.Transport.SerDes.Codecs.DefaultCodecs;
Expand Down
3 changes: 1 addition & 2 deletions src/QuixStreams.Kafka.Transport/SerDes/PackageSerializer.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
using System.Collections.Generic;
using System.Linq;
using System.Linq;
using System.Runtime.Serialization;
using QuixStreams.Kafka.Transport.SerDes.Codecs;
using QuixStreams.Kafka.Transport.SerDes.Codecs.DefaultCodecs;
Expand Down
2 changes: 1 addition & 1 deletion src/QuixStreams.Kafka/KafkaConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ namespace QuixStreams.Kafka
/// </summary>
public class KafkaConsumer : IKafkaConsumer
{
private readonly ILogger logger = QuixStreams.Logging.CreateLogger<KafkaConsumer>();
private readonly ILogger logger = Logging.CreateLogger<KafkaConsumer>();
private bool disableKafkaLogsByConnectWorkaround = false; // if enabled, no actual kafka logs should be shown
private readonly ConsumerConfig config;

Expand Down
2 changes: 1 addition & 1 deletion src/QuixStreams.Kafka/KafkaProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public class KafkaProducer : IKafkaProducer
private readonly object sendLock = new object();

private readonly object openLock = new object();
private readonly ILogger logger = QuixStreams.Logging.CreateLogger<KafkaProducer>();
private readonly ILogger logger = Logging.CreateLogger<KafkaProducer>();
private IDictionary<string, string> brokerStates = new Dictionary<string, string>();
private bool checkBrokerStateBeforeSend = false;
private bool logOnNextBrokerStateUp = false;
Expand Down
3 changes: 2 additions & 1 deletion src/QuixStreams.ManyStreamTest/StreamingTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using System.Threading;
using Microsoft.Extensions.Logging;
using QuixStreams.Streaming;
using QuixStreams.Streaming.Models;

namespace QuixStreams.ManyStreamTest
{
Expand Down Expand Up @@ -40,7 +41,7 @@ public void Run(CancellationToken ct)
while (!ct.IsCancellationRequested)
{
var stream = topicProducer.CreateStream();
var data = new QuixStreams.Streaming.Models.TimeseriesData();
var data = new TimeseriesData();
data.AddTimestampNanoseconds(10).AddValue("test", DateTime.UtcNow.ToBinary());
stream.Timeseries.Buffer.Publish(data);
stream.Events.AddTimestampNanoseconds(10).AddValue("test1", "val1");
Expand Down
1 change: 0 additions & 1 deletion src/QuixStreams.PerformanceTest/PerformanceTest.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
using System;
using System.Threading;
using QuixStreams.Streaming.UnitTests;
using QuixStreams.Streaming.UnitTests.Helpers;
using QuixStreams.Telemetry.Models;

Expand Down
1 change: 0 additions & 1 deletion src/QuixStreams.PerformanceTest/Program.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
using System;
using System.Threading;
using QuixStreams.Kafka.Transport.SerDes;
using QuixStreams.Kafka.Transport.SerDes.Legacy.MessageValue;
using QuixStreams.Streaming.Utils;
using QuixStreams.Telemetry.Models;

Expand Down
Loading

0 comments on commit 9f2dc31

Please sign in to comment.