diff --git a/src/main/java/com/rabbitmq/client/ChannelContinuationTimeoutException.java b/src/main/java/com/rabbitmq/client/ChannelContinuationTimeoutException.java
new file mode 100644
index 0000000000..d5bca1eeba
--- /dev/null
+++ b/src/main/java/com/rabbitmq/client/ChannelContinuationTimeoutException.java
@@ -0,0 +1,63 @@
+package com.rabbitmq.client;
+
+import java.io.IOException;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Exception thrown when a channel times out on a continuation during a RPC call.
+ * @since 4.1.0
+ */
+public class ChannelContinuationTimeoutException extends IOException {
+
+ /**
+ * The channel that performed the call.
+ * Typed as Object as the underlying
+ * object that performs the call might
+ * not be an implementation of {@link Channel}.
+ */
+ private final Object channel;
+
+ /**
+ * The number of the channel that performed the call.
+ */
+ private final int channelNumber;
+
+ /**
+ * The request method that timed out.
+ */
+ private final Method method;
+
+ public ChannelContinuationTimeoutException(TimeoutException cause, Object channel, int channelNumber, Method method) {
+ super(
+ "Continuation call for method " + method + " on channel " + channel + " (#" + channelNumber + ") timed out",
+ cause
+ );
+ this.channel = channel;
+ this.channelNumber = channelNumber;
+ this.method = method;
+ }
+
+ /**
+ *
+ * @return request method that timed out
+ */
+ public Method getMethod() {
+ return method;
+ }
+
+ /**
+ * channel that performed the call
+ * @return
+ */
+ public Object getChannel() {
+ return channel;
+ }
+
+ /**
+ *
+ * @return number of the channel that performed the call
+ */
+ public int getChannelNumber() {
+ return channelNumber;
+ }
+}
diff --git a/src/main/java/com/rabbitmq/client/ConnectionFactory.java b/src/main/java/com/rabbitmq/client/ConnectionFactory.java
index 0788d75714..25fa5a8fdc 100644
--- a/src/main/java/com/rabbitmq/client/ConnectionFactory.java
+++ b/src/main/java/com/rabbitmq/client/ConnectionFactory.java
@@ -33,6 +33,8 @@
import java.util.*;
import java.util.concurrent.*;
+import static java.util.concurrent.TimeUnit.*;
+
/**
* Convenience "factory" class to facilitate opening a {@link Connection} to an AMQP broker.
*/
@@ -73,6 +75,9 @@ public class ConnectionFactory implements Cloneable {
* zero means wait indefinitely */
public static final int DEFAULT_SHUTDOWN_TIMEOUT = 10000;
+ /** The default continuation timeout for RPC calls in channels: 10 minutes */
+ public static final int DEFAULT_CHANNEL_RPC_TIMEOUT = (int) MINUTES.toMillis(10);
+
private static final String PREFERRED_TLS_PROTOCOL = "TLSv1.2";
private static final String FALLBACK_TLS_PROTOCOL = "TLSv1";
@@ -116,6 +121,12 @@ public class ConnectionFactory implements Cloneable {
private SSLContext sslContext;
+ /**
+ * Continuation timeout on RPC calls.
+ * @since 4.1.0
+ */
+ private int channelRpcTimeout = DEFAULT_CHANNEL_RPC_TIMEOUT;
+
/** @return the default host to use for connections */
public String getHost() {
return host;
@@ -937,6 +948,7 @@ public ConnectionParams params(ExecutorService consumerWorkServiceExecutor) {
result.setRequestedHeartbeat(requestedHeartbeat);
result.setShutdownExecutor(shutdownExecutor);
result.setHeartbeatExecutor(heartbeatExecutor);
+ result.setChannelRpcTimeout(channelRpcTimeout);
return result;
}
@@ -1079,4 +1091,24 @@ public void useNio() {
public void useBlockingIo() {
this.nio = false;
}
+
+ /**
+ * Set the continuation timeout for RPC calls in channels.
+ * Default is 10 minutes. 0 means no timeout.
+ * @param channelRpcTimeout
+ */
+ public void setChannelRpcTimeout(int channelRpcTimeout) {
+ if(channelRpcTimeout < 0) {
+ throw new IllegalArgumentException("Timeout cannot be less than 0");
+ }
+ this.channelRpcTimeout = channelRpcTimeout;
+ }
+
+ /**
+ * Get the timeout for RPC calls in channels.
+ * @return
+ */
+ public int getChannelRpcTimeout() {
+ return channelRpcTimeout;
+ }
}
diff --git a/src/main/java/com/rabbitmq/client/impl/AMQChannel.java b/src/main/java/com/rabbitmq/client/impl/AMQChannel.java
index c568214d66..6726f728f8 100644
--- a/src/main/java/com/rabbitmq/client/impl/AMQChannel.java
+++ b/src/main/java/com/rabbitmq/client/impl/AMQChannel.java
@@ -19,12 +19,11 @@
import java.io.IOException;
import java.util.concurrent.TimeoutException;
-import com.rabbitmq.client.AlreadyClosedException;
-import com.rabbitmq.client.Command;
+import com.rabbitmq.client.*;
import com.rabbitmq.client.Method;
-import com.rabbitmq.client.Connection;
-import com.rabbitmq.client.ShutdownSignalException;
import com.rabbitmq.utility.BlockingValueOrException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Base class modelling an AMQ channel. Subclasses implement
@@ -37,6 +36,11 @@
* @see Connection
*/
public abstract class AMQChannel extends ShutdownNotifierComponent {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(AMQChannel.class);
+
+ private static final int NO_RPC_TIMEOUT = 0;
+
/**
* Protected; used instead of synchronizing on the channel itself,
* so that clients can themselves use the channel to synchronize
@@ -59,6 +63,9 @@ public abstract class AMQChannel extends ShutdownNotifierComponent {
/** Whether transmission of content-bearing methods should be blocked */
public volatile boolean _blockContent = false;
+ /** Timeout for RPC calls */
+ private final int _rpcTimeout;
+
/**
* Construct a channel on the given connection, with the given channel number.
* @param connection the underlying connection for this channel
@@ -67,6 +74,10 @@ public abstract class AMQChannel extends ShutdownNotifierComponent {
public AMQChannel(AMQConnection connection, int channelNumber) {
this._connection = connection;
this._channelNumber = channelNumber;
+ if(connection.getChannelRpcTimeout() < 0) {
+ throw new IllegalArgumentException("Continuation timeout on RPC calls cannot be less than 0");
+ }
+ this._rpcTimeout = connection.getChannelRpcTimeout();
}
/**
@@ -225,8 +236,23 @@ private AMQCommand privateRpc(Method m)
//
// Calling getReply() on the continuation puts us to sleep
// until the connection's reader-thread throws the reply over
- // the fence.
- return k.getReply();
+ // the fence or the RPC times out (if enabled)
+ if(_rpcTimeout == NO_RPC_TIMEOUT) {
+ return k.getReply();
+ } else {
+ try {
+ return k.getReply(_rpcTimeout);
+ } catch (TimeoutException e) {
+ try {
+ // clean RPC channel state
+ nextOutstandingRpc();
+ markRpcFinished();
+ } catch(Exception ex) {
+ LOGGER.warn("Error while cleaning timed out channel RPC: {}", ex.getMessage());
+ }
+ throw new ChannelContinuationTimeoutException(e, this, this._channelNumber, m);
+ }
+ }
}
private AMQCommand privateRpc(Method m, int timeout)
diff --git a/src/main/java/com/rabbitmq/client/impl/AMQConnection.java b/src/main/java/com/rabbitmq/client/impl/AMQConnection.java
index aeb5e35884..7593fe22ec 100644
--- a/src/main/java/com/rabbitmq/client/impl/AMQConnection.java
+++ b/src/main/java/com/rabbitmq/client/impl/AMQConnection.java
@@ -94,11 +94,7 @@ public static Map defaultClientProperties() {
new Version(AMQP.PROTOCOL.MAJOR, AMQP.PROTOCOL.MINOR);
/** The special channel 0 (not managed by the _channelManager) */
- private final AMQChannel _channel0 = new AMQChannel(this, 0) {
- @Override public boolean processAsync(Command c) throws IOException {
- return getConnection().processControlCommand(c);
- }
- };
+ private final AMQChannel _channel0;
protected ConsumerWorkService _workService = null;
@@ -137,6 +133,7 @@ public static Map defaultClientProperties() {
private final String password;
private final Collection blockedListeners = new CopyOnWriteArrayList();
protected final MetricsCollector metricsCollector;
+ private final int channelRpcTimeout;
/* State modified after start - all volatile */
@@ -228,6 +225,16 @@ public AMQConnection(ConnectionParams params, FrameHandler frameHandler, Metrics
this.heartbeatExecutor = params.getHeartbeatExecutor();
this.shutdownExecutor = params.getShutdownExecutor();
this.threadFactory = params.getThreadFactory();
+ if(params.getChannelRpcTimeout() < 0) {
+ throw new IllegalArgumentException("Continuation timeout on RPC calls cannot be less than 0");
+ }
+ this.channelRpcTimeout = params.getChannelRpcTimeout();
+
+ this._channel0 = new AMQChannel(this, 0) {
+ @Override public boolean processAsync(Command c) throws IOException {
+ return getConnection().processControlCommand(c);
+ }
+ };
this._channelManager = null;
@@ -1045,4 +1052,8 @@ public String getId() {
public void setId(String id) {
this.id = id;
}
+
+ public int getChannelRpcTimeout() {
+ return channelRpcTimeout;
+ }
}
diff --git a/src/main/java/com/rabbitmq/client/impl/ConnectionParams.java b/src/main/java/com/rabbitmq/client/impl/ConnectionParams.java
index d13e6facc9..72c9004544 100644
--- a/src/main/java/com/rabbitmq/client/impl/ConnectionParams.java
+++ b/src/main/java/com/rabbitmq/client/impl/ConnectionParams.java
@@ -39,6 +39,7 @@ public class ConnectionParams {
private SaslConfig saslConfig;
private long networkRecoveryInterval;
private boolean topologyRecovery;
+ private int channelRpcTimeout;
private ExceptionHandler exceptionHandler;
private ThreadFactory threadFactory;
@@ -109,6 +110,10 @@ public ThreadFactory getThreadFactory() {
return threadFactory;
}
+ public int getChannelRpcTimeout() {
+ return channelRpcTimeout;
+ }
+
public void setUsername(String username) {
this.username = username;
}
@@ -180,4 +185,8 @@ public ScheduledExecutorService getHeartbeatExecutor() {
public void setHeartbeatExecutor(ScheduledExecutorService heartbeatExecutor) {
this.heartbeatExecutor = heartbeatExecutor;
}
+
+ public void setChannelRpcTimeout(int channelRpcTimeout) {
+ this.channelRpcTimeout = channelRpcTimeout;
+ }
}
diff --git a/src/test/java/com/rabbitmq/client/test/AMQChannelTest.java b/src/test/java/com/rabbitmq/client/test/AMQChannelTest.java
new file mode 100644
index 0000000000..460ae3e93c
--- /dev/null
+++ b/src/test/java/com/rabbitmq/client/test/AMQChannelTest.java
@@ -0,0 +1,121 @@
+// Copyright (c) 2007-Present Pivotal Software, Inc. All rights reserved.
+//
+// This software, the RabbitMQ Java client library, is triple-licensed under the
+// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2
+// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see
+// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL,
+// please see LICENSE-APACHE2.
+//
+// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
+// either express or implied. See the LICENSE file for specific language governing
+// rights and limitations of this software.
+//
+// If you have any questions regarding licensing, please contact us at
+// info@rabbitmq.com.
+
+package com.rabbitmq.client.test;
+
+import com.rabbitmq.client.ChannelContinuationTimeoutException;
+import com.rabbitmq.client.Command;
+import com.rabbitmq.client.Method;
+import com.rabbitmq.client.impl.AMQChannel;
+import com.rabbitmq.client.impl.AMQCommand;
+import com.rabbitmq.client.impl.AMQConnection;
+import com.rabbitmq.client.impl.AMQImpl;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class AMQChannelTest {
+
+ ScheduledExecutorService scheduler;
+
+ @Before public void init() {
+ scheduler = Executors.newSingleThreadScheduledExecutor();
+ }
+
+ @After public void tearDown() {
+ scheduler.shutdownNow();
+ }
+
+ @Test public void rpcTimesOutWhenResponseDoesNotCome() throws IOException {
+ int rpcTimeout = 100;
+ AMQConnection connection = mock(AMQConnection.class);
+ when(connection.getChannelRpcTimeout()).thenReturn(rpcTimeout);
+
+ DummyAmqChannel channel = new DummyAmqChannel(connection, 1);
+ Method method = new AMQImpl.Queue.Declare.Builder()
+ .queue("")
+ .durable(false)
+ .exclusive(true)
+ .autoDelete(true)
+ .arguments(null)
+ .build();
+
+ try {
+ channel.rpc(method);
+ fail("Should time out and throw an exception");
+ } catch(ChannelContinuationTimeoutException e) {
+ // OK
+ assertThat((DummyAmqChannel) e.getChannel(), is(channel));
+ assertThat(e.getChannelNumber(), is(channel.getChannelNumber()));
+ assertThat(e.getMethod(), is(method));
+ assertNull("outstanding RPC should have been cleaned", channel.nextOutstandingRpc());
+ }
+ }
+
+ @Test public void rpcReturnsResultWhenResponseHasCome() throws IOException {
+ int rpcTimeout = 1000;
+ AMQConnection connection = mock(AMQConnection.class);
+ when(connection.getChannelRpcTimeout()).thenReturn(rpcTimeout);
+
+ final DummyAmqChannel channel = new DummyAmqChannel(connection, 1);
+ Method method = new AMQImpl.Queue.Declare.Builder()
+ .queue("")
+ .durable(false)
+ .exclusive(true)
+ .autoDelete(true)
+ .arguments(null)
+ .build();
+
+ final Method response = new AMQImpl.Queue.DeclareOk.Builder()
+ .queue("whatever")
+ .consumerCount(0)
+ .messageCount(0).build();
+
+ scheduler.schedule(new Callable() {
+ @Override
+ public Void call() throws Exception {
+ channel.handleCompleteInboundCommand(new AMQCommand(response));
+ return null;
+ }
+ }, (long) (rpcTimeout / 2.0), TimeUnit.MILLISECONDS);
+
+ AMQCommand rpcResponse = channel.rpc(method);
+ assertThat(rpcResponse.getMethod(), is(response));
+ }
+
+ static class DummyAmqChannel extends AMQChannel {
+
+ public DummyAmqChannel(AMQConnection connection, int channelNumber) {
+ super(connection, channelNumber);
+ }
+
+ @Override
+ public boolean processAsync(Command command) throws IOException {
+ return false;
+ }
+ }
+
+}
diff --git a/src/test/java/com/rabbitmq/client/test/AMQConnectionTest.java b/src/test/java/com/rabbitmq/client/test/AMQConnectionTest.java
index 8d5a0d479d..ae8bc1e258 100644
--- a/src/test/java/com/rabbitmq/client/test/AMQConnectionTest.java
+++ b/src/test/java/com/rabbitmq/client/test/AMQConnectionTest.java
@@ -99,6 +99,16 @@ public class AMQConnectionTest {
cf.setHandshakeTimeout(7000);
}
+ @Test public void negativeRpcTimeoutIsForbidden() {
+ ConnectionFactory cf = TestUtils.connectionFactory();
+ try {
+ cf.setChannelRpcTimeout(-10);
+ fail("expected an exception");
+ } catch (IllegalArgumentException _ignored) {
+ // expected
+ }
+ }
+
/** Check the AMQConnection does send exactly 1 initial header, and deal correctly with
* the frame handler throwing an exception when we try to read data
*/
diff --git a/src/test/java/com/rabbitmq/client/test/ChannelRpcTimeoutIntegrationTest.java b/src/test/java/com/rabbitmq/client/test/ChannelRpcTimeoutIntegrationTest.java
new file mode 100644
index 0000000000..dd5b8c0c6f
--- /dev/null
+++ b/src/test/java/com/rabbitmq/client/test/ChannelRpcTimeoutIntegrationTest.java
@@ -0,0 +1,138 @@
+// Copyright (c) 2007-Present Pivotal Software, Inc. All rights reserved.
+//
+// This software, the RabbitMQ Java client library, is triple-licensed under the
+// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2
+// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see
+// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL,
+// please see LICENSE-APACHE2.
+//
+// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
+// either express or implied. See the LICENSE file for specific language governing
+// rights and limitations of this software.
+//
+// If you have any questions regarding licensing, please contact us at
+// info@rabbitmq.com.
+
+package com.rabbitmq.client.test;
+
+import com.rabbitmq.client.*;
+import com.rabbitmq.client.impl.*;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.net.SocketFactory;
+import java.io.IOException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeoutException;
+
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+
+public class ChannelRpcTimeoutIntegrationTest {
+
+ static long waitTimeOnSomeResponses = 1000L;
+
+ ConnectionFactory factory;
+
+ @Before
+ public void setUp() throws Exception {
+ factory = TestUtils.connectionFactory();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ factory = null;
+ }
+
+ @Test public void channelWaitsWhenNoTimeoutSet() throws IOException, TimeoutException {
+ FrameHandler frameHandler = createFrameHandler();
+ ConnectionParams params = factory.params(Executors.newFixedThreadPool(1));
+ WaitingAmqConnection connection = new WaitingAmqConnection(params, frameHandler);
+ try {
+ connection.start();
+ Channel channel = connection.createChannel();
+ channel.queueDeclare();
+ } finally {
+ connection.close();
+ }
+
+ }
+
+ @Test public void channelThrowsExceptionWhenTimeoutIsSet() throws IOException, TimeoutException {
+ FrameHandler frameHandler = createFrameHandler();
+ ConnectionParams params = factory.params(Executors.newFixedThreadPool(1));
+ params.setChannelRpcTimeout((int) (waitTimeOnSomeResponses / 5.0));
+ WaitingAmqConnection connection = new WaitingAmqConnection(params, frameHandler);
+ try {
+ connection.start();
+ Channel channel = connection.createChannel();
+ try {
+ channel.queueDeclare();
+ fail("Should time out and throw an exception");
+ } catch(ChannelContinuationTimeoutException e) {
+ // OK
+ assertThat((Channel) e.getChannel(), is(channel));
+ assertThat(e.getChannelNumber(), is(channel.getChannelNumber()));
+ assertThat(e.getMethod(), instanceOf(AMQP.Queue.Declare.class));
+ }
+ } finally {
+ connection.close();
+ }
+ }
+
+ private FrameHandler createFrameHandler() throws IOException {
+ SocketFrameHandlerFactory socketFrameHandlerFactory = new SocketFrameHandlerFactory(ConnectionFactory.DEFAULT_CONNECTION_TIMEOUT,
+ SocketFactory.getDefault(), new DefaultSocketConfigurator(), false, null);
+ return socketFrameHandlerFactory.create(new Address("localhost"));
+ }
+
+ static class WaitingChannel extends ChannelN {
+
+ public WaitingChannel(AMQConnection connection, int channelNumber, ConsumerWorkService workService) {
+ super(connection, channelNumber, workService);
+ }
+
+ @Override
+ public void handleCompleteInboundCommand(AMQCommand command) throws IOException {
+ if(command.getMethod() instanceof AMQImpl.Queue.DeclareOk) {
+ try {
+ Thread.sleep(waitTimeOnSomeResponses);
+ } catch (InterruptedException e) {
+ throw new IOException(e);
+ }
+ }
+ super.handleCompleteInboundCommand(command);
+ }
+ }
+
+ static class WaitingChannelManager extends ChannelManager {
+
+ public WaitingChannelManager(ConsumerWorkService workService, int channelMax, ThreadFactory threadFactory) {
+ super(workService, channelMax, threadFactory);
+ }
+
+ @Override
+ protected ChannelN instantiateChannel(AMQConnection connection, int channelNumber, ConsumerWorkService workService) {
+ return new WaitingChannel(connection, channelNumber, workService);
+ }
+ }
+
+ static class WaitingAmqConnection extends AMQConnection {
+
+ public WaitingAmqConnection(ConnectionParams params, FrameHandler frameHandler) {
+ super(params, frameHandler);
+ }
+
+ @Override
+ protected ChannelManager instantiateChannelManager(int channelMax, ThreadFactory threadFactory) {
+ WaitingChannelManager channelManager = new WaitingChannelManager(_workService, channelMax, threadFactory);
+ configureChannelManager(channelManager);
+ return channelManager;
+ }
+ }
+
+}
diff --git a/src/test/java/com/rabbitmq/client/test/ClientTests.java b/src/test/java/com/rabbitmq/client/test/ClientTests.java
index 3c1371669a..34ec68de85 100644
--- a/src/test/java/com/rabbitmq/client/test/ClientTests.java
+++ b/src/test/java/com/rabbitmq/client/test/ClientTests.java
@@ -27,6 +27,8 @@
BlockingCellTest.class,
TruncatedInputStreamTest.class,
AMQConnectionTest.class,
+ AMQChannelTest.class,
+ ChannelRpcTimeoutIntegrationTest.class,
ValueOrExceptionTest.class,
BrokenFramesTest.class,
ClonePropertiesTest.class,