diff --git a/projects/Benchmarks/ConsumerDispatching/ConsumerDispatcher.cs b/projects/Benchmarks/ConsumerDispatching/ConsumerDispatcher.cs index d5918dc06d..15e5534adf 100644 --- a/projects/Benchmarks/ConsumerDispatching/ConsumerDispatcher.cs +++ b/projects/Benchmarks/ConsumerDispatching/ConsumerDispatcher.cs @@ -1,5 +1,6 @@ using System; using System.Threading; +using System.Threading.Tasks; using BenchmarkDotNet.Attributes; using RabbitMQ.Client; using RabbitMQ.Client.ConsumerDispatching; @@ -8,7 +9,7 @@ namespace RabbitMQ.Benchmarks { [Config(typeof(Config))] [BenchmarkCategory("ConsumerDispatcher")] - internal class ConsumerDispatcherBase + public class ConsumerDispatcherBase { protected static readonly ManualResetEventSlim _autoResetEvent = new ManualResetEventSlim(false); @@ -19,18 +20,16 @@ internal class ConsumerDispatcherBase protected readonly string _exchange = "Exchange"; protected readonly string _routingKey = "RoutingKey"; protected readonly ReadOnlyBasicProperties _properties = new ReadOnlyBasicProperties(); - protected readonly RentedMemory _body; + protected readonly byte[] _body = new byte[512]; public ConsumerDispatcherBase() { var r = new Random(); - byte[] body = new byte[512]; - r.NextBytes(body); - _body = new RentedMemory(body); + r.NextBytes(_body); } } - internal class BasicDeliverConsumerDispatching : ConsumerDispatcherBase + public class BasicDeliverConsumerDispatching : ConsumerDispatcherBase { [Params(1, 30)] public int Count { get; set; } @@ -39,41 +38,49 @@ internal class BasicDeliverConsumerDispatching : ConsumerDispatcherBase public int Concurrency { get; set; } [GlobalSetup(Target = nameof(AsyncConsumerDispatcher))] - public void SetUpAsyncConsumer() + public async Task SetUpAsyncConsumer() { _consumer.Count = Count; _dispatcher = new AsyncConsumerDispatcher(null, Concurrency); - _dispatcher.HandleBasicConsumeOk(_consumer, _consumerTag); + await _dispatcher.HandleBasicConsumeOkAsync(_consumer, _consumerTag, CancellationToken.None); } [Benchmark] - public void AsyncConsumerDispatcher() + public async Task AsyncConsumerDispatcher() { - for (int i = 0; i < Count; i++) + using (RentedMemory body = new RentedMemory(_body)) { - _dispatcher.HandleBasicDeliver(_consumerTag, _deliveryTag, false, _exchange, _routingKey, _properties, _body); + for (int i = 0; i < Count; i++) + { + await _dispatcher.HandleBasicDeliverAsync(_consumerTag, _deliveryTag, false, _exchange, _routingKey, _properties, body, + CancellationToken.None); + } + _autoResetEvent.Wait(); + _autoResetEvent.Reset(); } - _autoResetEvent.Wait(); - _autoResetEvent.Reset(); } [GlobalSetup(Target = nameof(ConsumerDispatcher))] - public void SetUpConsumer() + public async Task SetUpConsumer() { _consumer.Count = Count; _dispatcher = new ConsumerDispatcher(null, Concurrency); - _dispatcher.HandleBasicConsumeOk(_consumer, _consumerTag); + await _dispatcher.HandleBasicConsumeOkAsync(_consumer, _consumerTag, CancellationToken.None); } [Benchmark] - public void ConsumerDispatcher() + public async Task ConsumerDispatcher() { - for (int i = 0; i < Count; i++) + using (RentedMemory body = new RentedMemory(_body)) { - _dispatcher.HandleBasicDeliver(_consumerTag, _deliveryTag, false, _exchange, _routingKey, _properties, _body); + for (int i = 0; i < Count; i++) + { + await _dispatcher.HandleBasicDeliverAsync(_consumerTag, _deliveryTag, false, _exchange, _routingKey, _properties, body, + CancellationToken.None); + } + _autoResetEvent.Wait(); + _autoResetEvent.Reset(); } - _autoResetEvent.Wait(); - _autoResetEvent.Reset(); } } } diff --git a/projects/RabbitMQ.Client/client/framing/Channel.cs b/projects/RabbitMQ.Client/client/framing/Channel.cs index 9ffaa92b90..02de2f500b 100644 --- a/projects/RabbitMQ.Client/client/framing/Channel.cs +++ b/projects/RabbitMQ.Client/client/framing/Channel.cs @@ -78,13 +78,13 @@ protected override Task DispatchCommandAsync(IncomingCommand cmd, Cancella { case ProtocolCommandId.BasicCancel: { - HandleBasicCancel(in cmd); - return Task.FromResult(true); + // Note: always returns true + return HandleBasicCancelAsync(cmd, cancellationToken); } case ProtocolCommandId.BasicDeliver: { - HandleBasicDeliver(in cmd); - return Task.FromResult(true); + // Note: always returns true + return HandleBasicDeliverAsync(cmd, cancellationToken); } case ProtocolCommandId.BasicAck: { @@ -108,8 +108,8 @@ protected override Task DispatchCommandAsync(IncomingCommand cmd, Cancella } case ProtocolCommandId.ChannelCloseOk: { - HandleChannelCloseOk(in cmd); - return Task.FromResult(true); + // Note: always returns true + return HandleChannelCloseOkAsync(cmd, cancellationToken); } case ProtocolCommandId.ChannelFlow: { @@ -128,8 +128,8 @@ protected override Task DispatchCommandAsync(IncomingCommand cmd, Cancella } case ProtocolCommandId.ConnectionSecure: { - HandleConnectionSecure(in cmd); - return Task.FromResult(true); + // Note: always returns true + return HandleConnectionSecureAsync(cmd); } case ProtocolCommandId.ConnectionStart: { @@ -138,8 +138,8 @@ protected override Task DispatchCommandAsync(IncomingCommand cmd, Cancella } case ProtocolCommandId.ConnectionTune: { - HandleConnectionTune(in cmd); - return Task.FromResult(true); + // Note: always returns true + return HandleConnectionTuneAsync(cmd); } case ProtocolCommandId.ConnectionUnblocked: { diff --git a/projects/RabbitMQ.Client/client/impl/AsyncRpcContinuations.cs b/projects/RabbitMQ.Client/client/impl/AsyncRpcContinuations.cs index 92a5bd5197..ce8ca0f853 100644 --- a/projects/RabbitMQ.Client/client/impl/AsyncRpcContinuations.cs +++ b/projects/RabbitMQ.Client/client/impl/AsyncRpcContinuations.cs @@ -41,7 +41,7 @@ namespace RabbitMQ.Client.Impl { - internal abstract class AsyncRpcContinuation : IRpcContinuation, IDisposable + internal abstract class AsyncRpcContinuation : IRpcContinuation { private readonly CancellationTokenSource _cancellationTokenSource; private readonly CancellationTokenRegistration _cancellationTokenRegistration; @@ -101,7 +101,7 @@ public ConfiguredTaskAwaitable.ConfiguredTaskAwaiter GetAwaiter() return _tcsConfiguredTaskAwaitable.GetAwaiter(); } - public abstract void HandleCommand(in IncomingCommand cmd); + public abstract Task HandleCommandAsync(IncomingCommand cmd); public virtual void HandleChannelShutdown(ShutdownEventArgs reason) { @@ -135,7 +135,7 @@ public ConnectionSecureOrTuneAsyncRpcContinuation(TimeSpan continuationTimeout) { } - public override void HandleCommand(in IncomingCommand cmd) + public override Task HandleCommandAsync(IncomingCommand cmd) { try { @@ -156,6 +156,8 @@ public override void HandleCommand(in IncomingCommand cmd) { _tcs.SetException(new InvalidOperationException($"Received unexpected command of type {cmd.CommandId}!")); } + + return Task.CompletedTask; } finally { @@ -173,7 +175,7 @@ public SimpleAsyncRpcContinuation(ProtocolCommandId expectedCommandId, TimeSpan _expectedCommandId = expectedCommandId; } - public override void HandleCommand(in IncomingCommand cmd) + public override Task HandleCommandAsync(IncomingCommand cmd) { try { @@ -185,6 +187,8 @@ public override void HandleCommand(in IncomingCommand cmd) { _tcs.SetException(new InvalidOperationException($"Received unexpected command of type {cmd.CommandId}!")); } + + return Task.CompletedTask; } finally { @@ -205,7 +209,7 @@ public BasicCancelAsyncRpcContinuation(string consumerTag, IConsumerDispatcher c _consumerDispatcher = consumerDispatcher; } - public override void HandleCommand(in IncomingCommand cmd) + public override async Task HandleCommandAsync(IncomingCommand cmd) { try { @@ -214,7 +218,8 @@ public override void HandleCommand(in IncomingCommand cmd) var method = new Client.Framing.Impl.BasicCancelOk(cmd.MethodSpan); _tcs.TrySetResult(true); Debug.Assert(_consumerTag == method._consumerTag); - _consumerDispatcher.HandleBasicCancelOk(_consumerTag); + await _consumerDispatcher.HandleBasicCancelOkAsync(_consumerTag, CancellationToken) + .ConfigureAwait(false); } else { @@ -240,7 +245,7 @@ public BasicConsumeAsyncRpcContinuation(IBasicConsumer consumer, IConsumerDispat _consumerDispatcher = consumerDispatcher; } - public override void HandleCommand(in IncomingCommand cmd) + public override async Task HandleCommandAsync(IncomingCommand cmd) { try { @@ -248,7 +253,8 @@ public override void HandleCommand(in IncomingCommand cmd) { var method = new Client.Framing.Impl.BasicConsumeOk(cmd.MethodSpan); _tcs.TrySetResult(method._consumerTag); - _consumerDispatcher.HandleBasicConsumeOk(_consumer, method._consumerTag); + await _consumerDispatcher.HandleBasicConsumeOkAsync(_consumer, method._consumerTag, CancellationToken) + .ConfigureAwait(false); } else { @@ -272,7 +278,7 @@ public BasicGetAsyncRpcContinuation(Func adjustDeliveryTag, TimeSp _adjustDeliveryTag = adjustDeliveryTag; } - public override void HandleCommand(in IncomingCommand cmd) + public override Task HandleCommandAsync(IncomingCommand cmd) { try { @@ -300,6 +306,8 @@ public override void HandleCommand(in IncomingCommand cmd) { _tcs.SetException(new InvalidOperationException($"Received unexpected command of type {cmd.CommandId}!")); } + + return Task.CompletedTask; } finally { @@ -389,7 +397,7 @@ public QueueDeclareAsyncRpcContinuation(TimeSpan continuationTimeout) : base(con { } - public override void HandleCommand(in IncomingCommand cmd) + public override Task HandleCommandAsync(IncomingCommand cmd) { try { @@ -403,6 +411,8 @@ public override void HandleCommand(in IncomingCommand cmd) { _tcs.SetException(new InvalidOperationException($"Received unexpected command of type {cmd.CommandId}!")); } + + return Task.CompletedTask; } finally { @@ -433,7 +443,7 @@ public QueueDeleteAsyncRpcContinuation(TimeSpan continuationTimeout) : base(cont { } - public override void HandleCommand(in IncomingCommand cmd) + public override Task HandleCommandAsync(IncomingCommand cmd) { try { @@ -446,6 +456,8 @@ public override void HandleCommand(in IncomingCommand cmd) { _tcs.SetException(new InvalidOperationException($"Received unexpected command of type {cmd.CommandId}!")); } + + return Task.CompletedTask; } finally { @@ -460,7 +472,7 @@ public QueuePurgeAsyncRpcContinuation(TimeSpan continuationTimeout) : base(conti { } - public override void HandleCommand(in IncomingCommand cmd) + public override Task HandleCommandAsync(IncomingCommand cmd) { try { @@ -473,6 +485,8 @@ public override void HandleCommand(in IncomingCommand cmd) { _tcs.SetException(new InvalidOperationException($"Received unexpected command of type {cmd.CommandId}!")); } + + return Task.CompletedTask; } finally { diff --git a/projects/RabbitMQ.Client/client/impl/ChannelBase.cs b/projects/RabbitMQ.Client/client/impl/ChannelBase.cs index 78d1995d74..cd53d6c960 100644 --- a/projects/RabbitMQ.Client/client/impl/ChannelBase.cs +++ b/projects/RabbitMQ.Client/client/impl/ChannelBase.cs @@ -219,14 +219,15 @@ public Task CloseAsync(ushort replyCode, string replyText, bool abort) public async Task CloseAsync(ShutdownEventArgs args, bool abort) { - using var k = new ChannelCloseAsyncRpcContinuation(ContinuationTimeout); + bool enqueued = false; + var k = new ChannelCloseAsyncRpcContinuation(ContinuationTimeout); await _rpcSemaphore.WaitAsync(k.CancellationToken) .ConfigureAwait(false); try { ChannelShutdown += k.OnConnectionShutdown; - Enqueue(k); + enqueued = Enqueue(k); ConsumerDispatcher.Quiesce(); if (SetCloseReason(args)) @@ -267,6 +268,10 @@ await ConsumerDispatcher.WaitForShutdownAsync() } finally { + if (false == enqueued) + { + k.Dispose(); + } _rpcSemaphore.Release(); ChannelShutdown -= k.OnConnectionShutdown; } @@ -283,13 +288,14 @@ internal async ValueTask ConnectionOpenAsync(string virtualHost, CancellationTok internal async ValueTask ConnectionSecureOkAsync(byte[] response) { - using var k = new ConnectionSecureOrTuneAsyncRpcContinuation(ContinuationTimeout); + bool enqueued = false; + var k = new ConnectionSecureOrTuneAsyncRpcContinuation(ContinuationTimeout); await _rpcSemaphore.WaitAsync(k.CancellationToken) .ConfigureAwait(false); try { - Enqueue(k); + enqueued = Enqueue(k); try { @@ -308,6 +314,10 @@ await ModelSendAsync(method, k.CancellationToken) } finally { + if (false == enqueued) + { + k.Dispose(); + } _rpcSemaphore.Release(); } } @@ -316,13 +326,14 @@ internal async ValueTask ConnectionStartOkAsync( IDictionary clientProperties, string mechanism, byte[] response, string locale) { - using var k = new ConnectionSecureOrTuneAsyncRpcContinuation(ContinuationTimeout); + bool enqueued = false; + var k = new ConnectionSecureOrTuneAsyncRpcContinuation(ContinuationTimeout); await _rpcSemaphore.WaitAsync(k.CancellationToken) .ConfigureAwait(false); try { - Enqueue(k); + enqueued = Enqueue(k); try { @@ -341,33 +352,40 @@ await ModelSendAsync(method, k.CancellationToken) } finally { + if (false == enqueued) + { + k.Dispose(); + } _rpcSemaphore.Release(); } } protected abstract Task DispatchCommandAsync(IncomingCommand cmd, CancellationToken cancellationToken); - protected void Enqueue(IRpcContinuation k) + protected bool Enqueue(IRpcContinuation k) { if (IsOpen) { _continuationQueue.Enqueue(k); + return true; } else { k.HandleChannelShutdown(CloseReason); + return false; } } internal async Task OpenAsync() { - using var k = new ChannelOpenAsyncRpcContinuation(ContinuationTimeout); + bool enqueued = false; + var k = new ChannelOpenAsyncRpcContinuation(ContinuationTimeout); await _rpcSemaphore.WaitAsync(k.CancellationToken) .ConfigureAwait(false); try { - Enqueue(k); + enqueued = Enqueue(k); var method = new ChannelOpen(); await ModelSendAsync(method, k.CancellationToken) @@ -379,6 +397,10 @@ await ModelSendAsync(method, k.CancellationToken) } finally { + if (false == enqueued) + { + k.Dispose(); + } _rpcSemaphore.Release(); } } @@ -396,11 +418,20 @@ internal void FinishClose() private async Task HandleCommandAsync(IncomingCommand cmd, CancellationToken cancellationToken) { - // Was asynchronous. Already processed. No need to process further. - if (false == await DispatchCommandAsync(cmd, cancellationToken).ConfigureAwait(false)) + /* + * If DispatchCommandAsync returns `true`, it means that the incoming command is server-originated, and has + * already been handled. + * + * Else, the incoming command is the return of an RPC call, and must be handled. + */ + if (false == await DispatchCommandAsync(cmd, cancellationToken) + .ConfigureAwait(false)) { - IRpcContinuation c = _continuationQueue.Next(); - c.HandleCommand(in cmd); + using (IRpcContinuation c = _continuationQueue.Next()) + { + await c.HandleCommandAsync(cmd) + .ConfigureAwait(false); + } } } @@ -593,12 +624,14 @@ protected void HandleAckNack(ulong deliveryTag, bool multiple, bool isNack) } } - protected void HandleBasicCancel(in IncomingCommand cmd) + protected async Task HandleBasicCancelAsync(IncomingCommand cmd, CancellationToken cancellationToken) { try { string consumerTag = new Client.Framing.Impl.BasicCancel(cmd.MethodSpan)._consumerTag; - ConsumerDispatcher.HandleBasicCancel(consumerTag); + await ConsumerDispatcher.HandleBasicCancelAsync(consumerTag, cancellationToken) + .ConfigureAwait(false); + return true; } finally { @@ -606,20 +639,22 @@ protected void HandleBasicCancel(in IncomingCommand cmd) } } - protected void HandleBasicDeliver(in IncomingCommand cmd) + protected async Task HandleBasicDeliverAsync(IncomingCommand cmd, CancellationToken cancellationToken) { try { var method = new Client.Framing.Impl.BasicDeliver(cmd.MethodSpan); var header = new ReadOnlyBasicProperties(cmd.HeaderSpan); - ConsumerDispatcher.HandleBasicDeliver( + await ConsumerDispatcher.HandleBasicDeliverAsync( method._consumerTag, AdjustDeliveryTag(method._deliveryTag), method._redelivered, method._exchange, method._routingKey, header, - cmd.Body); + cmd.Body, + cancellationToken).ConfigureAwait(false); + return true; } finally { @@ -682,7 +717,7 @@ await ModelSendAsync(method, cancellationToken) } } - protected void HandleChannelCloseOk(in IncomingCommand cmd) + protected async Task HandleChannelCloseOkAsync(IncomingCommand cmd, CancellationToken cancellationToken) { try { @@ -695,8 +730,11 @@ protected void HandleChannelCloseOk(in IncomingCommand cmd) if (_continuationQueue.TryPeek(out var k)) { _continuationQueue.Next(); - k.HandleCommand(cmd); + await k.HandleCommandAsync(cmd) + .ConfigureAwait(false); } + + return true; } finally { @@ -783,10 +821,12 @@ await ModelSendAsync(replyMethod, cancellationToken) } } - protected void HandleConnectionSecure(in IncomingCommand cmd) + protected async Task HandleConnectionSecureAsync(IncomingCommand _) { - using var k = (ConnectionSecureOrTuneAsyncRpcContinuation)_continuationQueue.Next(); - k.HandleCommand(IncomingCommand.Empty); // release the continuation. + var k = (ConnectionSecureOrTuneAsyncRpcContinuation)_continuationQueue.Next(); + await k.HandleCommandAsync(IncomingCommand.Empty) + .ConfigureAwait(false); // release the continuation. + return true; } protected async Task HandleConnectionStartAsync(IncomingCommand cmd, CancellationToken cancellationToken) @@ -823,13 +863,16 @@ await Session.Connection.CloseAsync(reason, false, } } - protected void HandleConnectionTune(in IncomingCommand cmd) + protected async Task HandleConnectionTuneAsync(IncomingCommand cmd) { + // Note: `using` here to ensure instance is disposed using var k = (ConnectionSecureOrTuneAsyncRpcContinuation)_continuationQueue.Next(); - /* - * Note: releases the continuation and returns the buffers - */ - k.HandleCommand(cmd); + + // Note: releases the continuation and returns the buffers + await k.HandleCommandAsync(cmd) + .ConfigureAwait(false); + + return true; } protected void HandleConnectionUnblocked(in IncomingCommand cmd) @@ -852,7 +895,11 @@ protected void HandleConnectionUnblocked(in IncomingCommand cmd) public async Task BasicCancelAsync(string consumerTag, bool noWait) { - using var k = new BasicCancelAsyncRpcContinuation(consumerTag, ConsumerDispatcher, ContinuationTimeout); + // NOTE: + // Maybe don't dispose this instance because the CancellationToken must remain + // valid for processing the response. + bool enqueued = false; + var k = new BasicCancelAsyncRpcContinuation(consumerTag, ConsumerDispatcher, ContinuationTimeout); await _rpcSemaphore.WaitAsync(k.CancellationToken) .ConfigureAwait(false); @@ -868,7 +915,7 @@ await ModelSendAsync(method, k.CancellationToken) } else { - Enqueue(k); + enqueued = Enqueue(k); await ModelSendAsync(method, k.CancellationToken) .ConfigureAwait(false); @@ -881,6 +928,10 @@ await ModelSendAsync(method, k.CancellationToken) } finally { + if (false == enqueued) + { + k.Dispose(); + } _rpcSemaphore.Release(); } } @@ -898,13 +949,17 @@ public async Task BasicConsumeAsync(string queue, bool autoAck, string c } } - using var k = new BasicConsumeAsyncRpcContinuation(consumer, ConsumerDispatcher, ContinuationTimeout); + // NOTE: + // Maybe don't dispose this instance because the CancellationToken must remain + // valid for processing the response. + bool enqueued = false; + var k = new BasicConsumeAsyncRpcContinuation(consumer, ConsumerDispatcher, ContinuationTimeout); await _rpcSemaphore.WaitAsync(k.CancellationToken) .ConfigureAwait(false); try { - Enqueue(k); + enqueued = Enqueue(k); var method = new Client.Framing.Impl.BasicConsume(queue, consumerTag, noLocal, autoAck, exclusive, false, arguments); await ModelSendAsync(method, k.CancellationToken) @@ -914,19 +969,24 @@ await ModelSendAsync(method, k.CancellationToken) } finally { + if (false == enqueued) + { + k.Dispose(); + } _rpcSemaphore.Release(); } } public async ValueTask BasicGetAsync(string queue, bool autoAck) { - using var k = new BasicGetAsyncRpcContinuation(AdjustDeliveryTag, ContinuationTimeout); + bool enqueued = false; + var k = new BasicGetAsyncRpcContinuation(AdjustDeliveryTag, ContinuationTimeout); await _rpcSemaphore.WaitAsync(k.CancellationToken) .ConfigureAwait(false); try { - Enqueue(k); + enqueued = Enqueue(k); var method = new BasicGet(queue, autoAck); await ModelSendAsync(method, k.CancellationToken) @@ -946,6 +1006,10 @@ await ModelSendAsync(method, k.CancellationToken) } finally { + if (false == enqueued) + { + k.Dispose(); + } _rpcSemaphore.Release(); } } @@ -1118,12 +1182,14 @@ public async Task UpdateSecretAsync(string newSecret, string reason) throw new ArgumentNullException(nameof(reason)); } + bool enqueued = false; + var k = new SimpleAsyncRpcContinuation(ProtocolCommandId.ConnectionUpdateSecretOk, ContinuationTimeout); + await _rpcSemaphore.WaitAsync() .ConfigureAwait(false); try { - using var k = new SimpleAsyncRpcContinuation(ProtocolCommandId.ConnectionUpdateSecretOk, ContinuationTimeout); - Enqueue(k); + enqueued = Enqueue(k); byte[] newSecretBytes = Encoding.UTF8.GetBytes(newSecret); var method = new ConnectionUpdateSecret(newSecretBytes, reason); @@ -1136,19 +1202,24 @@ await ModelSendAsync(method, k.CancellationToken) } finally { + if (false == enqueued) + { + k.Dispose(); + } _rpcSemaphore.Release(); } } public async Task BasicQosAsync(uint prefetchSize, ushort prefetchCount, bool global) { - using var k = new BasicQosAsyncRpcContinuation(ContinuationTimeout); + bool enqueued = false; + var k = new BasicQosAsyncRpcContinuation(ContinuationTimeout); await _rpcSemaphore.WaitAsync(k.CancellationToken) .ConfigureAwait(false); try { - Enqueue(k); + enqueued = Enqueue(k); var method = new BasicQos(prefetchSize, prefetchCount, global); await ModelSendAsync(method, k.CancellationToken) @@ -1160,13 +1231,18 @@ await ModelSendAsync(method, k.CancellationToken) } finally { + if (false == enqueued) + { + k.Dispose(); + } _rpcSemaphore.Release(); } } public async Task ConfirmSelectAsync() { - using var k = new ConfirmSelectAsyncRpcContinuation(ContinuationTimeout); + bool enqueued = false; + var k = new ConfirmSelectAsyncRpcContinuation(ContinuationTimeout); await _rpcSemaphore.WaitAsync(k.CancellationToken) .ConfigureAwait(false); @@ -1178,7 +1254,7 @@ await _rpcSemaphore.WaitAsync(k.CancellationToken) NextPublishSeqNo = 1; } - Enqueue(k); + enqueued = Enqueue(k); var method = new ConfirmSelect(false); await ModelSendAsync(method, k.CancellationToken) @@ -1191,6 +1267,10 @@ await ModelSendAsync(method, k.CancellationToken) } finally { + if (false == enqueued) + { + k.Dispose(); + } _rpcSemaphore.Release(); } } @@ -1198,7 +1278,8 @@ await ModelSendAsync(method, k.CancellationToken) public async Task ExchangeBindAsync(string destination, string source, string routingKey, IDictionary arguments, bool noWait) { - using var k = new ExchangeBindAsyncRpcContinuation(ContinuationTimeout); + bool enqueued = false; + var k = new ExchangeBindAsyncRpcContinuation(ContinuationTimeout); await _rpcSemaphore.WaitAsync(k.CancellationToken) .ConfigureAwait(false); @@ -1213,7 +1294,7 @@ await ModelSendAsync(method, k.CancellationToken) } else { - Enqueue(k); + enqueued = Enqueue(k); await ModelSendAsync(method, k.CancellationToken) .ConfigureAwait(false); @@ -1226,6 +1307,10 @@ await ModelSendAsync(method, k.CancellationToken) } finally { + if (false == enqueued) + { + k.Dispose(); + } _rpcSemaphore.Release(); } } @@ -1239,14 +1324,14 @@ public Task ExchangeDeclarePassiveAsync(string exchange) public async Task ExchangeDeclareAsync(string exchange, string type, bool durable, bool autoDelete, IDictionary arguments, bool passive, bool noWait) { - using var k = new ExchangeDeclareAsyncRpcContinuation(ContinuationTimeout); + bool enqueued = false; + var k = new ExchangeDeclareAsyncRpcContinuation(ContinuationTimeout); await _rpcSemaphore.WaitAsync(k.CancellationToken) .ConfigureAwait(false); try { var method = new ExchangeDeclare(exchange, type, passive, durable, autoDelete, false, noWait, arguments); - if (noWait) { await ModelSendAsync(method, k.CancellationToken) @@ -1254,7 +1339,7 @@ await ModelSendAsync(method, k.CancellationToken) } else { - Enqueue(k); + enqueued = Enqueue(k); await ModelSendAsync(method, k.CancellationToken) .ConfigureAwait(false); @@ -1267,13 +1352,18 @@ await ModelSendAsync(method, k.CancellationToken) } finally { + if (false == enqueued) + { + k.Dispose(); + } _rpcSemaphore.Release(); } } public async Task ExchangeDeleteAsync(string exchange, bool ifUnused, bool noWait) { - using var k = new ExchangeDeleteAsyncRpcContinuation(ContinuationTimeout); + bool enqueued = false; + var k = new ExchangeDeleteAsyncRpcContinuation(ContinuationTimeout); await _rpcSemaphore.WaitAsync(k.CancellationToken) .ConfigureAwait(false); @@ -1288,7 +1378,7 @@ await ModelSendAsync(method, k.CancellationToken) } else { - Enqueue(k); + enqueued = Enqueue(k); await ModelSendAsync(method, k.CancellationToken) .ConfigureAwait(false); @@ -1301,6 +1391,10 @@ await ModelSendAsync(method, k.CancellationToken) } finally { + if (false == enqueued) + { + k.Dispose(); + } _rpcSemaphore.Release(); } } @@ -1308,7 +1402,8 @@ await ModelSendAsync(method, k.CancellationToken) public async Task ExchangeUnbindAsync(string destination, string source, string routingKey, IDictionary arguments, bool noWait) { - using var k = new ExchangeUnbindAsyncRpcContinuation(ContinuationTimeout); + bool enqueued = false; + var k = new ExchangeUnbindAsyncRpcContinuation(ContinuationTimeout); await _rpcSemaphore.WaitAsync(k.CancellationToken) .ConfigureAwait(false); @@ -1323,7 +1418,7 @@ await ModelSendAsync(method, k.CancellationToken) } else { - Enqueue(k); + enqueued = Enqueue(k); await ModelSendAsync(method, k.CancellationToken) .ConfigureAwait(false); @@ -1336,6 +1431,10 @@ await ModelSendAsync(method, k.CancellationToken) } finally { + if (false == enqueued) + { + k.Dispose(); + } _rpcSemaphore.Release(); } } @@ -1363,7 +1462,8 @@ public async Task QueueDeclareAsync(string queue, bool durable, } } - using var k = new QueueDeclareAsyncRpcContinuation(ContinuationTimeout); + bool enqueued = false; + var k = new QueueDeclareAsyncRpcContinuation(ContinuationTimeout); await _rpcSemaphore.WaitAsync(k.CancellationToken) .ConfigureAwait(false); @@ -1373,6 +1473,7 @@ await _rpcSemaphore.WaitAsync(k.CancellationToken) if (noWait) { + await ModelSendAsync(method, k.CancellationToken) .ConfigureAwait(false); @@ -1385,7 +1486,7 @@ await ModelSendAsync(method, k.CancellationToken) } else { - Enqueue(k); + enqueued = Enqueue(k); await ModelSendAsync(method, k.CancellationToken) .ConfigureAwait(false); @@ -1401,6 +1502,10 @@ await ModelSendAsync(method, k.CancellationToken) } finally { + if (false == enqueued) + { + k.Dispose(); + } _rpcSemaphore.Release(); } } @@ -1408,7 +1513,8 @@ await ModelSendAsync(method, k.CancellationToken) public async Task QueueBindAsync(string queue, string exchange, string routingKey, IDictionary arguments, bool noWait) { - using var k = new QueueBindAsyncRpcContinuation(ContinuationTimeout); + bool enqueued = false; + var k = new QueueBindAsyncRpcContinuation(ContinuationTimeout); await _rpcSemaphore.WaitAsync(k.CancellationToken) .ConfigureAwait(false); @@ -1423,7 +1529,7 @@ await ModelSendAsync(method, k.CancellationToken) } else { - Enqueue(k); + enqueued = Enqueue(k); await ModelSendAsync(method, k.CancellationToken) .ConfigureAwait(false); @@ -1436,6 +1542,10 @@ await ModelSendAsync(method, k.CancellationToken) } finally { + if (false == enqueued) + { + k.Dispose(); + } _rpcSemaphore.Release(); } } @@ -1454,7 +1564,8 @@ public async Task ConsumerCountAsync(string queue) public async Task QueueDeleteAsync(string queue, bool ifUnused, bool ifEmpty, bool noWait) { - using var k = new QueueDeleteAsyncRpcContinuation(ContinuationTimeout); + bool enqueued = false; + var k = new QueueDeleteAsyncRpcContinuation(ContinuationTimeout); await _rpcSemaphore.WaitAsync(k.CancellationToken) .ConfigureAwait(false); @@ -1471,7 +1582,7 @@ await ModelSendAsync(method, k.CancellationToken) } else { - Enqueue(k); + enqueued = Enqueue(k); await ModelSendAsync(method, k.CancellationToken) .ConfigureAwait(false); @@ -1481,19 +1592,24 @@ await ModelSendAsync(method, k.CancellationToken) } finally { + if (false == enqueued) + { + k.Dispose(); + } _rpcSemaphore.Release(); } } public async Task QueuePurgeAsync(string queue) { - using var k = new QueuePurgeAsyncRpcContinuation(ContinuationTimeout); + bool enqueued = false; + var k = new QueuePurgeAsyncRpcContinuation(ContinuationTimeout); await _rpcSemaphore.WaitAsync(k.CancellationToken) .ConfigureAwait(false); try { - Enqueue(k); + enqueued = Enqueue(k); var method = new QueuePurge(queue, false); await ModelSendAsync(method, k.CancellationToken) @@ -1503,13 +1619,18 @@ await ModelSendAsync(method, k.CancellationToken) } finally { + if (false == enqueued) + { + k.Dispose(); + } _rpcSemaphore.Release(); } } public async Task QueueUnbindAsync(string queue, string exchange, string routingKey, IDictionary arguments) { - using var k = new QueueUnbindAsyncRpcContinuation(ContinuationTimeout); + bool enqueued = false; + var k = new QueueUnbindAsyncRpcContinuation(ContinuationTimeout); await _rpcSemaphore.WaitAsync(k.CancellationToken) .ConfigureAwait(false); @@ -1527,19 +1648,24 @@ await ModelSendAsync(method, k.CancellationToken) } finally { + if (false == enqueued) + { + k.Dispose(); + } _rpcSemaphore.Release(); } } public async Task TxCommitAsync() { - using var k = new TxCommitAsyncRpcContinuation(ContinuationTimeout); + bool enqueued = false; + var k = new TxCommitAsyncRpcContinuation(ContinuationTimeout); await _rpcSemaphore.WaitAsync(k.CancellationToken) .ConfigureAwait(false); try { - Enqueue(k); + enqueued = Enqueue(k); var method = new TxCommit(); await ModelSendAsync(method, k.CancellationToken) @@ -1551,19 +1677,24 @@ await ModelSendAsync(method, k.CancellationToken) } finally { + if (false == enqueued) + { + k.Dispose(); + } _rpcSemaphore.Release(); } } public async Task TxRollbackAsync() { - using var k = new TxRollbackAsyncRpcContinuation(ContinuationTimeout); + bool enqueued = false; + var k = new TxRollbackAsyncRpcContinuation(ContinuationTimeout); await _rpcSemaphore.WaitAsync(k.CancellationToken) .ConfigureAwait(false); try { - Enqueue(k); + enqueued = Enqueue(k); var method = new TxRollback(); await ModelSendAsync(method, k.CancellationToken) @@ -1575,13 +1706,18 @@ await ModelSendAsync(method, k.CancellationToken) } finally { + if (false == enqueued) + { + k.Dispose(); + } _rpcSemaphore.Release(); } } public async Task TxSelectAsync() { - using var k = new TxSelectAsyncRpcContinuation(ContinuationTimeout); + bool enqueued = false; + var k = new TxSelectAsyncRpcContinuation(ContinuationTimeout); await _rpcSemaphore.WaitAsync(k.CancellationToken) .ConfigureAwait(false); @@ -1599,6 +1735,10 @@ await ModelSendAsync(method, k.CancellationToken) } finally { + if (false == enqueued) + { + k.Dispose(); + } _rpcSemaphore.Release(); } } diff --git a/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcherChannelBase.cs b/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcherChannelBase.cs index 1c5811969a..5d3b3cc610 100644 --- a/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcherChannelBase.cs +++ b/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcherChannelBase.cs @@ -59,37 +59,55 @@ public bool IsShutdown } } - public void HandleBasicConsumeOk(IBasicConsumer consumer, string consumerTag) + public ValueTask HandleBasicConsumeOkAsync(IBasicConsumer consumer, string consumerTag, CancellationToken cancellationToken) { if (false == _disposed && false == _quiesce) { AddConsumer(consumer, consumerTag); - _writer.TryWrite(new WorkStruct(WorkType.ConsumeOk, consumer, consumerTag)); + return _writer.WriteAsync(new WorkStruct(WorkType.ConsumeOk, consumer, consumerTag), cancellationToken); + } + else + { + return default; } } - public void HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, - string exchange, string routingKey, in ReadOnlyBasicProperties basicProperties, RentedMemory body) + public ValueTask HandleBasicDeliverAsync(string consumerTag, ulong deliveryTag, bool redelivered, + string exchange, string routingKey, in ReadOnlyBasicProperties basicProperties, RentedMemory body, + CancellationToken cancellationToken) { if (false == _disposed && false == _quiesce) { - _writer.TryWrite(new WorkStruct(GetConsumerOrDefault(consumerTag), consumerTag, deliveryTag, redelivered, exchange, routingKey, basicProperties, body)); + var work = new WorkStruct(GetConsumerOrDefault(consumerTag), consumerTag, deliveryTag, redelivered, exchange, routingKey, basicProperties, body); + return _writer.WriteAsync(work, cancellationToken); + } + else + { + return default; } } - public void HandleBasicCancelOk(string consumerTag) + public ValueTask HandleBasicCancelOkAsync(string consumerTag, CancellationToken cancellationToken) { if (false == _disposed && false == _quiesce) { - _writer.TryWrite(new WorkStruct(WorkType.CancelOk, GetAndRemoveConsumer(consumerTag), consumerTag)); + return _writer.WriteAsync(new WorkStruct(WorkType.CancelOk, GetAndRemoveConsumer(consumerTag), consumerTag), cancellationToken); + } + else + { + return default; } } - public void HandleBasicCancel(string consumerTag) + public ValueTask HandleBasicCancelAsync(string consumerTag, CancellationToken cancellationToken) { if (false == _disposed && false == _quiesce) { - _writer.TryWrite(new WorkStruct(WorkType.Cancel, GetAndRemoveConsumer(consumerTag), consumerTag)); + return _writer.WriteAsync(new WorkStruct(WorkType.Cancel, GetAndRemoveConsumer(consumerTag), consumerTag), cancellationToken); + } + else + { + return default; } } diff --git a/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/IConsumerDispatcher.cs b/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/IConsumerDispatcher.cs index faa44499e0..3c1646af46 100644 --- a/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/IConsumerDispatcher.cs +++ b/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/IConsumerDispatcher.cs @@ -30,6 +30,7 @@ //--------------------------------------------------------------------------- using System; +using System.Threading; using System.Threading.Tasks; namespace RabbitMQ.Client.ConsumerDispatching @@ -43,19 +44,20 @@ internal interface IConsumerDispatcher : IDisposable IBasicConsumer GetAndRemoveConsumer(string tag); - void HandleBasicConsumeOk(IBasicConsumer consumer, string consumerTag); + ValueTask HandleBasicConsumeOkAsync(IBasicConsumer consumer, string consumerTag, CancellationToken cancellationToken); - void HandleBasicDeliver(string consumerTag, + ValueTask HandleBasicDeliverAsync(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, in ReadOnlyBasicProperties basicProperties, - RentedMemory body); + RentedMemory body, + CancellationToken cancellationToken); - void HandleBasicCancelOk(string consumerTag); + ValueTask HandleBasicCancelOkAsync(string consumerTag, CancellationToken cancellationToken); - void HandleBasicCancel(string consumerTag); + ValueTask HandleBasicCancelAsync(string consumerTag, CancellationToken cancellationToken); void Quiesce(); diff --git a/projects/RabbitMQ.Client/client/impl/IRpcContinuation.cs b/projects/RabbitMQ.Client/client/impl/IRpcContinuation.cs index 3f4ee17eb7..792e096bfc 100644 --- a/projects/RabbitMQ.Client/client/impl/IRpcContinuation.cs +++ b/projects/RabbitMQ.Client/client/impl/IRpcContinuation.cs @@ -29,11 +29,17 @@ // Copyright (c) 2007-2020 VMware, Inc. All rights reserved. //--------------------------------------------------------------------------- +using System; +using System.Threading.Tasks; + namespace RabbitMQ.Client.Impl { - internal interface IRpcContinuation + internal interface IRpcContinuation : IDisposable { - void HandleCommand(in IncomingCommand cmd); + // Note: + // No CancellationToken because the continuation class itself will create one + // to represent a continuation timeout + Task HandleCommandAsync(IncomingCommand cmd); void HandleChannelShutdown(ShutdownEventArgs reason); } } diff --git a/projects/RabbitMQ.Client/client/impl/RpcContinuationQueue.cs b/projects/RabbitMQ.Client/client/impl/RpcContinuationQueue.cs index 1b722bb8a0..e6ad807aaf 100644 --- a/projects/RabbitMQ.Client/client/impl/RpcContinuationQueue.cs +++ b/projects/RabbitMQ.Client/client/impl/RpcContinuationQueue.cs @@ -31,6 +31,7 @@ using System; using System.Threading; +using System.Threading.Tasks; namespace RabbitMQ.Client.Impl { @@ -47,13 +48,18 @@ internal class RpcContinuationQueue { private class EmptyRpcContinuation : IRpcContinuation { - public void HandleCommand(in IncomingCommand cmd) + public Task HandleCommandAsync(IncomingCommand _) { + return Task.CompletedTask; } public void HandleChannelShutdown(ShutdownEventArgs reason) { } + + public void Dispose() + { + } } private static readonly EmptyRpcContinuation s_tmp = new EmptyRpcContinuation(); diff --git a/projects/Test/Integration/TestAsyncConsumerExceptions.cs b/projects/Test/Integration/TestAsyncConsumerExceptions.cs index 92403dcb37..72cecf496b 100644 --- a/projects/Test/Integration/TestAsyncConsumerExceptions.cs +++ b/projects/Test/Integration/TestAsyncConsumerExceptions.cs @@ -52,10 +52,7 @@ public TestAsyncConsumerExceptions(ITestOutputHelper output) public Task TestCancelNotificationExceptionHandling() { IBasicConsumer consumer = new ConsumerFailingOnCancel(_channel); - return TestExceptionHandlingWith(consumer, async (ch, q, c, ct) => - { - await ch.QueueDeleteAsync(q, false, false); - }); + return TestExceptionHandlingWith(consumer, (ch, q, c, ct) => ch.QueueDeleteAsync(q, false, false)); } [Fact] @@ -99,6 +96,7 @@ protected async Task TestExceptionHandlingWith(IBasicConsumer consumer, string q = await _channel.QueueDeclareAsync(string.Empty, false, true, false); _channel.CallbackException += (ch, evt) => { + // _output.WriteLine($"[INFO] _channel.CallbackException: {evt.Exception}"); if (evt.Exception == TestException) { tcs.SetResult(true); diff --git a/projects/Test/Integration/TestHeartbeats.cs b/projects/Test/Integration/TestHeartbeats.cs index 99c3481d16..a640222dad 100644 --- a/projects/Test/Integration/TestHeartbeats.cs +++ b/projects/Test/Integration/TestHeartbeats.cs @@ -182,7 +182,7 @@ private bool LongRunningTestsEnabled() private Task SleepFor(int t) { - _output.WriteLine("Testing heartbeats, sleeping for {0} seconds", t); + // _output.WriteLine("Testing heartbeats, sleeping for {0} seconds", t); return Task.Delay(t * 1000); } diff --git a/projects/Test/Integration/TestToxiproxy.cs b/projects/Test/Integration/TestToxiproxy.cs index 11d59fbead..6011631a48 100644 --- a/projects/Test/Integration/TestToxiproxy.cs +++ b/projects/Test/Integration/TestToxiproxy.cs @@ -167,7 +167,7 @@ await Assert.ThrowsAsync(() => sw.Stop(); - _output.WriteLine($"[INFO] heartbeat timeout took {sw.Elapsed}"); + // _output.WriteLine($"[INFO] heartbeat timeout took {sw.Elapsed}"); } [SkippableFact] @@ -227,7 +227,7 @@ public async Task TestTcpReset_GH1464() sw.Stop(); - _output.WriteLine($"[INFO] reset peer took {sw.Elapsed}"); + // _output.WriteLine($"[INFO] reset peer took {sw.Elapsed}"); } private bool AreToxiproxyTestsEnabled