Skip to content

Commit

Permalink
Change the type of Exchange and Routing key to ROM
Browse files Browse the repository at this point in the history
  • Loading branch information
zavarkog committed Jun 25, 2022
1 parent e5e960b commit 926b2f8
Show file tree
Hide file tree
Showing 24 changed files with 92 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public AsyncBasicConsumerFake(ManualResetEventSlim autoResetEvent)
_autoResetEvent = autoResetEvent;
}

public Task HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, in ReadOnlyBasicProperties properties, ReadOnlyMemory<byte> body)
public Task HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, ReadOnlyMemory<byte> exchange, ReadOnlyMemory<byte> routingKey, in ReadOnlyBasicProperties properties, ReadOnlyMemory<byte> body)
{
if (Interlocked.Increment(ref _current) == Count)
{
Expand All @@ -28,7 +28,7 @@ public Task HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redel
return Task.CompletedTask;
}

void IBasicConsumer.HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey,
void IBasicConsumer.HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, ReadOnlyMemory<byte> exchange, ReadOnlyMemory<byte> routingKey,
in ReadOnlyBasicProperties properties, ReadOnlyMemory<byte> body)
{
if (Interlocked.Increment(ref _current) == Count)
Expand Down
12 changes: 7 additions & 5 deletions projects/Benchmarks/ConsumerDispatching/ConsumerDispatcher.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Threading;
using System.Text;
using System.Threading;
using BenchmarkDotNet.Attributes;
using RabbitMQ.Client;
using RabbitMQ.Client.ConsumerDispatching;
Expand All @@ -15,9 +16,10 @@ public class ConsumerDispatcherBase
private protected readonly AsyncBasicConsumerFake _consumer = new AsyncBasicConsumerFake(_autoResetEvent);
protected readonly string _consumerTag = "ConsumerTag";
protected readonly ulong _deliveryTag = 500UL;
protected readonly string _exchange = "Exchange";
protected readonly string _routingKey = "RoutingKey";
protected static readonly byte[] _exchange = Encoding.UTF8.GetBytes("Exchange");
protected static readonly byte[] _routingKey = Encoding.UTF8.GetBytes("RoutingKey");
protected readonly ReadOnlyBasicProperties _properties = new ReadOnlyBasicProperties();
protected readonly byte[] _method = new byte[512];
protected readonly byte[] _body = new byte[512];
}

Expand All @@ -41,7 +43,7 @@ public void AsyncConsumerDispatcher()
{
for (int i = 0; i < Count; i++)
{
_dispatcher.HandleBasicDeliver(_consumerTag, _deliveryTag, false, _exchange, _routingKey, _properties, _body, _body);
_dispatcher.HandleBasicDeliver(_consumerTag, _deliveryTag, false, _exchange, _routingKey, _properties, _body, _method, _body);
}
_autoResetEvent.Wait();
_autoResetEvent.Reset();
Expand All @@ -59,7 +61,7 @@ public void ConsumerDispatcher()
{
for (int i = 0; i < Count; i++)
{
_dispatcher.HandleBasicDeliver(_consumerTag, _deliveryTag, false, _exchange, _routingKey, _properties, _body, _body);
_dispatcher.HandleBasicDeliver(_consumerTag, _deliveryTag, false, _exchange, _routingKey, _properties, _body, _method, _body);
}
_autoResetEvent.Wait();
_autoResetEvent.Reset();
Expand Down
2 changes: 1 addition & 1 deletion projects/Benchmarks/WireFormatting/MethodSerialization.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public override void SetUp()
}

[Benchmark]
public object BasicDeliverRead() => new BasicDeliver(_buffer.Span)._consumerTag; // return one property to not box when returning an object instead
public object BasicDeliverRead() => new BasicDeliver(_buffer)._consumerTag; // return one property to not box when returning an object instead

[Benchmark]
public int BasicPublishWrite() => _basicPublish.WriteTo(_buffer.Span);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,8 @@ public virtual Task HandleBasicConsumeOk(string consumerTag)
public virtual Task HandleBasicDeliver(string consumerTag,
ulong deliveryTag,
bool redelivered,
string exchange,
string routingKey,
ReadOnlyMemory<byte> exchange,
ReadOnlyMemory<byte> routingKey,
in ReadOnlyBasicProperties properties,
ReadOnlyMemory<byte> body)
{
Expand Down Expand Up @@ -165,7 +165,7 @@ void IBasicConsumer.HandleBasicConsumeOk(string consumerTag)
throw new InvalidOperationException("Should never be called. Enable 'DispatchConsumersAsync'.");
}

void IBasicConsumer.HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, in ReadOnlyBasicProperties properties, ReadOnlyMemory<byte> body)
void IBasicConsumer.HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, ReadOnlyMemory<byte> exchange, ReadOnlyMemory<byte> routingKey, in ReadOnlyBasicProperties properties, ReadOnlyMemory<byte> body)
{
throw new InvalidOperationException("Should never be called. Enable 'DispatchConsumersAsync'.");
}
Expand Down
4 changes: 2 additions & 2 deletions projects/RabbitMQ.Client/client/api/DefaultBasicConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,8 @@ public virtual void HandleBasicConsumeOk(string consumerTag)
public virtual void HandleBasicDeliver(string consumerTag,
ulong deliveryTag,
bool redelivered,
string exchange,
string routingKey,
ReadOnlyMemory<byte> exchange,
ReadOnlyMemory<byte> routingKey,
in ReadOnlyBasicProperties properties,
ReadOnlyMemory<byte> body)
{
Expand Down
4 changes: 2 additions & 2 deletions projects/RabbitMQ.Client/client/api/IAsyncBasicConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ public interface IAsyncBasicConsumer
Task HandleBasicDeliver(string consumerTag,
ulong deliveryTag,
bool redelivered,
string exchange,
string routingKey,
ReadOnlyMemory<byte> exchange,
ReadOnlyMemory<byte> routingKey,
in ReadOnlyBasicProperties properties,
ReadOnlyMemory<byte> body);

Expand Down
4 changes: 2 additions & 2 deletions projects/RabbitMQ.Client/client/api/IBasicConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,8 @@ public interface IBasicConsumer
void HandleBasicDeliver(string consumerTag,
ulong deliveryTag,
bool redelivered,
string exchange,
string routingKey,
ReadOnlyMemory<byte> exchange,
ReadOnlyMemory<byte> routingKey,
in ReadOnlyBasicProperties properties,
ReadOnlyMemory<byte> body);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public override async Task HandleBasicConsumeOk(string consumerTag)
}

///<summary>Fires the Received event.</summary>
public override Task HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, in ReadOnlyBasicProperties properties, ReadOnlyMemory<byte> body)
public override Task HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, ReadOnlyMemory<byte> exchange, ReadOnlyMemory<byte> routingKey, in ReadOnlyBasicProperties properties, ReadOnlyMemory<byte> body)
{
// No need to call base, it's empty.
return _receivedWrapper.InvokeAsync(this, new BasicDeliverEventArgs(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ public BasicDeliverEventArgs()
public BasicDeliverEventArgs(string consumerTag,
ulong deliveryTag,
bool redelivered,
string exchange,
string routingKey,
ReadOnlyMemory<byte> exchange,
ReadOnlyMemory<byte> routingKey,
in ReadOnlyBasicProperties properties,
ReadOnlyMemory<byte> body)
{
Expand Down Expand Up @@ -77,13 +77,13 @@ public BasicDeliverEventArgs()

///<summary>The exchange the message was originally published
///to.</summary>
public string Exchange { get; set; }
public ReadOnlyMemory<byte> Exchange { get; set; }

///<summary>The AMQP "redelivered" flag.</summary>
public bool Redelivered { get; set; }

///<summary>The routing key used when the message was
///originally published.</summary>
public string RoutingKey { get; set; }
public ReadOnlyMemory<byte> RoutingKey { get; set; }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public override void HandleBasicConsumeOk(string consumerTag)
/// Accessing the body at a later point is unsafe as its memory can
/// be already released.
/// </remarks>
public override void HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, in ReadOnlyBasicProperties properties, ReadOnlyMemory<byte> body)
public override void HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, ReadOnlyMemory<byte> exchange, ReadOnlyMemory<byte> routingKey, in ReadOnlyBasicProperties properties, ReadOnlyMemory<byte> body)
{
base.HandleBasicDeliver(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body);
Received?.Invoke(
Expand Down
16 changes: 8 additions & 8 deletions projects/RabbitMQ.Client/client/framing/BasicDeliver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,16 +41,16 @@ namespace RabbitMQ.Client.Framing.Impl
public readonly string _consumerTag;
public readonly ulong _deliveryTag;
public readonly bool _redelivered;
public readonly string _exchange;
public readonly string _routingKey;
public readonly ReadOnlyMemory<byte> _exchange;
public readonly ReadOnlyMemory<byte> _routingKey;

public BasicDeliver(ReadOnlySpan<byte> span)
public BasicDeliver(ReadOnlyMemory<byte> data)
{
int offset = WireFormatting.ReadShortstr(span, out _consumerTag);
offset += WireFormatting.ReadLonglong(span.Slice(offset), out _deliveryTag);
offset += WireFormatting.ReadBits(span.Slice(offset), out _redelivered);
offset += WireFormatting.ReadShortstr(span.Slice(offset), out _exchange);
WireFormatting.ReadShortstr(span.Slice(offset), out _routingKey);
int offset = WireFormatting.ReadShortstr(data.Span, out _consumerTag);
offset += WireFormatting.ReadLonglong(data.Span.Slice(offset), out _deliveryTag);
offset += WireFormatting.ReadBits(data.Span.Slice(offset), out _redelivered);
offset += WireFormatting.ReadShortMemory(data.Slice(offset), out _exchange);
WireFormatting.ReadShortMemory(data.Slice(offset), out _routingKey);
}

public ProtocolCommandId ProtocolCommandId => ProtocolCommandId.BasicDeliver;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ protected override async Task ProcessChannelAsync()
}
finally
{
if (work.RentedMethodArray != null)
{
ArrayPool<byte>.Shared.Return(work.RentedMethodArray);
}

if (work.RentedArray != null)
{
ArrayPool<byte>.Shared.Return(work.RentedArray);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@ protected override async Task ProcessChannelAsync()
}
finally
{
if (work.RentedMethodArray != null)
{
ArrayPool<byte>.Shared.Return(work.RentedMethodArray);
}

if (work.RentedArray != null)
{
ArrayPool<byte>.Shared.Return(work.RentedArray);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,11 @@ public void HandleBasicConsumeOk(IBasicConsumer consumer, string consumerTag)
}

public void HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered,
string exchange, string routingKey, in ReadOnlyBasicProperties basicProperties, ReadOnlyMemory<byte> body, byte[] rentedArray)
ReadOnlyMemory<byte> exchange, ReadOnlyMemory<byte> routingKey, in ReadOnlyBasicProperties basicProperties, ReadOnlyMemory<byte> body, byte[] rentedMethodArray, byte[] rentedArray)
{
if (!IsShutdown)
{
_writer.TryWrite(new WorkStruct(GetConsumerOrDefault(consumerTag), consumerTag, deliveryTag, redelivered, exchange, routingKey, basicProperties, body, rentedArray));
_writer.TryWrite(new WorkStruct(GetConsumerOrDefault(consumerTag), consumerTag, deliveryTag, redelivered, exchange, routingKey, basicProperties, body, rentedMethodArray, rentedArray));
}
}

Expand Down Expand Up @@ -108,10 +108,11 @@ public Task WaitForShutdownAsync()
public readonly string? ConsumerTag;
public readonly ulong DeliveryTag;
public readonly bool Redelivered;
public readonly string? Exchange;
public readonly string? RoutingKey;
public readonly ReadOnlyMemory<byte> Exchange;
public readonly ReadOnlyMemory<byte> RoutingKey;
public readonly ReadOnlyBasicProperties BasicProperties;
public readonly ReadOnlyMemory<byte> Body;
public readonly byte[]? RentedMethodArray;
public readonly byte[]? RentedArray;
public readonly ShutdownEventArgs? Reason;
public readonly WorkType WorkType;
Expand All @@ -133,7 +134,7 @@ public WorkStruct(IBasicConsumer consumer, ShutdownEventArgs reason)
}

public WorkStruct(IBasicConsumer consumer, string consumerTag, ulong deliveryTag, bool redelivered,
string exchange, string routingKey, in ReadOnlyBasicProperties basicProperties, ReadOnlyMemory<byte> body, byte[] rentedArray)
ReadOnlyMemory<byte> exchange, ReadOnlyMemory<byte> routingKey, in ReadOnlyBasicProperties basicProperties, ReadOnlyMemory<byte> body, byte[] rentedMethodArray, byte[] rentedArray)
{
WorkType = WorkType.Deliver;
Consumer = consumer;
Expand All @@ -144,6 +145,7 @@ public WorkStruct(IBasicConsumer consumer, ShutdownEventArgs reason)
RoutingKey = routingKey;
BasicProperties = basicProperties;
Body = body;
RentedMethodArray = rentedMethodArray;
RentedArray = rentedArray;
Reason = default;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ void IBasicConsumer.HandleBasicConsumeOk(string consumerTag)
ESLog.Info($"Unhandled {nameof(IBasicConsumer.HandleBasicConsumeOk)} for tag {consumerTag}");
}

void IBasicConsumer.HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, in ReadOnlyBasicProperties properties,
void IBasicConsumer.HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, ReadOnlyMemory<byte> exchange, ReadOnlyMemory<byte> routingKey, in ReadOnlyBasicProperties properties,
ReadOnlyMemory<byte> body)
{
ESLog.Info($"Unhandled {nameof(IBasicConsumer.HandleBasicDeliver)} for tag {consumerTag}");
Expand Down Expand Up @@ -66,7 +66,7 @@ Task IAsyncBasicConsumer.HandleBasicConsumeOk(string consumerTag)
return Task.CompletedTask;
}

Task IAsyncBasicConsumer.HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, in ReadOnlyBasicProperties properties,
Task IAsyncBasicConsumer.HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, ReadOnlyMemory<byte> exchange, ReadOnlyMemory<byte> routingKey, in ReadOnlyBasicProperties properties,
ReadOnlyMemory<byte> body)
{
((IBasicConsumer)this).HandleBasicDeliver(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,11 @@ internal interface IConsumerDispatcher
void HandleBasicDeliver(string consumerTag,
ulong deliveryTag,
bool redelivered,
string exchange,
string routingKey,
ReadOnlyMemory<byte> exchange,
ReadOnlyMemory<byte> routingKey,
in ReadOnlyBasicProperties basicProperties,
ReadOnlyMemory<byte> body,
byte[] rentedMethodArray,
byte[] rentedArray);

void HandleBasicCancelOk(string consumerTag);
Expand Down
5 changes: 5 additions & 0 deletions projects/RabbitMQ.Client/client/impl/IncomingCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ public IncomingCommand(ProtocolCommandId commandId, ReadOnlyMemory<byte> methodB
_rentedBodyArray = rentedBodyArray;
}

public byte[] TakeoverMethod()
{
return _rentedMethodBytes;
}

public byte[] TakeoverBody()
{
return _rentedBodyArray;
Expand Down
4 changes: 2 additions & 2 deletions projects/RabbitMQ.Client/client/impl/ModelBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -573,8 +573,7 @@ protected void HandleBasicConsumeOk(in IncomingCommand cmd)

protected void HandleBasicDeliver(in IncomingCommand cmd)
{
var method = new Client.Framing.Impl.BasicDeliver(cmd.MethodBytes.Span);
cmd.ReturnMethodBuffer();
var method = new Client.Framing.Impl.BasicDeliver(cmd.MethodBytes);
var header = new ReadOnlyBasicProperties(cmd.HeaderBytes.Span);
cmd.ReturnHeaderBuffer();

Expand All @@ -586,6 +585,7 @@ protected void HandleBasicDeliver(in IncomingCommand cmd)
method._routingKey,
header,
cmd.Body,
cmd.TakeoverMethod(),
cmd.TakeoverBody());
}

Expand Down
21 changes: 21 additions & 0 deletions projects/RabbitMQ.Client/client/impl/WireFormatting.Read.cs
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,27 @@ public static int ReadShortstr(ReadOnlySpan<byte> span, out string value)
return ThrowArgumentOutOfRangeException(span.Length, byteCount + 1);
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static int ReadShortMemory(ReadOnlyMemory<byte> data, out ReadOnlyMemory<byte> value)
{
int byteCount = data.Span[0];
if (byteCount == 0)
{
value = default;
return 1;
}

// equals data.Length >= byteCount + 1
if (data.Length > byteCount)
{
value = data.Slice(1, byteCount);
return 1 + byteCount;
}

value = default;
return ThrowArgumentOutOfRangeException(data.Length, byteCount + 1);
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static int ReadBits(ReadOnlySpan<byte> span, out bool val)
{
Expand Down
4 changes: 2 additions & 2 deletions projects/Unit/TestAsyncConsumerExceptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,8 @@ public ConsumerFailingOnDelivery(IModel model) : base(model)
public override Task HandleBasicDeliver(string consumerTag,
ulong deliveryTag,
bool redelivered,
string exchange,
string routingKey,
ReadOnlyMemory<byte> exchange,
ReadOnlyMemory<byte> routingKey,
in ReadOnlyBasicProperties properties,
ReadOnlyMemory<byte> body)
{
Expand Down
4 changes: 2 additions & 2 deletions projects/Unit/TestConnectionRecovery.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1210,8 +1210,8 @@ public TestBasicConsumer(IModel model, ushort totalMessageCount, ManualResetEven
public override void HandleBasicDeliver(string consumerTag,
ulong deliveryTag,
bool redelivered,
string exchange,
string routingKey,
ReadOnlyMemory<byte> exchange,
ReadOnlyMemory<byte> routingKey,
in ReadOnlyBasicProperties properties,
ReadOnlyMemory<byte> body)
{
Expand Down
4 changes: 2 additions & 2 deletions projects/Unit/TestConsumerExceptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ public ConsumerFailingOnDelivery(IModel model) : base(model)
public override void HandleBasicDeliver(string consumerTag,
ulong deliveryTag,
bool redelivered,
string exchange,
string routingKey,
ReadOnlyMemory<byte> exchange,
ReadOnlyMemory<byte> routingKey,
in ReadOnlyBasicProperties properties,
ReadOnlyMemory<byte> body)
{
Expand Down

0 comments on commit 926b2f8

Please sign in to comment.