Skip to content

Commit

Permalink
[Host.Serialization] Introduce multi-targeting for net8, net6 and net…
Browse files Browse the repository at this point in the history
…standard #211

Signed-off-by: Tomasz Maruszak <maruszaktomasz@gmail.com>
  • Loading branch information
zarusz committed Feb 4, 2024
1 parent 5825393 commit 10f8e20
Show file tree
Hide file tree
Showing 25 changed files with 224 additions and 98 deletions.
1 change: 1 addition & 0 deletions src/Common.NuGet.Properties.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
<Import Project="Common.Properties.xml" />

<PropertyGroup>
<TargetFrameworks>net8.0;net6.0;netstandard2.0</TargetFrameworks>
<GeneratePackageOnBuild>true</GeneratePackageOnBuild>
<PackageReadmeFile>NuGet.md</PackageReadmeFile>
<PackageIcon>icon.png</PackageIcon>
Expand Down
2 changes: 1 addition & 1 deletion src/Common.Properties.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<Product>SlimMessageBus</Product>
<Company>zarusz</Company>
<Authors>zarusz</Authors>
<Copyright>Copyright © 2023</Copyright>
<Copyright>Copyright © 2024</Copyright>
<PackageReleaseNotes>See https://github.com/zarusz/SlimMessageBus/releases</PackageReleaseNotes>
<RepositoryUrl>https://github.com/zarusz/SlimMessageBus</RepositoryUrl>
<PackageProjectUrl>https://github.com/zarusz/SlimMessageBus</PackageProjectUrl>
Expand Down
4 changes: 2 additions & 2 deletions src/Host.Plugin.Properties.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
<Import Project="Common.NuGet.Properties.xml" />

<PropertyGroup>
<TargetFrameworks>netstandard2.1;net6.0;net8.0</TargetFrameworks>
<Version>2.2.1</Version>
<TargetFrameworks>netstandard2.0;net6.0;net8.0</TargetFrameworks>
<Version>2.2.2</Version>
</PropertyGroup>

</Project>
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
namespace SlimMessageBus.Host.AzureEventHub;

using System.Diagnostics.CodeAnalysis;

using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Processor;

Expand Down Expand Up @@ -85,7 +83,7 @@ private async Task Checkpoint(ProcessEventArgs args)
public Task TryCheckpoint()
=> Checkpoint(_lastMessage);

protected static IReadOnlyDictionary<string, object> GetHeadersFromTransportMessage([NotNull] EventData e)
protected static IReadOnlyDictionary<string, object> GetHeadersFromTransportMessage(EventData e)
// Note: Try to see if the Properties are already IReadOnlyDictionary or Dictionary prior allocating a new collection
=> e.Properties as IReadOnlyDictionary<string, object> ?? new Dictionary<string, object>(e.Properties);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,10 @@
<Import Project="../Common.NuGet.Properties.xml" />

<PropertyGroup>
<TargetFrameworks>netstandard2.0;net6.0;net8.0</TargetFrameworks>
<Version>2.0.6</Version>
<Description>Core configuration interfaces of SlimMessageBus</Description>
<PackageTags>SlimMessageBus</PackageTags>
<RootNamespace>SlimMessageBus.Host</RootNamespace>
<Version>2.0.5</Version>
</PropertyGroup>

<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@
<Import Project="../Common.NuGet.Properties.xml" />

<PropertyGroup>
<TargetFramework>netstandard1.3</TargetFramework>
<Version>2.0.1</Version>
<Version>2.0.2</Version>
<Description>Core interceptor interfaces of SlimMessageBus</Description>
<PackageTags>SlimMessageBus</PackageTags>
</PropertyGroup>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
namespace SlimMessageBus.Host.Kafka;

using System.Diagnostics.CodeAnalysis;
using ConsumeResult = Confluent.Kafka.ConsumeResult<Confluent.Kafka.Ignore, byte[]>;

/// <summary>
Expand All @@ -10,9 +9,9 @@ public interface IKafkaPartitionConsumer : IDisposable
{
TopicPartition TopicPartition { get; }

void OnPartitionAssigned([NotNull] TopicPartition partition);
Task OnMessage([NotNull] ConsumeResult message);
void OnPartitionEndReached([NotNull] TopicPartitionOffset offset);
void OnPartitionAssigned(TopicPartition partition);
Task OnMessage(ConsumeResult message);
void OnPartitionEndReached(TopicPartitionOffset offset);
void OnPartitionRevoked();

void OnClose();
Expand Down
5 changes: 1 addition & 4 deletions src/SlimMessageBus.Host.Kafka/Consumer/KafkaExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
namespace SlimMessageBus.Host.Kafka;

using System.Diagnostics.CodeAnalysis;

public static class KafkaExtensions
{
public static TopicPartitionOffset AddOffset([NotNull] this TopicPartitionOffset topicPartitionOffset, int addOffset)
public static TopicPartitionOffset AddOffset(this TopicPartitionOffset topicPartitionOffset, int addOffset)
=> new(topicPartitionOffset.TopicPartition, topicPartitionOffset.Offset + addOffset);

public static IReadOnlyDictionary<string, object> ToHeaders(this ConsumeResult<Ignore, byte[]> consumeResult, IMessageSerializer headerSerializer)
Expand Down
12 changes: 5 additions & 7 deletions src/SlimMessageBus.Host.Kafka/Consumer/KafkaGroupConsumer.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
namespace SlimMessageBus.Host.Kafka;

using System.Diagnostics.CodeAnalysis;

using ConsumeResult = ConsumeResult<Ignore, byte[]>;
using IConsumer = IConsumer<Ignore, byte[]>;

Expand Down Expand Up @@ -175,7 +173,7 @@ protected override async Task OnStop()
}
}

protected virtual void OnPartitionAssigned([NotNull] ICollection<TopicPartition> partitions)
protected virtual void OnPartitionAssigned(ICollection<TopicPartition> partitions)
{
// Ensure processors exist for each assigned topic-partition
foreach (var partition in partitions)
Expand All @@ -187,7 +185,7 @@ protected virtual void OnPartitionAssigned([NotNull] ICollection<TopicPartition>
}
}

protected virtual void OnPartitionRevoked([NotNull] ICollection<TopicPartitionOffset> partitions)
protected virtual void OnPartitionRevoked(ICollection<TopicPartitionOffset> partitions)
{
foreach (var partition in partitions)
{
Expand All @@ -198,23 +196,23 @@ protected virtual void OnPartitionRevoked([NotNull] ICollection<TopicPartitionOf
}
}

protected virtual void OnPartitionEndReached([NotNull] TopicPartitionOffset offset)
protected virtual void OnPartitionEndReached(TopicPartitionOffset offset)
{
Logger.LogDebug("Group [{Group}]: Reached end of partition, Topic: {Topic}, Partition: {Partition}, Offset: {Offset}", Group, offset.Topic, offset.Partition, offset.Offset);

var processor = _processors[offset.TopicPartition];
processor.OnPartitionEndReached(offset);
}

protected async virtual ValueTask OnMessage([NotNull] ConsumeResult message)
protected async virtual ValueTask OnMessage(ConsumeResult message)
{
Logger.LogDebug("Group [{Group}]: Received message with Topic: {Topic}, Partition: {Partition}, Offset: {Offset}, payload size: {MessageSize}", Group, message.Topic, message.Partition, message.Offset, message.Message.Value?.Length ?? 0);

var processor = _processors[message.TopicPartition];
await processor.OnMessage(message).ConfigureAwait(false);
}

protected virtual void OnOffsetsCommitted([NotNull] CommittedOffsets e)
protected virtual void OnOffsetsCommitted(CommittedOffsets e)
{
if (e.Error.IsError || e.Error.IsFatal)
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
namespace SlimMessageBus.Host.Kafka;

using System.Diagnostics.CodeAnalysis;

using ConsumeResult = ConsumeResult<Ignore, byte[]>;

public abstract class KafkaPartitionConsumer : IKafkaPartitionConsumer
Expand Down Expand Up @@ -30,7 +28,7 @@ protected KafkaPartitionConsumer(ILoggerFactory loggerFactory, AbstractConsumerS
_logger = loggerFactory.CreateLogger<KafkaPartitionConsumer>();

_logger.LogInformation("Creating consumer for Group: {Group}, Topic: {Topic}, Partition: {Partition}", group, topicPartition.Topic, topicPartition.Partition);

ConsumerSettings = consumerSettings ?? throw new ArgumentNullException(nameof(consumerSettings));
Group = group;
TopicPartition = topicPartition;
Expand Down Expand Up @@ -76,7 +74,7 @@ public void Dispose()

#region Implementation of IKafkaTopicPartitionProcessor

public void OnPartitionAssigned([NotNull] TopicPartition partition)
public void OnPartitionAssigned(TopicPartition partition)
{
_lastCheckpointOffset = null;
_lastOffset = null;
Expand All @@ -91,7 +89,7 @@ public void OnPartitionAssigned([NotNull] TopicPartition partition)
}
}

public async Task OnMessage([NotNull] ConsumeResult message)
public async Task OnMessage(ConsumeResult message)
{
if (_cancellationTokenSource.IsCancellationRequested)
{
Expand Down
21 changes: 8 additions & 13 deletions src/SlimMessageBus.Host.Kafka/KafkaMessageBus.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
namespace SlimMessageBus.Host.Kafka;

using System.Diagnostics.CodeAnalysis;

using IProducer = Confluent.Kafka.IProducer<byte[], byte[]>;
using Message = Confluent.Kafka.Message<byte[], byte[]>;

Expand Down Expand Up @@ -156,15 +154,15 @@ protected override async Task ProduceToTransport(object message, string path, by
var deliveryResult = await task.ConfigureAwait(false);
if (deliveryResult.Status == PersistenceStatus.NotPersisted)
{
throw new ProducerMessageBusException($"Error while publish message {message} of type {messageType.Name} to topic {path}. Kafka persistence status: {deliveryResult.Status}");
throw new ProducerMessageBusException($"Error while publish message {message} of type {messageType?.Name} to topic {path}. Kafka persistence status: {deliveryResult.Status}");
}

// log some debug information
_logger.LogDebug("Message {Message} of type {MessageType} delivered to topic {Topic}, partition {Partition}, offset: {Offset}",
message, messageType?.Name, deliveryResult.Topic, deliveryResult.Partition, deliveryResult.Offset);
}

protected byte[] GetMessageKey(ProducerSettings producerSettings, [NotNull] Type messageType, object message, string topic)
protected byte[] GetMessageKey(ProducerSettings producerSettings, Type messageType, object message, string topic)
{
var keyProvider = producerSettings?.GetKeyProvider();
if (keyProvider != null)
Expand All @@ -173,27 +171,24 @@ protected byte[] GetMessageKey(ProducerSettings producerSettings, [NotNull] Type

if (_logger.IsEnabled(LogLevel.Debug))
{
_logger.LogDebug("The message {Message} type {MessageType} calculated key is {Key} (Base64)", message, messageType.Name, Convert.ToBase64String(key));
_logger.LogDebug("The message {Message} type {MessageType} calculated key is {Key} (Base64)", message, messageType?.Name, Convert.ToBase64String(key));
}

return key;
}
return Array.Empty<byte>();
return [];
}

private const int NoPartition = -1;

protected int GetMessagePartition(ProducerSettings producerSettings, [NotNull] Type messageType, object message, string topic)
protected int GetMessagePartition(ProducerSettings producerSettings, Type messageType, object message, string topic)
{
var partitionProvider = producerSettings.GetPartitionProvider();
if (partitionProvider != null)
{
var partition = partitionProvider(message, topic);

if (_logger.IsEnabled(LogLevel.Debug))
{
_logger.LogDebug("The Message {Message} type {MessageType} calculated partition is {Partition}", message, messageType.Name, partition);
}
var partition = partitionProvider(message, topic);

_logger.LogDebug("The Message {Message} type {MessageType} calculated partition is {Partition}", message, messageType?.Name, partition);

return partition;
}
Expand Down
2 changes: 1 addition & 1 deletion src/SlimMessageBus.Host.Mqtt/MqttMessageBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ protected override async Task ProduceToTransport(object message, string path, by
{
var m = new MqttApplicationMessage
{
PayloadSegment = messagePayload,
PayloadSegment = new ArraySegment<byte>(messagePayload),
Topic = path
};

Expand Down
18 changes: 17 additions & 1 deletion src/SlimMessageBus.Host.Outbox.Sql/SqlOutboxRepository.cs
Original file line number Diff line number Diff line change
Expand Up @@ -49,31 +49,47 @@ protected virtual SqlCommand CreateCommand()
return cmd;
}

#pragma warning disable CS1998 // Async method lacks 'await' operators and will run synchronously
public virtual SqlTransaction CurrentTransaction => _transaction;

public async virtual ValueTask BeginTransaction()
{
ValidateNoTransactionStarted();
#if NETSTANDARD2_0
_transaction = Connection.BeginTransaction(Settings.TransactionIsolationLevel);
#else
_transaction = (SqlTransaction)await Connection.BeginTransactionAsync(Settings.TransactionIsolationLevel);
#endif
}

public async virtual ValueTask CommitTransaction()
{
ValidateTransactionStarted();

#if NETSTANDARD2_0
_transaction.Commit();
_transaction.Dispose();
#else
await _transaction.CommitAsync();
await _transaction.DisposeAsync();
#endif
_transaction = null;
}

public async virtual ValueTask RollbackTransaction()
{
ValidateTransactionStarted();

#if NETSTANDARD2_0
_transaction.Rollback();
_transaction.Dispose();
#else
await _transaction.RollbackAsync();
await _transaction.DisposeAsync();
#endif
_transaction = null;
}
#pragma warning restore CS1998 // Async method lacks 'await' operators and will run synchronously

protected void ValidateNoTransactionStarted()
{
Expand Down Expand Up @@ -186,7 +202,7 @@ private async Task CreateIndex(string indexName, IEnumerable<string> columns, Ca
BEGIN
CREATE NONCLUSTERED INDEX [{indexName}] ON {_sqlTemplate.TableNameQualified}
(
{string.Join(',', columns.Select(c => $"{c} ASC"))}
{string.Join(",", columns.Select(c => $"{c} ASC"))}
)
END", token: token);
}
Expand Down
12 changes: 8 additions & 4 deletions src/SlimMessageBus.Host.RabbitMQ/RabbitMqTopologyService.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
namespace SlimMessageBus.Host.RabbitMQ;

using System.Runtime;

using global::RabbitMQ.Client;

public class RabbitMqTopologyService
Expand Down Expand Up @@ -117,8 +115,14 @@ private string DeclareQueue(HasProviderExtensions settings, string queueName, Ac
_logger.LogInformation("Declaring queue {QueueName}, Durable: {Durable}, AutoDelete: {AutoDelete}, Exclusive: {Exclusive}", queueName, queueDurable, queueAutoDelete, queueExclusive);
try
{
var arguments = new Dictionary<string, object>(queueArguments ?? Enumerable.Empty<KeyValuePair<string, object>>());
argumentModifier?.Invoke(arguments);
var arguments = queueArguments;
if (argumentModifier != null)
{
arguments = arguments != null
? new Dictionary<string, object>(queueArguments) // make a copy of the arguments for the modifier to mutate
: [];
argumentModifier(arguments);
}

_channel.QueueDeclare(queueName, durable: queueDurable, exclusive: queueExclusive, autoDelete: queueAutoDelete, arguments: arguments);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@
<Import Project="../Common.NuGet.Properties.xml" />

<PropertyGroup>
<TargetFramework>netstandard1.3</TargetFramework>
<Version>2.0.1</Version>
<Version>2.0.2</Version>
<Description>Core serialization interfaces of SlimMessageBus</Description>
<PackageTags>SlimMessageBus</PackageTags>
</PropertyGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,29 +12,27 @@ public class CheckpointTrigger : ICheckpointTrigger
private int _lastCheckpointCount;
private readonly Stopwatch _lastCheckpointDuration;

public CheckpointTrigger(int countLimit, TimeSpan durationlimit, ILoggerFactory loggerFactory)
public CheckpointTrigger(CheckpointValue checkpointValue, ILoggerFactory loggerFactory)
{
_logger = loggerFactory.CreateLogger<CheckpointTrigger>();

_checkpointCount = countLimit;
_checkpointDuration = (int)durationlimit.TotalMilliseconds;
_checkpointCount = checkpointValue.CheckpointCount;
_checkpointDuration = (int)checkpointValue.CheckpointDuration.TotalMilliseconds;

_lastCheckpointCount = 0;
_lastCheckpointDuration = new Stopwatch();
}

public CheckpointTrigger(HasProviderExtensions settings, ILoggerFactory loggerFactory)
: this(settings.GetOrDefault(CheckpointSettings.CheckpointCount, CheckpointSettings.CheckpointCountDefault),
settings.GetOrDefault(CheckpointSettings.CheckpointDuration, CheckpointSettings.CheckpointDurationDefault),
loggerFactory)
: this(GetCheckpointValue(settings), loggerFactory)
{
}

public static bool IsConfigured(HasProviderExtensions settings)
=> settings.GetOrDefault<int?>(CheckpointSettings.CheckpointCount, null) != null || settings.GetOrDefault<TimeSpan?>(CheckpointSettings.CheckpointDuration, null) != null;

public static (int CheckpontCount, TimeSpan CheckpointDuration) GetConfiguration(HasProviderExtensions settings)
=> (settings.GetOrDefault(CheckpointSettings.CheckpointCount, CheckpointSettings.CheckpointCountDefault),
public static CheckpointValue GetCheckpointValue(HasProviderExtensions settings)
=> new(settings.GetOrDefault(CheckpointSettings.CheckpointCount, CheckpointSettings.CheckpointCountDefault),
settings.GetOrDefault(CheckpointSettings.CheckpointDuration, CheckpointSettings.CheckpointDurationDefault));


Expand Down Expand Up @@ -68,4 +66,4 @@ public void Reset()
}

#endregion
}
}
Loading

0 comments on commit 10f8e20

Please sign in to comment.