Skip to content

Commit

Permalink
Remove use of reflection from CloudEventMessageUtils
Browse files Browse the repository at this point in the history
  • Loading branch information
onobc committed Jul 13, 2022
1 parent d84b0c9 commit 191a09c
Show file tree
Hide file tree
Showing 2 changed files with 110 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,12 @@

package org.springframework.cloud.function.cloudevent;

import java.lang.reflect.Field;
import java.net.URI;
import java.time.OffsetDateTime;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;

import org.springframework.cloud.function.context.message.MessageUtils;
Expand All @@ -34,7 +35,6 @@
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.MimeType;
import org.springframework.util.MimeTypeUtils;
import org.springframework.util.ReflectionUtils;
import org.springframework.util.StringUtils;

/**
Expand All @@ -45,6 +45,7 @@
*
* @author Oleg Zhurakousky
* @author Dave Syer
* @author Chris Bono
* @since 3.1
*/
public final class CloudEventMessageUtils {
Expand All @@ -61,12 +62,6 @@ public MimeType resolve(@Nullable MessageHeaders headers) {

};

private static Field MESSAGE_HEADERS = ReflectionUtils.findField(MessageHeaders.class, "headers");

static {
MESSAGE_HEADERS.setAccessible(true);
}

private CloudEventMessageUtils() {
}

Expand Down Expand Up @@ -226,15 +221,16 @@ public static Map<String, Object> getAttributes(Message<?> message) {

/**
* This method does several things.
* First in canonicalizes Cloud Events attributes ensuring that they all prefixed
* First it canonicalizes Cloud Events attributes ensuring that they are all prefixed
* with 'ce-' prefix regardless where they came from.
* It also transforms structured-mode Cloud Event to binary-mode and then it canonicalizes attributes
* It also transforms structured-mode Cloud Event to binary-mode and then canonicalizes attributes
* as well as described in the previous sentence.
*/
@SuppressWarnings("unchecked")
static Message<?> toCanonical(Message<?> inputMessage, MessageConverter messageConverter) {
Map<String, Object> headers = (Map<String, Object>) ReflectionUtils.getField(MESSAGE_HEADERS, inputMessage.getHeaders());
canonicalizeHeaders(headers, false);
inputMessage = canonicalizeHeadersWithPossibleCopy(inputMessage);
Map<String, Object> headers = new HashMap<>(inputMessage.getHeaders());

if (isCloudEvent(inputMessage) && headers.containsKey("content-type")) {
inputMessage = MessageBuilder.fromMessage(inputMessage).setHeader(MessageHeaders.CONTENT_TYPE, headers.get("content-type")).build();
}
Expand Down Expand Up @@ -272,6 +268,67 @@ else if (StringUtils.hasText(inputContentType)) {
return inputMessage;
}

/**
* Attempts to {@link #canonicalizeHeaders canonicalize} the headers of a message.
* @param message the message
* @return a copy of the message with the canonicalized headers or the passed in unmodified message if no
* headers were canonicalized
*/
// VisibleForTesting
static Message<?> canonicalizeHeadersWithPossibleCopy(Message<?> message) {
Map<String, Object> headers = new HashMap<>(message.getHeaders());
boolean headersModified = canonicalizeHeaders(headers, false);
if (headersModified) {
message = MessageBuilder.fromMessage(message)
.removeHeaders("*")
.copyHeaders(headers)
.build();
}
return message;
}

/**
* Will canonicalize Cloud Event attributes (headers) by ensuring canonical
* prefix for all attributes and extensions regardless of where they came from.
* The canonical prefix is 'ce-'.
*
* So, for example 'ce_source' will become 'ce-source'.
* @param headers message headers
* @param structured boolean signifying that headers map represents structured Cloud Event
* at which point attributes without any prefix will still be treated as
* Cloud Event attributes.
* @return whether the headers were modified during the process
*/
private static boolean canonicalizeHeaders(Map<String, Object> headers, boolean structured) {
boolean modified = false;
String[] keys = headers.keySet().toArray(new String[] {});
for (String key : keys) {
if (key.startsWith(DEFAULT_ATTR_PREFIX)) {
Object value = headers.remove(key);
String newKey = DEFAULT_ATTR_PREFIX + key.substring(DEFAULT_ATTR_PREFIX.length());
headers.put(newKey, value);
modified |= (!Objects.equals(key, newKey));
}
else if (key.startsWith(KAFKA_ATTR_PREFIX)) {
Object value = headers.remove(key);
key = key.substring(KAFKA_ATTR_PREFIX.length());
headers.put(DEFAULT_ATTR_PREFIX + key, value);
modified = true;
}
else if (key.startsWith(AMQP_ATTR_PREFIX)) {
Object value = headers.remove(key);
key = key.substring(AMQP_ATTR_PREFIX.length());
headers.put(DEFAULT_ATTR_PREFIX + key, value);
modified = true;
}
else if (structured) {
Object value = headers.remove(key);
headers.put(DEFAULT_ATTR_PREFIX + key, value);
modified = true;
}
}
return modified;
}

/**
* Determines attribute prefix based on the presence of certain well defined headers.
Expand Down Expand Up @@ -354,42 +411,6 @@ private static boolean isAttribute(String key) {
return key.startsWith(DEFAULT_ATTR_PREFIX) || key.startsWith(AMQP_ATTR_PREFIX) || key.startsWith(KAFKA_ATTR_PREFIX);
}

/**
* Will canonicalize Cloud Event attributes (headers) by ensuring canonical
* prefix for all attributes and extensions regardless of where they came from.
* The canonical prefix is 'ce-'.
*
* So, for example 'ce_source' will become 'ce-source'.
* @param headers message headers
* @param structured boolean signifying that headers map represents structured Cloud Event
* at which point attributes without any prefix will still be treated as
* Cloud Event attributes.
*/
private static void canonicalizeHeaders(Map<String, Object> headers, boolean structured) {
String[] keys = headers.keySet().toArray(new String[] {});
for (String key : keys) {
if (key.startsWith(DEFAULT_ATTR_PREFIX)) {
Object value = headers.remove(key);
key = key.substring(DEFAULT_ATTR_PREFIX.length());
headers.put(DEFAULT_ATTR_PREFIX + key, value);
}
else if (key.startsWith(KAFKA_ATTR_PREFIX)) {
Object value = headers.remove(key);
key = key.substring(KAFKA_ATTR_PREFIX.length());
headers.put(DEFAULT_ATTR_PREFIX + key, value);
}
else if (key.startsWith(AMQP_ATTR_PREFIX)) {
Object value = headers.remove(key);
key = key.substring(AMQP_ATTR_PREFIX.length());
headers.put(DEFAULT_ATTR_PREFIX + key, value);
}
else if (structured) {
Object value = headers.remove(key);
headers.put(DEFAULT_ATTR_PREFIX + key, value);
}
}
}

private static Message<?> buildBinaryMessageFromStructuredMap(Map<String, Object> structuredCloudEvent,
MessageHeaders originalHeaders) {
Object payload = structuredCloudEvent.remove(DATA);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,4 +97,45 @@ public void testAttributeRecognitionAndCanonicalization() {
assertThat(httpMessage.getHeaders().get("ce-specversion")).isNotNull();
assertThat(CloudEventMessageUtils.getSpecVersion(httpMessage)).isEqualTo("1.0");
}

@Test
void canonicalizeHeadersWithPossibleCopyReturnsCopyWithUpdatedHeadersWhenModified() {
// TODO add the following test cases
//
// defaultAttrs w/ unmodified keys -> not modified
// defaultAttrs w/ modified keys -> modified
// kafkaAttrs w/ (defaultAttrs+unmodified keys) -> modified
// amqpAttrs -> modified
// structured -> modified
Message<?> inputMessage = MessageBuilder.withPayload("hello")
.setHeader("ce_foo", "bar")
.setHeader("x", "x1")
.setHeader("x|x", "x2")
.build();

Message<?> updatedMessage = CloudEventMessageUtils.canonicalizeHeadersWithPossibleCopy(inputMessage);

assertThat(inputMessage).isNotSameAs(updatedMessage);
assertThat(updatedMessage.getHeaders())
.containsEntry("ce-foo", "bar")
.containsEntry("x", "x1")
.containsEntry("x|x", "x2");
}

@Test
void canonicalizeHeadersWithPossibleCopyReturnsSameInstanceWhenNotModified() {
Message<?> inputMessage = MessageBuilder.withPayload("hello")
.setHeader("ce-foo", "bar")
.setHeader("x", "x1")
.setHeader("x|x", "x2")
.build();

Message<?> updatedMessage = CloudEventMessageUtils.canonicalizeHeadersWithPossibleCopy(inputMessage);

assertThat(inputMessage).isSameAs(updatedMessage);
assertThat(updatedMessage.getHeaders())
.containsEntry("ce-foo", "bar")
.containsEntry("x", "x1")
.containsEntry("x|x", "x2");
}
}

0 comments on commit 191a09c

Please sign in to comment.