Skip to content

Commit

Permalink
Change the type of Exchange and Routing key to CachedString
Browse files Browse the repository at this point in the history
  • Loading branch information
zavarkog committed Jun 27, 2022
1 parent e5e960b commit 2e71e8b
Show file tree
Hide file tree
Showing 50 changed files with 670 additions and 255 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public AsyncBasicConsumerFake(ManualResetEventSlim autoResetEvent)
_autoResetEvent = autoResetEvent;
}

public Task HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, in ReadOnlyBasicProperties properties, ReadOnlyMemory<byte> body)
public Task HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, in CachedString exchange, in CachedString routingKey, in ReadOnlyBasicProperties properties, ReadOnlyMemory<byte> body)
{
if (Interlocked.Increment(ref _current) == Count)
{
Expand All @@ -28,7 +28,7 @@ public Task HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redel
return Task.CompletedTask;
}

void IBasicConsumer.HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey,
void IBasicConsumer.HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, in CachedString exchange, in CachedString routingKey,
in ReadOnlyBasicProperties properties, ReadOnlyMemory<byte> body)
{
if (Interlocked.Increment(ref _current) == Count)
Expand Down
12 changes: 7 additions & 5 deletions projects/Benchmarks/ConsumerDispatching/ConsumerDispatcher.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Threading;
using System.Text;
using System.Threading;
using BenchmarkDotNet.Attributes;
using RabbitMQ.Client;
using RabbitMQ.Client.ConsumerDispatching;
Expand All @@ -15,9 +16,10 @@ public class ConsumerDispatcherBase
private protected readonly AsyncBasicConsumerFake _consumer = new AsyncBasicConsumerFake(_autoResetEvent);
protected readonly string _consumerTag = "ConsumerTag";
protected readonly ulong _deliveryTag = 500UL;
protected readonly string _exchange = "Exchange";
protected readonly string _routingKey = "RoutingKey";
protected static readonly CachedString _exchange = new CachedString("Exchange");
protected static readonly CachedString _routingKey = new CachedString("RoutingKey");
protected readonly ReadOnlyBasicProperties _properties = new ReadOnlyBasicProperties();
protected readonly byte[] _method = new byte[512];
protected readonly byte[] _body = new byte[512];
}

Expand All @@ -41,7 +43,7 @@ public void AsyncConsumerDispatcher()
{
for (int i = 0; i < Count; i++)
{
_dispatcher.HandleBasicDeliver(_consumerTag, _deliveryTag, false, _exchange, _routingKey, _properties, _body, _body);
_dispatcher.HandleBasicDeliver(_consumerTag, _deliveryTag, false, _exchange, _routingKey, _properties, _body, _method, _body);
}
_autoResetEvent.Wait();
_autoResetEvent.Reset();
Expand All @@ -59,7 +61,7 @@ public void ConsumerDispatcher()
{
for (int i = 0; i < Count; i++)
{
_dispatcher.HandleBasicDeliver(_consumerTag, _deliveryTag, false, _exchange, _routingKey, _properties, _body, _body);
_dispatcher.HandleBasicDeliver(_consumerTag, _deliveryTag, false, _exchange, _routingKey, _properties, _body, _method, _body);
}
_autoResetEvent.Wait();
_autoResetEvent.Reset();
Expand Down
4 changes: 2 additions & 2 deletions projects/Benchmarks/WireFormatting/DataTypeSerialization.cs
Original file line number Diff line number Diff line change
Expand Up @@ -211,10 +211,10 @@ public class DataTypeShortStringSerialization : DataTypeSerialization
private readonly Memory<byte> _populatedShortStringBuffer = GenerateStringBuffer(new string('X', 255));

[Benchmark]
public int ShortstrReadEmpty() => WireFormatting.ReadShortstr(_emptyShortStringBuffer.Span, out _);
public int ShortstrReadEmpty() => WireFormatting.ReadShortstr(_emptyShortStringBuffer.Span, out string _);

[Benchmark]
public int ShortstrReadPopulated() => WireFormatting.ReadShortstr(_populatedShortStringBuffer.Span, out _);
public int ShortstrReadPopulated() => WireFormatting.ReadShortstr(_populatedShortStringBuffer.Span, out string _);

[Benchmark]
public int ShortstrWriteEmpty() => WireFormatting.WriteShortstr(ref _buffer.Span.GetStart(), string.Empty);
Expand Down
8 changes: 2 additions & 6 deletions projects/Benchmarks/WireFormatting/MethodFraming.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,8 @@ public class MethodFramingBasicAck
[BenchmarkCategory("Framing")]
public class MethodFramingBasicPublish
{
private const string StringValue = "Exchange_OR_RoutingKey";
private BasicPublish _basicPublish = new BasicPublish(StringValue, StringValue, false, false);
private BasicPublishMemory _basicPublishMemory = new BasicPublishMemory(Encoding.UTF8.GetBytes(StringValue), Encoding.UTF8.GetBytes(StringValue), false, false);
private static readonly CachedString StringValue = new CachedString("Exchange_OR_RoutingKey");
private BasicPublish _basicPublish = new BasicPublish(in StringValue, StringValue, false, false);
private EmptyBasicProperty _propertiesEmpty = new EmptyBasicProperty();
private BasicProperties _properties = new BasicProperties { AppId = "Application id", MessageId = "Random message id" };
private readonly ReadOnlyMemory<byte> _bodyEmpty = ReadOnlyMemory<byte>.Empty;
Expand All @@ -45,9 +44,6 @@ public class MethodFramingBasicPublish

[Benchmark]
public ReadOnlyMemory<byte> BasicPublishWrite() => Framing.SerializeToFrames(ref _basicPublish, ref _propertiesEmpty, _bodyEmpty, Channel, FrameMax);

[Benchmark]
public ReadOnlyMemory<byte> BasicPublishMemoryWrite() => Framing.SerializeToFrames(ref _basicPublishMemory, ref _propertiesEmpty, _bodyEmpty, Channel, FrameMax);
}

[Config(typeof(Config))]
Expand Down
13 changes: 3 additions & 10 deletions projects/Benchmarks/WireFormatting/MethodSerialization.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,8 @@ public class MethodBasicAck : MethodSerializationBase

public class MethodBasicDeliver : MethodSerializationBase
{
private const string StringValue = "Exchange_OR_RoutingKey";
private readonly BasicPublish _basicPublish = new BasicPublish(StringValue, StringValue, false, false);
private readonly BasicPublishMemory _basicPublishMemory = new BasicPublishMemory(Encoding.UTF8.GetBytes(StringValue), Encoding.UTF8.GetBytes(StringValue), false, false);
private static readonly CachedString StringValue = new CachedString("Exchange_OR_RoutingKey");
private readonly BasicPublish _basicPublish = new BasicPublish(in StringValue, in StringValue, false, false);

public override void SetUp()
{
Expand All @@ -45,19 +44,13 @@ public override void SetUp()
}

[Benchmark]
public object BasicDeliverRead() => new BasicDeliver(_buffer.Span)._consumerTag; // return one property to not box when returning an object instead
public object BasicDeliverRead() => new BasicDeliver(_buffer)._consumerTag; // return one property to not box when returning an object instead

[Benchmark]
public int BasicPublishWrite() => _basicPublish.WriteTo(_buffer.Span);

[Benchmark]
public int BasicPublishMemoryWrite() => _basicPublishMemory.WriteTo(_buffer.Span);

[Benchmark]
public int BasicPublishSize() => _basicPublish.GetRequiredBufferSize();

[Benchmark]
public int BasicPublishMemorySize() => _basicPublishMemory.GetRequiredBufferSize();
}

public class MethodChannelClose : MethodSerializationBase
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,8 @@ public virtual Task HandleBasicConsumeOk(string consumerTag)
public virtual Task HandleBasicDeliver(string consumerTag,
ulong deliveryTag,
bool redelivered,
string exchange,
string routingKey,
in CachedString exchange,
in CachedString routingKey,
in ReadOnlyBasicProperties properties,
ReadOnlyMemory<byte> body)
{
Expand Down Expand Up @@ -165,7 +165,7 @@ void IBasicConsumer.HandleBasicConsumeOk(string consumerTag)
throw new InvalidOperationException("Should never be called. Enable 'DispatchConsumersAsync'.");
}

void IBasicConsumer.HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, in ReadOnlyBasicProperties properties, ReadOnlyMemory<byte> body)
void IBasicConsumer.HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, in CachedString exchange, in CachedString routingKey, in ReadOnlyBasicProperties properties, ReadOnlyMemory<byte> body)
{
throw new InvalidOperationException("Should never be called. Enable 'DispatchConsumersAsync'.");
}
Expand Down
64 changes: 61 additions & 3 deletions projects/RabbitMQ.Client/client/api/BasicGetResult.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ namespace RabbitMQ.Client
/// </remarks>
public sealed class BasicGetResult : IDisposable
{
private readonly byte[] _rentedMethodArray;
private readonly byte[] _rentedArray;

/// <summary>
Expand All @@ -54,6 +55,28 @@ public sealed class BasicGetResult : IDisposable
/// <param name="body">The body</param>
public BasicGetResult(ulong deliveryTag, bool redelivered, string exchange, string routingKey,
uint messageCount, in ReadOnlyBasicProperties basicProperties, ReadOnlyMemory<byte> body)
{
DeliveryTag = deliveryTag;
Redelivered = redelivered;
Exchange = new CachedString(exchange);
RoutingKey = new CachedString(routingKey);
MessageCount = messageCount;
BasicProperties = basicProperties;
Body = body;
}

/// <summary>
/// Sets the new instance's properties from the arguments passed in.
/// </summary>
/// <param name="deliveryTag">Delivery tag for the message.</param>
/// <param name="redelivered">Redelivered flag for the message</param>
/// <param name="exchange">The exchange this message was published to.</param>
/// <param name="routingKey">Routing key with which the message was published.</param>
/// <param name="messageCount">The number of messages pending on the queue, excluding the message being delivered.</param>
/// <param name="basicProperties">The Basic-class content header properties for the message.</param>
/// <param name="body">The body</param>
public BasicGetResult(ulong deliveryTag, bool redelivered, in CachedString exchange, in CachedString routingKey,
uint messageCount, in ReadOnlyBasicProperties basicProperties, ReadOnlyMemory<byte> body)
{
DeliveryTag = deliveryTag;
Redelivered = redelivered;
Expand All @@ -74,9 +97,38 @@ public sealed class BasicGetResult : IDisposable
/// <param name="messageCount">The number of messages pending on the queue, excluding the message being delivered.</param>
/// <param name="basicProperties">The Basic-class content header properties for the message.</param>
/// <param name="body">The body</param>
/// <param name="rentedMethodArray">The rented array which method is part of.</param>
/// <param name="rentedArray">The rented array which body is part of.</param>
public BasicGetResult(ulong deliveryTag, bool redelivered, string exchange, string routingKey,
uint messageCount, in ReadOnlyBasicProperties basicProperties, ReadOnlyMemory<byte> body, byte[] rentedArray)
uint messageCount, in ReadOnlyBasicProperties basicProperties, ReadOnlyMemory<byte> body,
byte[] rentedMethodArray, byte[] rentedArray)
{
DeliveryTag = deliveryTag;
Redelivered = redelivered;
Exchange = new CachedString(exchange);
RoutingKey = new CachedString(routingKey);
MessageCount = messageCount;
BasicProperties = basicProperties;
Body = body;
_rentedMethodArray = rentedMethodArray;
_rentedArray = rentedArray;
}

/// <summary>
/// Sets the new instance's properties from the arguments passed in.
/// </summary>
/// <param name="deliveryTag">Delivery tag for the message.</param>
/// <param name="redelivered">Redelivered flag for the message</param>
/// <param name="exchange">The exchange this message was published to.</param>
/// <param name="routingKey">Routing key with which the message was published.</param>
/// <param name="messageCount">The number of messages pending on the queue, excluding the message being delivered.</param>
/// <param name="basicProperties">The Basic-class content header properties for the message.</param>
/// <param name="body">The body</param>
/// <param name="rentedMethodArray">The rented array which method is part of.</param>
/// <param name="rentedArray">The rented array which body is part of.</param>
public BasicGetResult(ulong deliveryTag, bool redelivered, in CachedString exchange, in CachedString routingKey,
uint messageCount, in ReadOnlyBasicProperties basicProperties, ReadOnlyMemory<byte> body,
byte[] rentedMethodArray, byte[] rentedArray)
{
DeliveryTag = deliveryTag;
Redelivered = redelivered;
Expand All @@ -85,6 +137,7 @@ public sealed class BasicGetResult : IDisposable
MessageCount = messageCount;
BasicProperties = basicProperties;
Body = body;
_rentedMethodArray = rentedMethodArray;
_rentedArray = rentedArray;
}

Expand All @@ -106,7 +159,7 @@ public sealed class BasicGetResult : IDisposable
/// <summary>
/// Retrieve the exchange this message was published to.
/// </summary>
public string Exchange { get; }
public CachedString Exchange { get; }

/// <summary>
/// Retrieve the number of messages pending on the queue, excluding the message being delivered.
Expand All @@ -125,11 +178,16 @@ public sealed class BasicGetResult : IDisposable
/// <summary>
/// Retrieve the routing key with which this message was published.
/// </summary>
public string RoutingKey { get; }
public CachedString RoutingKey { get; }

/// <inheritdoc />
public void Dispose()
{
if (_rentedMethodArray != null)
{
ArrayPool<byte>.Shared.Return(_rentedMethodArray);
}

if (_rentedArray != null)
{
ArrayPool<byte>.Shared.Return(_rentedArray);
Expand Down
Loading

0 comments on commit 2e71e8b

Please sign in to comment.