diff --git a/spring-integration-file/src/main/java/org/springframework/integration/file/remote/aop/KeyDirectory.java b/spring-integration-file/src/main/java/org/springframework/integration/file/remote/aop/KeyDirectory.java deleted file mode 100644 index 1ff79854840..00000000000 --- a/spring-integration-file/src/main/java/org/springframework/integration/file/remote/aop/KeyDirectory.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Copyright 2019 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.integration.file.remote.aop; - -import org.springframework.integration.file.remote.session.DelegatingSessionFactory; -import org.springframework.util.Assert; - -/** - * A {@link DelegatingSessionFactory} key/directory pair. - */ -public class KeyDirectory { - - private final Object key; - - private final String directory; - - public KeyDirectory(Object key, String directory) { - Assert.notNull(key, "key cannot be null"); - Assert.notNull(directory, "directory cannot be null"); - this.key = key; - this.directory = directory; - } - - public Object getKey() { - return this.key; - } - - public String getDirectory() { - return this.directory; - } - - @Override - public String toString() { - return "KeyDirectory [key=" + this.key.toString() + ", directory=" + this.directory + "]"; - } - -} diff --git a/spring-integration-file/src/main/java/org/springframework/integration/file/remote/aop/RotatingServerAdvice.java b/spring-integration-file/src/main/java/org/springframework/integration/file/remote/aop/RotatingServerAdvice.java index 4c520995e1a..1f637535e3d 100644 --- a/spring-integration-file/src/main/java/org/springframework/integration/file/remote/aop/RotatingServerAdvice.java +++ b/spring-integration-file/src/main/java/org/springframework/integration/file/remote/aop/RotatingServerAdvice.java @@ -20,9 +20,7 @@ import org.springframework.integration.aop.AbstractMessageSourceAdvice; import org.springframework.integration.core.MessageSource; -import org.springframework.integration.file.remote.AbstractRemoteFileStreamingMessageSource; import org.springframework.integration.file.remote.session.DelegatingSessionFactory; -import org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizingMessageSource; import org.springframework.messaging.Message; import org.springframework.util.Assert; @@ -45,9 +43,9 @@ public class RotatingServerAdvice extends AbstractMessageSourceAdvice { * Create an instance that rotates to the next server/directory if no message is * received. * @param factory the {@link DelegatingSessionFactory}. - * @param keyDirectories a list of {@link KeyDirectory}. + * @param keyDirectories a list of {@link RotationPolicy.KeyDirectory}. */ - public RotatingServerAdvice(DelegatingSessionFactory factory, List keyDirectories) { + public RotatingServerAdvice(DelegatingSessionFactory factory, List keyDirectories) { this(factory, keyDirectories, false); } @@ -55,10 +53,12 @@ public RotatingServerAdvice(DelegatingSessionFactory factory, List factory, List keyDirectories, boolean fair) { + public RotatingServerAdvice(DelegatingSessionFactory factory, List keyDirectories, + boolean fair) { + this(new StandardRotationPolicy(factory, keyDirectories, fair)); } @@ -84,29 +84,4 @@ public Message afterReceive(Message result, MessageSource source) { return result; } - public static class StandardRotationPolicy extends AbstractStandardRotationPolicy { - - - public StandardRotationPolicy(DelegatingSessionFactory factory, List keyDirectories, - boolean fair) { - super(factory, keyDirectories, fair); - } - - @Override - protected void onRotation(MessageSource source) { - Assert.isTrue(source instanceof AbstractInboundFileSynchronizingMessageSource - || source instanceof AbstractRemoteFileStreamingMessageSource, - "source must be an AbstractInboundFileSynchronizingMessageSource or a " - + "AbstractRemoteFileStreamingMessageSource"); - - if (source instanceof AbstractRemoteFileStreamingMessageSource) { - ((AbstractRemoteFileStreamingMessageSource) source).setRemoteDirectory(getCurrent().getDirectory()); - } - else { - ((AbstractInboundFileSynchronizingMessageSource) source).getSynchronizer() - .setRemoteDirectory(getCurrent().getDirectory()); - } - } - - } } diff --git a/spring-integration-file/src/main/java/org/springframework/integration/file/remote/aop/RotationPolicy.java b/spring-integration-file/src/main/java/org/springframework/integration/file/remote/aop/RotationPolicy.java index ab21233ce6c..82a715039ea 100644 --- a/spring-integration-file/src/main/java/org/springframework/integration/file/remote/aop/RotationPolicy.java +++ b/spring-integration-file/src/main/java/org/springframework/integration/file/remote/aop/RotationPolicy.java @@ -17,17 +17,18 @@ package org.springframework.integration.file.remote.aop; import org.springframework.integration.core.MessageSource; +import org.springframework.util.Assert; /** - * Implementations can reconfigure the message source before and/or after - * a poll. + * A strategy for rotating advices to allow reconfiguring + * the message source before and/or after a poll. * * @author Gary Russell * @author Michael Forstner * @author Artem Bilan * @author David Turanski * - * @since 5.0.7 + * @since 5.2 */ public interface RotationPolicy { @@ -44,11 +45,42 @@ public interface RotationPolicy { */ void afterReceive(boolean messageReceived, MessageSource source); - /** - * + * Return the current {@link KeyDirectory}. * @return the current {@link KeyDirectory} + * @since 5.2 */ KeyDirectory getCurrent(); + /** + * A key for a thread-local store and its related directory pair. + */ + class KeyDirectory { + + private final Object key; + + private final String directory; + + public KeyDirectory(Object key, String directory) { + Assert.notNull(key, "key cannot be null"); + Assert.notNull(directory, "directory cannot be null"); + this.key = key; + this.directory = directory; + } + + public Object getKey() { + return this.key; + } + + public String getDirectory() { + return this.directory; + } + + @Override + public String toString() { + return "KeyDirectory [key=" + this.key.toString() + ", directory=" + this.directory + "]"; + } + + } + } diff --git a/spring-integration-file/src/main/java/org/springframework/integration/file/remote/aop/AbstractStandardRotationPolicy.java b/spring-integration-file/src/main/java/org/springframework/integration/file/remote/aop/StandardRotationPolicy.java similarity index 72% rename from spring-integration-file/src/main/java/org/springframework/integration/file/remote/aop/AbstractStandardRotationPolicy.java rename to spring-integration-file/src/main/java/org/springframework/integration/file/remote/aop/StandardRotationPolicy.java index 99ae79ebea3..bcaf674956c 100644 --- a/spring-integration-file/src/main/java/org/springframework/integration/file/remote/aop/AbstractStandardRotationPolicy.java +++ b/spring-integration-file/src/main/java/org/springframework/integration/file/remote/aop/StandardRotationPolicy.java @@ -1,5 +1,5 @@ /* - * Copyright 2019 the original author or authors. + * Copyright 2018-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,6 +17,7 @@ package org.springframework.integration.file.remote.aop; import java.util.ArrayList; +import java.util.Collections; import java.util.Iterator; import java.util.List; @@ -24,7 +25,9 @@ import org.apache.commons.logging.LogFactory; import org.springframework.integration.core.MessageSource; +import org.springframework.integration.file.remote.AbstractRemoteFileStreamingMessageSource; import org.springframework.integration.file.remote.session.DelegatingSessionFactory; +import org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizingMessageSource; import org.springframework.util.Assert; /** @@ -41,13 +44,13 @@ * @author Artem Bilan * @author David Turanski * - * @since 5.1.8 + * @since 5.2 */ -public abstract class AbstractStandardRotationPolicy implements RotationPolicy { +public class StandardRotationPolicy implements RotationPolicy { protected final Log logger = LogFactory.getLog(getClass()); // NOSONAR final - private final DelegatingSessionFactory factory; // NOSONAR final + private final DelegatingSessionFactory factory; private final List keyDirectories = new ArrayList<>(); @@ -59,12 +62,12 @@ public abstract class AbstractStandardRotationPolicy implements RotationPolicy { private volatile boolean initialized; - protected AbstractStandardRotationPolicy(DelegatingSessionFactory factory, List keyDirectories, + public StandardRotationPolicy(DelegatingSessionFactory factory, List keyDirectories, boolean fair) { Assert.notNull(factory, "factory cannot be null"); Assert.notNull(keyDirectories, "keyDirectories cannot be null"); - Assert.isTrue(keyDirectories.size() > 0, "At least one KeyDirectory is required"); + Assert.isTrue(!keyDirectories.isEmpty(), "At least one KeyDirectory is required"); this.factory = factory; this.keyDirectories.addAll(keyDirectories); this.fair = fair; @@ -107,7 +110,7 @@ protected DelegatingSessionFactory getFactory() { } protected List getKeyDirectories() { - return this.keyDirectories; + return Collections.unmodifiableList(this.keyDirectories); } protected boolean isFair() { @@ -123,14 +126,26 @@ protected boolean isInitialized() { } protected void configureSource(MessageSource source) { - if (!this.iterator.hasNext()) { this.iterator = this.keyDirectories.iterator(); } this.current = this.iterator.next(); - onRotation(source); } - protected abstract void onRotation(MessageSource source); + protected void onRotation(MessageSource source) { + Assert.isTrue(source instanceof AbstractInboundFileSynchronizingMessageSource + || source instanceof AbstractRemoteFileStreamingMessageSource, + "source must be an AbstractInboundFileSynchronizingMessageSource or a " + + "AbstractRemoteFileStreamingMessageSource"); + + if (source instanceof AbstractRemoteFileStreamingMessageSource) { + ((AbstractRemoteFileStreamingMessageSource) source).setRemoteDirectory(getCurrent().getDirectory()); + } + else { + ((AbstractInboundFileSynchronizingMessageSource) source).getSynchronizer() + .setRemoteDirectory(getCurrent().getDirectory()); + } + } + } diff --git a/spring-integration-ftp/src/test/java/org/springframework/integration/ftp/inbound/RotatingServersTests.java b/spring-integration-ftp/src/test/java/org/springframework/integration/ftp/inbound/RotatingServersTests.java index e75ab4a28fa..6a523c2aa12 100644 --- a/spring-integration-ftp/src/test/java/org/springframework/integration/ftp/inbound/RotatingServersTests.java +++ b/spring-integration-ftp/src/test/java/org/springframework/integration/ftp/inbound/RotatingServersTests.java @@ -43,8 +43,8 @@ import org.springframework.integration.dsl.MessageChannels; import org.springframework.integration.dsl.Pollers; import org.springframework.integration.dsl.StandardIntegrationFlow; -import org.springframework.integration.file.remote.aop.KeyDirectory; import org.springframework.integration.file.remote.aop.RotatingServerAdvice; +import org.springframework.integration.file.remote.aop.RotationPolicy; import org.springframework.integration.file.remote.session.CachingSessionFactory; import org.springframework.integration.file.remote.session.DefaultSessionFactoryLocator; import org.springframework.integration.file.remote.session.DelegatingSessionFactory; @@ -227,17 +227,17 @@ public DelegatingSessionFactory sf() { @Bean public RotatingServerAdvice advice() { - List keyDirectories = new ArrayList<>(); - keyDirectories.add(new KeyDirectory("one", "foo")); - keyDirectories.add(new KeyDirectory("one", "bar")); - keyDirectories.add(new KeyDirectory("two", "baz")); - keyDirectories.add(new KeyDirectory("two", "qux")); - keyDirectories.add(new KeyDirectory("three", "fiz")); - keyDirectories.add(new KeyDirectory("three", "buz")); + List keyDirectories = new ArrayList<>(); + keyDirectories.add(new RotationPolicy.KeyDirectory("one", "foo")); + keyDirectories.add(new RotationPolicy.KeyDirectory("one", "bar")); + keyDirectories.add(new RotationPolicy.KeyDirectory("two", "baz")); + keyDirectories.add(new RotationPolicy.KeyDirectory("two", "qux")); + keyDirectories.add(new RotationPolicy.KeyDirectory("three", "fiz")); + keyDirectories.add(new RotationPolicy.KeyDirectory("three", "buz")); return theAdvice(keyDirectories); } - protected RotatingServerAdvice theAdvice(List keyDirectories) { + protected RotatingServerAdvice theAdvice(List keyDirectories) { return new RotatingServerAdvice(sf(), keyDirectories); } @@ -262,7 +262,7 @@ protected File localDir() { public static class FairConfig extends StandardConfig { @Override - protected RotatingServerAdvice theAdvice(List keyDirectories) { + protected RotatingServerAdvice theAdvice(List keyDirectories) { return new RotatingServerAdvice(sf(), keyDirectories, true); } diff --git a/src/reference/asciidoc/ftp.adoc b/src/reference/asciidoc/ftp.adoc index 55a10934577..531fb066244 100644 --- a/src/reference/asciidoc/ftp.adoc +++ b/src/reference/asciidoc/ftp.adoc @@ -706,20 +706,20 @@ Notice that, in this example, the message handler downstream of the transformer Starting with _version 5.0.7_, the `RotatingServerAdvice` is available; when configured as a poller advice, the inbound adapters can poll multiple servers and directories. Configure the advice and add it to the poller's advice chain as normal. A `DelegatingSessionFactory` is used to select the server see <> for more information. -The advice configuration consists of a list of `RotatingServerAdvice.KeyDirectory` objects. +The advice configuration consists of a list of `RotationPolicy.KeyDirectory` objects. .Example [source, java] ---- @Bean public RotatingServerAdvice advice() { - List keyDirectories = new ArrayList<>(); - keyDirectories.add(new KeyDirectory("one", "foo")); - keyDirectories.add(new KeyDirectory("one", "bar")); - keyDirectories.add(new KeyDirectory("two", "baz")); - keyDirectories.add(new KeyDirectory("two", "qux")); - keyDirectories.add(new KeyDirectory("three", "fiz")); - keyDirectories.add(new KeyDirectory("three", "buz")); + List keyDirectories = new ArrayList<>(); + keyDirectories.add(new RotationPolicy.KeyDirectory("one", "foo")); + keyDirectories.add(new RotationPolicy.KeyDirectory("one", "bar")); + keyDirectories.add(new RotationPolicy.KeyDirectory("two", "baz")); + keyDirectories.add(new RotationPolicy.KeyDirectory("two", "qux")); + keyDirectories.add(new RotationPolicy.KeyDirectory("three", "fiz")); + keyDirectories.add(new RotationPolicy.KeyDirectory("three", "buz")); return new RotatingServerAdvice(delegatingSf(), keyDirectories); } ---- @@ -740,7 +740,7 @@ public RotatingServerAdvice advice() { In this case, the advice will move to the next server/directory regardless of whether the previous poll returned a file. -Alternatively, you can provide your own `RotatingServerAdvice.RotationPolicy` to reconfigure the message source as needed: +Alternatively, you can provide your own `RotationPolicy` to reconfigure the message source as needed: .policy [source, java] diff --git a/src/reference/asciidoc/sftp.adoc b/src/reference/asciidoc/sftp.adoc index f6d0145dd6d..e33b9fbe38d 100644 --- a/src/reference/asciidoc/sftp.adoc +++ b/src/reference/asciidoc/sftp.adoc @@ -705,20 +705,20 @@ Notice that, in this example, the message handler downstream of the transformer Starting with _version 5.0.7_, the `RotatingServerAdvice` is available; when configured as a poller advice, the inbound adapters can poll multiple servers and directories. Configure the advice and add it to the poller's advice chain as normal. A `DelegatingSessionFactory` is used to select the server see <<./ftp.adoc#ftp-dsf,Delegating Session Factory>> for more information. -The advice configuration consists of a list of `RotatingServerAdvice.KeyDirectory` objects. +The advice configuration consists of a list of `RotationPolicy.KeyDirectory` objects. .Example [source, java] ---- @Bean public RotatingServerAdvice advice() { - List keyDirectories = new ArrayList<>(); - keyDirectories.add(new KeyDirectory("one", "foo")); - keyDirectories.add(new KeyDirectory("one", "bar")); - keyDirectories.add(new KeyDirectory("two", "baz")); - keyDirectories.add(new KeyDirectory("two", "qux")); - keyDirectories.add(new KeyDirectory("three", "fiz")); - keyDirectories.add(new KeyDirectory("three", "buz")); + List keyDirectories = new ArrayList<>(); + keyDirectories.add(new RotationPolicy.KeyDirectory("one", "foo")); + keyDirectories.add(new RotationPolicy.KeyDirectory("one", "bar")); + keyDirectories.add(new RotationPolicy.KeyDirectory("two", "baz")); + keyDirectories.add(new RotationPolicy.KeyDirectory("two", "qux")); + keyDirectories.add(new RotationPolicy.KeyDirectory("three", "fiz")); + keyDirectories.add(new RotationPolicy.KeyDirectory("three", "buz")); return new RotatingServerAdvice(delegatingSf(), keyDirectories); } ---- @@ -739,7 +739,7 @@ public RotatingServerAdvice advice() { In this case, the advice will move to the next server/directory regardless of whether the previous poll returned a file. -Alternatively, you can provide your own `RotatingServerAdvice.RotationPolicy` to reconfigure the message source as needed: +Alternatively, you can provide your own `RotationPolicy` to reconfigure the message source as needed: .policy [source, java] diff --git a/src/reference/asciidoc/whats-new.adoc b/src/reference/asciidoc/whats-new.adoc index 8d592a8d7f5..ac253098198 100644 --- a/src/reference/asciidoc/whats-new.adoc +++ b/src/reference/asciidoc/whats-new.adoc @@ -55,6 +55,13 @@ See <<./aggregator.adoc#flux-aggregator,Flux Aggregator>> for more information. The FTP and SFTP modules now provide an event listener for certain Apache Mina FTP/SFTP server events. See <<./ftp.adoc#ftp-server-events, Apache Mina FTP Server Events>> and <<./sftp.adoc#sftp-server-events, Apache Mina SFTP Server Events>> for more information. +[[x5.2-avro]] +==== Avro Transformers + +Simple Apache Avro transformers are now provided. +See <<./transformers.adoc#avro-transformers, Avro Transformers>> for more information. + + [[x5.2-general]] === General Changes @@ -131,14 +138,14 @@ See <<./webflux.adoc#webflux,WebFlux Support>> for more information. The `MongoDbMessageStore` can now be configured with custom converters. See <<./mongodb.adoc#mongodb, MongoDB Support>> for more information. -[[x5.2-avro]] -==== Avro Transformers - -Simple Apache Avro transformers are now provided. -See <<./transformers.adoc#avro-transformers, Avro Transformers>> for more information. - [[x5.2-routers]] ==== Router Changes You can now disable falling back to the channel key as the channel bean name. See <<./router.adoc#dynamic-routers, Dynamic Routers>> for more information. + +[[x5.2--ftp-sftp]] +==== FTP/SFTP Changes + +The `RotatingServerAdvice` is decoupled now from the `RotationPolicy` and its `StandardRotationPolicy`. +See <<./ftp.adoc#ftp-rotating-server-advice, Polling Multiple Servers and Directories>> for more information.