Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,7 @@

import org.springframework.integration.aop.AbstractMessageSourceAdvice;
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.file.remote.AbstractRemoteFileStreamingMessageSource;
import org.springframework.integration.file.remote.session.DelegatingSessionFactory;
import org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizingMessageSource;
import org.springframework.messaging.Message;
import org.springframework.util.Assert;

Expand All @@ -45,20 +43,22 @@ public class RotatingServerAdvice extends AbstractMessageSourceAdvice {
* Create an instance that rotates to the next server/directory if no message is
* received.
* @param factory the {@link DelegatingSessionFactory}.
* @param keyDirectories a list of {@link KeyDirectory}.
* @param keyDirectories a list of {@link RotationPolicy.KeyDirectory}.
*/
public RotatingServerAdvice(DelegatingSessionFactory<?> factory, List<KeyDirectory> keyDirectories) {
public RotatingServerAdvice(DelegatingSessionFactory<?> factory, List<RotationPolicy.KeyDirectory> keyDirectories) {
this(factory, keyDirectories, false);
}

/**
* Create an instance that rotates to the next server/directory depending on the fair
* argument.
* @param factory the {@link DelegatingSessionFactory}.
* @param keyDirectories a list of {@link KeyDirectory}.
* @param keyDirectories a list of {@link RotationPolicy.KeyDirectory}.
* @param fair true to rotate on every poll, false to rotate when no message is received.
*/
public RotatingServerAdvice(DelegatingSessionFactory<?> factory, List<KeyDirectory> keyDirectories, boolean fair) {
public RotatingServerAdvice(DelegatingSessionFactory<?> factory, List<RotationPolicy.KeyDirectory> keyDirectories,
boolean fair) {

this(new StandardRotationPolicy(factory, keyDirectories, fair));
}

Expand All @@ -84,29 +84,4 @@ public Message<?> afterReceive(Message<?> result, MessageSource<?> source) {
return result;
}

public static class StandardRotationPolicy extends AbstractStandardRotationPolicy {


public StandardRotationPolicy(DelegatingSessionFactory<?> factory, List<KeyDirectory> keyDirectories,
boolean fair) {
super(factory, keyDirectories, fair);
}

@Override
protected void onRotation(MessageSource<?> source) {
Assert.isTrue(source instanceof AbstractInboundFileSynchronizingMessageSource
|| source instanceof AbstractRemoteFileStreamingMessageSource,
"source must be an AbstractInboundFileSynchronizingMessageSource or a "
+ "AbstractRemoteFileStreamingMessageSource");

if (source instanceof AbstractRemoteFileStreamingMessageSource) {
((AbstractRemoteFileStreamingMessageSource<?>) source).setRemoteDirectory(getCurrent().getDirectory());
}
else {
((AbstractInboundFileSynchronizingMessageSource<?>) source).getSynchronizer()
.setRemoteDirectory(getCurrent().getDirectory());
}
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,18 @@
package org.springframework.integration.file.remote.aop;

import org.springframework.integration.core.MessageSource;
import org.springframework.util.Assert;

/**
* Implementations can reconfigure the message source before and/or after
* a poll.
* A strategy for rotating advices to allow reconfiguring
* the message source before and/or after a poll.
*
* @author Gary Russell
* @author Michael Forstner
* @author Artem Bilan
* @author David Turanski
*
* @since 5.0.7
* @since 5.2
*/
public interface RotationPolicy {

Expand All @@ -44,11 +45,42 @@ public interface RotationPolicy {
*/
void afterReceive(boolean messageReceived, MessageSource<?> source);


/**
*
* Return the current {@link KeyDirectory}.
* @return the current {@link KeyDirectory}
* @since 5.2
*/
KeyDirectory getCurrent();

/**
* A key for a thread-local store and its related directory pair.
*/
class KeyDirectory {

private final Object key;

private final String directory;

public KeyDirectory(Object key, String directory) {
Assert.notNull(key, "key cannot be null");
Assert.notNull(directory, "directory cannot be null");
this.key = key;
this.directory = directory;
}

public Object getKey() {
return this.key;
}

public String getDirectory() {
return this.directory;
}

@Override
public String toString() {
return "KeyDirectory [key=" + this.key.toString() + ", directory=" + this.directory + "]";
}

}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2019 the original author or authors.
* Copyright 2018-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -17,14 +17,17 @@
package org.springframework.integration.file.remote.aop;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.springframework.integration.core.MessageSource;
import org.springframework.integration.file.remote.AbstractRemoteFileStreamingMessageSource;
import org.springframework.integration.file.remote.session.DelegatingSessionFactory;
import org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizingMessageSource;
import org.springframework.util.Assert;

/**
Expand All @@ -41,13 +44,13 @@
* @author Artem Bilan
* @author David Turanski
*
* @since 5.1.8
* @since 5.2
*/
public abstract class AbstractStandardRotationPolicy implements RotationPolicy {
public class StandardRotationPolicy implements RotationPolicy {

protected final Log logger = LogFactory.getLog(getClass()); // NOSONAR final

private final DelegatingSessionFactory<?> factory; // NOSONAR final
private final DelegatingSessionFactory<?> factory;

private final List<KeyDirectory> keyDirectories = new ArrayList<>();

Expand All @@ -59,12 +62,12 @@ public abstract class AbstractStandardRotationPolicy implements RotationPolicy {

private volatile boolean initialized;

protected AbstractStandardRotationPolicy(DelegatingSessionFactory<?> factory, List<KeyDirectory> keyDirectories,
public StandardRotationPolicy(DelegatingSessionFactory<?> factory, List<KeyDirectory> keyDirectories,
boolean fair) {

Assert.notNull(factory, "factory cannot be null");
Assert.notNull(keyDirectories, "keyDirectories cannot be null");
Assert.isTrue(keyDirectories.size() > 0, "At least one KeyDirectory is required");
Assert.isTrue(!keyDirectories.isEmpty(), "At least one KeyDirectory is required");
this.factory = factory;
this.keyDirectories.addAll(keyDirectories);
this.fair = fair;
Expand Down Expand Up @@ -107,7 +110,7 @@ protected DelegatingSessionFactory<?> getFactory() {
}

protected List<KeyDirectory> getKeyDirectories() {
return this.keyDirectories;
return Collections.unmodifiableList(this.keyDirectories);
}

protected boolean isFair() {
Expand All @@ -123,14 +126,26 @@ protected boolean isInitialized() {
}

protected void configureSource(MessageSource<?> source) {

if (!this.iterator.hasNext()) {
this.iterator = this.keyDirectories.iterator();
}
this.current = this.iterator.next();

onRotation(source);
}

protected abstract void onRotation(MessageSource<?> source);
protected void onRotation(MessageSource<?> source) {
Assert.isTrue(source instanceof AbstractInboundFileSynchronizingMessageSource
|| source instanceof AbstractRemoteFileStreamingMessageSource,
"source must be an AbstractInboundFileSynchronizingMessageSource or a "
+ "AbstractRemoteFileStreamingMessageSource");

if (source instanceof AbstractRemoteFileStreamingMessageSource) {
((AbstractRemoteFileStreamingMessageSource<?>) source).setRemoteDirectory(getCurrent().getDirectory());
}
else {
((AbstractInboundFileSynchronizingMessageSource<?>) source).getSynchronizer()
.setRemoteDirectory(getCurrent().getDirectory());
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@
import org.springframework.integration.dsl.MessageChannels;
import org.springframework.integration.dsl.Pollers;
import org.springframework.integration.dsl.StandardIntegrationFlow;
import org.springframework.integration.file.remote.aop.KeyDirectory;
import org.springframework.integration.file.remote.aop.RotatingServerAdvice;
import org.springframework.integration.file.remote.aop.RotationPolicy;
import org.springframework.integration.file.remote.session.CachingSessionFactory;
import org.springframework.integration.file.remote.session.DefaultSessionFactoryLocator;
import org.springframework.integration.file.remote.session.DelegatingSessionFactory;
Expand Down Expand Up @@ -227,17 +227,17 @@ public DelegatingSessionFactory<FTPFile> sf() {

@Bean
public RotatingServerAdvice advice() {
List<KeyDirectory> keyDirectories = new ArrayList<>();
keyDirectories.add(new KeyDirectory("one", "foo"));
keyDirectories.add(new KeyDirectory("one", "bar"));
keyDirectories.add(new KeyDirectory("two", "baz"));
keyDirectories.add(new KeyDirectory("two", "qux"));
keyDirectories.add(new KeyDirectory("three", "fiz"));
keyDirectories.add(new KeyDirectory("three", "buz"));
List<RotationPolicy.KeyDirectory> keyDirectories = new ArrayList<>();
keyDirectories.add(new RotationPolicy.KeyDirectory("one", "foo"));
keyDirectories.add(new RotationPolicy.KeyDirectory("one", "bar"));
keyDirectories.add(new RotationPolicy.KeyDirectory("two", "baz"));
keyDirectories.add(new RotationPolicy.KeyDirectory("two", "qux"));
keyDirectories.add(new RotationPolicy.KeyDirectory("three", "fiz"));
keyDirectories.add(new RotationPolicy.KeyDirectory("three", "buz"));
return theAdvice(keyDirectories);
}

protected RotatingServerAdvice theAdvice(List<KeyDirectory> keyDirectories) {
protected RotatingServerAdvice theAdvice(List<RotationPolicy.KeyDirectory> keyDirectories) {
return new RotatingServerAdvice(sf(), keyDirectories);
}

Expand All @@ -262,7 +262,7 @@ protected File localDir() {
public static class FairConfig extends StandardConfig {

@Override
protected RotatingServerAdvice theAdvice(List<KeyDirectory> keyDirectories) {
protected RotatingServerAdvice theAdvice(List<RotationPolicy.KeyDirectory> keyDirectories) {
return new RotatingServerAdvice(sf(), keyDirectories, true);
}

Expand Down
18 changes: 9 additions & 9 deletions src/reference/asciidoc/ftp.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -706,20 +706,20 @@ Notice that, in this example, the message handler downstream of the transformer
Starting with _version 5.0.7_, the `RotatingServerAdvice` is available; when configured as a poller advice, the inbound adapters can poll multiple servers and directories.
Configure the advice and add it to the poller's advice chain as normal.
A `DelegatingSessionFactory` is used to select the server see <<ftp-dsf>> for more information.
The advice configuration consists of a list of `RotatingServerAdvice.KeyDirectory` objects.
The advice configuration consists of a list of `RotationPolicy.KeyDirectory` objects.

.Example
[source, java]
----
@Bean
public RotatingServerAdvice advice() {
List<KeyDirectory> keyDirectories = new ArrayList<>();
keyDirectories.add(new KeyDirectory("one", "foo"));
keyDirectories.add(new KeyDirectory("one", "bar"));
keyDirectories.add(new KeyDirectory("two", "baz"));
keyDirectories.add(new KeyDirectory("two", "qux"));
keyDirectories.add(new KeyDirectory("three", "fiz"));
keyDirectories.add(new KeyDirectory("three", "buz"));
List<RotationPolicy.KeyDirectory> keyDirectories = new ArrayList<>();
keyDirectories.add(new RotationPolicy.KeyDirectory("one", "foo"));
keyDirectories.add(new RotationPolicy.KeyDirectory("one", "bar"));
keyDirectories.add(new RotationPolicy.KeyDirectory("two", "baz"));
keyDirectories.add(new RotationPolicy.KeyDirectory("two", "qux"));
keyDirectories.add(new RotationPolicy.KeyDirectory("three", "fiz"));
keyDirectories.add(new RotationPolicy.KeyDirectory("three", "buz"));
return new RotatingServerAdvice(delegatingSf(), keyDirectories);
}
----
Expand All @@ -740,7 +740,7 @@ public RotatingServerAdvice advice() {

In this case, the advice will move to the next server/directory regardless of whether the previous poll returned a file.

Alternatively, you can provide your own `RotatingServerAdvice.RotationPolicy` to reconfigure the message source as needed:
Alternatively, you can provide your own `RotationPolicy` to reconfigure the message source as needed:

.policy
[source, java]
Expand Down
Loading