basicCancel and basicConsume honor rpc timeout. #278

Merged: 3 commits into rabbitmq:4.1.x-stable on May 29, 2017

Conversation

vikinghawk (Contributor):

Also make sure to remove other e2e bindings for autodeleted exchanges.

See https://groups.google.com/forum/#!topic/rabbitmq-users/pQ46aq6Tf_o for more details related to this PR.

Commit: Also make sure to remove other e2e bindings for autodeleted exchanges

@@ -1267,17 +1275,26 @@ public void basicCancel(final String consumerTag)
BlockingRpcContinuation<Consumer> k = new BlockingRpcContinuation<Consumer>() {
@Override
public Consumer transformReply(AMQCommand replyCommand) {
replyCommand.getMethod();
((Basic.CancelOk) replyCommand.getMethod()).getConsumerTag(); // just to make sure its the method expected
vikinghawk (Contributor, author), May 25, 2017:

I'll be logging another issue related to this. During cluster node failovers we have seen some TimeoutExceptions during connection recovery end up causing ClassCastExceptions, because the reply for the timed-out rpc request comes in while a second rpc request is waiting. Adding this here to make sure the replyCommand is actually a CancelOk event. I'm not sure there is an easy fix for the timed-out rpc replies, so I didn't try to tackle it here.

acogoluegnes (Contributor):

OK, so you want to trigger the "unsynchronized" RPC responses problem here, right? If so, maybe wrap it in a try/catch block and log the exception as a warning, to avoid changing the behavior too much. Thanks.
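
A minimal sketch of that try/catch variant, reusing the names already present in the hunk above (the LOGGER field is an assumption for illustration, not something taken from this PR):

public Consumer transformReply(AMQCommand replyCommand) {
    try {
        // verify the reply really is the expected method; a reply left over from a
        // previously timed-out rpc would otherwise surface as a ClassCastException
        ((Basic.CancelOk) replyCommand.getMethod()).getConsumerTag();
    } catch (ClassCastException e) {
        LOGGER.warn("Received reply was not the expected Basic.CancelOk", e); // LOGGER is assumed, not part of the original diff
    }
    _consumers.remove(consumerTag); // may already have been removed
    dispatcher.handleCancelOk(originalConsumer, consumerTag);
    return originalConsumer;
}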

vikinghawk (Contributor, author), May 26, 2017:

How about:

public Consumer transformReply(AMQCommand replyCommand) {
    if (!(replyCommand.getMethod() instanceof Basic.CancelOk))
        LOGGER.warn("Received reply was not of expected method Basic.CancelOk");
    _consumers.remove(consumerTag); //may already have been removed
    dispatcher.handleCancelOk(originalConsumer, consumerTag);
    return originalConsumer;
}

acogoluegnes (Contributor):

LGTM. Thanks!

@@ -805,7 +805,7 @@ void maybeDeleteRecordedAutoDeleteExchange(String exchange) {
// last binding where this exchange is the source is gone, remove recorded exchange
// if it is auto-deleted. See bug 26364.
if((x != null) && x.isAutoDelete()) {
this.recordedExchanges.remove(exchange);
deleteRecordedExchange(exchange);
vikinghawk (Contributor, author), May 25, 2017:

It was possible to have abandoned e2e bindings left in the recordedBindings before this change.

Workflow to reproduce:

  1. create durable topic exchange -> auto-delete headers exchange -> auto-delete queue -> consumer
  2. cancel the consumer
  3. the cancel calls maybeDeleteRecordedAutoDeleteQueue, which ends up deleting the recorded queue
  4. that in turn calls maybeDeleteRecordedAutoDeleteExchange, which removes the recorded auto-delete headers exchange

In that case the e2e binding from the durable topic exchange to the now-deleted headers exchange still existed.
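
A hedged sketch of that reproduction against the public Channel API (the exchange names, queue, and routing key are illustrative, not taken from the PR):

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;

public class AbandonedE2eBindingRepro {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();   // connection/topology recovery is on by default in the 4.x client
        Connection conn = factory.newConnection();
        Channel ch = conn.createChannel();

        // durable topic exchange -> auto-delete headers exchange -> auto-delete queue -> consumer
        ch.exchangeDeclare("topic.in", "topic", true, false, null);            // durable
        ch.exchangeDeclare("headers.mid", "headers", false, true, null);       // auto-delete
        ch.exchangeBind("headers.mid", "topic.in", "some.key");                // the e2e binding in question
        String q = ch.queueDeclare("", false, false, true, null).getQueue();   // auto-delete queue
        ch.queueBind(q, "headers.mid", "");
        String tag = ch.basicConsume(q, true, new DefaultConsumer(ch));

        // cancelling the only consumer lets the auto-delete queue and headers exchange go away;
        // before this change the recorded topic.in -> headers.mid binding was left behind
        ch.basicCancel(tag);

        conn.close();
    }
}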

acogoluegnes (Contributor):

This isn't directly related to the issue you fix here. Could you address this one in a separate PR? It's better for versioning, changelog, etc. Thanks :)

vikinghawk (Contributor, author):

will do

acogoluegnes merged commit d72ee8a into rabbitmq:4.1.x-stable on May 29, 2017
acogoluegnes (Contributor):

@vikinghawk Thanks!

acogoluegnes added this to the 4.1.1 milestone on May 30, 2017