Skip to content

Commit

Permalink
Add @RabbitListener replyPostProcessor
Browse files Browse the repository at this point in the history
- allow customization of the reply message based on the request message.
  • Loading branch information
garyrussell authored and artembilan committed Feb 25, 2020
1 parent abec859 commit e4c523b
Show file tree
Hide file tree
Showing 10 changed files with 155 additions and 12 deletions.
@@ -1,5 +1,5 @@
/*
* Copyright 2014-2019 the original author or authors.
* Copyright 2014-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -263,4 +263,14 @@
*/
String ackMode() default "";

/**
* The bean name of a
* {@link org.springframework.amqp.rabbit.listener.adapter.ReplyPostProcessor} to post
* process a response before it is sent.
* @return the bean name.
* @since 2.2.5
* @see org.springframework.amqp.rabbit.listener.adapter.AbstractAdaptableMessageListener#setReplyPostProcessor(org.springframework.amqp.rabbit.listener.adapter.ReplyPostProcessor)
*/
String replyPostProcessor() default "";

}
Expand Up @@ -51,6 +51,7 @@
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistrar;
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistry;
import org.springframework.amqp.rabbit.listener.adapter.ReplyPostProcessor;
import org.springframework.amqp.rabbit.listener.api.RabbitListenerErrorHandler;
import org.springframework.aop.framework.Advised;
import org.springframework.aop.support.AopUtils;
Expand Down Expand Up @@ -451,6 +452,7 @@ else if (errorHandler instanceof String) {
resolveExecutor(endpoint, rabbitListener, target, beanName);
resolveAdmin(endpoint, rabbitListener, target);
resolveAckMode(endpoint, rabbitListener);
resolvePostProcessor(endpoint, rabbitListener, target, beanName);
RabbitListenerContainerFactory<?> factory = resolveContainerFactory(rabbitListener, target, beanName);

this.registrar.registerEndpoint(endpoint, factory);
Expand Down Expand Up @@ -519,8 +521,26 @@ private void resolveExecutor(MethodRabbitListenerEndpoint endpoint, RabbitListen
}
catch (NoSuchBeanDefinitionException ex) {
throw new BeanInitializationException("Could not register rabbit listener endpoint on ["
+ execTarget + "] for bean " + beanName + ", no " + TaskExecutor.class.getSimpleName()
+ " with id '" + execBeanName + "' was found in the application context", ex);
+ execTarget + "] for bean " + beanName + ", no 'TaskExecutor' with id '"
+ execBeanName + "' was found in the application context", ex);
}
}
}

@SuppressWarnings("unchecked")
private void resolvePostProcessor(MethodRabbitListenerEndpoint endpoint, RabbitListener rabbitListener,
Object target, String beanName) {

String ppBeanName = resolve(rabbitListener.replyPostProcessor());
if (StringUtils.hasText(ppBeanName)) {
Assert.state(this.beanFactory != null, "BeanFactory must be set to obtain container factory by bean name");
try {
endpoint.setReplyPostProcessor(this.beanFactory.getBean(ppBeanName, ReplyPostProcessor.class));
}
catch (NoSuchBeanDefinitionException ex) {
throw new BeanInitializationException("Could not register rabbit listener endpoint on ["
+ target + "] for bean " + beanName + ", no 'ReplyPostProcessor' with id '"
+ ppBeanName + "' was found in the application context", ex);
}
}
}
Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright 2014-2019 the original author or authors.
* Copyright 2014-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -455,7 +455,8 @@ public C createListenerContainer(RabbitListenerEndpoint endpoint) {
.acceptIfNotNull(this.retryTemplate, messageListener::setRetryTemplate)
.acceptIfCondition(this.retryTemplate != null && this.recoveryCallback != null,
this.recoveryCallback, messageListener::setRecoveryCallback)
.acceptIfNotNull(this.defaultRequeueRejected, messageListener::setDefaultRequeueRejected);
.acceptIfNotNull(this.defaultRequeueRejected, messageListener::setDefaultRequeueRejected)
.acceptIfNotNull(endpoint.getReplyPostProcessor(), messageListener::setReplyPostProcessor);
}
initializeContainer(instance, endpoint);

Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright 2014-2019 the original author or authors.
* Copyright 2014-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -27,6 +27,7 @@
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.batch.BatchingStrategy;
import org.springframework.amqp.rabbit.listener.adapter.ReplyPostProcessor;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
Expand Down Expand Up @@ -90,6 +91,8 @@ public abstract class AbstractRabbitListenerEndpoint implements RabbitListenerEn

private AcknowledgeMode ackMode;

private ReplyPostProcessor replyPostProcessor;

@Override
public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
this.beanFactory = beanFactory;
Expand Down Expand Up @@ -318,8 +321,22 @@ public AcknowledgeMode getAckMode() {
return this.ackMode;
}

public void setAckMode(AcknowledgeMode ackMode) {
this.ackMode = ackMode;
public void setAckMode(AcknowledgeMode mode) {
this.ackMode = mode;
}

@Override
public ReplyPostProcessor getReplyPostProcessor() {
return this.replyPostProcessor;
}

/**
* Set a {@link ReplyPostProcessor} to post process a response message before it is sent.
* @param replyPostProcessor the post processor.
* @since 2.2.5
*/
public void setReplyPostProcessor(ReplyPostProcessor replyPostProcessor) {
this.replyPostProcessor = replyPostProcessor;
}

@Override
Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,6 +18,7 @@

import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.batch.BatchingStrategy;
import org.springframework.amqp.rabbit.listener.adapter.ReplyPostProcessor;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.core.task.TaskExecutor;
import org.springframework.lang.Nullable;
Expand Down Expand Up @@ -136,4 +137,15 @@ default AcknowledgeMode getAckMode() {
return null;
}

/**
* Return a {@link ReplyPostProcessor} to post process a reply message before it is
* sent.
* @return the post processor.
* @since 2.2.5
*/
@Nullable
default ReplyPostProcessor getReplyPostProcessor() {
return null;
}

}
@@ -1,5 +1,5 @@
/*
* Copyright 2014-2019 the original author or authors.
* Copyright 2014-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -117,6 +117,8 @@ public abstract class AbstractAdaptableMessageListener implements ChannelAwareMe

private boolean defaultRequeueRejected = true;

private ReplyPostProcessor replyPostProcessor;

/**
* Set the routing key to use when sending response messages.
* This will be applied in case of a request message that
Expand Down Expand Up @@ -242,6 +244,17 @@ public void setBeanResolver(BeanResolver beanResolver) {
this.evalContext.addPropertyAccessor(new MapAccessor());
}

/**
* Set a {@link ReplyPostProcessor} to post process a response message before it is
* sent. It is called after {@link #postProcessResponse(Message, Message)} which sets
* up the correlationId header.
* @param replyPostProcessor the post processor.
* @since 2.2.5
*/
public void setReplyPostProcessor(ReplyPostProcessor replyPostProcessor) {
this.replyPostProcessor = replyPostProcessor;
}

/**
* Return the converter that will convert incoming Rabbit messages to listener method arguments, and objects
* returned from listener methods back to Rabbit messages.
Expand Down Expand Up @@ -413,6 +426,9 @@ protected void doHandleResult(InvocationResult resultArg, Message request, Chann
props.setTargetBean(resultArg.getBean());
props.setTargetMethod(resultArg.getMethod());
postProcessResponse(request, response);
if (this.replyPostProcessor != null) {
response = this.replyPostProcessor.apply(request, response);
}
Address replyTo = getReplyToAddress(request, source, resultArg);
sendResponse(channel, replyTo, response);
}
Expand Down Expand Up @@ -529,6 +545,7 @@ private Address evaluateReplyTo(Message request, Object source, Object result, E
* @param replyTo the Rabbit ReplyTo string to use when sending. Currently interpreted to be the routing key.
* @param messageIn the Rabbit message to send
* @see #postProcessResponse(Message, Message)
* @see #setReplyPostProcessor(ReplyPostProcessor)
*/
protected void sendResponse(Channel channel, Address replyTo, Message messageIn) {
Message message = messageIn;
Expand Down
@@ -0,0 +1,35 @@
/*
* Copyright 2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.amqp.rabbit.listener.adapter;

import java.util.function.BiFunction;

import org.springframework.amqp.core.Message;

/**
* A post processor for replies. The first parameter to the function is the request
* message, the second is the response message; it must return the modified (or a new)
* message. Use this, for example, if you want to copy additional headers from the request
* message.
*
* @author Gary Russell
* @since 2.2.5
*
*/
public interface ReplyPostProcessor extends BiFunction<Message, Message, Message> {

}
@@ -1,5 +1,5 @@
/*
* Copyright 2014-2019 the original author or authors.
* Copyright 2014-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -77,6 +77,7 @@
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistrar;
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistry;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.ReplyPostProcessor;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.amqp.rabbit.listener.api.RabbitListenerErrorHandler;
import org.springframework.amqp.rabbit.support.ListenerExecutionFailedException;
Expand Down Expand Up @@ -417,6 +418,7 @@ public void endpointWithHeader() {
.isEqualTo("MyService");
assertThat((String) reply.getMessageProperties().getHeader("method"))
.isEqualTo("capitalizeWithHeader");
assertThat((String) reply.getMessageProperties().getHeader("prefix")).isEqualTo("prefix-");
}

@Test
Expand Down Expand Up @@ -1089,7 +1091,7 @@ public String multiQueuesConfig(String foo) {
return foo.toUpperCase() + foo;
}

@RabbitListener(queues = "test.header", group = "testGroup")
@RabbitListener(queues = "test.header", group = "testGroup", replyPostProcessor = "echoPrefixHeader")
public String capitalizeWithHeader(@Payload String content, @Header String prefix) {
return prefix + content.toUpperCase();
}
Expand Down Expand Up @@ -1806,6 +1808,14 @@ public DirectExchange internal() {
return directExchange;
}

@Bean
public ReplyPostProcessor echoPrefixHeader() {
return (req, resp) -> {
resp.getMessageProperties().setHeader("prefix", req.getMessageProperties().getHeader("prefix"));
return resp;
};
}

}

@RabbitListener(bindings = @QueueBinding
Expand Down
18 changes: 18 additions & 0 deletions src/reference/asciidoc/amqp.adoc
Expand Up @@ -2635,7 +2635,25 @@ factory.setBeforeSendReplyPostProcessors(msg -> {
----
====

Starting with version 2.2.5, you can configure a `ReplyPostProcessor` to modify the reply message before it is sent; it is called after the `correlationId` header has been set up to match the request.

====
[source, java]
----
@RabbitListener(queues = "test.header", group = "testGroup", replyPostProcessor = "echoCustomHeader")
public String capitalizeWithHeader(String in) {
return in.toUpperCase();
}
@Bean
public ReplyPostProcessor echoCustomHeader() {
return (req, resp) -> {
resp.getMessageProperties().setHeader("myHeader", req.getMessageProperties().getHeader("myHeader"));
return resp;
};
}
----
====

The `@SendTo` value is assumed as a reply `exchange` and `routingKey` pair that follws the `exchange/routingKey` pattern,
where one of those parts can be omitted.
Expand Down
3 changes: 3 additions & 0 deletions src/reference/asciidoc/whats-new.adoc
Expand Up @@ -63,6 +63,9 @@ When a `@RabbitListener` method returns a result, the bean and `Method` are now
This allows configuration of a `beforeSendReplyMessagePostProcessor` to, for example, set a header in the reply to indicate which method was invoked on the server.
See <<async-annotation-driven-reply>> for more information.

You can now configure a `ReplyPostProcessor` to make modifications to a reply message before it is sent.
See <<async-annotation-driven-reply>> for more information.

==== AMQP Logging Appenders Changes

The Log4J and Logback `AmqpAppender` s now support a `verifyHostname` SSL option.
Expand Down

0 comments on commit e4c523b

Please sign in to comment.