Skip to content

Commit

Permalink
Move Spring dependencies to SNAPSHOT
Browse files Browse the repository at this point in the history
* Fix `AbstractAmqpChannel` to add messaging packages
as allowed for deserialization
* Some code style clean up in the `AbstractSubscribableAmqpChannel`
* Disable Kraft (default) for Kafka tests since they
are not reliable(perhaps only on Windows)
  • Loading branch information
artembilan committed Oct 10, 2023
1 parent 49cd14c commit 0203652
Show file tree
Hide file tree
Showing 6 changed files with 54 additions and 25 deletions.
16 changes: 8 additions & 8 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,8 @@ ext {
lettuceVersion = '6.2.6.RELEASE'
log4jVersion = '2.20.0'
mailVersion = '2.0.2'
micrometerTracingVersion = '1.2.0-M3'
micrometerVersion = '1.12.0-M3'
micrometerTracingVersion = '1.2.0-SNAPSHOT'
micrometerVersion = '1.12.0-SNAPSHOT'
mockitoVersion = '5.5.0'
mongoDriverVersion = '4.10.2'
mysqlVersion = '8.0.33'
Expand All @@ -100,19 +100,19 @@ ext {
postgresVersion = '42.6.0'
protobufVersion = '3.24.3'
r2dbch2Version = '1.0.0.RELEASE'
reactorVersion = '2023.0.0-M3'
reactorVersion = '2023.0.0-SNAPSHOT'
resilience4jVersion = '2.1.0'
romeToolsVersion = '2.1.0'
rsocketVersion = '1.1.4'
servletApiVersion = '6.0.0'
smackVersion = '4.4.6'
springAmqpVersion = '3.1.0-M1'
springDataVersion = '2023.1.0-M3'
springAmqpVersion = '3.1.0-SNAPSHOT'
springDataVersion = '2023.1.0-SNAPSHOT'
springGraphqlVersion = '1.2.3'
springKafkaVersion = '3.1.0-M1'
springKafkaVersion = '3.1.0-SNAPSHOT'
springRetryVersion = '2.0.3'
springSecurityVersion = '6.2.0-M3'
springVersion = '6.1.0-M5'
springSecurityVersion = '6.2.0-SNAPSHOT'
springVersion = '6.1.0-SNAPSHOT'
springWsVersion = '4.0.6'
testcontainersVersion = '1.19.0'
tomcatVersion = '10.1.13'
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2021 the original author or authors.
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -23,11 +23,20 @@
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.AllowedListDeserializingMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.integration.amqp.support.AmqpHeaderMapper;
import org.springframework.integration.amqp.support.DefaultAmqpHeaderMapper;
import org.springframework.integration.amqp.support.MappingUtils;
import org.springframework.integration.channel.AbstractMessageChannel;
import org.springframework.integration.history.MessageHistory;
import org.springframework.integration.message.AdviceMessage;
import org.springframework.integration.support.MutableMessage;
import org.springframework.integration.support.MutableMessageHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.ErrorMessage;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.util.Assert;

/**
Expand Down Expand Up @@ -87,6 +96,19 @@ public abstract class AbstractAmqpChannel extends AbstractMessageChannel impleme
this.amqpTemplate = amqpTemplate;
if (amqpTemplate instanceof RabbitTemplate) {
this.rabbitTemplate = (RabbitTemplate) amqpTemplate;
MessageConverter converter = this.rabbitTemplate.getMessageConverter();
if (converter instanceof AllowedListDeserializingMessageConverter allowedListMessageConverter) {
allowedListMessageConverter.addAllowedListPatterns(
"java.util*",
"java.lang*",
GenericMessage.class.getName(),
ErrorMessage.class.getName(),
AdviceMessage.class.getName(),
MutableMessage.class.getName(),
MessageHeaders.class.getName(),
MutableMessageHeaders.class.getName(),
MessageHistory.class.getName());
}
}
else {
this.rabbitTemplate = null;
Expand Down Expand Up @@ -143,7 +165,7 @@ protected boolean isExtractPayload() {

/**
* When mapping headers for the outbound message, determine whether the headers are
* mapped before the message is converted, or afterwards. This only affects headers
* mapped before the message is converted, or afterward. This only affects headers
* that might be added by the message converter. When false, the converter's headers
* win; when true, any headers added by the converter will be overridden (if the
* source message has a header that maps to those headers). You might wish to set this
Expand Down Expand Up @@ -242,10 +264,6 @@ public void onCreate(Connection connection) {
doDeclares();
}

@Override
public void onClose(Connection connection) {
}

protected abstract void doDeclares();

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2022 the original author or authors.
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -24,6 +24,7 @@
import org.springframework.amqp.AmqpConnectException;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer;
Expand Down Expand Up @@ -77,6 +78,7 @@ abstract class AbstractSubscribableAmqpChannel extends AbstractAmqpChannel
*/
protected AbstractSubscribableAmqpChannel(String channelName, AbstractMessageListenerContainer container,
AmqpTemplate amqpTemplate) {

this(channelName, container, amqpTemplate, false);
}

Expand All @@ -93,6 +95,7 @@ protected AbstractSubscribableAmqpChannel(String channelName, AbstractMessageLis
*/
protected AbstractSubscribableAmqpChannel(String channelName, AbstractMessageListenerContainer container,
AmqpTemplate amqpTemplate, AmqpHeaderMapper outboundMapper, AmqpHeaderMapper inboundMapper) {

this(channelName, container, amqpTemplate, false, outboundMapper, inboundMapper);
}

Expand All @@ -108,6 +111,7 @@ protected AbstractSubscribableAmqpChannel(String channelName, AbstractMessageLis
protected AbstractSubscribableAmqpChannel(String channelName,
AbstractMessageListenerContainer container,
AmqpTemplate amqpTemplate, boolean isPubSub) {

this(channelName, container, amqpTemplate, isPubSub,
DefaultAmqpHeaderMapper.outboundMapper(), DefaultAmqpHeaderMapper.inboundMapper());
}
Expand All @@ -128,14 +132,16 @@ protected AbstractSubscribableAmqpChannel(String channelName,
AbstractMessageListenerContainer container,
AmqpTemplate amqpTemplate, boolean isPubSub,
AmqpHeaderMapper outboundMapper, AmqpHeaderMapper inboundMapper) {

super(amqpTemplate, outboundMapper, inboundMapper);
Assert.notNull(container, "container must not be null");
Assert.hasText(channelName, "channel name must not be empty");
this.channelName = channelName;
this.container = container;
this.isPubSub = isPubSub;
setConnectionFactory(container.getConnectionFactory());
setAdmin(new RabbitAdmin(getConnectionFactory()));
ConnectionFactory connectionFactory = container.getConnectionFactory();
setConnectionFactory(connectionFactory);
setAdmin(new RabbitAdmin(connectionFactory));
}

/**
Expand Down Expand Up @@ -173,11 +179,13 @@ public void onInit() {
setMaxSubscribers(this.maxSubscribers);
String queue = obtainQueueName(this.channelName);
this.container.setQueueNames(queue);
MessageConverter converter = (this.getAmqpTemplate() instanceof RabbitTemplate)
? ((RabbitTemplate) this.getAmqpTemplate()).getMessageConverter()
MessageConverter converter =
(getAmqpTemplate() instanceof RabbitTemplate rabbitTemplate)
? rabbitTemplate.getMessageConverter()
: new SimpleMessageConverter();
MessageListener listener = new DispatchingMessageListener(converter,
this.dispatcher, this, this.isPubSub,

MessageListener listener =
new DispatchingMessageListener(converter, this.dispatcher, this, this.isPubSub,
getMessageBuilderFactory(), getInboundHeaderMapper());
this.container.setMessageListener(listener);
if (!this.container.isActive()) {
Expand Down Expand Up @@ -256,7 +264,7 @@ public void destroy() {

private static final class DispatchingMessageListener implements MessageListener {

private final Log logger = LogFactory.getLog(this.getClass());
private final Log logger = LogFactory.getLog(DispatchingMessageListener.class);

private final MessageDispatcher dispatcher;

Expand All @@ -273,6 +281,7 @@ private static final class DispatchingMessageListener implements MessageListener
private DispatchingMessageListener(MessageConverter converter,
MessageDispatcher dispatcher, AbstractSubscribableAmqpChannel channel, boolean isPubSub,
MessageBuilderFactory messageBuilderFactory, AmqpHeaderMapper inboundHeaderMapper) {

Assert.notNull(converter, "MessageConverter must not be null");
Assert.notNull(dispatcher, "MessageDispatcher must not be null");
this.converter = converter;
Expand Down Expand Up @@ -308,7 +317,7 @@ public void onMessage(org.springframework.amqp.core.Message message) {
}
}

protected Message<Object> buildMessage(org.springframework.amqp.core.Message message, Object converted) {
private Message<Object> buildMessage(org.springframework.amqp.core.Message message, Object converted) {
AbstractIntegrationMessageBuilder<Object> messageBuilder =
this.messageBuilderFactory.withPayload(converted);
if (this.channel.isExtractPayload()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
*
*/
@EmbeddedKafka(controlledShutdown = true,
kraft = false,
topics = {InboundGatewayTests.topic1,
InboundGatewayTests.topic2,
InboundGatewayTests.topic3,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
import org.springframework.kafka.support.SendResult;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.EmbeddedKafkaKraftBroker;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.kafka.transaction.KafkaTransactionManager;
import org.springframework.lang.Nullable;
Expand Down Expand Up @@ -139,8 +140,7 @@ class KafkaProducerMessageHandlerTests {

@BeforeAll
static void setup() {
embeddedKafka = new EmbeddedKafkaBroker(1, true,
topic1, topic2, topic3, topic4, topic5, topic6);
embeddedKafka = new EmbeddedKafkaKraftBroker(1, 2, topic1, topic2, topic3, topic4, topic5, topic6);
embeddedKafka.afterPropertiesSet();
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testOut", "true", embeddedKafka);
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
spring.kafka.global.embedded.enabled = true
spring.kafka.embedded.kraft=false
spring.embedded.kafka.brokers.property=spring.global.embedded.kafka.brokers
spring.kafka.embedded.partitions=1

0 comments on commit 0203652

Please sign in to comment.