The FileSplitter was added in version 4.1.2, and its namespace support was added in version 4.2.
The FileSplitter splits text files into individual lines, based on BufferedReader.readLine().
By default, the splitter uses an Iterator to emit lines one at a time as they are read from the file.
Setting the iterator property to false causes it to read all the lines into memory before emitting them as messages.
One use case for this might be if you want to detect I/O errors on the file before sending any messages containing lines.
However, it is only practical for relatively short files.
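The difference between the two modes can be illustrated with plain JDK classes. The following is a minimal sketch, not the FileSplitter implementation: the class LineReadingSketch and its methods lazyLines and eagerLines are hypothetical names chosen for illustration. It assumes only that lines are obtained through BufferedReader.readLine(), as stated above.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Illustrative sketch only; hypothetical names, not the FileSplitter source.
final class LineReadingSketch {

    // Lazy mode: a line is read only when the consumer asks for it.
    static Iterator<String> lazyLines(BufferedReader reader) {
        return new Iterator<String>() {
            private String next;   // look-ahead line, null when nothing is buffered
            private boolean done;  // true once readLine() has returned null

            @Override
            public boolean hasNext() {
                if (next == null && !done) {
                    try {
                        next = reader.readLine();
                    }
                    catch (IOException e) {
                        // In lazy mode an I/O error can surface after earlier lines were handed out.
                        throw new UncheckedIOException(e);
                    }
                    done = (next == null);
                }
                return next != null;
            }

            @Override
            public String next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                String line = next;
                next = null;
                return line;
            }
        };
    }

    // Eager mode: the whole input is read first, so an I/O error is raised
    // before any line is handed out, at the cost of holding every line in memory.
    static List<String> eagerLines(BufferedReader reader) {
        List<String> lines = new ArrayList<>();
        try {
            String line;
            while ((line = reader.readLine()) != null) {
                lines.add(line);
            }
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return lines;
    }

    public static void main(String[] args) {
        Iterator<String> it = lazyLines(new BufferedReader(new StringReader("a\nb\nc")));
        while (it.hasNext()) {
            System.out.println(it.next());
        }
        System.out.println(eagerLines(new BufferedReader(new StringReader("a\nb\nc"))));
    }
}
```

The eager variant trades memory for the guarantee that nothing is emitted from a file that cannot be read to the end, which is why it suits only relatively short files.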
Inbound payloads can be File, String (a File path), InputStream, or Reader.
Other payload types are emitted unchanged.
The following listing shows possible ways to configure a FileSplitter:
- Java DSL
    @SpringBootApplication
    public class FileSplitterApplication {

        public static void main(String[] args) {
            new SpringApplicationBuilder(FileSplitterApplication.class)
                .web(WebApplicationType.NONE)
                .run(args);
        }

        @Bean
        public IntegrationFlow fileSplitterFlow() {
            return IntegrationFlow
                .from(Files.inboundAdapter(tmpDir.getRoot())
                    .filter(new ChainFileListFilter<File>()
                        .addFilter(new AcceptOnceFileListFilter<>())
                        .addFilter(new ExpressionFileListFilter<>(
                            new FunctionExpression<File>(f -> "foo.tmp".equals(f.getName()))))))
                .split(Files.splitter()
                    .markers()
                    .charset(StandardCharsets.US_ASCII)
                    .firstLineAsHeader("fileHeader")
                    .applySequence(true))
                .channel(c -> c.queue("fileSplittingResultChannel"))
                .get();
        }

    }
- Kotlin DSL
    @Bean
    fun fileSplitterFlow() =
        integrationFlow(
            Files.inboundAdapter(tmpDir.getRoot())
                .filter(
                    ChainFileListFilter<File?>()
                        .addFilter(AcceptOnceFileListFilter())
                        .addFilter(ExpressionFileListFilter(FunctionExpression { f: File? -> "foo.tmp" == f!!.name }))
                )
        ) {
            split(
                Files.splitter()
                    .markers()
                    .charset(StandardCharsets.US_ASCII)
                    .firstLineAsHeader("fileHeader")
                    .applySequence(true)
            )
            channel { queue("fileSplittingResultChannel") }
        }
- Java
    @Splitter(inputChannel="toSplitter")
    @Bean
    public MessageHandler fileSplitter() {
        FileSplitter splitter = new FileSplitter(true, true);
        splitter.setApplySequence(true);
        splitter.setOutputChannel(outputChannel);
        return splitter;
    }
- XML
    <int-file:splitter id="splitter" (1)
        iterator=""                  (2)
        markers=""                   (3)
        markers-json=""              (4)
        apply-sequence=""            (5)
        requires-reply=""            (6)
        charset=""                   (7)
        first-line-as-header=""      (8)
        input-channel=""             (9)
        output-channel=""            (10)
        send-timeout=""              (11)
        auto-startup=""              (12)
        order=""                     (13)
        phase="" />                  (14)
(1) The bean name of the splitter.
(2) Set to true (the default) to use an iterator or false to load the file into memory before sending lines.
(3) Set to true to emit start-of-file and end-of-file marker messages before and after the file data. Markers are messages with FileSplitter.FileMarker payloads (with START and END values in the mark property). You might use markers when sequentially processing files in a downstream flow where some lines are filtered. They enable the downstream processing to know when a file has been completely processed. In addition, a file_marker header that contains START or END is added to these messages. The END marker includes a line count. If the file is empty, only START and END markers are emitted with 0 as the lineCount. The default is false. When true, apply-sequence is false by default. See also markers-json (the next attribute).
(4) When markers is true, set this to true to have the FileMarker objects be converted to a JSON string. (Uses a SimpleJsonSerializer underneath).
(5) Set to false to disable the inclusion of sequenceSize and sequenceNumber headers in messages. The default is true, unless markers is true. When true and markers is true, the markers are included in the sequencing. When true and iterator is true, the sequenceSize header is set to 0, because the size is unknown.
(6) Set to true to cause a RequiresReplyException to be thrown if there are no lines in the file. The default is false.
(7) Set the charset name to be used when reading text data into String payloads. The default is the platform charset.
(8) The header name for the first line to be carried as a header in the messages emitted for the remaining lines. Since version 5.0.
(9) Set the input channel used to send messages to the splitter.
(10) Set the output channel to which messages are sent.
(11) Set the send timeout. Only applies if the output-channel can block, such as a full QueueChannel.
(12) Set to false to disable automatically starting the splitter when the context is refreshed. The default is true.
(13) Set the order of this endpoint if the input-channel is a <publish-subscribe-channel/>.
(14) Set the startup phase for the splitter (used when auto-startup is true).
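The interaction between markers and apply-sequence described above can be modelled with a few lines of plain Java. This is only a sketch under stated assumptions: MarkerSequenceSketch and Emitted are hypothetical names, the payloads are plain strings rather than FileMarker objects, and the START marker is assumed to take sequence number 1 because the markers are included in the sequencing.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative model only; hypothetical names, not the FileSplitter source.
final class MarkerSequenceSketch {

    // Hypothetical stand-in for an emitted message: a payload plus its sequence number.
    static final class Emitted {
        final String payload;
        final int sequenceNumber;

        Emitted(String payload, int sequenceNumber) {
            this.payload = payload;
            this.sequenceNumber = sequenceNumber;
        }

        @Override
        public String toString() {
            return sequenceNumber + ":" + payload;
        }
    }

    // Emits the lines, optionally wrapped in START and END markers.
    // Assumption: with markers enabled, the markers consume sequence numbers too.
    static List<Emitted> split(List<String> lines, boolean markers) {
        List<Emitted> out = new ArrayList<>();
        int seq = 0;
        if (markers) {
            out.add(new Emitted("START", ++seq));
        }
        for (String line : lines) {
            out.add(new Emitted(line, ++seq));
        }
        if (markers) {
            // The END marker carries the number of data lines, which is 0 for an empty file.
            out.add(new Emitted("END(lineCount=" + lines.size() + ")", ++seq));
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(split(List.of("a", "b"), true));
        System.out.println(split(List.of(), true));
    }
}
```

A downstream consumer that filters some lines can still detect the end of a file by watching for the END entry, which is the use case the markers attribute is meant for.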
The FileSplitter also splits any text-based InputStream into lines.
Starting with version 4.3, when used in conjunction with an FTP or SFTP streaming inbound channel adapter or an FTP or SFTP outbound gateway that uses the stream option to retrieve a file, the splitter automatically closes the session that supports the stream when the file is completely consumed.
See FTP Streaming Inbound Channel Adapter and SFTP Streaming Inbound Channel Adapter as well as FTP Outbound Gateway and SFTP Outbound Gateway for more information about these facilities.
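The idea of releasing the underlying resource once the content is fully consumed can be sketched with JDK streams. This is an analogy rather than the splitter's actual session handling: StreamLinesSketch and TrackingStream are hypothetical names, and the remote session is modelled here simply by the stream's own close() method.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch only; hypothetical names, not the FileSplitter source.
final class StreamLinesSketch {

    // Hypothetical stream that records whether close() was called,
    // standing in for a stream backed by a remote session.
    static final class TrackingStream extends ByteArrayInputStream {
        boolean closed;

        TrackingStream(byte[] data) {
            super(data);
        }

        @Override
        public void close() throws IOException {
            closed = true;
            super.close();
        }
    }

    // Reads every line using the given charset and closes the stream afterwards.
    static List<String> readAllLines(InputStream in, Charset charset) {
        List<String> lines = new ArrayList<>();
        // try-with-resources closes the reader, which in turn closes the wrapped stream.
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(in, charset))) {
            String line;
            while ((line = reader.readLine()) != null) {
                lines.add(line);
            }
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return lines;
    }

    public static void main(String[] args) {
        TrackingStream stream = new TrackingStream("a\nb\n".getBytes(StandardCharsets.US_ASCII));
        System.out.println(readAllLines(stream, StandardCharsets.US_ASCII));
        System.out.println("closed: " + stream.closed);
    }
}
```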
When using Java configuration, an additional constructor is available, as the following example shows:
    public FileSplitter(boolean iterator, boolean markers, boolean markersJson)
When markersJson is true, the markers are represented as a JSON string (using a SimpleJsonSerializer).
Version 5.0 introduced the firstLineAsHeader option to specify that the first line of content is a header (such as column names in a CSV file).
The argument passed to this property is the header name under which the first line is carried as a header in the messages emitted for the remaining lines.
This line is not included in the sequence header (if applySequence is true) nor in the lineCount associated with FileMarker.END.
NOTE: Starting with version 5.5, the lineCount is also included as a FileHeaders.LINE_COUNT header of the FileMarker.END message, since the FileMarker could be serialized into JSON.
If a file contains only the header line, the file is treated as empty and, therefore, only FileMarker instances are emitted during splitting (if markers are enabled; otherwise, no messages are emitted).
By default (if no header name is set), the first line is considered to be data and becomes the payload of the first emitted message.
If you need more complex logic about header extraction from the file content (not first line, not the whole content of the line, not one particular header, and so on), consider using a header enricher ahead of the FileSplitter.
Note that the lines that have been moved to the headers might be filtered downstream from the normal content process.
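The first-line-as-header behavior described above can be modelled without any Spring classes. The following is a sketch under stated assumptions: FirstLineAsHeaderSketch and Line are hypothetical names, markers are left out, and the header value is stored in a plain field instead of a message header.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative model only; hypothetical names, not the FileSplitter source.
final class FirstLineAsHeaderSketch {

    // Hypothetical stand-in for an emitted message.
    static final class Line {
        final String payload;      // a data line
        final String headerValue;  // the first line of the file, carried with every data line
        final int sequenceNumber;  // counts data lines only

        Line(String payload, String headerValue, int sequenceNumber) {
            this.payload = payload;
            this.headerValue = headerValue;
            this.sequenceNumber = sequenceNumber;
        }
    }

    // Treats the first line as a header and emits one entry per remaining line.
    static List<Line> split(List<String> fileLines) {
        List<Line> out = new ArrayList<>();
        if (fileLines.isEmpty()) {
            return out;
        }
        String header = fileLines.get(0);
        int seq = 0;
        // The header line itself is neither emitted nor counted in the sequence.
        for (String data : fileLines.subList(1, fileLines.size())) {
            out.add(new Line(data, header, ++seq));
        }
        return out;
    }

    public static void main(String[] args) {
        for (Line line : split(List.of("id,name", "1,Ann", "2,Bob"))) {
            System.out.println(line.sequenceNumber + " [" + line.headerValue + "] " + line.payload);
        }
        // A file that contains only the header line yields no data messages.
        System.out.println(split(List.of("id,name")).size());
    }
}
```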
When apply-sequence is true, the splitter adds the line number in the SEQUENCE_NUMBER header (when markers is true, the markers are counted as lines).
The line number can be used with an Idempotent Receiver to avoid reprocessing lines after a restart.
For example:
    @Bean
    public ConcurrentMetadataStore store() {
        return new ZookeeperMetadataStore();
    }

    @Bean
    public MetadataStoreSelector selector() {
        return new MetadataStoreSelector(
                // Key: the absolute path of the file being split.
                message -> message.getHeaders().get(FileHeaders.ORIGINAL_FILE, File.class)
                        .getAbsolutePath(),
                // Value: the line number taken from the sequence header.
                message -> message.getHeaders().get(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER)
                        .toString(),
                store())
                // Accept a line only if its number is greater than the one already stored.
                .compareValues(
                        (oldVal, newVal) -> Integer.parseInt(oldVal) < Integer.parseInt(newVal));
    }

    @Bean
    public IdempotentReceiverInterceptor idempotentReceiverInterceptor() {
        return new IdempotentReceiverInterceptor(selector());
    }

    @Bean
    public IntegrationFlow flow() {
        ...
            .split(new FileSplitter())
            ...
            .handle("lineHandler", e -> e.advice(idempotentReceiverInterceptor()))
        ...
    }
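The effect of the compareValues predicate in the configuration above can be reproduced with an in-memory map. This is a sketch of the idea, assuming the selector stores one value per key and replaces it only when the predicate passes: LineDedupSketch and accept are hypothetical names, and a ConcurrentHashMap stands in for the metadata store.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Illustrative model only; hypothetical names, not the MetadataStoreSelector source.
final class LineDedupSketch {

    // Maps a file path to the highest line number accepted so far (stored as a string).
    private final ConcurrentMap<String, String> store = new ConcurrentHashMap<>();

    // Returns true if the line should be processed, false if it was already seen.
    boolean accept(String filePath, int sequenceNumber) {
        String newVal = Integer.toString(sequenceNumber);
        String oldVal = store.putIfAbsent(filePath, newVal);
        if (oldVal == null) {
            return true; // first line seen for this file
        }
        if (Integer.parseInt(oldVal) < Integer.parseInt(newVal)) {
            // Replace atomically; a concurrent update makes this return false.
            return store.replace(filePath, oldVal, newVal);
        }
        return false; // line number not greater than the stored one: a replay
    }

    public static void main(String[] args) {
        LineDedupSketch dedup = new LineDedupSketch();
        System.out.println(dedup.accept("/tmp/a.txt", 1));
        System.out.println(dedup.accept("/tmp/a.txt", 2));
        // After a restart the file is read again from the top; lines 1 and 2 are rejected.
        System.out.println(dedup.accept("/tmp/a.txt", 1));
        System.out.println(dedup.accept("/tmp/a.txt", 2));
        System.out.println(dedup.accept("/tmp/a.txt", 3));
    }
}
```

Because only the highest accepted line number is stored per file, this approach relies on lines being processed in order; a persistent store is needed for the stored value to survive a restart.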