This repository has been archived by the owner on Jan 21, 2022. It is now read-only.

SFTP Source

This source app supports transfer of files using the SFTP protocol. Files are transferred from the remote directory to the local directory where the application is deployed.

Messages emitted by the source are provided as a byte array by default. However, this can be customized using the --file.consumer.mode option:

  • ref Provides a java.io.File reference

  • lines Will split files line-by-line and emit a new message for each line

  • contents The default. Provides the contents of a file as a byte array

When using --file.consumer.mode=lines, you can also provide the additional option --file.consumer.with-markers=true. If set to true, the underlying FileSplitter emits additional start-of-file and end-of-file marker messages before and after the actual data. The payload of these two marker messages is of type FileSplitter.FileMarker. The with-markers option defaults to false if not explicitly set.
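The message sequence described above can be pictured with a minimal plain-Java sketch. It does not use Spring Integration's FileSplitter. The class name LinesModeSketch and the "START:" and "END:" strings are hypothetical stand-ins for the FileSplitter.FileMarker payloads, chosen only to show the ordering.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch: mimics the order of payloads in lines mode.
// It is not the FileSplitter implementation.
class LinesModeSketch {

    // Returns the payloads that would be emitted for one file, in order.
    static List<String> emit(String fileName, List<String> lines, boolean withMarkers) {
        List<String> payloads = new ArrayList<>();
        if (withMarkers) {
            payloads.add("START:" + fileName); // stand-in for the start-of-file marker
        }
        payloads.addAll(lines);                // one message per line
        if (withMarkers) {
            payloads.add("END:" + fileName);   // stand-in for the end-of-file marker
        }
        return payloads;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("alpha", "beta");
        System.out.println(emit("data.txt", lines, false)); // lines only
        System.out.println(emit("data.txt", lines, true));  // markers around the lines
    }
}
```

With markers enabled, a downstream consumer sees two extra messages per file, so it has to tell marker payloads apart from line payloads.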

When configuring the sftp.factory.known-hosts-expression option, the root object of the evaluation is the application context. An example might be sftp.factory.known-hosts-expression = @systemProperties['user.home'] + '/.ssh/known_hosts'.

See also MetadataStore options for possible shared persistent store configuration for the SftpPersistentAcceptOnceFileListFilter and IdempotentReceiverInterceptor used in the SFTP Source.

Multiple SFTP Servers

This source supports polling multiple SFTP servers, which requires configuring multiple session factories. The following configuration polls two SFTP servers, consuming files in a round-robin fashion:

sftp.factories.one.host=host1
sftp.factories.one.port=1234
sftp.factories.one.username=user1
sftp.factories.one.password=pass1
...
sftp.factories.two.host=host2
sftp.factories.two.port=2345
sftp.factories.two.username=user2
sftp.factories.two.password=pass2
sftp.directories=one.sftpSource,two.sftpSecondSource
sftp.max-fetch=1
sftp.fair=true
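To make the round-robin behaviour concrete, here is a small plain-Java sketch of rotating over the configured "name.directory" pairs, one pair per poll. It is an illustration only, not the app's rotation logic. The class RoundRobinSketch, its methods, and the rule of splitting each pair at the first dot are assumptions made for this example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of round-robin polling over "name.directory" pairs.
class RoundRobinSketch {

    // Splits "one.sftpSource" into {"one", "sftpSource"} at the first dot (assumed format).
    static String[] parsePair(String pair) {
        int dot = pair.indexOf('.');
        if (dot <= 0 || dot == pair.length() - 1) {
            throw new IllegalArgumentException("Expected name.directory but got: " + pair);
        }
        return new String[] { pair.substring(0, dot), pair.substring(dot + 1) };
    }

    // Returns the factory:directory visited on each poll, advancing after every poll.
    static List<String> pollOrder(List<String> pairs, int polls) {
        List<String> order = new ArrayList<>();
        for (int i = 0; i < polls; i++) {
            String[] keyAndDir = parsePair(pairs.get(i % pairs.size()));
            order.add(keyAndDir[0] + ":" + keyAndDir[1]);
        }
        return order;
    }

    public static void main(String[] args) {
        List<String> pairs = Arrays.asList("one.sftpSource", "two.sftpSecondSource");
        System.out.println(pollOrder(pairs, 4));
    }
}
```

In this sketch each poll visits exactly one pair, which is the situation sftp.max-fetch=1 combined with sftp.fair=true aims for: no single server monopolizes consecutive polls.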

Note
The TaskLaunchRequest output functionality is currently supported here for legacy reasons. If you are interested in this feature, we recommend using the sftp-dataflow-source, which is intended specifically for this use case. A task launch request posted to the Data Flow Server API is much simpler to use than the TaskLaunchRequest supported by this app, which launches tasks using one of the provided platform-specific task launchers. Using a platform-specific task launcher makes it possible to launch tasks when a Data Flow server is not deployed, but requires several additional configuration parameters.

Input

N/A (Fetches files from an SFTP server).

Output

mode = contents

Headers:

  • Content-Type: application/octet-stream

  • file_originalFile: <java.io.File>

  • file_name: <file name>

Payload:

A byte[] filled with the file contents.

mode = lines

Headers:

  • Content-Type: text/plain

  • file_originalFile: <java.io.File>

  • file_name: <file name>

  • correlationId: <UUID> (same for each line)

  • sequenceNumber: <n>

  • sequenceSize: 0 (number of lines is not known until the file is read)

Payload:

A String for each line.

The first line is optionally preceded by a message with a START marker payload. The last line is optionally followed by a message with an END marker payload.

Marker presence and format are determined by the with-markers and markers-json properties.

mode = ref

Headers:

None.

Payload:

A java.io.File object.

task-launcher-output = true

Headers:

  • Content-Type: application/json

  • file_remoteDirectory: <java.lang.String>

Payload:

A TaskLaunchRequest object with the following set as command line arguments (also bound to job parameters for Spring Batch):

  • <task.local-file-path-parameter-name>=<task.local-file-path-parameter-value>

  • <task.remote-file-path-parameter-name>=<task.remote-file-path-parameter-value>

  • Any provided task.parameters

task.resource-uri is required. task.deployment-properties and task.environment-properties are optional.
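As a rough illustration of the argument list described above, the following plain-Java sketch assembles name=value arguments in the same order. It does not build a real TaskLaunchRequest. The class TaskArgsSketch and the file paths are hypothetical, while localFilePath and remoteFilePath are the documented default parameter names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch: assembles name=value command line arguments.
// It is not the app's TaskLaunchRequest code.
class TaskArgsSketch {

    static List<String> commandLineArgs(String localName, String localValue,
                                        String remoteName, String remoteValue,
                                        List<String> extraParameters) {
        List<String> args = new ArrayList<>();
        args.add(localName + "=" + localValue);   // local file path argument
        args.add(remoteName + "=" + remoteValue); // remote file path argument
        args.addAll(extraParameters);             // any key=value entries from task.parameters
        return args;
    }

    public static void main(String[] args) {
        // The paths and the extra parameter are made-up example values.
        System.out.println(commandLineArgs(
                "localFilePath", "/tmp/data.txt",
                "remoteFilePath", "/upload/data.txt",
                Arrays.asList("jobName=import")));
    }
}
```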

Options

The sftp source has the following options:

file.consumer.markers-json

When 'fileMarkers == true', specify if they should be produced as FileSplitter.FileMarker objects or JSON. (Boolean, default: true)

file.consumer.mode

The FileReadingMode to use for file reading sources. Values are 'ref' - The File object, 'lines' - a message per line, or 'contents' - the contents as bytes. (FileReadingMode, default: <none>, possible values: ref,lines,contents)

file.consumer.with-markers

Set to true to emit start of file/end of file marker messages before/after the data. Only valid with FileReadingMode 'lines'. (Boolean, default: <none>)

sftp.auto-create-local-dir

Set to true to create the local directory if it does not exist. (Boolean, default: true)

sftp.delete-remote-files

Set to true to delete remote files after successful transfer. (Boolean, default: false)

sftp.directories

A list of factory "name.directory" pairs. (String[], default: <none>)

sftp.factories

A map of factory names to factories. (Map<String, Factory>, default: <none>)

sftp.factory.allow-unknown-keys

True to allow an unknown or changed key. (Boolean, default: false)

sftp.factory.host

The host name of the server. (String, default: localhost)

sftp.factory.known-hosts-expression

A SpEL expression resolving to the location of the known hosts file. (Expression, default: <none>)

sftp.factory.pass-phrase

Passphrase for user's private key. (String, default: <empty string>)

sftp.factory.password

The password to use to connect to the server. (String, default: <none>)

sftp.factory.port

The port of the server. (Integer, default: 22)

sftp.factory.private-key

Resource location of user's private key. (Resource, default: <none>)

sftp.factory.username

The username to use to connect to the server. (String, default: <none>)

sftp.fair

True for fair polling of multiple servers/directories. (Boolean, default: false)

sftp.filename-pattern

A filter pattern to match the names of files to transfer. (String, default: <none>)

sftp.filename-regex

A filter regex pattern to match the names of files to transfer. (Pattern, default: <none>)

sftp.list-only

Set to true to return file metadata without the entire payload. (Boolean, default: false)

sftp.local-dir

The local directory to use for file transfers. (File, default: <none>)

sftp.max-fetch

The maximum number of remote files to fetch per poll; default unlimited. Does not apply when listing files or building task launch requests. (Integer, default: <none>)

sftp.preserve-timestamp

Set to true to preserve the original timestamp. (Boolean, default: true)

sftp.remote-dir

The remote FTP directory. (String, default: /)

sftp.remote-file-separator

The remote file separator. (String, default: /)

sftp.stream

Set to true to stream the file rather than copy to a local directory. (Boolean, default: false)

sftp.task-launcher-output

Set to true to create output suitable for a task launch request. (Boolean, default: false)

sftp.task.application-name

The task application name. (String, default: <none>)

sftp.task.data-source-password

The datasource password to be applied to the TaskLaunchRequest. (String, default: <none>)

sftp.task.data-source-url

The datasource url to be applied to the TaskLaunchRequest. Defaults to h2 in-memory JDBC datasource url. (String, default: jdbc:h2:tcp://localhost:19092/mem:dataflow)

sftp.task.data-source-user-name

The datasource user name to be applied to the TaskLaunchRequest. Defaults to "sa" (String, default: sa)

sftp.task.deployment-properties

Comma delimited list of deployment properties to be applied to the TaskLaunchRequest. (String, default: <none>)

sftp.task.environment-properties

Comma delimited list of environment properties to be applied to the TaskLaunchRequest. (String, default: <none>)

sftp.task.local-file-path-parameter-name

Value to use as the local file parameter name. (String, default: localFilePath)

sftp.task.local-file-path-parameter-value

The file path to use as the local file parameter value. Defaults to "java.io.tmpdir". (String, default: <none>)

sftp.task.parameters

Comma separated list of optional parameters in key=value format. (List<String>, default: <none>)

sftp.task.remote-file-path-parameter-name

Value to use as the remote file parameter name. (String, default: remoteFilePath)

sftp.task.resource-uri

The URI of the task artifact to be applied to the TaskLaunchRequest. (String, default: <empty string>)

sftp.tmp-file-suffix

The suffix to use while the transfer is in progress. (String, default: .tmp)

trigger.cron

Cron expression value for the Cron Trigger. (String, default: <none>)

trigger.date-format

Format for the date value. (String, default: <none>)

trigger.fixed-delay

Fixed delay for periodic triggers. (Integer, default: 1)

trigger.initial-delay

Initial delay for periodic triggers. (Integer, default: 0)

trigger.max-messages

Maximum messages per poll, -1 means infinity. (Long, default: -1)

trigger.time-unit

The TimeUnit to apply to delay values. (TimeUnit, default: SECONDS, possible values: NANOSECONDS,MICROSECONDS,MILLISECONDS,SECONDS,MINUTES,HOURS,DAYS)
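The trigger delay options combine a number with a TimeUnit name. A minimal sketch of how such a pair maps to a concrete duration, using only java.util.concurrent.TimeUnit, might look like the following. The class TriggerDelaySketch and its method are hypothetical and are not part of the app.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: converts a delay value plus a TimeUnit name to milliseconds.
class TriggerDelaySketch {

    static long toMillis(long delay, String timeUnitName) {
        // TimeUnit.valueOf accepts the names listed under trigger.time-unit.
        return TimeUnit.valueOf(timeUnitName).toMillis(delay);
    }

    public static void main(String[] args) {
        // trigger.fixed-delay=60 with the default unit SECONDS
        System.out.println(toMillis(60, "SECONDS")); // prints 60000
    }
}
```

Under this reading, --trigger.fixed-delay=60 with the default time unit corresponds to a 60-second pause between polls.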

Build

$ ./mvnw clean install -PgenerateApps
$ cd apps

You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:

$ ./mvnw clean package

Examples

java -jar sftp_source.jar --sftp.remote-dir=foo --file.consumer.mode=lines --trigger.fixed-delay=60 \
         --sftp.factory.host=sftpserver --sftp.factory.username=user --sftp.factory.password=pw --sftp.local-dir=/foo