Skip to content

Commit

Permalink
Add ICrc32 interface
Browse files Browse the repository at this point in the history
If a library user wishes to have Crc32 checking, they must implement the
`ICrc32` interface and set an implementation on the configuration.

References:
* #19
* #285
  • Loading branch information
lukebakken committed Sep 7, 2023
1 parent c819931 commit ae797f3
Show file tree
Hide file tree
Showing 13 changed files with 58 additions and 22 deletions.
2 changes: 1 addition & 1 deletion RabbitMQ.Stream.Client/IConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -69,5 +69,5 @@ public ushort InitialCredits
// enables the check of the crc on the delivery.
// the server will send the crc for each chunk and the client will check it.
// It is not enabled by default because it is could reduce the performance.
public bool CheckCrcOnDelivery { get; set; } = false;
public ICrc32 Crc32 { get; set; } = null;
}
17 changes: 17 additions & 0 deletions RabbitMQ.Stream.Client/ICrc32.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// This source code is dual-licensed under the Apache License, version
// 2.0, and the Mozilla Public License, version 2.0.
// Copyright (c) 2007-2023 VMware, Inc.

namespace RabbitMQ.Stream.Client
{
/// <summary>
/// ICrc32 defines an interface for implementing crc32 hashing.
/// Library users who wish to perform crc32 checks on data from RabbitMQ
/// should implement this interface and assign an instance to
/// <see cref="IConsumerConfig.Crc32"><code>IConsumerConfig.Crc32</code></see>.
/// </summary>
public interface ICrc32
{
byte[] Hash(byte[] data);
}
}
10 changes: 6 additions & 4 deletions RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ RabbitMQ.Stream.Client.ConsumerFilter.PostFilter.set -> void
RabbitMQ.Stream.Client.ConsumerFilter.Values.get -> System.Collections.Generic.List<string>
RabbitMQ.Stream.Client.ConsumerFilter.Values.set -> void
RabbitMQ.Stream.Client.HashRoutingMurmurStrategy.Route(RabbitMQ.Stream.Client.Message message, System.Collections.Generic.List<string> partitions) -> System.Threading.Tasks.Task<System.Collections.Generic.List<string>>
RabbitMQ.Stream.Client.IConsumerConfig.CheckCrcOnDelivery.get -> bool
RabbitMQ.Stream.Client.IConsumerConfig.CheckCrcOnDelivery.set -> void
RabbitMQ.Stream.Client.IConsumerConfig.Crc32.get -> RabbitMQ.Stream.Client.ICrc32
RabbitMQ.Stream.Client.IConsumerConfig.Crc32.set -> void
RabbitMQ.Stream.Client.IConsumerConfig.ConsumerFilter.get -> RabbitMQ.Stream.Client.ConsumerFilter
RabbitMQ.Stream.Client.CommandVersions
RabbitMQ.Stream.Client.CommandVersions.Command.get -> ushort
Expand All @@ -50,6 +50,8 @@ RabbitMQ.Stream.Client.ICommandVersions.MinVersion.get -> ushort
RabbitMQ.Stream.Client.IConsumerConfig.ConsumerFilter.set -> void
RabbitMQ.Stream.Client.IConsumerConfig.InitialCredits.get -> ushort
RabbitMQ.Stream.Client.IConsumerConfig.InitialCredits.set -> void
RabbitMQ.Stream.Client.ICrc32
RabbitMQ.Stream.Client.ICrc32.Hash(byte[] data) -> byte[]
RabbitMQ.Stream.Client.IProducerConfig.Filter.get -> RabbitMQ.Stream.Client.ProducerFilter
RabbitMQ.Stream.Client.IProducerConfig.Filter.set -> void
RabbitMQ.Stream.Client.IRoutingStrategy.Route(RabbitMQ.Stream.Client.Message message, System.Collections.Generic.List<string> partitions) -> System.Threading.Tasks.Task<System.Collections.Generic.List<string>>
Expand All @@ -69,8 +71,8 @@ RabbitMQ.Stream.Client.PublishFilter.SizeNeeded.get -> int
RabbitMQ.Stream.Client.PublishFilter.Write(System.Span<byte> span) -> int
RabbitMQ.Stream.Client.RawSuperStreamProducerConfig.RoutingStrategyType.get -> RabbitMQ.Stream.Client.RoutingStrategyType
RabbitMQ.Stream.Client.RawSuperStreamProducerConfig.RoutingStrategyType.set -> void
RabbitMQ.Stream.Client.Reliable.ConsumerConfig.CheckCrcOnDelivery.get -> bool
RabbitMQ.Stream.Client.Reliable.ConsumerConfig.CheckCrcOnDelivery.set -> void
RabbitMQ.Stream.Client.Reliable.ConsumerConfig.Crc32.get -> RabbitMQ.Stream.Client.ICrc32
RabbitMQ.Stream.Client.Reliable.ConsumerConfig.Crc32.set -> void
RabbitMQ.Stream.Client.Reliable.ConsumerConfig.Filter.get -> RabbitMQ.Stream.Client.ConsumerFilter
RabbitMQ.Stream.Client.Reliable.ConsumerConfig.Filter.set -> void
RabbitMQ.Stream.Client.Reliable.ConsumerConfig.InitialCredits.get -> ushort
Expand Down
1 change: 0 additions & 1 deletion RabbitMQ.Stream.Client/RabbitMQ.Stream.Client.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
<PrivateAssets>all</PrivateAssets>
</PackageReference>
<PackageReference Include="System.IO.Hashing" />
<PackageReference Include="System.IO.Pipelines" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" />
</ItemGroup>
Expand Down
5 changes: 2 additions & 3 deletions RabbitMQ.Stream.Client/RawConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
using System.Buffers;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO.Hashing;
using System.Runtime.CompilerServices;
using System.Threading.Channels;
using System.Threading.Tasks;
Expand Down Expand Up @@ -467,10 +466,10 @@ private async Task Init()
if (Token.IsCancellationRequested)
return;
if (_config.CheckCrcOnDelivery)
if (_config.Crc32 is not null)
{
var crcCalculated = BitConverter.ToUInt32(
Crc32.Hash(deliver.Chunk.Data.ToArray())
_config.Crc32.Hash(deliver.Chunk.Data.ToArray())
);
if (crcCalculated != deliver.Chunk.Crc)
{
Expand Down
2 changes: 1 addition & 1 deletion RabbitMQ.Stream.Client/RawSuperStreamConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ private RawConsumerConfig FromStreamConfig(string stream)
IsSingleActiveConsumer = _config.IsSingleActiveConsumer,
ConsumerUpdateListener = _config.ConsumerUpdateListener,
ConsumerFilter = _config.ConsumerFilter,
CheckCrcOnDelivery = _config.CheckCrcOnDelivery,
Crc32 = _config.Crc32,
ConnectionClosedHandler = async (string s) =>
{
// if the stream is still in the consumer list
Expand Down
8 changes: 4 additions & 4 deletions RabbitMQ.Stream.Client/Reliable/Consumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -113,12 +113,12 @@ public record ConsumerConfig : ReliableConfig
public ConsumerFilter Filter { get; set; } = null;

/// <summary>
/// CheckCrcOnDelivery enables the check of the crc on the delivery.
/// the server will send the crc for each chunk and the client will check it.
/// Eenables the check of the crc on the delivery when set to an implementation
/// of <see cref="ICrc32"><code>ICrc32</code></see>.
/// >he server will send the crc for each chunk and the client will check it.
/// It is not enabled by default because it is could reduce the performance.
/// Default value is false.
/// </summary>
public bool CheckCrcOnDelivery { get; set; } = false;
public ICrc32 Crc32 { get; set; } = null;

public ConsumerConfig(StreamSystem streamSystem, string stream) : base(streamSystem, stream)
{
Expand Down
4 changes: 2 additions & 2 deletions RabbitMQ.Stream.Client/Reliable/ConsumerFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ private async Task<IConsumer> StandardConsumer(bool boot)
InitialCredits = _consumerConfig.InitialCredits,
OffsetSpec = offsetSpec,
ConsumerFilter = _consumerConfig.Filter,
CheckCrcOnDelivery = _consumerConfig.CheckCrcOnDelivery,
Crc32 = _consumerConfig.Crc32,
ConnectionClosedHandler = async _ =>
{
await TryToReconnect(_consumerConfig.ReconnectStrategy).ConfigureAwait(false);
Expand Down Expand Up @@ -111,7 +111,7 @@ private async Task<IConsumer> SuperConsumer(bool boot)
IsSingleActiveConsumer = _consumerConfig.IsSingleActiveConsumer,
InitialCredits = _consumerConfig.InitialCredits,
ConsumerFilter = _consumerConfig.Filter,
CheckCrcOnDelivery = _consumerConfig.CheckCrcOnDelivery,
Crc32 = _consumerConfig.Crc32,
OffsetSpec = offsetSpecs,
MessageHandler = async (stream, consumer, ctx, message) =>
{
Expand Down
15 changes: 15 additions & 0 deletions Tests/Crc32.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
// This source code is dual-licensed under the Apache License, version
// 2.0, and the Mozilla Public License, version 2.0.
// Copyright (c) 2007-2023 VMware, Inc.

using RabbitMQ.Stream.Client;

namespace Tests;

public class Crc32 : ICrc32
{
public byte[] Hash(byte[] data)
{
return System.IO.Hashing.Crc32.Hash(data);
}
}
7 changes: 4 additions & 3 deletions Tests/RawConsumerSystemTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ namespace Tests
{
public class ConsumerSystemTests
{
private readonly ICrc32 _crc32 = new Crc32();
private readonly ITestOutputHelper testOutputHelper;

public ConsumerSystemTests(ITestOutputHelper testOutputHelper)
Expand Down Expand Up @@ -416,7 +417,7 @@ public async void Amqp091MessagesConsumer()
var rawConsumer = await system.CreateRawConsumer(
new RawConsumerConfig(stream)
{
CheckCrcOnDelivery = true,
Crc32 = _crc32,
Reference = "consumer",
MessageHandler = async (consumer, ctx, message) =>
{
Expand Down Expand Up @@ -463,7 +464,7 @@ public async void ConsumerQueryOffset()
var rawConsumer = await system.CreateRawConsumer(
new RawConsumerConfig(stream)
{
CheckCrcOnDelivery = true,
Crc32 = _crc32,
Reference = reference,
OffsetSpec = new OffsetTypeOffset(),
MessageHandler = async (consumer, ctx, message) =>
Expand Down Expand Up @@ -534,7 +535,7 @@ public async void ShouldConsumeFromStoredOffset()
var rawConsumer = await system.CreateRawConsumer(
new RawConsumerConfig(stream)
{
CheckCrcOnDelivery = false,
Crc32 = _crc32,
Reference = Reference,
OffsetSpec = new OffsetTypeOffset(),
MessageHandler = async (consumer, ctx, _) =>
Expand Down
5 changes: 3 additions & 2 deletions Tests/ReliableTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ namespace Tests;

public class ReliableTests
{
private readonly ICrc32 _crc32 = new Crc32();
private readonly ITestOutputHelper _testOutputHelper;

public ReliableTests(ITestOutputHelper testOutputHelper)
Expand Down Expand Up @@ -323,7 +324,7 @@ public async void FirstConsumeAfterKillConnectionShouldContinueToWork()
var messagesReceived = 0;
var consumer = await Consumer.Create(new ConsumerConfig(system, stream)
{
CheckCrcOnDelivery = true,
Crc32 = _crc32,
Reference = reference,
ClientProvidedName = clientProviderName,
OffsetSpec = new OffsetTypeFirst(),
Expand Down Expand Up @@ -369,7 +370,7 @@ public async void ConsumeAfterKillConnectionShouldContinueToWork()
var messagesReceived = 0;
var consumer = await Consumer.Create(new ConsumerConfig(system, stream)
{
CheckCrcOnDelivery = true,
Crc32 = _crc32,
Reference = reference,
ClientProvidedName = clientProviderName,
OffsetSpec = new OffsetTypeFirst(),
Expand Down
3 changes: 2 additions & 1 deletion Tests/SuperStreamConsumerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ namespace Tests;

public class SuperStreamConsumerTests
{
private readonly ICrc32 _crc32 = new Crc32();
private readonly ITestOutputHelper _testOutputHelper;

public SuperStreamConsumerTests(ITestOutputHelper testOutputHelper)
Expand Down Expand Up @@ -64,7 +65,7 @@ public async void NumberOfMessagesConsumedShouldBeEqualsToPublished()
var consumer = await system.CreateSuperStreamConsumer(
new RawSuperStreamConsumerConfig(SystemUtils.InvoicesExchange)
{
CheckCrcOnDelivery = true,
Crc32 = _crc32,
ClientProvidedName = clientProvidedName,
OffsetSpec = await SystemUtils.OffsetsForSuperStreamConsumer(system, SystemUtils.InvoicesExchange, new OffsetTypeFirst()),
MessageHandler = (stream, consumer1, context, message) =>
Expand Down
1 change: 1 addition & 0 deletions Tests/Tests.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
<PackageReference Include="AmqpNetLite" />
<PackageReference Include="Microsoft.NET.Test.Sdk" />
<PackageReference Include="RabbitMQ.Client" />
<PackageReference Include="System.IO.Hashing" />
<PackageReference Include="xunit" />
<PackageReference Include="xunit.runner.visualstudio">
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
Expand Down

0 comments on commit ae797f3

Please sign in to comment.