Starting with version 5.5, a FileAggregator
is introduced to cover other side of FileSplitter
use-case when START/END markers are enabled.
For convenience the FileAggregator
implements all three sequence details strategies:
-
The
HeaderAttributeCorrelationStrategy
with theFileHeaders.FILENAME
attribute is used for correlation key calculation. When markers are enabled on theFileSplitter
, it does not populate sequence details headers, since START/END marker messages are also included into the sequence size. TheFileHeaders.FILENAME
is still populated for each line emitted, including START/END marker messages. -
The
FileMarkerReleaseStrategy
- checks forFileSplitter.FileMarker.Mark.END
message in the group and then compare aFileHeaders.LINE_COUNT
header value with the group size minus2
-FileSplitter.FileMarker
instances. It also implements a convenientGroupConditionProvider
contact forconditionSupplier
function to be used in theAbstractCorrelatingMessageHandler
. See Message Group Condition for more information. -
The
FileAggregatingMessageGroupProcessor
just removesFileSplitter.FileMarker
messages from the group and collect the rest of messages into a list payload to produce.
The following listing shows possible ways to configure a FileAggregator
:
- Java DSL
-
@Bean public IntegrationFlow fileSplitterAggregatorFlow(TaskExecutor taskExecutor) { return f -> f .split(Files.splitter() .markers() .firstLineAsHeader("firstLine")) .channel(c -> c.executor(taskExecutor)) .filter(payload -> !(payload instanceof FileSplitter.FileMarker), e -> e.discardChannel("aggregatorChannel")) .<String, String>transform(String::toUpperCase) .channel("aggregatorChannel") .aggregate(new FileAggregator()) .channel(c -> c.queue("resultChannel")); }
- Kotlin DSL
-
@Bean fun fileSplitterAggregatorFlow(taskExecutor: TaskExecutor?) = integrationFlow { split(Files.splitter().markers().firstLineAsHeader("firstLine")) channel { executor(taskExecutor) } filter<Any>({ it !is FileMarker }) { discardChannel("aggregatorChannel") } transform(String::toUpperCase) channel("aggregatorChannel") aggregate(FileAggregator()) channel { queue("resultChannel") } }
- Java
-
@serviceActivator(inputChannel="toAggregateFile") @Bean public AggregatorFactoryBean fileAggregator() { AggregatorFactoryBean aggregator = new AggregatorFactoryBean(); aggregator.setProcessorBean(new FileAggregator()); aggregator.setOutputChannel(outputChannel); return aggregator; }
- XML
-
<int:chain input-channel="input" output-channel="output"> <int-file:splitter markers="true"/> <int:aggregator> <bean class="org.springframework.integration.file.aggregator.FileAggregator"/> </int:aggregator> </int:chain>
If default behavior of the FileAggregator
does not satisfy the target logic, it is recommended to configure an aggregator endpoint with individual strategies.
See FileAggregator
JavaDocs for more information.