Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

INT-4116: Introduce FileAggregator #3511

Merged
merged 2 commits into from
Mar 16, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -17,23 +17,25 @@
package org.springframework.integration.aggregator;

import org.springframework.messaging.Message;
import org.springframework.util.Assert;

/**
* Default implementation of {@link CorrelationStrategy}. Uses a header
* attribute to determine the correlation key value.
* Default implementation of {@link CorrelationStrategy}.
* Uses a provided header attribute to determine the correlation key value.
*
* @author Marius Bogoevici
* @author Artem Bilan
*/
public class HeaderAttributeCorrelationStrategy implements CorrelationStrategy {

private String attributeName;
private final String attributeName;


public HeaderAttributeCorrelationStrategy(String attributeName) {
Assert.hasText(attributeName, "the 'attributeName' must not be empty");
this.attributeName = attributeName;
}


public Object getCorrelationKey(Message<?> message) {
return message.getHeaders().get(this.attributeName);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2020 the original author or authors.
* Copyright 2015-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -22,6 +22,7 @@
import java.util.function.Function;

import org.aopalliance.aop.Advice;
import org.jetbrains.annotations.Nullable;

import org.springframework.expression.Expression;
import org.springframework.integration.aggregator.AbstractAggregatingMessageGroupProcessor;
Expand Down Expand Up @@ -211,14 +212,15 @@ protected AggregatingMessageHandler createHandler() {
}

AggregatingMessageHandler aggregator = new AggregatingMessageHandler(outputProcessor);

JavaUtils.INSTANCE
.acceptIfNotNull(this.expireGroupsUponCompletion, aggregator::setExpireGroupsUponCompletion)
.acceptIfNotNull(this.sendTimeout, aggregator::setSendTimeout)
.acceptIfNotNull(this.outputChannelName, aggregator::setOutputChannelName)
.acceptIfNotNull(this.lockRegistry, aggregator::setLockRegistry)
.acceptIfNotNull(this.messageStore, aggregator::setMessageStore)
.acceptIfNotNull(this.correlationStrategy, aggregator::setCorrelationStrategy)
.acceptIfNotNull(this.releaseStrategy, aggregator::setReleaseStrategy)
.acceptIfNotNull(obtainCorrelationStrategy(), aggregator::setCorrelationStrategy)
.acceptIfNotNull(obtainReleaseStrategy(), aggregator::setReleaseStrategy)
.acceptIfNotNull(this.groupTimeoutExpression, aggregator::setGroupTimeoutExpression)
.acceptIfNotNull(this.forceReleaseAdviceChain, aggregator::setForceReleaseAdviceChain)
.acceptIfNotNull(this.taskScheduler, aggregator::setTaskScheduler)
Expand All @@ -236,6 +238,28 @@ protected AggregatingMessageHandler createHandler() {
return aggregator;
}

@Nullable
private CorrelationStrategy obtainCorrelationStrategy() {
if (this.correlationStrategy == null && this.processorBean != null) {
CorrelationStrategyFactoryBean correlationStrategyFactoryBean = new CorrelationStrategyFactoryBean();
correlationStrategyFactoryBean.setTarget(this.processorBean);
correlationStrategyFactoryBean.afterPropertiesSet();
return correlationStrategyFactoryBean.getObject();
}
return this.correlationStrategy;
}

@Nullable
private ReleaseStrategy obtainReleaseStrategy() {
if (this.releaseStrategy == null && this.processorBean != null) {
ReleaseStrategyFactoryBean releaseStrategyFactoryBean = new ReleaseStrategyFactoryBean();
releaseStrategyFactoryBean.setTarget(this.processorBean);
releaseStrategyFactoryBean.afterPropertiesSet();
return releaseStrategyFactoryBean.getObject();
}
return this.releaseStrategy;
}

@Override
protected Class<? extends MessageHandler> getPreCreationHandlerType() {
return AggregatingMessageHandler.class;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2020 the original author or authors.
* Copyright 2016-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -69,7 +69,10 @@ public AggregatorSpec processor(Object target, String methodName) {
return super.processor(target)
.outputProcessor(methodName != null
? new MethodInvokingMessageGroupProcessor(target, methodName)
: new MethodInvokingMessageGroupProcessor(target));
:
(target instanceof MessageGroupProcessor
? (MessageGroupProcessor) target
: new MethodInvokingMessageGroupProcessor(target)));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1743,6 +1743,17 @@ public B aggregate() {
return aggregate(null);
}

/**
* A short-cut for the {@code aggregate((aggregator) -> aggregator.processor(aggregatorProcessor))}
* @param aggregatorProcessor the POJO representing aggregation strategies.
* @return the current {@link BaseIntegrationFlowDefinition}.
* @since 5.5
* @see AggregatorSpec
*/
public B aggregate(Object aggregatorProcessor) {
return aggregate((aggregator) -> aggregator.processor(aggregatorProcessor));
}

/**
* Populate the {@link AggregatingMessageHandler} with provided options from {@link AggregatorSpec}.
* In addition accept options for the integration endpoint using {@link GenericEndpointSpec}.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020 the original author or authors.
* Copyright 2020-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -718,6 +718,14 @@ class KotlinIntegrationFlowDefinition(@PublishedApi internal val delegate: Integ
this.delegate.resequence(resequencer)
}

/**
* A short-cut for the `aggregate { processor(aggregatorProcessor) }`
* @since 5.5
*/
fun aggregate(aggregator: Any) {
this.delegate.aggregate(aggregator)
}

/**
* Populate the [AggregatingMessageHandler] with provided options from [AggregatorSpec].
* In addition accept options for the integration endpoint using [GenericEndpointSpec].
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@ public abstract class FileHeaders {
*/
public static final String MARKER = PREFIX + "marker";

/**
* The line count for END marker message after splitting
*/
public static final String LINE_COUNT = PREFIX + "lineCount";

/**
* A remote file information representation
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright 2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.integration.file.aggregator;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;

import org.springframework.integration.aggregator.AbstractAggregatingMessageGroupProcessor;
import org.springframework.integration.file.FileHeaders;
import org.springframework.integration.store.MessageGroup;
import org.springframework.messaging.Message;

/**
* An {@link AbstractAggregatingMessageGroupProcessor} implementation for file content collecting
* previously splitted by the {@link org.springframework.integration.file.splitter.FileSplitter}
* with the {@code markers} option turned on.
* <p>
* If no file markers present in the {@link MessageGroup}, then behavior of this processor is
* similar to the {@link org.springframework.integration.aggregator.DefaultAggregatingMessageGroupProcessor}.
* <p>
* When no file content (only file markers are grouped), this processor emits an empty {@link ArrayList}.
* Note: with no file content and markers turned off,
* the {@link org.springframework.integration.file.splitter.FileSplitter} doesn't emit any messages
* for possible aggregation downstream.
*
* @author Artem Bilan
*
* @since 5.5
*/
public class FileAggregatingMessageGroupProcessor extends AbstractAggregatingMessageGroupProcessor {

@Override
protected Object aggregatePayloads(MessageGroup group, Map<String, Object> defaultHeaders) {
Collection<Message<?>> messages = group.getMessages();
List<Object> payloads = new ArrayList<>(messages.size() - 2);
for (Message<?> message : messages) {
if (!message.getHeaders().containsKey(FileHeaders.MARKER)) {
payloads.add(message.getPayload());
}
}
return payloads;
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought we decided not to use Stream APIs in main code flows.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah... Will rework.
Thanks for the reminder!


}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Copyright 2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.integration.file.aggregator;

import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.integration.aggregator.CorrelationStrategy;
import org.springframework.integration.aggregator.HeaderAttributeCorrelationStrategy;
import org.springframework.integration.aggregator.MessageGroupProcessor;
import org.springframework.integration.aggregator.ReleaseStrategy;
import org.springframework.integration.file.FileHeaders;
import org.springframework.integration.file.splitter.FileSplitter;
import org.springframework.integration.store.MessageGroup;
import org.springframework.messaging.Message;

/**
* A convenient component to utilize a {@link FileSplitter.FileMarker}-based aggregation logic.
* Implements all three {@link CorrelationStrategy}, {@link ReleaseStrategy} and {@link MessageGroupProcessor}
* for runtime optimization.
* Delegates to {@link HeaderAttributeCorrelationStrategy} with {@link FileHeaders#FILENAME} attribute,
* {@link FileMarkerReleaseStrategy} and {@link FileAggregatingMessageGroupProcessor}, respectively.
* <p>
* The default {@link FileSplitter} behavior with markers enabled is do not provide a sequence details
* headers, therefore correlation in this aggregator implementation is done by the {@link FileHeaders#FILENAME}
* header which is still populated by the {@link FileSplitter} for each line emitted, including
* {@link FileSplitter.FileMarker} messages.
* <p>
* If default behavior of this component does not satisfy the target logic, it is recommended to
* configure an aggregator with individual strategies.
*
* @author Artem Bilan
*
* @since 5.5
*/
public class FileAggregator implements CorrelationStrategy, ReleaseStrategy, MessageGroupProcessor, BeanFactoryAware {

private final CorrelationStrategy correlationStrategy = new HeaderAttributeCorrelationStrategy(FileHeaders.FILENAME);

private final FileMarkerReleaseStrategy releaseStrategy = new FileMarkerReleaseStrategy();

private final FileAggregatingMessageGroupProcessor groupProcessor = new FileAggregatingMessageGroupProcessor();

@Override
public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
this.groupProcessor.setBeanFactory(beanFactory);
}

@Override
public Object getCorrelationKey(Message<?> message) {
return this.correlationStrategy.getCorrelationKey(message);
}

@Override
public boolean canRelease(MessageGroup group) {
return this.releaseStrategy.canRelease(group);
}

@Override
public Object processMessageGroup(MessageGroup group) {
return this.groupProcessor.processMessageGroup(group);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Copyright 2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.integration.file.aggregator;

import java.util.Collection;

import org.springframework.integration.aggregator.ReleaseStrategy;
import org.springframework.integration.file.FileHeaders;
import org.springframework.integration.file.splitter.FileSplitter;
import org.springframework.integration.store.MessageGroup;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;

/**
* A {@link ReleaseStrategy} which makes a decision based on the presence of
* {@link org.springframework.integration.file.splitter.FileSplitter.FileMarker.Mark#END}
* message in the group and its {@link org.springframework.integration.file.FileHeaders#LINE_COUNT} header.
*
* @author Artem Bilan
*
* @since 5.5
*/
public class FileMarkerReleaseStrategy implements ReleaseStrategy {

@Override
public boolean canRelease(MessageGroup group) {
int size = group.size();
if (size > 1) { // Need more than only a START marker
Collection<Message<?>> messages = group.getMessages();
for (Message<?> message : messages) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is still no good; we can't iterate over the whole collection each time, it just won't scale with large groups.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct, but see what we have so far in other place - SequenceAwareMessageGroup.
Not related to this one, but similar iteration on each added message.

Nevertheless I agree with your concern and really will address i, but I'd like to do that in the separate PR.
See my yesterday's comment.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK; but let's open an issue so it's not forgotten.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you agree with my proposal, I'll start working on that immediately, so this FileAggregator won't be a bottle neck any more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes; proposal looks good.

if (checkForEndMarker(size, message.getHeaders())) {
return true;
}
}
}
return false;
}

private boolean checkForEndMarker(int groupSize, MessageHeaders headers) {
if (FileSplitter.FileMarker.Mark.END.name().equals(headers.get(FileHeaders.MARKER))) {
Long lineCount = headers.get(FileHeaders.LINE_COUNT, Long.class);
return lineCount != null && lineCount == groupSize - 2;
}
else {
return false;
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
/**
* Provides support classes for file-based aggregation logic.
*/
package org.springframework.integration.file.aggregator;
Original file line number Diff line number Diff line change
Expand Up @@ -412,8 +412,14 @@ private AbstractIntegrationMessageBuilder<Object> markerToReturn(FileMarker file
else {
payload = fileMarker;
}
return getMessageBuilderFactory().withPayload(payload)
.setHeader(FileHeaders.MARKER, fileMarker.mark.name());
AbstractIntegrationMessageBuilder<Object> messageBuilder =
getMessageBuilderFactory()
.withPayload(payload)
.setHeader(FileHeaders.MARKER, fileMarker.mark.name());
if (Mark.END.equals(fileMarker.mark)) {
messageBuilder.setHeader(FileHeaders.LINE_COUNT, fileMarker.lineCount);
}
return messageBuilder;
}

@Override
Expand Down