Skip to content

Commit

Permalink
Merge 494a0eb into 8a85779
Browse files Browse the repository at this point in the history
  • Loading branch information
Gsantomaggio committed Sep 7, 2023
2 parents 8a85779 + 494a0eb commit c93b41d
Show file tree
Hide file tree
Showing 21 changed files with 151 additions and 12 deletions.
3 changes: 3 additions & 0 deletions Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,17 @@
<PackageVersion Include="Microsoft.Extensions.Logging.Console" Version="7.0.0" />
</ItemGroup>


<ItemGroup Label=".NET 6 Specific" Condition="'$(TargetFramework)' == 'net6.0'">
<!-- RabbitMQ.Stream.Client -->
<PackageVersion Include="System.IO.Hashing" Version="6.0.0" />
<PackageVersion Include="System.IO.Pipelines" Version="6.0.0" />
<PackageVersion Include="Microsoft.Extensions.Logging.Abstractions" Version="6.0.0" />
</ItemGroup>

<ItemGroup Label=".NET 7 Specific" Condition="'$(TargetFramework)' == 'net7.0'">
<!-- RabbitMQ.Stream.Client -->
<PackageVersion Include="System.IO.Hashing" Version="7.0.0" />
<PackageVersion Include="System.IO.Pipelines" Version="7.0.0" />
<PackageVersion Include="Microsoft.Extensions.Logging.Abstractions" Version="7.0.0" />
</ItemGroup>
Expand Down
3 changes: 2 additions & 1 deletion RabbitMQ.Stream.Client/AMQP/AmqpWireFormattingWrite.cs
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,8 @@ internal static int GetAnySize(object value)
bool => GetBoolSize(),
float => GetFloatSize(),
byte[] bArr => GetBytesSize(bArr),
byte => 1,
byte => 1 // FormatCode.Ubyte
+ 1, // value
DateTime d => GetTimestampSize(d),
_ => throw new AmqpParseException($"GetAnySize Invalid type {value}")
};
Expand Down
2 changes: 1 addition & 1 deletion RabbitMQ.Stream.Client/Client.cs
Original file line number Diff line number Diff line change
Expand Up @@ -597,7 +597,7 @@ private void HandleCorrelatedCommand(ushort tag, ref ReadOnlySequence<byte> fram
{
if (command.CorrelationId == uint.MaxValue)
{
throw new Exception($"unhandled incoming command {command.GetType()}");
throw new UnknownCommandException($"unhandled incoming command {command.GetType()}");
}

if (requests.TryRemove(command.CorrelationId, out var tsc))
Expand Down
16 changes: 16 additions & 0 deletions RabbitMQ.Stream.Client/ClientExceptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -109,4 +109,20 @@ public UnsupportedOperationException(string s)
{
}
}

public class UnknownCommandException : Exception
{
public UnknownCommandException(string s)
: base(s)
{
}
}

public class CrcException : Exception
{
public CrcException(string s)
: base(s)
{
}
}
}
1 change: 1 addition & 0 deletions RabbitMQ.Stream.Client/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ private async Task ProcessIncomingFrames()
var memory =
ArrayPool<byte>.Shared.Rent((int)frame.Length).AsMemory(0, (int)frame.Length);
frame.CopyTo(memory.Span);

await commandCallback(memory).ConfigureAwait(false);
numFrames += 1;
}
Expand Down
32 changes: 25 additions & 7 deletions RabbitMQ.Stream.Client/Deliver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,13 @@ internal static int Read(ref SequenceReader<byte> reader, byte entryType, out Su
// We need to pass it to the subEntryChunk that will decode the information
var memory =
ArrayPool<byte>.Shared.Rent((int)dataLen).AsMemory(0, (int)dataLen);
var data = reader.Sequence.Slice(reader.Consumed, dataLen);
data.CopyTo(memory.Span);

//see the Chunk.Read for more details about this check
if (!reader.TryCopyTo(memory.Span))
{
throw new NotEnoughDataException(
$"SubEntryChunk Not enough data, sourceLength: {reader.Length}, memoryLen: {memory.Length}, dataLen: {dataLen}");
}

subEntryChunk =
new SubEntryChunk(compress, numRecordsInBatch, unCompressedDataSize, dataLen, memory);
Expand All @@ -103,6 +108,7 @@ internal static int Read(ref SequenceReader<byte> reader, byte entryType, out Su
// Here we need to advance the reader to the datalen
// Since Data is passed to the subEntryChunk.
reader.Advance(dataLen);

return offset;
}
}
Expand All @@ -115,7 +121,7 @@ internal static int Read(ref SequenceReader<byte> reader, byte entryType, out Su
long timestamp,
ulong epoch,
ulong chunkId,
int crc,
uint crc,
Memory<byte> data)
{
MagicVersion = magicVersion;
Expand All @@ -135,7 +141,7 @@ internal static int Read(ref SequenceReader<byte> reader, byte entryType, out Su
public long Timestamp { get; }
public ulong Epoch { get; }
public ulong ChunkId { get; }
public int Crc { get; }
public uint Crc { get; }
public Memory<byte> Data { get; }

internal static int Read(ReadOnlySequence<byte> frame, out Chunk chunk)
Expand All @@ -148,15 +154,27 @@ internal static int Read(ReadOnlySequence<byte> frame, out Chunk chunk)
offset += WireFormatting.ReadInt64(ref reader, out var timestamp);
offset += WireFormatting.ReadUInt64(ref reader, out var epoch);
offset += WireFormatting.ReadUInt64(ref reader, out var chunkId);
offset += WireFormatting.ReadInt32(ref reader, out var crc);
offset += WireFormatting.ReadUInt32(ref reader, out var crc);
offset += WireFormatting.ReadUInt32(ref reader, out var dataLen);
offset += WireFormatting.ReadUInt32(ref reader, out _);
// offset += 4; // reserved
offset += WireFormatting.ReadUInt32(ref reader, out _); // reserved
var memory =
ArrayPool<byte>.Shared.Rent((int)dataLen).AsMemory(0, (int)dataLen);
var data = reader.Sequence.Slice(reader.Consumed, dataLen);
data.CopyTo(memory.Span);

// the reader in this position contains the chunk information
// we copy it in a memory stream to be sure that we have all the data
// the memory stream will passed to the consumer to parse the messages
// reader.TryCopyTo will return false if the data is not enough.
// The chunk should be always complete, so this check is just a safety check
// the connection::TryReadFrame reads the frame based on the length
// so here we should have all the data
if (!reader.TryCopyTo(memory.Span))
{
throw new NotEnoughDataException(
$"Chunk: Not enough data, sourceLength: {reader.Length}, memoryLen: {memory.Length}, dataLen: {dataLen}");
}

chunk = new Chunk(magicVersion, numEntries, numRecords, timestamp, epoch, chunkId, crc, memory);
return offset;
}
Expand Down
5 changes: 5 additions & 0 deletions RabbitMQ.Stream.Client/IConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -65,4 +65,9 @@ public ushort InitialCredits
_initialCredits = value;
}
}

// enables the check of the crc on the delivery.
// the server will send the crc for each chunk and the client will check it.
// It is not enabled by default because it is could reduce the performance.
public ICrc32 Crc32 { get; set; } = null;
}
17 changes: 17 additions & 0 deletions RabbitMQ.Stream.Client/ICrc32.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// This source code is dual-licensed under the Apache License, version
// 2.0, and the Mozilla Public License, version 2.0.
// Copyright (c) 2007-2023 VMware, Inc.

namespace RabbitMQ.Stream.Client
{
/// <summary>
/// ICrc32 defines an interface for implementing crc32 hashing.
/// Library users who wish to perform crc32 checks on data from RabbitMQ
/// should implement this interface and assign an instance to
/// <see cref="IConsumerConfig.Crc32"><code>IConsumerConfig.Crc32</code></see>.
/// </summary>
public interface ICrc32
{
byte[] Hash(byte[] data);
}
}
16 changes: 16 additions & 0 deletions RabbitMQ.Stream.Client/InternalExceptions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
// This source code is dual-licensed under the Apache License, version
// 2.0, and the Mozilla Public License, version 2.0.
// Copyright (c) 2007-2023 VMware, Inc.

using System;

namespace RabbitMQ.Stream.Client
{
internal class NotEnoughDataException : Exception
{
public NotEnoughDataException(string s)
: base(s)
{
}
}
}
1 change: 0 additions & 1 deletion RabbitMQ.Stream.Client/PublicAPI.Shipped.txt
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,6 @@ RabbitMQ.Stream.Client.Broker.Port.get -> uint
RabbitMQ.Stream.Client.Chunk
RabbitMQ.Stream.Client.Chunk.Chunk() -> void
RabbitMQ.Stream.Client.Chunk.ChunkId.get -> ulong
RabbitMQ.Stream.Client.Chunk.Crc.get -> int
RabbitMQ.Stream.Client.Chunk.Epoch.get -> ulong
RabbitMQ.Stream.Client.Chunk.NumEntries.get -> ushort
RabbitMQ.Stream.Client.Chunk.NumRecords.get -> uint
Expand Down
11 changes: 11 additions & 0 deletions RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ RabbitMQ.Stream.Client.AuthMechanism.External = 1 -> RabbitMQ.Stream.Client.Auth
RabbitMQ.Stream.Client.AuthMechanism.Plain = 0 -> RabbitMQ.Stream.Client.AuthMechanism
RabbitMQ.Stream.Client.AuthMechanismNotSupportedException
RabbitMQ.Stream.Client.AuthMechanismNotSupportedException.AuthMechanismNotSupportedException(string s) -> void
RabbitMQ.Stream.Client.Chunk.Crc.get -> uint
RabbitMQ.Stream.Client.Chunk.Data.get -> System.Memory<byte>
RabbitMQ.Stream.Client.Chunk.MagicVersion.get -> byte
RabbitMQ.Stream.Client.Client.ExchangeVersions() -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.CommandVersionsResponse>
Expand All @@ -21,7 +22,11 @@ RabbitMQ.Stream.Client.ConsumerFilter.PostFilter.get -> System.Func<RabbitMQ.Str
RabbitMQ.Stream.Client.ConsumerFilter.PostFilter.set -> void
RabbitMQ.Stream.Client.ConsumerFilter.Values.get -> System.Collections.Generic.List<string>
RabbitMQ.Stream.Client.ConsumerFilter.Values.set -> void
RabbitMQ.Stream.Client.CrcException
RabbitMQ.Stream.Client.CrcException.CrcException(string s) -> void
RabbitMQ.Stream.Client.HashRoutingMurmurStrategy.Route(RabbitMQ.Stream.Client.Message message, System.Collections.Generic.List<string> partitions) -> System.Threading.Tasks.Task<System.Collections.Generic.List<string>>
RabbitMQ.Stream.Client.IConsumerConfig.Crc32.get -> RabbitMQ.Stream.Client.ICrc32
RabbitMQ.Stream.Client.IConsumerConfig.Crc32.set -> void
RabbitMQ.Stream.Client.IConsumerConfig.ConsumerFilter.get -> RabbitMQ.Stream.Client.ConsumerFilter
RabbitMQ.Stream.Client.CommandVersions
RabbitMQ.Stream.Client.CommandVersions.Command.get -> ushort
Expand All @@ -47,6 +52,8 @@ RabbitMQ.Stream.Client.ICommandVersions.MinVersion.get -> ushort
RabbitMQ.Stream.Client.IConsumerConfig.ConsumerFilter.set -> void
RabbitMQ.Stream.Client.IConsumerConfig.InitialCredits.get -> ushort
RabbitMQ.Stream.Client.IConsumerConfig.InitialCredits.set -> void
RabbitMQ.Stream.Client.ICrc32
RabbitMQ.Stream.Client.ICrc32.Hash(byte[] data) -> byte[]
RabbitMQ.Stream.Client.IProducerConfig.Filter.get -> RabbitMQ.Stream.Client.ProducerFilter
RabbitMQ.Stream.Client.IProducerConfig.Filter.set -> void
RabbitMQ.Stream.Client.IRoutingStrategy.Route(RabbitMQ.Stream.Client.Message message, System.Collections.Generic.List<string> partitions) -> System.Threading.Tasks.Task<System.Collections.Generic.List<string>>
Expand All @@ -66,6 +73,8 @@ RabbitMQ.Stream.Client.PublishFilter.SizeNeeded.get -> int
RabbitMQ.Stream.Client.PublishFilter.Write(System.Span<byte> span) -> int
RabbitMQ.Stream.Client.RawSuperStreamProducerConfig.RoutingStrategyType.get -> RabbitMQ.Stream.Client.RoutingStrategyType
RabbitMQ.Stream.Client.RawSuperStreamProducerConfig.RoutingStrategyType.set -> void
RabbitMQ.Stream.Client.Reliable.ConsumerConfig.Crc32.get -> RabbitMQ.Stream.Client.ICrc32
RabbitMQ.Stream.Client.Reliable.ConsumerConfig.Crc32.set -> void
RabbitMQ.Stream.Client.Reliable.ConsumerConfig.Filter.get -> RabbitMQ.Stream.Client.ConsumerFilter
RabbitMQ.Stream.Client.Reliable.ConsumerConfig.Filter.set -> void
RabbitMQ.Stream.Client.Reliable.ConsumerConfig.InitialCredits.get -> ushort
Expand Down Expand Up @@ -110,6 +119,8 @@ RabbitMQ.Stream.Client.StreamStatsResponse.Write(System.Span<byte> span) -> int
RabbitMQ.Stream.Client.StreamSystem.StreamStats(string stream) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.StreamStats>
RabbitMQ.Stream.Client.StreamSystemConfig.AuthMechanism.get -> RabbitMQ.Stream.Client.AuthMechanism
RabbitMQ.Stream.Client.StreamSystemConfig.AuthMechanism.set -> void
RabbitMQ.Stream.Client.UnknownCommandException
RabbitMQ.Stream.Client.UnknownCommandException.UnknownCommandException(string s) -> void
RabbitMQ.Stream.Client.UnsupportedOperationException
RabbitMQ.Stream.Client.UnsupportedOperationException.UnsupportedOperationException(string s) -> void
static RabbitMQ.Stream.Client.Connection.Create(System.Net.EndPoint endpoint, System.Func<System.Memory<byte>, System.Threading.Tasks.Task> commandCallback, System.Func<string, System.Threading.Tasks.Task> closedCallBack, RabbitMQ.Stream.Client.SslOption sslOption, Microsoft.Extensions.Logging.ILogger logger) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.Connection>
Expand Down
17 changes: 17 additions & 0 deletions RabbitMQ.Stream.Client/RawConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -465,6 +465,23 @@ private async Task Init()
// introduced https://github.com/rabbitmq/rabbitmq-stream-dotnet-client/pull/250
if (Token.IsCancellationRequested)
return;
if (_config.Crc32 is not null)
{
var crcCalculated = BitConverter.ToUInt32(
_config.Crc32.Hash(deliver.Chunk.Data.ToArray())
);
if (crcCalculated != deliver.Chunk.Crc)
{
_logger.LogError(
"CRC32 does not match, server crc {ChunkCrc}, local crc {CrcCalculated}, stream {Stream}",
deliver.Chunk.Crc, crcCalculated, _config.Stream);
throw new CrcException(
$"CRC32 does not match, server crc {deliver.Chunk.Crc}, local crc {crcCalculated}, " +
$"stream {_config.Stream}");
}
}
await _chunksBuffer.Writer.WriteAsync(deliver.Chunk, Token).ConfigureAwait(false);
}, async promotedAsActive =>
{
Expand Down
2 changes: 1 addition & 1 deletion RabbitMQ.Stream.Client/RawSuperStreamConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ private RawConsumerConfig FromStreamConfig(string stream)
IsSingleActiveConsumer = _config.IsSingleActiveConsumer,
ConsumerUpdateListener = _config.ConsumerUpdateListener,
ConsumerFilter = _config.ConsumerFilter,

Crc32 = _config.Crc32,
ConnectionClosedHandler = async (string s) =>
{
// if the stream is still in the consumer list
Expand Down
8 changes: 8 additions & 0 deletions RabbitMQ.Stream.Client/Reliable/Consumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,14 @@ public record ConsumerConfig : ReliableConfig
/// </summary>
public ConsumerFilter Filter { get; set; } = null;

/// <summary>
/// Eenables the check of the crc on the delivery when set to an implementation
/// of <see cref="ICrc32"><code>ICrc32</code></see>.
/// >he server will send the crc for each chunk and the client will check it.
/// It is not enabled by default because it is could reduce the performance.
/// </summary>
public ICrc32 Crc32 { get; set; } = null;

public ConsumerConfig(StreamSystem streamSystem, string stream) : base(streamSystem, stream)
{
}
Expand Down
2 changes: 2 additions & 0 deletions RabbitMQ.Stream.Client/Reliable/ConsumerFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ private async Task<IConsumer> StandardConsumer(bool boot)
InitialCredits = _consumerConfig.InitialCredits,
OffsetSpec = offsetSpec,
ConsumerFilter = _consumerConfig.Filter,
Crc32 = _consumerConfig.Crc32,
ConnectionClosedHandler = async _ =>
{
await TryToReconnect(_consumerConfig.ReconnectStrategy).ConfigureAwait(false);
Expand Down Expand Up @@ -110,6 +111,7 @@ private async Task<IConsumer> SuperConsumer(bool boot)
IsSingleActiveConsumer = _consumerConfig.IsSingleActiveConsumer,
InitialCredits = _consumerConfig.InitialCredits,
ConsumerFilter = _consumerConfig.Filter,
Crc32 = _consumerConfig.Crc32,
OffsetSpec = offsetSpecs,
MessageHandler = async (stream, consumer, ctx, message) =>
{
Expand Down
2 changes: 1 addition & 1 deletion Tests/Amqp10Tests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -463,7 +463,7 @@ public void ValidateMapsType()
Assert.Equal(14, AmqpWireFormatting.GetAnySize("string_value"));
Assert.Equal(9, AmqpWireFormatting.GetAnySize(DoubleValue));
Assert.Equal(1, AmqpWireFormatting.GetAnySize(true));
Assert.Equal(1, AmqpWireFormatting.GetAnySize((byte)1));
Assert.Equal(2, AmqpWireFormatting.GetAnySize((byte)1));
Assert.Equal(3, AmqpWireFormatting.GetAnySize((short)1));
// In this case is a byte
Assert.Equal(2, AmqpWireFormatting.GetAnySize(1));
Expand Down
15 changes: 15 additions & 0 deletions Tests/Crc32.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
// This source code is dual-licensed under the Apache License, version
// 2.0, and the Mozilla Public License, version 2.0.
// Copyright (c) 2007-2023 VMware, Inc.

using RabbitMQ.Stream.Client;

namespace Tests;

public class Crc32 : ICrc32
{
public byte[] Hash(byte[] data)
{
return System.IO.Hashing.Crc32.Hash(data);
}
}
4 changes: 4 additions & 0 deletions Tests/RawConsumerSystemTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ namespace Tests
{
public class ConsumerSystemTests
{
private readonly ICrc32 _crc32 = new Crc32();
private readonly ITestOutputHelper testOutputHelper;

public ConsumerSystemTests(ITestOutputHelper testOutputHelper)
Expand Down Expand Up @@ -416,6 +417,7 @@ public async void Amqp091MessagesConsumer()
var rawConsumer = await system.CreateRawConsumer(
new RawConsumerConfig(stream)
{
Crc32 = _crc32,
Reference = "consumer",
MessageHandler = async (consumer, ctx, message) =>
{
Expand Down Expand Up @@ -462,6 +464,7 @@ public async void ConsumerQueryOffset()
var rawConsumer = await system.CreateRawConsumer(
new RawConsumerConfig(stream)
{
Crc32 = _crc32,
Reference = reference,
OffsetSpec = new OffsetTypeOffset(),
MessageHandler = async (consumer, ctx, message) =>
Expand Down Expand Up @@ -532,6 +535,7 @@ public async void ShouldConsumeFromStoredOffset()
var rawConsumer = await system.CreateRawConsumer(
new RawConsumerConfig(stream)
{
Crc32 = _crc32,
Reference = Reference,
OffsetSpec = new OffsetTypeOffset(),
MessageHandler = async (consumer, ctx, _) =>
Expand Down
3 changes: 3 additions & 0 deletions Tests/ReliableTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ namespace Tests;

public class ReliableTests
{
private readonly ICrc32 _crc32 = new Crc32();
private readonly ITestOutputHelper _testOutputHelper;

public ReliableTests(ITestOutputHelper testOutputHelper)
Expand Down Expand Up @@ -323,6 +324,7 @@ public async void FirstConsumeAfterKillConnectionShouldContinueToWork()
var messagesReceived = 0;
var consumer = await Consumer.Create(new ConsumerConfig(system, stream)
{
Crc32 = _crc32,
Reference = reference,
ClientProvidedName = clientProviderName,
OffsetSpec = new OffsetTypeFirst(),
Expand Down Expand Up @@ -368,6 +370,7 @@ public async void ConsumeAfterKillConnectionShouldContinueToWork()
var messagesReceived = 0;
var consumer = await Consumer.Create(new ConsumerConfig(system, stream)
{
Crc32 = _crc32,
Reference = reference,
ClientProvidedName = clientProviderName,
OffsetSpec = new OffsetTypeFirst(),
Expand Down
2 changes: 2 additions & 0 deletions Tests/SuperStreamConsumerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ namespace Tests;

public class SuperStreamConsumerTests
{
private readonly ICrc32 _crc32 = new Crc32();
private readonly ITestOutputHelper _testOutputHelper;

public SuperStreamConsumerTests(ITestOutputHelper testOutputHelper)
Expand Down Expand Up @@ -64,6 +65,7 @@ public async void NumberOfMessagesConsumedShouldBeEqualsToPublished()
var consumer = await system.CreateSuperStreamConsumer(
new RawSuperStreamConsumerConfig(SystemUtils.InvoicesExchange)
{
Crc32 = _crc32,
ClientProvidedName = clientProvidedName,
OffsetSpec = await SystemUtils.OffsetsForSuperStreamConsumer(system, SystemUtils.InvoicesExchange, new OffsetTypeFirst()),
MessageHandler = (stream, consumer1, context, message) =>
Expand Down
Loading

0 comments on commit c93b41d

Please sign in to comment.