Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

INT-4116: Introduce FileAggregator #3511

Merged
merged 2 commits into from Mar 16, 2021

Conversation

artembilan
Copy link
Member

JIRA: https://jira.spring.io/browse/INT-4116

  • Implement a FileSplitter.FileMarker-based aggregation strategies
    and utilize them in a general FileAggregator component
  • Make HeaderAttributeCorrelationStrategy.attributeName as final; add Assert.notEmpty()
  • Fix AggregatorFactoryBean and AggregatorSpec to parse the provided processor
    for possible CorrelationStrategy and/or ReleaseStrategy
  • Introduce short-cut methods into Java & Kotlin DSL for an aggregate() configuration
  • Introduce a FileHeaders.LINE_COUNT for header to be populated in the FileSplitter.
    We need this info in the FileAggregator to avoid possible overhead with JSON deserialization
    of the FileSplitter.FileMarker messages
  • Test and document the feature
  • Improve FileSplitter doc for code block switch (tabs)

.findAny()
.map((message) -> message.getHeaders().get(FileHeaders.LINE_COUNT, Long.class))
.map((lineCount) -> lineCount == messages.size() - 2)
.orElse(false);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This won't scale well, especially with large files; it would be better to add getLast() to message group.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh! I see your point. It looks like the ReleaseStrategy contract would be better as canRelease(MessageGroup group, Message<?> currentMessage) since we always trigger it whenever we got a new message.
We can deprecate the current contract in favor of new and remove in the next 6.0 😄

But getLast() API in the MessageGroup won't hurt!

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, after looking into this closely, the getLast() doesn't feel OK. My test with the filter() and ExecutorChannel confirms that the last one is not always the END marker. So, I would say that canRelease(MessageGroup group, Message<?> currentMessage) makes sense for the main code flow, but forceComplete(MessageGroup) is still need to iterate the whole group, since there is no way to be sure that the last added to the group is exactly a FileMarker message.

At least some optimization is on its way! 😄

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, if they go async, all bets are off. If the last marker is processed before the size matches, we'll never release the group.

If we want to support that (async), we'd need another api on the group (e.g. condition1Passed, set to true if we ever received the last marker).

Also, the file lines would be re-assembled out of order.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We may not care about an order of those items, especially when we convert them to some domain objects for further batch inserts into DB. The applySequence on the FileSplitter and Resequencer were always there for us to reassemble an original order.

I need to give the problem you have described more thoughts...

Have a good weekend!

.filter((message) -> !message.getHeaders().containsKey(FileHeaders.MARKER))
.map(Message::getPayload)
.collect(Collectors.toList());
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought we decided not to use Stream APIs in main code flows.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah... Will rework.
Thanks for the reminder!

JIRA: https://jira.spring.io/browse/INT-4116

* Implement a `FileSplitter.FileMarker`-based aggregation strategies
and utilize them in a general `FileAggregator` component
* Make `HeaderAttributeCorrelationStrategy.attributeName` as `final`; add `Assert.notEmpty()`
* Fix `AggregatorFactoryBean` and `AggregatorSpec` to parse the provided processor
for possible `CorrelationStrategy` and/or `ReleaseStrategy`
* Introduce short-cut methods into Java & Kotlin DSL for an `aggregate()` configuration
* Introduce a `FileHeaders.LINE_COUNT` for header to be populated in the `FileSplitter`.
We need this info in the `FileAggregator` to avoid possible overhead with JSON deserialization
of the `FileSplitter.FileMarker` messages
* Test and document the feature
* Improve `FileSplitter` doc for code block switch (tabs)
@artembilan
Copy link
Member Author

Pushed a fix without Java Streams.

The feature you are asking would probably like this:

  1. The MessageGroup gets a new property String condition (or groupCondition if some DB vendors has the condition as a key word)
  2. The MessageGroupStore get a new option - coditionSupplier(Function<Message, String>)
  3. When we add a mesasge to the group, we calculate a condition against that message and store it into a group entity
  4. The ReleaseStrategy can then consult this new group property without walking through the whole group.

I suggest to make this condition as a String since it is the only way to have any variety of data structure: JSON, SpEL or just number representation. It is then a ReleaseStrategy responsibility to parse such a condition properly for its purposes. Even if it is SpEL to parse, it is still faster, then load the whole group from DB.

I would suggest to revise this feature in a separate issue/PR: really the proposed solution in the current PR fully reflects what we have so far and what an original JIRA is asking.

Any feedback welcome!

Will take a look into that a bit later: I need to fix failing JMS tests and check TaskScheduler for errorChannel.
Thanks

int size = group.size();
if (size > 1) { // Need more than only a START marker
Collection<Message<?>> messages = group.getMessages();
for (Message<?> message : messages) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is still no good; we can't iterate over the whole collection each time, it just won't scale with large groups.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct, but see what we have so far in other place - SequenceAwareMessageGroup.
Not related to this one, but similar iteration on each added message.

Nevertheless I agree with your concern and really will address i, but I'd like to do that in the separate PR.
See my yesterday's comment.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK; but let's open an issue so it's not forgotten.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you agree with my proposal, I'll start working on that immediately, so this FileAggregator won't be a bottle neck any more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes; proposal looks good.

@garyrussell garyrussell merged commit 4342586 into spring-projects:master Mar 16, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

2 participants