Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package com.rabbitmq.client;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
* Exception thrown when a channel times out on a continuation during a RPC call.
* @since 4.1.0
*/
public class ChannelContinuationTimeoutException extends IOException {

/**
* The channel that performed the call.
* Typed as <code>Object</code> as the underlying
* object that performs the call might
* not be an implementation of {@link Channel}.
*/
private final Object channel;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be Channel or am I missing something?

Copy link
Contributor Author

@acogoluegnes acogoluegnes Dec 5, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This exception is thrown from AMQChannel, which is not a Channel. There are several other solutions:

  • having a Channel property in the exception and casting the AMQChannel to Channel when throwing the exception. We risk in theory a ClassCastException.
  • having an AMQChannel property in the exception, but the implementation leaks up to error handling.
  • having just the channel number and the connection in the exception, which should be enough to diagnose any error.

I think the third solution is a good compromise.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's add a channel number field (we can keep the field we have with a comment).


/**
* The number of the channel that performed the call.
*/
private final int channelNumber;

/**
* The request method that timed out.
*/
private final Method method;

public ChannelContinuationTimeoutException(TimeoutException cause, Object channel, int channelNumber, Method method) {
super(
"Continuation call for method " + method + " on channel " + channel + " (#" + channelNumber + ") timed out",
cause
);
this.channel = channel;
this.channelNumber = channelNumber;
this.method = method;
}

/**
*
* @return request method that timed out
*/
public Method getMethod() {
return method;
}

/**
* channel that performed the call
* @return
*/
public Object getChannel() {
return channel;
}

/**
*
* @return number of the channel that performed the call
*/
public int getChannelNumber() {
return channelNumber;
}
}
32 changes: 32 additions & 0 deletions src/main/java/com/rabbitmq/client/ConnectionFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
import java.util.*;
import java.util.concurrent.*;

import static java.util.concurrent.TimeUnit.*;

/**
* Convenience "factory" class to facilitate opening a {@link Connection} to an AMQP broker.
*/
Expand Down Expand Up @@ -73,6 +75,9 @@ public class ConnectionFactory implements Cloneable {
* zero means wait indefinitely */
public static final int DEFAULT_SHUTDOWN_TIMEOUT = 10000;

/** The default continuation timeout for RPC calls in channels: 10 minutes */
public static final int DEFAULT_CHANNEL_RPC_TIMEOUT = (int) MINUTES.toMillis(10);

private static final String PREFERRED_TLS_PROTOCOL = "TLSv1.2";

private static final String FALLBACK_TLS_PROTOCOL = "TLSv1";
Expand Down Expand Up @@ -116,6 +121,12 @@ public class ConnectionFactory implements Cloneable {

private SSLContext sslContext;

/**
* Continuation timeout on RPC calls.
* @since 4.1.0
*/
private int channelRpcTimeout = DEFAULT_CHANNEL_RPC_TIMEOUT;

/** @return the default host to use for connections */
public String getHost() {
return host;
Expand Down Expand Up @@ -937,6 +948,7 @@ public ConnectionParams params(ExecutorService consumerWorkServiceExecutor) {
result.setRequestedHeartbeat(requestedHeartbeat);
result.setShutdownExecutor(shutdownExecutor);
result.setHeartbeatExecutor(heartbeatExecutor);
result.setChannelRpcTimeout(channelRpcTimeout);
return result;
}

Expand Down Expand Up @@ -1079,4 +1091,24 @@ public void useNio() {
public void useBlockingIo() {
this.nio = false;
}

/**
* Set the continuation timeout for RPC calls in channels.
* Default is 10 minutes. 0 means no timeout.
* @param channelRpcTimeout
*/
public void setChannelRpcTimeout(int channelRpcTimeout) {
if(channelRpcTimeout < 0) {
throw new IllegalArgumentException("Timeout cannot be less than 0");
}
this.channelRpcTimeout = channelRpcTimeout;
}

/**
* Get the timeout for RPC calls in channels.
* @return
*/
public int getChannelRpcTimeout() {
return channelRpcTimeout;
}
}
38 changes: 32 additions & 6 deletions src/main/java/com/rabbitmq/client/impl/AMQChannel.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,11 @@
import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.AlreadyClosedException;
import com.rabbitmq.client.Command;
import com.rabbitmq.client.*;
import com.rabbitmq.client.Method;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ShutdownSignalException;
import com.rabbitmq.utility.BlockingValueOrException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Base class modelling an AMQ channel. Subclasses implement
Expand All @@ -37,6 +36,11 @@
* @see Connection
*/
public abstract class AMQChannel extends ShutdownNotifierComponent {

private static final Logger LOGGER = LoggerFactory.getLogger(AMQChannel.class);

private static final int NO_RPC_TIMEOUT = 0;

/**
* Protected; used instead of synchronizing on the channel itself,
* so that clients can themselves use the channel to synchronize
Expand All @@ -59,6 +63,9 @@ public abstract class AMQChannel extends ShutdownNotifierComponent {
/** Whether transmission of content-bearing methods should be blocked */
public volatile boolean _blockContent = false;

/** Timeout for RPC calls */
private final int _rpcTimeout;

/**
* Construct a channel on the given connection, with the given channel number.
* @param connection the underlying connection for this channel
Expand All @@ -67,6 +74,10 @@ public abstract class AMQChannel extends ShutdownNotifierComponent {
public AMQChannel(AMQConnection connection, int channelNumber) {
this._connection = connection;
this._channelNumber = channelNumber;
if(connection.getChannelRpcTimeout() < 0) {
throw new IllegalArgumentException("Continuation timeout on RPC calls cannot be less than 0");
}
this._rpcTimeout = connection.getChannelRpcTimeout();
}

/**
Expand Down Expand Up @@ -225,8 +236,23 @@ private AMQCommand privateRpc(Method m)
//
// Calling getReply() on the continuation puts us to sleep
// until the connection's reader-thread throws the reply over
// the fence.
return k.getReply();
// the fence or the RPC times out (if enabled)
if(_rpcTimeout == NO_RPC_TIMEOUT) {
return k.getReply();
} else {
try {
return k.getReply(_rpcTimeout);
} catch (TimeoutException e) {
try {
// clean RPC channel state
nextOutstandingRpc();
markRpcFinished();
} catch(Exception ex) {
LOGGER.warn("Error while cleaning timed out channel RPC: {}", ex.getMessage());
}
throw new ChannelContinuationTimeoutException(e, this, this._channelNumber, m);
}
}
}

private AMQCommand privateRpc(Method m, int timeout)
Expand Down
21 changes: 16 additions & 5 deletions src/main/java/com/rabbitmq/client/impl/AMQConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,7 @@ public static Map<String, Object> defaultClientProperties() {
new Version(AMQP.PROTOCOL.MAJOR, AMQP.PROTOCOL.MINOR);

/** The special channel 0 (<i>not</i> managed by the <code><b>_channelManager</b></code>) */
private final AMQChannel _channel0 = new AMQChannel(this, 0) {
@Override public boolean processAsync(Command c) throws IOException {
return getConnection().processControlCommand(c);
}
};
private final AMQChannel _channel0;

protected ConsumerWorkService _workService = null;

Expand Down Expand Up @@ -137,6 +133,7 @@ public static Map<String, Object> defaultClientProperties() {
private final String password;
private final Collection<BlockedListener> blockedListeners = new CopyOnWriteArrayList<BlockedListener>();
protected final MetricsCollector metricsCollector;
private final int channelRpcTimeout;

/* State modified after start - all volatile */

Expand Down Expand Up @@ -228,6 +225,16 @@ public AMQConnection(ConnectionParams params, FrameHandler frameHandler, Metrics
this.heartbeatExecutor = params.getHeartbeatExecutor();
this.shutdownExecutor = params.getShutdownExecutor();
this.threadFactory = params.getThreadFactory();
if(params.getChannelRpcTimeout() < 0) {
throw new IllegalArgumentException("Continuation timeout on RPC calls cannot be less than 0");
}
this.channelRpcTimeout = params.getChannelRpcTimeout();

this._channel0 = new AMQChannel(this, 0) {
@Override public boolean processAsync(Command c) throws IOException {
return getConnection().processControlCommand(c);
}
};

this._channelManager = null;

Expand Down Expand Up @@ -1045,4 +1052,8 @@ public String getId() {
public void setId(String id) {
this.id = id;
}

public int getChannelRpcTimeout() {
return channelRpcTimeout;
}
}
9 changes: 9 additions & 0 deletions src/main/java/com/rabbitmq/client/impl/ConnectionParams.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public class ConnectionParams {
private SaslConfig saslConfig;
private long networkRecoveryInterval;
private boolean topologyRecovery;
private int channelRpcTimeout;

private ExceptionHandler exceptionHandler;
private ThreadFactory threadFactory;
Expand Down Expand Up @@ -109,6 +110,10 @@ public ThreadFactory getThreadFactory() {
return threadFactory;
}

public int getChannelRpcTimeout() {
return channelRpcTimeout;
}

public void setUsername(String username) {
this.username = username;
}
Expand Down Expand Up @@ -180,4 +185,8 @@ public ScheduledExecutorService getHeartbeatExecutor() {
public void setHeartbeatExecutor(ScheduledExecutorService heartbeatExecutor) {
this.heartbeatExecutor = heartbeatExecutor;
}

public void setChannelRpcTimeout(int channelRpcTimeout) {
this.channelRpcTimeout = channelRpcTimeout;
}
}
Loading