Skip to content

Commit

Permalink
Merge pull request #1579 from rabbitmq/lukebakken/fix-basic-roundtrip…
Browse files Browse the repository at this point in the history
…-tests

Fix two flaky tests
  • Loading branch information
lukebakken committed May 25, 2024
2 parents 5da0720 + a8825c6 commit 7bea841
Show file tree
Hide file tree
Showing 13 changed files with 234 additions and 114 deletions.
2 changes: 1 addition & 1 deletion projects/RabbitMQ.Client/client/api/InternalConstants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,6 @@ internal static class InternalConstants
/// This is not configurable, but was discovered while working on this issue:
/// https://github.com/rabbitmq/rabbitmq-dotnet-client/issues/980
/// </summary>
internal const int DefaultRabbitMqMaxClientProvideNameLength = 3652;
internal const int DefaultRabbitMqMaxClientProvideNameLength = 3000;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ await _innerChannel.BasicCancelAsync(consumerTag, noWait, cancellationToken)
{
string resultConsumerTag = await InnerChannel.BasicConsumeAsync(queue, autoAck, consumerTag, noLocal,
exclusive, arguments, consumer, cancellationToken)
.ConfigureAwait(false);
.ConfigureAwait(false) ?? throw new InvalidOperationException("basic.consume returned null consumer tag");
var rc = new RecordedConsumer(channel: this, consumer: consumer, consumerTag: resultConsumerTag,
queue: queue, autoAck: autoAck, exclusive: exclusive, arguments: arguments);
await _connection.RecordConsumerAsync(rc, recordedEntitiesSemaphoreHeld: false)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
// Copyright (c) 2007-2020 VMware, Inc. All rights reserved.
//---------------------------------------------------------------------------

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
Expand Down Expand Up @@ -400,6 +401,11 @@ await _recordedEntitiesSemaphore.WaitAsync()

private void DoDeleteRecordedConsumer(string consumerTag)
{
if (consumerTag is null)
{
throw new ArgumentNullException(nameof(consumerTag));
}

if (_recordedConsumers.Remove(consumerTag, out RecordedConsumer recordedConsumer))
{
DeleteAutoDeleteQueue(recordedConsumer.Queue);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ namespace RabbitMQ.Client.ConsumerDispatching
internal abstract class ConsumerDispatcherBase
{
private static readonly FallbackConsumer s_fallbackConsumer = new FallbackConsumer();
private readonly IDictionary<string, IBasicConsumer> _consumers = new ConcurrentDictionary<string, IBasicConsumer>();
private readonly ConcurrentDictionary<string, IBasicConsumer> _consumers = new ConcurrentDictionary<string, IBasicConsumer>();

public IBasicConsumer? DefaultConsumer { get; set; }

Expand Down
71 changes: 32 additions & 39 deletions projects/Test/Common/IntegrationFixture.cs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ public abstract class IntegrationFixture : IAsyncLifetime
protected readonly ushort _consumerDispatchConcurrency = 1;
protected readonly bool _openChannel = true;

public static readonly TimeSpan ShortSpan;
public static readonly TimeSpan WaitSpan;
public static readonly TimeSpan LongWaitSpan;
public static readonly TimeSpan RecoveryInterval = TimeSpan.FromSeconds(2);
Expand All @@ -95,12 +96,14 @@ static IntegrationFixture()

if (s_isRunningInCI)
{
ShortSpan = TimeSpan.FromSeconds(20);
WaitSpan = TimeSpan.FromSeconds(60);
LongWaitSpan = TimeSpan.FromSeconds(120);
RequestedConnectionTimeout = TimeSpan.FromSeconds(4);
}
else
{
ShortSpan = TimeSpan.FromSeconds(10);
WaitSpan = TimeSpan.FromSeconds(30);
LongWaitSpan = TimeSpan.FromSeconds(60);
}
Expand Down Expand Up @@ -160,9 +163,8 @@ public virtual async Task InitializeAsync()
if (IsVerbose)
{
AddCallbackShutdownHandlers();
AddCallbackExceptionHandlers();
}

AddCallbackExceptionHandlers();
}

if (_connFactory.AutomaticRecoveryEnabled)
Expand Down Expand Up @@ -221,59 +223,55 @@ protected virtual void DisposeAssertions()

protected void AddCallbackExceptionHandlers()
{
if (_conn != null)
AddCallbackExceptionHandlers(_conn, _channel);
}

protected void AddCallbackExceptionHandlers(IConnection conn, IChannel channel)
{
if (conn != null)
{
_conn.ConnectionRecoveryError += (s, ea) =>
conn.ConnectionRecoveryError += (s, ea) =>
{
_connectionRecoveryException = ea.Exception;
if (IsVerbose)
try
{
_output.WriteLine($"{0} connection recovery exception: {1}",
_testDisplayName, _connectionRecoveryException);
}
catch (InvalidOperationException)
{
try
{
_output.WriteLine($"{0} connection recovery exception: {1}",
_testDisplayName, _connectionRecoveryException);
}
catch (InvalidOperationException)
{
}
}
};

_conn.CallbackException += (o, ea) =>
conn.CallbackException += (o, ea) =>
{
_connectionCallbackException = ea.Exception;
if (IsVerbose)
try
{
_output.WriteLine("{0} connection callback exception: {1}",
_testDisplayName, _connectionCallbackException);
}
catch (InvalidOperationException)
{
try
{
_output.WriteLine("{0} connection callback exception: {1}",
_testDisplayName, _connectionCallbackException);
}
catch (InvalidOperationException)
{
}
}
};
}

if (_channel != null)
if (channel != null)
{
_channel.CallbackException += (o, ea) =>
channel.CallbackException += (o, ea) =>
{
_channelCallbackException = ea.Exception;
if (IsVerbose)
try
{
_output.WriteLine("{0} channel callback exception: {1}",
_testDisplayName, _channelCallbackException);
}
catch (InvalidOperationException)
{
try
{
_output.WriteLine("{0} channel callback exception: {1}",
_testDisplayName, _channelCallbackException);
}
catch (InvalidOperationException)
{
}
}
};
}
Expand Down Expand Up @@ -491,11 +489,6 @@ protected static void AssertPreconditionFailed(ShutdownEventArgs args)
AssertShutdownError(args, Constants.PreconditionFailed);
}

protected static Task AssertRanToCompletion(params Task[] tasks)
{
return DoAssertRanToCompletion(tasks);
}

protected static Task AssertRanToCompletion(IEnumerable<Task> tasks)
{
return DoAssertRanToCompletion(tasks);
Expand Down
27 changes: 15 additions & 12 deletions projects/Test/Common/Util.cs
Original file line number Diff line number Diff line change
Expand Up @@ -67,29 +67,32 @@ public static async Task CloseConnectionAsync(IConnection conn)
connectionToClose = connections.Where(c0 =>
string.Equals((string)c0.ClientProperties["connection_name"], conn.ClientProvidedName,
StringComparison.InvariantCultureIgnoreCase)).FirstOrDefault();

if (connectionToClose == null)
{
tries++;
}
else
{
break;
}
}
catch (ArgumentNullException)
{
// Sometimes we see this in GitHub CI
tries++;
continue;
}
} while (tries <= 30);

if (connectionToClose != null)
{
try
{
await s_managementClient.CloseConnectionAsync(connectionToClose);
return;
}
catch (UnexpectedHttpStatusCodeException)
{
tries++;
}
}
} while (tries <= 10);

if (connectionToClose == null)
{
throw new InvalidOperationException($"Could not delete connection: '{conn.ClientProvidedName}'");
}

await s_managementClient.CloseConnectionAsync(connectionToClose);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ public async Task TestExchangeRecoveryTest()
[Fact]
public async Task TestExchangeToExchangeBindingRecovery()
{
await _channel.ConfirmSelectAsync();

string q = (await _channel.QueueDeclareAsync("", false, false, false)).QueueName;

string ex_source = GenerateExchangeName();
Expand All @@ -70,7 +72,8 @@ public async Task TestExchangeToExchangeBindingRecovery()
{
await CloseAndWaitForRecoveryAsync();
Assert.True(_channel.IsOpen);
await _channel.BasicPublishAsync(ex_source, "", _encoding.GetBytes("msg"));
await _channel.BasicPublishAsync(ex_source, "", _encoding.GetBytes("msg"), mandatory: true);
await _channel.WaitForConfirmsOrDieAsync();
await AssertMessageCountAsync(q, 1);
}
finally
Expand Down
Loading

0 comments on commit 7bea841

Please sign in to comment.