-
Notifications
You must be signed in to change notification settings - Fork 1.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
INT-4116: Introduce FileAggregator #3511
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,60 @@ | ||
/* | ||
* Copyright 2021 the original author or authors. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* https://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.springframework.integration.file.aggregator; | ||
|
||
import java.util.ArrayList; | ||
import java.util.Collection; | ||
import java.util.List; | ||
import java.util.Map; | ||
|
||
import org.springframework.integration.aggregator.AbstractAggregatingMessageGroupProcessor; | ||
import org.springframework.integration.file.FileHeaders; | ||
import org.springframework.integration.store.MessageGroup; | ||
import org.springframework.messaging.Message; | ||
|
||
/** | ||
* An {@link AbstractAggregatingMessageGroupProcessor} implementation for file content collecting | ||
* previously splitted by the {@link org.springframework.integration.file.splitter.FileSplitter} | ||
* with the {@code markers} option turned on. | ||
* <p> | ||
* If no file markers present in the {@link MessageGroup}, then behavior of this processor is | ||
* similar to the {@link org.springframework.integration.aggregator.DefaultAggregatingMessageGroupProcessor}. | ||
* <p> | ||
* When no file content (only file markers are grouped), this processor emits an empty {@link ArrayList}. | ||
* Note: with no file content and markers turned off, | ||
* the {@link org.springframework.integration.file.splitter.FileSplitter} doesn't emit any messages | ||
* for possible aggregation downstream. | ||
* | ||
* @author Artem Bilan | ||
* | ||
* @since 5.5 | ||
*/ | ||
public class FileAggregatingMessageGroupProcessor extends AbstractAggregatingMessageGroupProcessor { | ||
|
||
@Override | ||
protected Object aggregatePayloads(MessageGroup group, Map<String, Object> defaultHeaders) { | ||
Collection<Message<?>> messages = group.getMessages(); | ||
List<Object> payloads = new ArrayList<>(messages.size() - 2); | ||
for (Message<?> message : messages) { | ||
if (!message.getHeaders().containsKey(FileHeaders.MARKER)) { | ||
payloads.add(message.getPayload()); | ||
} | ||
} | ||
return payloads; | ||
} | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,78 @@ | ||
/* | ||
* Copyright 2021 the original author or authors. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* https://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.springframework.integration.file.aggregator; | ||
|
||
import org.springframework.beans.BeansException; | ||
import org.springframework.beans.factory.BeanFactory; | ||
import org.springframework.beans.factory.BeanFactoryAware; | ||
import org.springframework.integration.aggregator.CorrelationStrategy; | ||
import org.springframework.integration.aggregator.HeaderAttributeCorrelationStrategy; | ||
import org.springframework.integration.aggregator.MessageGroupProcessor; | ||
import org.springframework.integration.aggregator.ReleaseStrategy; | ||
import org.springframework.integration.file.FileHeaders; | ||
import org.springframework.integration.file.splitter.FileSplitter; | ||
import org.springframework.integration.store.MessageGroup; | ||
import org.springframework.messaging.Message; | ||
|
||
/** | ||
* A convenient component to utilize a {@link FileSplitter.FileMarker}-based aggregation logic. | ||
* Implements all three {@link CorrelationStrategy}, {@link ReleaseStrategy} and {@link MessageGroupProcessor} | ||
* for runtime optimization. | ||
* Delegates to {@link HeaderAttributeCorrelationStrategy} with {@link FileHeaders#FILENAME} attribute, | ||
* {@link FileMarkerReleaseStrategy} and {@link FileAggregatingMessageGroupProcessor}, respectively. | ||
* <p> | ||
* The default {@link FileSplitter} behavior with markers enabled is do not provide a sequence details | ||
* headers, therefore correlation in this aggregator implementation is done by the {@link FileHeaders#FILENAME} | ||
* header which is still populated by the {@link FileSplitter} for each line emitted, including | ||
* {@link FileSplitter.FileMarker} messages. | ||
* <p> | ||
* If default behavior of this component does not satisfy the target logic, it is recommended to | ||
* configure an aggregator with individual strategies. | ||
* | ||
* @author Artem Bilan | ||
* | ||
* @since 5.5 | ||
*/ | ||
public class FileAggregator implements CorrelationStrategy, ReleaseStrategy, MessageGroupProcessor, BeanFactoryAware { | ||
|
||
private final CorrelationStrategy correlationStrategy = new HeaderAttributeCorrelationStrategy(FileHeaders.FILENAME); | ||
|
||
private final FileMarkerReleaseStrategy releaseStrategy = new FileMarkerReleaseStrategy(); | ||
|
||
private final FileAggregatingMessageGroupProcessor groupProcessor = new FileAggregatingMessageGroupProcessor(); | ||
|
||
@Override | ||
public void setBeanFactory(BeanFactory beanFactory) throws BeansException { | ||
this.groupProcessor.setBeanFactory(beanFactory); | ||
} | ||
|
||
@Override | ||
public Object getCorrelationKey(Message<?> message) { | ||
return this.correlationStrategy.getCorrelationKey(message); | ||
} | ||
|
||
@Override | ||
public boolean canRelease(MessageGroup group) { | ||
return this.releaseStrategy.canRelease(group); | ||
} | ||
|
||
@Override | ||
public Object processMessageGroup(MessageGroup group) { | ||
return this.groupProcessor.processMessageGroup(group); | ||
} | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,63 @@ | ||
/* | ||
* Copyright 2021 the original author or authors. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* https://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.springframework.integration.file.aggregator; | ||
|
||
import java.util.Collection; | ||
|
||
import org.springframework.integration.aggregator.ReleaseStrategy; | ||
import org.springframework.integration.file.FileHeaders; | ||
import org.springframework.integration.file.splitter.FileSplitter; | ||
import org.springframework.integration.store.MessageGroup; | ||
import org.springframework.messaging.Message; | ||
import org.springframework.messaging.MessageHeaders; | ||
|
||
/** | ||
* A {@link ReleaseStrategy} which makes a decision based on the presence of | ||
* {@link org.springframework.integration.file.splitter.FileSplitter.FileMarker.Mark#END} | ||
* message in the group and its {@link org.springframework.integration.file.FileHeaders#LINE_COUNT} header. | ||
* | ||
* @author Artem Bilan | ||
* | ||
* @since 5.5 | ||
*/ | ||
public class FileMarkerReleaseStrategy implements ReleaseStrategy { | ||
|
||
@Override | ||
public boolean canRelease(MessageGroup group) { | ||
int size = group.size(); | ||
if (size > 1) { // Need more than only a START marker | ||
Collection<Message<?>> messages = group.getMessages(); | ||
for (Message<?> message : messages) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is still no good; we can't iterate over the whole collection each time, it just won't scale with large groups. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Correct, but see what we have so far in other place - Nevertheless I agree with your concern and really will address i, but I'd like to do that in the separate PR. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. OK; but let's open an issue so it's not forgotten. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If you agree with my proposal, I'll start working on that immediately, so this There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes; proposal looks good. |
||
if (checkForEndMarker(size, message.getHeaders())) { | ||
return true; | ||
} | ||
} | ||
} | ||
return false; | ||
} | ||
|
||
private boolean checkForEndMarker(int groupSize, MessageHeaders headers) { | ||
if (FileSplitter.FileMarker.Mark.END.name().equals(headers.get(FileHeaders.MARKER))) { | ||
Long lineCount = headers.get(FileHeaders.LINE_COUNT, Long.class); | ||
return lineCount != null && lineCount == groupSize - 2; | ||
} | ||
else { | ||
return false; | ||
} | ||
} | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,4 @@ | ||
/** | ||
* Provides support classes for file-based aggregation logic. | ||
*/ | ||
package org.springframework.integration.file.aggregator; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I thought we decided not to use Stream APIs in main code flows.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah... Will rework.
Thanks for the reminder!