Skip to content

Commit

Permalink
@RabbitListener override acknowlegment mode
Browse files Browse the repository at this point in the history
  • Loading branch information
garyrussell authored and artembilan committed May 9, 2019
1 parent cdbdb9f commit 329734f
Show file tree
Hide file tree
Showing 8 changed files with 104 additions and 4 deletions.
Expand Up @@ -253,4 +253,14 @@
*/
String executor() default "";

/**
* Override the container factory
* {@link org.springframework.amqp.core.AcknowledgeMode} property. Must be one of the
* valid enumerations. If a SpEL expression is provided, it must evaluate to a
* {@link String} or {@link org.springframework.amqp.core.AcknowledgeMode}.
* @return the acknowledgement mode.
* @since 2.2
*/
String ackMode() default "";

}
Expand Up @@ -35,6 +35,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Base64UrlNamingStrategy;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.Binding.DestinationType;
Expand Down Expand Up @@ -397,7 +398,7 @@ private Method checkProxy(Method methodArg, Object bean) {
method = iface.getMethod(method.getName(), method.getParameterTypes());
break;
}
catch (NoSuchMethodException noMethod) {
catch (@SuppressWarnings("unused") NoSuchMethodException noMethod) {
}
}
}
Expand Down Expand Up @@ -466,11 +467,28 @@ else if (errorHandler instanceof String) {

resolveExecutor(endpoint, rabbitListener, target, beanName);
resolveAdmin(endpoint, rabbitListener, target);
resolveAckMode(endpoint, rabbitListener);
RabbitListenerContainerFactory<?> factory = resolveContainerFactory(rabbitListener, target, beanName);

this.registrar.registerEndpoint(endpoint, factory);
}

private void resolveAckMode(MethodRabbitListenerEndpoint endpoint, RabbitListener rabbitListener) {
String ackModeAttr = rabbitListener.ackMode();
if (StringUtils.hasText(ackModeAttr)) {
Object ackMode = resolveExpression(ackModeAttr);
if (ackMode instanceof String) {
endpoint.setAckMode(AcknowledgeMode.valueOf((String) ackMode));
}
else if (ackMode instanceof AcknowledgeMode) {
endpoint.setAckMode((AcknowledgeMode) ackMode);
}
else {
Assert.isNull(ackMode, "ackMode must resolve to a String or AcknowledgeMode");
}
}
}

private void resolveAdmin(MethodRabbitListenerEndpoint endpoint, RabbitListener rabbitListener, Object adminTarget) {
String rabbitAdmin = resolve(rabbitListener.admin());
if (StringUtils.hasText(rabbitAdmin)) {
Expand Down
Expand Up @@ -404,7 +404,8 @@ public C createListenerContainer(RabbitListenerEndpoint endpoint) {
if (endpoint != null) { // endpoint settings overriding default factory settings
javaUtils
.acceptIfNotNull(endpoint.getAutoStartup(), instance::setAutoStartup)
.acceptIfNotNull(endpoint.getTaskExecutor(), instance::setTaskExecutor);
.acceptIfNotNull(endpoint.getTaskExecutor(), instance::setTaskExecutor)
.acceptIfNotNull(endpoint.getAckMode(), instance::setAcknowledgeMode);
javaUtils
.acceptIfNotNull(this.batchingStrategy, endpoint::setBatchingStrategy);
instance.setListenerId(endpoint.getId());
Expand Down
Expand Up @@ -22,6 +22,7 @@
import java.util.HashMap;
import java.util.Map;

import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.core.Queue;
Expand Down Expand Up @@ -87,6 +88,8 @@ public abstract class AbstractRabbitListenerEndpoint implements RabbitListenerEn

private BatchingStrategy batchingStrategy;

private AcknowledgeMode ackMode;

@Override
public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
this.beanFactory = beanFactory;
Expand Down Expand Up @@ -309,6 +312,16 @@ public void setBatchingStrategy(BatchingStrategy batchingStrategy) {
this.batchingStrategy = batchingStrategy;
}

@Override
@Nullable
public AcknowledgeMode getAckMode() {
return this.ackMode;
}

public void setAckMode(AcknowledgeMode ackMode) {
this.ackMode = ackMode;
}

@Override
public void setupListenerContainer(MessageListenerContainer listenerContainer) {
AbstractMessageListenerContainer container = (AbstractMessageListenerContainer) listenerContainer;
Expand Down
Expand Up @@ -16,6 +16,7 @@

package org.springframework.amqp.rabbit.listener;

import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.core.support.BatchingStrategy;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.core.task.TaskExecutor;
Expand Down Expand Up @@ -125,4 +126,14 @@ default void setBatchingStrategy(BatchingStrategy batchingStrategy) {
// NOSONAR empty
}

/**
* Override the container factory's {@link AcknowledgeMode}.
* @return the acknowledgment mode.
* @since 2.2
*/
@Nullable
default AcknowledgeMode getAckMode() {
return null;
}

}
Expand Up @@ -21,6 +21,7 @@
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;

import java.io.IOException;
import java.io.Serializable;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
Expand Down Expand Up @@ -50,6 +51,7 @@
import org.mockito.Mockito;

import org.springframework.amqp.AmqpRejectAndDontRequeueException;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.core.Message;
Expand Down Expand Up @@ -167,7 +169,8 @@ public class EnableRabbitIntegrationTests {
"test.converted.foomessage", "test.notconverted.messagingmessagenotgeneric", "test.simple.direct",
"test.simple.direct2", "test.generic.list", "test.generic.map",
"amqp656dlq", "test.simple.declare", "test.return.exceptions", "test.pojo.errors", "test.pojo.errors2",
"test.messaging.message", "test.amqp.message", "test.bytes.to.string", "test.projection");
"test.messaging.message", "test.amqp.message", "test.bytes.to.string", "test.projection",
"manual.acks.1", "manual.acks.2");

@Autowired
private RabbitTemplate rabbitTemplate;
Expand Down Expand Up @@ -842,6 +845,14 @@ public void bytesToString() {
assertThat(message.getBody()).isEqualTo("BYTES".getBytes());
}

@Test
public void testManualOverride() {
assertThat(TestUtils.getPropertyValue(this.registry.getListenerContainer("manual.acks.1"), "acknowledgeMode"))
.isEqualTo(AcknowledgeMode.MANUAL);
assertThat(TestUtils.getPropertyValue(this.registry.getListenerContainer("manual.acks.2"), "acknowledgeMode"))
.isEqualTo(AcknowledgeMode.MANUAL);
}

interface TxService {

@Transactional
Expand Down Expand Up @@ -1171,6 +1182,24 @@ public String bytesToString(String in) {
return in.toUpperCase();
}

@RabbitListener(id = "manual.acks.1", queues = "manual.acks.1", ackMode = "MANUAL")
public String manual1(String in, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag)
throws IOException {

channel.basicAck(tag, false);
return in.toUpperCase();
}

@RabbitListener(id = "manual.acks.2", queues = "manual.acks.2",
ackMode = "#{T(org.springframework.amqp.core.AcknowledgeMode).MANUAL}")

public String manual2(String in, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag)
throws IOException {

channel.basicAck(tag, false);
return in.toUpperCase();
}

}

public static class JsonObject {
Expand Down
17 changes: 17 additions & 0 deletions src/reference/asciidoc/amqp.adoc
Expand Up @@ -2215,6 +2215,8 @@ For `DirectMessageListenerContainer` instances, you can use XML similar to the f
----
====

[[listener-property-overrides]]

Starting with version 2.0, the `@RabbitListener` annotation has a `concurrency` property.
It supports SpEL expressions (`#{...}`) and property placeholders (`${...}`).
Its meaning and allowed values depend on the container type, as follows:
Expand All @@ -2228,6 +2230,21 @@ Previously you had to define different container factories if you had listeners
The annotation also allows overriding the factory `autoStartup` and `taskExecutor` properties via the `autoStartup` and `executor` (since 2.2) annotation properties.
Using a different executor for each might help with identifying threads associated with each listener in logs and thread dumps.

Version 2.2 also added the `ackMode` property, which allows you to override the container factory's `acknowledgeMode` property.

====
[source, java]
----
@RabbitListener(id = "manual.acks.1", queues = "manual.acks.1", ackMode = "MANUAL")
public void manual1(String in, Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
...
channel.basicAck(tag, false);
}
----
====

[[async-annotation-conversion]]
====== Message Conversion for Annotated Methods

Expand Down
3 changes: 2 additions & 1 deletion src/reference/asciidoc/whats-new.adoc
Expand Up @@ -8,7 +8,8 @@ This section describes the changes between version 2.1 and version 2.2.
===== @RabbitListener Changes

You can now configure an `executor` on each listener, overriding the factory configuration, to more easily identify threads associated with the listener.
See <<async-annotation-driven-enable>> for more information.
You can now override the container factory's `acknowledgeMode` property with the annotation's `ackMode` property.
See <<listener-property-overrides,overriding container factory properties>> for more information.

When using <<receiving-batch,batching>>, `@RabbitListener` methods can now receive a complete batch of messages in one call instead of getting them one at at time.

Expand Down

0 comments on commit 329734f

Please sign in to comment.