Skip to content
Permalink
Browse files

More JavaUtils - hasText and BiConsumer Flavors

- also fix race in `RabbitTemplateTests`

* Polishing - PR Comments
  • Loading branch information
garyrussell authored and artembilan committed Feb 11, 2019
1 parent 78b1eae commit a98e80e0a194a73354b8a9f7ed5b401ae26142f9
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2018 the original author or authors.
* Copyright 2002-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -19,9 +19,11 @@
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;

import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.utils.JavaUtils;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.AbstractHeaderMapper;
import org.springframework.util.MimeType;
@@ -50,89 +52,51 @@
*/
public class SimpleAmqpHeaderMapper extends AbstractHeaderMapper<MessageProperties> implements AmqpHeaderMapper {

@Override // NOSONAR complexity - mostly null/empty tests
public void fromHeaders(MessageHeaders headers, MessageProperties amqpMessageProperties) { // NOSONAR NCSS lines
String appId = getHeaderIfAvailable(headers, AmqpHeaders.APP_ID, String.class);
if (StringUtils.hasText(appId)) {
amqpMessageProperties.setAppId(appId);
}
String clusterId = getHeaderIfAvailable(headers, AmqpHeaders.CLUSTER_ID, String.class);
if (StringUtils.hasText(clusterId)) {
amqpMessageProperties.setClusterId(clusterId);
}
String contentEncoding = getHeaderIfAvailable(headers, AmqpHeaders.CONTENT_ENCODING, String.class);
if (StringUtils.hasText(contentEncoding)) {
amqpMessageProperties.setContentEncoding(contentEncoding);
}
Long contentLength = getHeaderIfAvailable(headers, AmqpHeaders.CONTENT_LENGTH, Long.class);
if (contentLength != null) {
amqpMessageProperties.setContentLength(contentLength);
}
String contentType = this.extractContentTypeAsString(headers);

if (StringUtils.hasText(contentType)) {
amqpMessageProperties.setContentType(contentType);
}
@Override
public void fromHeaders(MessageHeaders headers, MessageProperties amqpMessageProperties) {
JavaUtils javaUtils = JavaUtils.INSTANCE
.acceptIfHasText(getHeaderIfAvailable(headers, AmqpHeaders.APP_ID, String.class),
amqpMessageProperties::setAppId)
.acceptIfHasText(getHeaderIfAvailable(headers, AmqpHeaders.CLUSTER_ID, String.class),
amqpMessageProperties::setClusterId)
.acceptIfHasText(getHeaderIfAvailable(headers, AmqpHeaders.CONTENT_ENCODING, String.class),
amqpMessageProperties::setContentEncoding)
.acceptIfNotNull(getHeaderIfAvailable(headers, AmqpHeaders.CONTENT_LENGTH, Long.class),
amqpMessageProperties::setContentLength)
.acceptIfHasText(extractContentTypeAsString(headers), amqpMessageProperties::setContentType);
Object correlationId = headers.get(AmqpHeaders.CORRELATION_ID);
if (correlationId instanceof String) {
amqpMessageProperties.setCorrelationId((String) correlationId);
}
Integer delay = getHeaderIfAvailable(headers, AmqpHeaders.DELAY, Integer.class);
if (delay != null) {
amqpMessageProperties.setDelay(delay);
}
MessageDeliveryMode deliveryMode = getHeaderIfAvailable(headers, AmqpHeaders.DELIVERY_MODE, MessageDeliveryMode.class);
if (deliveryMode != null) {
amqpMessageProperties.setDeliveryMode(deliveryMode);
}
Long deliveryTag = getHeaderIfAvailable(headers, AmqpHeaders.DELIVERY_TAG, Long.class);
if (deliveryTag != null) {
amqpMessageProperties.setDeliveryTag(deliveryTag);
}
String expiration = getHeaderIfAvailable(headers, AmqpHeaders.EXPIRATION, String.class);
if (StringUtils.hasText(expiration)) {
amqpMessageProperties.setExpiration(expiration);
}
Integer messageCount = getHeaderIfAvailable(headers, AmqpHeaders.MESSAGE_COUNT, Integer.class);
if (messageCount != null) {
amqpMessageProperties.setMessageCount(messageCount);
}
String messageId = getHeaderIfAvailable(headers, AmqpHeaders.MESSAGE_ID, String.class);
if (StringUtils.hasText(messageId)) {
amqpMessageProperties.setMessageId(messageId);
}
Integer priority = getHeaderIfAvailable(headers, AmqpMessageHeaderAccessor.PRIORITY, Integer.class);
if (priority != null) {
amqpMessageProperties.setPriority(priority);
}
String receivedExchange = getHeaderIfAvailable(headers, AmqpHeaders.RECEIVED_EXCHANGE, String.class);
if (StringUtils.hasText(receivedExchange)) {
amqpMessageProperties.setReceivedExchange(receivedExchange);
}
String receivedRoutingKey = getHeaderIfAvailable(headers, AmqpHeaders.RECEIVED_ROUTING_KEY, String.class);
if (StringUtils.hasText(receivedRoutingKey)) {
amqpMessageProperties.setReceivedRoutingKey(receivedRoutingKey);
}
Boolean redelivered = getHeaderIfAvailable(headers, AmqpHeaders.REDELIVERED, Boolean.class);
if (redelivered != null) {
amqpMessageProperties.setRedelivered(redelivered);
}
String replyTo = getHeaderIfAvailable(headers, AmqpHeaders.REPLY_TO, String.class);
if (replyTo != null) {
amqpMessageProperties.setReplyTo(replyTo);
}
Date timestamp = getHeaderIfAvailable(headers, AmqpHeaders.TIMESTAMP, Date.class);
if (timestamp != null) {
amqpMessageProperties.setTimestamp(timestamp);
}
String type = getHeaderIfAvailable(headers, AmqpHeaders.TYPE, String.class);
if (type != null) {
amqpMessageProperties.setType(type);
}
String userId = getHeaderIfAvailable(headers, AmqpHeaders.USER_ID, String.class);
if (StringUtils.hasText(userId)) {
amqpMessageProperties.setUserId(userId);
}
javaUtils
.acceptIfNotNull(getHeaderIfAvailable(headers, AmqpHeaders.DELAY, Integer.class),
amqpMessageProperties::setDelay)
.acceptIfNotNull(getHeaderIfAvailable(headers, AmqpHeaders.DELIVERY_MODE, MessageDeliveryMode.class),
amqpMessageProperties::setDeliveryMode)
.acceptIfNotNull(getHeaderIfAvailable(headers, AmqpHeaders.DELIVERY_TAG, Long.class),
amqpMessageProperties::setDeliveryTag)
.acceptIfHasText(getHeaderIfAvailable(headers, AmqpHeaders.EXPIRATION, String.class),
amqpMessageProperties::setExpiration)
.acceptIfNotNull(getHeaderIfAvailable(headers, AmqpHeaders.MESSAGE_COUNT, Integer.class),
amqpMessageProperties::setMessageCount)
.acceptIfHasText(getHeaderIfAvailable(headers, AmqpHeaders.MESSAGE_ID, String.class),
amqpMessageProperties::setMessageId)
.acceptIfNotNull(getHeaderIfAvailable(headers, AmqpMessageHeaderAccessor.PRIORITY, Integer.class),
amqpMessageProperties::setPriority)
.acceptIfHasText(getHeaderIfAvailable(headers, AmqpHeaders.RECEIVED_EXCHANGE, String.class),
amqpMessageProperties::setReceivedExchange)
.acceptIfHasText(getHeaderIfAvailable(headers, AmqpHeaders.RECEIVED_ROUTING_KEY, String.class),
amqpMessageProperties::setReceivedRoutingKey)
.acceptIfNotNull(getHeaderIfAvailable(headers, AmqpHeaders.REDELIVERED, Boolean.class),
amqpMessageProperties::setRedelivered)
.acceptIfNotNull(getHeaderIfAvailable(headers, AmqpHeaders.REPLY_TO, String.class),
amqpMessageProperties::setReplyTo)
.acceptIfNotNull(getHeaderIfAvailable(headers, AmqpHeaders.TIMESTAMP, Date.class),
amqpMessageProperties::setTimestamp)
.acceptIfNotNull(getHeaderIfAvailable(headers, AmqpHeaders.TYPE, String.class),
amqpMessageProperties::setType)
.acceptIfHasText(getHeaderIfAvailable(headers, AmqpHeaders.USER_ID, String.class),
amqpMessageProperties::setUserId);

String replyCorrelation = getHeaderIfAvailable(headers, AmqpHeaders.SPRING_REPLY_CORRELATION, String.class);
if (StringUtils.hasLength(replyCorrelation)) {
@@ -158,98 +122,47 @@ public void fromHeaders(MessageHeaders headers, MessageProperties amqpMessagePro
}
}

@Override // NOSONAR complexity - mostly null/empty tests
public MessageHeaders toHeaders(MessageProperties amqpMessageProperties) { // NOSONAR NCSS lines
@Override
public MessageHeaders toHeaders(MessageProperties amqpMessageProperties) {
Map<String, Object> headers = new HashMap<String, Object>();
try {
String appId = amqpMessageProperties.getAppId();
if (StringUtils.hasText(appId)) {
headers.put(AmqpHeaders.APP_ID, appId);
}
String clusterId = amqpMessageProperties.getClusterId();
if (StringUtils.hasText(clusterId)) {
headers.put(AmqpHeaders.CLUSTER_ID, clusterId);
}
String contentEncoding = amqpMessageProperties.getContentEncoding();
if (StringUtils.hasText(contentEncoding)) {
headers.put(AmqpHeaders.CONTENT_ENCODING, contentEncoding);
}
BiConsumer<String, Object> putObject = headers::put;
BiConsumer<String, String> putString = headers::put;
JavaUtils javaUtils = JavaUtils.INSTANCE
.acceptIfNotNull(AmqpHeaders.APP_ID, amqpMessageProperties.getAppId(), putObject)
.acceptIfNotNull(AmqpHeaders.CLUSTER_ID, amqpMessageProperties.getClusterId(), putObject)
.acceptIfNotNull(AmqpHeaders.CONTENT_ENCODING, amqpMessageProperties.getContentEncoding(),
putObject);
long contentLength = amqpMessageProperties.getContentLength();
if (contentLength > 0) {
headers.put(AmqpHeaders.CONTENT_LENGTH, contentLength);
}
String contentType = amqpMessageProperties.getContentType();
if (StringUtils.hasText(contentType)) {
headers.put(AmqpHeaders.CONTENT_TYPE, contentType);
}
String correlationId = amqpMessageProperties.getCorrelationId();
if (StringUtils.hasText(correlationId)) {
headers.put(AmqpHeaders.CORRELATION_ID, correlationId);
}
MessageDeliveryMode receivedDeliveryMode = amqpMessageProperties.getReceivedDeliveryMode();
if (receivedDeliveryMode != null) {
headers.put(AmqpHeaders.RECEIVED_DELIVERY_MODE, receivedDeliveryMode);
}
javaUtils
.acceptIfCondition(contentLength > 0, AmqpHeaders.CONTENT_LENGTH, contentLength, putObject)
.acceptIfHasText(AmqpHeaders.CONTENT_TYPE, amqpMessageProperties.getContentType(), putString)
.acceptIfHasText(AmqpHeaders.CORRELATION_ID, amqpMessageProperties.getCorrelationId(), putString)
.acceptIfNotNull(AmqpHeaders.RECEIVED_DELIVERY_MODE,
amqpMessageProperties.getReceivedDeliveryMode(), putObject);
long deliveryTag = amqpMessageProperties.getDeliveryTag();
if (deliveryTag > 0) {
headers.put(AmqpHeaders.DELIVERY_TAG, deliveryTag);
}
String expiration = amqpMessageProperties.getExpiration();
if (StringUtils.hasText(expiration)) {
headers.put(AmqpHeaders.EXPIRATION, expiration);
}
Integer messageCount = amqpMessageProperties.getMessageCount();
if (messageCount != null && messageCount > 0) {
headers.put(AmqpHeaders.MESSAGE_COUNT, messageCount);
}
String messageId = amqpMessageProperties.getMessageId();
if (StringUtils.hasText(messageId)) {
headers.put(AmqpHeaders.MESSAGE_ID, messageId);
}
javaUtils
.acceptIfCondition(deliveryTag > 0, AmqpHeaders.DELIVERY_TAG, deliveryTag, putObject)
.acceptIfHasText(AmqpHeaders.EXPIRATION, amqpMessageProperties.getExpiration(), putString)
.acceptIfNotNull(AmqpHeaders.MESSAGE_COUNT, amqpMessageProperties.getMessageCount(), putObject)
.acceptIfNotNull(AmqpHeaders.MESSAGE_ID, amqpMessageProperties.getMessageId(), putObject);
Integer priority = amqpMessageProperties.getPriority();
if (priority != null && priority > 0) {
headers.put(AmqpMessageHeaderAccessor.PRIORITY, priority);
}
Integer receivedDelay = amqpMessageProperties.getReceivedDelay();
if (receivedDelay != null) {
headers.put(AmqpHeaders.RECEIVED_DELAY, receivedDelay);
}
String receivedExchange = amqpMessageProperties.getReceivedExchange();
if (StringUtils.hasText(receivedExchange)) {
headers.put(AmqpHeaders.RECEIVED_EXCHANGE, receivedExchange);
}
String receivedRoutingKey = amqpMessageProperties.getReceivedRoutingKey();
if (StringUtils.hasText(receivedRoutingKey)) {
headers.put(AmqpHeaders.RECEIVED_ROUTING_KEY, receivedRoutingKey);
}
Boolean redelivered = amqpMessageProperties.isRedelivered();
if (redelivered != null) {
headers.put(AmqpHeaders.REDELIVERED, redelivered);
}
String replyTo = amqpMessageProperties.getReplyTo();
if (replyTo != null) {
headers.put(AmqpHeaders.REPLY_TO, replyTo);
}
Date timestamp = amqpMessageProperties.getTimestamp();
if (timestamp != null) {
headers.put(AmqpHeaders.TIMESTAMP, timestamp);
}
String type = amqpMessageProperties.getType();
if (StringUtils.hasText(type)) {
headers.put(AmqpHeaders.TYPE, type);
}
String userId = amqpMessageProperties.getReceivedUserId();
if (StringUtils.hasText(userId)) {
headers.put(AmqpHeaders.RECEIVED_USER_ID, userId);
}
String consumerTag = amqpMessageProperties.getConsumerTag();
if (StringUtils.hasText(consumerTag)) {
headers.put(AmqpHeaders.CONSUMER_TAG, consumerTag);
}
String consumerQueue = amqpMessageProperties.getConsumerQueue();
if (StringUtils.hasText(consumerQueue)) {
headers.put(AmqpHeaders.CONSUMER_QUEUE, consumerQueue);
}
javaUtils
.acceptIfCondition(priority != null && priority > 0, AmqpMessageHeaderAccessor.PRIORITY, priority,
putObject)
.acceptIfNotNull(AmqpHeaders.RECEIVED_DELAY, amqpMessageProperties.getReceivedDelay(), putObject)
.acceptIfHasText(AmqpHeaders.RECEIVED_EXCHANGE, amqpMessageProperties.getReceivedExchange(),
putString)
.acceptIfHasText(AmqpHeaders.RECEIVED_ROUTING_KEY, amqpMessageProperties.getReceivedRoutingKey(),
putString)
.acceptIfNotNull(AmqpHeaders.REDELIVERED, amqpMessageProperties.isRedelivered(), putObject)
.acceptIfNotNull(AmqpHeaders.REPLY_TO, amqpMessageProperties.getReplyTo(), putObject)
.acceptIfNotNull(AmqpHeaders.TIMESTAMP, amqpMessageProperties.getTimestamp(), putObject)
.acceptIfHasText(AmqpHeaders.TYPE, amqpMessageProperties.getType(), putString)
.acceptIfHasText(AmqpHeaders.RECEIVED_USER_ID, amqpMessageProperties.getReceivedUserId(),
putString)
.acceptIfHasText(AmqpHeaders.CONSUMER_TAG, amqpMessageProperties.getConsumerTag(), putString)
.acceptIfHasText(AmqpHeaders.CONSUMER_QUEUE, amqpMessageProperties.getConsumerQueue(), putString);

// Map custom headers
for (Map.Entry<String, Object> entry : amqpMessageProperties.getHeaders().entrySet()) {
@@ -16,8 +16,11 @@

package org.springframework.amqp.utils;

import java.util.function.BiConsumer;
import java.util.function.Consumer;

import org.springframework.util.StringUtils;

/**
* Chained utility methods to simplify some Java repetitive code. Obtain a reference to
* the singleton {@link #INSTANCE} and then chain calls to the utility methods.
@@ -66,4 +69,68 @@ private JavaUtils() {
return this;
}

/**
* Invoke {@link Consumer#accept(Object)} with the value if it is not null or empty.
* @param value the value.
* @param consumer the consumer.
* @return this.
*/
public JavaUtils acceptIfHasText(String value, Consumer<String> consumer) {
if (StringUtils.hasText(value)) {
consumer.accept(value);
}
return this;
}

/**
* Invoke {@link BiConsumer#accept(Object, Object)} with the arguments if the
* condition is true.
* @param condition the condition.
* @param t1 the first consumer argument
* @param t2 the second consumer argument
* @param consumer the consumer.
* @param <T1> the first argument type.
* @param <T2> the second argument type.
* @return this.
*/
public <T1, T2> JavaUtils acceptIfCondition(boolean condition, T1 t1, T2 t2, BiConsumer<T1, T2> consumer) {
if (condition) {
consumer.accept(t1, t2);
}
return this;
}

/**
* Invoke {@link BiConsumer#accept(Object, Object)} with the arguments if the t2
* argument is not null.
* @param t1 the first argument
* @param t2 the second consumer argument
* @param consumer the consumer.
* @param <T1> the first argument type.
* @param <T2> the second argument type.
* @return this.
*/
public <T1, T2> JavaUtils acceptIfNotNull(T1 t1, T2 t2, BiConsumer<T1, T2> consumer) {
if (t2 != null) {
consumer.accept(t1, t2);
}
return this;
}

/**
* Invoke {@link BiConsumer#accept(Object, Object)} with the arguments if the value
* argument is not null or empty.
* @param t1 the first consumer argument.
* @param value the second consumer argument
* @param <T> the first argument type.
* @param consumer the consumer.
* @return this.
*/
public <T> JavaUtils acceptIfHasText(T t1, String value, BiConsumer<T, String> consumer) {
if (StringUtils.hasText(value)) {
consumer.accept(t1, value);
}
return this;
}

}

0 comments on commit a98e80e

Please sign in to comment.
You can’t perform that action at this time.