From 4f2746e8349f21bc4b6483757620d071feb6011f Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Thu, 9 May 2024 08:33:48 -0700 Subject: [PATCH 01/14] Misc changes * Ensure that `RABBITMQ_LONG_RUNNING_TESTS` is set when appropriate. --- .github/workflows/build-test.yaml | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/.github/workflows/build-test.yaml b/.github/workflows/build-test.yaml index 9713c86e7..199451745 100644 --- a/.github/workflows/build-test.yaml +++ b/.github/workflows/build-test.yaml @@ -73,8 +73,8 @@ jobs: Receive-Job -Job $tx; ` & "${{ github.workspace }}\.ci\windows\toxiproxy\toxiproxy-cli.exe" list; ` dotnet test ` - --environment "RABBITMQ_RABBITMQCTL_PATH=${{ steps.install-start-rabbitmq.outputs.path }}" ` --environment 'RABBITMQ_LONG_RUNNING_TESTS=true' ` + --environment "RABBITMQ_RABBITMQCTL_PATH=${{ steps.install-start-rabbitmq.outputs.path }}" ` --environment 'RABBITMQ_TOXIPROXY_TESTS=true' ` --environment 'PASSWORD=grapefruit' ` --environment SSL_CERTS_DIR="${{ github.workspace }}\.ci\certs" ` @@ -114,7 +114,12 @@ jobs: id: install-start-rabbitmq run: ${{ github.workspace }}\.ci\windows\gha-setup.ps1 - name: Sequential Integration Tests - run: dotnet test --environment "RABBITMQ_RABBITMQCTL_PATH=${{ steps.install-start-rabbitmq.outputs.path }}" "${{ github.workspace }}\projects\Test\SequentialIntegration\SequentialIntegration.csproj" --no-restore --no-build --logger 'console;verbosity=detailed' + run: dotnet test ` + --environment 'RABBITMQ_LONG_RUNNING_TESTS=true' ` + --environment "RABBITMQ_RABBITMQCTL_PATH=${{ steps.install-start-rabbitmq.outputs.path }}" ` + --environment 'PASSWORD=grapefruit' ` + --environment SSL_CERTS_DIR="${{ github.workspace }}\.ci\certs" ` + "${{ github.workspace }}\projects\Test\SequentialIntegration\SequentialIntegration.csproj" --no-restore --no-build --logger 'console;verbosity=detailed' - name: Maybe upload RabbitMQ logs if: failure() uses: actions/upload-artifact@v4 @@ -182,8 +187,8 @@ jobs: - name: Integration Tests run: | dotnet test \ - --environment "RABBITMQ_RABBITMQCTL_PATH=DOCKER:${{ steps.start-rabbitmq.outputs.id }}" \ --environment 'RABBITMQ_LONG_RUNNING_TESTS=true' \ + --environment "RABBITMQ_RABBITMQCTL_PATH=DOCKER:${{ steps.start-rabbitmq.outputs.id }}" \ --environment 'RABBITMQ_TOXIPROXY_TESTS=true' \ --environment 'PASSWORD=grapefruit' \ --environment SSL_CERTS_DIR="${{ github.workspace }}/.ci/certs" \ @@ -222,7 +227,10 @@ jobs: - name: Sequential Integration Tests run: | dotnet test \ + --environment 'RABBITMQ_LONG_RUNNING_TESTS=true' \ --environment "RABBITMQ_RABBITMQCTL_PATH=DOCKER:${{ steps.start-rabbitmq.outputs.id }}" \ + --environment 'PASSWORD=grapefruit' \ + --environment SSL_CERTS_DIR="${{ github.workspace }}/.ci/certs" \ "${{ github.workspace }}/projects/Test/SequentialIntegration/SequentialIntegration.csproj" --no-restore --no-build --logger 'console;verbosity=detailed' - name: Maybe upload RabbitMQ logs if: failure() From 405e6568e7466f6d6e49a858e7e9f38bc99b770a Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Thu, 9 May 2024 08:44:26 -0700 Subject: [PATCH 02/14] * Ensure that `RABBITMQ_LONG_RUNNING_TESTS` is set when appropriate in the `Makefile` --- Makefile | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/Makefile b/Makefile index e8f308954..42de4527d 100644 --- a/Makefile +++ b/Makefile @@ -21,13 +21,17 @@ build: test: dotnet test $(CURDIR)/projects/Test/Unit/Unit.csproj --logger 'console;verbosity=detailed' dotnet test --environment 'GITHUB_ACTIONS=true' \ - --environment "RABBITMQ_RABBITMQCTL_PATH=DOCKER:$$(docker inspect --format='{{.Id}}' $(RABBITMQ_DOCKER_NAME))" \ --environment 'RABBITMQ_LONG_RUNNING_TESTS=true' \ + --environment "RABBITMQ_RABBITMQCTL_PATH=DOCKER:$$(docker inspect --format='{{.Id}}' $(RABBITMQ_DOCKER_NAME))" \ --environment 'RABBITMQ_TOXIPROXY_TESTS=true' \ --environment 'PASSWORD=grapefruit' \ --environment SSL_CERTS_DIR="$(CURDIR)/.ci/certs" \ "$(CURDIR)/projects/Test/Integration/Integration.csproj" --logger 'console;verbosity=detailed' - dotnet test --environment "RABBITMQ_RABBITMQCTL_PATH=DOCKER:$$(docker inspect --format='{{.Id}}' $(RABBITMQ_DOCKER_NAME))" $(CURDIR)/projects/Test/SequentialIntegration/SequentialIntegration.csproj --logger 'console;verbosity=detailed' + dotnet test --environment 'RABBITMQ_LONG_RUNNING_TESTS=true' \ + --environment "RABBITMQ_RABBITMQCTL_PATH=DOCKER:$$(docker inspect --format='{{.Id}}' $(RABBITMQ_DOCKER_NAME))" \ + --environment 'PASSWORD=grapefruit' \ + --environment SSL_CERTS_DIR="$(CURDIR)/.ci/certs" \ + $(CURDIR)/projects/Test/SequentialIntegration/SequentialIntegration.csproj --logger 'console;verbosity=detailed' # Note: # You must have the expected OAuth2 environment set up for this target From 1e42d0093e127eecf1b8bb6ed3f33bb81c83bf4e Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Thu, 9 May 2024 09:05:25 -0700 Subject: [PATCH 03/14] * Ignore `OperationCanceledException` in heartbeat loop if main loop is canceled --- .../RabbitMQ.Client/client/impl/Connection.Heartbeat.cs | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/projects/RabbitMQ.Client/client/impl/Connection.Heartbeat.cs b/projects/RabbitMQ.Client/client/impl/Connection.Heartbeat.cs index 97b3b900d..d045555ab 100644 --- a/projects/RabbitMQ.Client/client/impl/Connection.Heartbeat.cs +++ b/projects/RabbitMQ.Client/client/impl/Connection.Heartbeat.cs @@ -130,6 +130,13 @@ await FinishCloseAsync(cts.Token) _heartbeatReadTimer?.Change((int)Heartbeat.TotalMilliseconds, Timeout.Infinite); } } + catch (OperationCanceledException) + { + if (false == _mainLoopCts.IsCancellationRequested) + { + throw; + } + } catch (ObjectDisposedException) { // timer is already disposed, From 0f4dfbb5722ee23213c8581c142ff93fc95823ac Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Thu, 9 May 2024 09:08:49 -0700 Subject: [PATCH 04/14] * Ignore `OperationCanceledException` in consumer dispatchers if cancellation is requested --- .../AsyncConsumerDispatcher.cs | 48 +++++++------ .../ConsumerDispatching/ConsumerDispatcher.cs | 68 +++++++++++-------- 2 files changed, 68 insertions(+), 48 deletions(-) diff --git a/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/AsyncConsumerDispatcher.cs b/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/AsyncConsumerDispatcher.cs index 761e6d23e..cc5b61885 100644 --- a/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/AsyncConsumerDispatcher.cs +++ b/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/AsyncConsumerDispatcher.cs @@ -16,39 +16,49 @@ internal AsyncConsumerDispatcher(ChannelBase channel, int concurrency) protected override async Task ProcessChannelAsync(CancellationToken token) { - while (await _reader.WaitToReadAsync(token).ConfigureAwait(false)) + try { - while (_reader.TryRead(out WorkStruct work)) + while (await _reader.WaitToReadAsync(token).ConfigureAwait(false)) { - using (work) + while (_reader.TryRead(out WorkStruct work)) { - try + using (work) { - Task task = work.WorkType switch + try { - WorkType.Deliver => work.AsyncConsumer.HandleBasicDeliver( - work.ConsumerTag, work.DeliveryTag, work.Redelivered, - work.Exchange, work.RoutingKey, work.BasicProperties, work.Body.Memory), + Task task = work.WorkType switch + { + WorkType.Deliver => work.AsyncConsumer.HandleBasicDeliver( + work.ConsumerTag, work.DeliveryTag, work.Redelivered, + work.Exchange, work.RoutingKey, work.BasicProperties, work.Body.Memory), - WorkType.Cancel => work.AsyncConsumer.HandleBasicCancel(work.ConsumerTag), + WorkType.Cancel => work.AsyncConsumer.HandleBasicCancel(work.ConsumerTag), - WorkType.CancelOk => work.AsyncConsumer.HandleBasicCancelOk(work.ConsumerTag), + WorkType.CancelOk => work.AsyncConsumer.HandleBasicCancelOk(work.ConsumerTag), - WorkType.ConsumeOk => work.AsyncConsumer.HandleBasicConsumeOk(work.ConsumerTag), + WorkType.ConsumeOk => work.AsyncConsumer.HandleBasicConsumeOk(work.ConsumerTag), - WorkType.Shutdown => work.AsyncConsumer.HandleChannelShutdown(_channel, work.Reason), + WorkType.Shutdown => work.AsyncConsumer.HandleChannelShutdown(_channel, work.Reason), - _ => Task.CompletedTask - }; - await task.ConfigureAwait(false); - } - catch (Exception e) - { - _channel.OnCallbackException(CallbackExceptionEventArgs.Build(e, work.WorkType.ToString(), work.Consumer)); + _ => Task.CompletedTask + }; + await task.ConfigureAwait(false); + } + catch (Exception e) + { + _channel.OnCallbackException(CallbackExceptionEventArgs.Build(e, work.WorkType.ToString(), work.Consumer)); + } } } } } + catch (OperationCanceledException) + { + if (false == token.IsCancellationRequested) + { + throw; + } + } } } } diff --git a/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcher.cs b/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcher.cs index 632b7cf72..a3929d398 100644 --- a/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcher.cs +++ b/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcher.cs @@ -16,45 +16,55 @@ internal ConsumerDispatcher(ChannelBase channel, int concurrency) protected override async Task ProcessChannelAsync(CancellationToken token) { - while (await _reader.WaitToReadAsync(token).ConfigureAwait(false)) + try { - while (_reader.TryRead(out var work)) + while (await _reader.WaitToReadAsync(token).ConfigureAwait(false)) { - using (work) + while (_reader.TryRead(out WorkStruct work)) { - try + using (work) { - IBasicConsumer consumer = work.Consumer; - string? consumerTag = work.ConsumerTag; - switch (work.WorkType) + try { - case WorkType.Deliver: - await consumer.HandleBasicDeliverAsync( - consumerTag, work.DeliveryTag, work.Redelivered, - work.Exchange, work.RoutingKey, work.BasicProperties, work.Body.Memory) - .ConfigureAwait(false); - break; - case WorkType.Cancel: - consumer.HandleBasicCancel(consumerTag); - break; - case WorkType.CancelOk: - consumer.HandleBasicCancelOk(consumerTag); - break; - case WorkType.ConsumeOk: - consumer.HandleBasicConsumeOk(consumerTag); - break; - case WorkType.Shutdown: - consumer.HandleChannelShutdown(_channel, work.Reason); - break; + IBasicConsumer consumer = work.Consumer; + string? consumerTag = work.ConsumerTag; + switch (work.WorkType) + { + case WorkType.Deliver: + await consumer.HandleBasicDeliverAsync( + consumerTag, work.DeliveryTag, work.Redelivered, + work.Exchange, work.RoutingKey, work.BasicProperties, work.Body.Memory) + .ConfigureAwait(false); + break; + case WorkType.Cancel: + consumer.HandleBasicCancel(consumerTag); + break; + case WorkType.CancelOk: + consumer.HandleBasicCancelOk(consumerTag); + break; + case WorkType.ConsumeOk: + consumer.HandleBasicConsumeOk(consumerTag); + break; + case WorkType.Shutdown: + consumer.HandleChannelShutdown(_channel, work.Reason); + break; + } + } + catch (Exception e) + { + _channel.OnCallbackException(CallbackExceptionEventArgs.Build(e, work.WorkType.ToString(), work.Consumer)); } - } - catch (Exception e) - { - _channel.OnCallbackException(CallbackExceptionEventArgs.Build(e, work.WorkType.ToString(), work.Consumer)); } } } } + catch (OperationCanceledException) + { + if (false == token.IsCancellationRequested) + { + throw; + } + } } } } From 4f01e194f0131c29aec4d4622215e0f1b08efcf6 Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Thu, 9 May 2024 09:17:43 -0700 Subject: [PATCH 05/14] * `EventingWrapper` and `AsyncEventingWrapper` improvements and cleanups --- .../client/impl/AsyncEventingWrapper.cs | 88 +++++++++++++++++++ .../client/impl/EventingWrapper.cs | 69 ++------------- 2 files changed, 94 insertions(+), 63 deletions(-) create mode 100644 projects/RabbitMQ.Client/client/impl/AsyncEventingWrapper.cs diff --git a/projects/RabbitMQ.Client/client/impl/AsyncEventingWrapper.cs b/projects/RabbitMQ.Client/client/impl/AsyncEventingWrapper.cs new file mode 100644 index 000000000..29f007b7c --- /dev/null +++ b/projects/RabbitMQ.Client/client/impl/AsyncEventingWrapper.cs @@ -0,0 +1,88 @@ +using System; +using System.Linq; +using System.Threading.Tasks; +using RabbitMQ.Client.Events; + +namespace RabbitMQ.Client.Impl +{ +#nullable enable + internal struct AsyncEventingWrapper + { + private event AsyncEventHandler? _event; + private Delegate[]? _handlers; + private string? _context; + private Func? _onException; + + public readonly bool IsEmpty => _event is null; + + public AsyncEventingWrapper(string context, Func onException) + { + _event = null; + _handlers = null; + _context = context; + _onException = onException; + } + + public void AddHandler(AsyncEventHandler? handler) + { + _event += handler; + _handlers = null; + } + + public void RemoveHandler(AsyncEventHandler? handler) + { + _event -= handler; + _handlers = null; + } + + // Do not make this function async! (This type is a struct that gets copied at the start of an async method => empty _handlers is copied) + public Task InvokeAsync(object sender, T parameter) + { + Delegate[]? handlers = _handlers; + if (handlers is null) + { + handlers = _event?.GetInvocationList(); + if (handlers is null) + { + return Task.CompletedTask; + } + + _handlers = handlers; + } + + return InternalInvoke(handlers, sender, parameter); + } + + private readonly async Task InternalInvoke(Delegate[] handlers, object sender, T parameter) + { + foreach (AsyncEventHandler action in handlers.Cast>()) + { + try + { + await action(sender, parameter) + .ConfigureAwait(false); + } + catch (Exception exception) + { + if (_onException != null) + { + await _onException(exception, _context!) + .ConfigureAwait(false); + } + else + { + throw; + } + } + } + } + + public void Takeover(in AsyncEventingWrapper other) + { + _event = other._event; + _handlers = other._handlers; + _context = other._context; + _onException = other._onException; + } + } +} diff --git a/projects/RabbitMQ.Client/client/impl/EventingWrapper.cs b/projects/RabbitMQ.Client/client/impl/EventingWrapper.cs index b716af5ff..4017fb405 100644 --- a/projects/RabbitMQ.Client/client/impl/EventingWrapper.cs +++ b/projects/RabbitMQ.Client/client/impl/EventingWrapper.cs @@ -1,6 +1,5 @@ using System; -using System.Threading.Tasks; -using RabbitMQ.Client.Events; +using System.Linq; namespace RabbitMQ.Client.Impl { @@ -12,7 +11,7 @@ internal struct EventingWrapper private string? _context; private Action? _onExceptionAction; - public bool IsEmpty => _event is null; + public readonly bool IsEmpty => _event is null; public EventingWrapper(string context, Action onExceptionAction) { @@ -42,7 +41,7 @@ public void ClearHandlers() public void Invoke(object sender, T parameter) { - var handlers = _handlers; + Delegate[]? handlers = _handlers; if (handlers is null) { handlers = _event?.GetInvocationList(); @@ -53,7 +52,8 @@ public void Invoke(object sender, T parameter) _handlers = handlers; } - foreach (EventHandler action in handlers) + + foreach (EventHandler action in handlers.Cast>()) { try { @@ -61,7 +61,7 @@ public void Invoke(object sender, T parameter) } catch (Exception exception) { - var onException = _onExceptionAction; + Action? onException = _onExceptionAction; if (onException != null) { onException(exception, _context!); @@ -82,61 +82,4 @@ public void Takeover(in EventingWrapper other) _onExceptionAction = other._onExceptionAction; } } - - internal struct AsyncEventingWrapper - { - private event AsyncEventHandler? _event; - private Delegate[]? _handlers; - - public bool IsEmpty => _event is null; - - public void AddHandler(AsyncEventHandler? handler) - { - _event += handler; - _handlers = null; - } - - public void RemoveHandler(AsyncEventHandler? handler) - { - _event -= handler; - _handlers = null; - } - - // Do not make this function async! (This type is a struct that gets copied at the start of an async method => empty _handlers is copied) - public Task InvokeAsync(object sender, T parameter) - { - var handlers = _handlers; - if (handlers is null) - { - handlers = _event?.GetInvocationList(); - if (handlers is null) - { - return Task.CompletedTask; - } - - _handlers = handlers; - } - - if (handlers.Length == 1) - { - return ((AsyncEventHandler)handlers[0])(sender, parameter); - } - return InternalInvoke(handlers, sender, parameter); - } - - private static async Task InternalInvoke(Delegate[] handlers, object sender, T parameter) - { - foreach (AsyncEventHandler action in handlers) - { - await action(sender, parameter) - .ConfigureAwait(false); - } - } - - public void Takeover(in AsyncEventingWrapper other) - { - _event = other._event; - _handlers = other._handlers; - } - } } From 5ae75ed338ac45b730151fa764b10cedf27d6f2f Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Thu, 9 May 2024 09:20:38 -0700 Subject: [PATCH 06/14] * Integration test `DisposeAsync` should only close channel and conn when not-null and still open --- projects/Test/Common/IntegrationFixture.cs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/projects/Test/Common/IntegrationFixture.cs b/projects/Test/Common/IntegrationFixture.cs index 938eb3000..62846d397 100644 --- a/projects/Test/Common/IntegrationFixture.cs +++ b/projects/Test/Common/IntegrationFixture.cs @@ -179,13 +179,13 @@ public virtual async Task DisposeAsync() { try { - if (_channel != null) + if (_conn != null && _conn.IsOpen) { - await _channel.CloseAsync(); - } + if (_channel != null && _channel.IsOpen) + { + await _channel.CloseAsync(); + } - if (_conn != null) - { await _conn.CloseAsync(); } } From e9e59a890c29c26ac579c8edd83d8c78b0a60091 Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Thu, 9 May 2024 09:22:32 -0700 Subject: [PATCH 07/14] * Add publisher confirmations to `PublishMessagesWhileClosingConnAsync` --- projects/Test/Common/TestConnectionRecoveryBase.cs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/projects/Test/Common/TestConnectionRecoveryBase.cs b/projects/Test/Common/TestConnectionRecoveryBase.cs index 0718d8c28..e8eb2884e 100644 --- a/projects/Test/Common/TestConnectionRecoveryBase.cs +++ b/projects/Test/Common/TestConnectionRecoveryBase.cs @@ -202,6 +202,8 @@ protected async Task PublishMessagesWhileClosingConnAsync(string queueName) { using (IChannel publishingChannel = await publishingConn.CreateChannelAsync()) { + await publishingChannel.ConfirmSelectAsync(); + for (ushort i = 0; i < TotalMessageCount; i++) { if (i == CloseAtCount) @@ -210,6 +212,7 @@ protected async Task PublishMessagesWhileClosingConnAsync(string queueName) } await publishingChannel.BasicPublishAsync(string.Empty, queueName, _messageBody); + await publishingChannel.WaitForConfirmsOrDieAsync(); } await publishingChannel.CloseAsync(); From 3ba88557431779e6ad8c9a6e99dc61dde5badbf4 Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Thu, 9 May 2024 09:24:25 -0700 Subject: [PATCH 08/14] * Use `WaitSpan` --- projects/Test/Integration/TestAsyncConsumer.cs | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/projects/Test/Integration/TestAsyncConsumer.cs b/projects/Test/Integration/TestAsyncConsumer.cs index 990f29f10..c6b03bb92 100644 --- a/projects/Test/Integration/TestAsyncConsumer.cs +++ b/projects/Test/Integration/TestAsyncConsumer.cs @@ -64,9 +64,8 @@ public async Task TestBasicRoundtripConcurrent() var publish1SyncSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); var publish2SyncSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - var maximumWaitTime = TimeSpan.FromSeconds(10); - var tokenSource = new CancellationTokenSource(maximumWaitTime); + var tokenSource = new CancellationTokenSource(WaitSpan); CancellationTokenRegistration ctsr = tokenSource.Token.Register(() => { publish1SyncSource.TrySetResult(false); @@ -120,10 +119,10 @@ public async Task TestBasicRoundtripConcurrent() await AssertRanToCompletion(publish1SyncSource.Task, publish2SyncSource.Task); bool result1 = await publish1SyncSource.Task; - Assert.True(result1, $"1 - Non concurrent dispatch lead to deadlock after {maximumWaitTime}"); + Assert.True(result1, $"1 - Non concurrent dispatch lead to deadlock after {WaitSpan}"); bool result2 = await publish2SyncSource.Task; - Assert.True(result2, $"2 - Non concurrent dispatch lead to deadlock after {maximumWaitTime}"); + Assert.True(result2, $"2 - Non concurrent dispatch lead to deadlock after {WaitSpan}"); } finally { @@ -145,8 +144,8 @@ public async Task TestBasicRoundtripConcurrentManyMessages() var publish1SyncSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); var publish2SyncSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - var maximumWaitTime = TimeSpan.FromSeconds(30); - var tokenSource = new CancellationTokenSource(maximumWaitTime); + + var tokenSource = new CancellationTokenSource(WaitSpan); CancellationTokenRegistration ctsr = tokenSource.Token.Register(() => { publish1SyncSource.TrySetResult(false); @@ -234,10 +233,10 @@ public async Task TestBasicRoundtripConcurrentManyMessages() await AssertRanToCompletion(publish1SyncSource.Task, publish2SyncSource.Task); bool result1 = await publish1SyncSource.Task; - Assert.True(result1, $"Non concurrent dispatch lead to deadlock after {maximumWaitTime}"); + Assert.True(result1, $"Non concurrent dispatch lead to deadlock after {WaitSpan}"); bool result2 = await publish2SyncSource.Task; - Assert.True(result2, $"Non concurrent dispatch lead to deadlock after {maximumWaitTime}"); + Assert.True(result2, $"Non concurrent dispatch lead to deadlock after {WaitSpan}"); await consumeChannel.CloseAsync(); } From 6d6210c108988d443a7a5205ec37973e5276f98a Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Thu, 9 May 2024 09:26:36 -0700 Subject: [PATCH 09/14] * Use exclusive queue and publisher confirms in `TestBasicRoundtripConcurrentManyMessages` --- projects/Test/Integration/TestAsyncConsumer.cs | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/projects/Test/Integration/TestAsyncConsumer.cs b/projects/Test/Integration/TestAsyncConsumer.cs index c6b03bb92..87ab923a0 100644 --- a/projects/Test/Integration/TestAsyncConsumer.cs +++ b/projects/Test/Integration/TestAsyncConsumer.cs @@ -135,7 +135,6 @@ public async Task TestBasicRoundtripConcurrent() public async Task TestBasicRoundtripConcurrentManyMessages() { const int publish_total = 4096; - string queueName = $"{nameof(TestBasicRoundtripConcurrentManyMessages)}-{Guid.NewGuid()}"; string publish1 = GetUniqueString(32768); byte[] body1 = _encoding.GetBytes(publish1); @@ -178,19 +177,23 @@ public async Task TestBasicRoundtripConcurrentManyMessages() }); }; - QueueDeclareOk q = await _channel.QueueDeclareAsync(queue: queueName, exclusive: false, durable: true); - Assert.Equal(q, queueName); + QueueDeclareOk q = await _channel.QueueDeclareAsync(); + string queueName = q.QueueName; Task publishTask = Task.Run(async () => { using (IChannel publishChannel = await _conn.CreateChannelAsync()) { + await publishChannel.ConfirmSelectAsync(); + QueueDeclareOk pubQ = await publishChannel.QueueDeclareAsync(queue: queueName, exclusive: false, durable: true); Assert.Equal(queueName, pubQ.QueueName); + for (int i = 0; i < publish_total; i++) { await publishChannel.BasicPublishAsync(string.Empty, queueName, body1); await publishChannel.BasicPublishAsync(string.Empty, queueName, body2); + await publishChannel.WaitForConfirmsOrDieAsync(); } await publishChannel.CloseAsync(); From a6cfa912c11169cb9e19f6c62430e3b0029c3313 Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Thu, 9 May 2024 09:28:49 -0700 Subject: [PATCH 10/14] * More improvements to `TestBasicRoundtripConcurrentManyMessages` --- .../Test/Integration/TestAsyncConsumer.cs | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/projects/Test/Integration/TestAsyncConsumer.cs b/projects/Test/Integration/TestAsyncConsumer.cs index 87ab923a0..4c217a844 100644 --- a/projects/Test/Integration/TestAsyncConsumer.cs +++ b/projects/Test/Integration/TestAsyncConsumer.cs @@ -159,8 +159,8 @@ public async Task TestBasicRoundtripConcurrentManyMessages() { if (args.Initiator == ShutdownInitiator.Peer) { - publish1SyncSource.TrySetResult(false); - publish2SyncSource.TrySetResult(false); + publish1SyncSource.TrySetException(ea.Exception); + publish2SyncSource.TrySetException(ea.Exception); } }); }; @@ -171,8 +171,8 @@ public async Task TestBasicRoundtripConcurrentManyMessages() { if (args.Initiator == ShutdownInitiator.Peer) { - publish1SyncSource.TrySetResult(false); - publish2SyncSource.TrySetResult(false); + publish1SyncSource.TrySetException(ea.Exception); + publish2SyncSource.TrySetException(ea.Exception); } }); }; @@ -209,7 +209,7 @@ public async Task TestBasicRoundtripConcurrentManyMessages() int publish1_count = 0; int publish2_count = 0; - consumer.Received += async (o, a) => + consumer.Received += (o, a) => { string decoded = _encoding.GetString(a.Body.ToArray()); if (decoded == publish1) @@ -217,7 +217,6 @@ public async Task TestBasicRoundtripConcurrentManyMessages() if (Interlocked.Increment(ref publish1_count) >= publish_total) { publish1SyncSource.TrySetResult(true); - await publish2SyncSource.Task; } } else if (decoded == publish2) @@ -225,9 +224,15 @@ public async Task TestBasicRoundtripConcurrentManyMessages() if (Interlocked.Increment(ref publish2_count) >= publish_total) { publish2SyncSource.TrySetResult(true); - await publish1SyncSource.Task; } } + else + { + var ex = new InvalidOperationException("incorrect message - should never happen!"); + publish1SyncSource.TrySetException(ex); + publish2SyncSource.TrySetException(ex); + } + return Task.CompletedTask; }; await consumeChannel.BasicConsumeAsync(queueName, true, string.Empty, false, false, null, consumer); From 1a748f441be3f175958b65342f663a8c9a5e1dbf Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Thu, 9 May 2024 09:30:33 -0700 Subject: [PATCH 11/14] * Increase concurrency in `TestConcurrentChannelOperationsAsync` --- .../Integration/TestConcurrentAccessWithSharedConnection.cs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/projects/Test/Integration/TestConcurrentAccessWithSharedConnection.cs b/projects/Test/Integration/TestConcurrentAccessWithSharedConnection.cs index 8d2edccc0..5b3ff5729 100644 --- a/projects/Test/Integration/TestConcurrentAccessWithSharedConnection.cs +++ b/projects/Test/Integration/TestConcurrentAccessWithSharedConnection.cs @@ -70,12 +70,14 @@ private async Task TestConcurrentChannelOperationsAsync(Func var tasks = new List(); for (int i = 0; i < _processorCount; i++) { - tasks.Add(Task.Run(async () => + tasks.Add(Task.Run(() => { + var subTasks = new List(); for (int j = 0; j < iterations; j++) { - await action(_conn); + subTasks.Add(action(_conn)); } + return Task.WhenAll(subTasks); })); } From 7e8a1e9a66ce5e7f8aea2026ba983198268302f1 Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Thu, 9 May 2024 09:32:51 -0700 Subject: [PATCH 12/14] * `TestDisposedWithSocketClosedOutOfBand` improvements --- projects/Test/Integration/TestAsyncConsumer.cs | 3 --- .../Test/Integration/TestConnectionShutdown.cs | 17 ++++++++++++----- 2 files changed, 12 insertions(+), 8 deletions(-) diff --git a/projects/Test/Integration/TestAsyncConsumer.cs b/projects/Test/Integration/TestAsyncConsumer.cs index 4c217a844..41c49de28 100644 --- a/projects/Test/Integration/TestAsyncConsumer.cs +++ b/projects/Test/Integration/TestAsyncConsumer.cs @@ -186,9 +186,6 @@ public async Task TestBasicRoundtripConcurrentManyMessages() { await publishChannel.ConfirmSelectAsync(); - QueueDeclareOk pubQ = await publishChannel.QueueDeclareAsync(queue: queueName, exclusive: false, durable: true); - Assert.Equal(queueName, pubQ.QueueName); - for (int i = 0; i < publish_total; i++) { await publishChannel.BasicPublishAsync(string.Empty, queueName, body1); diff --git a/projects/Test/Integration/TestConnectionShutdown.cs b/projects/Test/Integration/TestConnectionShutdown.cs index 2f3dce5f5..2ea9333a8 100644 --- a/projects/Test/Integration/TestConnectionShutdown.cs +++ b/projects/Test/Integration/TestConnectionShutdown.cs @@ -100,13 +100,20 @@ public async Task TestDisposedWithSocketClosedOutOfBand() }; var c = (AutorecoveringConnection)_conn; - await c.CloseFrameHandlerAsync(); + Task frameHandlerCloseTask = c.CloseFrameHandlerAsync(); - _conn.Dispose(); - _conn = null; + try + { + _conn.Dispose(); + await WaitAsync(tcs, WaitSpan, "channel shutdown"); + await frameHandlerCloseTask.WaitAsync(WaitSpan); + } + finally + { + _conn = null; + _channel = null; + } - TimeSpan waitSpan = TimeSpan.FromSeconds(10); - await WaitAsync(tcs, waitSpan, "channel shutdown"); } [Fact] From b23e91fd0bc7e573e2f59c90a4a98c64273b0cd6 Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Thu, 9 May 2024 10:35:53 -0700 Subject: [PATCH 13/14] * Catch `System.Threading.Channels.ChannelClosedException` when aborting --- projects/RabbitMQ.Client/client/impl/Connection.cs | 8 ++++++++ projects/Test/Integration/TestConnectionShutdown.cs | 1 - 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/projects/RabbitMQ.Client/client/impl/Connection.cs b/projects/RabbitMQ.Client/client/impl/Connection.cs index 62a1dba4a..9e3d12f6d 100644 --- a/projects/RabbitMQ.Client/client/impl/Connection.cs +++ b/projects/RabbitMQ.Client/client/impl/Connection.cs @@ -35,6 +35,7 @@ using System.IO; using System.Runtime.CompilerServices; using System.Threading; +using System.Threading.Channels; using System.Threading.Tasks; using RabbitMQ.Client.Events; using RabbitMQ.Client.Exceptions; @@ -335,6 +336,13 @@ await _session0.TransmitAsync(method, cancellationToken) .ConfigureAwait(false); } } + catch (ChannelClosedException) + { + if (false == abort) + { + throw; + } + } catch (AlreadyClosedException) { if (false == abort) diff --git a/projects/Test/Integration/TestConnectionShutdown.cs b/projects/Test/Integration/TestConnectionShutdown.cs index 2ea9333a8..b0d88c80b 100644 --- a/projects/Test/Integration/TestConnectionShutdown.cs +++ b/projects/Test/Integration/TestConnectionShutdown.cs @@ -113,7 +113,6 @@ public async Task TestDisposedWithSocketClosedOutOfBand() _conn = null; _channel = null; } - } [Fact] From 0a06a230009310aaae93780fd85ba611d89a46b6 Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Thu, 9 May 2024 14:19:51 -0700 Subject: [PATCH 14/14] * Major improvements in `TestAsyncConsumer` test suite. --- .../Test/Integration/TestAsyncConsumer.cs | 256 ++++++++++++------ 1 file changed, 173 insertions(+), 83 deletions(-) diff --git a/projects/Test/Integration/TestAsyncConsumer.cs b/projects/Test/Integration/TestAsyncConsumer.cs index 41c49de28..c76e9acd7 100644 --- a/projects/Test/Integration/TestAsyncConsumer.cs +++ b/projects/Test/Integration/TestAsyncConsumer.cs @@ -30,6 +30,7 @@ //--------------------------------------------------------------------------- using System; +using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; using RabbitMQ.Client; @@ -52,6 +53,7 @@ public TestAsyncConsumer(ITestOutputHelper output) public async Task TestBasicRoundtripConcurrent() { QueueDeclareOk q = await _channel.QueueDeclareAsync(); + string publish1 = GetUniqueString(1024); byte[] body = _encoding.GetBytes(publish1); await _channel.BasicPublishAsync("", q.QueueName, body); @@ -68,8 +70,8 @@ public async Task TestBasicRoundtripConcurrent() var tokenSource = new CancellationTokenSource(WaitSpan); CancellationTokenRegistration ctsr = tokenSource.Token.Register(() => { - publish1SyncSource.TrySetResult(false); - publish2SyncSource.TrySetResult(false); + publish1SyncSource.TrySetCanceled(); + publish2SyncSource.TrySetCanceled(); }); try @@ -80,8 +82,7 @@ public async Task TestBasicRoundtripConcurrent() { if (args.Initiator == ShutdownInitiator.Peer) { - publish1SyncSource.TrySetResult(false); - publish2SyncSource.TrySetResult(false); + MaybeSetException(ea, publish1SyncSource, publish2SyncSource); } }); }; @@ -92,25 +93,28 @@ public async Task TestBasicRoundtripConcurrent() { if (args.Initiator == ShutdownInitiator.Peer) { - publish1SyncSource.TrySetResult(false); - publish2SyncSource.TrySetResult(false); + MaybeSetException(ea, publish1SyncSource, publish2SyncSource); } }); }; - consumer.Received += async (o, a) => + consumer.Received += (o, a) => { string decoded = _encoding.GetString(a.Body.ToArray()); if (decoded == publish1) { publish1SyncSource.TrySetResult(true); - await publish2SyncSource.Task; } else if (decoded == publish2) { publish2SyncSource.TrySetResult(true); - await publish1SyncSource.Task; } + else + { + var ex = new InvalidOperationException("incorrect message - should never happen!"); + SetException(ex, publish1SyncSource, publish2SyncSource); + } + return Task.CompletedTask; }; await _channel.BasicConsumeAsync(q.QueueName, true, string.Empty, false, false, null, consumer); @@ -126,8 +130,8 @@ public async Task TestBasicRoundtripConcurrent() } finally { - tokenSource.Dispose(); ctsr.Dispose(); + tokenSource.Dispose(); } } @@ -135,6 +139,7 @@ public async Task TestBasicRoundtripConcurrent() public async Task TestBasicRoundtripConcurrentManyMessages() { const int publish_total = 4096; + string queueName = GenerateQueueName(); string publish1 = GetUniqueString(32768); byte[] body1 = _encoding.GetBytes(publish1); @@ -143,12 +148,14 @@ public async Task TestBasicRoundtripConcurrentManyMessages() var publish1SyncSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); var publish2SyncSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + var consumerSyncSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); var tokenSource = new CancellationTokenSource(WaitSpan); CancellationTokenRegistration ctsr = tokenSource.Token.Register(() => { - publish1SyncSource.TrySetResult(false); - publish2SyncSource.TrySetResult(false); + publish1SyncSource.TrySetCanceled(); + publish2SyncSource.TrySetCanceled(); + consumerSyncSource.TrySetCanceled(); }); try @@ -159,8 +166,7 @@ public async Task TestBasicRoundtripConcurrentManyMessages() { if (args.Initiator == ShutdownInitiator.Peer) { - publish1SyncSource.TrySetException(ea.Exception); - publish2SyncSource.TrySetException(ea.Exception); + MaybeSetException(ea, publish1SyncSource, publish2SyncSource); } }); }; @@ -171,94 +177,146 @@ public async Task TestBasicRoundtripConcurrentManyMessages() { if (args.Initiator == ShutdownInitiator.Peer) { - publish1SyncSource.TrySetException(ea.Exception); - publish2SyncSource.TrySetException(ea.Exception); + MaybeSetException(ea, publish1SyncSource, publish2SyncSource); } }); }; - QueueDeclareOk q = await _channel.QueueDeclareAsync(); - string queueName = q.QueueName; + QueueDeclareOk q = await _channel.QueueDeclareAsync(queue: queueName, exclusive: false, autoDelete: true); + Assert.Equal(queueName, q.QueueName); Task publishTask = Task.Run(async () => { - using (IChannel publishChannel = await _conn.CreateChannelAsync()) + using (IConnection publishConn = await _connFactory.CreateConnectionAsync()) { - await publishChannel.ConfirmSelectAsync(); - - for (int i = 0; i < publish_total; i++) + publishConn.ConnectionShutdown += (o, ea) => + { + HandleConnectionShutdown(publishConn, ea, (args) => + { + if (args.Initiator == ShutdownInitiator.Peer) + { + MaybeSetException(ea, publish1SyncSource, publish2SyncSource); + } + }); + }; + using (IChannel publishChannel = await publishConn.CreateChannelAsync()) { - await publishChannel.BasicPublishAsync(string.Empty, queueName, body1); - await publishChannel.BasicPublishAsync(string.Empty, queueName, body2); - await publishChannel.WaitForConfirmsOrDieAsync(); + publishChannel.ChannelShutdown += (o, ea) => + { + HandleChannelShutdown(publishChannel, ea, (args) => + { + if (args.Initiator == ShutdownInitiator.Peer) + { + MaybeSetException(ea, publish1SyncSource, publish2SyncSource); + } + }); + }; + await publishChannel.ConfirmSelectAsync(); + + for (int i = 0; i < publish_total; i++) + { + await publishChannel.BasicPublishAsync(string.Empty, queueName, body1); + await publishChannel.BasicPublishAsync(string.Empty, queueName, body2); + await publishChannel.WaitForConfirmsOrDieAsync(); + } + + await publishChannel.CloseAsync(); } - await publishChannel.CloseAsync(); + await publishConn.CloseAsync(); } }); Task consumeTask = Task.Run(async () => { - using (IChannel consumeChannel = await _conn.CreateChannelAsync()) + using (IConnection consumeConn = await _connFactory.CreateConnectionAsync()) { - var consumer = new AsyncEventingBasicConsumer(consumeChannel); - - int publish1_count = 0; - int publish2_count = 0; - - consumer.Received += (o, a) => + consumeConn.ConnectionShutdown += (o, ea) => { - string decoded = _encoding.GetString(a.Body.ToArray()); - if (decoded == publish1) + HandleConnectionShutdown(consumeConn, ea, (args) => { - if (Interlocked.Increment(ref publish1_count) >= publish_total) + if (args.Initiator == ShutdownInitiator.Peer) { - publish1SyncSource.TrySetResult(true); + MaybeSetException(ea, publish1SyncSource, publish2SyncSource); } - } - else if (decoded == publish2) + }); + }; + using (IChannel consumeChannel = await consumeConn.CreateChannelAsync()) + { + consumeChannel.ChannelShutdown += (o, ea) => { - if (Interlocked.Increment(ref publish2_count) >= publish_total) + HandleChannelShutdown(consumeChannel, ea, (args) => { - publish2SyncSource.TrySetResult(true); - } - } - else - { - var ex = new InvalidOperationException("incorrect message - should never happen!"); - publish1SyncSource.TrySetException(ex); - publish2SyncSource.TrySetException(ex); - } - return Task.CompletedTask; - }; + if (args.Initiator == ShutdownInitiator.Peer) + { + MaybeSetException(ea, publish1SyncSource, publish2SyncSource); + } + }); + }; - await consumeChannel.BasicConsumeAsync(queueName, true, string.Empty, false, false, null, consumer); + var consumer = new AsyncEventingBasicConsumer(consumeChannel); - // ensure we get a delivery - await AssertRanToCompletion(publish1SyncSource.Task, publish2SyncSource.Task); + int publish1_count = 0; + int publish2_count = 0; + + consumer.Received += (o, a) => + { + string decoded = _encoding.GetString(a.Body.ToArray()); + if (decoded == publish1) + { + if (Interlocked.Increment(ref publish1_count) >= publish_total) + { + publish1SyncSource.TrySetResult(true); + } + } + else if (decoded == publish2) + { + if (Interlocked.Increment(ref publish2_count) >= publish_total) + { + publish2SyncSource.TrySetResult(true); + } + } + else + { + var ex = new InvalidOperationException("incorrect message - should never happen!"); + SetException(ex, publish1SyncSource, publish2SyncSource); + } + return Task.CompletedTask; + }; - bool result1 = await publish1SyncSource.Task; - Assert.True(result1, $"Non concurrent dispatch lead to deadlock after {WaitSpan}"); + await consumeChannel.BasicConsumeAsync(queueName, true, string.Empty, false, false, null, consumer); + await consumerSyncSource.Task; - bool result2 = await publish2SyncSource.Task; - Assert.True(result2, $"Non concurrent dispatch lead to deadlock after {WaitSpan}"); + await consumeChannel.CloseAsync(); + } - await consumeChannel.CloseAsync(); + await consumeConn.CloseAsync(); } }); - await AssertRanToCompletion(publishTask, consumeTask); + await AssertRanToCompletion(publishTask); + + await AssertRanToCompletion(publish1SyncSource.Task, publish2SyncSource.Task); + consumerSyncSource.TrySetResult(true); + + bool result1 = await publish1SyncSource.Task; + Assert.True(result1, $"Non concurrent dispatch lead to deadlock after {WaitSpan}"); + + bool result2 = await publish2SyncSource.Task; + Assert.True(result2, $"Non concurrent dispatch lead to deadlock after {WaitSpan}"); } finally { - tokenSource.Dispose(); ctsr.Dispose(); + tokenSource.Dispose(); } } [Fact] public async Task TestBasicRejectAsync() { + string queueName = GenerateQueueName(); + var publishSyncSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); var cancellationTokenSource = new CancellationTokenSource(TestTimeout); CancellationTokenRegistration ctsr = cancellationTokenSource.Token.Register(() => @@ -274,7 +332,7 @@ public async Task TestBasicRejectAsync() { if (args.Initiator == ShutdownInitiator.Peer) { - publishSyncSource.TrySetResult(false); + MaybeSetException(ea, publishSyncSource); } }); }; @@ -283,10 +341,7 @@ public async Task TestBasicRejectAsync() { HandleChannelShutdown(_channel, ea, (args) => { - if (args.Initiator == ShutdownInitiator.Peer) - { - publishSyncSource.TrySetResult(false); - } + MaybeSetException(ea, publishSyncSource); }); }; @@ -311,9 +366,8 @@ public async Task TestBasicRejectAsync() publishSyncSource.TrySetResult(true); }; - QueueDeclareOk q = await _channel.QueueDeclareAsync(queue: string.Empty, - durable: false, exclusive: true, autoDelete: false); - string queueName = q.QueueName; + QueueDeclareOk q = await _channel.QueueDeclareAsync(queueName, false, false, false); + const string publish1 = "sync-hi-1"; byte[] _body = _encoding.GetBytes(publish1); await _channel.BasicPublishAsync(string.Empty, queueName, _body); @@ -353,14 +407,17 @@ public async Task TestBasicRejectAsync() } finally { - cancellationTokenSource.Dispose(); + await _channel.QueueDeleteAsync(queue: queueName); ctsr.Dispose(); + cancellationTokenSource.Dispose(); } } [Fact] public async Task TestBasicAckAsync() { + string queueName = GenerateQueueName(); + const int messageCount = 1024; int messagesReceived = 0; @@ -372,7 +429,7 @@ public async Task TestBasicAckAsync() { if (args.Initiator == ShutdownInitiator.Peer) { - publishSyncSource.TrySetResult(false); + MaybeSetException(ea, publishSyncSource); } }); }; @@ -383,7 +440,7 @@ public async Task TestBasicAckAsync() { if (args.Initiator == ShutdownInitiator.Peer) { - publishSyncSource.TrySetResult(false); + MaybeSetException(ea, publishSyncSource); } }); }; @@ -403,15 +460,14 @@ public async Task TestBasicAckAsync() } }; - QueueDeclareOk q = await _channel.QueueDeclareAsync(string.Empty, false, false, true); - string queueName = q.QueueName; + QueueDeclareOk q = await _channel.QueueDeclareAsync(queueName, false, false, true); await _channel.BasicQosAsync(0, 1, false); await _channel.BasicConsumeAsync(queue: queueName, autoAck: false, consumerTag: string.Empty, noLocal: false, exclusive: false, arguments: null, consumer); - var publishTask = Task.Run(async () => + Task publishTask = Task.Run(async () => { for (int i = 0; i < messageCount; i++) { @@ -419,10 +475,14 @@ public async Task TestBasicAckAsync() await _channel.BasicPublishAsync(string.Empty, queueName, _body); await _channel.WaitForConfirmsOrDieAsync(); } + + return true; }); + Assert.True(await publishTask); Assert.True(await publishSyncSource.Task); Assert.Equal(messageCount, messagesReceived); + await _channel.QueueDeleteAsync(queue: queueName); await _channel.CloseAsync(_closeArgs, false, CancellationToken.None); } @@ -437,7 +497,7 @@ public async Task TestBasicNackAsync() { if (args.Initiator == ShutdownInitiator.Peer) { - publishSyncSource.TrySetResult(false); + MaybeSetException(ea, publishSyncSource); } }); }; @@ -448,7 +508,7 @@ public async Task TestBasicNackAsync() { if (args.Initiator == ShutdownInitiator.Peer) { - publishSyncSource.TrySetResult(false); + MaybeSetException(ea, publishSyncSource); } }); }; @@ -527,15 +587,45 @@ public async Task NonAsyncConsumerShouldThrowInvalidOperationException() public async Task TestDeclarationOfManyAutoDeleteQueuesWithTransientConsumer() { AssertRecordedQueues((RabbitMQ.Client.Framing.Impl.AutorecoveringConnection)_conn, 0); - for (int i = 0; i < 1000; i++) + var tasks = new List(); + for (int i = 0; i < 256; i++) { - string q = Guid.NewGuid().ToString(); - await _channel.QueueDeclareAsync(q, false, false, true); - var dummy = new AsyncEventingBasicConsumer(_channel); - string tag = await _channel.BasicConsumeAsync(q, true, dummy); - await _channel.BasicCancelAsync(tag); + tasks.Add(Task.Run(async () => + { + string q = GenerateQueueName(); + await _channel.QueueDeclareAsync(q, false, false, true); + var dummy = new AsyncEventingBasicConsumer(_channel); + string tag = await _channel.BasicConsumeAsync(q, true, dummy); + await _channel.BasicCancelAsync(tag); + })); } + await Task.WhenAll(tasks); AssertRecordedQueues((RabbitMQ.Client.Framing.Impl.AutorecoveringConnection)_conn, 0); } + + private static void SetException(Exception ex, params TaskCompletionSource[] tcsAry) + { + foreach (TaskCompletionSource tcs in tcsAry) + { + tcs.TrySetException(ex); + } + } + + private static void MaybeSetException(ShutdownEventArgs ea, params TaskCompletionSource[] tcsAry) + { + foreach (TaskCompletionSource tcs in tcsAry) + { + MaybeSetException(ea, tcs); + } + } + + private static void MaybeSetException(ShutdownEventArgs ea, TaskCompletionSource tcs) + { + if (ea.Initiator == ShutdownInitiator.Peer) + { + Exception ex = ea.Exception ?? new Exception(ea.ReplyText); + tcs.TrySetException(ex); + } + } } }