Skip to content

Add Spring Integration default poller auto-config #27992

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package org.springframework.boot.autoconfigure.integration;

import java.time.Duration;

import javax.management.MBeanServer;
import javax.sql.DataSource;

Expand Down Expand Up @@ -56,10 +58,14 @@
import org.springframework.integration.rsocket.ServerRSocketConnector;
import org.springframework.integration.rsocket.ServerRSocketMessageHandler;
import org.springframework.integration.rsocket.outbound.RSocketOutboundGateway;
import org.springframework.integration.scheduling.PollerMetadata;
import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.messaging.rsocket.RSocketStrategies;
import org.springframework.messaging.rsocket.annotation.support.RSocketMessageHandler;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.scheduling.support.CronTrigger;
import org.springframework.scheduling.support.PeriodicTrigger;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;

/**
Expand Down Expand Up @@ -110,6 +116,29 @@ public static org.springframework.integration.context.IntegrationProperties inte
@EnableIntegration
protected static class IntegrationConfiguration {

@Bean(PollerMetadata.DEFAULT_POLLER)
@ConditionalOnMissingBean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata defaultPoller(IntegrationProperties integrationProperties) {
IntegrationProperties.Poller poller = integrationProperties.getPoller();
int hasCron = poller.getCron() != null ? 1 : 0;
int hasFixedDelay = poller.getFixedDelay() != null ? 1 : 0;
int hasFixedRate = poller.getFixedRate() != null ? 1 : 0;
Assert.isTrue((hasCron + hasFixedDelay + hasFixedRate) <= 1,
"The 'cron', 'fixedDelay' and 'fixedRate' are mutually exclusive 'spring.integration.poller' properties.");
PollerMetadata pollerMetadata = new PollerMetadata();
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
map.from(poller::getMaxMessagesPerPoll).to(pollerMetadata::setMaxMessagesPerPoll);
map.from(poller::getReceiveTimeout).as(Duration::toMillis).to(pollerMetadata::setReceiveTimeout);
map.from(poller::getCron).whenHasText().as(CronTrigger::new).to(pollerMetadata::setTrigger);
map.from((poller.getFixedDelay() != null) ? poller.getFixedDelay() : poller.getFixedRate())
.as(Duration::toMillis).as(PeriodicTrigger::new).as((trigger) -> {
map.from(poller::getInitialDelay).as(Duration::toMillis).to(trigger::setInitialDelay);
trigger.setFixedRate(poller.getFixedRate() != null);
return trigger;
}).to(pollerMetadata::setTrigger);
return pollerMetadata;
}

}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.springframework.boot.autoconfigure.integration;

import java.net.URI;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

Expand Down Expand Up @@ -44,6 +45,8 @@ public class IntegrationProperties {

private final RSocket rsocket = new RSocket();

private final Poller poller = new Poller();

public Channel getChannel() {
return this.channel;
}
Expand All @@ -64,6 +67,10 @@ public RSocket getRsocket() {
return this.rsocket;
}

public Poller getPoller() {
return this.poller;
}

public static class Channel {

/**
Expand Down Expand Up @@ -295,4 +302,88 @@ public void setMessageMappingEnabled(boolean messageMappingEnabled) {

}

public static class Poller {

/**
* Maximum of messages to poll per polling cycle.
*/
private int maxMessagesPerPoll = Integer.MIN_VALUE; // PollerMetadata.MAX_MESSAGES_UNBOUNDED

/**
* How long to wait for messages on poll.
*/
private Duration receiveTimeout = Duration.ofSeconds(1); // PollerMetadata.DEFAULT_RECEIVE_TIMEOUT

/**
* Polling delay period. Mutually explusive with 'cron' and 'fixedRate'.
*/
private Duration fixedDelay;

/**
* Polling rate period. Mutually explusive with 'fixedDelay' and 'cron'.
*/
private Duration fixedRate;

/**
* Polling initial delay. Applied for 'fixedDelay' and 'fixedRate'; ignored for
* 'cron'.
*/
private Duration initialDelay;

/**
* Cron expression for polling. Mutually explusive with 'fixedDelay' and
* 'fixedRate'.
*/
private String cron;

public int getMaxMessagesPerPoll() {
return this.maxMessagesPerPoll;
}

public void setMaxMessagesPerPoll(int maxMessagesPerPoll) {
this.maxMessagesPerPoll = maxMessagesPerPoll;
}

public Duration getReceiveTimeout() {
return this.receiveTimeout;
}

public void setReceiveTimeout(Duration receiveTimeout) {
this.receiveTimeout = receiveTimeout;
}

public Duration getFixedDelay() {
return this.fixedDelay;
}

public void setFixedDelay(Duration fixedDelay) {
this.fixedDelay = fixedDelay;
}

public Duration getFixedRate() {
return this.fixedRate;
}

public void setFixedRate(Duration fixedRate) {
this.fixedRate = fixedRate;
}

public Duration getInitialDelay() {
return this.initialDelay;
}

public void setInitialDelay(Duration initialDelay) {
this.initialDelay = initialDelay;
}

public String getCron() {
return this.cron;
}

public void setCron(String cron) {
this.cron = cron;
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@

package org.springframework.boot.autoconfigure.integration;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

import javax.management.MBeanServer;
import javax.sql.DataSource;

Expand Down Expand Up @@ -47,6 +51,8 @@
import org.springframework.core.io.ResourceLoader;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.config.IntegrationManagementConfigurer;
import org.springframework.integration.context.IntegrationContextUtils;
import org.springframework.integration.core.MessageSource;
Expand All @@ -57,13 +63,17 @@
import org.springframework.integration.rsocket.IntegrationRSocketEndpoint;
import org.springframework.integration.rsocket.ServerRSocketConnector;
import org.springframework.integration.rsocket.ServerRSocketMessageHandler;
import org.springframework.integration.scheduling.PollerMetadata;
import org.springframework.integration.support.channel.HeaderChannelRegistry;
import org.springframework.jdbc.BadSqlGrammarException;
import org.springframework.jdbc.core.JdbcOperations;
import org.springframework.jmx.export.MBeanExporter;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.rsocket.annotation.support.RSocketMessageHandler;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.support.CronTrigger;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
Expand Down Expand Up @@ -372,6 +382,54 @@ void whenTheUserDefinesTheirOwnDatabaseInitializerThenTheAutoConfiguredIntegrati
.hasBean("customInitializer"));
}

@Test
void defaultPoller() {
this.contextRunner.withUserConfiguration(PollingConsumerConfiguration.class).run((context) -> {
assertThat(context).hasSingleBean(PollerMetadata.class).getBean(PollerMetadata.DEFAULT_POLLER)
.hasFieldOrPropertyWithValue("maxMessagesPerPoll", (long) PollerMetadata.MAX_MESSAGES_UNBOUNDED)
.hasFieldOrPropertyWithValue("receiveTimeout", PollerMetadata.DEFAULT_RECEIVE_TIMEOUT)
.hasFieldOrPropertyWithValue("trigger", null);

GenericMessage<String> testMessage = new GenericMessage<>("test");
context.getBean("testChannel", QueueChannel.class).send(testMessage);
@SuppressWarnings("unchecked")
BlockingQueue<Message<?>> sink = context.getBean("sink", BlockingQueue.class);
assertThat(sink.poll(10, TimeUnit.SECONDS)).isSameAs(testMessage);
});
}

@Test
void customPollerProperties() {
this.contextRunner.withUserConfiguration(PollingConsumerConfiguration.class)
.withPropertyValues("spring.integration.poller.cron=* * * ? * *",
"spring.integration.poller.max-messages-per-poll=1",
"spring.integration.poller.receive-timeout=10s")
.run((context) -> {
assertThat(context).hasSingleBean(PollerMetadata.class)
.getBean(PollerMetadata.DEFAULT_POLLER, PollerMetadata.class)
.hasFieldOrPropertyWithValue("maxMessagesPerPoll", 1L)
.hasFieldOrPropertyWithValue("receiveTimeout", 10000L)
.extracting(PollerMetadata::getTrigger).isInstanceOf(CronTrigger.class)
.hasFieldOrPropertyWithValue("expression", "* * * ? * *");

GenericMessage<String> testMessage = new GenericMessage<>("test");
context.getBean("testChannel", QueueChannel.class).send(testMessage);
@SuppressWarnings("unchecked")
BlockingQueue<Message<?>> sink = context.getBean("sink", BlockingQueue.class);
assertThat(sink.poll(10, TimeUnit.SECONDS)).isSameAs(testMessage);
});
}

@Test
void triggerPropertiesAreMutuallyExclusive() {
this.contextRunner
.withPropertyValues("spring.integration.poller.cron=* * * ? * *",
"spring.integration.poller.fixed-delay=1s")
.run((context) -> assertThat(context).hasFailed().getFailure()
.hasRootCauseExactlyInstanceOf(IllegalArgumentException.class).hasMessageContaining(
"The 'cron', 'fixedDelay' and 'fixedRate' are mutually exclusive 'spring.integration.poller' properties."));
}

@Configuration(proxyBeanMethods = false)
static class CustomMBeanExporter {

Expand Down Expand Up @@ -459,4 +517,25 @@ IntegrationDataSourceInitializer customInitializer(DataSource dataSource, Resour

}

@Configuration(proxyBeanMethods = false)
static class PollingConsumerConfiguration {

@Bean
QueueChannel testChannel() {
return new QueueChannel();
}

@Bean
BlockingQueue<Message<?>> sink() {
return new LinkedBlockingQueue<>();
}

@ServiceActivator(inputChannel = "testChannel")
@Bean
MessageHandler handler(BlockingQueue<Message<?>> sink) {
return sink::add;
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ Spring Integration provides abstractions over messaging and also other transport
If Spring Integration is available on your classpath, it is initialized through the `@EnableIntegration` annotation.

Spring Integration polling logic relies <<features#features.task-execution-and-scheduling,on the auto-configured `TaskScheduler`>>.
The default `PollerMetadata` (poll unbounded number of messages every second) can be customized with `spring.integration.poller.*` configuration properties.

Spring Boot also configures some features that are triggered by the presence of additional Spring Integration modules.
If `spring-integration-jmx` is also on the classpath, message processing statistics are published over JMX.
Expand Down