Skip to content

Commit

Permalink
GH-8967: maxFetchSize = 1 from the StandardRotationPolicy
Browse files Browse the repository at this point in the history
Fixes: #8967

In the `fair` mode the `StandardRotationPolicy` re-configures an `AbstractFetchLimitingMessageSource`
for a new directory (and possible new `ConnectionFactory`) in the `beforeReceive()`.
However, with default `maxFetchSize` (or bigger than `1`), the `receive()`` would poll `toBeReceived` internal queue
 for files cached from the previous polling cycle.
Since we rotate the source immediately to a new set of options, all those cached files don't make sense
or even can cause the problem on fetching their content in case of `AbstractRemoteFileStreamingMessageSource`
when we rotate to a new `ConnectionFactory`.

* Call `fetchLimitingMessageSource.setMaxFetchSize(1);` in the `StandardRotationPolicy.beforeReceive()`
when `fair && !this.initialized`

(cherry picked from commit c78630e)
  • Loading branch information
artembilan committed Feb 26, 2024
1 parent 768915b commit 2d04847
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 10 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2019 the original author or authors.
* Copyright 2018-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -24,7 +24,9 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.springframework.core.log.LogMessage;
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.endpoint.AbstractFetchLimitingMessageSource;
import org.springframework.integration.file.remote.AbstractRemoteFileStreamingMessageSource;
import org.springframework.integration.file.remote.session.DelegatingSessionFactory;
import org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizingMessageSource;
Expand All @@ -33,9 +35,9 @@
/**
* Standard rotation policy; iterates over key/directory pairs; when the end is reached,
* starts again at the beginning. If the fair option is true the rotation occurs on every
* poll, regardless of result. Otherwise rotation occurs when the current pair returns no
* poll, regardless of result. Otherwise, rotation occurs when the current pair returns no
* message.
*
* <p>
* Subclasses implement {@code onRotation(MessageSource<?> source)} to configure the
* {@link MessageSource} on each rotation.
*
Expand Down Expand Up @@ -78,6 +80,12 @@ public StandardRotationPolicy(DelegatingSessionFactory<?> factory, List<KeyDirec
public void beforeReceive(MessageSource<?> source) {
if (this.fair || !this.initialized) {
configureSource(source);
if (this.fair && !this.initialized
&& source instanceof AbstractFetchLimitingMessageSource<?> fetchLimitingMessageSource) {

this.logger.info(LogMessage.format("Enforce 'maxFetchSize = 1' for '%s' in the 'fair' mode", source));
fetchLimitingMessageSource.setMaxFetchSize(1);
}
this.initialized = true;
}
if (this.logger.isTraceEnabled()) {
Expand Down Expand Up @@ -142,11 +150,11 @@ protected void configureSource(MessageSource<?> source) {
* @param source the MessageSource.
*/
protected void onRotation(MessageSource<?> source) {
if (source instanceof AbstractRemoteFileStreamingMessageSource) {
((AbstractRemoteFileStreamingMessageSource<?>) source).setRemoteDirectory(this.current.getDirectory());
if (source instanceof AbstractRemoteFileStreamingMessageSource<?> streamingMessageSource) {
streamingMessageSource.setRemoteDirectory(this.current.getDirectory());
}
else if (source instanceof AbstractInboundFileSynchronizingMessageSource) {
((AbstractInboundFileSynchronizingMessageSource<?>) source).getSynchronizer()
else if (source instanceof AbstractInboundFileSynchronizingMessageSource<?> synchronizingMessageSource) {
synchronizingMessageSource.getSynchronizer()
.setRemoteDirectory(this.current.getDirectory());
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2023 the original author or authors.
* Copyright 2018-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -405,8 +405,7 @@ public RotatingServerAdvice advice() {
public IntegrationFlow flow() {
return IntegrationFlow.from(Ftp.inboundStreamingAdapter(new FtpRemoteFileTemplate(sf()))
.filter(new FtpPersistentAcceptOnceFileListFilter(new SimpleMetadataStore(), "rotate"))
.remoteDirectory(".")
.maxFetchSize(1),
.remoteDirectory("."),
e -> e.poller(Pollers.fixedDelay(1).advice(advice())))
.channel(MessageChannels.queue("files"))
.get();
Expand Down

0 comments on commit 2d04847

Please sign in to comment.