Skip to content

Commit

Permalink
GH-3089: Add AmqpInGateway.replyHeadersMappedLast (#3091)
Browse files Browse the repository at this point in the history
* GH-3089: Add AmqpInGateway.replyHeadersMappedLast

Fixes #3089

In some use-case we would like to control when headers from SI message
should be populated into an AMQP message.
One of the use-case is like a `SimpleMessageConverter` and its `plain/text`
for the String reply, meanwhile we know that this content is an
`application/json`.
So, with a new `replyHeadersMappedLast` we can override the mentioned
`content-type` header, populated by the `MessageConverter` with an
actual value from the message headers populated in the flow upstream

* Introduce an `AmqpInboundGateway.replyHeadersMappedLast`; expose it
on the DSL and XML level
* Use newly introduced `MappingUtils.mapReplyMessage()`
* Optimize `DefaultAmqpHeaderMapper` to not parse JSON headers at all
when `JsonHeaders.TYPE_ID` is already present (e.g. `MessageConverter`
result)
* Also skip `JsonHeaders` when we `populateUserDefinedHeader()`

**Cherry-pick to 5.1.x**

* * Fix language and package typos
* Add missed `@param` in JavaDoc of the `AmqpBaseInboundGatewaySpec.batchingStrategy()`
* Extract a `RabbitTemplate` `MessageConverter` to use for reply messages
conversion - pursue a backward compatibility
  • Loading branch information
artembilan authored and garyrussell committed Oct 31, 2019
1 parent 315fafd commit 54de7a2
Show file tree
Hide file tree
Showing 11 changed files with 249 additions and 101 deletions.
Expand Up @@ -30,6 +30,7 @@
* @author Mark Fisher
* @author Gary Russell
* @author Artem Bilan
*
* @since 2.1
*/
public class AmqpInboundGatewayParser extends AbstractAmqpInboundAdapterParser {
Expand All @@ -48,6 +49,8 @@ protected void configureChannels(Element element, ParserContext parserContext, B
IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, "request-channel");
IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, "reply-channel");
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "default-reply-to");
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "reply-headers-last",
"replyHeadersMappedLast");
}

}
Expand Up @@ -138,4 +138,29 @@ public S recoveryCallback(RecoveryCallback<?> recoveryCallback) {
return _this();
}

/**
* Set to true to bind the source message in the headers.
* @param bindSourceMessage true to bind.
* @return the spec.
* @since 5.1.9
* @see AmqpInboundGateway#setBindSourceMessage(boolean)
*/
public S bindSourceMessage(boolean bindSourceMessage) {
this.target.setBindSourceMessage(bindSourceMessage);
return _this();
}

/**
* When mapping headers for the outbound (reply) message, determine whether the headers are
* mapped before the message is converted, or afterwards.
* @param replyHeadersMappedLast true if reply headers are mapped after conversion.
* @return the spec.
* @since 5.1.9
* @see AmqpInboundGateway#setReplyHeadersMappedLast(boolean)
*/
public S replyHeadersMappedLast(boolean replyHeadersMappedLast) {
this.target.setReplyHeadersMappedLast(replyHeadersMappedLast);
return _this();
}

}
Expand Up @@ -23,8 +23,6 @@
import org.springframework.amqp.core.Address;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
Expand All @@ -38,14 +36,14 @@
import org.springframework.integration.amqp.support.AmqpMessageHeaderErrorMessageStrategy;
import org.springframework.integration.amqp.support.DefaultAmqpHeaderMapper;
import org.springframework.integration.amqp.support.EndpointUtils;
import org.springframework.integration.amqp.support.MappingUtils;
import org.springframework.integration.gateway.MessagingGatewaySupport;
import org.springframework.integration.support.ErrorMessageUtils;
import org.springframework.messaging.MessageChannel;
import org.springframework.retry.RecoveryCallback;
import org.springframework.retry.support.RetrySynchronizationManager;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;

import com.rabbitmq.client.Channel;

Expand All @@ -71,9 +69,11 @@ public class AmqpInboundGateway extends MessagingGatewaySupport {

private final boolean amqpTemplateExplicitlySet;

private volatile MessageConverter amqpMessageConverter = new SimpleMessageConverter();
private MessageConverter amqpMessageConverter = new SimpleMessageConverter();

private volatile AmqpHeaderMapper headerMapper = DefaultAmqpHeaderMapper.inboundMapper();
private MessageConverter templateMessageConverter = this.amqpMessageConverter;

private AmqpHeaderMapper headerMapper = DefaultAmqpHeaderMapper.inboundMapper();

private Address defaultReplyTo;

Expand All @@ -83,6 +83,8 @@ public class AmqpInboundGateway extends MessagingGatewaySupport {

private boolean bindSourceMessage;

private boolean replyHeadersMappedLast;

public AmqpInboundGateway(AbstractMessageListenerContainer listenerContainer) {
this(listenerContainer, new RabbitTemplate(listenerContainer.getConnectionFactory()), false);
}
Expand Down Expand Up @@ -110,6 +112,9 @@ private AmqpInboundGateway(AbstractMessageListenerContainer listenerContainer, A
this.messageListenerContainer.setAutoStartup(false);
this.amqpTemplate = amqpTemplate;
this.amqpTemplateExplicitlySet = amqpTemplateExplicitlySet;
if (this.amqpTemplateExplicitlySet && this.amqpTemplate instanceof RabbitTemplate) {
this.templateMessageConverter = ((RabbitTemplate) this.amqpTemplate).getMessageConverter();
}
setErrorMessageStrategy(new AmqpMessageHeaderErrorMessageStrategy());
}

Expand All @@ -125,6 +130,7 @@ public void setMessageConverter(MessageConverter messageConverter) {
this.amqpMessageConverter = messageConverter;
if (!this.amqpTemplateExplicitlySet) {
((RabbitTemplate) this.amqpTemplate).setMessageConverter(messageConverter);
this.templateMessageConverter = messageConverter;
}
}

Expand Down Expand Up @@ -187,6 +193,24 @@ public void setBindSourceMessage(boolean bindSourceMessage) {
this.bindSourceMessage = bindSourceMessage;
}

/**
* When mapping headers for the outbound (reply) message, determine whether the headers are
* mapped before the message is converted, or afterwards. This only affects headers
* that might be added by the message converter. When false, the converter's headers
* win; when true, any headers added by the converter will be overridden (if the
* source message has a header that maps to those headers). You might wish to set this
* to true, for example, when using a
* {@link org.springframework.amqp.support.converter.SimpleMessageConverter} with a
* String payload that contains json; the converter will set the content type to
* {@code text/plain} which can be overridden to {@code application/json} by setting
* the {@link AmqpHeaders#CONTENT_TYPE} message header. Default: false.
* @param replyHeadersMappedLast true if reply headers are mapped after conversion.
* @since 5.1.9
*/
public void setReplyHeadersMappedLast(boolean replyHeadersMappedLast) {
this.replyHeadersMappedLast = replyHeadersMappedLast;
}

@Override
public String getComponentType() {
return "amqp:inbound-gateway";
Expand Down Expand Up @@ -331,7 +355,7 @@ private org.springframework.messaging.Message<Object> convert(Message message, C

private void process(Message message, org.springframework.messaging.Message<Object> messagingMessage) {
setAttributesIfNecessary(message, messagingMessage);
final org.springframework.messaging.Message<?> reply = sendAndReceiveMessage(messagingMessage);
org.springframework.messaging.Message<?> reply = sendAndReceiveMessage(messagingMessage);
if (reply != null) {
Address replyTo;
String replyToProperty = message.getMessageProperties().getReplyTo();
Expand All @@ -342,39 +366,23 @@ private void process(Message message, org.springframework.messaging.Message<Obje
replyTo = AmqpInboundGateway.this.defaultReplyTo;
}

MessagePostProcessor messagePostProcessor =
message1 -> {
MessageProperties messageProperties = message1.getMessageProperties();
String contentEncoding = messageProperties.getContentEncoding();
long contentLength = messageProperties.getContentLength();
String contentType = messageProperties.getContentType();
AmqpInboundGateway.this.headerMapper.fromHeadersToReply(reply.getHeaders(),
messageProperties);
// clear the replyTo from the original message since we are using it now
messageProperties.setReplyTo(null);
// reset the content-* properties as determined by the MessageConverter
if (StringUtils.hasText(contentEncoding)) {
messageProperties.setContentEncoding(contentEncoding);
}
messageProperties.setContentLength(contentLength);
if (contentType != null) {
messageProperties.setContentType(contentType);
}
return message1;
};
org.springframework.amqp.core.Message amqpMessage =
MappingUtils.mapReplyMessage(reply, AmqpInboundGateway.this.templateMessageConverter,
AmqpInboundGateway.this.headerMapper,
message.getMessageProperties().getReceivedDeliveryMode(),
AmqpInboundGateway.this.replyHeadersMappedLast);

if (replyTo != null) {
AmqpInboundGateway.this.amqpTemplate.convertAndSend(replyTo.getExchangeName(),
replyTo.getRoutingKey(), reply.getPayload(), messagePostProcessor);
AmqpInboundGateway.this.amqpTemplate.send(replyTo.getExchangeName(), replyTo.getRoutingKey(),
amqpMessage);
}
else {
if (!AmqpInboundGateway.this.amqpTemplateExplicitlySet) {
throw new IllegalStateException("There is no 'replyTo' message property " +
"and the `defaultReplyTo` hasn't been configured.");
}
else {
AmqpInboundGateway.this.amqpTemplate.convertAndSend(reply.getPayload(),
messagePostProcessor);
AmqpInboundGateway.this.amqpTemplate.send(amqpMessage);
}
}
}
Expand Down
Expand Up @@ -105,7 +105,7 @@ protected DefaultAmqpHeaderMapper(String[] requestHeaderNames, String[] replyHea
*/
@Override
protected Map<String, Object> extractStandardHeaders(MessageProperties amqpMessageProperties) {
Map<String, Object> headers = new HashMap<String, Object>();
Map<String, Object> headers = new HashMap<>();
try {
String appId = amqpMessageProperties.getAppId();
if (StringUtils.hasText(appId)) {
Expand Down Expand Up @@ -325,24 +325,23 @@ else if (allHeaders != null) {
amqpMessageProperties.setUserId(userId);
}

Map<String, String> jsonHeaders = new HashMap<String, String>();

for (String jsonHeader : JsonHeaders.HEADERS) {
Object value = getHeaderIfAvailable(headers, jsonHeader, Object.class);
if (value != null) {
headers.remove(jsonHeader);
if (value instanceof Class<?>) {
value = ((Class<?>) value).getName();
}
jsonHeaders.put(jsonHeader.replaceFirst(JsonHeaders.PREFIX, ""), value.toString());
}
}

/*
* If the MessageProperties already contains JsonHeaders, don't overwrite them here because they were
* set up by a message converter.
*/
if (!amqpMessageProperties.getHeaders().containsKey(JsonHeaders.TYPE_ID.replaceFirst(JsonHeaders.PREFIX, ""))) {
Map<String, String> jsonHeaders = new HashMap<>();

for (String jsonHeader : JsonHeaders.HEADERS) {
Object value = getHeaderIfAvailable(headers, jsonHeader, Object.class);
if (value != null) {
headers.remove(jsonHeader);
if (value instanceof Class<?>) {
value = ((Class<?>) value).getName();
}
jsonHeaders.put(jsonHeader.replaceFirst(JsonHeaders.PREFIX, ""), value.toString());
}
}
amqpMessageProperties.getHeaders().putAll(jsonHeaders);
}

Expand All @@ -361,8 +360,9 @@ protected void populateUserDefinedHeader(String headerName, Object headerValue,
MessageProperties amqpMessageProperties) {
// do not overwrite an existing header with the same key
// TODO: do we need to expose a boolean 'overwrite' flag?
if (!amqpMessageProperties.getHeaders().containsKey(headerName)
&& !AmqpHeaders.CONTENT_TYPE.equals(headerName)) {
if (!amqpMessageProperties.getHeaders().containsKey(headerName) &&
!AmqpHeaders.CONTENT_TYPE.equals(headerName) &&
!headerName.startsWith(JsonHeaders.PREFIX)) {
amqpMessageProperties.setHeader(headerName, headerValue);
}
}
Expand Down
Expand Up @@ -30,6 +30,8 @@
* Utility methods used during message mapping.
*
* @author Gary Russell
* @author Artem Bilan
*
* @since 4.3
*
*/
Expand All @@ -40,7 +42,7 @@ private MappingUtils() {
}

/**
* Map an o.s.Message to an o.s.a.core.Message. When using a
* Map an o.s.m.Message to an o.s.a.core.Message. When using a
* {@link ContentTypeDelegatingMessageConverter}, {@link AmqpHeaders#CONTENT_TYPE} and
* {@link MessageHeaders#CONTENT_TYPE} will be used for the selection, with the AMQP
* header taking precedence.
Expand All @@ -54,25 +56,64 @@ private MappingUtils() {
public static org.springframework.amqp.core.Message mapMessage(Message<?> requestMessage,
MessageConverter converter, AmqpHeaderMapper headerMapper, MessageDeliveryMode defaultDeliveryMode,
boolean headersMappedLast) {

return doMapMessage(requestMessage, converter, headerMapper, defaultDeliveryMode, headersMappedLast, false);
}

/**
* Map a reply o.s.m.Message to an o.s.a.core.Message. When using a
* {@link ContentTypeDelegatingMessageConverter}, {@link AmqpHeaders#CONTENT_TYPE} and
* {@link MessageHeaders#CONTENT_TYPE} will be used for the selection, with the AMQP
* header taking precedence.
* @param replyMessage the reply message.
* @param converter the message converter to use.
* @param headerMapper the header mapper to use.
* @param defaultDeliveryMode the default delivery mode.
* @param headersMappedLast true if headers are mapped after conversion.
* @return the mapped Message.
* @since 5.1.9
*/
public static org.springframework.amqp.core.Message mapReplyMessage(Message<?> replyMessage,
MessageConverter converter, AmqpHeaderMapper headerMapper, MessageDeliveryMode defaultDeliveryMode,
boolean headersMappedLast) {

return doMapMessage(replyMessage, converter, headerMapper, defaultDeliveryMode, headersMappedLast, true);
}

private static org.springframework.amqp.core.Message doMapMessage(Message<?> message,
MessageConverter converter, AmqpHeaderMapper headerMapper, MessageDeliveryMode defaultDeliveryMode,
boolean headersMappedLast, boolean reply) {

MessageProperties amqpMessageProperties = new MessageProperties();
org.springframework.amqp.core.Message amqpMessage;
if (!headersMappedLast) {
headerMapper.fromHeadersToRequest(requestMessage.getHeaders(), amqpMessageProperties);
mapHeaders(message.getHeaders(), amqpMessageProperties, headerMapper, reply);
}
if (converter instanceof ContentTypeDelegatingMessageConverter && headersMappedLast) {
String contentType = contentTypeAsString(requestMessage.getHeaders());
String contentType = contentTypeAsString(message.getHeaders());
if (contentType != null) {
amqpMessageProperties.setContentType(contentType);
}
}
amqpMessage = converter.toMessage(requestMessage.getPayload(), amqpMessageProperties);
amqpMessage = converter.toMessage(message.getPayload(), amqpMessageProperties);
if (headersMappedLast) {
headerMapper.fromHeadersToRequest(requestMessage.getHeaders(), amqpMessageProperties);
mapHeaders(message.getHeaders(), amqpMessageProperties, headerMapper, reply);
}
checkDeliveryMode(requestMessage, amqpMessageProperties, defaultDeliveryMode);
checkDeliveryMode(message, amqpMessageProperties, defaultDeliveryMode);
return amqpMessage;
}

private static void mapHeaders(MessageHeaders messageHeaders, MessageProperties amqpMessageProperties,
AmqpHeaderMapper headerMapper, boolean reply) {

if (reply) {
headerMapper.fromHeadersToReply(messageHeaders, amqpMessageProperties);
}
else {
headerMapper.fromHeadersToRequest(messageHeaders, amqpMessageProperties);
}
}

private static String contentTypeAsString(MessageHeaders headers) {
Object contentType = headers.get(AmqpHeaders.CONTENT_TYPE);
if (contentType instanceof MimeType) {
Expand Down
Expand Up @@ -239,6 +239,18 @@
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="reply-headers-last">
<xsd:annotation>
<xsd:documentation>
Whether reply headers are mapped before or after conversion from a messaging Message to
a spring amqp Message. Set to true, for example, if you wish to override the
contentType header set by the converter.
</xsd:documentation>
</xsd:annotation>
<xsd:simpleType>
<xsd:union memberTypes="xsd:boolean xsd:string" />
</xsd:simpleType>
</xsd:attribute>
</xsd:extension>
</xsd:complexContent>
</xsd:complexType>
Expand Down

0 comments on commit 54de7a2

Please sign in to comment.