Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2021 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -17,6 +17,7 @@
package org.springframework.integration.support.json;

import java.io.IOException;
import java.io.Serial;
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedHashSet;
Expand Down Expand Up @@ -51,6 +52,20 @@
*/
public final class JacksonJsonUtils {

/**
* The packages to trust on JSON deserialization by default.
*/
public static final List<String> DEFAULT_TRUSTED_PACKAGES =
List.of(
"java.util",
"java.lang",
"org.springframework.messaging.support",
"org.springframework.integration.support",
"org.springframework.integration.message",
"org.springframework.integration.store",
"org.springframework.integration.history"
);

private JacksonJsonUtils() {
}

Expand Down Expand Up @@ -99,17 +114,16 @@ public static ObjectMapper messagingAwareMapper(String... trustedPackages) {

/**
* An implementation of {@link ObjectMapper.DefaultTypeResolverBuilder}
* that wraps a default {@link TypeIdResolver} to the {@link AllowlistTypeIdResolver}.
* that wraps a default {@link TypeIdResolver} to the {@link AllowListTypeIdResolver}.
*
* @author Rob Winch
* @author Artem Bilan
* @author Filip Hanik
* @author Gary Russell
*
* @since 4.3.11
*/
private static final class AllowListTypeResolverBuilder extends ObjectMapper.DefaultTypeResolverBuilder {

@Serial
private static final long serialVersionUID = 1L;

private final String[] trustedPackages;
Expand All @@ -133,8 +147,9 @@ protected TypeIdResolver idResolver(MapperConfig<?> config,
JavaType baseType,
PolymorphicTypeValidator subtypeValidator,
Collection<NamedType> subtypes, boolean forSer, boolean forDeser) {

TypeIdResolver result = super.idResolver(config, baseType, subtypeValidator, subtypes, forSer, forDeser);
return new AllowlistTypeIdResolver(result, this.trustedPackages);
return new AllowListTypeIdResolver(result, this.trustedPackages);
}

}
Expand All @@ -146,27 +161,14 @@ protected TypeIdResolver idResolver(MapperConfig<?> config,
*
* @author Rob Winch
* @author Artem Bilan
*
* @since 4.3.11
*/
private static final class AllowlistTypeIdResolver implements TypeIdResolver {

private static final List<String> TRUSTED_PACKAGES =
Arrays.asList(
"java.util",
"java.lang",
"org.springframework.messaging.support",
"org.springframework.integration.support",
"org.springframework.integration.message",
"org.springframework.integration.store",
"org.springframework.integration.history"
);
private static final class AllowListTypeIdResolver implements TypeIdResolver {

private final TypeIdResolver delegate;

private final Set<String> trustedPackages = new LinkedHashSet<>(TRUSTED_PACKAGES);
private final Set<String> trustedPackages = new LinkedHashSet<>(DEFAULT_TRUSTED_PACKAGES);

AllowlistTypeIdResolver(TypeIdResolver delegate, String... trustedPackages) {
AllowListTypeIdResolver(TypeIdResolver delegate, String... trustedPackages) {
this.delegate = delegate;
if (trustedPackages != null) {
for (String trustedPackage : trustedPackages) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020-2021 the original author or authors.
* Copyright 2020-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -22,13 +22,18 @@
import org.springframework.integration.dispatcher.MessageDispatcher;
import org.springframework.integration.dispatcher.RoundRobinLoadBalancingStrategy;
import org.springframework.integration.dispatcher.UnicastingDispatcher;
import org.springframework.integration.support.json.JacksonJsonUtils;
import org.springframework.integration.support.management.ManageableSmartLifecycle;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.DefaultKafkaHeaderMapper;
import org.springframework.kafka.support.JacksonPresent;
import org.springframework.kafka.support.converter.MessagingMessageConverter;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.util.Assert;
Expand All @@ -49,6 +54,8 @@ public class SubscribableKafkaChannel extends AbstractKafkaChannel implements Su

private final KafkaListenerContainerFactory<?> factory;

private final IntegrationRecordMessageListener recordListener = new IntegrationRecordMessageListener();

private MessageDispatcher dispatcher;

private MessageListenerContainer container;
Expand All @@ -71,6 +78,24 @@ public SubscribableKafkaChannel(KafkaOperations<?, ?> template, KafkaListenerCon
super(template, channelTopic);
Assert.notNull(factory, "'factory' cannot be null");
this.factory = factory;

if (JacksonPresent.isJackson2Present()) {
var messageConverter = new MessagingMessageConverter();
var headerMapper = new DefaultKafkaHeaderMapper();
headerMapper.addTrustedPackages(JacksonJsonUtils.DEFAULT_TRUSTED_PACKAGES.toArray(new String[0]));
messageConverter.setHeaderMapper(headerMapper);
this.recordListener.setMessageConverter(messageConverter);
}
}


/**
* Set the {@link RecordMessageConverter} to the listener.
* @param messageConverter the converter.
* @since 6.0
*/
public void setMessageConverter(RecordMessageConverter messageConverter) {
this.recordListener.setMessageConverter(messageConverter);
}

@Override
Expand Down Expand Up @@ -113,7 +138,7 @@ protected void onInit() {
String groupId = getGroupId();
ContainerProperties containerProperties = this.container.getContainerProperties();
containerProperties.setGroupId(groupId != null ? groupId : getBeanName());
containerProperties.setMessageListener(new IntegrationRecordMessageListener());
containerProperties.setMessageListener(this.recordListener);
}

protected MessageDispatcher createDispatcher() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,19 @@
import org.springframework.integration.support.AbstractIntegrationMessageBuilder;
import org.springframework.integration.support.ErrorMessageUtils;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.integration.support.json.JacksonJsonUtils;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.listener.ConsumerSeekAware;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.DefaultKafkaHeaderMapper;
import org.springframework.kafka.support.JacksonPresent;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.converter.ConversionException;
import org.springframework.kafka.support.converter.KafkaMessageHeaders;
import org.springframework.kafka.support.converter.MessagingMessageConverter;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
Expand Down Expand Up @@ -108,6 +112,13 @@ public KafkaInboundGateway(AbstractMessageListenerContainer<K, V> messageListene
this.messageListenerContainer.setAutoStartup(false);
this.kafkaTemplate = kafkaTemplate;
setErrorMessageStrategy(new RawRecordHeaderErrorMessageStrategy());
if (JacksonPresent.isJackson2Present()) {
MessagingMessageConverter messageConverter = new MessagingMessageConverter();
DefaultKafkaHeaderMapper headerMapper = new DefaultKafkaHeaderMapper();
headerMapper.addTrustedPackages(JacksonJsonUtils.DEFAULT_TRUSTED_PACKAGES.toArray(new String[0]));
messageConverter.setHeaderMapper(headerMapper);
this.listener.setMessageConverter(messageConverter);
}
}

/**
Expand Down Expand Up @@ -169,6 +180,7 @@ public void setRecoveryCallback(RecoveryCallback<?> recoveryCallback) {
*/
public void setOnPartitionsAssignedSeekCallback(
BiConsumer<Map<TopicPartition, Long>, ConsumerSeekAware.ConsumerSeekCallback> onPartitionsAssignedCallback) {

this.onPartitionsAssignedSeekCallback = onPartitionsAssignedCallback;
}

Expand All @@ -192,9 +204,6 @@ protected void onInit() {
}
}
ContainerProperties containerProperties = this.messageListenerContainer.getContainerProperties();
Object existing = containerProperties.getMessageListener();
Assert.state(existing == null, () -> "listener container cannot have an existing message listener (" + existing
+ ")");
containerProperties.setMessageListener(this.listener);
this.containerDeliveryAttemptPresent = containerProperties.isDeliveryAttemptHeader();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@
package org.springframework.integration.kafka.inbound;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
Expand All @@ -37,6 +37,7 @@
import org.springframework.integration.kafka.support.RawRecordHeaderErrorMessageStrategy;
import org.springframework.integration.support.ErrorMessageUtils;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.integration.support.json.JacksonJsonUtils;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.listener.BatchMessageListener;
import org.springframework.kafka.listener.ConsumerSeekAware;
Expand All @@ -48,11 +49,14 @@
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
import org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.DefaultKafkaHeaderMapper;
import org.springframework.kafka.support.JacksonPresent;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.converter.BatchMessageConverter;
import org.springframework.kafka.support.converter.ConversionException;
import org.springframework.kafka.support.converter.KafkaMessageHeaders;
import org.springframework.kafka.support.converter.MessageConverter;
import org.springframework.kafka.support.converter.MessagingMessageConverter;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
Expand Down Expand Up @@ -106,8 +110,6 @@ public class KafkaMessageDrivenChannelAdapter<K, V> extends MessageProducerSuppo

private boolean containerDeliveryAttemptPresent;

private boolean doFilterInRetry;

/**
* Construct an instance with mode {@link ListenerMode#record}.
* @param messageListenerContainer the container.
Expand All @@ -131,6 +133,15 @@ public KafkaMessageDrivenChannelAdapter(AbstractMessageListenerContainer<K, V> m
this.messageListenerContainer.setAutoStartup(false);
this.mode = mode;
setErrorMessageStrategy(new RawRecordHeaderErrorMessageStrategy());

if (JacksonPresent.isJackson2Present()) {
MessagingMessageConverter messageConverter = new MessagingMessageConverter();
DefaultKafkaHeaderMapper headerMapper = new DefaultKafkaHeaderMapper();
headerMapper.addTrustedPackages(JacksonJsonUtils.DEFAULT_TRUSTED_PACKAGES.toArray(new String[0]));
messageConverter.setHeaderMapper(headerMapper);
this.recordListener.setMessageConverter(messageConverter);
this.batchListener.setMessageConverter(messageConverter);
}
}

/**
Expand All @@ -149,7 +160,6 @@ else if (messageConverter instanceof BatchMessageConverter) {
throw new IllegalArgumentException(
"Message converter must be a 'RecordMessageConverter' or 'BatchMessageConverter'");
}

}

/**
Expand Down Expand Up @@ -280,14 +290,11 @@ protected void onInit() {
super.onInit();

ContainerProperties containerProperties = this.messageListenerContainer.getContainerProperties();
Object existing = containerProperties.getMessageListener();
Assert.state(existing == null, () -> "listener container cannot have an existing message listener (" + existing
+ ")");
if (this.mode.equals(ListenerMode.record)) {
MessageListener<K, V> listener = this.recordListener;

this.doFilterInRetry = this.filterInRetry && this.retryTemplate != null
&& this.recordFilterStrategy != null;
boolean doFilterInRetry =
this.filterInRetry && this.retryTemplate != null && this.recordFilterStrategy != null;

if (this.retryTemplate != null) {
MessageChannel errorChannel = getErrorChannel();
Expand All @@ -296,7 +303,7 @@ protected void onInit() {
}
this.retryTemplate.registerListener(this.recordListener);
}
if (!this.doFilterInRetry && this.recordFilterStrategy != null) {
if (!doFilterInRetry && this.recordFilterStrategy != null) {
listener = new FilteringMessageListenerAdapter<>(listener, this.recordFilterStrategy,
this.ackDiscarded);
}
Expand Down Expand Up @@ -597,7 +604,7 @@ private Message<?> toMessage(List<ConsumerRecord<K, V>> records, Acknowledgment
}
catch (RuntimeException ex) {
Exception exception = new ConversionException("Failed to convert to message",
records.stream().collect(Collectors.toList()), ex);
new ArrayList<>(records), ex);
MessageChannel errorChannel = getErrorChannel();
if (errorChannel != null) {
getMessagingTemplate().send(errorChannel, buildErrorMessage(message, exception));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,15 @@
import org.springframework.integration.core.Pausable;
import org.springframework.integration.endpoint.AbstractMessageSource;
import org.springframework.integration.support.AbstractIntegrationMessageBuilder;
import org.springframework.integration.support.json.JacksonJsonUtils;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConsumerAwareRebalanceListener;
import org.springframework.kafka.listener.ConsumerProperties;
import org.springframework.kafka.listener.LoggingCommitCallback;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.DefaultKafkaHeaderMapper;
import org.springframework.kafka.support.JacksonPresent;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.KafkaUtils;
import org.springframework.kafka.support.LogIfLevelEnabled;
Expand Down Expand Up @@ -233,6 +236,12 @@ public KafkaMessageSource(ConsumerFactory<K, V> consumerFactory,
this.assignTimeout =
Duration.ofMillis(Math.max(this.pollTimeout.toMillis() * 20, MIN_ASSIGN_TIMEOUT)); // NOSONAR - magic
this.commitTimeout = consumerProperties.getSyncCommitTimeout();

if (JacksonPresent.isJackson2Present()) {
DefaultKafkaHeaderMapper headerMapper = new DefaultKafkaHeaderMapper();
headerMapper.addTrustedPackages(JacksonJsonUtils.DEFAULT_TRUSTED_PACKAGES.toArray(new String[0]));
((MessagingMessageConverter) this.messageConverter).setHeaderMapper(headerMapper);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.NullChannel;
import org.springframework.integration.history.MessageHistory;
import org.springframework.integration.kafka.channel.PollableKafkaChannel;
import org.springframework.integration.kafka.channel.PublishSubscribeKafkaChannel;
import org.springframework.integration.kafka.channel.SubscribableKafkaChannel;
Expand All @@ -53,6 +55,7 @@

/**
* @author Gary Russell
* @author Artem Bilan
*
* @since 5.4
*
Expand All @@ -70,10 +73,18 @@ void subscribablePtp(@Autowired SubscribableChannel ptp) throws InterruptedExcep
latch.countDown();
});
Message<?> msg = new GenericMessage<>("foo");
NullChannel component = new NullChannel();
component.setBeanName("myNullChannel");
msg = MessageHistory.write(msg, component);
ptp.send(msg, 10_000L);
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(message.get().getPayload()).isEqualTo("foo");
assertThat(message.get().getHeaders().get(KafkaHeaders.RECEIVED_TOPIC)).isEqualTo("channel.1");
Message<?> received = message.get();
assertThat(received.getPayload()).isEqualTo("foo");
assertThat(received.getHeaders().get(KafkaHeaders.RECEIVED_TOPIC)).isEqualTo("channel.1");

MessageHistory messageHistory = MessageHistory.read(received);
assertThat(messageHistory).isNotNull();
assertThat(messageHistory.toString()).isEqualTo("myNullChannel");
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,8 +156,6 @@ void testKafkaMessageDrivenChannelAdapterOptions() {

messageListener = containerProps.getMessageListener();
assertThat(messageListener.getClass().getName()).contains("$IntegrationRecordMessageListener");

assertThat(adapter).extracting("doFilterInRetry").isEqualTo(Boolean.TRUE);
}

}
Loading