Skip to content

Commit

Permalink
AMQP-459: Add Channel Limit Option to CCF
Browse files Browse the repository at this point in the history
JIRA: https://jira.spring.io/browse/AMQP-459

Add configuration such that the `channelCacheSize` can be
considered a limit of how many channels can be created and
a timeout when requesting a channel.

Polishing
  • Loading branch information
garyrussell authored and artembilan committed Jan 5, 2015
1 parent a50c47b commit 1718335
Show file tree
Hide file tree
Showing 5 changed files with 278 additions and 13 deletions.
@@ -0,0 +1,43 @@
/*
* Copyright 2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.amqp;

/**
* Exception thrown when some time-bound operation fails to execute in the
* desired time.
*
* @author Gary Russell
* @since 1.4.2
*
*/
public class AmqpTimeoutException extends AmqpException {

private static final long serialVersionUID = -1981629885617675621L;

public AmqpTimeoutException(String message, Throwable cause) {
super(message, cause);
}

public AmqpTimeoutException(String message) {
super(message);
}

public AmqpTimeoutException(Throwable cause) {
super(cause);
}

}
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2014 the original author or authors.
* Copyright 2002-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
Expand Down Expand Up @@ -28,9 +28,12 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import org.springframework.amqp.AmqpException;
import org.springframework.amqp.AmqpTimeoutException;
import org.springframework.amqp.rabbit.support.PublisherCallbackChannel;
import org.springframework.amqp.rabbit.support.PublisherCallbackChannelImpl;
import org.springframework.beans.factory.InitializingBean;
Expand Down Expand Up @@ -82,8 +85,6 @@ public enum CacheMode {
CONNECTION
}

private volatile CacheMode cacheMode = CacheMode.CHANNEL;

private final Set<ChannelCachingConnectionProxy> openConnections = new HashSet<ChannelCachingConnectionProxy>();

private final Map<ChannelCachingConnectionProxy, LinkedList<ChannelProxy>>
Expand All @@ -94,6 +95,12 @@ public enum CacheMode {

private final BlockingQueue<ChannelCachingConnectionProxy> idleConnections = new LinkedBlockingQueue<ChannelCachingConnectionProxy>();

private final Map<Connection, Semaphore> checkoutPermits = new HashMap<Connection, Semaphore>();

private volatile long channelCheckoutTimeout = 0;

private volatile CacheMode cacheMode = CacheMode.CHANNEL;

private volatile int channelCacheSize = 1;

private volatile int connectionCacheSize = 1;
Expand Down Expand Up @@ -130,7 +137,6 @@ public CachingConnectionFactory() {
/**
* Create a new CachingConnectionFactory given a host name
* and port.
*
* @param hostname the host name to connect to
* @param port the port number
*/
Expand All @@ -146,7 +152,6 @@ public CachingConnectionFactory(String hostname, int port) {
/**
* Create a new CachingConnectionFactory given a port on the hostname returned from
* InetAddress.getLocalHost(), or "localhost" if getLocalHost() throws an exception.
*
* @param port the port number
*/
public CachingConnectionFactory(int port) {
Expand All @@ -155,7 +160,6 @@ public CachingConnectionFactory(int port) {

/**
* Create a new CachingConnectionFactory given a host name.
*
* @param hostname the host name to connect to
*/
public CachingConnectionFactory(String hostname) {
Expand All @@ -164,13 +168,19 @@ public CachingConnectionFactory(String hostname) {

/**
* Create a new CachingConnectionFactory for the given target ConnectionFactory.
*
* @param rabbitConnectionFactory the target ConnectionFactory
*/
public CachingConnectionFactory(com.rabbitmq.client.ConnectionFactory rabbitConnectionFactory) {
super(rabbitConnectionFactory);
}

/**
* The number of channels to maintain in the cache. By default, channels are allocated on
* demand (unbounded) and this represents the maximum cache size. To limit the available
* channels, see {@link #setChannelCheckoutTimeout(long)}.
* @param sessionCacheSize the channel cache size.
* @see #setChannelCheckoutTimeout(long)
*/
public void setChannelCacheSize(int sessionCacheSize) {
Assert.isTrue(sessionCacheSize >= 1, "Channel cache size must be 1 or higher");
this.channelCacheSize = sessionCacheSize;
Expand All @@ -190,8 +200,13 @@ public void setCacheMode(CacheMode cacheMode) {
this.cacheMode = cacheMode;
}

@Deprecated
public int getConnectionCachesize() {
return connectionCacheSize;
return getConnectionCacheSize();
}

public int getConnectionCacheSize() {
return this.connectionCacheSize;
}

public void setConnectionCacheSize(int connectionCacheSize) {
Expand All @@ -200,11 +215,11 @@ public void setConnectionCacheSize(int connectionCacheSize) {
}

public boolean isPublisherConfirms() {
return publisherConfirms;
return this.publisherConfirms;
}

public boolean isPublisherReturns() {
return publisherReturns;
return this.publisherReturns;
}

public void setPublisherReturns(boolean publisherReturns) {
Expand All @@ -215,11 +230,25 @@ public void setPublisherConfirms(boolean publisherConfirms) {
this.publisherConfirms = publisherConfirms;
}

/**
* Sets the channel checkout timeout. When greater than 0, enables channel limiting
* in that the {@link #channelCacheSize} becomes the total number of available channels per
* connection rather than a simple cache size. Note that changing the {@link #channelCacheSize}
* does not affect the limit on existing connection(s), invoke {@link #destroy()} to cause a
* new connection to be created with the new limit.
* @param channelCheckoutTimeout the timeout in milliseconds; default 0 (channel limiting not enabled).
* @since 1.4.2
*/
public void setChannelCheckoutTimeout(long channelCheckoutTimeout) {
this.channelCheckoutTimeout = channelCheckoutTimeout;
}

@Override
public void afterPropertiesSet() throws Exception {
this.initialized = true;
if (this.cacheMode == CacheMode.CHANNEL) {
Assert.isTrue(this.connectionCacheSize == 1, "When the cache mode is 'CHANNEL', the connection cache size cannot be configured.");
Assert.isTrue(this.connectionCacheSize == 1,
"When the cache mode is 'CHANNEL', the connection cache size cannot be configured.");
}
}

Expand Down Expand Up @@ -254,6 +283,20 @@ else if (!RabbitUtils.isNormalChannelClose(cause)) {
}

private Channel getChannel(ChannelCachingConnectionProxy connection, boolean transactional) {
if (this.channelCheckoutTimeout > 0) {
Semaphore checkoutPermits = this.checkoutPermits.get(connection);
if (checkoutPermits != null) {
try {
if (!checkoutPermits.tryAcquire(this.channelCheckoutTimeout, TimeUnit.MILLISECONDS)) {
throw new AmqpTimeoutException("No available channels");
}
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new AmqpTimeoutException("Interrupted while acquiring a channel", e);
}
}
}
LinkedList<ChannelProxy> channelList;
if (this.cacheMode == CacheMode.CHANNEL) {
channelList = transactional ? this.cachedChannelsTransactional
Expand Down Expand Up @@ -338,6 +381,7 @@ private Channel createBareChannel(ChannelCachingConnectionProxy connection, bool
synchronized (this.connectionMonitor) {
if (this.connection != null && !this.connection.isOpen()) {
this.connection.notifyCloseIfNecessary();
this.checkoutPermits.remove(this.connection);
}
if (this.connection == null || !this.connection.isOpen()) {
this.connection = null;
Expand Down Expand Up @@ -397,6 +441,7 @@ public final Connection createConnection() throws AmqpException {
this.connection = new ChannelCachingConnectionProxy(super.createBareConnection());
// invoke the listener *after* this.connection is assigned
getConnectionListener().onCreate(connection);
this.checkoutPermits.put(this.connection, new Semaphore(this.channelCacheSize));
}
return this.connection;
}
Expand All @@ -413,6 +458,7 @@ else if (this.cacheMode == CacheMode.CONNECTION) {
this.openConnections.remove(connection);
this.openConnectionNonTransactionalChannels.remove(connection);
this.openConnectionTransactionalChannels.remove(connection);
this.checkoutPermits.remove(connection);
connection = null;
}
}
Expand All @@ -426,6 +472,7 @@ else if (this.cacheMode == CacheMode.CONNECTION) {
this.openConnections.add(connection);
this.openConnectionNonTransactionalChannels.put(connection, new LinkedList<ChannelProxy>());
this.openConnectionTransactionalChannels.put(connection, new LinkedList<ChannelProxy>());
this.checkoutPermits.put(connection, new Semaphore(this.channelCacheSize));
}
else {
if (logger.isDebugEnabled()) {
Expand All @@ -449,10 +496,12 @@ public final void destroy() {
synchronized (this.connectionMonitor) {
if (connection != null) {
this.connection.destroy();
this.checkoutPermits.remove(this.connection);
this.connection = null;
}
for (ChannelCachingConnectionProxy connection : this.openConnections) {
connection.destroy();
this.checkoutPermits.remove(connection);
}
this.openConnections.clear();
this.idleConnections.clear();
Expand Down Expand Up @@ -482,7 +531,8 @@ protected void reset(List<ChannelProxy> channels, List<ChannelProxy> txChannels)
for (ChannelProxy channel : txChannels) {
try {
channel.close();
} catch (Throwable ex) {
}
catch (Throwable ex) {
logger.trace("Could not close cached Rabbit Channel", ex);
}
}
Expand Down Expand Up @@ -545,13 +595,15 @@ else if (methodName.equals("close")) {
if (!RabbitUtils.isPhysicalCloseRequired() && this.channelList.size() < getChannelCacheSize()) {
logicalClose((ChannelProxy) proxy);
// Remain open in the channel list.
releasePermit();
return null;
}
}
}

// If we get here, we're supposed to shut down.
physicalClose();
releasePermit();
return null;
}
else if (methodName.equals("getTargetChannel")) {
Expand Down Expand Up @@ -590,9 +642,17 @@ else if (methodName.equals("isOpen")) {
}
}

private void releasePermit() {
if (CachingConnectionFactory.this.channelCheckoutTimeout > 0) {
Semaphore checkoutPermits = CachingConnectionFactory.this.checkoutPermits.get(this.theConnection);
if (checkoutPermits != null) {
checkoutPermits.release();
}
}
}

/**
* GUARDED by channelList
*
* @param proxy the channel to close
*/
private void logicalClose(ChannelProxy proxy) throws Exception {
Expand Down
Expand Up @@ -15,6 +15,7 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

Expand Down Expand Up @@ -43,6 +44,7 @@
import org.junit.rules.ExpectedException;

import org.springframework.amqp.AmqpIOException;
import org.springframework.amqp.AmqpTimeoutException;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory.CacheMode;
import org.springframework.amqp.rabbit.core.ChannelCallback;
Expand Down Expand Up @@ -119,6 +121,41 @@ public void testCachedConnections() {
}
}

@Test
public void testCachedConnectionsChannelLimit() throws Exception {
connectionFactory.setCacheMode(CacheMode.CONNECTION);
connectionFactory.setConnectionCacheSize(2);
connectionFactory.setChannelCheckoutTimeout(10);
connectionFactory.setExecutor(Executors.newCachedThreadPool());
List<Connection> connections = new ArrayList<Connection>();
connections.add(connectionFactory.createConnection());
connections.add(connectionFactory.createConnection());
List<Channel> channels = new ArrayList<Channel>();
channels.add(connections.get(0).createChannel(false));
try {
channels.add(connections.get(0).createChannel(false));
fail("Exception expected");
}
catch (AmqpTimeoutException e) {}
channels.add(connections.get(1).createChannel(false));
try {
channels.add(connections.get(1).createChannel(false));
fail("Exception expected");
}
catch (AmqpTimeoutException e) {}
channels.get(0).close();
channels.get(1).close();
channels.add(connections.get(0).createChannel(false));
channels.add(connections.get(1).createChannel(false));
assertSame(channels.get(0), channels.get(2));
assertSame(channels.get(1), channels.get(3));
channels.get(2).close();
channels.get(3).close();
for (Connection connection : connections) {
connection.close();
}
}

@Test
public void testCachedConnectionsAndChannels() throws Exception {
connectionFactory.setCacheMode(CacheMode.CONNECTION);
Expand Down

0 comments on commit 1718335

Please sign in to comment.