diff --git a/projects/Benchmarks/ConsumerDispatching/ConsumerDispatcher.cs b/projects/Benchmarks/ConsumerDispatching/ConsumerDispatcher.cs index 028396007e..d25d7ba106 100644 --- a/projects/Benchmarks/ConsumerDispatching/ConsumerDispatcher.cs +++ b/projects/Benchmarks/ConsumerDispatching/ConsumerDispatcher.cs @@ -15,6 +15,7 @@ public class ConsumerDispatcherBase private protected IConsumerDispatcher _dispatcher; private protected readonly AsyncBasicConsumerFake _consumer = new AsyncBasicConsumerFake(_autoResetEvent); protected readonly string _consumerTag = "ConsumerTag"; + protected static readonly byte[] _consumerTagBytes = Encoding.UTF8.GetBytes("ConsumerTag"); protected readonly ulong _deliveryTag = 500UL; protected static readonly byte[] _exchange = Encoding.UTF8.GetBytes("Exchange"); protected static readonly byte[] _routingKey = Encoding.UTF8.GetBytes("RoutingKey"); @@ -43,7 +44,7 @@ public void AsyncConsumerDispatcher() { for (int i = 0; i < Count; i++) { - _dispatcher.HandleBasicDeliver(_consumerTag, _deliveryTag, false, _exchange, _routingKey, _properties, _body, _method, _body); + _dispatcher.HandleBasicDeliver(_consumerTagBytes, _deliveryTag, false, _exchange, _routingKey, _properties, _body, _method, _body); } _autoResetEvent.Wait(); _autoResetEvent.Reset(); @@ -61,7 +62,7 @@ public void ConsumerDispatcher() { for (int i = 0; i < Count; i++) { - _dispatcher.HandleBasicDeliver(_consumerTag, _deliveryTag, false, _exchange, _routingKey, _properties, _body, _method, _body); + _dispatcher.HandleBasicDeliver(_consumerTagBytes, _deliveryTag, false, _exchange, _routingKey, _properties, _body, _method, _body); } _autoResetEvent.Wait(); _autoResetEvent.Reset(); diff --git a/projects/RabbitMQ.Client/client/framing/BasicDeliver.cs b/projects/RabbitMQ.Client/client/framing/BasicDeliver.cs index 67dfc38493..b0664d97fa 100644 --- a/projects/RabbitMQ.Client/client/framing/BasicDeliver.cs +++ b/projects/RabbitMQ.Client/client/framing/BasicDeliver.cs @@ -38,7 +38,7 @@ namespace RabbitMQ.Client.Framing.Impl { internal readonly struct BasicDeliver : IAmqpMethod { - public readonly string _consumerTag; + public readonly ReadOnlyMemory _consumerTag; public readonly ulong _deliveryTag; public readonly bool _redelivered; public readonly ReadOnlyMemory _exchange; @@ -46,7 +46,7 @@ namespace RabbitMQ.Client.Framing.Impl public BasicDeliver(ReadOnlyMemory data) { - int offset = WireFormatting.ReadShortstr(data.Span, out _consumerTag); + int offset = WireFormatting.ReadShortMemory(data, out _consumerTag); offset += WireFormatting.ReadLonglong(data.Span.Slice(offset), out _deliveryTag); offset += WireFormatting.ReadBits(data.Span.Slice(offset), out _redelivered); offset += WireFormatting.ReadShortMemory(data.Slice(offset), out _exchange); diff --git a/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcherBase.cs b/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcherBase.cs index c6b90f4287..87ba94ff5f 100644 --- a/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcherBase.cs +++ b/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcherBase.cs @@ -1,6 +1,10 @@ -using System.Collections.Generic; +using System; +using System.Buffers; +using System.Collections.Generic; using System.Runtime.CompilerServices; +using System.Text; using System.Threading.Tasks; +using RabbitMQ.Util; namespace RabbitMQ.Client.ConsumerDispatching { @@ -8,28 +12,47 @@ namespace RabbitMQ.Client.ConsumerDispatching internal abstract class ConsumerDispatcherBase { private static readonly FallbackConsumer fallbackConsumer = new FallbackConsumer(); - private readonly Dictionary _consumers; + private readonly Dictionary, (IBasicConsumer consumer, string consumerTag)> _consumers; public IBasicConsumer? DefaultConsumer { get; set; } protected ConsumerDispatcherBase() { - _consumers = new Dictionary(); + _consumers = new Dictionary, (IBasicConsumer, string)>(MemoryOfByteEqualityComparer.Instance); } protected void AddConsumer(IBasicConsumer consumer, string tag) { lock (_consumers) { - _consumers[tag] = consumer; + var tagBytes = Encoding.UTF8.GetBytes(tag); + _consumers[tagBytes] = (consumer, tag); } } - protected IBasicConsumer GetConsumerOrDefault(string tag) + protected (IBasicConsumer consumer, string consumerTag) GetConsumerOrDefault(ReadOnlyMemory tag) { lock (_consumers) { - return _consumers.TryGetValue(tag, out var consumer) ? consumer : GetDefaultOrFallbackConsumer(); + if (_consumers.TryGetValue(tag, out var consumerPair)) + { + return consumerPair; + } + +#if !NETSTANDARD + var consumerTag = Encoding.UTF8.GetString(tag.Span); +#else + string consumerTag; + unsafe + { + fixed (byte* bytes = tag.Span) + { + consumerTag = Encoding.UTF8.GetString(bytes, tag.Length); + } + } +#endif + + return (GetDefaultOrFallbackConsumer(), consumerTag); } } @@ -37,7 +60,18 @@ public IBasicConsumer GetAndRemoveConsumer(string tag) { lock (_consumers) { - return _consumers.Remove(tag, out var consumer) ? consumer : GetDefaultOrFallbackConsumer(); + var utf8 = Encoding.UTF8; + var pool = ArrayPool.Shared; + var buf = pool.Rent(utf8.GetMaxByteCount(tag.Length)); +#if NETSTANDARD + int count = utf8.GetBytes(tag, 0, tag.Length, buf, 0); +#else + int count = utf8.GetBytes(tag, buf); +#endif + var memory = buf.AsMemory(0, count); + var result = _consumers.Remove(memory, out var consumerPair) ? consumerPair.consumer : GetDefaultOrFallbackConsumer(); + pool.Return(buf); + return result; } } @@ -45,9 +79,9 @@ public Task ShutdownAsync(ShutdownEventArgs reason) { lock (_consumers) { - foreach (KeyValuePair pair in _consumers) + foreach (KeyValuePair, (IBasicConsumer consumer, string consumerTag)> pair in _consumers) { - ShutdownConsumer(pair.Value, reason); + ShutdownConsumer(pair.Value.consumer, reason); } _consumers.Clear(); } diff --git a/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcherChannelBase.cs b/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcherChannelBase.cs index 3d1a7174d4..9c0e5926ff 100644 --- a/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcherChannelBase.cs +++ b/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcherChannelBase.cs @@ -53,12 +53,13 @@ public void HandleBasicConsumeOk(IBasicConsumer consumer, string consumerTag) } } - public void HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, + public void HandleBasicDeliver(ReadOnlyMemory consumerTag, ulong deliveryTag, bool redelivered, ReadOnlyMemory exchange, ReadOnlyMemory routingKey, in ReadOnlyBasicProperties basicProperties, ReadOnlyMemory body, byte[] rentedMethodArray, byte[] rentedArray) { if (!IsShutdown) { - _writer.TryWrite(new WorkStruct(GetConsumerOrDefault(consumerTag), consumerTag, deliveryTag, redelivered, exchange, routingKey, basicProperties, body, rentedMethodArray, rentedArray)); + var consumerPair = GetConsumerOrDefault(consumerTag); + _writer.TryWrite(new WorkStruct(consumerPair.consumer, consumerPair.consumerTag, deliveryTag, redelivered, exchange, routingKey, basicProperties, body, rentedMethodArray, rentedArray)); } } diff --git a/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/IConsumerDispatcher.cs b/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/IConsumerDispatcher.cs index dda93e7577..43c6b3e03c 100644 --- a/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/IConsumerDispatcher.cs +++ b/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/IConsumerDispatcher.cs @@ -45,7 +45,7 @@ internal interface IConsumerDispatcher void HandleBasicConsumeOk(IBasicConsumer consumer, string consumerTag); - void HandleBasicDeliver(string consumerTag, + void HandleBasicDeliver(ReadOnlyMemory consumerTag, ulong deliveryTag, bool redelivered, ReadOnlyMemory exchange, diff --git a/projects/RabbitMQ.Client/util/MemoryOfByteEqualityComparer.cs b/projects/RabbitMQ.Client/util/MemoryOfByteEqualityComparer.cs new file mode 100644 index 0000000000..4504980c3f --- /dev/null +++ b/projects/RabbitMQ.Client/util/MemoryOfByteEqualityComparer.cs @@ -0,0 +1,41 @@ +using System; +using System.Collections.Generic; +using System.Runtime.InteropServices; + +namespace RabbitMQ.Util; + +internal sealed class MemoryOfByteEqualityComparer : IEqualityComparer> +{ + public static MemoryOfByteEqualityComparer Instance { get; } = new MemoryOfByteEqualityComparer(); + + public bool Equals(ReadOnlyMemory left, ReadOnlyMemory right) + { + return left.Span.SequenceEqual(right.Span); + } + + public int GetHashCode(ReadOnlyMemory value) + { +#if NETSTANDARD + unchecked + { + int hashCode = 0; + var longPart = MemoryMarshal.Cast(value.Span); + foreach (long item in longPart) + { + hashCode = (hashCode * 397) ^ item.GetHashCode(); + } + + foreach (int item in value.Span.Slice(longPart.Length * 8)) + { + hashCode = (hashCode * 397) ^ item.GetHashCode(); + } + + return hashCode; + } +#else + HashCode result = default; + result.AddBytes(value.Span); + return result.ToHashCode(); +#endif + } +}