Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,10 @@
import org.springframework.integration.file.DefaultFileNameGenerator;
import org.springframework.integration.file.FileNameGenerator;
import org.springframework.integration.file.remote.session.CachingSessionFactory;
import org.springframework.integration.file.remote.session.DefaultSessionFactoryResolver;
import org.springframework.integration.file.remote.session.Session;
import org.springframework.integration.file.remote.session.SessionFactory;
import org.springframework.integration.file.remote.session.SessionFactoryResolver;
import org.springframework.integration.file.support.FileExistsMode;
import org.springframework.integration.handler.ExpressionEvaluatingMessageProcessor;
import org.springframework.messaging.Message;
Expand All @@ -61,7 +63,7 @@ public class RemoteFileTemplate<F> implements RemoteFileOperations<F>, Initializ
/**
* the {@link SessionFactory} for acquiring remote file Sessions.
*/
protected final SessionFactory<F> sessionFactory;
protected SessionFactory<F> sessionFactory;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should remain final, the same for the SessionFactoryResolver: or one of another. Of course, the opposite one should be null from consturctors.


private volatile String temporaryFileSuffix =".writing";

Expand All @@ -87,9 +89,22 @@ public class RemoteFileTemplate<F> implements RemoteFileOperations<F>, Initializ

private volatile BeanFactory beanFactory;

private final SessionFactoryResolver<F> resolver;

public RemoteFileTemplate(SessionFactory<F> sessionFactory) {
Assert.notNull(sessionFactory, "sessionFactory must not be null");
this.sessionFactory = sessionFactory;
resolver = null;
}

public RemoteFileTemplate(SessionFactoryResolver<F> resolver) {
if(resolver == null) {
this.resolver = new DefaultSessionFactoryResolver<F>();
}
else {
this.resolver = resolver;
}
sessionFactory = null;
}

public void setAutoCreateDirectory(boolean autoCreateDirectory) {
Expand Down Expand Up @@ -200,6 +215,12 @@ public String send(final Message<?> message, String subDirectory, FileExistsMode
}

private String send(final Message<?> message, final String subDirectory, final FileExistsMode mode) {
if(resolver != null) {
SessionFactory<F> tmpSessionFactory = this.resolver.resolve(message);
if(tmpSessionFactory != null) {
this.sessionFactory = tmpSessionFactory;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't thik so: tmpSessionFactory is just for the current message. You must not mutate RemoteFileTemplate

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

resolver can only be set constructor level, when it is set, sessionfactory is null, so I modify this.sessionFactory here.
Besides, tmpSessionFactory is used in another method: execute, if I don't use this.sessionFactory to pass the value, I still need to define a class variable to pass the tmpSessionFactory to execute.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It isn't good, because one thread extracts session from the message and modify the class variable, it becomes visible for another thread. You don't follow here with stateless principle. Such a behaviour isn't desired for shared components, like RemoteFileTemplate.
You have to propagate this local session to that execute method, but don't mutate the component state.
I can guess that we can have this.sessionFactory, if this.resolver returns null, but I need to see entire picture of this premise...
Will take a look tomorrow closer.

}
}
Assert.notNull(this.directoryExpressionProcessor, "'remoteDirectoryExpression' is required");
Assert.isTrue(!FileExistsMode.APPEND.equals(mode) || !this.useTemporaryFileName,
"Cannot append when using a temporary file name");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Copyright 2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.file.remote.session;

import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.expression.EvaluationContext;
import org.springframework.expression.Expression;
import org.springframework.expression.ExpressionParser;
import org.springframework.expression.spel.SpelParserConfiguration;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.integration.expression.IntegrationEvaluationContextAware;
import org.springframework.messaging.Message;

/**
* Default {@link SessionFactoryResolver}; Used to resolve sessionfactory from message sent
* containing sessionfactory bean name.
*
* @author David Liu
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

JavaDocs on the matter

* @since 4.1
*
*/
public class DefaultSessionFactoryResolver<F> implements SessionFactoryResolver<F>, BeanFactoryAware, IntegrationEvaluationContextAware{

private final static String SESSIONFACTORY = "headers['sessionFactory']";

private final ExpressionParser expressionParser = new SpelExpressionParser(new SpelParserConfiguration(true, true));

private volatile Expression sessionFactoryExpression = expressionParser.parseExpression(SESSIONFACTORY);

private volatile BeanFactory beanFactory;

private EvaluationContext evaluationContext;

public void setSessionFactory(String expression) {
this.sessionFactoryExpression = expressionParser.parseExpression(expression);
}

@Override
public void setIntegrationEvaluationContext(EvaluationContext evaluationContext) {
this.evaluationContext = evaluationContext;
}

@SuppressWarnings("unchecked")
@Override
public SessionFactory<F> resolve(Message<?> message) {
if(this.beanFactory != null) {
if(evaluationContext != null) {
return this.beanFactory.getBean(this.sessionFactoryExpression.getValue(evaluationContext, message, String.class), SessionFactory.class);
}
return this.beanFactory.getBean(this.sessionFactoryExpression.getValue(message, String.class), SessionFactory.class);
}
return null;
}

@Override
public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
this.beanFactory = beanFactory;
}

}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bad methods order. Blank line before the last } in each class

Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright 2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.file.remote.session;

import org.springframework.messaging.Message;

/**
* A resolver to resolve sessionfactory from sending message
*
* @author David Liu
* @since 4.1
*
*/
public interface SessionFactoryResolver<F> {

SessionFactory<F> resolve(Message<?> message);

}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bad code style. Blank line around the method and before the last }
JavaDocs as well

Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.springframework.integration.file.remote.SessionCallback;
import org.springframework.integration.file.remote.session.Session;
import org.springframework.integration.file.remote.session.SessionFactory;
import org.springframework.integration.file.remote.session.SessionFactoryResolver;
import org.springframework.messaging.MessagingException;

/**
Expand All @@ -40,6 +41,9 @@ public class FtpRemoteFileTemplate extends RemoteFileTemplate<FTPFile> {
public FtpRemoteFileTemplate(SessionFactory<FTPFile> sessionFactory) {
super(sessionFactory);
}
public FtpRemoteFileTemplate(SessionFactoryResolver<FTPFile> resolver) {
super(resolver);
}

@SuppressWarnings("unchecked")
@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* Copyright 2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.ftp.session;

import org.springframework.expression.EvaluationContext;
import org.springframework.expression.Expression;
import org.springframework.expression.ExpressionParser;
import org.springframework.expression.spel.SpelParserConfiguration;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.integration.expression.IntegrationEvaluationContextAware;
import org.springframework.integration.file.remote.session.SessionFactory;
import org.springframework.integration.file.remote.session.SessionFactoryResolver;
import org.springframework.messaging.Message;

/**
* Ftp {@link SessionFactoryResolver}; Used to resolve sessionfactory from message sent
* containing host, username, password, name.
*
* @author David Liu
* @since 4.1
*
*/
public class FtpSessionFactoryResolver<F> implements SessionFactoryResolver<F>, IntegrationEvaluationContextAware{

private final static String DEFAULT_HOST_EXPRESSION_STRING = "headers['ftp_host']";

private final static String DEFAULT_USER_EXPRESSION_STRING = "headers['ftp_username']";

private final static String DEFAULT_PASSWORD_EXPRESSION_STRING = "headers['ftp_password']";

private final static String DEFAULT_PORT_EXPRESSION_STRING = "headers['ftp_port']";

private final ExpressionParser expressionParser = new SpelExpressionParser(new SpelParserConfiguration(true, true));

private volatile Expression headerExpression = expressionParser.parseExpression(DEFAULT_HOST_EXPRESSION_STRING);

private volatile Expression userNameExpression = expressionParser.parseExpression(DEFAULT_USER_EXPRESSION_STRING);

private volatile Expression passwordExpression = expressionParser.parseExpression(DEFAULT_PASSWORD_EXPRESSION_STRING);

private volatile Expression portExpression = expressionParser.parseExpression(DEFAULT_PORT_EXPRESSION_STRING);

private EvaluationContext evaluationContext;

public void setHostExpression(String hostExpression) {
this.headerExpression = expressionParser.parseExpression(hostExpression);
}

public void setUserNameExpression(String userExpression) {
this.userNameExpression = expressionParser.parseExpression(userExpression);
}

public void setPasswordExpression(String passwordExpression) {
this.passwordExpression = expressionParser.parseExpression(passwordExpression);
}

public void setPortExpression(String portExpression) {
this.portExpression = expressionParser.parseExpression(portExpression);
}

@Override
public void setIntegrationEvaluationContext(EvaluationContext evaluationContext) {
this.evaluationContext = evaluationContext;
}

@SuppressWarnings("unchecked")
@Override
public SessionFactory<F> resolve(Message<?> message) {
DefaultFtpSessionFactory sessionFactory = new DefaultFtpSessionFactory();
sessionFactory.setHost(evaluationContext != null ? this.headerExpression.getValue(evaluationContext, message, String.class)
: this.headerExpression.getValue(message, String.class));
sessionFactory.setUsername(evaluationContext != null ? this.userNameExpression.getValue(evaluationContext, message, String.class)
: this.userNameExpression.getValue(message, String.class));
sessionFactory.setPassword(evaluationContext != null ? this.passwordExpression.getValue(evaluationContext, message, String.class)
: this.passwordExpression.getValue(message, String.class));
sessionFactory.setPort(evaluationContext != null ? this.portExpression.getValue(evaluationContext, message, Integer.class)
: this.portExpression.getValue(message, Integer.class));
return (SessionFactory<F>) sessionFactory;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,18 @@
import org.junit.runner.RunWith;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.expression.common.LiteralExpression;
import org.springframework.integration.file.DefaultFileNameGenerator;
import org.springframework.integration.file.remote.ClientCallbackWithoutResult;
import org.springframework.integration.file.remote.SessionCallback;
import org.springframework.integration.file.remote.SessionCallbackWithoutResult;
import org.springframework.integration.file.remote.session.DefaultSessionFactoryResolver;
import org.springframework.integration.file.remote.session.Session;
import org.springframework.integration.file.support.FileExistsMode;
import org.springframework.integration.ftp.TestFtpServer;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
Expand All @@ -55,6 +60,9 @@ public class FtpRemoteFileTemplateTests {
@Autowired
private DefaultFtpSessionFactory sessionFactory;

@Autowired
private ApplicationContext ctx;

@Before
@After
public void setup() {
Expand Down Expand Up @@ -109,4 +117,54 @@ public void doInSessionWithoutResult(Session<FTPFile> session) throws IOExceptio
assertFalse(template.exists("foo"));
}

@Test
public void testFtpSessionFactoryResolver() {
Message<?> message = MessageBuilder.withPayload("foo").setHeader("ftp_host", "localhost").
setHeader("ftp_username", "foo").setHeader("ftp_password", "foo").setHeader("ftp_port", sessionFactory.port).build();
DefaultFileNameGenerator fileNameGenerator = new DefaultFileNameGenerator();
fileNameGenerator.setExpression("'foobar.txt'");
FtpRemoteFileTemplate template = new FtpRemoteFileTemplate(new FtpSessionFactoryResolver<FTPFile>());
template.setFileNameGenerator(fileNameGenerator);
template.setRemoteDirectoryExpression(new LiteralExpression("foo/"));
template.setUseTemporaryFileName(false);
template.send(message,"foo/",FileExistsMode.IGNORE);
template.execute(new SessionCallback<FTPFile, Boolean>() {

@Override
public Boolean doInSession(Session<FTPFile> session) throws IOException {
session.mkdir("foo/");
return session.mkdir("foo/foo/");
}

});
template.send(message,"foo/",FileExistsMode.REPLACE);
assertTrue(template.exists("foo/foo/foobar.txt"));
}

@Test
public void testDefaultSessionFactoryResolver() {
Message<?> message = MessageBuilder.withPayload("foo").setHeader("sessionFactory", "ftpSessionFactory").build();
DefaultFileNameGenerator fileNameGenerator = new DefaultFileNameGenerator();
fileNameGenerator.setExpression("'foobar.txt'");
DefaultSessionFactoryResolver<FTPFile> resolver = new DefaultSessionFactoryResolver<FTPFile>();
resolver.setBeanFactory(ctx);
FtpRemoteFileTemplate template = new FtpRemoteFileTemplate(resolver);
template.setFileNameGenerator(fileNameGenerator);
template.setRemoteDirectoryExpression(new LiteralExpression("foo/"));
template.setUseTemporaryFileName(false);
template.send(message,"foo/",FileExistsMode.IGNORE);
template.execute(new SessionCallback<FTPFile, Boolean>() {

@Override
public Boolean doInSession(Session<FTPFile> session) throws IOException {
session.mkdir("foo/");
return session.mkdir("foo/foo/");
}

});
template.send(message,"foo/",FileExistsMode.REPLACE);
assertTrue(template.exists("foo/foo/foobar.txt"));
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.springframework.integration.file.remote.SessionCallback;
import org.springframework.integration.file.remote.session.Session;
import org.springframework.integration.file.remote.session.SessionFactory;
import org.springframework.integration.file.remote.session.SessionFactoryResolver;

import com.jcraft.jsch.ChannelSftp;
import com.jcraft.jsch.ChannelSftp.LsEntry;
Expand All @@ -41,6 +42,10 @@ public SftpRemoteFileTemplate(SessionFactory<LsEntry> sessionFactory) {
super(sessionFactory);
}

public SftpRemoteFileTemplate(SessionFactoryResolver<LsEntry> resolver) {
super(resolver);
}

@SuppressWarnings("unchecked")
@Override
public <T, C> T executeWithClient(final ClientCallback<C, T> callback) {
Expand Down
Loading