Skip to content

Commit

Permalink
Fix maxMessagesPerPoll for SourcePollingChAdapter (#8747)
Browse files Browse the repository at this point in the history
* Fix maxMessagesPerPoll for SourcePollingChAdapter

The `AbstractMethodAnnotationPostProcessor` does not check
for `PollerMetadata.MAX_MESSAGES_UNBOUNDED` before setting
`maxMessagesPerPoll` into a `SourcePollingChannelAdapter`
which in this case must be `1`

Also fix `SourcePollingChannelAdapterFactoryBean` to not mutate
the provided `PollerMetadata` (which might be global default)
with a new `maxMessagesPerPoll`

**Cherry-pick to `6.1.x` & `6.0.x`**

* * Fix `this.` prefix in `SourcePollingChannelAdapterFactoryBean`
  • Loading branch information
artembilan committed Oct 4, 2023
1 parent 261589a commit ad01c44
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -687,7 +687,14 @@ protected void configurePollingEndpoint(AbstractPollingEndpoint pollingEndpoint,
pollingEndpoint.setTaskExecutor(pollerMetadata.getTaskExecutor());
pollingEndpoint.setTrigger(pollerMetadata.getTrigger());
pollingEndpoint.setAdviceChain(pollerMetadata.getAdviceChain());
pollingEndpoint.setMaxMessagesPerPoll(pollerMetadata.getMaxMessagesPerPoll());
long maxMessagesPerPoll = pollerMetadata.getMaxMessagesPerPoll();
if (maxMessagesPerPoll == PollerMetadata.MAX_MESSAGES_UNBOUNDED &&
pollingEndpoint instanceof SourcePollingChannelAdapter) {
// the default is 1 since a source might return
// a non-null and non-interruptible value every time it is invoked
maxMessagesPerPoll = 1;
}
pollingEndpoint.setMaxMessagesPerPoll(maxMessagesPerPoll);
pollingEndpoint.setErrorHandler(pollerMetadata.getErrorHandler());
if (pollingEndpoint instanceof PollingConsumer) {
((PollingConsumer) pollingEndpoint).setReceiveTimeout(pollerMetadata.getReceiveTimeout());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,12 +186,13 @@ private void initializeAdapter() {
Assert.notNull(this.pollerMetadata, () -> "No poller has been defined for channel-adapter '"
+ this.beanName + "', and no default poller is available within the context.");
}
if (this.pollerMetadata.getMaxMessagesPerPoll() == Integer.MIN_VALUE) {
long maxMessagesPerPoll = this.pollerMetadata.getMaxMessagesPerPoll();
if (maxMessagesPerPoll == PollerMetadata.MAX_MESSAGES_UNBOUNDED) {
// the default is 1 since a source might return
// a non-null and non-interruptible value every time it is invoked
this.pollerMetadata.setMaxMessagesPerPoll(1);
maxMessagesPerPoll = 1;
}
spca.setMaxMessagesPerPoll(this.pollerMetadata.getMaxMessagesPerPoll());
spca.setMaxMessagesPerPoll(maxMessagesPerPoll);
if (this.sendTimeout != null) {
spca.setSendTimeout(this.sendTimeout);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@
import org.springframework.integration.endpoint.MethodInvokingMessageSource;
import org.springframework.integration.endpoint.PollingConsumer;
import org.springframework.integration.endpoint.ReactiveStreamsConsumer;
import org.springframework.integration.endpoint.SourcePollingChannelAdapter;
import org.springframework.integration.expression.SpelPropertyAccessorRegistrar;
import org.springframework.integration.gateway.GatewayProxyFactoryBean;
import org.springframework.integration.handler.ServiceActivatingHandler;
Expand Down Expand Up @@ -417,12 +418,15 @@ public void testAnnotatedServiceActivator() throws Exception {

assertThat(this.counterChannel.receive(10)).isNull();

SmartLifecycle countSA = this.context.getBean("annotationTestService.count.inboundChannelAdapter",
SmartLifecycle.class);
SourcePollingChannelAdapter countSA =
this.context.getBean("annotationTestService.count.inboundChannelAdapter",
SourcePollingChannelAdapter.class);
assertThat(countSA.isAutoStartup()).isFalse();
assertThat(countSA.getPhase()).isEqualTo(23);
countSA.start();

assertThat(countSA.getMaxMessagesPerPoll()).isEqualTo(1);

for (int i = 0; i < 10; i++) {
Message<?> message = this.counterChannel.receive(10_000);
assertThat(message).isNotNull();
Expand Down

0 comments on commit ad01c44

Please sign in to comment.