Skip to content

Commit

Permalink
Merge branch 'lukebakken/use-concurrent-dict' into lukebakken/fix-bas…
Browse files Browse the repository at this point in the history
…ic-roundtrip-tests
  • Loading branch information
lukebakken committed May 24, 2024
2 parents 9dbb052 + e132c9b commit b13957a
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ namespace RabbitMQ
#if NETSTANDARD
internal static class DictionaryExtension
{
public static bool Remove<TKey, TValue>(this Dictionary<TKey, TValue> dictionary, TKey key, out TValue value)
public static bool Remove<TKey, TValue>(this IDictionary<TKey, TValue> dictionary, TKey key, out TValue value)
{
return dictionary.TryGetValue(key, out value) && dictionary.Remove(key);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
using System.Collections.Generic;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading.Tasks;

Expand All @@ -8,7 +10,7 @@ namespace RabbitMQ.Client.ConsumerDispatching
internal abstract class ConsumerDispatcherBase
{
private static readonly FallbackConsumer s_fallbackConsumer = new FallbackConsumer();
private readonly Dictionary<string, IBasicConsumer> _consumers = new Dictionary<string, IBasicConsumer>();
private readonly IDictionary<string, IBasicConsumer> _consumers = new ConcurrentDictionary<string, IBasicConsumer>();

public IBasicConsumer? DefaultConsumer { get; set; }

Expand All @@ -18,26 +20,17 @@ protected ConsumerDispatcherBase()

protected void AddConsumer(IBasicConsumer consumer, string tag)
{
lock (_consumers)
{
_consumers[tag] = consumer;
}
_consumers[tag] = consumer;
}

protected IBasicConsumer GetConsumerOrDefault(string tag)
{
lock (_consumers)
{
return _consumers.TryGetValue(tag, out IBasicConsumer? consumer) ? consumer : GetDefaultOrFallbackConsumer();
}
return _consumers.TryGetValue(tag, out IBasicConsumer? consumer) ? consumer : GetDefaultOrFallbackConsumer();
}

public IBasicConsumer GetAndRemoveConsumer(string tag)
{
lock (_consumers)
{
return _consumers.Remove(tag, out IBasicConsumer? consumer) ? consumer : GetDefaultOrFallbackConsumer();
}
return _consumers.Remove(tag, out IBasicConsumer? consumer) ? consumer : GetDefaultOrFallbackConsumer();
}

public void Shutdown(ShutdownEventArgs reason)
Expand All @@ -54,14 +47,11 @@ public Task ShutdownAsync(ShutdownEventArgs reason)

private void DoShutdownConsumers(ShutdownEventArgs reason)
{
lock (_consumers)
foreach (KeyValuePair<string, IBasicConsumer> pair in _consumers.ToArray())
{
foreach (KeyValuePair<string, IBasicConsumer> pair in _consumers)
{
ShutdownConsumer(pair.Value, reason);
}
_consumers.Clear();
ShutdownConsumer(pair.Value, reason);
}
_consumers.Clear();
}

protected abstract void ShutdownConsumer(IBasicConsumer consumer, ShutdownEventArgs reason);
Expand Down

0 comments on commit b13957a

Please sign in to comment.