Skip to content

Commit

Permalink
[PDI-14179] - FTP put error when using ${Internal.Job.Filename.Direct…
Browse files Browse the repository at this point in the history
…ory} internal variable

- add a "localDirectory" parameter validation which fails on null value
- introduce a package-local method for creating the client to alleviate testing
- add unit tests for it
- add integration tests with embedded FTP server
  • Loading branch information
Andrey Khayrutdinov committed Oct 26, 2015
1 parent f400847 commit 638ef5a
Show file tree
Hide file tree
Showing 6 changed files with 377 additions and 104 deletions.
1 change: 1 addition & 0 deletions build.xml
Expand Up @@ -892,6 +892,7 @@
<test todir="${testreports.xml.dir}" name="org.pentaho.di.job.entries.sftp.JobEntrySFTPTest" haltonerror="false"/> <test todir="${testreports.xml.dir}" name="org.pentaho.di.job.entries.sftp.JobEntrySFTPTest" haltonerror="false"/>
<test todir="${testreports.xml.dir}" name="org.pentaho.di.job.entries.ftpsget.FTPSConnectionTest" haltonerror="false"/> <test todir="${testreports.xml.dir}" name="org.pentaho.di.job.entries.ftpsget.FTPSConnectionTest" haltonerror="false"/>
<test todir="${testreports.xml.dir}" name="org.pentaho.di.job.entries.ftpsget.JobEntryFTPSGetTest" haltonerror="false"/> <test todir="${testreports.xml.dir}" name="org.pentaho.di.job.entries.ftpsget.JobEntryFTPSGetTest" haltonerror="false"/>
<test todir="${testreports.xml.dir}" name="org.pentaho.di.job.entries.ftpput.JobEntryFTPPUTTest" haltonerror="false"/>


<!-- Some extra things to run --> <!-- Some extra things to run -->
<test todir="${testreports.xml.dir}" name="org.pentaho.di.job.entries.copyfiles.CopyFilesTest" <test todir="${testreports.xml.dir}" name="org.pentaho.di.job.entries.copyfiles.CopyFilesTest"
Expand Down
192 changes: 103 additions & 89 deletions engine/src/org/pentaho/di/job/entries/ftpput/JobEntryFTPPUT.java
Expand Up @@ -80,6 +80,8 @@
public class JobEntryFTPPUT extends JobEntryBase implements Cloneable, JobEntryInterface { public class JobEntryFTPPUT extends JobEntryBase implements Cloneable, JobEntryInterface {
private static Class<?> PKG = JobEntryFTPPUT.class; // for i18n purposes, needed by Translator2!! private static Class<?> PKG = JobEntryFTPPUT.class; // for i18n purposes, needed by Translator2!!


public static final int FTP_DEFAULT_PORT = 21;

private String serverName; private String serverName;
private String serverPort; private String serverPort;
private String userName; private String userName;
Expand Down Expand Up @@ -587,92 +589,14 @@ public Result execute( Result previousResult, int nr ) {
logDetailed( BaseMessages.getString( PKG, "JobFTPPUT.Log.Starting" ) ); logDetailed( BaseMessages.getString( PKG, "JobFTPPUT.Log.Starting" ) );
} }


// String substitution..
String realServerName = environmentSubstitute( serverName );
String realServerPort = environmentSubstitute( serverPort );
String realUsername = environmentSubstitute( userName );
String realPassword = Encr.decryptPasswordOptionallyEncrypted( environmentSubstitute( password ) );
String realRemoteDirectory = environmentSubstitute( remoteDirectory );
String realWildcard = environmentSubstitute( wildcard );
String realLocalDirectory = environmentSubstitute( localDirectory );

FTPClient ftpclient = null; FTPClient ftpclient = null;

try { try {
// Create ftp client to host:port ... // Create ftp client to host:port ...
ftpclient = new PDIFTPClient( log ); ftpclient = createAndSetUpFtpClient();
ftpclient.setRemoteAddr( InetAddress.getByName( realServerName ) );
if ( !Const.isEmpty( realServerPort ) ) {
ftpclient.setRemotePort( Const.toInt( realServerPort, 21 ) );
}

if ( !Const.isEmpty( proxyHost ) ) {
String realProxy_host = environmentSubstitute( proxyHost );
ftpclient.setRemoteAddr( InetAddress.getByName( realProxy_host ) );
if ( log.isDetailed() ) {
logDetailed( BaseMessages.getString( PKG, "JobEntryFTPPUT.OpenedProxyConnectionOn", realProxy_host ) );
}

// FIXME: Proper default port for proxy
int port = Const.toInt( environmentSubstitute( proxyPort ), 21 );
if ( port != 0 ) {
ftpclient.setRemotePort( port );
}
} else {
ftpclient.setRemoteAddr( InetAddress.getByName( realServerName ) );

if ( log.isDetailed() ) {
logDetailed( BaseMessages.getString( PKG, "JobEntryFTPPUT.OpenConnection", realServerName ) );
}
}

// set activeConnection connectmode ...
if ( activeConnection ) {
ftpclient.setConnectMode( FTPConnectMode.ACTIVE );
if ( log.isDetailed() ) {
logDetailed( BaseMessages.getString( PKG, "JobFTPPUT.Log.SetActiveConnection" ) );
}
} else {
ftpclient.setConnectMode( FTPConnectMode.PASV );
if ( log.isDetailed() ) {
logDetailed( BaseMessages.getString( PKG, "JobFTPPUT.Log.SetPassiveConnection" ) );
}
}

// Set the timeout
if ( timeout > 0 ) {
ftpclient.setTimeout( timeout );
if ( log.isDetailed() ) {
logDetailed( BaseMessages.getString( PKG, "JobFTPPUT.Log.SetTimeout", "" + timeout ) );
}
}

ftpclient.setControlEncoding( controlEncoding );
if ( log.isDetailed() ) {
logDetailed( BaseMessages.getString( PKG, "JobFTPPUT.Log.SetEncoding", controlEncoding ) );
}

// If socks proxy server was provided
if ( !Const.isEmpty( socksProxyHost ) ) {
// if a port was provided
if ( !Const.isEmpty( socksProxyPort ) ) {
FTPClient.initSOCKS( environmentSubstitute( socksProxyPort ), environmentSubstitute( socksProxyHost ) );
} else { // looks like we have a host and no port
throw new FTPException( BaseMessages.getString(
PKG, "JobFTPPUT.SocksProxy.PortMissingException", environmentSubstitute( socksProxyHost ) ) );
}
// now if we have authentication information
if ( !Const.isEmpty( socksProxyUsername )
&& Const.isEmpty( socksProxyPassword ) || Const.isEmpty( socksProxyUsername )
&& !Const.isEmpty( socksProxyPassword ) ) {
// we have a username without a password or vica versa
throw new FTPException( BaseMessages.getString(
PKG, "JobFTPPUT.SocksProxy.IncompleteCredentials", environmentSubstitute( socksProxyHost ),
getName() ) );
}
}


// login to ftp host ... // login to ftp host ...
String realUsername = environmentSubstitute( userName );
String realPassword = Encr.decryptPasswordOptionallyEncrypted( environmentSubstitute( password ) );
ftpclient.connect(); ftpclient.connect();
ftpclient.login( realUsername, realPassword ); ftpclient.login( realUsername, realPassword );


Expand All @@ -693,16 +617,22 @@ public Result execute( Result previousResult, int nr ) {
this.hookInOtherParsers( ftpclient ); this.hookInOtherParsers( ftpclient );


// move to spool dir ... // move to spool dir ...
String realRemoteDirectory = environmentSubstitute( remoteDirectory );
if ( !Const.isEmpty( realRemoteDirectory ) ) { if ( !Const.isEmpty( realRemoteDirectory ) ) {
ftpclient.chdir( realRemoteDirectory ); ftpclient.chdir( realRemoteDirectory );
if ( log.isDetailed() ) { if ( log.isDetailed() ) {
logDetailed( BaseMessages.getString( PKG, "JobFTPPUT.Log.ChangedDirectory", realRemoteDirectory ) ); logDetailed( BaseMessages.getString( PKG, "JobFTPPUT.Log.ChangedDirectory", realRemoteDirectory ) );
} }
} }


// handle file:/// prefix String realLocalDirectory = environmentSubstitute( localDirectory );
if ( realLocalDirectory.startsWith( "file:" ) ) { if ( realLocalDirectory == null ) {
realLocalDirectory = new URI( realLocalDirectory ).getPath(); throw new FTPException( BaseMessages.getString( PKG, "JobFTPPUT.LocalDir.NotSpecified" ) );
} else {
// handle file:/// prefix
if ( realLocalDirectory.startsWith( "file:" ) ) {
realLocalDirectory = new URI( realLocalDirectory ).getPath();
}
} }


final List<String> files; final List<String> files;
Expand All @@ -719,17 +649,18 @@ public Result execute( Result previousResult, int nr ) {
} }
} }
} }

if ( log.isDetailed() ) { if ( log.isDetailed() ) {
logDetailed( BaseMessages.getString( logDetailed( BaseMessages.getString(
PKG, "JobFTPPUT.Log.FoundFileLocalDirectory", "" + files.size(), realLocalDirectory ) ); PKG, "JobFTPPUT.Log.FoundFileLocalDirectory", "" + files.size(), realLocalDirectory ) );
} }


Pattern pattern = null; String realWildcard = environmentSubstitute( wildcard );
Pattern pattern;
if ( !Const.isEmpty( realWildcard ) ) { if ( !Const.isEmpty( realWildcard ) ) {
pattern = Pattern.compile( realWildcard ); pattern = Pattern.compile( realWildcard );

} else {
} // end if pattern = null;
}


for ( String file : files ) { for ( String file : files ) {
if ( parentJob.isStopped() ) { if ( parentJob.isStopped() ) {
Expand Down Expand Up @@ -761,7 +692,7 @@ public Result execute( Result previousResult, int nr ) {
} }
} }


if ( !fileExist || ( !onlyPuttingNewFiles && fileExist ) ) { if ( !fileExist || !onlyPuttingNewFiles ) {
if ( log.isDebug() ) { if ( log.isDebug() ) {
logDebug( BaseMessages.getString( logDebug( BaseMessages.getString(
PKG, "JobFTPPUT.Log.PuttingFileToRemoteDirectory", file, realRemoteDirectory ) ); PKG, "JobFTPPUT.Log.PuttingFileToRemoteDirectory", file, realRemoteDirectory ) );
Expand Down Expand Up @@ -806,6 +737,89 @@ public Result execute( Result previousResult, int nr ) {
return result; return result;
} }


// package-local visibility for testing purposes
FTPClient createAndSetUpFtpClient() throws IOException, FTPException {
String realServerName = environmentSubstitute( serverName );
String realServerPort = environmentSubstitute( serverPort );

FTPClient ftpClient = createFtpClient();
ftpClient.setRemoteAddr( InetAddress.getByName( realServerName ) );
if ( !Const.isEmpty( realServerPort ) ) {
ftpClient.setRemotePort( Const.toInt( realServerPort, FTP_DEFAULT_PORT ) );
}

if ( !Const.isEmpty( proxyHost ) ) {
String realProxyHost = environmentSubstitute( proxyHost );
ftpClient.setRemoteAddr( InetAddress.getByName( realProxyHost ) );
if ( log.isDetailed() ) {
logDetailed( BaseMessages.getString( PKG, "JobEntryFTPPUT.OpenedProxyConnectionOn", realProxyHost ) );
}

// FIXME: Proper default port for proxy
int port = Const.toInt( environmentSubstitute( proxyPort ), FTP_DEFAULT_PORT );
if ( port != 0 ) {
ftpClient.setRemotePort( port );
}
} else {
if ( log.isDetailed() ) {
logDetailed( BaseMessages.getString( PKG, "JobEntryFTPPUT.OpenConnection", realServerName ) );
}
}

// set activeConnection connectmode ...
if ( activeConnection ) {
ftpClient.setConnectMode( FTPConnectMode.ACTIVE );
if ( log.isDetailed() ) {
logDetailed( BaseMessages.getString( PKG, "JobFTPPUT.Log.SetActiveConnection" ) );
}
} else {
ftpClient.setConnectMode( FTPConnectMode.PASV );
if ( log.isDetailed() ) {
logDetailed( BaseMessages.getString( PKG, "JobFTPPUT.Log.SetPassiveConnection" ) );
}
}

// Set the timeout
if ( timeout > 0 ) {
ftpClient.setTimeout( timeout );
if ( log.isDetailed() ) {
logDetailed( BaseMessages.getString( PKG, "JobFTPPUT.Log.SetTimeout", "" + timeout ) );
}
}

ftpClient.setControlEncoding( controlEncoding );
if ( log.isDetailed() ) {
logDetailed( BaseMessages.getString( PKG, "JobFTPPUT.Log.SetEncoding", controlEncoding ) );
}

// If socks proxy server was provided
if ( !Const.isEmpty( socksProxyHost ) ) {
// if a port was provided
if ( !Const.isEmpty( socksProxyPort ) ) {
FTPClient.initSOCKS( environmentSubstitute( socksProxyPort ), environmentSubstitute( socksProxyHost ) );
} else { // looks like we have a host and no port
throw new FTPException( BaseMessages.getString(
PKG, "JobFTPPUT.SocksProxy.PortMissingException", environmentSubstitute( socksProxyHost ) ) );
}
// now if we have authentication information
if ( !Const.isEmpty( socksProxyUsername )
&& Const.isEmpty( socksProxyPassword ) || Const.isEmpty( socksProxyUsername )
&& !Const.isEmpty( socksProxyPassword ) ) {
// we have a username without a password or vica versa
throw new FTPException( BaseMessages.getString(
PKG, "JobFTPPUT.SocksProxy.IncompleteCredentials", environmentSubstitute( socksProxyHost ),
getName() ) );
}
}

return ftpClient;
}

// package-local visibility for testing purposes
FTPClient createFtpClient() {
return new PDIFTPClient( log );
}

public boolean evaluates() { public boolean evaluates() {
return true; return true;
} }
Expand Down
Expand Up @@ -87,21 +87,22 @@ JobFTPPUTPUT.ProxyPort.Tooltip=Proxy port
JobFTPPUT.FolderExists.Title.Ok=OK JobFTPPUT.FolderExists.Title.Ok=OK
JobFTPPUT.Connected.Title.Bad=Connection KO JobFTPPUT.Connected.Title.Bad=Connection KO
JobFTPPUT.SocksProxy.IncompleteCredentials=Socks proxy credentials are incomplete. If credentials are not needed then clear both username and password. JobFTPPUT.SocksProxy.IncompleteCredentials=Socks proxy credentials are incomplete. If credentials are not needed then clear both username and password.
JobFTPPUT.LocalDir.Label=Local directory\: JobFTPPUT.LocalDir.Label=Local directory\:
JobFTPPUT.LocalDir.NotSpecified=Please specify Local directory
JobEntryFTPPUT.OpenedProxyConnectionOn=Opened FTP connection to proxy server [{0}] JobEntryFTPPUT.OpenedProxyConnectionOn=Opened FTP connection to proxy server [{0}]
JobFTPPUT.Log.SetEncoding=Set control encoding to [{0}] JobFTPPUT.Log.SetEncoding=Set control encoding to [{0}]
JobFTPPUT.RemoteDir.Tooltip=The remote directory on the FTP server JobFTPPUT.RemoteDir.Tooltip=The remote directory on the FTP server
JobFTPPUT.Log.FoundFileLocalDirectory=Found [{0}] files in the local directory [{1}] JobFTPPUT.Log.FoundFileLocalDirectory=Found [{0}] files in the local directory [{1}]
JobFTPPUT.Username.Label=Username\: JobFTPPUT.Username.Label=Username\:
JobFTPPUT.LocalDir.Tooltip=The directory on the local server JobFTPPUT.LocalDir.Tooltip=The directory on the local server
JobFTPPUT.RemoveFiles.Tooltip=Removes files on remote server after transfer.\nBe CAREFUL using this option, probably deletion cannot be undone. JobFTPPUT.RemoveFiles.Tooltip=Removes files on remote server after transfer.\nBe CAREFUL using this option, probably deletion cannot be undone.
JobEntryFTP.DEBUG.Created.Other.Parser=Parser [{0}] created. JobEntryFTP.DEBUG.Created.Other.Parser=Parser [{0}] created.
JobEntryFTP.DEBUG.Created.Factory=Factory created. JobEntryFTP.DEBUG.Created.Factory=Factory created.
JobFTPPUT.Server.Label=FTP server name / IP address\: JobFTPPUT.Server.Label=FTP server name / IP address\:
JobFTPPUT.Log.DeletedFile=Deleted local file [{0}] JobFTPPUT.Log.DeletedFile=Deleted local file [{0}]
JobFTPPUT.Name.Default=Put files via FTP JobFTPPUT.Name.Default=Put files via FTP
JobFTPPUT.SocksProxyHost.Tooltip=The host name or IP address of the socks proxy host JobFTPPUT.SocksProxyHost.Tooltip=The host name or IP address of the socks proxy host
JobFTPPUT.Password.Label=Password\: JobFTPPUT.Password.Label=Password\:
JobFTPPUT.Tab.General.Label=General JobFTPPUT.Tab.General.Label=General
JobFTPPUT.ProxyHost.Label=Proxy host JobFTPPUT.ProxyHost.Label=Proxy host
JobFTPPUT.Log.UnableToLoadFromXml=Unable to load job entry of type ''FTPPUT'' from XML node JobFTPPUT.Log.UnableToLoadFromXml=Unable to load job entry of type ''FTPPUT'' from XML node
@@ -0,0 +1,91 @@
/*! ******************************************************************************
*
* Pentaho Data Integration
*
* Copyright (C) 2002-2015 by Pentaho : http://www.pentaho.com
*
*******************************************************************************
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
******************************************************************************/

package org.pentaho.di.job.entries.ftpput;

import com.enterprisedt.net.ftp.FTPClient;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;

import java.net.InetAddress;

import static org.junit.Assert.*;
import static org.mockito.Mockito.*;

/**
* @author Andrey Khayrutdinov
*/
public class JobEntryFTPPUTTest {

private JobEntryFTPPUT entry;
private FTPClient ftpClient;

@Before
public void setUp() throws Exception {
ftpClient = mock( FTPClient.class );

entry = spy( new JobEntryFTPPUT() );
doReturn( ftpClient ).when( entry ).createFtpClient();
}

@Test
public void createFtpClient_SetsLocalhostByDefault() throws Exception {
entry.setServerName( null );
entry.createAndSetUpFtpClient();
assertEquals( "localhost", getHostFromClient() );
}

@Test
public void createFtpClient_DoesNotChangePortByDefault() throws Exception {
entry.setServerPort( null );
entry.createAndSetUpFtpClient();
verify( ftpClient, never() ).setRemotePort( anyInt() );
}

@Test
public void createFtpClient_UsesProxyIfSet() throws Exception {
entry.setProxyHost( "localhost" );
entry.setProxyPort( "123" );
entry.createAndSetUpFtpClient();

assertEquals( "localhost", getHostFromClient() );
// we cannot distinguish values of proxy and target server, as both of them refers to localhost
// that is why I put invocation counter here
verify( ftpClient, times( 2 ) ).setRemoteAddr( InetAddress.getByName( "localhost" ) );
verify( ftpClient ).setRemotePort( 123 );
}

@Test
public void createFtpClient_UsesTimeoutIfSet() throws Exception {
entry.setTimeout( 10 );
entry.createAndSetUpFtpClient();
verify( ftpClient ).setTimeout( 10 );
}


private String getHostFromClient() throws Exception {
ArgumentCaptor<InetAddress> addrCaptor = ArgumentCaptor.forClass( InetAddress.class );
verify( ftpClient, atLeastOnce() ).setRemoteAddr( addrCaptor.capture() );
return addrCaptor.getValue().getHostName();
}
}

0 comments on commit 638ef5a

Please sign in to comment.