diff --git a/.github/workflows/build-test.yaml b/.github/workflows/build-test.yaml index 9713c86e7..199451745 100644 --- a/.github/workflows/build-test.yaml +++ b/.github/workflows/build-test.yaml @@ -73,8 +73,8 @@ jobs: Receive-Job -Job $tx; ` & "${{ github.workspace }}\.ci\windows\toxiproxy\toxiproxy-cli.exe" list; ` dotnet test ` - --environment "RABBITMQ_RABBITMQCTL_PATH=${{ steps.install-start-rabbitmq.outputs.path }}" ` --environment 'RABBITMQ_LONG_RUNNING_TESTS=true' ` + --environment "RABBITMQ_RABBITMQCTL_PATH=${{ steps.install-start-rabbitmq.outputs.path }}" ` --environment 'RABBITMQ_TOXIPROXY_TESTS=true' ` --environment 'PASSWORD=grapefruit' ` --environment SSL_CERTS_DIR="${{ github.workspace }}\.ci\certs" ` @@ -114,7 +114,12 @@ jobs: id: install-start-rabbitmq run: ${{ github.workspace }}\.ci\windows\gha-setup.ps1 - name: Sequential Integration Tests - run: dotnet test --environment "RABBITMQ_RABBITMQCTL_PATH=${{ steps.install-start-rabbitmq.outputs.path }}" "${{ github.workspace }}\projects\Test\SequentialIntegration\SequentialIntegration.csproj" --no-restore --no-build --logger 'console;verbosity=detailed' + run: dotnet test ` + --environment 'RABBITMQ_LONG_RUNNING_TESTS=true' ` + --environment "RABBITMQ_RABBITMQCTL_PATH=${{ steps.install-start-rabbitmq.outputs.path }}" ` + --environment 'PASSWORD=grapefruit' ` + --environment SSL_CERTS_DIR="${{ github.workspace }}\.ci\certs" ` + "${{ github.workspace }}\projects\Test\SequentialIntegration\SequentialIntegration.csproj" --no-restore --no-build --logger 'console;verbosity=detailed' - name: Maybe upload RabbitMQ logs if: failure() uses: actions/upload-artifact@v4 @@ -182,8 +187,8 @@ jobs: - name: Integration Tests run: | dotnet test \ - --environment "RABBITMQ_RABBITMQCTL_PATH=DOCKER:${{ steps.start-rabbitmq.outputs.id }}" \ --environment 'RABBITMQ_LONG_RUNNING_TESTS=true' \ + --environment "RABBITMQ_RABBITMQCTL_PATH=DOCKER:${{ steps.start-rabbitmq.outputs.id }}" \ --environment 'RABBITMQ_TOXIPROXY_TESTS=true' \ --environment 'PASSWORD=grapefruit' \ --environment SSL_CERTS_DIR="${{ github.workspace }}/.ci/certs" \ @@ -222,7 +227,10 @@ jobs: - name: Sequential Integration Tests run: | dotnet test \ + --environment 'RABBITMQ_LONG_RUNNING_TESTS=true' \ --environment "RABBITMQ_RABBITMQCTL_PATH=DOCKER:${{ steps.start-rabbitmq.outputs.id }}" \ + --environment 'PASSWORD=grapefruit' \ + --environment SSL_CERTS_DIR="${{ github.workspace }}/.ci/certs" \ "${{ github.workspace }}/projects/Test/SequentialIntegration/SequentialIntegration.csproj" --no-restore --no-build --logger 'console;verbosity=detailed' - name: Maybe upload RabbitMQ logs if: failure() diff --git a/Makefile b/Makefile index e8f308954..42de4527d 100644 --- a/Makefile +++ b/Makefile @@ -21,13 +21,17 @@ build: test: dotnet test $(CURDIR)/projects/Test/Unit/Unit.csproj --logger 'console;verbosity=detailed' dotnet test --environment 'GITHUB_ACTIONS=true' \ - --environment "RABBITMQ_RABBITMQCTL_PATH=DOCKER:$$(docker inspect --format='{{.Id}}' $(RABBITMQ_DOCKER_NAME))" \ --environment 'RABBITMQ_LONG_RUNNING_TESTS=true' \ + --environment "RABBITMQ_RABBITMQCTL_PATH=DOCKER:$$(docker inspect --format='{{.Id}}' $(RABBITMQ_DOCKER_NAME))" \ --environment 'RABBITMQ_TOXIPROXY_TESTS=true' \ --environment 'PASSWORD=grapefruit' \ --environment SSL_CERTS_DIR="$(CURDIR)/.ci/certs" \ "$(CURDIR)/projects/Test/Integration/Integration.csproj" --logger 'console;verbosity=detailed' - dotnet test --environment "RABBITMQ_RABBITMQCTL_PATH=DOCKER:$$(docker inspect --format='{{.Id}}' $(RABBITMQ_DOCKER_NAME))" $(CURDIR)/projects/Test/SequentialIntegration/SequentialIntegration.csproj --logger 'console;verbosity=detailed' + dotnet test --environment 'RABBITMQ_LONG_RUNNING_TESTS=true' \ + --environment "RABBITMQ_RABBITMQCTL_PATH=DOCKER:$$(docker inspect --format='{{.Id}}' $(RABBITMQ_DOCKER_NAME))" \ + --environment 'PASSWORD=grapefruit' \ + --environment SSL_CERTS_DIR="$(CURDIR)/.ci/certs" \ + $(CURDIR)/projects/Test/SequentialIntegration/SequentialIntegration.csproj --logger 'console;verbosity=detailed' # Note: # You must have the expected OAuth2 environment set up for this target diff --git a/projects/RabbitMQ.Client/client/impl/AsyncEventingWrapper.cs b/projects/RabbitMQ.Client/client/impl/AsyncEventingWrapper.cs new file mode 100644 index 000000000..29f007b7c --- /dev/null +++ b/projects/RabbitMQ.Client/client/impl/AsyncEventingWrapper.cs @@ -0,0 +1,88 @@ +using System; +using System.Linq; +using System.Threading.Tasks; +using RabbitMQ.Client.Events; + +namespace RabbitMQ.Client.Impl +{ +#nullable enable + internal struct AsyncEventingWrapper + { + private event AsyncEventHandler? _event; + private Delegate[]? _handlers; + private string? _context; + private Func? _onException; + + public readonly bool IsEmpty => _event is null; + + public AsyncEventingWrapper(string context, Func onException) + { + _event = null; + _handlers = null; + _context = context; + _onException = onException; + } + + public void AddHandler(AsyncEventHandler? handler) + { + _event += handler; + _handlers = null; + } + + public void RemoveHandler(AsyncEventHandler? handler) + { + _event -= handler; + _handlers = null; + } + + // Do not make this function async! (This type is a struct that gets copied at the start of an async method => empty _handlers is copied) + public Task InvokeAsync(object sender, T parameter) + { + Delegate[]? handlers = _handlers; + if (handlers is null) + { + handlers = _event?.GetInvocationList(); + if (handlers is null) + { + return Task.CompletedTask; + } + + _handlers = handlers; + } + + return InternalInvoke(handlers, sender, parameter); + } + + private readonly async Task InternalInvoke(Delegate[] handlers, object sender, T parameter) + { + foreach (AsyncEventHandler action in handlers.Cast>()) + { + try + { + await action(sender, parameter) + .ConfigureAwait(false); + } + catch (Exception exception) + { + if (_onException != null) + { + await _onException(exception, _context!) + .ConfigureAwait(false); + } + else + { + throw; + } + } + } + } + + public void Takeover(in AsyncEventingWrapper other) + { + _event = other._event; + _handlers = other._handlers; + _context = other._context; + _onException = other._onException; + } + } +} diff --git a/projects/RabbitMQ.Client/client/impl/Connection.Heartbeat.cs b/projects/RabbitMQ.Client/client/impl/Connection.Heartbeat.cs index 97b3b900d..d045555ab 100644 --- a/projects/RabbitMQ.Client/client/impl/Connection.Heartbeat.cs +++ b/projects/RabbitMQ.Client/client/impl/Connection.Heartbeat.cs @@ -130,6 +130,13 @@ await FinishCloseAsync(cts.Token) _heartbeatReadTimer?.Change((int)Heartbeat.TotalMilliseconds, Timeout.Infinite); } } + catch (OperationCanceledException) + { + if (false == _mainLoopCts.IsCancellationRequested) + { + throw; + } + } catch (ObjectDisposedException) { // timer is already disposed, diff --git a/projects/RabbitMQ.Client/client/impl/Connection.cs b/projects/RabbitMQ.Client/client/impl/Connection.cs index 62a1dba4a..9e3d12f6d 100644 --- a/projects/RabbitMQ.Client/client/impl/Connection.cs +++ b/projects/RabbitMQ.Client/client/impl/Connection.cs @@ -35,6 +35,7 @@ using System.IO; using System.Runtime.CompilerServices; using System.Threading; +using System.Threading.Channels; using System.Threading.Tasks; using RabbitMQ.Client.Events; using RabbitMQ.Client.Exceptions; @@ -335,6 +336,13 @@ await _session0.TransmitAsync(method, cancellationToken) .ConfigureAwait(false); } } + catch (ChannelClosedException) + { + if (false == abort) + { + throw; + } + } catch (AlreadyClosedException) { if (false == abort) diff --git a/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/AsyncConsumerDispatcher.cs b/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/AsyncConsumerDispatcher.cs index 761e6d23e..cc5b61885 100644 --- a/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/AsyncConsumerDispatcher.cs +++ b/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/AsyncConsumerDispatcher.cs @@ -16,39 +16,49 @@ internal AsyncConsumerDispatcher(ChannelBase channel, int concurrency) protected override async Task ProcessChannelAsync(CancellationToken token) { - while (await _reader.WaitToReadAsync(token).ConfigureAwait(false)) + try { - while (_reader.TryRead(out WorkStruct work)) + while (await _reader.WaitToReadAsync(token).ConfigureAwait(false)) { - using (work) + while (_reader.TryRead(out WorkStruct work)) { - try + using (work) { - Task task = work.WorkType switch + try { - WorkType.Deliver => work.AsyncConsumer.HandleBasicDeliver( - work.ConsumerTag, work.DeliveryTag, work.Redelivered, - work.Exchange, work.RoutingKey, work.BasicProperties, work.Body.Memory), + Task task = work.WorkType switch + { + WorkType.Deliver => work.AsyncConsumer.HandleBasicDeliver( + work.ConsumerTag, work.DeliveryTag, work.Redelivered, + work.Exchange, work.RoutingKey, work.BasicProperties, work.Body.Memory), - WorkType.Cancel => work.AsyncConsumer.HandleBasicCancel(work.ConsumerTag), + WorkType.Cancel => work.AsyncConsumer.HandleBasicCancel(work.ConsumerTag), - WorkType.CancelOk => work.AsyncConsumer.HandleBasicCancelOk(work.ConsumerTag), + WorkType.CancelOk => work.AsyncConsumer.HandleBasicCancelOk(work.ConsumerTag), - WorkType.ConsumeOk => work.AsyncConsumer.HandleBasicConsumeOk(work.ConsumerTag), + WorkType.ConsumeOk => work.AsyncConsumer.HandleBasicConsumeOk(work.ConsumerTag), - WorkType.Shutdown => work.AsyncConsumer.HandleChannelShutdown(_channel, work.Reason), + WorkType.Shutdown => work.AsyncConsumer.HandleChannelShutdown(_channel, work.Reason), - _ => Task.CompletedTask - }; - await task.ConfigureAwait(false); - } - catch (Exception e) - { - _channel.OnCallbackException(CallbackExceptionEventArgs.Build(e, work.WorkType.ToString(), work.Consumer)); + _ => Task.CompletedTask + }; + await task.ConfigureAwait(false); + } + catch (Exception e) + { + _channel.OnCallbackException(CallbackExceptionEventArgs.Build(e, work.WorkType.ToString(), work.Consumer)); + } } } } } + catch (OperationCanceledException) + { + if (false == token.IsCancellationRequested) + { + throw; + } + } } } } diff --git a/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcher.cs b/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcher.cs index 632b7cf72..a3929d398 100644 --- a/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcher.cs +++ b/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcher.cs @@ -16,45 +16,55 @@ internal ConsumerDispatcher(ChannelBase channel, int concurrency) protected override async Task ProcessChannelAsync(CancellationToken token) { - while (await _reader.WaitToReadAsync(token).ConfigureAwait(false)) + try { - while (_reader.TryRead(out var work)) + while (await _reader.WaitToReadAsync(token).ConfigureAwait(false)) { - using (work) + while (_reader.TryRead(out WorkStruct work)) { - try + using (work) { - IBasicConsumer consumer = work.Consumer; - string? consumerTag = work.ConsumerTag; - switch (work.WorkType) + try { - case WorkType.Deliver: - await consumer.HandleBasicDeliverAsync( - consumerTag, work.DeliveryTag, work.Redelivered, - work.Exchange, work.RoutingKey, work.BasicProperties, work.Body.Memory) - .ConfigureAwait(false); - break; - case WorkType.Cancel: - consumer.HandleBasicCancel(consumerTag); - break; - case WorkType.CancelOk: - consumer.HandleBasicCancelOk(consumerTag); - break; - case WorkType.ConsumeOk: - consumer.HandleBasicConsumeOk(consumerTag); - break; - case WorkType.Shutdown: - consumer.HandleChannelShutdown(_channel, work.Reason); - break; + IBasicConsumer consumer = work.Consumer; + string? consumerTag = work.ConsumerTag; + switch (work.WorkType) + { + case WorkType.Deliver: + await consumer.HandleBasicDeliverAsync( + consumerTag, work.DeliveryTag, work.Redelivered, + work.Exchange, work.RoutingKey, work.BasicProperties, work.Body.Memory) + .ConfigureAwait(false); + break; + case WorkType.Cancel: + consumer.HandleBasicCancel(consumerTag); + break; + case WorkType.CancelOk: + consumer.HandleBasicCancelOk(consumerTag); + break; + case WorkType.ConsumeOk: + consumer.HandleBasicConsumeOk(consumerTag); + break; + case WorkType.Shutdown: + consumer.HandleChannelShutdown(_channel, work.Reason); + break; + } + } + catch (Exception e) + { + _channel.OnCallbackException(CallbackExceptionEventArgs.Build(e, work.WorkType.ToString(), work.Consumer)); } - } - catch (Exception e) - { - _channel.OnCallbackException(CallbackExceptionEventArgs.Build(e, work.WorkType.ToString(), work.Consumer)); } } } } + catch (OperationCanceledException) + { + if (false == token.IsCancellationRequested) + { + throw; + } + } } } } diff --git a/projects/RabbitMQ.Client/client/impl/EventingWrapper.cs b/projects/RabbitMQ.Client/client/impl/EventingWrapper.cs index b716af5ff..4017fb405 100644 --- a/projects/RabbitMQ.Client/client/impl/EventingWrapper.cs +++ b/projects/RabbitMQ.Client/client/impl/EventingWrapper.cs @@ -1,6 +1,5 @@ using System; -using System.Threading.Tasks; -using RabbitMQ.Client.Events; +using System.Linq; namespace RabbitMQ.Client.Impl { @@ -12,7 +11,7 @@ internal struct EventingWrapper private string? _context; private Action? _onExceptionAction; - public bool IsEmpty => _event is null; + public readonly bool IsEmpty => _event is null; public EventingWrapper(string context, Action onExceptionAction) { @@ -42,7 +41,7 @@ public void ClearHandlers() public void Invoke(object sender, T parameter) { - var handlers = _handlers; + Delegate[]? handlers = _handlers; if (handlers is null) { handlers = _event?.GetInvocationList(); @@ -53,7 +52,8 @@ public void Invoke(object sender, T parameter) _handlers = handlers; } - foreach (EventHandler action in handlers) + + foreach (EventHandler action in handlers.Cast>()) { try { @@ -61,7 +61,7 @@ public void Invoke(object sender, T parameter) } catch (Exception exception) { - var onException = _onExceptionAction; + Action? onException = _onExceptionAction; if (onException != null) { onException(exception, _context!); @@ -82,61 +82,4 @@ public void Takeover(in EventingWrapper other) _onExceptionAction = other._onExceptionAction; } } - - internal struct AsyncEventingWrapper - { - private event AsyncEventHandler? _event; - private Delegate[]? _handlers; - - public bool IsEmpty => _event is null; - - public void AddHandler(AsyncEventHandler? handler) - { - _event += handler; - _handlers = null; - } - - public void RemoveHandler(AsyncEventHandler? handler) - { - _event -= handler; - _handlers = null; - } - - // Do not make this function async! (This type is a struct that gets copied at the start of an async method => empty _handlers is copied) - public Task InvokeAsync(object sender, T parameter) - { - var handlers = _handlers; - if (handlers is null) - { - handlers = _event?.GetInvocationList(); - if (handlers is null) - { - return Task.CompletedTask; - } - - _handlers = handlers; - } - - if (handlers.Length == 1) - { - return ((AsyncEventHandler)handlers[0])(sender, parameter); - } - return InternalInvoke(handlers, sender, parameter); - } - - private static async Task InternalInvoke(Delegate[] handlers, object sender, T parameter) - { - foreach (AsyncEventHandler action in handlers) - { - await action(sender, parameter) - .ConfigureAwait(false); - } - } - - public void Takeover(in AsyncEventingWrapper other) - { - _event = other._event; - _handlers = other._handlers; - } - } } diff --git a/projects/Test/Common/IntegrationFixture.cs b/projects/Test/Common/IntegrationFixture.cs index 938eb3000..62846d397 100644 --- a/projects/Test/Common/IntegrationFixture.cs +++ b/projects/Test/Common/IntegrationFixture.cs @@ -179,13 +179,13 @@ public virtual async Task DisposeAsync() { try { - if (_channel != null) + if (_conn != null && _conn.IsOpen) { - await _channel.CloseAsync(); - } + if (_channel != null && _channel.IsOpen) + { + await _channel.CloseAsync(); + } - if (_conn != null) - { await _conn.CloseAsync(); } } diff --git a/projects/Test/Common/TestConnectionRecoveryBase.cs b/projects/Test/Common/TestConnectionRecoveryBase.cs index 0718d8c28..e8eb2884e 100644 --- a/projects/Test/Common/TestConnectionRecoveryBase.cs +++ b/projects/Test/Common/TestConnectionRecoveryBase.cs @@ -202,6 +202,8 @@ protected async Task PublishMessagesWhileClosingConnAsync(string queueName) { using (IChannel publishingChannel = await publishingConn.CreateChannelAsync()) { + await publishingChannel.ConfirmSelectAsync(); + for (ushort i = 0; i < TotalMessageCount; i++) { if (i == CloseAtCount) @@ -210,6 +212,7 @@ protected async Task PublishMessagesWhileClosingConnAsync(string queueName) } await publishingChannel.BasicPublishAsync(string.Empty, queueName, _messageBody); + await publishingChannel.WaitForConfirmsOrDieAsync(); } await publishingChannel.CloseAsync(); diff --git a/projects/Test/Integration/TestAsyncConsumer.cs b/projects/Test/Integration/TestAsyncConsumer.cs index 990f29f10..c76e9acd7 100644 --- a/projects/Test/Integration/TestAsyncConsumer.cs +++ b/projects/Test/Integration/TestAsyncConsumer.cs @@ -30,6 +30,7 @@ //--------------------------------------------------------------------------- using System; +using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; using RabbitMQ.Client; @@ -52,6 +53,7 @@ public TestAsyncConsumer(ITestOutputHelper output) public async Task TestBasicRoundtripConcurrent() { QueueDeclareOk q = await _channel.QueueDeclareAsync(); + string publish1 = GetUniqueString(1024); byte[] body = _encoding.GetBytes(publish1); await _channel.BasicPublishAsync("", q.QueueName, body); @@ -64,13 +66,12 @@ public async Task TestBasicRoundtripConcurrent() var publish1SyncSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); var publish2SyncSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - var maximumWaitTime = TimeSpan.FromSeconds(10); - var tokenSource = new CancellationTokenSource(maximumWaitTime); + var tokenSource = new CancellationTokenSource(WaitSpan); CancellationTokenRegistration ctsr = tokenSource.Token.Register(() => { - publish1SyncSource.TrySetResult(false); - publish2SyncSource.TrySetResult(false); + publish1SyncSource.TrySetCanceled(); + publish2SyncSource.TrySetCanceled(); }); try @@ -81,8 +82,7 @@ public async Task TestBasicRoundtripConcurrent() { if (args.Initiator == ShutdownInitiator.Peer) { - publish1SyncSource.TrySetResult(false); - publish2SyncSource.TrySetResult(false); + MaybeSetException(ea, publish1SyncSource, publish2SyncSource); } }); }; @@ -93,25 +93,28 @@ public async Task TestBasicRoundtripConcurrent() { if (args.Initiator == ShutdownInitiator.Peer) { - publish1SyncSource.TrySetResult(false); - publish2SyncSource.TrySetResult(false); + MaybeSetException(ea, publish1SyncSource, publish2SyncSource); } }); }; - consumer.Received += async (o, a) => + consumer.Received += (o, a) => { string decoded = _encoding.GetString(a.Body.ToArray()); if (decoded == publish1) { publish1SyncSource.TrySetResult(true); - await publish2SyncSource.Task; } else if (decoded == publish2) { publish2SyncSource.TrySetResult(true); - await publish1SyncSource.Task; } + else + { + var ex = new InvalidOperationException("incorrect message - should never happen!"); + SetException(ex, publish1SyncSource, publish2SyncSource); + } + return Task.CompletedTask; }; await _channel.BasicConsumeAsync(q.QueueName, true, string.Empty, false, false, null, consumer); @@ -120,15 +123,15 @@ public async Task TestBasicRoundtripConcurrent() await AssertRanToCompletion(publish1SyncSource.Task, publish2SyncSource.Task); bool result1 = await publish1SyncSource.Task; - Assert.True(result1, $"1 - Non concurrent dispatch lead to deadlock after {maximumWaitTime}"); + Assert.True(result1, $"1 - Non concurrent dispatch lead to deadlock after {WaitSpan}"); bool result2 = await publish2SyncSource.Task; - Assert.True(result2, $"2 - Non concurrent dispatch lead to deadlock after {maximumWaitTime}"); + Assert.True(result2, $"2 - Non concurrent dispatch lead to deadlock after {WaitSpan}"); } finally { - tokenSource.Dispose(); ctsr.Dispose(); + tokenSource.Dispose(); } } @@ -136,7 +139,7 @@ public async Task TestBasicRoundtripConcurrent() public async Task TestBasicRoundtripConcurrentManyMessages() { const int publish_total = 4096; - string queueName = $"{nameof(TestBasicRoundtripConcurrentManyMessages)}-{Guid.NewGuid()}"; + string queueName = GenerateQueueName(); string publish1 = GetUniqueString(32768); byte[] body1 = _encoding.GetBytes(publish1); @@ -145,12 +148,14 @@ public async Task TestBasicRoundtripConcurrentManyMessages() var publish1SyncSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); var publish2SyncSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - var maximumWaitTime = TimeSpan.FromSeconds(30); - var tokenSource = new CancellationTokenSource(maximumWaitTime); + var consumerSyncSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + + var tokenSource = new CancellationTokenSource(WaitSpan); CancellationTokenRegistration ctsr = tokenSource.Token.Register(() => { - publish1SyncSource.TrySetResult(false); - publish2SyncSource.TrySetResult(false); + publish1SyncSource.TrySetCanceled(); + publish2SyncSource.TrySetCanceled(); + consumerSyncSource.TrySetCanceled(); }); try @@ -161,8 +166,7 @@ public async Task TestBasicRoundtripConcurrentManyMessages() { if (args.Initiator == ShutdownInitiator.Peer) { - publish1SyncSource.TrySetResult(false); - publish2SyncSource.TrySetResult(false); + MaybeSetException(ea, publish1SyncSource, publish2SyncSource); } }); }; @@ -173,88 +177,146 @@ public async Task TestBasicRoundtripConcurrentManyMessages() { if (args.Initiator == ShutdownInitiator.Peer) { - publish1SyncSource.TrySetResult(false); - publish2SyncSource.TrySetResult(false); + MaybeSetException(ea, publish1SyncSource, publish2SyncSource); } }); }; - QueueDeclareOk q = await _channel.QueueDeclareAsync(queue: queueName, exclusive: false, durable: true); - Assert.Equal(q, queueName); + QueueDeclareOk q = await _channel.QueueDeclareAsync(queue: queueName, exclusive: false, autoDelete: true); + Assert.Equal(queueName, q.QueueName); Task publishTask = Task.Run(async () => { - using (IChannel publishChannel = await _conn.CreateChannelAsync()) + using (IConnection publishConn = await _connFactory.CreateConnectionAsync()) { - QueueDeclareOk pubQ = await publishChannel.QueueDeclareAsync(queue: queueName, exclusive: false, durable: true); - Assert.Equal(queueName, pubQ.QueueName); - for (int i = 0; i < publish_total; i++) + publishConn.ConnectionShutdown += (o, ea) => { - await publishChannel.BasicPublishAsync(string.Empty, queueName, body1); - await publishChannel.BasicPublishAsync(string.Empty, queueName, body2); + HandleConnectionShutdown(publishConn, ea, (args) => + { + if (args.Initiator == ShutdownInitiator.Peer) + { + MaybeSetException(ea, publish1SyncSource, publish2SyncSource); + } + }); + }; + using (IChannel publishChannel = await publishConn.CreateChannelAsync()) + { + publishChannel.ChannelShutdown += (o, ea) => + { + HandleChannelShutdown(publishChannel, ea, (args) => + { + if (args.Initiator == ShutdownInitiator.Peer) + { + MaybeSetException(ea, publish1SyncSource, publish2SyncSource); + } + }); + }; + await publishChannel.ConfirmSelectAsync(); + + for (int i = 0; i < publish_total; i++) + { + await publishChannel.BasicPublishAsync(string.Empty, queueName, body1); + await publishChannel.BasicPublishAsync(string.Empty, queueName, body2); + await publishChannel.WaitForConfirmsOrDieAsync(); + } + + await publishChannel.CloseAsync(); } - await publishChannel.CloseAsync(); + await publishConn.CloseAsync(); } }); Task consumeTask = Task.Run(async () => { - using (IChannel consumeChannel = await _conn.CreateChannelAsync()) + using (IConnection consumeConn = await _connFactory.CreateConnectionAsync()) { - var consumer = new AsyncEventingBasicConsumer(consumeChannel); - - int publish1_count = 0; - int publish2_count = 0; - - consumer.Received += async (o, a) => + consumeConn.ConnectionShutdown += (o, ea) => { - string decoded = _encoding.GetString(a.Body.ToArray()); - if (decoded == publish1) + HandleConnectionShutdown(consumeConn, ea, (args) => { - if (Interlocked.Increment(ref publish1_count) >= publish_total) + if (args.Initiator == ShutdownInitiator.Peer) { - publish1SyncSource.TrySetResult(true); - await publish2SyncSource.Task; + MaybeSetException(ea, publish1SyncSource, publish2SyncSource); } - } - else if (decoded == publish2) + }); + }; + using (IChannel consumeChannel = await consumeConn.CreateChannelAsync()) + { + consumeChannel.ChannelShutdown += (o, ea) => { - if (Interlocked.Increment(ref publish2_count) >= publish_total) + HandleChannelShutdown(consumeChannel, ea, (args) => { - publish2SyncSource.TrySetResult(true); - await publish1SyncSource.Task; - } - } - }; + if (args.Initiator == ShutdownInitiator.Peer) + { + MaybeSetException(ea, publish1SyncSource, publish2SyncSource); + } + }); + }; - await consumeChannel.BasicConsumeAsync(queueName, true, string.Empty, false, false, null, consumer); + var consumer = new AsyncEventingBasicConsumer(consumeChannel); - // ensure we get a delivery - await AssertRanToCompletion(publish1SyncSource.Task, publish2SyncSource.Task); + int publish1_count = 0; + int publish2_count = 0; - bool result1 = await publish1SyncSource.Task; - Assert.True(result1, $"Non concurrent dispatch lead to deadlock after {maximumWaitTime}"); + consumer.Received += (o, a) => + { + string decoded = _encoding.GetString(a.Body.ToArray()); + if (decoded == publish1) + { + if (Interlocked.Increment(ref publish1_count) >= publish_total) + { + publish1SyncSource.TrySetResult(true); + } + } + else if (decoded == publish2) + { + if (Interlocked.Increment(ref publish2_count) >= publish_total) + { + publish2SyncSource.TrySetResult(true); + } + } + else + { + var ex = new InvalidOperationException("incorrect message - should never happen!"); + SetException(ex, publish1SyncSource, publish2SyncSource); + } + return Task.CompletedTask; + }; - bool result2 = await publish2SyncSource.Task; - Assert.True(result2, $"Non concurrent dispatch lead to deadlock after {maximumWaitTime}"); + await consumeChannel.BasicConsumeAsync(queueName, true, string.Empty, false, false, null, consumer); + await consumerSyncSource.Task; + + await consumeChannel.CloseAsync(); + } - await consumeChannel.CloseAsync(); + await consumeConn.CloseAsync(); } }); - await AssertRanToCompletion(publishTask, consumeTask); + await AssertRanToCompletion(publishTask); + + await AssertRanToCompletion(publish1SyncSource.Task, publish2SyncSource.Task); + consumerSyncSource.TrySetResult(true); + + bool result1 = await publish1SyncSource.Task; + Assert.True(result1, $"Non concurrent dispatch lead to deadlock after {WaitSpan}"); + + bool result2 = await publish2SyncSource.Task; + Assert.True(result2, $"Non concurrent dispatch lead to deadlock after {WaitSpan}"); } finally { - tokenSource.Dispose(); ctsr.Dispose(); + tokenSource.Dispose(); } } [Fact] public async Task TestBasicRejectAsync() { + string queueName = GenerateQueueName(); + var publishSyncSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); var cancellationTokenSource = new CancellationTokenSource(TestTimeout); CancellationTokenRegistration ctsr = cancellationTokenSource.Token.Register(() => @@ -270,7 +332,7 @@ public async Task TestBasicRejectAsync() { if (args.Initiator == ShutdownInitiator.Peer) { - publishSyncSource.TrySetResult(false); + MaybeSetException(ea, publishSyncSource); } }); }; @@ -279,10 +341,7 @@ public async Task TestBasicRejectAsync() { HandleChannelShutdown(_channel, ea, (args) => { - if (args.Initiator == ShutdownInitiator.Peer) - { - publishSyncSource.TrySetResult(false); - } + MaybeSetException(ea, publishSyncSource); }); }; @@ -307,9 +366,8 @@ public async Task TestBasicRejectAsync() publishSyncSource.TrySetResult(true); }; - QueueDeclareOk q = await _channel.QueueDeclareAsync(queue: string.Empty, - durable: false, exclusive: true, autoDelete: false); - string queueName = q.QueueName; + QueueDeclareOk q = await _channel.QueueDeclareAsync(queueName, false, false, false); + const string publish1 = "sync-hi-1"; byte[] _body = _encoding.GetBytes(publish1); await _channel.BasicPublishAsync(string.Empty, queueName, _body); @@ -349,14 +407,17 @@ public async Task TestBasicRejectAsync() } finally { - cancellationTokenSource.Dispose(); + await _channel.QueueDeleteAsync(queue: queueName); ctsr.Dispose(); + cancellationTokenSource.Dispose(); } } [Fact] public async Task TestBasicAckAsync() { + string queueName = GenerateQueueName(); + const int messageCount = 1024; int messagesReceived = 0; @@ -368,7 +429,7 @@ public async Task TestBasicAckAsync() { if (args.Initiator == ShutdownInitiator.Peer) { - publishSyncSource.TrySetResult(false); + MaybeSetException(ea, publishSyncSource); } }); }; @@ -379,7 +440,7 @@ public async Task TestBasicAckAsync() { if (args.Initiator == ShutdownInitiator.Peer) { - publishSyncSource.TrySetResult(false); + MaybeSetException(ea, publishSyncSource); } }); }; @@ -399,15 +460,14 @@ public async Task TestBasicAckAsync() } }; - QueueDeclareOk q = await _channel.QueueDeclareAsync(string.Empty, false, false, true); - string queueName = q.QueueName; + QueueDeclareOk q = await _channel.QueueDeclareAsync(queueName, false, false, true); await _channel.BasicQosAsync(0, 1, false); await _channel.BasicConsumeAsync(queue: queueName, autoAck: false, consumerTag: string.Empty, noLocal: false, exclusive: false, arguments: null, consumer); - var publishTask = Task.Run(async () => + Task publishTask = Task.Run(async () => { for (int i = 0; i < messageCount; i++) { @@ -415,10 +475,14 @@ public async Task TestBasicAckAsync() await _channel.BasicPublishAsync(string.Empty, queueName, _body); await _channel.WaitForConfirmsOrDieAsync(); } + + return true; }); + Assert.True(await publishTask); Assert.True(await publishSyncSource.Task); Assert.Equal(messageCount, messagesReceived); + await _channel.QueueDeleteAsync(queue: queueName); await _channel.CloseAsync(_closeArgs, false, CancellationToken.None); } @@ -433,7 +497,7 @@ public async Task TestBasicNackAsync() { if (args.Initiator == ShutdownInitiator.Peer) { - publishSyncSource.TrySetResult(false); + MaybeSetException(ea, publishSyncSource); } }); }; @@ -444,7 +508,7 @@ public async Task TestBasicNackAsync() { if (args.Initiator == ShutdownInitiator.Peer) { - publishSyncSource.TrySetResult(false); + MaybeSetException(ea, publishSyncSource); } }); }; @@ -523,15 +587,45 @@ public async Task NonAsyncConsumerShouldThrowInvalidOperationException() public async Task TestDeclarationOfManyAutoDeleteQueuesWithTransientConsumer() { AssertRecordedQueues((RabbitMQ.Client.Framing.Impl.AutorecoveringConnection)_conn, 0); - for (int i = 0; i < 1000; i++) + var tasks = new List(); + for (int i = 0; i < 256; i++) { - string q = Guid.NewGuid().ToString(); - await _channel.QueueDeclareAsync(q, false, false, true); - var dummy = new AsyncEventingBasicConsumer(_channel); - string tag = await _channel.BasicConsumeAsync(q, true, dummy); - await _channel.BasicCancelAsync(tag); + tasks.Add(Task.Run(async () => + { + string q = GenerateQueueName(); + await _channel.QueueDeclareAsync(q, false, false, true); + var dummy = new AsyncEventingBasicConsumer(_channel); + string tag = await _channel.BasicConsumeAsync(q, true, dummy); + await _channel.BasicCancelAsync(tag); + })); } + await Task.WhenAll(tasks); AssertRecordedQueues((RabbitMQ.Client.Framing.Impl.AutorecoveringConnection)_conn, 0); } + + private static void SetException(Exception ex, params TaskCompletionSource[] tcsAry) + { + foreach (TaskCompletionSource tcs in tcsAry) + { + tcs.TrySetException(ex); + } + } + + private static void MaybeSetException(ShutdownEventArgs ea, params TaskCompletionSource[] tcsAry) + { + foreach (TaskCompletionSource tcs in tcsAry) + { + MaybeSetException(ea, tcs); + } + } + + private static void MaybeSetException(ShutdownEventArgs ea, TaskCompletionSource tcs) + { + if (ea.Initiator == ShutdownInitiator.Peer) + { + Exception ex = ea.Exception ?? new Exception(ea.ReplyText); + tcs.TrySetException(ex); + } + } } } diff --git a/projects/Test/Integration/TestConcurrentAccessWithSharedConnection.cs b/projects/Test/Integration/TestConcurrentAccessWithSharedConnection.cs index 8d2edccc0..5b3ff5729 100644 --- a/projects/Test/Integration/TestConcurrentAccessWithSharedConnection.cs +++ b/projects/Test/Integration/TestConcurrentAccessWithSharedConnection.cs @@ -70,12 +70,14 @@ private async Task TestConcurrentChannelOperationsAsync(Func var tasks = new List(); for (int i = 0; i < _processorCount; i++) { - tasks.Add(Task.Run(async () => + tasks.Add(Task.Run(() => { + var subTasks = new List(); for (int j = 0; j < iterations; j++) { - await action(_conn); + subTasks.Add(action(_conn)); } + return Task.WhenAll(subTasks); })); } diff --git a/projects/Test/Integration/TestConnectionShutdown.cs b/projects/Test/Integration/TestConnectionShutdown.cs index 2f3dce5f5..b0d88c80b 100644 --- a/projects/Test/Integration/TestConnectionShutdown.cs +++ b/projects/Test/Integration/TestConnectionShutdown.cs @@ -100,13 +100,19 @@ public async Task TestDisposedWithSocketClosedOutOfBand() }; var c = (AutorecoveringConnection)_conn; - await c.CloseFrameHandlerAsync(); + Task frameHandlerCloseTask = c.CloseFrameHandlerAsync(); - _conn.Dispose(); - _conn = null; - - TimeSpan waitSpan = TimeSpan.FromSeconds(10); - await WaitAsync(tcs, waitSpan, "channel shutdown"); + try + { + _conn.Dispose(); + await WaitAsync(tcs, WaitSpan, "channel shutdown"); + await frameHandlerCloseTask.WaitAsync(WaitSpan); + } + finally + { + _conn = null; + _channel = null; + } } [Fact]