Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

basicCancel and basicConsume honor rpc timeout. #278

Merged
merged 3 commits into from
May 29, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 25 additions & 11 deletions src/main/java/com/rabbitmq/client/impl/AMQChannel.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public abstract class AMQChannel extends ShutdownNotifierComponent {

private static final Logger LOGGER = LoggerFactory.getLogger(AMQChannel.class);

private static final int NO_RPC_TIMEOUT = 0;
protected static final int NO_RPC_TIMEOUT = 0;

/**
* Protected; used instead of synchronizing on the channel itself,
Expand All @@ -64,7 +64,7 @@ public abstract class AMQChannel extends ShutdownNotifierComponent {
public volatile boolean _blockContent = false;

/** Timeout for RPC calls */
private final int _rpcTimeout;
protected final int _rpcTimeout;

/**
* Construct a channel on the given connection, with the given channel number.
Expand Down Expand Up @@ -243,24 +243,38 @@ private AMQCommand privateRpc(Method m)
try {
return k.getReply(_rpcTimeout);
} catch (TimeoutException e) {
try {
// clean RPC channel state
nextOutstandingRpc();
markRpcFinished();
} catch(Exception ex) {
LOGGER.warn("Error while cleaning timed out channel RPC: {}", ex.getMessage());
}
throw new ChannelContinuationTimeoutException(e, this, this._channelNumber, m);
throw wrapTimeoutException(m, e);
}
}
}

private void cleanRpcChannelState() {
try {
// clean RPC channel state
nextOutstandingRpc();
markRpcFinished();
} catch (Exception ex) {
LOGGER.warn("Error while cleaning timed out channel RPC: {}", ex.getMessage());
}
}

/** Cleans RPC channel state after a timeout and wraps the TimeoutException in a ChannelContinuationTimeoutException */
protected ChannelContinuationTimeoutException wrapTimeoutException(final Method m, final TimeoutException e) {
cleanRpcChannelState();
return new ChannelContinuationTimeoutException(e, this, this._channelNumber, m);
}

private AMQCommand privateRpc(Method m, int timeout)
throws IOException, ShutdownSignalException, TimeoutException {
SimpleBlockingRpcContinuation k = new SimpleBlockingRpcContinuation();
rpc(m, k);

return k.getReply(timeout);
try {
return k.getReply(timeout);
} catch (TimeoutException e) {
cleanRpcChannelState();
throw e;
}
}

public void rpc(Method m, RpcContinuation k)
Expand Down
50 changes: 36 additions & 14 deletions src/main/java/com/rabbitmq/client/impl/ChannelN.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@
import com.rabbitmq.client.impl.AMQImpl.Tx;
import com.rabbitmq.utility.Utility;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Main interface to AMQP protocol functionality. Public API -
* Implementation of all AMQChannels except channel zero.
Expand All @@ -49,6 +52,7 @@
*/
public class ChannelN extends AMQChannel implements com.rabbitmq.client.Channel {
private static final String UNSPECIFIED_OUT_OF_BAND = "";
private static final Logger LOGGER = LoggerFactory.getLogger(ChannelN.class);

/** Map from consumer tag to {@link Consumer} instance.
* <p/>
Expand Down Expand Up @@ -1239,18 +1243,26 @@ public String transformReply(AMQCommand replyCommand) {
}
};

rpc(new Basic.Consume.Builder()
.queue(queue)
.consumerTag(consumerTag)
.noLocal(noLocal)
.noAck(autoAck)
.exclusive(exclusive)
.arguments(arguments)
.build(),
k);
final Method m = new Basic.Consume.Builder()
.queue(queue)
.consumerTag(consumerTag)
.noLocal(noLocal)
.noAck(autoAck)
.exclusive(exclusive)
.arguments(arguments)
.build();
rpc(m, k);

try {
return k.getReply();
if(_rpcTimeout == NO_RPC_TIMEOUT) {
return k.getReply();
} else {
try {
return k.getReply(_rpcTimeout);
} catch (TimeoutException e) {
throw wrapTimeoutException(m, e);
}
}
} catch(ShutdownSignalException ex) {
throw wrap(ex);
}
Expand All @@ -1267,17 +1279,27 @@ public void basicCancel(final String consumerTag)
BlockingRpcContinuation<Consumer> k = new BlockingRpcContinuation<Consumer>() {
@Override
public Consumer transformReply(AMQCommand replyCommand) {
replyCommand.getMethod();
if (!(replyCommand.getMethod() instanceof Basic.CancelOk))
LOGGER.warn("Received reply {} was not of expected method Basic.CancelOk", replyCommand.getMethod());
_consumers.remove(consumerTag); //may already have been removed
dispatcher.handleCancelOk(originalConsumer, consumerTag);
return originalConsumer;
}
};

rpc(new Basic.Cancel(consumerTag, false), k);

final Method m = new Basic.Cancel(consumerTag, false);
rpc(m, k);

try {
k.getReply(); // discard result
if(_rpcTimeout == NO_RPC_TIMEOUT) {
k.getReply(); // discard result
} else {
try {
k.getReply(_rpcTimeout);
} catch (TimeoutException e) {
throw wrapTimeoutException(m, e);
}
}
} catch(ShutdownSignalException ex) {
throw wrap(ex);
}
Expand Down