Skip to content

Commit

Permalink
AMQP-814: Add retry to RabbitAdmin
Browse files Browse the repository at this point in the history
JIRA: https://jira.spring.io/browse/AMQP-814

Add retry to avoid race conditions with auto-delete, exclusive queues.

**cherry-pick to 2.0.x**
**back port to 1.7.x, without lambda in RabbitAdmin**
  • Loading branch information
garyrussell authored and artembilan committed May 22, 2018
1 parent 8c43c57 commit 907d5cb
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 5 deletions.
Expand Up @@ -48,6 +48,9 @@
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.jmx.export.annotation.ManagedOperation;
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.util.Assert;

import com.rabbitmq.client.AMQP.Queue.DeclareOk;
Expand Down Expand Up @@ -96,13 +99,17 @@ public class RabbitAdmin implements AmqpAdmin, ApplicationContextAware, Applicat

private final RabbitTemplate rabbitTemplate;

private RetryTemplate retryTemplate;

private boolean retryDisabled;

private volatile boolean running = false;

private volatile boolean autoStartup = true;
private boolean autoStartup = true;

private volatile ApplicationContext applicationContext;
private ApplicationContext applicationContext;

private volatile boolean ignoreDeclarationExceptions;
private boolean ignoreDeclarationExceptions;

private final Object lifecycleMonitor = new Object();

Expand Down Expand Up @@ -367,6 +374,25 @@ public Properties getQueueProperties(final String queueName) {
});
}

/**
* Set a retry template for auto declarations. There is a race condition with
* auto-delete, exclusive queues in that the queue might still exist for a short time,
* preventing the redeclaration. The default retry configuration will try 5 times with
* an exponential backOff starting at 1 second a multiplier of 2.0 and a max interval
* of 5 seconds. To disable retry, set the argument to {@code null}. Note that this
* retry is at the macro level - all declarations will be retried within the scope of
* this template. If you supplied a {@link RabbitTemplate} that is configured with a
* {@link RetryTemplate}, its template will retry each individual declaration.
* @param retryTemplate the retry template.
* @since 1.7.8
*/
public void setRetryTemplate(RetryTemplate retryTemplate) {
this.retryTemplate = retryTemplate;
if (retryTemplate == null) {
this.retryDisabled = true;
}
}

// Lifecycle implementation

public boolean isAutoStartup() {
Expand All @@ -391,6 +417,15 @@ public void afterPropertiesSet() {
return;
}

if (this.retryTemplate == null && !this.retryDisabled) {
this.retryTemplate = new RetryTemplate();
this.retryTemplate.setRetryPolicy(new SimpleRetryPolicy(5));
ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
backOffPolicy.setInitialInterval(1000);
backOffPolicy.setMultiplier(2.0);
backOffPolicy.setMaxInterval(5000);
this.retryTemplate.setBackOffPolicy(backOffPolicy);
}
if (this.connectionFactory instanceof CachingConnectionFactory &&
((CachingConnectionFactory) this.connectionFactory).getCacheMode() == CacheMode.CONNECTION) {
this.logger.warn("RabbitAdmin auto declaration is not supported with CacheMode.CONNECTION");
Expand All @@ -413,7 +448,15 @@ public void afterPropertiesSet() {
* chatter). In fact it might even be a good thing: exclusive queues only make sense if they are
* declared for every connection. If anyone has a problem with it: use auto-startup="false".
*/
initialize();
if (this.retryTemplate != null) {
this.retryTemplate.execute(c -> {
initialize();
return null;
});
}
else {
initialize();
}
}
finally {
initializing.compareAndSet(true, false);
Expand Down
Expand Up @@ -28,7 +28,9 @@
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.isNull;
import static org.mockito.BDDMockito.given;
import static org.mockito.BDDMockito.willDoNothing;
import static org.mockito.BDDMockito.willReturn;
Expand All @@ -52,11 +54,13 @@
import java.util.concurrent.TimeoutException;

import org.apache.commons.logging.Log;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.mockito.ArgumentCaptor;

import org.springframework.amqp.UncategorizedAmqpException;
import org.springframework.amqp.core.AnonymousQueue;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.Binding.DestinationType;
Expand Down Expand Up @@ -284,9 +288,10 @@ public void testAvoidHangAMQP_508() {
String longName = new String(new byte[300]).replace('\u0000', 'x');
try {
admin.declareQueue(new Queue(longName));
fail("expected exception");
}
catch (Exception e) {
e.printStackTrace();
// NOSONAR
}
String goodName = "foobar";
admin.declareQueue(new Queue(goodName));
Expand Down Expand Up @@ -352,6 +357,35 @@ public void testWithinInvoke() throws Exception {
verifyZeroInteractions(channel2);
}

@Test
@Ignore // too long; not much value
public void testRetry() throws Exception {
com.rabbitmq.client.ConnectionFactory rabbitConnectionFactory = mock(com.rabbitmq.client.ConnectionFactory.class);
com.rabbitmq.client.Connection connection = mock(com.rabbitmq.client.Connection.class);
given(rabbitConnectionFactory.newConnection((ExecutorService) isNull(), anyString())).willReturn(connection);
Channel channel = mock(Channel.class);
given(connection.createChannel()).willReturn(channel);
given(channel.isOpen()).willReturn(true);
willThrow(new RuntimeException()).given(channel)
.queueDeclare(anyString(), anyBoolean(), anyBoolean(), anyBoolean(), isNull());
CachingConnectionFactory ccf = new CachingConnectionFactory(rabbitConnectionFactory);
RabbitAdmin admin = new RabbitAdmin(ccf);
GenericApplicationContext ctx = new GenericApplicationContext();
ctx.getBeanFactory().registerSingleton("foo", new AnonymousQueue());
ctx.getBeanFactory().registerSingleton("admin", admin);
admin.setApplicationContext(ctx);
ctx.getBeanFactory().initializeBean(admin, "admin");
ctx.refresh();
try {
ccf.createConnection();
fail("expected exception");
}
catch (UncategorizedAmqpException e) {
// NOSONAR
}
ctx.close();
}

@Configuration
public static class Config {

Expand Down

0 comments on commit 907d5cb

Please sign in to comment.