Skip to content

Commit

Permalink
tweaks to recovery retry helper
Browse files Browse the repository at this point in the history
(cherry picked from commit 9b5adbe)

Conflicts:
	src/main/java/com/rabbitmq/client/impl/recovery/TopologyRecoveryRetryLogic.java
  • Loading branch information
vikinghawk authored and acogoluegnes committed Aug 29, 2018
1 parent c764cdc commit a0ccd99
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -96,11 +96,11 @@ public RetryResult retryConsumerRecovery(RetryContext context) throws Exception

protected <T extends RecordedEntity> RetryResult doRetry(RetryCondition<T> condition, RetryOperation<?> operation, T entity, RetryContext context)
throws Exception {
log(entity, context.exception());
int attempts = 0;
Exception exception = context.exception();
while (attempts < retryAttempts) {
if (condition.test(entity, exception)) {
log(entity, context.exception(), attempts);
backoffPolicy.backoff(attempts + 1);
try {
Object result = operation.call(context);
Expand All @@ -115,11 +115,11 @@ protected <T extends RecordedEntity> RetryResult doRetry(RetryCondition<T> condi
throw exception;
}
}
throw context.exception();
throw exception;
}

protected void log(RecordedEntity entity, Exception exception) {
LOGGER.info("Error while recovering {}, retrying with {} attempt(s).", entity, retryAttempts, exception);
protected void log(RecordedEntity entity, Exception exception, int attempts) {
LOGGER.info("Error while recovering {}, retrying with {} more attempt(s).", entity, retryAttempts - attempts, exception);
}

public static abstract class RetryOperation<T> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.ShutdownSignalException;
import com.rabbitmq.utility.Utility;

import static com.rabbitmq.client.impl.recovery.TopologyRecoveryRetryHandlerBuilder.builder;

Expand Down Expand Up @@ -136,7 +137,7 @@ public String call(RetryContext context) throws Exception {
public Void call(RetryContext context) throws Exception {
if (context.entity() instanceof RecordedConsumer) {
String queue = context.consumer().getQueue();
for (RecordedBinding recordedBinding : context.connection().getRecordedBindings()) {
for (RecordedBinding recordedBinding : Utility.copy(context.connection().getRecordedBindings())) {
if (recordedBinding instanceof RecordedQueueBinding && queue.equals(recordedBinding.getDestination())) {
recordedBinding.recover();
}
Expand All @@ -147,16 +148,15 @@ public Void call(RetryContext context) throws Exception {
};

/**
* Pre-configured {@link DefaultRetryHandler} that retries recovery of bindings and consumers
* Pre-configured {@link TopologyRecoveryRetryHandlerBuilder} that retries recovery of bindings and consumers
* when their respective queue is not found.
* This retry handler can be useful for long recovery processes, whereby auto-delete queues
* can be deleted between queue recovery and binding/consumer recovery.
*/
public static final RetryHandler RETRY_ON_QUEUE_NOT_FOUND_RETRY_HANDLER = builder()
public static final TopologyRecoveryRetryHandlerBuilder RETRY_ON_QUEUE_NOT_FOUND_RETRY_HANDLER = builder()
.bindingRecoveryRetryCondition(CHANNEL_CLOSED_NOT_FOUND)
.consumerRecoveryRetryCondition(CHANNEL_CLOSED_NOT_FOUND)
.bindingRecoveryRetryOperation(RECOVER_CHANNEL.andThen(RECOVER_BINDING_QUEUE).andThen(RECOVER_BINDING))
.consumerRecoveryRetryOperation(RECOVER_CHANNEL.andThen(RECOVER_CONSUMER_QUEUE.andThen(RECOVER_CONSUMER)
.andThen(RECOVER_CONSUMER_QUEUE_BINDINGS)))
.build();
.andThen(RECOVER_CONSUMER_QUEUE_BINDINGS)));
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public void topologyRecoveryRetry() throws Exception {
@Override
protected ConnectionFactory newConnectionFactory() {
ConnectionFactory connectionFactory = TestUtils.connectionFactory();
connectionFactory.setTopologyRecoveryRetryHandler(RETRY_ON_QUEUE_NOT_FOUND_RETRY_HANDLER);
connectionFactory.setTopologyRecoveryRetryHandler(RETRY_ON_QUEUE_NOT_FOUND_RETRY_HANDLER.build());
connectionFactory.setNetworkRecoveryInterval(1000);
return connectionFactory;
}
Expand Down

0 comments on commit a0ccd99

Please sign in to comment.