From 926b2f848576e9afc7033ca3d5e3728538458ce6 Mon Sep 17 00:00:00 2001 From: zavarkog Date: Sat, 25 Jun 2022 09:15:26 +0200 Subject: [PATCH] Change the type of Exchange and Routing key to ROM --- .../AsyncBasicConsumerFake.cs | 4 ++-- .../ConsumerDispatching/ConsumerDispatcher.cs | 12 ++++++----- .../WireFormatting/MethodSerialization.cs | 2 +- .../client/api/AsyncDefaultBasicConsumer.cs | 6 +++--- .../client/api/DefaultBasicConsumer.cs | 4 ++-- .../client/api/IAsyncBasicConsumer.cs | 4 ++-- .../client/api/IBasicConsumer.cs | 4 ++-- .../events/AsyncEventingBasicConsumer.cs | 2 +- .../client/events/BasicDeliverEventArgs.cs | 8 +++---- .../client/events/EventingBasicConsumer.cs | 2 +- .../client/framing/BasicDeliver.cs | 16 +++++++------- .../AsyncConsumerDispatcher.cs | 5 +++++ .../ConsumerDispatching/ConsumerDispatcher.cs | 5 +++++ .../ConsumerDispatcherChannelBase.cs | 12 ++++++----- .../ConsumerDispatching/FallbackConsumer.cs | 4 ++-- .../IConsumerDispatcher.cs | 5 +++-- .../client/impl/IncomingCommand.cs | 5 +++++ .../RabbitMQ.Client/client/impl/ModelBase.cs | 4 ++-- .../client/impl/WireFormatting.Read.cs | 21 +++++++++++++++++++ projects/Unit/TestAsyncConsumerExceptions.cs | 4 ++-- projects/Unit/TestConnectionRecovery.cs | 4 ++-- projects/Unit/TestConsumerExceptions.cs | 4 ++-- .../Unit/TestConsumerOperationDispatch.cs | 2 +- projects/Unit/TestMainLoop.cs | 4 ++-- 24 files changed, 92 insertions(+), 51 deletions(-) diff --git a/projects/Benchmarks/ConsumerDispatching/AsyncBasicConsumerFake.cs b/projects/Benchmarks/ConsumerDispatching/AsyncBasicConsumerFake.cs index 38e38a8a1a..abb1bbb938 100644 --- a/projects/Benchmarks/ConsumerDispatching/AsyncBasicConsumerFake.cs +++ b/projects/Benchmarks/ConsumerDispatching/AsyncBasicConsumerFake.cs @@ -18,7 +18,7 @@ public AsyncBasicConsumerFake(ManualResetEventSlim autoResetEvent) _autoResetEvent = autoResetEvent; } - public Task HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, in ReadOnlyBasicProperties properties, ReadOnlyMemory body) + public Task HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, ReadOnlyMemory exchange, ReadOnlyMemory routingKey, in ReadOnlyBasicProperties properties, ReadOnlyMemory body) { if (Interlocked.Increment(ref _current) == Count) { @@ -28,7 +28,7 @@ public Task HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redel return Task.CompletedTask; } - void IBasicConsumer.HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, + void IBasicConsumer.HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, ReadOnlyMemory exchange, ReadOnlyMemory routingKey, in ReadOnlyBasicProperties properties, ReadOnlyMemory body) { if (Interlocked.Increment(ref _current) == Count) diff --git a/projects/Benchmarks/ConsumerDispatching/ConsumerDispatcher.cs b/projects/Benchmarks/ConsumerDispatching/ConsumerDispatcher.cs index b751091c4d..028396007e 100644 --- a/projects/Benchmarks/ConsumerDispatching/ConsumerDispatcher.cs +++ b/projects/Benchmarks/ConsumerDispatching/ConsumerDispatcher.cs @@ -1,4 +1,5 @@ -using System.Threading; +using System.Text; +using System.Threading; using BenchmarkDotNet.Attributes; using RabbitMQ.Client; using RabbitMQ.Client.ConsumerDispatching; @@ -15,9 +16,10 @@ public class ConsumerDispatcherBase private protected readonly AsyncBasicConsumerFake _consumer = new AsyncBasicConsumerFake(_autoResetEvent); protected readonly string _consumerTag = "ConsumerTag"; protected readonly ulong _deliveryTag = 500UL; - protected readonly string _exchange = "Exchange"; - protected readonly string _routingKey = "RoutingKey"; + protected static readonly byte[] _exchange = Encoding.UTF8.GetBytes("Exchange"); + protected static readonly byte[] _routingKey = Encoding.UTF8.GetBytes("RoutingKey"); protected readonly ReadOnlyBasicProperties _properties = new ReadOnlyBasicProperties(); + protected readonly byte[] _method = new byte[512]; protected readonly byte[] _body = new byte[512]; } @@ -41,7 +43,7 @@ public void AsyncConsumerDispatcher() { for (int i = 0; i < Count; i++) { - _dispatcher.HandleBasicDeliver(_consumerTag, _deliveryTag, false, _exchange, _routingKey, _properties, _body, _body); + _dispatcher.HandleBasicDeliver(_consumerTag, _deliveryTag, false, _exchange, _routingKey, _properties, _body, _method, _body); } _autoResetEvent.Wait(); _autoResetEvent.Reset(); @@ -59,7 +61,7 @@ public void ConsumerDispatcher() { for (int i = 0; i < Count; i++) { - _dispatcher.HandleBasicDeliver(_consumerTag, _deliveryTag, false, _exchange, _routingKey, _properties, _body, _body); + _dispatcher.HandleBasicDeliver(_consumerTag, _deliveryTag, false, _exchange, _routingKey, _properties, _body, _method, _body); } _autoResetEvent.Wait(); _autoResetEvent.Reset(); diff --git a/projects/Benchmarks/WireFormatting/MethodSerialization.cs b/projects/Benchmarks/WireFormatting/MethodSerialization.cs index 1a67657c80..eb406e98d1 100644 --- a/projects/Benchmarks/WireFormatting/MethodSerialization.cs +++ b/projects/Benchmarks/WireFormatting/MethodSerialization.cs @@ -45,7 +45,7 @@ public override void SetUp() } [Benchmark] - public object BasicDeliverRead() => new BasicDeliver(_buffer.Span)._consumerTag; // return one property to not box when returning an object instead + public object BasicDeliverRead() => new BasicDeliver(_buffer)._consumerTag; // return one property to not box when returning an object instead [Benchmark] public int BasicPublishWrite() => _basicPublish.WriteTo(_buffer.Span); diff --git a/projects/RabbitMQ.Client/client/api/AsyncDefaultBasicConsumer.cs b/projects/RabbitMQ.Client/client/api/AsyncDefaultBasicConsumer.cs index fdab6e02dd..8db3f2a1b0 100644 --- a/projects/RabbitMQ.Client/client/api/AsyncDefaultBasicConsumer.cs +++ b/projects/RabbitMQ.Client/client/api/AsyncDefaultBasicConsumer.cs @@ -110,8 +110,8 @@ public virtual Task HandleBasicConsumeOk(string consumerTag) public virtual Task HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, - string exchange, - string routingKey, + ReadOnlyMemory exchange, + ReadOnlyMemory routingKey, in ReadOnlyBasicProperties properties, ReadOnlyMemory body) { @@ -165,7 +165,7 @@ void IBasicConsumer.HandleBasicConsumeOk(string consumerTag) throw new InvalidOperationException("Should never be called. Enable 'DispatchConsumersAsync'."); } - void IBasicConsumer.HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, in ReadOnlyBasicProperties properties, ReadOnlyMemory body) + void IBasicConsumer.HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, ReadOnlyMemory exchange, ReadOnlyMemory routingKey, in ReadOnlyBasicProperties properties, ReadOnlyMemory body) { throw new InvalidOperationException("Should never be called. Enable 'DispatchConsumersAsync'."); } diff --git a/projects/RabbitMQ.Client/client/api/DefaultBasicConsumer.cs b/projects/RabbitMQ.Client/client/api/DefaultBasicConsumer.cs index 9e1b92beb0..0f490baf4b 100644 --- a/projects/RabbitMQ.Client/client/api/DefaultBasicConsumer.cs +++ b/projects/RabbitMQ.Client/client/api/DefaultBasicConsumer.cs @@ -150,8 +150,8 @@ public virtual void HandleBasicConsumeOk(string consumerTag) public virtual void HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, - string exchange, - string routingKey, + ReadOnlyMemory exchange, + ReadOnlyMemory routingKey, in ReadOnlyBasicProperties properties, ReadOnlyMemory body) { diff --git a/projects/RabbitMQ.Client/client/api/IAsyncBasicConsumer.cs b/projects/RabbitMQ.Client/client/api/IAsyncBasicConsumer.cs index 24e787d691..da09792a9e 100644 --- a/projects/RabbitMQ.Client/client/api/IAsyncBasicConsumer.cs +++ b/projects/RabbitMQ.Client/client/api/IAsyncBasicConsumer.cs @@ -49,8 +49,8 @@ public interface IAsyncBasicConsumer Task HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, - string exchange, - string routingKey, + ReadOnlyMemory exchange, + ReadOnlyMemory routingKey, in ReadOnlyBasicProperties properties, ReadOnlyMemory body); diff --git a/projects/RabbitMQ.Client/client/api/IBasicConsumer.cs b/projects/RabbitMQ.Client/client/api/IBasicConsumer.cs index 94627c5a35..1034585ce8 100644 --- a/projects/RabbitMQ.Client/client/api/IBasicConsumer.cs +++ b/projects/RabbitMQ.Client/client/api/IBasicConsumer.cs @@ -92,8 +92,8 @@ public interface IBasicConsumer void HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, - string exchange, - string routingKey, + ReadOnlyMemory exchange, + ReadOnlyMemory routingKey, in ReadOnlyBasicProperties properties, ReadOnlyMemory body); diff --git a/projects/RabbitMQ.Client/client/events/AsyncEventingBasicConsumer.cs b/projects/RabbitMQ.Client/client/events/AsyncEventingBasicConsumer.cs index ac83df6b25..b7e7835186 100644 --- a/projects/RabbitMQ.Client/client/events/AsyncEventingBasicConsumer.cs +++ b/projects/RabbitMQ.Client/client/events/AsyncEventingBasicConsumer.cs @@ -71,7 +71,7 @@ public override async Task HandleBasicConsumeOk(string consumerTag) } ///Fires the Received event. - public override Task HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, in ReadOnlyBasicProperties properties, ReadOnlyMemory body) + public override Task HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, ReadOnlyMemory exchange, ReadOnlyMemory routingKey, in ReadOnlyBasicProperties properties, ReadOnlyMemory body) { // No need to call base, it's empty. return _receivedWrapper.InvokeAsync(this, new BasicDeliverEventArgs(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body)); diff --git a/projects/RabbitMQ.Client/client/events/BasicDeliverEventArgs.cs b/projects/RabbitMQ.Client/client/events/BasicDeliverEventArgs.cs index 96e90f4cdf..e3d86bb7ca 100644 --- a/projects/RabbitMQ.Client/client/events/BasicDeliverEventArgs.cs +++ b/projects/RabbitMQ.Client/client/events/BasicDeliverEventArgs.cs @@ -47,8 +47,8 @@ public BasicDeliverEventArgs() public BasicDeliverEventArgs(string consumerTag, ulong deliveryTag, bool redelivered, - string exchange, - string routingKey, + ReadOnlyMemory exchange, + ReadOnlyMemory routingKey, in ReadOnlyBasicProperties properties, ReadOnlyMemory body) { @@ -77,13 +77,13 @@ public BasicDeliverEventArgs(string consumerTag, ///The exchange the message was originally published ///to. - public string Exchange { get; set; } + public ReadOnlyMemory Exchange { get; set; } ///The AMQP "redelivered" flag. public bool Redelivered { get; set; } ///The routing key used when the message was ///originally published. - public string RoutingKey { get; set; } + public ReadOnlyMemory RoutingKey { get; set; } } } diff --git a/projects/RabbitMQ.Client/client/events/EventingBasicConsumer.cs b/projects/RabbitMQ.Client/client/events/EventingBasicConsumer.cs index 842ce37a11..159c7b84d4 100644 --- a/projects/RabbitMQ.Client/client/events/EventingBasicConsumer.cs +++ b/projects/RabbitMQ.Client/client/events/EventingBasicConsumer.cs @@ -84,7 +84,7 @@ public override void HandleBasicConsumeOk(string consumerTag) /// Accessing the body at a later point is unsafe as its memory can /// be already released. /// - public override void HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, in ReadOnlyBasicProperties properties, ReadOnlyMemory body) + public override void HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, ReadOnlyMemory exchange, ReadOnlyMemory routingKey, in ReadOnlyBasicProperties properties, ReadOnlyMemory body) { base.HandleBasicDeliver(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body); Received?.Invoke( diff --git a/projects/RabbitMQ.Client/client/framing/BasicDeliver.cs b/projects/RabbitMQ.Client/client/framing/BasicDeliver.cs index ebc49cd4da..67dfc38493 100644 --- a/projects/RabbitMQ.Client/client/framing/BasicDeliver.cs +++ b/projects/RabbitMQ.Client/client/framing/BasicDeliver.cs @@ -41,16 +41,16 @@ namespace RabbitMQ.Client.Framing.Impl public readonly string _consumerTag; public readonly ulong _deliveryTag; public readonly bool _redelivered; - public readonly string _exchange; - public readonly string _routingKey; + public readonly ReadOnlyMemory _exchange; + public readonly ReadOnlyMemory _routingKey; - public BasicDeliver(ReadOnlySpan span) + public BasicDeliver(ReadOnlyMemory data) { - int offset = WireFormatting.ReadShortstr(span, out _consumerTag); - offset += WireFormatting.ReadLonglong(span.Slice(offset), out _deliveryTag); - offset += WireFormatting.ReadBits(span.Slice(offset), out _redelivered); - offset += WireFormatting.ReadShortstr(span.Slice(offset), out _exchange); - WireFormatting.ReadShortstr(span.Slice(offset), out _routingKey); + int offset = WireFormatting.ReadShortstr(data.Span, out _consumerTag); + offset += WireFormatting.ReadLonglong(data.Span.Slice(offset), out _deliveryTag); + offset += WireFormatting.ReadBits(data.Span.Slice(offset), out _redelivered); + offset += WireFormatting.ReadShortMemory(data.Slice(offset), out _exchange); + WireFormatting.ReadShortMemory(data.Slice(offset), out _routingKey); } public ProtocolCommandId ProtocolCommandId => ProtocolCommandId.BasicDeliver; diff --git a/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/AsyncConsumerDispatcher.cs b/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/AsyncConsumerDispatcher.cs index d51177da06..7ed4e21d63 100644 --- a/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/AsyncConsumerDispatcher.cs +++ b/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/AsyncConsumerDispatcher.cs @@ -39,6 +39,11 @@ protected override async Task ProcessChannelAsync() } finally { + if (work.RentedMethodArray != null) + { + ArrayPool.Shared.Return(work.RentedMethodArray); + } + if (work.RentedArray != null) { ArrayPool.Shared.Return(work.RentedArray); diff --git a/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcher.cs b/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcher.cs index 37d0c69366..b8ba70c35f 100644 --- a/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcher.cs +++ b/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcher.cs @@ -49,6 +49,11 @@ protected override async Task ProcessChannelAsync() } finally { + if (work.RentedMethodArray != null) + { + ArrayPool.Shared.Return(work.RentedMethodArray); + } + if (work.RentedArray != null) { ArrayPool.Shared.Return(work.RentedArray); diff --git a/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcherChannelBase.cs b/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcherChannelBase.cs index ff5d3535f4..2884f0c76c 100644 --- a/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcherChannelBase.cs +++ b/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcherChannelBase.cs @@ -54,11 +54,11 @@ public void HandleBasicConsumeOk(IBasicConsumer consumer, string consumerTag) } public void HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, - string exchange, string routingKey, in ReadOnlyBasicProperties basicProperties, ReadOnlyMemory body, byte[] rentedArray) + ReadOnlyMemory exchange, ReadOnlyMemory routingKey, in ReadOnlyBasicProperties basicProperties, ReadOnlyMemory body, byte[] rentedMethodArray, byte[] rentedArray) { if (!IsShutdown) { - _writer.TryWrite(new WorkStruct(GetConsumerOrDefault(consumerTag), consumerTag, deliveryTag, redelivered, exchange, routingKey, basicProperties, body, rentedArray)); + _writer.TryWrite(new WorkStruct(GetConsumerOrDefault(consumerTag), consumerTag, deliveryTag, redelivered, exchange, routingKey, basicProperties, body, rentedMethodArray, rentedArray)); } } @@ -108,10 +108,11 @@ protected readonly struct WorkStruct public readonly string? ConsumerTag; public readonly ulong DeliveryTag; public readonly bool Redelivered; - public readonly string? Exchange; - public readonly string? RoutingKey; + public readonly ReadOnlyMemory Exchange; + public readonly ReadOnlyMemory RoutingKey; public readonly ReadOnlyBasicProperties BasicProperties; public readonly ReadOnlyMemory Body; + public readonly byte[]? RentedMethodArray; public readonly byte[]? RentedArray; public readonly ShutdownEventArgs? Reason; public readonly WorkType WorkType; @@ -133,7 +134,7 @@ public WorkStruct(IBasicConsumer consumer, ShutdownEventArgs reason) } public WorkStruct(IBasicConsumer consumer, string consumerTag, ulong deliveryTag, bool redelivered, - string exchange, string routingKey, in ReadOnlyBasicProperties basicProperties, ReadOnlyMemory body, byte[] rentedArray) + ReadOnlyMemory exchange, ReadOnlyMemory routingKey, in ReadOnlyBasicProperties basicProperties, ReadOnlyMemory body, byte[] rentedMethodArray, byte[] rentedArray) { WorkType = WorkType.Deliver; Consumer = consumer; @@ -144,6 +145,7 @@ public WorkStruct(IBasicConsumer consumer, string consumerTag, ulong deliveryTag RoutingKey = routingKey; BasicProperties = basicProperties; Body = body; + RentedMethodArray = rentedMethodArray; RentedArray = rentedArray; Reason = default; } diff --git a/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/FallbackConsumer.cs b/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/FallbackConsumer.cs index 866cba8069..070c1d36fa 100644 --- a/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/FallbackConsumer.cs +++ b/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/FallbackConsumer.cs @@ -37,7 +37,7 @@ void IBasicConsumer.HandleBasicConsumeOk(string consumerTag) ESLog.Info($"Unhandled {nameof(IBasicConsumer.HandleBasicConsumeOk)} for tag {consumerTag}"); } - void IBasicConsumer.HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, in ReadOnlyBasicProperties properties, + void IBasicConsumer.HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, ReadOnlyMemory exchange, ReadOnlyMemory routingKey, in ReadOnlyBasicProperties properties, ReadOnlyMemory body) { ESLog.Info($"Unhandled {nameof(IBasicConsumer.HandleBasicDeliver)} for tag {consumerTag}"); @@ -66,7 +66,7 @@ Task IAsyncBasicConsumer.HandleBasicConsumeOk(string consumerTag) return Task.CompletedTask; } - Task IAsyncBasicConsumer.HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, in ReadOnlyBasicProperties properties, + Task IAsyncBasicConsumer.HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, ReadOnlyMemory exchange, ReadOnlyMemory routingKey, in ReadOnlyBasicProperties properties, ReadOnlyMemory body) { ((IBasicConsumer)this).HandleBasicDeliver(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body); diff --git a/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/IConsumerDispatcher.cs b/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/IConsumerDispatcher.cs index ba7c462529..dda93e7577 100644 --- a/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/IConsumerDispatcher.cs +++ b/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/IConsumerDispatcher.cs @@ -48,10 +48,11 @@ internal interface IConsumerDispatcher void HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, - string exchange, - string routingKey, + ReadOnlyMemory exchange, + ReadOnlyMemory routingKey, in ReadOnlyBasicProperties basicProperties, ReadOnlyMemory body, + byte[] rentedMethodArray, byte[] rentedArray); void HandleBasicCancelOk(string consumerTag); diff --git a/projects/RabbitMQ.Client/client/impl/IncomingCommand.cs b/projects/RabbitMQ.Client/client/impl/IncomingCommand.cs index 5f3aa7fbb8..b8d33ec2a4 100644 --- a/projects/RabbitMQ.Client/client/impl/IncomingCommand.cs +++ b/projects/RabbitMQ.Client/client/impl/IncomingCommand.cs @@ -32,6 +32,11 @@ public IncomingCommand(ProtocolCommandId commandId, ReadOnlyMemory methodB _rentedBodyArray = rentedBodyArray; } + public byte[] TakeoverMethod() + { + return _rentedMethodBytes; + } + public byte[] TakeoverBody() { return _rentedBodyArray; diff --git a/projects/RabbitMQ.Client/client/impl/ModelBase.cs b/projects/RabbitMQ.Client/client/impl/ModelBase.cs index 6913218232..10489e47f2 100644 --- a/projects/RabbitMQ.Client/client/impl/ModelBase.cs +++ b/projects/RabbitMQ.Client/client/impl/ModelBase.cs @@ -573,8 +573,7 @@ protected void HandleBasicConsumeOk(in IncomingCommand cmd) protected void HandleBasicDeliver(in IncomingCommand cmd) { - var method = new Client.Framing.Impl.BasicDeliver(cmd.MethodBytes.Span); - cmd.ReturnMethodBuffer(); + var method = new Client.Framing.Impl.BasicDeliver(cmd.MethodBytes); var header = new ReadOnlyBasicProperties(cmd.HeaderBytes.Span); cmd.ReturnHeaderBuffer(); @@ -586,6 +585,7 @@ protected void HandleBasicDeliver(in IncomingCommand cmd) method._routingKey, header, cmd.Body, + cmd.TakeoverMethod(), cmd.TakeoverBody()); } diff --git a/projects/RabbitMQ.Client/client/impl/WireFormatting.Read.cs b/projects/RabbitMQ.Client/client/impl/WireFormatting.Read.cs index 085c052cd0..0c10581756 100644 --- a/projects/RabbitMQ.Client/client/impl/WireFormatting.Read.cs +++ b/projects/RabbitMQ.Client/client/impl/WireFormatting.Read.cs @@ -189,6 +189,27 @@ public static int ReadShortstr(ReadOnlySpan span, out string value) return ThrowArgumentOutOfRangeException(span.Length, byteCount + 1); } + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public static int ReadShortMemory(ReadOnlyMemory data, out ReadOnlyMemory value) + { + int byteCount = data.Span[0]; + if (byteCount == 0) + { + value = default; + return 1; + } + + // equals data.Length >= byteCount + 1 + if (data.Length > byteCount) + { + value = data.Slice(1, byteCount); + return 1 + byteCount; + } + + value = default; + return ThrowArgumentOutOfRangeException(data.Length, byteCount + 1); + } + [MethodImpl(MethodImplOptions.AggressiveInlining)] public static int ReadBits(ReadOnlySpan span, out bool val) { diff --git a/projects/Unit/TestAsyncConsumerExceptions.cs b/projects/Unit/TestAsyncConsumerExceptions.cs index ee11f8ea69..6ac2cd8ab1 100644 --- a/projects/Unit/TestAsyncConsumerExceptions.cs +++ b/projects/Unit/TestAsyncConsumerExceptions.cs @@ -125,8 +125,8 @@ public ConsumerFailingOnDelivery(IModel model) : base(model) public override Task HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, - string exchange, - string routingKey, + ReadOnlyMemory exchange, + ReadOnlyMemory routingKey, in ReadOnlyBasicProperties properties, ReadOnlyMemory body) { diff --git a/projects/Unit/TestConnectionRecovery.cs b/projects/Unit/TestConnectionRecovery.cs index eef7b84e33..518de2ecef 100644 --- a/projects/Unit/TestConnectionRecovery.cs +++ b/projects/Unit/TestConnectionRecovery.cs @@ -1210,8 +1210,8 @@ public TestBasicConsumer(IModel model, ushort totalMessageCount, ManualResetEven public override void HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, - string exchange, - string routingKey, + ReadOnlyMemory exchange, + ReadOnlyMemory routingKey, in ReadOnlyBasicProperties properties, ReadOnlyMemory body) { diff --git a/projects/Unit/TestConsumerExceptions.cs b/projects/Unit/TestConsumerExceptions.cs index 63b91a8408..1cccf284cd 100644 --- a/projects/Unit/TestConsumerExceptions.cs +++ b/projects/Unit/TestConsumerExceptions.cs @@ -49,8 +49,8 @@ public ConsumerFailingOnDelivery(IModel model) : base(model) public override void HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, - string exchange, - string routingKey, + ReadOnlyMemory exchange, + ReadOnlyMemory routingKey, in ReadOnlyBasicProperties properties, ReadOnlyMemory body) { diff --git a/projects/Unit/TestConsumerOperationDispatch.cs b/projects/Unit/TestConsumerOperationDispatch.cs index b48e9615a8..86ba7704e6 100644 --- a/projects/Unit/TestConsumerOperationDispatch.cs +++ b/projects/Unit/TestConsumerOperationDispatch.cs @@ -87,7 +87,7 @@ public CollectingConsumer(IModel model) } public override void HandleBasicDeliver(string consumerTag, - ulong deliveryTag, bool redelivered, string exchange, string routingKey, + ulong deliveryTag, bool redelivered, ReadOnlyMemory exchange, ReadOnlyMemory routingKey, in ReadOnlyBasicProperties properties, ReadOnlyMemory body) { // we test concurrent dispatch from the moment basic.delivery is returned. diff --git a/projects/Unit/TestMainLoop.cs b/projects/Unit/TestMainLoop.cs index 23f9edee9b..889bc29ecc 100644 --- a/projects/Unit/TestMainLoop.cs +++ b/projects/Unit/TestMainLoop.cs @@ -53,8 +53,8 @@ public FaultyConsumer(IModel model) : base(model) { } public override void HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, - string exchange, - string routingKey, + ReadOnlyMemory exchange, + ReadOnlyMemory routingKey, in ReadOnlyBasicProperties properties, ReadOnlyMemory body) {