AMQP-221 Support Dead Letter Queues #21

Closed
wants to merge 4 commits into
from
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2002-2012 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.springframework.amqp;
+
+/**
+ * Exception for listener implementations used to indicate the
+ * basic.reject will be sent with requeue=false in order to enable
+ * features such as DLQ.
+ * @author Gary Russell
+ * @since 1.0.1
+ *
+ */
+@SuppressWarnings("serial")
+public class AmqpRejectAndDontRequeueException extends AmqpException {
+
+ public AmqpRejectAndDontRequeueException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public AmqpRejectAndDontRequeueException(String message) {
+ super(message);
+ }
+
+ public AmqpRejectAndDontRequeueException(Throwable cause) {
+ super(cause);
+ }
+
+}
@@ -20,7 +20,7 @@
<org.mockito.version>1.8.4</org.mockito.version>
<org.codehaus.jackson.version>1.4.3</org.codehaus.jackson.version>
<org.erlang.otp.version>1.5.3</org.erlang.otp.version>
- <com.rabbitmq.version>2.5.0</com.rabbitmq.version>
+ <com.rabbitmq.version>2.8.1</com.rabbitmq.version>
<org.springframework.version>3.0.5.RELEASE</org.springframework.version>
</properties>
<licenses>
@@ -1,5 +1,5 @@
/*
- * Copyright 2010-2011 the original author or authors.
+ * Copyright 2010-2012 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
@@ -33,6 +33,7 @@
/**
* @author Mark Fisher
+ * @author Gary Russell
* @since 1.0
*/
class ListenerContainerParser implements BeanDefinitionParser {
@@ -83,6 +84,7 @@
private static final String ADVICE_CHAIN_ATTRIBUTE = "advice-chain";
+ private static final String REQUEUE_REJECTED_ATTRIBUTE = "requeue-rejected";
public BeanDefinition parse(Element element, ParserContext parserContext) {
CompositeComponentDefinition compositeDef = new CompositeComponentDefinition(element.getTagName(),
@@ -243,6 +245,11 @@ private BeanDefinition parseContainer(Element listenerEle, Element containerEle,
containerDef.getPropertyValues().add("txSize", new TypedStringValue(transactionSize));
}
+ String requeueRejected = containerEle.getAttribute(REQUEUE_REJECTED_ATTRIBUTE);
+ if (StringUtils.hasText(requeueRejected)) {
+ containerDef.getPropertyValues().add("defaultRequeueRejected", new TypedStringValue(requeueRejected));
+ }
+
String phase = containerEle.getAttribute(PHASE_ATTRIBUTE);
if (StringUtils.hasText(phase)) {
containerDef.getPropertyValues().add("phase", phase);
@@ -1,5 +1,5 @@
/*
- * Copyright 2002-2011 the original author or authors.
+ * Copyright 2002-2012 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
@@ -25,6 +25,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.amqp.AmqpException;
+import org.springframework.amqp.AmqpRejectAndDontRequeueException;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
@@ -46,6 +47,7 @@
*
* @author Mark Pollack
* @author Dave Syer
+ * @author Gary Russell
*
*/
public class BlockingQueueConsumer {
@@ -80,20 +82,35 @@
private Set<Long> deliveryTags = new LinkedHashSet<Long>();
+ private final boolean defaultRequeuRejected;
+
/**
* Create a consumer. The consumer must not attempt to use the connection factory or communicate with the broker
- * until it is started.
+ * until it is started. RequeueRejected defaults to true.
*/
public BlockingQueueConsumer(ConnectionFactory connectionFactory,
MessagePropertiesConverter messagePropertiesConverter,
ActiveObjectCounter<BlockingQueueConsumer> activeObjectCounter, AcknowledgeMode acknowledgeMode,
boolean transactional, int prefetchCount, String... queues) {
+ this(connectionFactory, messagePropertiesConverter, activeObjectCounter,
+ acknowledgeMode, transactional, prefetchCount, true, queues);
+ }
+
+ /**
+ * Create a consumer. The consumer must not attempt to use the connection factory or communicate with the broker
+ * until it is started.
+ */
+ public BlockingQueueConsumer(ConnectionFactory connectionFactory,
+ MessagePropertiesConverter messagePropertiesConverter,
+ ActiveObjectCounter<BlockingQueueConsumer> activeObjectCounter, AcknowledgeMode acknowledgeMode,
+ boolean transactional, int prefetchCount, boolean defaultRequeueRejected, String... queues) {
this.connectionFactory = connectionFactory;
this.messagePropertiesConverter = messagePropertiesConverter;
this.activeObjectCounter = activeObjectCounter;
this.acknowledgeMode = acknowledgeMode;
this.transactional = transactional;
this.prefetchCount = prefetchCount;
+ this.defaultRequeuRejected = defaultRequeueRejected;
this.queues = queues;
}
@@ -315,9 +332,17 @@ public void rollbackOnExceptionIfNecessary(Throwable ex) throws Exception {
if (logger.isDebugEnabled()) {
logger.debug("Rejecting messages");
}
+ boolean shouldRequeue = this.defaultRequeuRejected;
+ Throwable t = ex;
+ while (shouldRequeue && t != null) {
+ if (t instanceof AmqpRejectAndDontRequeueException) {
+ shouldRequeue = false;
+ }
+ t = t.getCause();
+ }
for (Long deliveryTag : deliveryTags) {
// With newer RabbitMQ brokers could use basicNack here...
- channel.basicReject(deliveryTag, true);
+ channel.basicReject(deliveryTag, shouldRequeue);
}
if (transactional) {
// Need to commit the reject (=nack)
@@ -23,6 +23,7 @@
import org.aopalliance.aop.Advice;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.AmqpIllegalStateException;
+import org.springframework.amqp.AmqpRejectAndDontRequeueException;
import org.springframework.amqp.ImmediateAcknowledgeAmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
@@ -94,6 +95,8 @@
private volatile MessagePropertiesConverter messagePropertiesConverter = new DefaultMessagePropertiesConverter();
+ private volatile boolean defaultRequeueRejected = true;
+
public static interface ContainerDelegate {
void invokeListener(Channel channel, Message message) throws Exception;
}
@@ -219,6 +222,19 @@ public void setMessagePropertiesConverter(MessagePropertiesConverter messageProp
}
/**
+ * Determines the default behavior when a message is rejected, for example because the listener
+ * threw an exception. When true, messages will be requeued, when false, they will not. For
+ * versions of Rabbit that support dead-lettering, the message must not be requeued in order
+ * to be sent to the dead letter exchange. Setting to false causes all rejections to not
+ * be requeued. When true, the default can be overridden by the listener throwing an
+ * {@link AmqpRejectAndDontRequeueException}. Default true.
+ * @param defaultRequeueRejected
+ */
+ public void setDefaultRequeueRejected(boolean defaultRequeueRejected) {
+ this.defaultRequeueRejected = defaultRequeueRejected;
+ }
+
+ /**
* Avoid the possibility of not configuring the CachingConnectionFactory in sync with the number of concurrent
* consumers.
*/
@@ -292,9 +308,17 @@ public int getActiveConsumerCount() {
protected void doStart() throws Exception {
super.doStart();
synchronized (this.consumersMonitor) {
- initializeConsumers();
+ int newConsumers = initializeConsumers();
if (this.consumers == null) {
- logger.info("Consumers were initialized and then cleared (presumably the container was stopped concurrently)");
+ if (logger.isInfoEnabled()) {
+ logger.info("Consumers were initialized and then cleared (presumably the container was stopped concurrently)");
+ }
+ return;
+ }
+ if (newConsumers <= 0) {
+ if (logger.isInfoEnabled()) {
+ logger.info("Consumers are already running");
+ }
return;
}
Set<AsyncMessageProcessingConsumer> processors = new HashSet<AsyncMessageProcessingConsumer>();
@@ -343,17 +367,20 @@ protected void doShutdown() {
}
- protected void initializeConsumers() {
+ protected int initializeConsumers() {
+ int count = 0;
synchronized (this.consumersMonitor) {
if (this.consumers == null) {
cancellationLock.reset();
this.consumers = new HashSet<BlockingQueueConsumer>(this.concurrentConsumers);
for (int i = 0; i < this.concurrentConsumers; i++) {
BlockingQueueConsumer consumer = createBlockingQueueConsumer();
this.consumers.add(consumer);
+ count++;
}
}
}
+ return count;
}
protected boolean isChannelLocallyTransacted(Channel channel) {
@@ -367,7 +394,7 @@ protected BlockingQueueConsumer createBlockingQueueConsumer() {
// didn't get an ack for delivered messages
int actualPrefetchCount = prefetchCount > txSize ? prefetchCount : txSize;
consumer = new BlockingQueueConsumer(getConnectionFactory(), this.messagePropertiesConverter, cancellationLock,
- getAcknowledgeMode(), isChannelTransacted(), actualPrefetchCount, queues);
+ getAcknowledgeMode(), isChannelTransacted(), actualPrefetchCount, this.defaultRequeueRejected, queues);
return consumer;
}
@@ -29,7 +29,7 @@
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Envelope;
-import com.rabbitmq.client.impl.LongString;
+import com.rabbitmq.client.LongString;
/**
* Default implementation of the {@link MessagePropertiesConverter} strategy.
@@ -480,6 +480,15 @@
]]></xsd:documentation>
</xsd:annotation>
</xsd:attribute>
+ <xsd:attribute name="requeue-rejected" type="xsd:string">
+ <xsd:annotation>
+ <xsd:documentation><![CDATA[
+ Tells the container the default requeue behavior when rejecting messages. Default is 'true' meaning messages
+ will be requeued, unless the listener signals not to by throwing an AmqpRejectAndDontRequeueException. When
+ set to false, messages will never be requeued.
+ ]]></xsd:documentation>
+ </xsd:annotation>
+ </xsd:attribute>
<xsd:attribute name="phase" type="xsd:string">
<xsd:annotation>
<xsd:documentation><![CDATA[
@@ -136,7 +136,7 @@ public static void main(String[] args) {
channel.exchangeDeclare(exchangeName, exchangeType);
final Producer p = new Producer(channel, exchangeName, id, flags, producerTxSize,
1000L * samplingInterval, rateLimit, minMsgSize, timeLimit, messageCount);
- channel.setReturnListener(p);
+ channel.addReturnListener(p);
Thread t = new Thread(p);
producerThreads[i] = t;
t.start();
@@ -1,5 +1,5 @@
/*
- * Copyright 2010-2011 the original author or authors.
+ * Copyright 2010-2012 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -41,6 +41,7 @@
/**
* @author Mark Fisher
+ * @author Gary Russell
*/
public class ListenerContainerParserTests {
@@ -84,6 +85,14 @@ public void testParseWithAdviceChain() throws Exception {
public void testParseWithDefaults() throws Exception {
SimpleMessageListenerContainer container = beanFactory.getBean("container4", SimpleMessageListenerContainer.class);
assertEquals(1, ReflectionTestUtils.getField(container, "concurrentConsumers"));
+ assertEquals(true, ReflectionTestUtils.getField(container, "defaultRequeueRejected"));
+ }
+
+ @Test
+ public void testParseWithDefaultQueueRejectedFalse() throws Exception {
+ SimpleMessageListenerContainer container = beanFactory.getBean("container5", SimpleMessageListenerContainer.class);
+ assertEquals(1, ReflectionTestUtils.getField(container, "concurrentConsumers"));
+ assertEquals(false, ReflectionTestUtils.getField(container, "defaultRequeueRejected"));
}
static class TestBean {
Oops, something went wrong. Retry.