Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.w3c.dom.Element;

import org.springframework.beans.BeanMetadataElement;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.beans.factory.xml.ParserContext;
import org.springframework.integration.config.ExpressionFactoryBean;
Expand All @@ -44,7 +45,9 @@ protected final BeanMetadataElement parseSource(Element element, ParserContext p
synchronizerBuilder.addConstructorArgReference(element.getAttribute("session-factory"));

// configure the InboundFileSynchronizer properties
IntegrationNamespaceUtils.setValueIfAttributeDefined(synchronizerBuilder, element, "remote-directory");
BeanDefinition expressionDef = IntegrationNamespaceUtils.createExpressionDefinitionFromValueOrExpression(
"remote-directory", "remote-directory-expression", parserContext, element, true);
synchronizerBuilder.addPropertyValue("remoteDirectoryExpression", expressionDef);
IntegrationNamespaceUtils.setValueIfAttributeDefined(synchronizerBuilder, element, "delete-remote-files");
IntegrationNamespaceUtils.setValueIfAttributeDefined(synchronizerBuilder, element, "preserve-timestamp");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,16 @@
import java.util.Arrays;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.expression.EvaluationContext;
import org.springframework.expression.Expression;
import org.springframework.expression.common.LiteralExpression;
import org.springframework.integration.expression.ExpressionUtils;
import org.springframework.integration.file.filters.FileListFilter;
import org.springframework.integration.file.filters.ReversibleFileListFilter;
Expand All @@ -42,9 +46,6 @@
import org.springframework.util.Assert;
import org.springframework.util.ObjectUtils;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
* Base class charged with knowing how to connect to a remote file system,
* scan it for new files and then download the files.
Expand Down Expand Up @@ -81,7 +82,7 @@ public abstract class AbstractInboundFileSynchronizer<F>
/**
* the path on the remote mount as a String.
*/
private volatile String remoteDirectory;
private volatile Expression remoteDirectoryExpression;

/**
* An {@link FileListFilter} that runs against the <em>remote</em> file system view.
Expand Down Expand Up @@ -145,7 +146,18 @@ public void setTemporaryFileSuffix(String temporaryFileSuffix) {
* @param remoteDirectory The remote directory.
*/
public void setRemoteDirectory(String remoteDirectory) {
this.remoteDirectory = remoteDirectory;
this.remoteDirectoryExpression = new LiteralExpression(remoteDirectory);
}

/**
* Specify an expression that evaluates to the full path to the remote directory.
*
* @param remoteDirectoryExpression The remote directory expression.
* @since 4.2
*/
public void setRemoteDirectoryExpression(Expression remoteDirectoryExpression) {
Assert.notNull(remoteDirectoryExpression, "'remoteDirectoryExpression' must not be null");
this.remoteDirectoryExpression = remoteDirectoryExpression;
}

/**
Expand Down Expand Up @@ -184,7 +196,7 @@ public void setBeanFactory(BeanFactory beanFactory) throws BeansException {

@Override
public final void afterPropertiesSet() {
Assert.notNull(this.remoteDirectory, "remoteDirectory must not be null");
Assert.state(this.remoteDirectoryExpression != null, "'remoteDirectoryExpression' must not be null");
if (this.evaluationContext == null) {
this.evaluationContext = ExpressionUtils.createStandardEvaluationContext(this.beanFactory);
}
Expand Down Expand Up @@ -212,14 +224,15 @@ public void synchronizeToLocalDirectory(final File localDirectory) {

@Override
public Integer doInSession(Session<F> session) throws IOException {
F[] files = session.list(AbstractInboundFileSynchronizer.this.remoteDirectory);
String remoteDirectory = remoteDirectoryExpression.getValue(evaluationContext, String.class);
F[] files = session.list(remoteDirectory);
if (!ObjectUtils.isEmpty(files)) {
List<F> filteredFiles = AbstractInboundFileSynchronizer.this.filterFiles(files);
List<F> filteredFiles = filterFiles(files);
for (F file : filteredFiles) {
try {
if (file != null) {
AbstractInboundFileSynchronizer.this.copyFileToLocalDirectory(
AbstractInboundFileSynchronizer.this.remoteDirectory, file, localDirectory,
copyFileToLocalDirectory(
remoteDirectory, file, localDirectory,
session);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -731,9 +731,9 @@ Only files matching this regular expression will be picked up by this adapter.
type="xsd:string">
<xsd:annotation>
<xsd:documentation>
Allows you to provide a SpEL expression which
will compute the directory
path where the files will be transferred to
Specify a SpEL expression which
will be used to evaluate the directory
path to where the files will be transferred
(e.g., "headers.['remote_dir'] +
'/myTransfers'");
</xsd:documentation>
Expand All @@ -743,9 +743,9 @@ Only files matching this regular expression will be picked up by this adapter.
type="xsd:string">
<xsd:annotation>
<xsd:documentation>
Allows you to provide a SpEL expression which
will compute the temporary directory
path where files will be transferred to before they are moved to the remote-directory
Specify a SpEL expression which
will be used to evaluate the temporary directory
path to where files will be transferred before they are moved to the remote-directory
(e.g., "headers.['remote_dir'] +
'/temp/myTransfers'");
</xsd:documentation>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,18 @@
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="remote-directory-expression"
type="xsd:string">
<xsd:annotation>
<xsd:documentation>
Specify a SpEL expression which
will be used to evaluate the directory
path from where the files will be transferred
(e.g., "@someBean.fetchDirectory");
Mutually exclusive with 'remote-directory'.
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
</xsd:extension>
</xsd:complexContent>
</xsd:complexType>
Expand Down Expand Up @@ -500,11 +512,11 @@
<xsd:complexType name="base-ftp-adapter-type">
<xsd:complexContent>
<xsd:extension base="base-adapter-type">
<xsd:attribute name="remote-directory" type="xsd:string"
use="optional">
<xsd:attribute name="remote-directory" type="xsd:string" use="optional">
<xsd:annotation>
<xsd:documentation>
Identifies the remote directory path (e.g., "/remote/mytransfers")
Mutually exclusive with 'remote-directory-expression'.
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
comparator="comparator"
temporary-file-suffix=".foo"
local-filter="acceptAllFilter"
remote-directory="foo/bar">
remote-directory-expression="'foo/bar'">
<int:poller fixed-rate="1000">
<int:transactional synchronization-factory="syncFactory"/>
</int:poller>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.ApplicationContext;
import org.springframework.expression.Expression;
import org.springframework.integration.endpoint.SourcePollingChannelAdapter;
import org.springframework.integration.file.filters.FileListFilter;
import org.springframework.integration.file.remote.session.CachingSessionFactory;
Expand Down Expand Up @@ -96,6 +97,8 @@ public void testFtpInboundChannelAdapterComplete() throws Exception{

FtpInboundFileSynchronizer fisync =
(FtpInboundFileSynchronizer) TestUtils.getPropertyValue(inbound, "synchronizer");
assertEquals("'foo/bar'", TestUtils.getPropertyValue(fisync, "remoteDirectoryExpression", Expression.class)
.getExpressionString());
assertNotNull(TestUtils.getPropertyValue(fisync, "localFilenameGeneratorExpression"));
assertTrue(TestUtils.getPropertyValue(fisync, "preserveTimestamp", Boolean.class));
assertEquals(".foo", TestUtils.getPropertyValue(fisync, "temporaryFileSuffix", String.class));
Expand Down Expand Up @@ -131,6 +134,8 @@ public void cachingSessionFactory() throws Exception{
String remoteFileSeparator = (String) TestUtils.getPropertyValue(fisync, "remoteFileSeparator");
assertNotNull(remoteFileSeparator);
assertEquals("/", remoteFileSeparator);
assertEquals("foo/bar", TestUtils.getPropertyValue(fisync, "remoteDirectoryExpression", Expression.class)
.getExpressionString());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,18 @@
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="remote-directory-expression"
type="xsd:string">
<xsd:annotation>
<xsd:documentation>
Specify a SpEL expression which
will be used to evaluate the directory
path from where the files will be transferred
(e.g., "@someBean.fetchDirectory").
Mutually exclusive with 'remote-directory'.
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
</xsd:extension>
</xsd:complexContent>
</xsd:complexType>
Expand Down Expand Up @@ -501,12 +513,12 @@
<xsd:complexType name="base-sftp-adapter-type">
<xsd:complexContent>
<xsd:extension base="base-adapter-type">
<xsd:attribute name="remote-directory" type="xsd:string"
use="optional">
<xsd:attribute name="remote-directory" type="xsd:string" use="optional">
<xsd:annotation>
<xsd:documentation>
Identifies the directory path (e.g.,
"/temp/mytransfers")
Mutually exclusive with 'remote-directory-expression'.
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
session-factory="sftpSessionFactory"
channel="requestChannel"
filename-regex="f[o]+\.txt"
remote-directory="/foo"
remote-directory-expression="'/foo'"
local-directory="file:local-test-dir"
auto-create-local-directory="false"
remote-file-separator="."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,17 +31,19 @@
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;

import org.springframework.beans.factory.BeanCreationException;
import org.springframework.beans.factory.BeanDefinitionStoreException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.PollableChannel;
import org.springframework.expression.Expression;
import org.springframework.integration.endpoint.SourcePollingChannelAdapter;
import org.springframework.integration.file.filters.FileListFilter;
import org.springframework.integration.sftp.inbound.SftpInboundFileSynchronizer;
import org.springframework.integration.sftp.inbound.SftpInboundFileSynchronizingMessageSource;
import org.springframework.integration.test.util.TestUtils;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.PollableChannel;

/**
* @author Oleg Zhurakousky
Expand All @@ -57,16 +59,17 @@ public void prepare(){

@Test
public void testAutoStartup() throws Exception{
ApplicationContext context =
ConfigurableApplicationContext context =
new ClassPathXmlApplicationContext("SftpInboundAutostartup-context.xml", this.getClass());

SourcePollingChannelAdapter adapter = context.getBean("sftpAutoStartup", SourcePollingChannelAdapter.class);
assertFalse(adapter.isRunning());
context.close();
}

@Test
public void testWithLocalFiles() throws Exception{
ApplicationContext context =
ConfigurableApplicationContext context =
new ClassPathXmlApplicationContext("InboundChannelAdapterParserTests-context.xml", this.getClass());
assertTrue(new File("src/main/resources").exists());

Expand All @@ -81,6 +84,8 @@ public void testWithLocalFiles() throws Exception{

assertNotNull(comparator);
SftpInboundFileSynchronizer synchronizer = (SftpInboundFileSynchronizer) TestUtils.getPropertyValue(source, "synchronizer");
assertEquals("'/foo'", TestUtils.getPropertyValue(synchronizer, "remoteDirectoryExpression", Expression.class)
.getExpressionString());
assertNotNull(TestUtils.getPropertyValue(synchronizer, "localFilenameGeneratorExpression"));
assertTrue(TestUtils.getPropertyValue(synchronizer, "preserveTimestamp", Boolean.class));
String remoteFileSeparator = (String) TestUtils.getPropertyValue(synchronizer, "remoteFileSeparator");
Expand All @@ -91,42 +96,53 @@ public void testWithLocalFiles() throws Exception{
assertNotNull(requestChannel.receive(2000));
FileListFilter<?> acceptAllFilter = context.getBean("acceptAllFilter", FileListFilter.class);
assertTrue(TestUtils.getPropertyValue(source, "fileSource.scanner.filter.fileFilters", Collection.class).contains(acceptAllFilter));
context.close();
}

@Test
public void testAutoChannel() {
ApplicationContext context =
ConfigurableApplicationContext context =
new ClassPathXmlApplicationContext("InboundChannelAdapterParserTests-context.xml", this.getClass());
// Auto-created channel
MessageChannel autoChannel = context.getBean("autoChannel", MessageChannel.class);
SourcePollingChannelAdapter autoChannelAdapter = context.getBean("autoChannel.adapter", SourcePollingChannelAdapter.class);
SourcePollingChannelAdapter autoChannelAdapter = context.getBean("autoChannel.adapter",
SourcePollingChannelAdapter.class);
assertEquals("/foo", TestUtils
.getPropertyValue(autoChannelAdapter, "source.synchronizer.remoteDirectoryExpression", Expression.class)
.getExpressionString());
assertSame(autoChannel, TestUtils.getPropertyValue(autoChannelAdapter, "outputChannel"));
context.close();
}

@Test(expected=BeanDefinitionStoreException.class)
//exactly one of 'filename-pattern' or 'filter' is allowed on SFTP inbound adapter
public void testFailWithFilePatternAndFilter() throws Exception{
assertTrue(!new File("target/bar").exists());
new ClassPathXmlApplicationContext("InboundChannelAdapterParserTests-context-fail.xml", this.getClass());
new ClassPathXmlApplicationContext("InboundChannelAdapterParserTests-context-fail.xml", this.getClass()).close();
}

@Test @Ignore
public void testLocalFilesAreFound() throws Exception{
assertTrue(new File("target").exists());
new ClassPathXmlApplicationContext("InboundChannelAdapterParserTests-context.xml", this.getClass());
ConfigurableApplicationContext context = new ClassPathXmlApplicationContext(
"InboundChannelAdapterParserTests-context.xml", this.getClass());
assertTrue(new File("target").exists());
context.close();
}

@Test
public void testLocalDirAutoCreated() throws Exception{
assertFalse(new File("foo").exists());
new ClassPathXmlApplicationContext("InboundChannelAdapterParserTests-context.xml", this.getClass());
ConfigurableApplicationContext context = new ClassPathXmlApplicationContext(
"InboundChannelAdapterParserTests-context.xml", this.getClass());
assertTrue(new File("foo").exists());
context.close();
}

@Test(expected=BeanCreationException.class)
public void testLocalDirAutoCreateFailed() throws Exception{
new ClassPathXmlApplicationContext("InboundChannelAdapterParserTests-context-fail-autocreate.xml", this.getClass());
new ClassPathXmlApplicationContext("InboundChannelAdapterParserTests-context-fail-autocreate.xml",
this.getClass()).close();
}

@After
Expand Down
4 changes: 4 additions & 0 deletions src/reference/asciidoc/ftp.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,10 @@ So, the root object of the SpEL Evaluation Context is the original name of the r

Starting with _Spring Integration 3.0_, you can specify the `preserve-timestamp` attribute (default `false`); when `true`, the local file's modified timestamp will be set to the value retrieved from the server; otherwise it will be set to the current time.

Starting with _version 4.2_, you can specify `remote-directory-expression` instead of `remote-directory`, allowing
you to dynamically determine the directory on each poll.
e.g `remote-directory-expression="@myBean.determineRemoteDir()"`.

Sometimes file filtering based on the simple pattern specified via `filename-pattern` attribute might not be sufficient.
If this is the case, you can use the `filename-regex` attribute to specify a Regular Expression (e.g.
`filename-regex=".*\.test$"`).
Expand Down
4 changes: 4 additions & 0 deletions src/reference/asciidoc/sftp.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,10 @@ So, the root object of the SpEL Evaluation Context is the original name of the r

Starting with _Spring Integration 3.0_, you can specify the `preserve-timestamp` attribute (default `false`); when `true`, the local file's modified timestamp will be set to the value retrieved from the server; otherwise it will be set to the current time.

Starting with _version 4.2_, you can specify `remote-directory-expression` instead of `remote-directory`, allowing
you to dynamically determine the directory on each poll.
e.g `remote-directory-expression="@myBean.determineRemoteDir()"`.

Sometimes file filtering based on the simple pattern specified via `filename-pattern` attribute might not be sufficient.
If this is the case, you can use the `filename-regex` attribute to specify a Regular Expression (e.g.
`filename-regex=".*\.test$"`).
Expand Down
6 changes: 6 additions & 0 deletions src/reference/asciidoc/whats-new.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -207,3 +207,9 @@ In addition, a `SimpleMessageGroupProcessor` is provided that simply returns the
When an output processor produces a collection of `Message<?>`, the aggregator releases those messages individually.
Configuring the `SimpleMessageGroupProcessor` makes the aggregator a message barrier, were messages are held up
until they all arrive, and are then released individually. See <<aggregator>> for more information.

==== (S)FTP Changes

You can now specify a `remote-directory-expression` on the inbound channel adapters, to determine the directory
at runtime.
See <<ftp>> and <<sftp>> for more information.