From 1823cc85109db734437743cf6708f125c18cb5d9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= <514737+acogoluegnes@users.noreply.github.com> Date: Thu, 18 Sep 2025 16:03:00 +0200 Subject: [PATCH] Use lock in channel manager Instead of synchronized blocks. --- .../rabbitmq/client/impl/ChannelManager.java | 35 +++++++++++++------ 1 file changed, 24 insertions(+), 11 deletions(-) diff --git a/src/main/java/com/rabbitmq/client/impl/ChannelManager.java b/src/main/java/com/rabbitmq/client/impl/ChannelManager.java index 7dc3811cd..c92d74726 100644 --- a/src/main/java/com/rabbitmq/client/impl/ChannelManager.java +++ b/src/main/java/com/rabbitmq/client/impl/ChannelManager.java @@ -31,6 +31,8 @@ import java.util.Set; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; /** * Manages a set of channels, indexed by channel number (1.._channelMax). @@ -40,8 +42,8 @@ public class ChannelManager { private static final Logger LOGGER = LoggerFactory.getLogger(ChannelManager.class); private final AtomicBoolean closed = new AtomicBoolean(false); - /** Monitor for _channelMap and channelNumberAllocator */ - private final Object monitor = new Object(); + /** Lock for _channelMap and channelNumberAllocator */ + private final Lock lock = new ReentrantLock(); /** Mapping from 1.._channelMax to {@link ChannelN} instance */ private final Map _channelMap = new HashMap<>(); private final IntAllocator channelNumberAllocator; @@ -97,10 +99,13 @@ public ChannelManager(ConsumerWorkService workService, int channelMax, ThreadFac * @throws UnknownChannelException if there is no channel with number channelNumber on this connection */ public ChannelN getChannel(int channelNumber) { - synchronized (this.monitor) { + lock.lock(); + try { ChannelN ch = _channelMap.get(channelNumber); if(ch == null) throw new UnknownChannelException(channelNumber); return ch; + } finally { + lock.unlock(); } } @@ -111,8 +116,11 @@ public ChannelN getChannel(int channelNumber) { public void handleSignal(final ShutdownSignalException signal) { if (this.closed.compareAndSet(false, true)) { Set channels; - synchronized(this.monitor) { + lock.lock(); + try { channels = new HashSet<>(_channelMap.values()); + } finally { + lock.unlock(); } Set shutdownSet = new HashSet<>(); for (final ChannelN channel : channels) { @@ -171,13 +179,16 @@ private void scheduleShutdownProcessing(Set shutdownSet) { public ChannelN createChannel(AMQConnection connection) throws IOException { ChannelN ch; - synchronized (this.monitor) { + lock.lock(); + try { int channelNumber = channelNumberAllocator.allocate(); if (channelNumber == -1) { return null; } else { ch = addNewChannel(connection, channelNumber); } + } finally { + lock.unlock(); } ch.open(); // now that it's been safely added return ch; @@ -185,12 +196,15 @@ public ChannelN createChannel(AMQConnection connection) throws IOException { public ChannelN createChannel(AMQConnection connection, int channelNumber) throws IOException { ChannelN ch; - synchronized (this.monitor) { + lock.lock(); + try { if (channelNumberAllocator.reserve(channelNumber)) { ch = addNewChannel(connection, channelNumber); } else { return null; } + } finally { + lock.unlock(); } ch.open(); // now that it's been safely added return ch; @@ -233,7 +247,8 @@ public void releaseChannelNumber(ChannelN channel) { // a way as to cause disconnectChannel on the old channel to try to // remove the new one. Ideally we would fix this race at the source, // but it's much easier to just catch it here. - synchronized (this.monitor) { + lock.lock(); + try { int channelNumber = channel.getChannelNumber(); ChannelN existing = _channelMap.remove(channelNumber); // Nothing to do here. Move along. @@ -246,13 +261,11 @@ else if (existing != channel) { return; } channelNumberAllocator.free(channelNumber); + } finally { + lock.unlock(); } } - public ExecutorService getShutdownExecutor() { - return shutdownExecutor; - } - public void setShutdownExecutor(ExecutorService shutdownExecutor) { this.shutdownExecutor = shutdownExecutor; }