From 191a09cc95db35121955c8c4a464bdea46da1773 Mon Sep 17 00:00:00 2001 From: Chris Bono Date: Tue, 12 Jul 2022 23:35:46 -0500 Subject: [PATCH] Remove use of reflection from CloudEventMessageUtils Fixes #892 --- .../cloudevent/CloudEventMessageUtils.java | 117 +++++++++++------- ...CloudEventMessageUtilsAndBuilderTests.java | 41 ++++++ 2 files changed, 110 insertions(+), 48 deletions(-) diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventMessageUtils.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventMessageUtils.java index 37d8c7549..ac5e17a6d 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventMessageUtils.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventMessageUtils.java @@ -16,11 +16,12 @@ package org.springframework.cloud.function.cloudevent; -import java.lang.reflect.Field; import java.net.URI; import java.time.OffsetDateTime; import java.util.Collections; +import java.util.HashMap; import java.util.Map; +import java.util.Objects; import java.util.stream.Collectors; import org.springframework.cloud.function.context.message.MessageUtils; @@ -34,7 +35,6 @@ import org.springframework.messaging.support.MessageBuilder; import org.springframework.util.MimeType; import org.springframework.util.MimeTypeUtils; -import org.springframework.util.ReflectionUtils; import org.springframework.util.StringUtils; /** @@ -45,6 +45,7 @@ * * @author Oleg Zhurakousky * @author Dave Syer + * @author Chris Bono * @since 3.1 */ public final class CloudEventMessageUtils { @@ -61,12 +62,6 @@ public MimeType resolve(@Nullable MessageHeaders headers) { }; - private static Field MESSAGE_HEADERS = ReflectionUtils.findField(MessageHeaders.class, "headers"); - - static { - MESSAGE_HEADERS.setAccessible(true); - } - private CloudEventMessageUtils() { } @@ -226,15 +221,16 @@ public static Map getAttributes(Message message) { /** * This method does several things. - * First in canonicalizes Cloud Events attributes ensuring that they all prefixed + * First it canonicalizes Cloud Events attributes ensuring that they are all prefixed * with 'ce-' prefix regardless where they came from. - * It also transforms structured-mode Cloud Event to binary-mode and then it canonicalizes attributes + * It also transforms structured-mode Cloud Event to binary-mode and then canonicalizes attributes * as well as described in the previous sentence. */ @SuppressWarnings("unchecked") static Message toCanonical(Message inputMessage, MessageConverter messageConverter) { - Map headers = (Map) ReflectionUtils.getField(MESSAGE_HEADERS, inputMessage.getHeaders()); - canonicalizeHeaders(headers, false); + inputMessage = canonicalizeHeadersWithPossibleCopy(inputMessage); + Map headers = new HashMap<>(inputMessage.getHeaders()); + if (isCloudEvent(inputMessage) && headers.containsKey("content-type")) { inputMessage = MessageBuilder.fromMessage(inputMessage).setHeader(MessageHeaders.CONTENT_TYPE, headers.get("content-type")).build(); } @@ -272,6 +268,67 @@ else if (StringUtils.hasText(inputContentType)) { return inputMessage; } + /** + * Attempts to {@link #canonicalizeHeaders canonicalize} the headers of a message. + * @param message the message + * @return a copy of the message with the canonicalized headers or the passed in unmodified message if no + * headers were canonicalized + */ + // VisibleForTesting + static Message canonicalizeHeadersWithPossibleCopy(Message message) { + Map headers = new HashMap<>(message.getHeaders()); + boolean headersModified = canonicalizeHeaders(headers, false); + if (headersModified) { + message = MessageBuilder.fromMessage(message) + .removeHeaders("*") + .copyHeaders(headers) + .build(); + } + return message; + } + + /** + * Will canonicalize Cloud Event attributes (headers) by ensuring canonical + * prefix for all attributes and extensions regardless of where they came from. + * The canonical prefix is 'ce-'. + * + * So, for example 'ce_source' will become 'ce-source'. + * @param headers message headers + * @param structured boolean signifying that headers map represents structured Cloud Event + * at which point attributes without any prefix will still be treated as + * Cloud Event attributes. + * @return whether the headers were modified during the process + */ + private static boolean canonicalizeHeaders(Map headers, boolean structured) { + boolean modified = false; + String[] keys = headers.keySet().toArray(new String[] {}); + for (String key : keys) { + if (key.startsWith(DEFAULT_ATTR_PREFIX)) { + Object value = headers.remove(key); + String newKey = DEFAULT_ATTR_PREFIX + key.substring(DEFAULT_ATTR_PREFIX.length()); + headers.put(newKey, value); + modified |= (!Objects.equals(key, newKey)); + } + else if (key.startsWith(KAFKA_ATTR_PREFIX)) { + Object value = headers.remove(key); + key = key.substring(KAFKA_ATTR_PREFIX.length()); + headers.put(DEFAULT_ATTR_PREFIX + key, value); + modified = true; + } + else if (key.startsWith(AMQP_ATTR_PREFIX)) { + Object value = headers.remove(key); + key = key.substring(AMQP_ATTR_PREFIX.length()); + headers.put(DEFAULT_ATTR_PREFIX + key, value); + modified = true; + } + else if (structured) { + Object value = headers.remove(key); + headers.put(DEFAULT_ATTR_PREFIX + key, value); + modified = true; + } + } + return modified; + } /** * Determines attribute prefix based on the presence of certain well defined headers. @@ -354,42 +411,6 @@ private static boolean isAttribute(String key) { return key.startsWith(DEFAULT_ATTR_PREFIX) || key.startsWith(AMQP_ATTR_PREFIX) || key.startsWith(KAFKA_ATTR_PREFIX); } - /** - * Will canonicalize Cloud Event attributes (headers) by ensuring canonical - * prefix for all attributes and extensions regardless of where they came from. - * The canonical prefix is 'ce-'. - * - * So, for example 'ce_source' will become 'ce-source'. - * @param headers message headers - * @param structured boolean signifying that headers map represents structured Cloud Event - * at which point attributes without any prefix will still be treated as - * Cloud Event attributes. - */ - private static void canonicalizeHeaders(Map headers, boolean structured) { - String[] keys = headers.keySet().toArray(new String[] {}); - for (String key : keys) { - if (key.startsWith(DEFAULT_ATTR_PREFIX)) { - Object value = headers.remove(key); - key = key.substring(DEFAULT_ATTR_PREFIX.length()); - headers.put(DEFAULT_ATTR_PREFIX + key, value); - } - else if (key.startsWith(KAFKA_ATTR_PREFIX)) { - Object value = headers.remove(key); - key = key.substring(KAFKA_ATTR_PREFIX.length()); - headers.put(DEFAULT_ATTR_PREFIX + key, value); - } - else if (key.startsWith(AMQP_ATTR_PREFIX)) { - Object value = headers.remove(key); - key = key.substring(AMQP_ATTR_PREFIX.length()); - headers.put(DEFAULT_ATTR_PREFIX + key, value); - } - else if (structured) { - Object value = headers.remove(key); - headers.put(DEFAULT_ATTR_PREFIX + key, value); - } - } - } - private static Message buildBinaryMessageFromStructuredMap(Map structuredCloudEvent, MessageHeaders originalHeaders) { Object payload = structuredCloudEvent.remove(DATA); diff --git a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/cloudevent/CloudEventMessageUtilsAndBuilderTests.java b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/cloudevent/CloudEventMessageUtilsAndBuilderTests.java index 3bf9e313b..0161f6367 100644 --- a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/cloudevent/CloudEventMessageUtilsAndBuilderTests.java +++ b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/cloudevent/CloudEventMessageUtilsAndBuilderTests.java @@ -97,4 +97,45 @@ public void testAttributeRecognitionAndCanonicalization() { assertThat(httpMessage.getHeaders().get("ce-specversion")).isNotNull(); assertThat(CloudEventMessageUtils.getSpecVersion(httpMessage)).isEqualTo("1.0"); } + + @Test + void canonicalizeHeadersWithPossibleCopyReturnsCopyWithUpdatedHeadersWhenModified() { + // TODO add the following test cases + // + // defaultAttrs w/ unmodified keys -> not modified + // defaultAttrs w/ modified keys -> modified + // kafkaAttrs w/ (defaultAttrs+unmodified keys) -> modified + // amqpAttrs -> modified + // structured -> modified + Message inputMessage = MessageBuilder.withPayload("hello") + .setHeader("ce_foo", "bar") + .setHeader("x", "x1") + .setHeader("x|x", "x2") + .build(); + + Message updatedMessage = CloudEventMessageUtils.canonicalizeHeadersWithPossibleCopy(inputMessage); + + assertThat(inputMessage).isNotSameAs(updatedMessage); + assertThat(updatedMessage.getHeaders()) + .containsEntry("ce-foo", "bar") + .containsEntry("x", "x1") + .containsEntry("x|x", "x2"); + } + + @Test + void canonicalizeHeadersWithPossibleCopyReturnsSameInstanceWhenNotModified() { + Message inputMessage = MessageBuilder.withPayload("hello") + .setHeader("ce-foo", "bar") + .setHeader("x", "x1") + .setHeader("x|x", "x2") + .build(); + + Message updatedMessage = CloudEventMessageUtils.canonicalizeHeadersWithPossibleCopy(inputMessage); + + assertThat(inputMessage).isSameAs(updatedMessage); + assertThat(updatedMessage.getHeaders()) + .containsEntry("ce-foo", "bar") + .containsEntry("x", "x1") + .containsEntry("x|x", "x2"); + } }