Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2020 the original author or authors.
* Copyright 2002-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -102,11 +102,6 @@ public abstract class AbstractInboundFileSynchronizer<F>
*/
private Expression remoteDirectoryExpression;

/**
* The current evaluation of the expression.
*/
private String evaluatedRemoteDirectory;

/**
* An {@link FileListFilter} that runs against the <em>remote</em> file system view.
*/
Expand Down Expand Up @@ -203,7 +198,6 @@ public void setTemporaryFileSuffix(String temporaryFileSuffix) {
*/
public void setRemoteDirectory(String remoteDirectory) {
this.remoteDirectoryExpression = new LiteralExpression(remoteDirectory);
evaluateRemoteDirectory();
}

/**
Expand All @@ -229,7 +223,6 @@ public void setRemoteDirectoryExpressionString(String remoteDirectoryExpression)
protected final void doSetRemoteDirectoryExpression(Expression expression) {
Assert.notNull(expression, "'remoteDirectoryExpression' must not be null");
this.remoteDirectoryExpression = expression;
evaluateRemoteDirectory();
}

/**
Expand Down Expand Up @@ -298,7 +291,6 @@ public final void afterPropertiesSet() {
if (this.evaluationContext == null) {
this.evaluationContext = ExpressionUtils.createStandardEvaluationContext(this.beanFactory);
}
evaluateRemoteDirectory();
if (!StringUtils.hasText(this.metadataStorePrefix)) {
this.metadataStorePrefix = this.name;
}
Expand Down Expand Up @@ -341,26 +333,27 @@ public void synchronizeToLocalDirectory(final File localDirectory, final int max
}
return;
}
String remoteDirectory = this.remoteDirectoryExpression.getValue(this.evaluationContext, String.class);
if (this.logger.isTraceEnabled()) {
this.logger.trace("Synchronizing " + this.evaluatedRemoteDirectory + " to " + localDirectory);
this.logger.trace("Synchronizing " + remoteDirectory + " to " + localDirectory);
}
try {
int transferred = this.remoteFileTemplate.execute(session ->
transferFilesFromRemoteToLocal(localDirectory, maxFetchSize, session));
transferFilesFromRemoteToLocal(remoteDirectory, localDirectory, maxFetchSize, session));
if (this.logger.isDebugEnabled()) {
this.logger.debug(transferred + " files transferred from '" + this.evaluatedRemoteDirectory + "'");
this.logger.debug(transferred + " files transferred from '" + remoteDirectory + "'");
}
}
catch (Exception e) {
throw new MessagingException("Problem occurred while synchronizing '"
+ this.evaluatedRemoteDirectory + "' to local directory", e);
+ remoteDirectory + "' to local directory", e);
}
}

private Integer transferFilesFromRemoteToLocal(File localDirectory, int maxFetchSize, Session<F> session)
throws IOException {
private Integer transferFilesFromRemoteToLocal(String remoteDirectory, File localDirectory,
int maxFetchSize, Session<F> session) throws IOException {

F[] files = session.list(this.evaluatedRemoteDirectory);
F[] files = session.list(remoteDirectory);
if (!ObjectUtils.isEmpty(files)) {
files = FileUtils.purgeUnwantedElements(files, e -> !isFile(e), this.comparator);
}
Expand All @@ -372,6 +365,12 @@ private Integer transferFilesFromRemoteToLocal(File localDirectory, int maxFetch
int copied = filteredFiles.size();
int accepted = 0;

EvaluationContext localFileEvaluationContext = null;
if (this.localFilenameGeneratorExpression != null) {
localFileEvaluationContext = ExpressionUtils.createStandardEvaluationContext(this.beanFactory);
localFileEvaluationContext.setVariable("remoteDirectory", remoteDirectory);
}

for (F file : filteredFiles) {
if (filteringOneByOne) {
if ((maxFetchSize < 0 || accepted < maxFetchSize) && this.filter
Expand All @@ -383,7 +382,9 @@ private Integer transferFilesFromRemoteToLocal(File localDirectory, int maxFetch
copied--;
}
}
copied = copyIfNotNull(localDirectory, session, filteringOneByOne, filteredFiles, copied, file);
copied =
copyIfNotNull(remoteDirectory, localDirectory, localFileEvaluationContext, session,
filteringOneByOne, filteredFiles, copied, file);
}
return copied;
}
Expand All @@ -392,13 +393,16 @@ private Integer transferFilesFromRemoteToLocal(File localDirectory, int maxFetch
}
}

private int copyIfNotNull(File localDirectory, Session<F> session, boolean filteringOneByOne, List<F> filteredFiles,
int copied, @Nullable F file) throws IOException {
private int copyIfNotNull(String remoteDirectory, File localDirectory,
@Nullable EvaluationContext localFileEvaluationContext, Session<F> session, boolean filteringOneByOne,
List<F> filteredFiles, int copied, @Nullable F file) throws IOException {

boolean renamedFailed = false;
try {
if (file != null && !copyFileToLocalDirectory(this.evaluatedRemoteDirectory, file,
localDirectory, session)) {
if (file != null &&
!copyFileToLocalDirectory(remoteDirectory, localFileEvaluationContext, file, localDirectory,
session)) {

renamedFailed = true;
}
}
Expand Down Expand Up @@ -440,11 +444,12 @@ protected void rollbackFromFileToListEnd(List<F> filteredFiles, F file) {
}
}

protected boolean copyFileToLocalDirectory(String remoteDirectoryPath, F remoteFile, // NOSONAR
File localDirectory, Session<F> session) throws IOException {
protected boolean copyFileToLocalDirectory(String remoteDirectoryPath, // NOSONAR
@Nullable EvaluationContext localFileEvaluationContext, F remoteFile, File localDirectory,
Session<F> session) throws IOException {

String remoteFileName = getFilename(remoteFile);
String localFileName = generateLocalFileName(remoteFileName);
String localFileName = generateLocalFileName(remoteFileName, localFileEvaluationContext);
String remoteFilePath = remoteDirectoryPath != null
? (remoteDirectoryPath + this.remoteFileSeparator + remoteFileName)
: remoteFileName;
Expand Down Expand Up @@ -567,22 +572,16 @@ else if (this.logger.isInfoEnabled()) {
return renamed;
}

private String generateLocalFileName(String remoteFileName) {
private String generateLocalFileName(String remoteFileName,
@Nullable EvaluationContext localFileEvaluationContext) {

if (this.localFilenameGeneratorExpression != null) {
return this.localFilenameGeneratorExpression.getValue(this.evaluationContext, remoteFileName,
return this.localFilenameGeneratorExpression.getValue(localFileEvaluationContext, remoteFileName,
String.class);
}
return remoteFileName;
}

protected void evaluateRemoteDirectory() {
if (this.evaluationContext != null) {
this.evaluatedRemoteDirectory = this.remoteDirectoryExpression.getValue(this.evaluationContext,
String.class);
this.evaluationContext.setVariable("remoteDirectory", this.evaluatedRemoteDirectory);
}
}

/**
* Obtain a metadata for remote file associated with the provided local file.
* @param localFile the local file to retrieve metadata for.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2014-2019 the original author or authors.
* Copyright 2014-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,21 +18,27 @@

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
import static org.mockito.Mockito.mock;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.LinkedList;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.junit.Test;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

import org.springframework.beans.factory.BeanFactory;
import org.springframework.expression.EvaluationContext;
import org.springframework.integration.expression.SupplierExpression;
import org.springframework.integration.file.HeadDirectoryScanner;
import org.springframework.integration.file.filters.AcceptOnceFileListFilter;
import org.springframework.integration.file.filters.ChainFileListFilter;
Expand Down Expand Up @@ -78,8 +84,10 @@ protected String protocol() {
}

@Override
protected boolean copyFileToLocalDirectory(String remoteDirectoryPath, String remoteFile,
protected boolean copyFileToLocalDirectory(String remoteDirectoryPath,
EvaluationContext localFileEvaluationContext, String remoteFile,
File localDirectory, Session<String> session) throws IOException {

if ("bar".equals(remoteFile) && failWhenCopyingBar.getAndSet(false)) {
throw new IOException("fail");
}
Expand Down Expand Up @@ -209,13 +217,60 @@ public void testExclusiveWatchService() {
assertThat(count.get()).isEqualTo(1);
}

@Test(expected = IllegalStateException.class)
@Test
public void testScannerAndWatchServiceConflict() {
final AtomicInteger count = new AtomicInteger();
AbstractInboundFileSynchronizingMessageSource<String> source = createSource(count);
source.setUseWatchService(true);
source.setScanner(new HeadDirectoryScanner(1));
source.afterPropertiesSet();
assertThatIllegalStateException()
.isThrownBy(source::afterPropertiesSet);
}

@Test
public void testRemoteDirectoryRefreshedOnEachSynchronization(@TempDir File localDir) {
AbstractInboundFileSynchronizer<String> sync =
new AbstractInboundFileSynchronizer<String>(new StringSessionFactory()) {

@Override
protected boolean isFile(String file) {
return true;
}

@Override
protected String getFilename(String file) {
return file;
}

@Override
protected long getModified(String file) {
return 0;
}

@Override
protected String protocol() {
return "mock";
}

};

Queue<String> remoteDirs = new LinkedList<>();
remoteDirs.add("dir1");
remoteDirs.add("dir2");
sync.setRemoteDirectoryExpression(new SupplierExpression<>(remoteDirs::poll));
sync.setLocalFilenameGeneratorExpressionString("#remoteDirectory+'/'+#root");
sync.setBeanFactory(mock(BeanFactory.class));
sync.afterPropertiesSet();

sync.synchronizeToLocalDirectory(localDir);
sync.synchronizeToLocalDirectory(localDir);

/*Files.find(localDir.toPath(),
Integer.MAX_VALUE,
(filePath, fileAttr) -> fileAttr.isRegularFile())
.forEach(System.out::println);*/

assertThat(localDir.list()).contains("dir1", "dir2");
}

private AbstractInboundFileSynchronizingMessageSource<String> createSource(AtomicInteger count) {
Expand Down Expand Up @@ -267,8 +322,10 @@ protected String protocol() {
}

@Override
protected boolean copyFileToLocalDirectory(String remoteDirectoryPath, String remoteFile,
protected boolean copyFileToLocalDirectory(String remoteDirectoryPath,
EvaluationContext localFileEvaluationContext, String remoteFile,
File localDirectory, Session<String> session) {

count.incrementAndGet();
return true;
}
Expand Down Expand Up @@ -301,7 +358,7 @@ public boolean remove(String path) {

@Override
public String[] list(String path) {
return new String[] { "foo", "bar", "baz" };
return new String[]{ "foo", "bar", "baz" };
}

@Override
Expand Down Expand Up @@ -366,7 +423,7 @@ public Object getClientInstance() {

@Override
public String getHostPort() {
return null;
return "mock:6666";
}

}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2020 the original author or authors.
* Copyright 2002-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -36,6 +36,7 @@
import org.springframework.context.ApplicationContext;
import org.springframework.expression.Expression;
import org.springframework.integration.endpoint.SourcePollingChannelAdapter;
import org.springframework.integration.expression.ExpressionUtils;
import org.springframework.integration.file.DirectoryScanner;
import org.springframework.integration.file.filters.CompositeFileListFilter;
import org.springframework.integration.file.filters.FileListFilter;
Expand Down Expand Up @@ -136,7 +137,8 @@ public void testFtpInboundChannelAdapterComplete() throws Exception {
method.setAccessible(true);
genMethod.set(method);
}, method -> "generateLocalFileName".equals(method.getName()));
assertThat(genMethod.get().invoke(fisync, "foo")).isEqualTo("FOO.afoo");
assertThat(genMethod.get().invoke(fisync, "foo", ExpressionUtils.createStandardEvaluationContext(this.context)))
.isEqualTo("FOO.afoo");
assertThat(inbound.getMaxFetchSize()).isEqualTo(42);
}

Expand Down