From a8825c67f5d7eb4b7dd706a57899b40dc0413d97 Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Thu, 23 May 2024 13:15:43 -0700 Subject: [PATCH] Fix two flaky tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit These two tests always fail at the same time, too 🤔 * Re-try when using HTTP API to close a connection and an error happens * Introduce `ShortSpan` wait span. * Ensure that message is actually routed and confirmed before checking. * Limit client provided name to 3000 characters * Add callback exception handlers to flaky tests * Check for null consumer tag based on CI failures. * No need for random data, just different data. Update projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcherBase.cs Co-authored-by: Paulo Morgado <470455+paulomorgado@users.noreply.github.com> --- .../client/api/InternalConstants.cs | 2 +- .../client/impl/AutorecoveringChannel.cs | 2 +- .../AutorecoveringConnection.Recording.cs | 6 + .../ConsumerDispatcherBase.cs | 2 +- projects/Test/Common/IntegrationFixture.cs | 71 ++++---- projects/Test/Common/Util.cs | 27 +-- .../TestExchangeRecovery.cs | 5 +- .../Test/Integration/TestAsyncConsumer.cs | 166 ++++++++++++++---- .../TestAsyncConsumerExceptions.cs | 5 +- projects/Test/Integration/TestBasicPublish.cs | 32 +++- .../Test/Integration/TestConnectionFactory.cs | 11 +- .../TestConsumerOperationDispatch.cs | 4 +- .../Test/Integration/TestPublisherConfirms.cs | 15 +- 13 files changed, 234 insertions(+), 114 deletions(-) diff --git a/projects/RabbitMQ.Client/client/api/InternalConstants.cs b/projects/RabbitMQ.Client/client/api/InternalConstants.cs index e1c037a727..ac611e4090 100644 --- a/projects/RabbitMQ.Client/client/api/InternalConstants.cs +++ b/projects/RabbitMQ.Client/client/api/InternalConstants.cs @@ -50,6 +50,6 @@ internal static class InternalConstants /// This is not configurable, but was discovered while working on this issue: /// https://github.com/rabbitmq/rabbitmq-dotnet-client/issues/980 /// - internal const int DefaultRabbitMqMaxClientProvideNameLength = 3652; + internal const int DefaultRabbitMqMaxClientProvideNameLength = 3000; } } diff --git a/projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs b/projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs index 317099a078..879ac51b38 100644 --- a/projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs +++ b/projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs @@ -279,7 +279,7 @@ public async Task BasicConsumeAsync(string queue, bool autoAck, string c { string resultConsumerTag = await InnerChannel.BasicConsumeAsync(queue, autoAck, consumerTag, noLocal, exclusive, arguments, consumer, cancellationToken) - .ConfigureAwait(false); + .ConfigureAwait(false) ?? throw new InvalidOperationException("basic.consume returned null consumer tag"); var rc = new RecordedConsumer(channel: this, consumer: consumer, consumerTag: resultConsumerTag, queue: queue, autoAck: autoAck, exclusive: exclusive, arguments: arguments); await _connection.RecordConsumerAsync(rc, recordedEntitiesSemaphoreHeld: false) diff --git a/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.Recording.cs b/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.Recording.cs index ca08d4c027..8a873749c9 100644 --- a/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.Recording.cs +++ b/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.Recording.cs @@ -29,6 +29,7 @@ // Copyright (c) 2007-2020 VMware, Inc. All rights reserved. //--------------------------------------------------------------------------- +using System; using System.Collections.Generic; using System.Linq; using System.Threading; @@ -400,6 +401,11 @@ await _recordedEntitiesSemaphore.WaitAsync() private void DoDeleteRecordedConsumer(string consumerTag) { + if (consumerTag is null) + { + throw new ArgumentNullException(nameof(consumerTag)); + } + if (_recordedConsumers.Remove(consumerTag, out RecordedConsumer recordedConsumer)) { DeleteAutoDeleteQueue(recordedConsumer.Queue); diff --git a/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcherBase.cs b/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcherBase.cs index 1ee8f96a1e..41960fb5e1 100644 --- a/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcherBase.cs +++ b/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcherBase.cs @@ -10,7 +10,7 @@ namespace RabbitMQ.Client.ConsumerDispatching internal abstract class ConsumerDispatcherBase { private static readonly FallbackConsumer s_fallbackConsumer = new FallbackConsumer(); - private readonly IDictionary _consumers = new ConcurrentDictionary(); + private readonly ConcurrentDictionary _consumers = new ConcurrentDictionary(); public IBasicConsumer? DefaultConsumer { get; set; } diff --git a/projects/Test/Common/IntegrationFixture.cs b/projects/Test/Common/IntegrationFixture.cs index 57a17fde6f..b27de42eb8 100644 --- a/projects/Test/Common/IntegrationFixture.cs +++ b/projects/Test/Common/IntegrationFixture.cs @@ -75,6 +75,7 @@ public abstract class IntegrationFixture : IAsyncLifetime protected readonly ushort _consumerDispatchConcurrency = 1; protected readonly bool _openChannel = true; + public static readonly TimeSpan ShortSpan; public static readonly TimeSpan WaitSpan; public static readonly TimeSpan LongWaitSpan; public static readonly TimeSpan RecoveryInterval = TimeSpan.FromSeconds(2); @@ -95,12 +96,14 @@ static IntegrationFixture() if (s_isRunningInCI) { + ShortSpan = TimeSpan.FromSeconds(20); WaitSpan = TimeSpan.FromSeconds(60); LongWaitSpan = TimeSpan.FromSeconds(120); RequestedConnectionTimeout = TimeSpan.FromSeconds(4); } else { + ShortSpan = TimeSpan.FromSeconds(10); WaitSpan = TimeSpan.FromSeconds(30); LongWaitSpan = TimeSpan.FromSeconds(60); } @@ -160,9 +163,8 @@ public virtual async Task InitializeAsync() if (IsVerbose) { AddCallbackShutdownHandlers(); + AddCallbackExceptionHandlers(); } - - AddCallbackExceptionHandlers(); } if (_connFactory.AutomaticRecoveryEnabled) @@ -221,59 +223,55 @@ protected virtual void DisposeAssertions() protected void AddCallbackExceptionHandlers() { - if (_conn != null) + AddCallbackExceptionHandlers(_conn, _channel); + } + + protected void AddCallbackExceptionHandlers(IConnection conn, IChannel channel) + { + if (conn != null) { - _conn.ConnectionRecoveryError += (s, ea) => + conn.ConnectionRecoveryError += (s, ea) => { _connectionRecoveryException = ea.Exception; - if (IsVerbose) + try + { + _output.WriteLine($"{0} connection recovery exception: {1}", + _testDisplayName, _connectionRecoveryException); + } + catch (InvalidOperationException) { - try - { - _output.WriteLine($"{0} connection recovery exception: {1}", - _testDisplayName, _connectionRecoveryException); - } - catch (InvalidOperationException) - { - } } }; - _conn.CallbackException += (o, ea) => + conn.CallbackException += (o, ea) => { _connectionCallbackException = ea.Exception; - if (IsVerbose) + try + { + _output.WriteLine("{0} connection callback exception: {1}", + _testDisplayName, _connectionCallbackException); + } + catch (InvalidOperationException) { - try - { - _output.WriteLine("{0} connection callback exception: {1}", - _testDisplayName, _connectionCallbackException); - } - catch (InvalidOperationException) - { - } } }; } - if (_channel != null) + if (channel != null) { - _channel.CallbackException += (o, ea) => + channel.CallbackException += (o, ea) => { _channelCallbackException = ea.Exception; - if (IsVerbose) + try + { + _output.WriteLine("{0} channel callback exception: {1}", + _testDisplayName, _channelCallbackException); + } + catch (InvalidOperationException) { - try - { - _output.WriteLine("{0} channel callback exception: {1}", - _testDisplayName, _channelCallbackException); - } - catch (InvalidOperationException) - { - } } }; } @@ -491,11 +489,6 @@ protected static void AssertPreconditionFailed(ShutdownEventArgs args) AssertShutdownError(args, Constants.PreconditionFailed); } - protected static Task AssertRanToCompletion(params Task[] tasks) - { - return DoAssertRanToCompletion(tasks); - } - protected static Task AssertRanToCompletion(IEnumerable tasks) { return DoAssertRanToCompletion(tasks); diff --git a/projects/Test/Common/Util.cs b/projects/Test/Common/Util.cs index a206744ca9..e9afa4edfd 100644 --- a/projects/Test/Common/Util.cs +++ b/projects/Test/Common/Util.cs @@ -67,29 +67,32 @@ public static async Task CloseConnectionAsync(IConnection conn) connectionToClose = connections.Where(c0 => string.Equals((string)c0.ClientProperties["connection_name"], conn.ClientProvidedName, StringComparison.InvariantCultureIgnoreCase)).FirstOrDefault(); - - if (connectionToClose == null) - { - tries++; - } - else - { - break; - } } catch (ArgumentNullException) { // Sometimes we see this in GitHub CI tries++; + continue; } - } while (tries <= 30); + + if (connectionToClose != null) + { + try + { + await s_managementClient.CloseConnectionAsync(connectionToClose); + return; + } + catch (UnexpectedHttpStatusCodeException) + { + tries++; + } + } + } while (tries <= 10); if (connectionToClose == null) { throw new InvalidOperationException($"Could not delete connection: '{conn.ClientProvidedName}'"); } - - await s_managementClient.CloseConnectionAsync(connectionToClose); } } } diff --git a/projects/Test/Integration/ConnectionRecovery/TestExchangeRecovery.cs b/projects/Test/Integration/ConnectionRecovery/TestExchangeRecovery.cs index 21dab6f342..d989fca6f1 100644 --- a/projects/Test/Integration/ConnectionRecovery/TestExchangeRecovery.cs +++ b/projects/Test/Integration/ConnectionRecovery/TestExchangeRecovery.cs @@ -55,6 +55,8 @@ public async Task TestExchangeRecoveryTest() [Fact] public async Task TestExchangeToExchangeBindingRecovery() { + await _channel.ConfirmSelectAsync(); + string q = (await _channel.QueueDeclareAsync("", false, false, false)).QueueName; string ex_source = GenerateExchangeName(); @@ -70,7 +72,8 @@ public async Task TestExchangeToExchangeBindingRecovery() { await CloseAndWaitForRecoveryAsync(); Assert.True(_channel.IsOpen); - await _channel.BasicPublishAsync(ex_source, "", _encoding.GetBytes("msg")); + await _channel.BasicPublishAsync(ex_source, "", _encoding.GetBytes("msg"), mandatory: true); + await _channel.WaitForConfirmsOrDieAsync(); await AssertMessageCountAsync(q, 1); } finally diff --git a/projects/Test/Integration/TestAsyncConsumer.cs b/projects/Test/Integration/TestAsyncConsumer.cs index 7a74712cad..f41a9acf3f 100644 --- a/projects/Test/Integration/TestAsyncConsumer.cs +++ b/projects/Test/Integration/TestAsyncConsumer.cs @@ -52,15 +52,16 @@ public TestAsyncConsumer(ITestOutputHelper output) [Fact] public async Task TestBasicRoundtripConcurrent() { + AddCallbackExceptionHandlers(); + _channel.DefaultConsumer = new DefaultAsyncConsumer("_channel,", _output); + QueueDeclareOk q = await _channel.QueueDeclareAsync(); - string publish1 = GetUniqueString(512); - byte[] body = _encoding.GetBytes(publish1); - await _channel.BasicPublishAsync("", q.QueueName, body); + const int length = 4096; + (byte[] body1, byte[] body2) = GenerateTwoBodies(length); - string publish2 = GetUniqueString(512); - body = _encoding.GetBytes(publish2); - await _channel.BasicPublishAsync("", q.QueueName, body); + await _channel.BasicPublishAsync("", q.QueueName, body1); + await _channel.BasicPublishAsync("", q.QueueName, body2); var consumer = new AsyncEventingBasicConsumer(_channel); @@ -70,10 +71,14 @@ public async Task TestBasicRoundtripConcurrent() var tokenSource = new CancellationTokenSource(WaitSpan); CancellationTokenRegistration ctsr = tokenSource.Token.Register(() => { + _output.WriteLine("publish1SyncSource.Task Status: {0}", publish1SyncSource.Task.Status); + _output.WriteLine("publish2SyncSource.Task Status: {0}", publish2SyncSource.Task.Status); publish1SyncSource.TrySetCanceled(); publish2SyncSource.TrySetCanceled(); }); + bool body1Received = false; + bool body2Received = false; try { _conn.ConnectionShutdown += (o, ea) => @@ -94,13 +99,14 @@ public async Task TestBasicRoundtripConcurrent() consumer.Received += (o, a) => { - string decoded = _encoding.GetString(a.Body.ToArray()); - if (decoded == publish1) + if (ByteArraysEqual(a.Body.ToArray(), body1)) { + body1Received = true; publish1SyncSource.TrySetResult(true); } - else if (decoded == publish2) + else if (ByteArraysEqual(a.Body.ToArray(), body2)) { + body2Received = true; publish2SyncSource.TrySetResult(true); } else @@ -113,14 +119,21 @@ public async Task TestBasicRoundtripConcurrent() await _channel.BasicConsumeAsync(q.QueueName, true, string.Empty, false, false, null, consumer); - // ensure we get a delivery - await AssertRanToCompletion(publish1SyncSource.Task, publish2SyncSource.Task); - - bool result1 = await publish1SyncSource.Task; - Assert.True(result1, $"1 - Non concurrent dispatch lead to deadlock after {WaitSpan}"); - - bool result2 = await publish2SyncSource.Task; - Assert.True(result2, $"2 - Non concurrent dispatch lead to deadlock after {WaitSpan}"); + try + { + bool result1 = await publish1SyncSource.Task; + Assert.True(result1, $"1 - Non concurrent dispatch lead to deadlock after {WaitSpan}"); + bool result2 = await publish2SyncSource.Task; + Assert.True(result2, $"2 - Non concurrent dispatch lead to deadlock after {WaitSpan}"); + } + catch (Exception ex) + { + _output.WriteLine("EXCEPTION: {0}", ex); + _output.WriteLine("body1Received: {0}, body2Received: {1}", body1Received, body2Received); + _output.WriteLine("publish1SyncSource.Task Status: {0}", publish1SyncSource.Task.Status); + _output.WriteLine("publish2SyncSource.Task Status: {0}", publish2SyncSource.Task.Status); + throw; + } } finally { @@ -132,13 +145,14 @@ public async Task TestBasicRoundtripConcurrent() [Fact] public async Task TestBasicRoundtripConcurrentManyMessages() { + AddCallbackExceptionHandlers(); + _channel.DefaultConsumer = new DefaultAsyncConsumer("_channel,", _output); + const int publish_total = 4096; + const int length = 512; string queueName = GenerateQueueName(); - string publish1 = GetUniqueString(512); - byte[] body1 = _encoding.GetBytes(publish1); - string publish2 = GetUniqueString(512); - byte[] body2 = _encoding.GetBytes(publish2); + (byte[] body1, byte[] body2) = GenerateTwoBodies(length); var publish1SyncSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); var publish2SyncSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); @@ -147,6 +161,9 @@ public async Task TestBasicRoundtripConcurrentManyMessages() var tokenSource = new CancellationTokenSource(WaitSpan); CancellationTokenRegistration ctsr = tokenSource.Token.Register(() => { + _output.WriteLine("publish1SyncSource.Task Status: {0}", publish1SyncSource.Task.Status); + _output.WriteLine("publish2SyncSource.Task Status: {0}", publish2SyncSource.Task.Status); + _output.WriteLine("consumerSyncSource.Task Status: {0}", consumerSyncSource.Task.Status); publish1SyncSource.TrySetCanceled(); publish2SyncSource.TrySetCanceled(); consumerSyncSource.TrySetCanceled(); @@ -186,6 +203,8 @@ public async Task TestBasicRoundtripConcurrentManyMessages() }; using (IChannel publishChannel = await publishConn.CreateChannelAsync()) { + AddCallbackExceptionHandlers(publishConn, publishChannel); + publishChannel.DefaultConsumer = new DefaultAsyncConsumer("publishChannel,", _output); publishChannel.ChannelShutdown += (o, ea) => { HandleChannelShutdown(publishChannel, ea, (args) => @@ -209,6 +228,10 @@ public async Task TestBasicRoundtripConcurrentManyMessages() } }); + + int publish1_count = 0; + int publish2_count = 0; + Task consumeTask = Task.Run(async () => { using (IConnection consumeConn = await _connFactory.CreateConnectionAsync()) @@ -222,6 +245,8 @@ public async Task TestBasicRoundtripConcurrentManyMessages() }; using (IChannel consumeChannel = await consumeConn.CreateChannelAsync()) { + AddCallbackExceptionHandlers(consumeConn, consumeChannel); + consumeChannel.DefaultConsumer = new DefaultAsyncConsumer("consumeChannel,", _output); consumeChannel.ChannelShutdown += (o, ea) => { HandleChannelShutdown(consumeChannel, ea, (args) => @@ -231,21 +256,16 @@ public async Task TestBasicRoundtripConcurrentManyMessages() }; var consumer = new AsyncEventingBasicConsumer(consumeChannel); - - int publish1_count = 0; - int publish2_count = 0; - consumer.Received += (o, a) => { - string decoded = _encoding.GetString(a.Body.ToArray()); - if (decoded == publish1) + if (ByteArraysEqual(a.Body.ToArray(), body1)) { if (Interlocked.Increment(ref publish1_count) >= publish_total) { publish1SyncSource.TrySetResult(true); } } - else if (decoded == publish2) + else if (ByteArraysEqual(a.Body.ToArray(), body2)) { if (Interlocked.Increment(ref publish2_count) >= publish_total) { @@ -270,19 +290,28 @@ public async Task TestBasicRoundtripConcurrentManyMessages() } }); - await AssertRanToCompletion(publishTask); - - await AssertRanToCompletion(publish1SyncSource.Task, publish2SyncSource.Task); - consumerSyncSource.TrySetResult(true); - - bool result1 = await publish1SyncSource.Task; - Assert.True(result1, $"Non concurrent dispatch lead to deadlock after {WaitSpan}"); - - bool result2 = await publish2SyncSource.Task; - Assert.True(result2, $"Non concurrent dispatch lead to deadlock after {WaitSpan}"); + try + { + await publishTask; + bool result1 = await publish1SyncSource.Task; + Assert.True(result1, $"Non concurrent dispatch lead to deadlock after {WaitSpan}"); + bool result2 = await publish2SyncSource.Task; + Assert.True(result2, $"Non concurrent dispatch lead to deadlock after {WaitSpan}"); + } + catch (Exception ex) + { + _output.WriteLine("EXCEPTION: {0}", ex); + _output.WriteLine("publish1_count: {0}, publish2_count: {1}", publish1_count, publish2_count); + _output.WriteLine("publishTask Status: {0}", publishTask.Status); + _output.WriteLine("publish1SyncSource.Task Status: {0}", publish1SyncSource.Task.Status); + _output.WriteLine("publish2SyncSource.Task Status: {0}", publish2SyncSource.Task.Status); + _output.WriteLine("consumerSyncSource.Task Status: {0}", consumerSyncSource.Task.Status); + throw; + } } finally { + consumerSyncSource.TrySetResult(true); ctsr.Dispose(); tokenSource.Dispose(); } @@ -658,5 +687,66 @@ private static void MaybeSetException(ShutdownEventArgs args, TaskCompletionSour tcs.TrySetException(ex); } } + + private static bool ByteArraysEqual(ReadOnlySpan a1, ReadOnlySpan a2) + { + return a1.SequenceEqual(a2); + } + + private static (byte[] body1, byte[] body2) GenerateTwoBodies(ushort length) + { + byte[] body1 = _encoding.GetBytes(new string('x', length)); + byte[] body2 = _encoding.GetBytes(new string('y', length)); + return (body1, body2); + } + + private class DefaultAsyncConsumer : AsyncDefaultBasicConsumer + { + private readonly string _logPrefix; + private readonly ITestOutputHelper _output; + + public DefaultAsyncConsumer(string logPrefix, ITestOutputHelper output) + { + _logPrefix = logPrefix; + _output = output; + } + + public override Task HandleBasicCancel(string consumerTag) + { + _output.WriteLine("[ERROR] {0} HandleBasicCancel {1}", _logPrefix, consumerTag); + return base.HandleBasicCancel(consumerTag); + } + + public override Task HandleBasicCancelOk(string consumerTag) + { + _output.WriteLine("[ERROR] {0} HandleBasicCancelOk {1}", _logPrefix, consumerTag); + return base.HandleBasicCancelOk(consumerTag); + } + + public override Task HandleBasicConsumeOk(string consumerTag) + { + _output.WriteLine("[ERROR] {0} HandleBasicConsumeOk {1}", _logPrefix, consumerTag); + return base.HandleBasicConsumeOk(consumerTag); + } + + public override Task HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, + string exchange, string routingKey, in ReadOnlyBasicProperties properties, ReadOnlyMemory body) + { + _output.WriteLine("[ERROR] {0} HandleBasicDeliver {1}", _logPrefix, consumerTag); + return base.HandleBasicDeliver(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body); + } + + public override Task HandleChannelShutdown(object channel, ShutdownEventArgs reason) + { + _output.WriteLine("[ERROR] {0} HandleChannelShutdown", _logPrefix); + return base.HandleChannelShutdown(channel, reason); + } + + public override Task OnCancel(params string[] consumerTags) + { + _output.WriteLine("[ERROR] {0} OnCancel {1}", _logPrefix, consumerTags[0]); + return base.OnCancel(consumerTags); + } + } } } diff --git a/projects/Test/Integration/TestAsyncConsumerExceptions.cs b/projects/Test/Integration/TestAsyncConsumerExceptions.cs index ee1c8e7579..9262474d09 100644 --- a/projects/Test/Integration/TestAsyncConsumerExceptions.cs +++ b/projects/Test/Integration/TestAsyncConsumerExceptions.cs @@ -97,10 +97,9 @@ public Task TestDeliveryExceptionHandling() protected async Task TestExceptionHandlingWith(IBasicConsumer consumer, Func action) { - var waitSpan = TimeSpan.FromSeconds(5); var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - var cts = new CancellationTokenSource(waitSpan); - CancellationTokenRegistration ctsr = cts.Token.Register(() => tcs.TrySetResult(false)); + var cts = new CancellationTokenSource(ShortSpan); + CancellationTokenRegistration ctsr = cts.Token.Register(() => tcs.TrySetCanceled()); try { string q = await _channel.QueueDeclareAsync(string.Empty, false, true, false); diff --git a/projects/Test/Integration/TestBasicPublish.cs b/projects/Test/Integration/TestBasicPublish.cs index 1578a9f277..0fa55617b9 100644 --- a/projects/Test/Integration/TestBasicPublish.cs +++ b/projects/Test/Integration/TestBasicPublish.cs @@ -196,18 +196,18 @@ public async Task TestMaxInboundMessageBodySize() bool sawConsumerRegistered = false; bool sawConsumerCancelled = false; - using (IConnection c = await cf.CreateConnectionAsync()) + using (IConnection conn = await cf.CreateConnectionAsync()) { - c.ConnectionShutdown += (o, a) => + conn.ConnectionShutdown += (o, a) => { sawConnectionShutdown = true; }; Assert.Equal(maxMsgSize, cf.MaxInboundMessageBodySize); Assert.Equal(maxMsgSize, cf.Endpoint.MaxInboundMessageBodySize); - Assert.Equal(maxMsgSize, c.Endpoint.MaxInboundMessageBodySize); + Assert.Equal(maxMsgSize, conn.Endpoint.MaxInboundMessageBodySize); - using (IChannel channel = await c.CreateChannelAsync()) + using (IChannel channel = await conn.CreateChannelAsync()) { channel.ChannelShutdown += (o, a) => { @@ -260,7 +260,29 @@ public async Task TestMaxInboundMessageBodySize() Assert.True(sawConsumerRegistered); Assert.True(sawConsumerCancelled); - await channel.CloseAsync(); + try + { + await channel.CloseAsync(); + } + catch (Exception chex) + { + if (IsVerbose) + { + _output.WriteLine("[INFO] {0} channel exception: {1}", nameof(TestMaxInboundMessageBodySize), chex); + } + } + } + + try + { + await conn.CloseAsync(); + } + catch (Exception connex) + { + if (IsVerbose) + { + _output.WriteLine("[INFO] {0} conn exception: {1}", nameof(TestMaxInboundMessageBodySize), connex); + } } } } diff --git a/projects/Test/Integration/TestConnectionFactory.cs b/projects/Test/Integration/TestConnectionFactory.cs index ef692f99e5..6a8d383912 100644 --- a/projects/Test/Integration/TestConnectionFactory.cs +++ b/projects/Test/Integration/TestConnectionFactory.cs @@ -442,11 +442,12 @@ public async Task TestCreateConnectionAsync_UsesValidEndpointWhenMultipleSupplie } [Theory] - [InlineData(3650)] - [InlineData(3651)] - [InlineData(3652)] - [InlineData(3653)] - [InlineData(3654)] + [InlineData(2998)] + [InlineData(2999)] + [InlineData(3000)] + [InlineData(3001)] + [InlineData(3002)] + [InlineData(3003)] public async Task TestCreateConnectionAsync_TruncatesWhenClientNameIsLong_GH980(ushort count) { string cpn = GetUniqueString(count); diff --git a/projects/Test/Integration/TestConsumerOperationDispatch.cs b/projects/Test/Integration/TestConsumerOperationDispatch.cs index 1ac54195d5..264a300440 100644 --- a/projects/Test/Integration/TestConsumerOperationDispatch.cs +++ b/projects/Test/Integration/TestConsumerOperationDispatch.cs @@ -213,12 +213,12 @@ public async Task TestChannelShutdownHandler() await _channel.BasicConsumeAsync(queue: q, autoAck: true, consumer: c); await _channel.CloseAsync(); - await c.Latch.Task.WaitAsync(TimeSpan.FromSeconds(10)); + await c.Latch.Task.WaitAsync(ShortSpan); Assert.True(c.Latch.Task.IsCompletedSuccessfully()); await Assert.ThrowsAsync(() => { - return c.DuplicateLatch.Task.WaitAsync(TimeSpan.FromSeconds(5)); + return c.DuplicateLatch.Task.WaitAsync(ShortSpan); }); Assert.False(c.DuplicateLatch.Task.IsCompletedSuccessfully()); diff --git a/projects/Test/Integration/TestPublisherConfirms.cs b/projects/Test/Integration/TestPublisherConfirms.cs index 4b24d7c997..5afd88f3fb 100644 --- a/projects/Test/Integration/TestPublisherConfirms.cs +++ b/projects/Test/Integration/TestPublisherConfirms.cs @@ -111,7 +111,7 @@ public Task TestWaitForConfirmsWithTimeoutAsync_MessageNacked_WaitingHasTimedout .GetMethod("HandleAckNack", BindingFlags.Instance | BindingFlags.NonPublic) .Invoke(actualChannel, new object[] { 10UL, false, true }); - using (var cts = new CancellationTokenSource(TimeSpan.FromSeconds(4))) + using (var cts = new CancellationTokenSource(ShortSpan)) { Assert.False(await ch.WaitForConfirmsAsync(cts.Token)); } @@ -121,11 +121,12 @@ public Task TestWaitForConfirmsWithTimeoutAsync_MessageNacked_WaitingHasTimedout [Fact] public async Task TestWaitForConfirmsWithEventsAsync() { - string queueName = string.Format("{0}:{1}", _testDisplayName, Guid.NewGuid()); + string queueName = GenerateQueueName(); using (IChannel ch = await _conn.CreateChannelAsync()) { await ch.ConfirmSelectAsync(); - await ch.QueueDeclareAsync(queue: queueName, passive: false, durable: false, exclusive: false, autoDelete: false, arguments: null); + await ch.QueueDeclareAsync(queue: queueName, passive: false, durable: false, + exclusive: true, autoDelete: false, arguments: null); int n = 200; // number of event handler invocations @@ -161,17 +162,19 @@ public async Task TestWaitForConfirmsWithEventsAsync() private async Task TestWaitForConfirmsAsync(int numberOfMessagesToPublish, Func fn) { - string queueName = string.Format("{0}:{1}", _testDisplayName, Guid.NewGuid()); + string queueName = GenerateQueueName(); using (IChannel ch = await _conn.CreateChannelAsync()) { var props = new BasicProperties { Persistent = true }; await ch.ConfirmSelectAsync(); - await ch.QueueDeclareAsync(queue: queueName, passive: false, durable: false, exclusive: false, autoDelete: false, arguments: null); + await ch.QueueDeclareAsync(queue: queueName, passive: false, durable: false, + exclusive: true, autoDelete: false, arguments: null); for (int i = 0; i < numberOfMessagesToPublish; i++) { - await ch.BasicPublishAsync(exchange: string.Empty, routingKey: queueName, body: _messageBody, mandatory: true, basicProperties: props); + await ch.BasicPublishAsync(exchange: string.Empty, routingKey: queueName, + body: _messageBody, mandatory: true, basicProperties: props); } try