Skip to content

Commit

Permalink
AMQP-750: Add possibleAuthenticationFailureFatal
Browse files Browse the repository at this point in the history
JIRA: https://jira.spring.io/browse/AMQP-750

In some cases `PossibleAuthenticationFailureException` is not really
fatal error, so add `possibleAuthenticationFailureFatal` option to the
ListenerContainer to disable a fatal behavior on startup

* Upgrade to SF-4.3.10 and RabbitMQ Client 4.0.3
  • Loading branch information
artembilan committed Jul 20, 2017
1 parent 57a3399 commit 0cdce83
Show file tree
Hide file tree
Showing 8 changed files with 116 additions and 18 deletions.
4 changes: 2 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -99,11 +99,11 @@ subprojects { subproject ->
log4j2Version = '2.7'
logbackVersion = '1.1.7'
mockitoVersion = '1.10.19'
rabbitmqVersion = project.hasProperty('rabbitmqVersion') ? project.rabbitmqVersion : '4.0.2'
rabbitmqVersion = project.hasProperty('rabbitmqVersion') ? project.rabbitmqVersion : '4.0.3'
rabbitmqHttpClientVersion = '1.1.1.RELEASE'
slf4jVersion = "1.7.21"

springVersion = project.hasProperty('springVersion') ? project.springVersion : '4.3.9.RELEASE'
springVersion = project.hasProperty('springVersion') ? project.springVersion : '4.3.10.RELEASE'

springRetryVersion = '1.2.0.RELEASE'
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2016 the original author or authors.
* Copyright 2002-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -87,6 +87,8 @@ public final class RabbitNamespaceUtils {

private static final String MISMATCHED_QUEUES_FATAL = "mismatched-queues-fatal";

private static final String POSSIBLE_AUTHENTICATION_FAILURE_FATAL = "possible-authentication-failure-fatal";

private static final String AUTO_DECLARE = "auto-declare";

private static final String DECLARATION_RETRIES = "declaration-retries";
Expand Down Expand Up @@ -223,7 +225,7 @@ public static BeanDefinition parseContainer(Element containerEle, ParserContext
if (StringUtils.hasText(recoveryBackOff)) {
parserContext.getReaderContext()
.error("'" + RECOVERY_INTERVAL + "' and '" + RECOVERY_BACK_OFF + "' are mutually exclusive",
containerEle);
containerEle);
}
containerDef.getPropertyValues().add("recoveryInterval", new TypedStringValue(recoveryInterval));
}
Expand All @@ -241,6 +243,12 @@ public static BeanDefinition parseContainer(Element containerEle, ParserContext
containerDef.getPropertyValues().add("mismatchedQueuesFatal", new TypedStringValue(mismatchedQueuesFatal));
}

String possibleAuthenticationFailureFatal = containerEle.getAttribute(POSSIBLE_AUTHENTICATION_FAILURE_FATAL);
if (StringUtils.hasText(possibleAuthenticationFailureFatal)) {
containerDef.getPropertyValues().add("possibleAuthenticationFailureFatal",
new TypedStringValue(possibleAuthenticationFailureFatal));
}

String autoDeclare = containerEle.getAttribute(AUTO_DECLARE);
if (StringUtils.hasText(autoDeclare)) {
containerDef.getPropertyValues().add("autoDeclare", new TypedStringValue(autoDeclare));
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2016 the original author or authors.
* Copyright 2002-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -36,6 +36,7 @@
import org.aopalliance.aop.Advice;
import org.apache.commons.logging.Log;

import org.springframework.amqp.AmqpAuthenticationException;
import org.springframework.amqp.AmqpConnectException;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.AmqpIOException;
Expand Down Expand Up @@ -86,6 +87,7 @@
import org.springframework.util.backoff.FixedBackOff;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.PossibleAuthenticationFailureException;
import com.rabbitmq.client.ShutdownSignalException;

/**
Expand Down Expand Up @@ -180,6 +182,8 @@ public class SimpleMessageListenerContainer extends AbstractMessageListenerConta

private volatile boolean mismatchedQueuesFatal = false;

private boolean possibleAuthenticationFailureFatal = true;

private volatile ConsumerTagStrategy consumerTagStrategy;

private volatile ApplicationEventPublisher applicationEventPublisher;
Expand Down Expand Up @@ -531,6 +535,17 @@ public void setMismatchedQueuesFatal(boolean mismatchedQueuesFatal) {
this.mismatchedQueuesFatal = mismatchedQueuesFatal;
}

/**
* Prevent the container to fail during initialization if a
* {@link com.rabbitmq.client.PossibleAuthenticationFailureException} is thrown.
* Default true.
* @param possibleAuthenticationFailureFatal false do not fail initialization when this condition occurs.
* @since 1.7.4
*/
public void setPossibleAuthenticationFailureFatal(boolean possibleAuthenticationFailureFatal) {
this.possibleAuthenticationFailureFatal = possibleAuthenticationFailureFatal;
}

/**
* {@inheritDoc}
* @since 1.5
Expand Down Expand Up @@ -1409,7 +1424,21 @@ public void run() {
}
}
catch (FatalListenerStartupException ex) {
throw ex;
if (SimpleMessageListenerContainer.this.possibleAuthenticationFailureFatal) {
throw ex;
}
else {
Throwable possibleAuthException = ex.getCause().getCause();
if (possibleAuthException == null ||
!(possibleAuthException instanceof PossibleAuthenticationFailureException)) {
throw ex;
}
else {
this.start.countDown();
handleStartupFailure(this.consumer.getBackOffExecution());
throw possibleAuthException;
}
}
}
catch (Throwable t) { //NOSONAR
this.start.countDown();
Expand Down Expand Up @@ -1509,6 +1538,19 @@ public void run() {
aborted = true;
publishConsumerFailedEvent("Consumer received fatal exception during processing", true, ex);
}
catch (PossibleAuthenticationFailureException ex) {
logger.error("Consumer received fatal=" +
SimpleMessageListenerContainer.this.possibleAuthenticationFailureFatal +
" exception during processing", ex);
if (SimpleMessageListenerContainer.this.possibleAuthenticationFailureFatal) {
this.startupException =
new FatalListenerStartupException("Authentication failure",
new AmqpAuthenticationException(ex));
// Fatal, but no point re-throwing, so just abort.
aborted = true;
}
publishConsumerFailedEvent("Consumer received PossibleAuthenticationFailure during startup", aborted, ex);
}
catch (ShutdownSignalException e) {
if (RabbitUtils.isNormalShutdown(e)) {
if (logger.isDebugEnabled()) {
Expand Down Expand Up @@ -1587,11 +1629,11 @@ private void logConsumerException(Throwable t) {
}
else {
if (t instanceof ConsumerCancelledException && this.consumer.isNormalCancel()) {
if (logger.isDebugEnabled()) {
logger.debug(
"Consumer raised exception, processing can restart if the connection factory supports it. "
+ "Exception summary: " + t);
}
if (logger.isDebugEnabled()) {
logger.debug(
"Consumer raised exception, processing can restart if the connection factory supports it. "
+ "Exception summary: " + t);
}
}
else if (logger.isWarnEnabled()) {
logger.warn(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -796,6 +796,16 @@
]]></xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="possible-authentication-failure-fatal" type="xsd:string">
<xsd:annotation>
<xsd:documentation><![CDATA[
When set to `true` (default), if a `PossibleAuthenticationFailureException` is thrown during connection,
it is considered fatal. This causes the application context to fail to initialize during startup.
When set to `false`, after making the 3 retries, the container will go into recovery mode, as with other problems,
such as the broker being down. The container will attempt to recover according to the `recoveryInterval` property.
]]></xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="declaration-retries" type="xsd:string">
<xsd:annotation>
<xsd:documentation><![CDATA[
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2010-2016 the original author or authors.
* Copyright 2010-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -105,6 +105,7 @@ public void testParseWithQueueNames() throws Exception {
assertEquals(1235L, ReflectionTestUtils.getField(container, "idleEventInterval"));
assertEquals("container1", container.getListenerId());
assertTrue(TestUtils.getPropertyValue(container, "mismatchedQueuesFatal", Boolean.class));
assertFalse(TestUtils.getPropertyValue(container, "possibleAuthenticationFailureFatal", Boolean.class));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.mockito.BDDMockito.given;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyLong;
Expand Down Expand Up @@ -57,6 +58,7 @@
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

import org.springframework.amqp.AmqpAuthenticationException;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.AnonymousQueue;
import org.springframework.amqp.core.Message;
Expand All @@ -82,6 +84,7 @@
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.PossibleAuthenticationFailureException;


/**
Expand Down Expand Up @@ -155,6 +158,7 @@ public void testDefaultConsumerCount() throws Exception {
public void testLazyConsumerCount() throws Exception {
final SingleConnectionFactory singleConnectionFactory = new SingleConnectionFactory("localhost");
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(singleConnectionFactory) {

@Override
protected void doStart() throws Exception {
// do nothing
Expand Down Expand Up @@ -485,9 +489,9 @@ public void testWithConnectionPerListenerThread() throws Exception {
Channel mockChannel2 = mock(Channel.class);

when(mockConnectionFactory.newConnection(any(ExecutorService.class), anyString()))
.thenReturn(mockConnection1)
.thenReturn(mockConnection2)
.thenReturn(null);
.thenReturn(mockConnection1)
.thenReturn(mockConnection2)
.thenReturn(null);
when(mockConnection1.createChannel()).thenReturn(mockChannel1).thenReturn(null);
when(mockConnection2.createChannel()).thenReturn(mockChannel2).thenReturn(null);
when(mockChannel1.isOpen()).thenReturn(true);
Expand All @@ -506,9 +510,9 @@ public void testWithConnectionPerListenerThread() throws Exception {
CountDownLatch latch1 = new CountDownLatch(2);
CountDownLatch latch2 = new CountDownLatch(2);
doAnswer(messageToConsumer(mockChannel1, container, false, latch1))
.when(mockChannel1).basicConsume(anyString(), anyBoolean(), anyString(), anyBoolean(), anyBoolean(), anyMap(), any(Consumer.class));
.when(mockChannel1).basicConsume(anyString(), anyBoolean(), anyString(), anyBoolean(), anyBoolean(), anyMap(), any(Consumer.class));
doAnswer(messageToConsumer(mockChannel2, container, false, latch1))
.when(mockChannel2).basicConsume(anyString(), anyBoolean(), anyString(), anyBoolean(), anyBoolean(), anyMap(), any(Consumer.class));
.when(mockChannel2).basicConsume(anyString(), anyBoolean(), anyString(), anyBoolean(), anyBoolean(), anyMap(), any(Consumer.class));
doAnswer(messageToConsumer(mockChannel1, container, true, latch2)).when(mockChannel1).basicCancel(anyString());
doAnswer(messageToConsumer(mockChannel2, container, true, latch2)).when(mockChannel2).basicCancel(anyString());

Expand Down Expand Up @@ -602,6 +606,26 @@ public BlockingQueueConsumer answer(InvocationOnMock invocation) throws Throwabl
assertThat(n, lessThanOrEqualTo(10));
}

@Test
public void testPossibleAuthenticationFailureNotFatal() {
ConnectionFactory connectionFactory = mock(ConnectionFactory.class);

given(connectionFactory.createConnection())
.willThrow(new AmqpAuthenticationException(new PossibleAuthenticationFailureException("intentional")));

SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames("foo");
container.setPossibleAuthenticationFailureFatal(false);

container.start();

assertTrue(container.isActive());
assertTrue(container.isRunning());

container.destroy();
}

private Answer<Object> messageToConsumer(final Channel mockChannel, final SimpleMessageListenerContainer container,
final boolean cancel, final CountDownLatch latch) {
return new Answer<Object>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
max-concurrency="6" receive-timeout="9876" recovery-interval="5555" missing-queues-fatal="false"
min-start-interval="1234" min-stop-interval="2345" min-consecutive-active="12" min-consecutive-idle="34"
declaration-retries="5" failed-declaration-retry-interval="1000" missing-queue-retry-interval="30000"
consumer-tag-strategy="tagger">
consumer-tag-strategy="tagger" possible-authentication-failure-fatal="false">
<rabbit:listener id="container1" queue-names="foo, #{bar.name}" ref="testBean" method="handle" priority="10" />
</rabbit:listener-container>

Expand Down
13 changes: 13 additions & 0 deletions src/reference/asciidoc/amqp.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -3994,6 +3994,19 @@ If you wish to limit the checks to just those queues used by a container, you sh
`RabbitAdmin` for the container, and provide a reference to it using the `rabbitAdmin` property.
See <<conditional-declaration>> for more information.

| possibleAuthenticationFailureFatal
(possible-authentication-failure-fatal)

a| When set to `true` (default), if a `PossibleAuthenticationFailureException` is thrown during connection, it is considered fatal.
This causes the application context to fail to initialize during startup.

Since _version 1.7.4_.

When set to `false`, after making the 3 retries, the container will go into recovery mode, as with other problems, such as the broker being down.
The container will attempt to recover according to the `recoveryInterval` property.
During each recovery attempt, each consumer will again try 4 times to start.
This process will continue indefinitely.

| autoDeclare
(auto-declare)

Expand Down

0 comments on commit 0cdce83

Please sign in to comment.