Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add CRC32 checkSum control #285

Merged
merged 10 commits into from
Sep 7, 2023
3 changes: 3 additions & 0 deletions Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,17 @@
<PackageVersion Include="Microsoft.Extensions.Logging.Console" Version="7.0.0" />
</ItemGroup>


<ItemGroup Label=".NET 6 Specific" Condition="'$(TargetFramework)' == 'net6.0'">
<!-- RabbitMQ.Stream.Client -->
<PackageVersion Include="System.IO.Hashing" Version="6.0.0" />
<PackageVersion Include="System.IO.Pipelines" Version="6.0.0" />
<PackageVersion Include="Microsoft.Extensions.Logging.Abstractions" Version="6.0.0" />
</ItemGroup>

<ItemGroup Label=".NET 7 Specific" Condition="'$(TargetFramework)' == 'net7.0'">
<!-- RabbitMQ.Stream.Client -->
<PackageVersion Include="System.IO.Hashing" Version="7.0.0" />
<PackageVersion Include="System.IO.Pipelines" Version="7.0.0" />
<PackageVersion Include="Microsoft.Extensions.Logging.Abstractions" Version="7.0.0" />
</ItemGroup>
Expand Down
3 changes: 2 additions & 1 deletion RabbitMQ.Stream.Client/AMQP/AmqpWireFormattingWrite.cs
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,8 @@ internal static int GetAnySize(object value)
bool => GetBoolSize(),
float => GetFloatSize(),
byte[] bArr => GetBytesSize(bArr),
byte => 1,
byte => 1 // FormatCode.Ubyte
+ 1, // value
DateTime d => GetTimestampSize(d),
_ => throw new AmqpParseException($"GetAnySize Invalid type {value}")
};
Expand Down
2 changes: 1 addition & 1 deletion RabbitMQ.Stream.Client/Client.cs
Original file line number Diff line number Diff line change
Expand Up @@ -597,7 +597,7 @@ private void HandleCorrelatedCommand(ushort tag, ref ReadOnlySequence<byte> fram
{
if (command.CorrelationId == uint.MaxValue)
{
throw new Exception($"unhandled incoming command {command.GetType()}");
throw new UnknownCommandException($"unhandled incoming command {command.GetType()}");
}

if (requests.TryRemove(command.CorrelationId, out var tsc))
Expand Down
16 changes: 16 additions & 0 deletions RabbitMQ.Stream.Client/ClientExceptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -109,4 +109,20 @@ public UnsupportedOperationException(string s)
{
}
}

public class UnknownCommandException : Exception
{
public UnknownCommandException(string s)
: base(s)
{
}
}

public class CrcException : Exception
{
public CrcException(string s)
: base(s)
{
}
}
}
1 change: 1 addition & 0 deletions RabbitMQ.Stream.Client/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ private async Task ProcessIncomingFrames()
var memory =
ArrayPool<byte>.Shared.Rent((int)frame.Length).AsMemory(0, (int)frame.Length);
frame.CopyTo(memory.Span);

await commandCallback(memory).ConfigureAwait(false);
numFrames += 1;
}
Expand Down
32 changes: 25 additions & 7 deletions RabbitMQ.Stream.Client/Deliver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,13 @@ internal static int Read(ref SequenceReader<byte> reader, byte entryType, out Su
// We need to pass it to the subEntryChunk that will decode the information
var memory =
ArrayPool<byte>.Shared.Rent((int)dataLen).AsMemory(0, (int)dataLen);
var data = reader.Sequence.Slice(reader.Consumed, dataLen);
data.CopyTo(memory.Span);

//see the Chunk.Read for more details about this check
if (!reader.TryCopyTo(memory.Span))
lukebakken marked this conversation as resolved.
Show resolved Hide resolved
{
throw new NotEnoughDataException(
$"SubEntryChunk Not enough data, sourceLength: {reader.Length}, memoryLen: {memory.Length}, dataLen: {dataLen}");
}

subEntryChunk =
new SubEntryChunk(compress, numRecordsInBatch, unCompressedDataSize, dataLen, memory);
Expand All @@ -103,6 +108,7 @@ internal static int Read(ref SequenceReader<byte> reader, byte entryType, out Su
// Here we need to advance the reader to the datalen
// Since Data is passed to the subEntryChunk.
reader.Advance(dataLen);

return offset;
}
}
Expand All @@ -115,7 +121,7 @@ internal static int Read(ref SequenceReader<byte> reader, byte entryType, out Su
long timestamp,
ulong epoch,
ulong chunkId,
int crc,
uint crc,
Memory<byte> data)
{
MagicVersion = magicVersion;
Expand All @@ -135,7 +141,7 @@ internal static int Read(ref SequenceReader<byte> reader, byte entryType, out Su
public long Timestamp { get; }
public ulong Epoch { get; }
public ulong ChunkId { get; }
public int Crc { get; }
public uint Crc { get; }
public Memory<byte> Data { get; }

internal static int Read(ReadOnlySequence<byte> frame, out Chunk chunk)
Expand All @@ -148,15 +154,27 @@ internal static int Read(ReadOnlySequence<byte> frame, out Chunk chunk)
offset += WireFormatting.ReadInt64(ref reader, out var timestamp);
offset += WireFormatting.ReadUInt64(ref reader, out var epoch);
offset += WireFormatting.ReadUInt64(ref reader, out var chunkId);
offset += WireFormatting.ReadInt32(ref reader, out var crc);
offset += WireFormatting.ReadUInt32(ref reader, out var crc);
offset += WireFormatting.ReadUInt32(ref reader, out var dataLen);
offset += WireFormatting.ReadUInt32(ref reader, out _);
// offset += 4; // reserved
offset += WireFormatting.ReadUInt32(ref reader, out _); // reserved
var memory =
ArrayPool<byte>.Shared.Rent((int)dataLen).AsMemory(0, (int)dataLen);
var data = reader.Sequence.Slice(reader.Consumed, dataLen);
lukebakken marked this conversation as resolved.
Show resolved Hide resolved
data.CopyTo(memory.Span);

// the reader in this position contains the chunk information
// we copy it in a memory stream to be sure that we have all the data
// the memory stream will passed to the consumer to parse the messages
// reader.TryCopyTo will return false if the data is not enough.
// The chunk should be always complete, so this check is just a safety check
// the connection::TryReadFrame reads the frame based on the length
// so here we should have all the data
if (!reader.TryCopyTo(memory.Span))
{
throw new NotEnoughDataException(
$"Chunk: Not enough data, sourceLength: {reader.Length}, memoryLen: {memory.Length}, dataLen: {dataLen}");
}

chunk = new Chunk(magicVersion, numEntries, numRecords, timestamp, epoch, chunkId, crc, memory);
return offset;
}
Expand Down
5 changes: 5 additions & 0 deletions RabbitMQ.Stream.Client/IConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -65,4 +65,9 @@ public ushort InitialCredits
_initialCredits = value;
}
}

// enables the check of the crc on the delivery.
// the server will send the crc for each chunk and the client will check it.
// It is not enabled by default because it is could reduce the performance.
public ICrc32 Crc32 { get; set; } = null;
}
17 changes: 17 additions & 0 deletions RabbitMQ.Stream.Client/ICrc32.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// This source code is dual-licensed under the Apache License, version
// 2.0, and the Mozilla Public License, version 2.0.
// Copyright (c) 2007-2023 VMware, Inc.

namespace RabbitMQ.Stream.Client
{
/// <summary>
/// ICrc32 defines an interface for implementing crc32 hashing.
/// Library users who wish to perform crc32 checks on data from RabbitMQ
/// should implement this interface and assign an instance to
/// <see cref="IConsumerConfig.Crc32"><code>IConsumerConfig.Crc32</code></see>.
/// </summary>
public interface ICrc32
{
byte[] Hash(byte[] data);
}
}
16 changes: 16 additions & 0 deletions RabbitMQ.Stream.Client/InternalExceptions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
// This source code is dual-licensed under the Apache License, version
// 2.0, and the Mozilla Public License, version 2.0.
// Copyright (c) 2007-2023 VMware, Inc.

using System;

namespace RabbitMQ.Stream.Client
{
internal class NotEnoughDataException : Exception
{
public NotEnoughDataException(string s)
: base(s)
{
}
}
}
1 change: 0 additions & 1 deletion RabbitMQ.Stream.Client/PublicAPI.Shipped.txt
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,6 @@ RabbitMQ.Stream.Client.Broker.Port.get -> uint
RabbitMQ.Stream.Client.Chunk
RabbitMQ.Stream.Client.Chunk.Chunk() -> void
RabbitMQ.Stream.Client.Chunk.ChunkId.get -> ulong
RabbitMQ.Stream.Client.Chunk.Crc.get -> int
RabbitMQ.Stream.Client.Chunk.Epoch.get -> ulong
RabbitMQ.Stream.Client.Chunk.NumEntries.get -> ushort
RabbitMQ.Stream.Client.Chunk.NumRecords.get -> uint
Expand Down
11 changes: 11 additions & 0 deletions RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ RabbitMQ.Stream.Client.AuthMechanism.External = 1 -> RabbitMQ.Stream.Client.Auth
RabbitMQ.Stream.Client.AuthMechanism.Plain = 0 -> RabbitMQ.Stream.Client.AuthMechanism
RabbitMQ.Stream.Client.AuthMechanismNotSupportedException
RabbitMQ.Stream.Client.AuthMechanismNotSupportedException.AuthMechanismNotSupportedException(string s) -> void
RabbitMQ.Stream.Client.Chunk.Crc.get -> uint
lukebakken marked this conversation as resolved.
Show resolved Hide resolved
RabbitMQ.Stream.Client.Chunk.Data.get -> System.Memory<byte>
RabbitMQ.Stream.Client.Chunk.MagicVersion.get -> byte
RabbitMQ.Stream.Client.Client.ExchangeVersions() -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.CommandVersionsResponse>
Expand All @@ -21,7 +22,11 @@ RabbitMQ.Stream.Client.ConsumerFilter.PostFilter.get -> System.Func<RabbitMQ.Str
RabbitMQ.Stream.Client.ConsumerFilter.PostFilter.set -> void
RabbitMQ.Stream.Client.ConsumerFilter.Values.get -> System.Collections.Generic.List<string>
RabbitMQ.Stream.Client.ConsumerFilter.Values.set -> void
RabbitMQ.Stream.Client.CrcException
RabbitMQ.Stream.Client.CrcException.CrcException(string s) -> void
RabbitMQ.Stream.Client.HashRoutingMurmurStrategy.Route(RabbitMQ.Stream.Client.Message message, System.Collections.Generic.List<string> partitions) -> System.Threading.Tasks.Task<System.Collections.Generic.List<string>>
RabbitMQ.Stream.Client.IConsumerConfig.Crc32.get -> RabbitMQ.Stream.Client.ICrc32
RabbitMQ.Stream.Client.IConsumerConfig.Crc32.set -> void
RabbitMQ.Stream.Client.IConsumerConfig.ConsumerFilter.get -> RabbitMQ.Stream.Client.ConsumerFilter
RabbitMQ.Stream.Client.CommandVersions
RabbitMQ.Stream.Client.CommandVersions.Command.get -> ushort
Expand All @@ -47,6 +52,8 @@ RabbitMQ.Stream.Client.ICommandVersions.MinVersion.get -> ushort
RabbitMQ.Stream.Client.IConsumerConfig.ConsumerFilter.set -> void
RabbitMQ.Stream.Client.IConsumerConfig.InitialCredits.get -> ushort
RabbitMQ.Stream.Client.IConsumerConfig.InitialCredits.set -> void
RabbitMQ.Stream.Client.ICrc32
RabbitMQ.Stream.Client.ICrc32.Hash(byte[] data) -> byte[]
RabbitMQ.Stream.Client.IProducerConfig.Filter.get -> RabbitMQ.Stream.Client.ProducerFilter
RabbitMQ.Stream.Client.IProducerConfig.Filter.set -> void
RabbitMQ.Stream.Client.IRoutingStrategy.Route(RabbitMQ.Stream.Client.Message message, System.Collections.Generic.List<string> partitions) -> System.Threading.Tasks.Task<System.Collections.Generic.List<string>>
Expand All @@ -66,6 +73,8 @@ RabbitMQ.Stream.Client.PublishFilter.SizeNeeded.get -> int
RabbitMQ.Stream.Client.PublishFilter.Write(System.Span<byte> span) -> int
RabbitMQ.Stream.Client.RawSuperStreamProducerConfig.RoutingStrategyType.get -> RabbitMQ.Stream.Client.RoutingStrategyType
RabbitMQ.Stream.Client.RawSuperStreamProducerConfig.RoutingStrategyType.set -> void
RabbitMQ.Stream.Client.Reliable.ConsumerConfig.Crc32.get -> RabbitMQ.Stream.Client.ICrc32
RabbitMQ.Stream.Client.Reliable.ConsumerConfig.Crc32.set -> void
RabbitMQ.Stream.Client.Reliable.ConsumerConfig.Filter.get -> RabbitMQ.Stream.Client.ConsumerFilter
RabbitMQ.Stream.Client.Reliable.ConsumerConfig.Filter.set -> void
RabbitMQ.Stream.Client.Reliable.ConsumerConfig.InitialCredits.get -> ushort
Expand Down Expand Up @@ -110,6 +119,8 @@ RabbitMQ.Stream.Client.StreamStatsResponse.Write(System.Span<byte> span) -> int
RabbitMQ.Stream.Client.StreamSystem.StreamStats(string stream) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.StreamStats>
RabbitMQ.Stream.Client.StreamSystemConfig.AuthMechanism.get -> RabbitMQ.Stream.Client.AuthMechanism
RabbitMQ.Stream.Client.StreamSystemConfig.AuthMechanism.set -> void
RabbitMQ.Stream.Client.UnknownCommandException
RabbitMQ.Stream.Client.UnknownCommandException.UnknownCommandException(string s) -> void
RabbitMQ.Stream.Client.UnsupportedOperationException
RabbitMQ.Stream.Client.UnsupportedOperationException.UnsupportedOperationException(string s) -> void
static RabbitMQ.Stream.Client.Connection.Create(System.Net.EndPoint endpoint, System.Func<System.Memory<byte>, System.Threading.Tasks.Task> commandCallback, System.Func<string, System.Threading.Tasks.Task> closedCallBack, RabbitMQ.Stream.Client.SslOption sslOption, Microsoft.Extensions.Logging.ILogger logger) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.Connection>
Expand Down
17 changes: 17 additions & 0 deletions RabbitMQ.Stream.Client/RawConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -465,6 +465,23 @@ private async Task Init()
// introduced https://github.com/rabbitmq/rabbitmq-stream-dotnet-client/pull/250
if (Token.IsCancellationRequested)
return;

if (_config.Crc32 is not null)
{
var crcCalculated = BitConverter.ToUInt32(
_config.Crc32.Hash(deliver.Chunk.Data.ToArray())
);
if (crcCalculated != deliver.Chunk.Crc)
{
_logger.LogError(
"CRC32 does not match, server crc {ChunkCrc}, local crc {CrcCalculated}, stream {Stream}",
deliver.Chunk.Crc, crcCalculated, _config.Stream);
throw new CrcException(
$"CRC32 does not match, server crc {deliver.Chunk.Crc}, local crc {crcCalculated}, " +
$"stream {_config.Stream}");
}
}

await _chunksBuffer.Writer.WriteAsync(deliver.Chunk, Token).ConfigureAwait(false);
}, async promotedAsActive =>
{
Expand Down
2 changes: 1 addition & 1 deletion RabbitMQ.Stream.Client/RawSuperStreamConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ private RawConsumerConfig FromStreamConfig(string stream)
IsSingleActiveConsumer = _config.IsSingleActiveConsumer,
ConsumerUpdateListener = _config.ConsumerUpdateListener,
ConsumerFilter = _config.ConsumerFilter,

Crc32 = _config.Crc32,
ConnectionClosedHandler = async (string s) =>
{
// if the stream is still in the consumer list
Expand Down
8 changes: 8 additions & 0 deletions RabbitMQ.Stream.Client/Reliable/Consumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,14 @@ public record ConsumerConfig : ReliableConfig
/// </summary>
public ConsumerFilter Filter { get; set; } = null;

/// <summary>
/// Eenables the check of the crc on the delivery when set to an implementation
/// of <see cref="ICrc32"><code>ICrc32</code></see>.
/// >he server will send the crc for each chunk and the client will check it.
/// It is not enabled by default because it is could reduce the performance.
/// </summary>
public ICrc32 Crc32 { get; set; } = null;

public ConsumerConfig(StreamSystem streamSystem, string stream) : base(streamSystem, stream)
{
}
Expand Down
2 changes: 2 additions & 0 deletions RabbitMQ.Stream.Client/Reliable/ConsumerFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ private async Task<IConsumer> StandardConsumer(bool boot)
InitialCredits = _consumerConfig.InitialCredits,
OffsetSpec = offsetSpec,
ConsumerFilter = _consumerConfig.Filter,
Crc32 = _consumerConfig.Crc32,
ConnectionClosedHandler = async _ =>
{
await TryToReconnect(_consumerConfig.ReconnectStrategy).ConfigureAwait(false);
Expand Down Expand Up @@ -110,6 +111,7 @@ private async Task<IConsumer> SuperConsumer(bool boot)
IsSingleActiveConsumer = _consumerConfig.IsSingleActiveConsumer,
InitialCredits = _consumerConfig.InitialCredits,
ConsumerFilter = _consumerConfig.Filter,
Crc32 = _consumerConfig.Crc32,
OffsetSpec = offsetSpecs,
MessageHandler = async (stream, consumer, ctx, message) =>
{
Expand Down
2 changes: 1 addition & 1 deletion Tests/Amqp10Tests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -463,7 +463,7 @@ public void ValidateMapsType()
Assert.Equal(14, AmqpWireFormatting.GetAnySize("string_value"));
Assert.Equal(9, AmqpWireFormatting.GetAnySize(DoubleValue));
Assert.Equal(1, AmqpWireFormatting.GetAnySize(true));
Assert.Equal(1, AmqpWireFormatting.GetAnySize((byte)1));
Assert.Equal(2, AmqpWireFormatting.GetAnySize((byte)1));
Assert.Equal(3, AmqpWireFormatting.GetAnySize((short)1));
// In this case is a byte
Assert.Equal(2, AmqpWireFormatting.GetAnySize(1));
Expand Down
15 changes: 15 additions & 0 deletions Tests/Crc32.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
// This source code is dual-licensed under the Apache License, version
// 2.0, and the Mozilla Public License, version 2.0.
// Copyright (c) 2007-2023 VMware, Inc.

using RabbitMQ.Stream.Client;

namespace Tests;

public class Crc32 : ICrc32
{
public byte[] Hash(byte[] data)
{
return System.IO.Hashing.Crc32.Hash(data);
}
}
4 changes: 4 additions & 0 deletions Tests/RawConsumerSystemTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ namespace Tests
{
public class ConsumerSystemTests
{
private readonly ICrc32 _crc32 = new Crc32();
private readonly ITestOutputHelper testOutputHelper;

public ConsumerSystemTests(ITestOutputHelper testOutputHelper)
Expand Down Expand Up @@ -416,6 +417,7 @@ public async void Amqp091MessagesConsumer()
var rawConsumer = await system.CreateRawConsumer(
new RawConsumerConfig(stream)
{
Crc32 = _crc32,
Reference = "consumer",
MessageHandler = async (consumer, ctx, message) =>
{
Expand Down Expand Up @@ -462,6 +464,7 @@ public async void ConsumerQueryOffset()
var rawConsumer = await system.CreateRawConsumer(
new RawConsumerConfig(stream)
{
Crc32 = _crc32,
Reference = reference,
OffsetSpec = new OffsetTypeOffset(),
MessageHandler = async (consumer, ctx, message) =>
Expand Down Expand Up @@ -532,6 +535,7 @@ public async void ShouldConsumeFromStoredOffset()
var rawConsumer = await system.CreateRawConsumer(
new RawConsumerConfig(stream)
{
Crc32 = _crc32,
Reference = Reference,
OffsetSpec = new OffsetTypeOffset(),
MessageHandler = async (consumer, ctx, _) =>
Expand Down
3 changes: 3 additions & 0 deletions Tests/ReliableTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ namespace Tests;

public class ReliableTests
{
private readonly ICrc32 _crc32 = new Crc32();
private readonly ITestOutputHelper _testOutputHelper;

public ReliableTests(ITestOutputHelper testOutputHelper)
Expand Down Expand Up @@ -323,6 +324,7 @@ public async void FirstConsumeAfterKillConnectionShouldContinueToWork()
var messagesReceived = 0;
var consumer = await Consumer.Create(new ConsumerConfig(system, stream)
{
Crc32 = _crc32,
Reference = reference,
ClientProvidedName = clientProviderName,
OffsetSpec = new OffsetTypeFirst(),
Expand Down Expand Up @@ -368,6 +370,7 @@ public async void ConsumeAfterKillConnectionShouldContinueToWork()
var messagesReceived = 0;
var consumer = await Consumer.Create(new ConsumerConfig(system, stream)
{
Crc32 = _crc32,
Reference = reference,
ClientProvidedName = clientProviderName,
OffsetSpec = new OffsetTypeFirst(),
Expand Down
2 changes: 2 additions & 0 deletions Tests/SuperStreamConsumerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ namespace Tests;

public class SuperStreamConsumerTests
{
private readonly ICrc32 _crc32 = new Crc32();
private readonly ITestOutputHelper _testOutputHelper;

public SuperStreamConsumerTests(ITestOutputHelper testOutputHelper)
Expand Down Expand Up @@ -64,6 +65,7 @@ public async void NumberOfMessagesConsumedShouldBeEqualsToPublished()
var consumer = await system.CreateSuperStreamConsumer(
new RawSuperStreamConsumerConfig(SystemUtils.InvoicesExchange)
{
Crc32 = _crc32,
ClientProvidedName = clientProvidedName,
OffsetSpec = await SystemUtils.OffsetsForSuperStreamConsumer(system, SystemUtils.InvoicesExchange, new OffsetTypeFirst()),
MessageHandler = (stream, consumer1, context, message) =>
Expand Down
Loading