Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add CRC32 checkSum control #285

Merged
merged 10 commits into from
Sep 7, 2023
3 changes: 2 additions & 1 deletion RabbitMQ.Stream.Client/AMQP/AmqpWireFormattingWrite.cs
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,8 @@ internal static int GetAnySize(object value)
bool => GetBoolSize(),
float => GetFloatSize(),
byte[] bArr => GetBytesSize(bArr),
byte => 1,
byte => 1 // FormatCode.Ubyte
+ 1, // value
DateTime d => GetTimestampSize(d),
_ => throw new AmqpParseException($"GetAnySize Invalid type {value}")
};
Expand Down
1 change: 1 addition & 0 deletions RabbitMQ.Stream.Client/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ private async Task ProcessIncomingFrames()
var memory =
ArrayPool<byte>.Shared.Rent((int)frame.Length).AsMemory(0, (int)frame.Length);
frame.CopyTo(memory.Span);

await commandCallback(memory).ConfigureAwait(false);
numFrames += 1;
}
Expand Down
32 changes: 25 additions & 7 deletions RabbitMQ.Stream.Client/Deliver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,13 @@ internal static int Read(ref SequenceReader<byte> reader, byte entryType, out Su
// We need to pass it to the subEntryChunk that will decode the information
var memory =
ArrayPool<byte>.Shared.Rent((int)dataLen).AsMemory(0, (int)dataLen);
var data = reader.Sequence.Slice(reader.Consumed, dataLen);
data.CopyTo(memory.Span);

//see the Chunk.Read for more details about this check
if (!reader.TryCopyTo(memory.Span))
lukebakken marked this conversation as resolved.
Show resolved Hide resolved
{
throw new Exception(
$"SubEntryChunk Not enough data, sourceLength: {reader.Length}, memoryLen: {memory.Length}, dataLen: {dataLen}");
}

subEntryChunk =
new SubEntryChunk(compress, numRecordsInBatch, unCompressedDataSize, dataLen, memory);
Expand All @@ -103,6 +108,7 @@ internal static int Read(ref SequenceReader<byte> reader, byte entryType, out Su
// Here we need to advance the reader to the datalen
// Since Data is passed to the subEntryChunk.
reader.Advance(dataLen);

return offset;
}
}
Expand All @@ -115,7 +121,7 @@ internal static int Read(ref SequenceReader<byte> reader, byte entryType, out Su
long timestamp,
ulong epoch,
ulong chunkId,
int crc,
uint crc,
Memory<byte> data)
{
MagicVersion = magicVersion;
Expand All @@ -135,7 +141,7 @@ internal static int Read(ref SequenceReader<byte> reader, byte entryType, out Su
public long Timestamp { get; }
public ulong Epoch { get; }
public ulong ChunkId { get; }
public int Crc { get; }
public uint Crc { get; }
public Memory<byte> Data { get; }

internal static int Read(ReadOnlySequence<byte> frame, out Chunk chunk)
Expand All @@ -148,15 +154,27 @@ internal static int Read(ReadOnlySequence<byte> frame, out Chunk chunk)
offset += WireFormatting.ReadInt64(ref reader, out var timestamp);
offset += WireFormatting.ReadUInt64(ref reader, out var epoch);
offset += WireFormatting.ReadUInt64(ref reader, out var chunkId);
offset += WireFormatting.ReadInt32(ref reader, out var crc);
offset += WireFormatting.ReadUInt32(ref reader, out var crc);
offset += WireFormatting.ReadUInt32(ref reader, out var dataLen);
offset += WireFormatting.ReadUInt32(ref reader, out _);
// offset += 4; // reserved
offset += WireFormatting.ReadUInt32(ref reader, out _); // reserved
var memory =
ArrayPool<byte>.Shared.Rent((int)dataLen).AsMemory(0, (int)dataLen);
var data = reader.Sequence.Slice(reader.Consumed, dataLen);
lukebakken marked this conversation as resolved.
Show resolved Hide resolved
data.CopyTo(memory.Span);

// the reader in this position contains the chunk information
// we copy it in a memory stream to be sure that we have all the data
// the memory stream will passed to the consumer to parse the messages
// reader.TryCopyTo will return false if the data is not enough.
// The chunk should be always complete, so this check is just a safety check
// the connection::TryReadFrame reads the frame based on the length
// so here we should have all the data
if (!reader.TryCopyTo(memory.Span))
{
throw new Exception(
$"Chunk: Not enough data, sourceLength: {reader.Length}, memoryLen: {memory.Length}, dataLen: {dataLen}");
}

chunk = new Chunk(magicVersion, numEntries, numRecords, timestamp, epoch, chunkId, crc, memory);
return offset;
}
Expand Down
49 changes: 49 additions & 0 deletions RabbitMQ.Stream.Client/Hash/Crc32.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// This source code is dual-licensed under the Apache License, version
// 2.0, and the Mozilla Public License, version 2.0.
// Copyright (c) 2007-2023 VMware, Inc.

using System;

namespace RabbitMQ.Stream.Client;

internal class Crc32
{
private static readonly uint[] s_crcTable;

static Crc32()
{
s_crcTable = new uint[256];
const uint Polynomial = 0xEDB88320U;

for (uint i = 0; i < 256; i++)
{
var crc = i;
for (var j = 0; j < 8; j++)
{
if ((crc & 1) != 0)
{
crc = (crc >> 1) ^ Polynomial;
}
else
{
crc >>= 1;
}
}

s_crcTable[i] = crc;
}
}

internal static uint ComputeHash(Span<byte> Span)
{

var crc = 0xFFFFFFFFU;

foreach (var b in Span)
{
crc = (crc >> 8) ^ s_crcTable[(crc & 0xFF) ^ b];
}

return crc ^ 0xFFFFFFFFU;
}
}
5 changes: 5 additions & 0 deletions RabbitMQ.Stream.Client/IConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -65,4 +65,9 @@ public ushort InitialCredits
_initialCredits = value;
}
}

// enables the check of the crc on the delivery.
// the server will send the crc for each chunk and the client will check it.
// It is not enabled by default because it is could reduce the performance.
public bool CheckCrcOnDelivery { get; set; } = false;
}
1 change: 0 additions & 1 deletion RabbitMQ.Stream.Client/PublicAPI.Shipped.txt
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,6 @@ RabbitMQ.Stream.Client.Broker.Port.get -> uint
RabbitMQ.Stream.Client.Chunk
RabbitMQ.Stream.Client.Chunk.Chunk() -> void
RabbitMQ.Stream.Client.Chunk.ChunkId.get -> ulong
RabbitMQ.Stream.Client.Chunk.Crc.get -> int
RabbitMQ.Stream.Client.Chunk.Epoch.get -> ulong
RabbitMQ.Stream.Client.Chunk.NumEntries.get -> ushort
RabbitMQ.Stream.Client.Chunk.NumRecords.get -> uint
Expand Down
5 changes: 5 additions & 0 deletions RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ RabbitMQ.Stream.Client.AuthMechanism.External = 1 -> RabbitMQ.Stream.Client.Auth
RabbitMQ.Stream.Client.AuthMechanism.Plain = 0 -> RabbitMQ.Stream.Client.AuthMechanism
RabbitMQ.Stream.Client.AuthMechanismNotSupportedException
RabbitMQ.Stream.Client.AuthMechanismNotSupportedException.AuthMechanismNotSupportedException(string s) -> void
RabbitMQ.Stream.Client.Chunk.Crc.get -> uint
lukebakken marked this conversation as resolved.
Show resolved Hide resolved
RabbitMQ.Stream.Client.Chunk.Data.get -> System.Memory<byte>
RabbitMQ.Stream.Client.Chunk.MagicVersion.get -> byte
RabbitMQ.Stream.Client.Client.ExchangeVersions() -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.CommandVersionsResponse>
Expand All @@ -22,6 +23,8 @@ RabbitMQ.Stream.Client.ConsumerFilter.PostFilter.set -> void
RabbitMQ.Stream.Client.ConsumerFilter.Values.get -> System.Collections.Generic.List<string>
RabbitMQ.Stream.Client.ConsumerFilter.Values.set -> void
RabbitMQ.Stream.Client.HashRoutingMurmurStrategy.Route(RabbitMQ.Stream.Client.Message message, System.Collections.Generic.List<string> partitions) -> System.Threading.Tasks.Task<System.Collections.Generic.List<string>>
RabbitMQ.Stream.Client.IConsumerConfig.CheckCrcOnDelivery.get -> bool
RabbitMQ.Stream.Client.IConsumerConfig.CheckCrcOnDelivery.set -> void
RabbitMQ.Stream.Client.IConsumerConfig.ConsumerFilter.get -> RabbitMQ.Stream.Client.ConsumerFilter
RabbitMQ.Stream.Client.CommandVersions
RabbitMQ.Stream.Client.CommandVersions.Command.get -> ushort
Expand Down Expand Up @@ -66,6 +69,8 @@ RabbitMQ.Stream.Client.PublishFilter.SizeNeeded.get -> int
RabbitMQ.Stream.Client.PublishFilter.Write(System.Span<byte> span) -> int
RabbitMQ.Stream.Client.RawSuperStreamProducerConfig.RoutingStrategyType.get -> RabbitMQ.Stream.Client.RoutingStrategyType
RabbitMQ.Stream.Client.RawSuperStreamProducerConfig.RoutingStrategyType.set -> void
RabbitMQ.Stream.Client.Reliable.ConsumerConfig.CheckCrcOnDelivery.get -> bool
RabbitMQ.Stream.Client.Reliable.ConsumerConfig.CheckCrcOnDelivery.set -> void
RabbitMQ.Stream.Client.Reliable.ConsumerConfig.Filter.get -> RabbitMQ.Stream.Client.ConsumerFilter
RabbitMQ.Stream.Client.Reliable.ConsumerConfig.Filter.set -> void
RabbitMQ.Stream.Client.Reliable.ConsumerConfig.InitialCredits.get -> ushort
Expand Down
15 changes: 15 additions & 0 deletions RabbitMQ.Stream.Client/RawConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -465,6 +465,21 @@ private async Task Init()
// introduced https://github.com/rabbitmq/rabbitmq-stream-dotnet-client/pull/250
if (Token.IsCancellationRequested)
return;

if (_config.CheckCrcOnDelivery)
{
var crcCalculated = Crc32.ComputeHash(deliver.Chunk.Data.ToArray());
if (crcCalculated != deliver.Chunk.Crc)
{
_logger.LogError(
"CRC32 does not match, server crc {ChunkCrc}, local crc {CrcCalculated}, stream {Stream}",
deliver.Chunk.Crc, crcCalculated, _config.Stream);
throw new Exception(
$"CRC32 does not match, server crc {deliver.Chunk.Crc}, local crc {crcCalculated}, " +
$"stream {_config.Stream}");
}
}

await _chunksBuffer.Writer.WriteAsync(deliver.Chunk, Token).ConfigureAwait(false);
}, async promotedAsActive =>
{
Expand Down
2 changes: 1 addition & 1 deletion RabbitMQ.Stream.Client/RawSuperStreamConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ private RawConsumerConfig FromStreamConfig(string stream)
IsSingleActiveConsumer = _config.IsSingleActiveConsumer,
ConsumerUpdateListener = _config.ConsumerUpdateListener,
ConsumerFilter = _config.ConsumerFilter,

CheckCrcOnDelivery = _config.CheckCrcOnDelivery,
ConnectionClosedHandler = async (string s) =>
{
// if the stream is still in the consumer list
Expand Down
8 changes: 8 additions & 0 deletions RabbitMQ.Stream.Client/Reliable/Consumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,14 @@ public record ConsumerConfig : ReliableConfig
/// </summary>
public ConsumerFilter Filter { get; set; } = null;

/// <summary>
/// CheckCrcOnDelivery enables the check of the crc on the delivery.
/// the server will send the crc for each chunk and the client will check it.
/// It is not enabled by default because it is could reduce the performance.
/// Default value is false.
/// </summary>
public bool CheckCrcOnDelivery { get; set; } = false;

public ConsumerConfig(StreamSystem streamSystem, string stream) : base(streamSystem, stream)
{
}
Expand Down
2 changes: 2 additions & 0 deletions RabbitMQ.Stream.Client/Reliable/ConsumerFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ private async Task<IConsumer> StandardConsumer(bool boot)
InitialCredits = _consumerConfig.InitialCredits,
OffsetSpec = offsetSpec,
ConsumerFilter = _consumerConfig.Filter,
CheckCrcOnDelivery = _consumerConfig.CheckCrcOnDelivery,
ConnectionClosedHandler = async _ =>
{
await TryToReconnect(_consumerConfig.ReconnectStrategy).ConfigureAwait(false);
Expand Down Expand Up @@ -110,6 +111,7 @@ private async Task<IConsumer> SuperConsumer(bool boot)
IsSingleActiveConsumer = _consumerConfig.IsSingleActiveConsumer,
InitialCredits = _consumerConfig.InitialCredits,
ConsumerFilter = _consumerConfig.Filter,
CheckCrcOnDelivery = _consumerConfig.CheckCrcOnDelivery,
OffsetSpec = offsetSpecs,
MessageHandler = async (stream, consumer, ctx, message) =>
{
Expand Down
2 changes: 1 addition & 1 deletion Tests/Amqp10Tests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -463,7 +463,7 @@ public void ValidateMapsType()
Assert.Equal(14, AmqpWireFormatting.GetAnySize("string_value"));
Assert.Equal(9, AmqpWireFormatting.GetAnySize(DoubleValue));
Assert.Equal(1, AmqpWireFormatting.GetAnySize(true));
Assert.Equal(1, AmqpWireFormatting.GetAnySize((byte)1));
Assert.Equal(2, AmqpWireFormatting.GetAnySize((byte)1));
Assert.Equal(3, AmqpWireFormatting.GetAnySize((short)1));
// In this case is a byte
Assert.Equal(2, AmqpWireFormatting.GetAnySize(1));
Expand Down
3 changes: 3 additions & 0 deletions Tests/RawConsumerSystemTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,7 @@ public async void Amqp091MessagesConsumer()
var rawConsumer = await system.CreateRawConsumer(
new RawConsumerConfig(stream)
{
CheckCrcOnDelivery = true,
Reference = "consumer",
MessageHandler = async (consumer, ctx, message) =>
{
Expand Down Expand Up @@ -462,6 +463,7 @@ public async void ConsumerQueryOffset()
var rawConsumer = await system.CreateRawConsumer(
new RawConsumerConfig(stream)
{
CheckCrcOnDelivery = true,
Reference = reference,
OffsetSpec = new OffsetTypeOffset(),
MessageHandler = async (consumer, ctx, message) =>
Expand Down Expand Up @@ -532,6 +534,7 @@ public async void ShouldConsumeFromStoredOffset()
var rawConsumer = await system.CreateRawConsumer(
new RawConsumerConfig(stream)
{
CheckCrcOnDelivery = false,
Reference = Reference,
OffsetSpec = new OffsetTypeOffset(),
MessageHandler = async (consumer, ctx, _) =>
Expand Down
2 changes: 2 additions & 0 deletions Tests/ReliableTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,7 @@ public async void FirstConsumeAfterKillConnectionShouldContinueToWork()
var messagesReceived = 0;
var consumer = await Consumer.Create(new ConsumerConfig(system, stream)
{
CheckCrcOnDelivery = true,
Reference = reference,
ClientProvidedName = clientProviderName,
OffsetSpec = new OffsetTypeFirst(),
Expand Down Expand Up @@ -368,6 +369,7 @@ public async void ConsumeAfterKillConnectionShouldContinueToWork()
var messagesReceived = 0;
var consumer = await Consumer.Create(new ConsumerConfig(system, stream)
{
CheckCrcOnDelivery = true,
Reference = reference,
ClientProvidedName = clientProviderName,
OffsetSpec = new OffsetTypeFirst(),
Expand Down
1 change: 1 addition & 0 deletions Tests/SuperStreamConsumerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ public async void NumberOfMessagesConsumedShouldBeEqualsToPublished()
var consumer = await system.CreateSuperStreamConsumer(
new RawSuperStreamConsumerConfig(SystemUtils.InvoicesExchange)
{
CheckCrcOnDelivery = true,
ClientProvidedName = clientProvidedName,
OffsetSpec = await SystemUtils.OffsetsForSuperStreamConsumer(system, SystemUtils.InvoicesExchange, new OffsetTypeFirst()),
MessageHandler = (stream, consumer1, context, message) =>
Expand Down
24 changes: 24 additions & 0 deletions Tests/UnitTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -394,5 +394,29 @@ public void CheckAvailableFeatures()

Assert.True(AvailableFeaturesSingleton.Instance.PublishFilter);
}

[Fact]
public void Crc32Validation()
{
// validate some crc, values from https://crccalc.com/
// Since the crc is local to the library here we do just some basic test
// to validate the crc32 algorithm.
var memorySpan = new Span<byte>(new byte[4]);
WireFormatting.WriteUInt32(memorySpan, 0x12345678);
Assert.True(1242107544 == Crc32.ComputeHash(memorySpan));

memorySpan = new Span<byte>(new byte[4]);
WireFormatting.WriteUInt32(memorySpan, 0x55555555);
Assert.True(1798160573 == Crc32.ComputeHash(memorySpan));

memorySpan = new Span<byte>(new byte[1]);
WireFormatting.WriteByte(memorySpan, 1);
Assert.True(2768625435 == Crc32.ComputeHash(memorySpan));

memorySpan = new Span<byte>(new byte[2]);
WireFormatting.WriteInt16(memorySpan, 0x1234);
Assert.True(412718745 == Crc32.ComputeHash(memorySpan));

}
}
}