Skip to content

Commit

Permalink
Merge pull request #1431 from rabbitmq/rabbitmq-dotnet-client-1429
Browse files Browse the repository at this point in the history
Fix #1429
  • Loading branch information
lukebakken committed Dec 1, 2023
2 parents e1ffbb5 + 21ecfb1 commit 55a33e2
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 46 deletions.
2 changes: 1 addition & 1 deletion projects/RabbitMQ.Client/client/api/IChannelExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ public static void ExchangeDeclare(this IChannel channel, string exchange, strin
public static ValueTask ExchangeDeclareAsync(this IChannel channel, string exchange, string type, bool durable = false, bool autoDelete = false,
IDictionary<string, object> arguments = null)
{
return channel.ExchangeDeclareAsync(exchange, type, durable, autoDelete, arguments);
return channel.ExchangeDeclareAsync(exchange, type, false, durable, autoDelete, arguments);
}

/// <summary>
Expand Down
38 changes: 23 additions & 15 deletions projects/RabbitMQ.Client/client/api/ICredentialsRefresher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
//---------------------------------------------------------------------------

using System;
using System.Collections.Generic;
using System.Collections.Concurrent;
using System.Diagnostics.Tracing;
namespace RabbitMQ.Client
{
Expand All @@ -57,11 +57,13 @@ public class TimerBasedCredentialRefresherEventSource : EventSource
public void TriggeredTimer(string name) => WriteEvent(4, "TriggeredTimer", name);
[Event(5)]
public void RefreshedCredentials(string name, bool succesfully) => WriteEvent(5, "RefreshedCredentials", name, succesfully);
[Event(6)]
public void AlreadyRegistered(string name) => WriteEvent(6, "AlreadyRegistered", name);
}

public class TimerBasedCredentialRefresher : ICredentialsRefresher
{
private Dictionary<ICredentialsProvider, System.Timers.Timer> _registrations = new Dictionary<ICredentialsProvider, System.Timers.Timer>();
private readonly ConcurrentDictionary<ICredentialsProvider, System.Timers.Timer> _registrations = new();

public ICredentialsProvider Register(ICredentialsProvider provider, ICredentialsRefresher.NotifyCredentialRefreshed callback)
{
Expand All @@ -70,25 +72,31 @@ public ICredentialsProvider Register(ICredentialsProvider provider, ICredentials
return provider;
}

_registrations.Add(provider, scheduleTimer(provider, callback));
TimerBasedCredentialRefresherEventSource.Log.Registered(provider.Name);
if (_registrations.TryAdd(provider, scheduleTimer(provider, callback)))
{
TimerBasedCredentialRefresherEventSource.Log.Registered(provider.Name);
}
else
{
TimerBasedCredentialRefresherEventSource.Log.AlreadyRegistered(provider.Name);
}

return provider;
}

public bool Unregister(ICredentialsProvider provider)
{
if (!_registrations.ContainsKey(provider))
if (_registrations.TryRemove(provider, out System.Timers.Timer timer))
{
return false;
}

var timer = _registrations[provider];
if (timer != null)
{
TimerBasedCredentialRefresherEventSource.Log.Unregistered(provider.Name);
timer.Stop();
_registrations.Remove(provider);
timer.Dispose();
try
{
TimerBasedCredentialRefresherEventSource.Log.Unregistered(provider.Name);
timer.Stop();
}
finally
{
timer.Dispose();
}
return true;
}
else
Expand Down
76 changes: 46 additions & 30 deletions projects/Test/OAuth2/TestOAuth2.cs
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,14 @@ public OAuth2Options(Mode mode)
public int TokenExpiresInSeconds => 60;
}

public class TestOAuth2
public class TestOAuth2 : IAsyncLifetime
{
private const string Exchange = "test_direct";

private readonly AutoResetEvent _doneEvent = new AutoResetEvent(false);
private readonly ITestOutputHelper _testOutputHelper;
private readonly IConnection _connection;
private readonly IConnectionFactory _connectionFactory;
private IConnection _connection;
private readonly int _tokenExpiresInSeconds;

public TestOAuth2(ITestOutputHelper testOutputHelper)
Expand All @@ -75,61 +76,76 @@ public TestOAuth2(ITestOutputHelper testOutputHelper)
Mode mode = (Mode)Enum.Parse(typeof(Mode), modeStr.ToLowerInvariant());
var options = new OAuth2Options(mode);

var connectionFactory = new ConnectionFactory
_connectionFactory = new ConnectionFactory
{
AutomaticRecoveryEnabled = true,
DispatchConsumersAsync = true,
CredentialsProvider = GetCredentialsProvider(options),
CredentialsRefresher = GetCredentialsRefresher(),
ClientProvidedName = nameof(TestOAuth2)
};

_connection = connectionFactory.CreateConnection();
_tokenExpiresInSeconds = options.TokenExpiresInSeconds;
}

public async Task InitializeAsync()
{
_connection = await _connectionFactory.CreateConnectionAsync();
}

public async Task DisposeAsync()
{
await _connection.CloseAsync();
_connection.Dispose();
}

[Fact]
public async void IntegrationTest()
{
using (_connection)
using (IChannel publishChannel = await DeclarePublisherAsync())
using (IChannel consumeChannel = await DeclareConsumerAsync())
{
using (IChannel publisher = declarePublisher())
using (IChannel subscriber = await declareConsumer())
{
await Publish(publisher);
Consume(subscriber);
await PublishAsync(publishChannel);
Consume(consumeChannel);

if (_tokenExpiresInSeconds > 0)
if (_tokenExpiresInSeconds > 0)
{
for (int i = 0; i < 4; i++)
{
for (int i = 0; i < 4; i++)
{
_testOutputHelper.WriteLine("Wait until Token expires. Attempt #" + (i + 1));
_testOutputHelper.WriteLine("Wait until Token expires. Attempt #" + (i + 1));

await Task.Delay(TimeSpan.FromSeconds(_tokenExpiresInSeconds + 10));
_testOutputHelper.WriteLine("Resuming ..");
await Task.Delay(TimeSpan.FromSeconds(_tokenExpiresInSeconds + 10));
_testOutputHelper.WriteLine("Resuming ..");

await Publish(publisher);
_doneEvent.Reset();
await PublishAsync(publishChannel);
_doneEvent.Reset();

Consume(subscriber);
}
}
else
{
throw new InvalidOperationException();
Consume(consumeChannel);
}
}
else
{
Assert.Fail("_tokenExpiresInSeconds is NOT greater than 0");
}
}
}

private IChannel declarePublisher()
[Fact]
public async void SecondConnectionCrashes_GH1429()
{
// https://github.com/rabbitmq/rabbitmq-dotnet-client/issues/1429
using IConnection secondConnection = await _connectionFactory.CreateConnectionAsync();
}

private async Task<IChannel> DeclarePublisherAsync()
{
IChannel publisher = _connection.CreateChannel();
publisher.ConfirmSelect();
publisher.ExchangeDeclare("test_direct", ExchangeType.Direct, true, false);
IChannel publisher = await _connection.CreateChannelAsync();
await publisher.ConfirmSelectAsync();
await publisher.ExchangeDeclareAsync("test_direct", ExchangeType.Direct, true, false);
return publisher;
}

private async Task Publish(IChannel publisher)
private async Task PublishAsync(IChannel publisher)
{
const string message = "Hello World!";

Expand All @@ -146,7 +162,7 @@ private async Task Publish(IChannel publisher)
_testOutputHelper.WriteLine("Confirmed Sent message");
}

private async ValueTask<IChannel> declareConsumer()
private async ValueTask<IChannel> DeclareConsumerAsync()
{
IChannel subscriber = _connection.CreateChannel();
await subscriber.QueueDeclareAsync(queue: "testqueue", passive: false, true, false, false, arguments: null);
Expand Down
2 changes: 2 additions & 0 deletions projects/Test/Unit/APIApproval.Approve.verified.txt
Original file line number Diff line number Diff line change
Expand Up @@ -850,6 +850,8 @@ namespace RabbitMQ.Client
{
public TimerBasedCredentialRefresherEventSource() { }
public static RabbitMQ.Client.TimerBasedCredentialRefresherEventSource Log { get; }
[System.Diagnostics.Tracing.Event(6)]
public void AlreadyRegistered(string name) { }
[System.Diagnostics.Tracing.Event(5)]
public void RefreshedCredentials(string name, bool succesfully) { }
[System.Diagnostics.Tracing.Event(1)]
Expand Down

0 comments on commit 55a33e2

Please sign in to comment.