Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 24 additions & 11 deletions src/main/java/com/rabbitmq/client/impl/ChannelManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import java.util.Set;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
* Manages a set of channels, indexed by channel number (<code><b>1.._channelMax</b></code>).
Expand All @@ -40,8 +42,8 @@ public class ChannelManager {
private static final Logger LOGGER = LoggerFactory.getLogger(ChannelManager.class);

private final AtomicBoolean closed = new AtomicBoolean(false);
/** Monitor for <code>_channelMap</code> and <code>channelNumberAllocator</code> */
private final Object monitor = new Object();
/** Lock for <code>_channelMap</code> and <code>channelNumberAllocator</code> */
private final Lock lock = new ReentrantLock();
/** Mapping from <code><b>1.._channelMax</b></code> to {@link ChannelN} instance */
private final Map<Integer, ChannelN> _channelMap = new HashMap<>();
private final IntAllocator channelNumberAllocator;
Expand Down Expand Up @@ -97,10 +99,13 @@ public ChannelManager(ConsumerWorkService workService, int channelMax, ThreadFac
* @throws UnknownChannelException if there is no channel with number <code><b>channelNumber</b></code> on this connection
*/
public ChannelN getChannel(int channelNumber) {
synchronized (this.monitor) {
lock.lock();
try {
ChannelN ch = _channelMap.get(channelNumber);
if(ch == null) throw new UnknownChannelException(channelNumber);
return ch;
} finally {
lock.unlock();
}
}

Expand All @@ -111,8 +116,11 @@ public ChannelN getChannel(int channelNumber) {
public void handleSignal(final ShutdownSignalException signal) {
if (this.closed.compareAndSet(false, true)) {
Set<ChannelN> channels;
synchronized(this.monitor) {
lock.lock();
try {
channels = new HashSet<>(_channelMap.values());
} finally {
lock.unlock();
}
Set<CountDownLatch> shutdownSet = new HashSet<>();
for (final ChannelN channel : channels) {
Expand Down Expand Up @@ -171,26 +179,32 @@ private void scheduleShutdownProcessing(Set<CountDownLatch> shutdownSet) {

public ChannelN createChannel(AMQConnection connection) throws IOException {
ChannelN ch;
synchronized (this.monitor) {
lock.lock();
try {
int channelNumber = channelNumberAllocator.allocate();
if (channelNumber == -1) {
return null;
} else {
ch = addNewChannel(connection, channelNumber);
}
} finally {
lock.unlock();
}
ch.open(); // now that it's been safely added
return ch;
}

public ChannelN createChannel(AMQConnection connection, int channelNumber) throws IOException {
ChannelN ch;
synchronized (this.monitor) {
lock.lock();
try {
if (channelNumberAllocator.reserve(channelNumber)) {
ch = addNewChannel(connection, channelNumber);
} else {
return null;
}
} finally {
lock.unlock();
}
ch.open(); // now that it's been safely added
return ch;
Expand Down Expand Up @@ -233,7 +247,8 @@ public void releaseChannelNumber(ChannelN channel) {
// a way as to cause disconnectChannel on the old channel to try to
// remove the new one. Ideally we would fix this race at the source,
// but it's much easier to just catch it here.
synchronized (this.monitor) {
lock.lock();
try {
int channelNumber = channel.getChannelNumber();
ChannelN existing = _channelMap.remove(channelNumber);
// Nothing to do here. Move along.
Expand All @@ -246,13 +261,11 @@ else if (existing != channel) {
return;
}
channelNumberAllocator.free(channelNumber);
} finally {
lock.unlock();
}
}

public ExecutorService getShutdownExecutor() {
return shutdownExecutor;
}

public void setShutdownExecutor(ExecutorService shutdownExecutor) {
this.shutdownExecutor = shutdownExecutor;
}
Expand Down