From d975201a14beeaad3c48d084d4d38d5c38ddfb76 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Thu, 1 Dec 2016 16:33:51 +0100 Subject: [PATCH 1/6] Add configurable timeout on RPC calls Fixes #219 --- .../client/ChannelRpcTimeoutException.java | 28 ++++ .../rabbitmq/client/ConnectionFactory.java | 23 +++ .../com/rabbitmq/client/impl/AMQChannel.java | 35 ++++- .../rabbitmq/client/impl/AMQConnection.java | 18 ++- .../client/impl/ConnectionParams.java | 9 ++ .../rabbitmq/client/test/AMQChannelTest.java | 120 +++++++++++++++ .../ChannelRpcTimeoutIntegrationTest.java | 138 ++++++++++++++++++ .../com/rabbitmq/client/test/ClientTests.java | 2 + .../rabbitmq/client/test/RpcTimeoutTest.java | 20 +++ 9 files changed, 382 insertions(+), 11 deletions(-) create mode 100644 src/main/java/com/rabbitmq/client/ChannelRpcTimeoutException.java create mode 100644 src/test/java/com/rabbitmq/client/test/AMQChannelTest.java create mode 100644 src/test/java/com/rabbitmq/client/test/ChannelRpcTimeoutIntegrationTest.java create mode 100644 src/test/java/com/rabbitmq/client/test/RpcTimeoutTest.java diff --git a/src/main/java/com/rabbitmq/client/ChannelRpcTimeoutException.java b/src/main/java/com/rabbitmq/client/ChannelRpcTimeoutException.java new file mode 100644 index 0000000000..5802a68a58 --- /dev/null +++ b/src/main/java/com/rabbitmq/client/ChannelRpcTimeoutException.java @@ -0,0 +1,28 @@ +package com.rabbitmq.client; + +import java.io.IOException; +import java.util.concurrent.TimeoutException; + +/** + * + */ +public class ChannelRpcTimeoutException extends IOException { + + private final Object channel; + + private final Method method; + + public ChannelRpcTimeoutException(TimeoutException cause, Object channel, Method method) { + super(cause); + this.channel = channel; + this.method = method; + } + + public Method getMethod() { + return method; + } + + public Object getChannel() { + return channel; + } +} diff --git a/src/main/java/com/rabbitmq/client/ConnectionFactory.java b/src/main/java/com/rabbitmq/client/ConnectionFactory.java index 0788d75714..a4736bd98c 100644 --- a/src/main/java/com/rabbitmq/client/ConnectionFactory.java +++ b/src/main/java/com/rabbitmq/client/ConnectionFactory.java @@ -73,6 +73,9 @@ public class ConnectionFactory implements Cloneable { * zero means wait indefinitely */ public static final int DEFAULT_SHUTDOWN_TIMEOUT = 10000; + /** The default timeout for RPC calls in channels: no timeout */ + public static final int DEFAULT_CHANNEL_RPC_TIMEOUT = -1; + private static final String PREFERRED_TLS_PROTOCOL = "TLSv1.2"; private static final String FALLBACK_TLS_PROTOCOL = "TLSv1"; @@ -116,6 +119,8 @@ public class ConnectionFactory implements Cloneable { private SSLContext sslContext; + private int channelRpcTimeout = DEFAULT_CHANNEL_RPC_TIMEOUT; + /** @return the default host to use for connections */ public String getHost() { return host; @@ -937,6 +942,7 @@ public ConnectionParams params(ExecutorService consumerWorkServiceExecutor) { result.setRequestedHeartbeat(requestedHeartbeat); result.setShutdownExecutor(shutdownExecutor); result.setHeartbeatExecutor(heartbeatExecutor); + result.setChannelRpcTimeout(channelRpcTimeout); return result; } @@ -1079,4 +1085,21 @@ public void useNio() { public void useBlockingIo() { this.nio = false; } + + /** + * Set the timeout for RPC calls in channels. + * Default is no timeout. + * @param channelRpcTimeout + */ + public void setChannelRpcTimeout(int channelRpcTimeout) { + this.channelRpcTimeout = channelRpcTimeout; + } + + /** + * Get the timeout for RPC calls in channels. + * @return + */ + public int getChannelRpcTimeout() { + return channelRpcTimeout; + } } diff --git a/src/main/java/com/rabbitmq/client/impl/AMQChannel.java b/src/main/java/com/rabbitmq/client/impl/AMQChannel.java index c568214d66..806d3431b0 100644 --- a/src/main/java/com/rabbitmq/client/impl/AMQChannel.java +++ b/src/main/java/com/rabbitmq/client/impl/AMQChannel.java @@ -19,12 +19,11 @@ import java.io.IOException; import java.util.concurrent.TimeoutException; -import com.rabbitmq.client.AlreadyClosedException; -import com.rabbitmq.client.Command; +import com.rabbitmq.client.*; import com.rabbitmq.client.Method; -import com.rabbitmq.client.Connection; -import com.rabbitmq.client.ShutdownSignalException; import com.rabbitmq.utility.BlockingValueOrException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Base class modelling an AMQ channel. Subclasses implement @@ -37,6 +36,11 @@ * @see Connection */ public abstract class AMQChannel extends ShutdownNotifierComponent { + + private static final Logger LOGGER = LoggerFactory.getLogger(AMQChannel.class); + + private static final int NO_RPC_TIMEOUT = ConnectionFactory.DEFAULT_CHANNEL_RPC_TIMEOUT; + /** * Protected; used instead of synchronizing on the channel itself, * so that clients can themselves use the channel to synchronize @@ -59,6 +63,9 @@ public abstract class AMQChannel extends ShutdownNotifierComponent { /** Whether transmission of content-bearing methods should be blocked */ public volatile boolean _blockContent = false; + /** Timeout for RPC calls */ + private final int _rpcTimeout; + /** * Construct a channel on the given connection, with the given channel number. * @param connection the underlying connection for this channel @@ -67,6 +74,7 @@ public abstract class AMQChannel extends ShutdownNotifierComponent { public AMQChannel(AMQConnection connection, int channelNumber) { this._connection = connection; this._channelNumber = channelNumber; + this._rpcTimeout = connection.getChannelRpcTimeout(); } /** @@ -225,8 +233,23 @@ private AMQCommand privateRpc(Method m) // // Calling getReply() on the continuation puts us to sleep // until the connection's reader-thread throws the reply over - // the fence. - return k.getReply(); + // the fence or the RPC times out (if enabled) + if(_rpcTimeout == NO_RPC_TIMEOUT) { + return k.getReply(); + } else { + try { + return k.getReply(_rpcTimeout); + } catch (TimeoutException e) { + try { + // clean RPC channel state + nextOutstandingRpc(); + markRpcFinished(); + } catch(Exception ex) { + LOGGER.warn("Error while cleaning timed out channel RPC: {}", ex.getMessage()); + } + throw new ChannelRpcTimeoutException(e, this, m); + } + } } private AMQCommand privateRpc(Method m, int timeout) diff --git a/src/main/java/com/rabbitmq/client/impl/AMQConnection.java b/src/main/java/com/rabbitmq/client/impl/AMQConnection.java index aeb5e35884..2a40069380 100644 --- a/src/main/java/com/rabbitmq/client/impl/AMQConnection.java +++ b/src/main/java/com/rabbitmq/client/impl/AMQConnection.java @@ -94,11 +94,7 @@ public static Map defaultClientProperties() { new Version(AMQP.PROTOCOL.MAJOR, AMQP.PROTOCOL.MINOR); /** The special channel 0 (not managed by the _channelManager) */ - private final AMQChannel _channel0 = new AMQChannel(this, 0) { - @Override public boolean processAsync(Command c) throws IOException { - return getConnection().processControlCommand(c); - } - }; + private final AMQChannel _channel0; protected ConsumerWorkService _workService = null; @@ -137,6 +133,7 @@ public static Map defaultClientProperties() { private final String password; private final Collection blockedListeners = new CopyOnWriteArrayList(); protected final MetricsCollector metricsCollector; + private final int channelRpcTimeout; /* State modified after start - all volatile */ @@ -228,6 +225,13 @@ public AMQConnection(ConnectionParams params, FrameHandler frameHandler, Metrics this.heartbeatExecutor = params.getHeartbeatExecutor(); this.shutdownExecutor = params.getShutdownExecutor(); this.threadFactory = params.getThreadFactory(); + this.channelRpcTimeout = params.getChannelRpcTimeout(); + + this._channel0 = new AMQChannel(this, 0) { + @Override public boolean processAsync(Command c) throws IOException { + return getConnection().processControlCommand(c); + } + }; this._channelManager = null; @@ -1045,4 +1049,8 @@ public String getId() { public void setId(String id) { this.id = id; } + + public int getChannelRpcTimeout() { + return channelRpcTimeout; + } } diff --git a/src/main/java/com/rabbitmq/client/impl/ConnectionParams.java b/src/main/java/com/rabbitmq/client/impl/ConnectionParams.java index d13e6facc9..72c9004544 100644 --- a/src/main/java/com/rabbitmq/client/impl/ConnectionParams.java +++ b/src/main/java/com/rabbitmq/client/impl/ConnectionParams.java @@ -39,6 +39,7 @@ public class ConnectionParams { private SaslConfig saslConfig; private long networkRecoveryInterval; private boolean topologyRecovery; + private int channelRpcTimeout; private ExceptionHandler exceptionHandler; private ThreadFactory threadFactory; @@ -109,6 +110,10 @@ public ThreadFactory getThreadFactory() { return threadFactory; } + public int getChannelRpcTimeout() { + return channelRpcTimeout; + } + public void setUsername(String username) { this.username = username; } @@ -180,4 +185,8 @@ public ScheduledExecutorService getHeartbeatExecutor() { public void setHeartbeatExecutor(ScheduledExecutorService heartbeatExecutor) { this.heartbeatExecutor = heartbeatExecutor; } + + public void setChannelRpcTimeout(int channelRpcTimeout) { + this.channelRpcTimeout = channelRpcTimeout; + } } diff --git a/src/test/java/com/rabbitmq/client/test/AMQChannelTest.java b/src/test/java/com/rabbitmq/client/test/AMQChannelTest.java new file mode 100644 index 0000000000..a6f0891c09 --- /dev/null +++ b/src/test/java/com/rabbitmq/client/test/AMQChannelTest.java @@ -0,0 +1,120 @@ +// Copyright (c) 2007-Present Pivotal Software, Inc. All rights reserved. +// +// This software, the RabbitMQ Java client library, is triple-licensed under the +// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2 +// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see +// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL, +// please see LICENSE-APACHE2. +// +// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, +// either express or implied. See the LICENSE file for specific language governing +// rights and limitations of this software. +// +// If you have any questions regarding licensing, please contact us at +// info@rabbitmq.com. + +package com.rabbitmq.client.test; + +import com.rabbitmq.client.ChannelRpcTimeoutException; +import com.rabbitmq.client.Command; +import com.rabbitmq.client.Method; +import com.rabbitmq.client.impl.AMQChannel; +import com.rabbitmq.client.impl.AMQCommand; +import com.rabbitmq.client.impl.AMQConnection; +import com.rabbitmq.client.impl.AMQImpl; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.util.concurrent.Callable; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.*; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class AMQChannelTest { + + ScheduledExecutorService scheduler; + + @Before public void init() { + scheduler = Executors.newSingleThreadScheduledExecutor(); + } + + @After public void tearDown() { + scheduler.shutdownNow(); + } + + @Test public void rpcTimesOutWhenResponseDoesNotCome() throws IOException { + int rpcTimeout = 100; + AMQConnection connection = mock(AMQConnection.class); + when(connection.getChannelRpcTimeout()).thenReturn(rpcTimeout); + + DummyAmqChannel channel = new DummyAmqChannel(connection, 1); + Method method = new AMQImpl.Queue.Declare.Builder() + .queue("") + .durable(false) + .exclusive(true) + .autoDelete(true) + .arguments(null) + .build(); + + try { + channel.rpc(method); + fail("Should time out and throw an exception"); + } catch(ChannelRpcTimeoutException e) { + // OK + assertThat((DummyAmqChannel) e.getChannel(), is(channel)); + assertThat(e.getMethod(), is(method)); + assertNull("outstanding RPC should have been cleaned", channel.nextOutstandingRpc()); + } + } + + @Test public void rpcReturnsResultWhenResponseHasCome() throws IOException { + int rpcTimeout = 1000; + AMQConnection connection = mock(AMQConnection.class); + when(connection.getChannelRpcTimeout()).thenReturn(rpcTimeout); + + final DummyAmqChannel channel = new DummyAmqChannel(connection, 1); + Method method = new AMQImpl.Queue.Declare.Builder() + .queue("") + .durable(false) + .exclusive(true) + .autoDelete(true) + .arguments(null) + .build(); + + final Method response = new AMQImpl.Queue.DeclareOk.Builder() + .queue("whatever") + .consumerCount(0) + .messageCount(0).build(); + + scheduler.schedule(new Callable() { + @Override + public Void call() throws Exception { + channel.handleCompleteInboundCommand(new AMQCommand(response)); + return null; + } + }, (long) (rpcTimeout / 2.0), TimeUnit.MILLISECONDS); + + AMQCommand rpcResponse = channel.rpc(method); + assertThat(rpcResponse.getMethod(), is(response)); + } + + static class DummyAmqChannel extends AMQChannel { + + public DummyAmqChannel(AMQConnection connection, int channelNumber) { + super(connection, channelNumber); + } + + @Override + public boolean processAsync(Command command) throws IOException { + return false; + } + } + +} diff --git a/src/test/java/com/rabbitmq/client/test/ChannelRpcTimeoutIntegrationTest.java b/src/test/java/com/rabbitmq/client/test/ChannelRpcTimeoutIntegrationTest.java new file mode 100644 index 0000000000..4ee6838562 --- /dev/null +++ b/src/test/java/com/rabbitmq/client/test/ChannelRpcTimeoutIntegrationTest.java @@ -0,0 +1,138 @@ +// Copyright (c) 2007-Present Pivotal Software, Inc. All rights reserved. +// +// This software, the RabbitMQ Java client library, is triple-licensed under the +// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2 +// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see +// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL, +// please see LICENSE-APACHE2. +// +// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, +// either express or implied. See the LICENSE file for specific language governing +// rights and limitations of this software. +// +// If you have any questions regarding licensing, please contact us at +// info@rabbitmq.com. + +package com.rabbitmq.client.test; + +import com.rabbitmq.client.*; +import com.rabbitmq.client.impl.*; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import javax.net.SocketFactory; +import java.io.IOException; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeoutException; + +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.fail; + +public class ChannelRpcTimeoutIntegrationTest { + + static long waitTimeOnSomeResponses = 1000L; + + ConnectionFactory factory; + + @Before + public void setUp() throws Exception { + factory = TestUtils.connectionFactory(); + } + + @After + public void tearDown() throws Exception { + factory = null; + } + + @Test public void channelWaitsWhenNoTimeoutSet() throws IOException, TimeoutException { + FrameHandler frameHandler = createFrameHandler(); + ConnectionParams params = factory.params(Executors.newFixedThreadPool(1)); + WaitingAmqConnection connection = new WaitingAmqConnection(params, frameHandler); + try { + connection.start(); + Channel channel = connection.createChannel(); + channel.queueDeclare(); + } finally { + connection.close(); + } + + } + + @Test public void channelThrowsExceptionWhenTimeoutIsSet() throws IOException, TimeoutException { + FrameHandler frameHandler = createFrameHandler(); + ConnectionParams params = factory.params(Executors.newFixedThreadPool(1)); + params.setChannelRpcTimeout((int) (waitTimeOnSomeResponses / 5.0)); + WaitingAmqConnection connection = new WaitingAmqConnection(params, frameHandler); + try { + connection.start(); + Channel channel = connection.createChannel(); + try { + + channel.queueDeclare(); + fail("Should time out and throw an exception"); + } catch(ChannelRpcTimeoutException e) { + // OK + assertThat((Channel) e.getChannel(), is(channel)); + assertThat(e.getMethod(), instanceOf(AMQP.Queue.Declare.class)); + } + } finally { + connection.close(); + } + } + + private FrameHandler createFrameHandler() throws IOException { + SocketFrameHandlerFactory socketFrameHandlerFactory = new SocketFrameHandlerFactory(ConnectionFactory.DEFAULT_CONNECTION_TIMEOUT, + SocketFactory.getDefault(), new DefaultSocketConfigurator(), false, null); + return socketFrameHandlerFactory.create(new Address("localhost")); + } + + static class WaitingChannel extends ChannelN { + + public WaitingChannel(AMQConnection connection, int channelNumber, ConsumerWorkService workService) { + super(connection, channelNumber, workService); + } + + @Override + public void handleCompleteInboundCommand(AMQCommand command) throws IOException { + if(command.getMethod() instanceof AMQImpl.Queue.DeclareOk) { + try { + Thread.sleep(waitTimeOnSomeResponses); + } catch (InterruptedException e) { + throw new IOException(e); + } + } + super.handleCompleteInboundCommand(command); + } + } + + static class WaitingChannelManager extends ChannelManager { + + public WaitingChannelManager(ConsumerWorkService workService, int channelMax, ThreadFactory threadFactory) { + super(workService, channelMax, threadFactory); + } + + @Override + protected ChannelN instantiateChannel(AMQConnection connection, int channelNumber, ConsumerWorkService workService) { + return new WaitingChannel(connection, channelNumber, workService); + } + } + + static class WaitingAmqConnection extends AMQConnection { + + public WaitingAmqConnection(ConnectionParams params, FrameHandler frameHandler) { + super(params, frameHandler); + } + + @Override + protected ChannelManager instantiateChannelManager(int channelMax, ThreadFactory threadFactory) { + WaitingChannelManager channelManager = new WaitingChannelManager(_workService, channelMax, threadFactory); + configureChannelManager(channelManager); + return channelManager; + } + } + +} diff --git a/src/test/java/com/rabbitmq/client/test/ClientTests.java b/src/test/java/com/rabbitmq/client/test/ClientTests.java index 3c1371669a..34ec68de85 100644 --- a/src/test/java/com/rabbitmq/client/test/ClientTests.java +++ b/src/test/java/com/rabbitmq/client/test/ClientTests.java @@ -27,6 +27,8 @@ BlockingCellTest.class, TruncatedInputStreamTest.class, AMQConnectionTest.class, + AMQChannelTest.class, + ChannelRpcTimeoutIntegrationTest.class, ValueOrExceptionTest.class, BrokenFramesTest.class, ClonePropertiesTest.class, diff --git a/src/test/java/com/rabbitmq/client/test/RpcTimeoutTest.java b/src/test/java/com/rabbitmq/client/test/RpcTimeoutTest.java new file mode 100644 index 0000000000..ee0f3bb290 --- /dev/null +++ b/src/test/java/com/rabbitmq/client/test/RpcTimeoutTest.java @@ -0,0 +1,20 @@ +// Copyright (c) 2007-Present Pivotal Software, Inc. All rights reserved. +// +// This software, the RabbitMQ Java client library, is triple-licensed under the +// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2 +// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see +// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL, +// please see LICENSE-APACHE2. +// +// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, +// either express or implied. See the LICENSE file for specific language governing +// rights and limitations of this software. +// +// If you have any questions regarding licensing, please contact us at +// info@rabbitmq.com. + +package com.rabbitmq.client.test; + +public class RpcTimeoutTest { + +} From 03db0469613e01c38cb6dd01369138dfab0b1c99 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Thu, 1 Dec 2016 16:44:05 +0100 Subject: [PATCH 2/6] Add Javadoc to RPC timeout related classes Fixes #219 --- .../com/rabbitmq/client/ChannelRpcTimeoutException.java | 9 ++++++++- src/main/java/com/rabbitmq/client/ConnectionFactory.java | 4 ++++ 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/src/main/java/com/rabbitmq/client/ChannelRpcTimeoutException.java b/src/main/java/com/rabbitmq/client/ChannelRpcTimeoutException.java index 5802a68a58..361dfd3c82 100644 --- a/src/main/java/com/rabbitmq/client/ChannelRpcTimeoutException.java +++ b/src/main/java/com/rabbitmq/client/ChannelRpcTimeoutException.java @@ -4,12 +4,19 @@ import java.util.concurrent.TimeoutException; /** - * + * Exception thrown when a channel times out on a RPC call. + * @since 4.1.0 */ public class ChannelRpcTimeoutException extends IOException { + /** + * The channel that performed the RPC. + */ private final Object channel; + /** + * The request method that timed out. + */ private final Method method; public ChannelRpcTimeoutException(TimeoutException cause, Object channel, Method method) { diff --git a/src/main/java/com/rabbitmq/client/ConnectionFactory.java b/src/main/java/com/rabbitmq/client/ConnectionFactory.java index a4736bd98c..420dfb942d 100644 --- a/src/main/java/com/rabbitmq/client/ConnectionFactory.java +++ b/src/main/java/com/rabbitmq/client/ConnectionFactory.java @@ -119,6 +119,10 @@ public class ConnectionFactory implements Cloneable { private SSLContext sslContext; + /** + * RPC timeout. + * @since 4.1.0 + */ private int channelRpcTimeout = DEFAULT_CHANNEL_RPC_TIMEOUT; /** @return the default host to use for connections */ From 4d8b09714c9fad25d6eb862f1008ae008c5afaec Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Fri, 2 Dec 2016 10:04:09 +0100 Subject: [PATCH 3/6] Set default continuation timeout to 10 minutes Fixes #219 --- ...a => ChannelContinuationTimeoutException.java} | 8 ++++---- .../com/rabbitmq/client/ConnectionFactory.java | 15 ++++++++++----- .../java/com/rabbitmq/client/impl/AMQChannel.java | 8 ++++++-- .../com/rabbitmq/client/impl/AMQConnection.java | 3 +++ .../com/rabbitmq/client/test/AMQChannelTest.java | 4 ++-- .../rabbitmq/client/test/AMQConnectionTest.java | 10 ++++++++++ .../test/ChannelRpcTimeoutIntegrationTest.java | 3 +-- 7 files changed, 36 insertions(+), 15 deletions(-) rename src/main/java/com/rabbitmq/client/{ChannelRpcTimeoutException.java => ChannelContinuationTimeoutException.java} (61%) diff --git a/src/main/java/com/rabbitmq/client/ChannelRpcTimeoutException.java b/src/main/java/com/rabbitmq/client/ChannelContinuationTimeoutException.java similarity index 61% rename from src/main/java/com/rabbitmq/client/ChannelRpcTimeoutException.java rename to src/main/java/com/rabbitmq/client/ChannelContinuationTimeoutException.java index 361dfd3c82..18e1fc6027 100644 --- a/src/main/java/com/rabbitmq/client/ChannelRpcTimeoutException.java +++ b/src/main/java/com/rabbitmq/client/ChannelContinuationTimeoutException.java @@ -4,13 +4,13 @@ import java.util.concurrent.TimeoutException; /** - * Exception thrown when a channel times out on a RPC call. + * Exception thrown when a channel times out on a continuation during a RPC call. * @since 4.1.0 */ -public class ChannelRpcTimeoutException extends IOException { +public class ChannelContinuationTimeoutException extends IOException { /** - * The channel that performed the RPC. + * The channel that performed the call. */ private final Object channel; @@ -19,7 +19,7 @@ public class ChannelRpcTimeoutException extends IOException { */ private final Method method; - public ChannelRpcTimeoutException(TimeoutException cause, Object channel, Method method) { + public ChannelContinuationTimeoutException(TimeoutException cause, Object channel, Method method) { super(cause); this.channel = channel; this.method = method; diff --git a/src/main/java/com/rabbitmq/client/ConnectionFactory.java b/src/main/java/com/rabbitmq/client/ConnectionFactory.java index 420dfb942d..25fa5a8fdc 100644 --- a/src/main/java/com/rabbitmq/client/ConnectionFactory.java +++ b/src/main/java/com/rabbitmq/client/ConnectionFactory.java @@ -33,6 +33,8 @@ import java.util.*; import java.util.concurrent.*; +import static java.util.concurrent.TimeUnit.*; + /** * Convenience "factory" class to facilitate opening a {@link Connection} to an AMQP broker. */ @@ -73,8 +75,8 @@ public class ConnectionFactory implements Cloneable { * zero means wait indefinitely */ public static final int DEFAULT_SHUTDOWN_TIMEOUT = 10000; - /** The default timeout for RPC calls in channels: no timeout */ - public static final int DEFAULT_CHANNEL_RPC_TIMEOUT = -1; + /** The default continuation timeout for RPC calls in channels: 10 minutes */ + public static final int DEFAULT_CHANNEL_RPC_TIMEOUT = (int) MINUTES.toMillis(10); private static final String PREFERRED_TLS_PROTOCOL = "TLSv1.2"; @@ -120,7 +122,7 @@ public class ConnectionFactory implements Cloneable { private SSLContext sslContext; /** - * RPC timeout. + * Continuation timeout on RPC calls. * @since 4.1.0 */ private int channelRpcTimeout = DEFAULT_CHANNEL_RPC_TIMEOUT; @@ -1091,11 +1093,14 @@ public void useBlockingIo() { } /** - * Set the timeout for RPC calls in channels. - * Default is no timeout. + * Set the continuation timeout for RPC calls in channels. + * Default is 10 minutes. 0 means no timeout. * @param channelRpcTimeout */ public void setChannelRpcTimeout(int channelRpcTimeout) { + if(channelRpcTimeout < 0) { + throw new IllegalArgumentException("Timeout cannot be less than 0"); + } this.channelRpcTimeout = channelRpcTimeout; } diff --git a/src/main/java/com/rabbitmq/client/impl/AMQChannel.java b/src/main/java/com/rabbitmq/client/impl/AMQChannel.java index 806d3431b0..662e06a79f 100644 --- a/src/main/java/com/rabbitmq/client/impl/AMQChannel.java +++ b/src/main/java/com/rabbitmq/client/impl/AMQChannel.java @@ -39,7 +39,7 @@ public abstract class AMQChannel extends ShutdownNotifierComponent { private static final Logger LOGGER = LoggerFactory.getLogger(AMQChannel.class); - private static final int NO_RPC_TIMEOUT = ConnectionFactory.DEFAULT_CHANNEL_RPC_TIMEOUT; + private static final int NO_RPC_TIMEOUT = 0; /** * Protected; used instead of synchronizing on the channel itself, @@ -74,6 +74,9 @@ public abstract class AMQChannel extends ShutdownNotifierComponent { public AMQChannel(AMQConnection connection, int channelNumber) { this._connection = connection; this._channelNumber = channelNumber; + if(connection.getChannelRpcTimeout() < 0) { + throw new IllegalArgumentException("Continuation timeout on RPC calls cannot be less than 0"); + } this._rpcTimeout = connection.getChannelRpcTimeout(); } @@ -235,6 +238,7 @@ private AMQCommand privateRpc(Method m) // until the connection's reader-thread throws the reply over // the fence or the RPC times out (if enabled) if(_rpcTimeout == NO_RPC_TIMEOUT) { + System.out.println("passe"); return k.getReply(); } else { try { @@ -247,7 +251,7 @@ private AMQCommand privateRpc(Method m) } catch(Exception ex) { LOGGER.warn("Error while cleaning timed out channel RPC: {}", ex.getMessage()); } - throw new ChannelRpcTimeoutException(e, this, m); + throw new ChannelContinuationTimeoutException(e, this, m); } } } diff --git a/src/main/java/com/rabbitmq/client/impl/AMQConnection.java b/src/main/java/com/rabbitmq/client/impl/AMQConnection.java index 2a40069380..7593fe22ec 100644 --- a/src/main/java/com/rabbitmq/client/impl/AMQConnection.java +++ b/src/main/java/com/rabbitmq/client/impl/AMQConnection.java @@ -225,6 +225,9 @@ public AMQConnection(ConnectionParams params, FrameHandler frameHandler, Metrics this.heartbeatExecutor = params.getHeartbeatExecutor(); this.shutdownExecutor = params.getShutdownExecutor(); this.threadFactory = params.getThreadFactory(); + if(params.getChannelRpcTimeout() < 0) { + throw new IllegalArgumentException("Continuation timeout on RPC calls cannot be less than 0"); + } this.channelRpcTimeout = params.getChannelRpcTimeout(); this._channel0 = new AMQChannel(this, 0) { diff --git a/src/test/java/com/rabbitmq/client/test/AMQChannelTest.java b/src/test/java/com/rabbitmq/client/test/AMQChannelTest.java index a6f0891c09..1adf3e4d17 100644 --- a/src/test/java/com/rabbitmq/client/test/AMQChannelTest.java +++ b/src/test/java/com/rabbitmq/client/test/AMQChannelTest.java @@ -15,7 +15,7 @@ package com.rabbitmq.client.test; -import com.rabbitmq.client.ChannelRpcTimeoutException; +import com.rabbitmq.client.ChannelContinuationTimeoutException; import com.rabbitmq.client.Command; import com.rabbitmq.client.Method; import com.rabbitmq.client.impl.AMQChannel; @@ -66,7 +66,7 @@ public class AMQChannelTest { try { channel.rpc(method); fail("Should time out and throw an exception"); - } catch(ChannelRpcTimeoutException e) { + } catch(ChannelContinuationTimeoutException e) { // OK assertThat((DummyAmqChannel) e.getChannel(), is(channel)); assertThat(e.getMethod(), is(method)); diff --git a/src/test/java/com/rabbitmq/client/test/AMQConnectionTest.java b/src/test/java/com/rabbitmq/client/test/AMQConnectionTest.java index 8d5a0d479d..ae8bc1e258 100644 --- a/src/test/java/com/rabbitmq/client/test/AMQConnectionTest.java +++ b/src/test/java/com/rabbitmq/client/test/AMQConnectionTest.java @@ -99,6 +99,16 @@ public class AMQConnectionTest { cf.setHandshakeTimeout(7000); } + @Test public void negativeRpcTimeoutIsForbidden() { + ConnectionFactory cf = TestUtils.connectionFactory(); + try { + cf.setChannelRpcTimeout(-10); + fail("expected an exception"); + } catch (IllegalArgumentException _ignored) { + // expected + } + } + /** Check the AMQConnection does send exactly 1 initial header, and deal correctly with * the frame handler throwing an exception when we try to read data */ diff --git a/src/test/java/com/rabbitmq/client/test/ChannelRpcTimeoutIntegrationTest.java b/src/test/java/com/rabbitmq/client/test/ChannelRpcTimeoutIntegrationTest.java index 4ee6838562..36be0939bf 100644 --- a/src/test/java/com/rabbitmq/client/test/ChannelRpcTimeoutIntegrationTest.java +++ b/src/test/java/com/rabbitmq/client/test/ChannelRpcTimeoutIntegrationTest.java @@ -71,10 +71,9 @@ public void tearDown() throws Exception { connection.start(); Channel channel = connection.createChannel(); try { - channel.queueDeclare(); fail("Should time out and throw an exception"); - } catch(ChannelRpcTimeoutException e) { + } catch(ChannelContinuationTimeoutException e) { // OK assertThat((Channel) e.getChannel(), is(channel)); assertThat(e.getMethod(), instanceOf(AMQP.Queue.Declare.class)); From 4688f2aee47a72be3b24e7005f5771456bb0db1c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Fri, 2 Dec 2016 10:10:52 +0100 Subject: [PATCH 4/6] Remove a sysout --- src/main/java/com/rabbitmq/client/impl/AMQChannel.java | 1 - 1 file changed, 1 deletion(-) diff --git a/src/main/java/com/rabbitmq/client/impl/AMQChannel.java b/src/main/java/com/rabbitmq/client/impl/AMQChannel.java index 662e06a79f..29f8cd598e 100644 --- a/src/main/java/com/rabbitmq/client/impl/AMQChannel.java +++ b/src/main/java/com/rabbitmq/client/impl/AMQChannel.java @@ -238,7 +238,6 @@ private AMQCommand privateRpc(Method m) // until the connection's reader-thread throws the reply over // the fence or the RPC times out (if enabled) if(_rpcTimeout == NO_RPC_TIMEOUT) { - System.out.println("passe"); return k.getReply(); } else { try { From 8b576a9b2852bd058362a01cd26a1e0045bbd8cd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Fri, 2 Dec 2016 10:13:27 +0100 Subject: [PATCH 5/6] Delete unused test class --- .../rabbitmq/client/test/RpcTimeoutTest.java | 20 ------------------- 1 file changed, 20 deletions(-) delete mode 100644 src/test/java/com/rabbitmq/client/test/RpcTimeoutTest.java diff --git a/src/test/java/com/rabbitmq/client/test/RpcTimeoutTest.java b/src/test/java/com/rabbitmq/client/test/RpcTimeoutTest.java deleted file mode 100644 index ee0f3bb290..0000000000 --- a/src/test/java/com/rabbitmq/client/test/RpcTimeoutTest.java +++ /dev/null @@ -1,20 +0,0 @@ -// Copyright (c) 2007-Present Pivotal Software, Inc. All rights reserved. -// -// This software, the RabbitMQ Java client library, is triple-licensed under the -// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2 -// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see -// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL, -// please see LICENSE-APACHE2. -// -// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, -// either express or implied. See the LICENSE file for specific language governing -// rights and limitations of this software. -// -// If you have any questions regarding licensing, please contact us at -// info@rabbitmq.com. - -package com.rabbitmq.client.test; - -public class RpcTimeoutTest { - -} From 6b5d676d15ae2d2761609f09735257ce352a12b4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Mon, 5 Dec 2016 12:09:43 +0100 Subject: [PATCH 6/6] Add channel number to ChannelContinuationTimeoutException Fixes #219 --- .../ChannelContinuationTimeoutException.java | 32 +++++++++++++++++-- .../com/rabbitmq/client/impl/AMQChannel.java | 2 +- .../rabbitmq/client/test/AMQChannelTest.java | 1 + .../ChannelRpcTimeoutIntegrationTest.java | 1 + 4 files changed, 33 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/rabbitmq/client/ChannelContinuationTimeoutException.java b/src/main/java/com/rabbitmq/client/ChannelContinuationTimeoutException.java index 18e1fc6027..d5bca1eeba 100644 --- a/src/main/java/com/rabbitmq/client/ChannelContinuationTimeoutException.java +++ b/src/main/java/com/rabbitmq/client/ChannelContinuationTimeoutException.java @@ -11,25 +11,53 @@ public class ChannelContinuationTimeoutException extends IOException { /** * The channel that performed the call. + * Typed as Object as the underlying + * object that performs the call might + * not be an implementation of {@link Channel}. */ private final Object channel; + /** + * The number of the channel that performed the call. + */ + private final int channelNumber; + /** * The request method that timed out. */ private final Method method; - public ChannelContinuationTimeoutException(TimeoutException cause, Object channel, Method method) { - super(cause); + public ChannelContinuationTimeoutException(TimeoutException cause, Object channel, int channelNumber, Method method) { + super( + "Continuation call for method " + method + " on channel " + channel + " (#" + channelNumber + ") timed out", + cause + ); this.channel = channel; + this.channelNumber = channelNumber; this.method = method; } + /** + * + * @return request method that timed out + */ public Method getMethod() { return method; } + /** + * channel that performed the call + * @return + */ public Object getChannel() { return channel; } + + /** + * + * @return number of the channel that performed the call + */ + public int getChannelNumber() { + return channelNumber; + } } diff --git a/src/main/java/com/rabbitmq/client/impl/AMQChannel.java b/src/main/java/com/rabbitmq/client/impl/AMQChannel.java index 29f8cd598e..6726f728f8 100644 --- a/src/main/java/com/rabbitmq/client/impl/AMQChannel.java +++ b/src/main/java/com/rabbitmq/client/impl/AMQChannel.java @@ -250,7 +250,7 @@ private AMQCommand privateRpc(Method m) } catch(Exception ex) { LOGGER.warn("Error while cleaning timed out channel RPC: {}", ex.getMessage()); } - throw new ChannelContinuationTimeoutException(e, this, m); + throw new ChannelContinuationTimeoutException(e, this, this._channelNumber, m); } } } diff --git a/src/test/java/com/rabbitmq/client/test/AMQChannelTest.java b/src/test/java/com/rabbitmq/client/test/AMQChannelTest.java index 1adf3e4d17..460ae3e93c 100644 --- a/src/test/java/com/rabbitmq/client/test/AMQChannelTest.java +++ b/src/test/java/com/rabbitmq/client/test/AMQChannelTest.java @@ -69,6 +69,7 @@ public class AMQChannelTest { } catch(ChannelContinuationTimeoutException e) { // OK assertThat((DummyAmqChannel) e.getChannel(), is(channel)); + assertThat(e.getChannelNumber(), is(channel.getChannelNumber())); assertThat(e.getMethod(), is(method)); assertNull("outstanding RPC should have been cleaned", channel.nextOutstandingRpc()); } diff --git a/src/test/java/com/rabbitmq/client/test/ChannelRpcTimeoutIntegrationTest.java b/src/test/java/com/rabbitmq/client/test/ChannelRpcTimeoutIntegrationTest.java index 36be0939bf..dd5b8c0c6f 100644 --- a/src/test/java/com/rabbitmq/client/test/ChannelRpcTimeoutIntegrationTest.java +++ b/src/test/java/com/rabbitmq/client/test/ChannelRpcTimeoutIntegrationTest.java @@ -76,6 +76,7 @@ public void tearDown() throws Exception { } catch(ChannelContinuationTimeoutException e) { // OK assertThat((Channel) e.getChannel(), is(channel)); + assertThat(e.getChannelNumber(), is(channel.getChannelNumber())); assertThat(e.getMethod(), instanceOf(AMQP.Queue.Declare.class)); } } finally {