
Support an Abstract(Reactive)MessageHandler#handleMessageInternal that will accept List<Message<?>> #3797

Closed
migroskub opened this issue May 11, 2022 · 8 comments · Fixed by #3820

Comments

@migroskub
Contributor

Expected Behavior
I'd like there to be an Abstract(Reactive)BulkMessageHandler with a handleMessageInternal method that accepts a List<Message<?>>.

Current Behavior
Most data flows these days are written around bulk data interactions. Spring Integration is awesome because it lets the user work with stream-based data and process messages one by one. But when it comes to writing data to databases, most people want their application to have sane performance, and therefore they want to reduce the number of network requests by using bulk writes/updates/upserts. This is the ideal way of writing data in something like 90% of applications (except for Kafka, which batches by itself, and S3, which is another story). Therefore, I'd like a clean way of writing bulk data to data sources.

The outbound channel adapter contract today is the Abstract(Reactive)MessageHandler#handleMessageInternal method, which accepts a Message<?>. The (few) channel adapters in Spring Integration that support bulks check for an instance of List or something similar, since they expect a Message<List<?>>. This prevents me from using headers in my Message<?>, since I want (and I guess most applications are the same) the headers to be kept for each message. After grouping into a List<Message<?>>, I can hack it and create a new Message<List<Message<?>>>. But I think that is ugly, and it looks like extra effort just to fit the existing interface, instead of adding another interface.
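A minimal, stdlib-only sketch of the Message<List<Message<?>>> workaround described above. It assumes a hypothetical Msg record standing in for org.springframework.messaging.Message, and a hypothetical BatchWrapping.wrap helper; neither is Spring Integration API. What it shows is that each grouped item keeps its own headers, while the outer message carries only batch-level headers.

```java
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for org.springframework.messaging.Message<T>.
record Msg<T>(T payload, Map<String, Object> headers) { }

class BatchWrapping {

    // Hypothetical helper: wraps already-grouped messages into a single message whose
    // payload is the group itself, i.e. the Message<List<Message<?>>> shape.
    // Per-item headers survive because the items are still messages.
    static Msg<List<Msg<?>>> wrap(List<Msg<?>> group, Map<String, Object> batchHeaders) {
        return new Msg<>(List.copyOf(group), batchHeaders);
    }
}
```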

Context
I'd like to use and write bulk channel adapters.

@migroskub migroskub added status: waiting-for-triage The issue need to be evaluated and its future decided type: enhancement labels May 11, 2022
@artembilan
Member

Well, that's not how messaging works.
The message is a first class citizen and represents an abstraction to move the data between endpoints.
A single message is sent to the channel, and a single message is consumed on the other side. In the end, a single message is propagated to the message handler.
This is also not what EIP prescribes: the messaging system is fully agnostic of the message content; it is still a single message, and its payload is out of the messaging scope.
Which payload you put into the message is really an implementation detail on your side. And in the end it is your handler's responsibility to process that payload properly.
Also, messaging is mostly stateless, and one message must not affect others in the flow.
A Message<List<Message<?>>> is possible, but it remains an implementation detail of the target solution. It might even be an abuse to have the items as Message instances. It is perhaps better to collect some business-specific records in the aggregator if you still wish to handle the data as a bulk in your final message handler.
This is exactly what is done with the PutRecordsRequest in the AWS Kinesis client. A message as a record might not have any meaning at all for the target system you are going to write that batch to.
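One way to read the "business-specific records" suggestion: map each message to a plain record before the bulk write, copying in any header value the writer still needs. This is a sketch under assumptions: Msg, OrderRow, RecordCollector and the "offset" header are all hypothetical names used only for illustration.

```java
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for org.springframework.messaging.Message<T>.
record Msg<T>(T payload, Map<String, Object> headers) { }

// Hypothetical business record: the data the target system needs, plus the one
// header value that must survive the batch (an illustrative "offset").
record OrderRow(String orderId, long offset) { }

class RecordCollector {

    // Turns individual messages into plain records, so the message abstraction ends at
    // the aggregator and the bulk writer only sees domain data.
    // Assumes every message carries a Long "offset" header (otherwise unboxing throws).
    static List<OrderRow> toRows(List<Msg<String>> group) {
        return group.stream()
                .map(m -> new OrderRow(m.payload(), (Long) m.headers().get("offset")))
                .toList();
    }
}
```

The bulk handler then receives a List<OrderRow> and never needs to know that messages were involved upstream.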

Does it make sense?

@artembilan artembilan added status: waiting-for-reporter Needs a feedback from the reporter and removed status: waiting-for-triage The issue need to be evaluated and its future decided labels May 11, 2022
@pojacks

pojacks commented Jun 2, 2022

I agree with the idea. I think that there should be some util function that accepts a List<Message<?>> (which is what any aggregation returns) and returns a Message<List<?>> that we can pass to handleMessage().

A problem that can occur with the current architecture is that some headers in the Message are needed for error handling. When we pass a Message<List<?>> to the message handler and get an error, we won't be able to access the headers that each message had.
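The utility proposed above could be sketched as follows (hypothetical Msg and PayloadFlattener names, stdlib only). It also makes the concern concrete: once only the payloads are kept, the per-item headers are gone, so an error handler that only sees the flattened message cannot recover them.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for org.springframework.messaging.Message<T>.
record Msg<T>(T payload, Map<String, Object> headers) { }

class PayloadFlattener {

    // Hypothetical util: List<Message<?>> in, Message<List<?>> out.
    // Only the payloads are copied; each item's own headers are dropped here.
    static Msg<List<Object>> flatten(List<Msg<?>> group, Map<String, Object> batchHeaders) {
        List<Object> payloads = new ArrayList<>();
        for (Msg<?> m : group) {
            payloads.add(m.payload());
        }
        return new Msg<>(List.copyOf(payloads), batchHeaders);
    }
}
```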

@artembilan
Member

@pojacks,

did you see my explanation above of why it is not like that and why it cannot be implemented as an out-of-the-box feature?
Messaging just does not care about the payload. So it is fully up to you to have it like this, Message<List<Message<?>>>, and then your POJO method can accept an argument like List<Message<?>>.
Not sure what an out-of-the-box component would do with this, since the standard MessageHandler contract is handleMessage(Message<?> message). So the payload could be anything, and it is up to the target implementation to decide which payload type it expects.

@pojacks

pojacks commented Jun 2, 2022

@artembilan yes, I do understand this, but the framework can still supply some opinionated utils.

Anyway, what do you think about the use case of needing the headers in the error handling methods? Isn't it problematic to use channel adapters that accept Message<Iterable<?>> and won't support error handling this way?

By "channel adapters that accept Message<Iterable<?>>" I mean channel adapters like JDBC that behave differently when the payload is an Iterable.

My use case is that I put Reactor Kafka's ReceiverOffset in the header of each message so I can acknowledge the message after processing or after handling failures. This is currently not possible when I write bulks of data through Iterable-based channel adapters: on error I get the error for the whole Iterable, and since the bulk isn't an Iterable<Message<?>> (but an Iterable<?>), I can't access the ReceiverOffsets and commit.

Or is there another way of committing to Kafka that I should use?
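If the batch stays a list of messages, per-item acknowledgment after a bulk write (successful or not) could look roughly like this. A sketch under assumptions: Ack is a hypothetical stand-in for Reactor Kafka's ReceiverOffset (only acknowledge() is modelled), "receiverOffset" is an illustrative header name, and acknowledging failed items too is a policy choice, not something the framework prescribes.

```java
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical stand-in for org.springframework.messaging.Message<T>.
record Msg<T>(T payload, Map<String, Object> headers) { }

// Hypothetical stand-in for Reactor Kafka's ReceiverOffset (only acknowledge() is modelled).
interface Ack {
    void acknowledge();
}

class BatchAcker {

    // Illustrative header name; not a Spring Integration or Reactor Kafka constant.
    static final String OFFSET_HEADER = "receiverOffset";

    // Hands the whole batch to the bulk writer. Whether the write succeeds or fails,
    // every item is still a message with its own header, so each offset can be
    // acknowledged individually afterwards. Returns true if the write succeeded.
    static boolean writeAndAck(List<Msg<?>> batch, Consumer<List<Msg<?>>> bulkWriter) {
        boolean succeeded;
        try {
            bulkWriter.accept(batch);
            succeeded = true;
        }
        catch (RuntimeException ex) {
            // A real flow would log the failure or route it to an error channel here.
            succeeded = false;
        }
        for (Msg<?> m : batch) {
            ((Ack) m.headers().get(OFFSET_HEADER)).acknowledge();
        }
        return succeeded;
    }
}
```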

@artembilan
Member

Well, that info can still be stored in the payload, in the item of your collection. It doesn't have to be a Message.
Although, even if the item is a Message, the JdbcMessageHandler can deal with such a payload: its SqlParameterSource algorithm does not change; you just need to keep in mind that with this:

.map(payload -> new Message<Object>() {

	@Override
	public Object getPayload() {
		return payload;
	}

	@Override
	public MessageHeaders getHeaders() {
		return message.getHeaders();
	}

});

That return payload; statement returns the whole Message<?> item of your collection as the payload. So the SqlParameterSourceFactory must be adjusted accordingly.

Anyway, I see your point, and I'm sure that this JdbcMessageHandler has to be fixed to assume that an item of the Iterable could be a Message<?> by itself, and not create a new one in that case.
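The rule "don't wrap an item that is already a message" can be sketched in plain Java, with a hypothetical Msg record in place of Spring's Message. This is not the actual JdbcMessageHandler change, only an illustration of the intended behavior.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for org.springframework.messaging.Message<T>.
record Msg<T>(T payload, Map<String, Object> headers) { }

class ItemMessages {

    // For each item of an Iterable payload: reuse the item as-is when it is already a
    // message; otherwise wrap it in a new message that copies the outer headers
    // (what the anonymous Message in the snippet above does for every item).
    static List<Msg<?>> itemsAsMessages(Msg<? extends Iterable<?>> outer) {
        List<Msg<?>> result = new ArrayList<>();
        for (Object item : outer.payload()) {
            if (item instanceof Msg<?> message) {
                result.add(message);
            }
            else {
                result.add(new Msg<>(item, outer.headers()));
            }
        }
        return result;
    }
}
```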

But again: this is not the general handle(List<Message<?>>) API requested originally.
There is just no way to have it as a top-level API: we know that Message<?> is the unit of work everywhere in messaging, and it can already carry an Iterable<Message<?>> as its payload without breaking any messaging contract.

@artembilan
Member

I still think that it is not the messaging abstraction's responsibility to be aware of the target handler's expectations.
Plus, how to build a Collection<Message<?>> goes well beyond the scope of messaging: how many messages to bulk, how often to emit a window, where to store individual messages in between, etc.
The aggregator component is the right one to deal with all of this, but it has a flaw: it releases the messages from the collection individually.
And we say exactly that in the respective Javadocs:

/**
 * A {@link MessageGroupProcessor} that simply returns the messages in the group.
 * It can be used to configure an aggregator as a barrier, such that when the group
 * is complete, the grouped messages are released as individual messages.
 *
 * @author Gary Russell
 * @since 4.2
 *
 */
public class SimpleMessageGroupProcessor implements MessageGroupProcessor {

	@Override
	public Object processMessageGroup(MessageGroup group) {
		return group.getMessages();
	}

}

So, I'm going to fix the AbstractMessageProducingHandler:

	protected boolean shouldSplitOutput(Iterable<?> reply) {
		for (Object next : reply) {
			if (next instanceof Message<?> || next instanceof AbstractIntegrationMessageBuilder<?>) {
				return true;
			}
		}
		return false;
	}

The fix is an extra option where we won't split, but rather emit a single message with the whole released group.
Such a single Message<Collection<Message<?>>> is going to travel through the flow via some message channel and endpoints until it reaches the bulk-aware MessageHandler like the mentioned JdbcMessageHandler.
This one is going to be fixed as I explained in the previous comment.
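The two output shapes (barrier-like release versus one message for the whole group) can be sketched like this. The GroupRelease class and its split flag are hypothetical and only illustrate the difference; they are not the option or method names used in AbstractMessageProducingHandler or AggregatingMessageHandler.

```java
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for org.springframework.messaging.Message<T>.
record Msg<T>(T payload, Map<String, Object> headers) { }

class GroupRelease {

    // Hypothetical release step for a completed group.
    // split == true: each grouped message is emitted on its own (barrier-like behavior).
    // split == false: one message is emitted whose payload is the whole group.
    static List<Msg<?>> release(List<Msg<?>> group, boolean split) {
        if (split) {
            return List.copyOf(group);
        }
        Msg<List<Msg<?>>> whole = new Msg<>(List.copyOf(group), Map.of());
        return List.of(whole);
    }
}
```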

A possible end-user POJO method for a similar scenario can deal with List<Message<?>> data even right now.

The FluxAggregatorMessageHandler already produces a single Mono<Message<Flux<Message<?>>>> by default: https://docs.spring.io/spring-integration/docs/current/reference/html/message-routing.html#flux-aggregator.

So a downstream POJO method could have an argument like Flux<Message<?>>, and you are good to go with bulk processing and so on.

What else am I missing from your use-cases, please?

@migroskub
Contributor Author

migroskub commented Jun 3, 2022

@artembilan @pojacks I’m not sure how the current error handlers work, but don’t they just create a new Message<? extends Exception> with the error inside the payload? If so, there’s no way of handling these messages anyway (no way of using the payload for logging, and no way of performing an acknowledgment like @pojacks wishes). Am I wrong in describing the current strategy?

This interesting conversation is drifting a little out of the scope of this issue (which tries to emphasize the need for supporting bulk operations, if not as a parameter type, then maybe through examples in the docs).

Another point I think is relevant for this conversation is that the docs aren’t clear when it comes to error handling. I think the answers you both give about error handling should be added to the docs. And, as I’ve said about bulk support, the docs lack an example for error handling. I’d really like to see an error handler example here.

@artembilan anyway, I think your suggestion for the return type of aggregations is a good idea (and it should appear in the docs).

@artembilan
Member

Right. I'll document the feature for an aggregator.
And yes, I guess our docs for error handling need to be improved.

If your original message has a List<Message<?>> payload, then when an error happens, a MessagingException is thrown with that original message as its failedMessage property.
If the process is async, this MessagingException is wrapped into an ErrorMessage and sent to the error channel. So, on the error handling side, you have all the information you need to process the error for the whole batch.
You just need to cast the ErrorMessage payload to MessagingException and call its getFailedMessage().getPayload() to get access to your original List<Message<?>> payload.
And again: this is out of the messaging scope. These are implementation details you need to deal with one way or another.
I doubt the Framework can come up with an opinionated strategy for this.
Although I still agree that this way for error data propagation should be documented.
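The error-handling path described above, sketched with hypothetical stand-ins so it runs without Spring on the classpath: FailedMessageException plays the role of MessagingException (only its failedMessage property is modelled), and a Msg<Throwable> plays the role of the ErrorMessage sent to the error channel.

```java
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for org.springframework.messaging.Message<T>.
record Msg<T>(T payload, Map<String, Object> headers) { }

// Hypothetical stand-in for MessagingException: it keeps the message that failed.
class FailedMessageException extends RuntimeException {

    private final Msg<?> failedMessage;

    FailedMessageException(Msg<?> failedMessage, Throwable cause) {
        super("Failed to handle message", cause);
        this.failedMessage = failedMessage;
    }

    Msg<?> getFailedMessage() {
        return this.failedMessage;
    }
}

class BatchErrorHandler {

    // Error-channel side: the error message payload is the exception, so cast it,
    // take the failed message and recover the original List<Msg<?>> batch.
    // Each recovered item still has its own headers (offsets, correlation data, ...).
    static List<Msg<?>> originalBatch(Msg<Throwable> errorMessage) {
        FailedMessageException ex = (FailedMessageException) errorMessage.payload();
        @SuppressWarnings("unchecked")
        List<Msg<?>> batch = (List<Msg<?>>) ex.getFailedMessage().payload();
        return batch;
    }
}
```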

@artembilan artembilan added in: jdbc and removed status: waiting-for-reporter Needs a feedback from the reporter labels Jun 6, 2022
@artembilan artembilan added this to the 6.0.0-M4 milestone Jun 6, 2022
artembilan added a commit to artembilan/spring-integration that referenced this issue Jun 6, 2022
Fixes spring-projects#3797

* Handle `Message` items of the `Iterable` payload properly in the `JdbcMessageHandler`.
Otherwise, they've been wrapped into an extra `Message`
* Produce a single message with a `Collection<Message<?>>` payload in the `AggregatingMessageHandler`
when the `getOutputProcessor()` is not an instance of `SimpleMessageGroupProcessor`
* Mention these changes in docs
* Point to the error handling sample from docs
garyrussell added a commit that referenced this issue Jun 7, 2022
* GH-3797: Improve batch processing in the framework

Fixes #3797

* Handle `Message` items of the `Iterable` payload properly in the `JdbcMessageHandler`.
Otherwise, they've been wrapped into an extra `Message`
* Produce a single message with a `Collection<Message<?>>` payload in the `AggregatingMessageHandler`
when the `getOutputProcessor()` is not an instance of `SimpleMessageGroupProcessor`
* Mention these changes in docs
* Point to the error handling sample from docs

* * Fix language in docs

Co-authored-by: Gary Russell <grussell@vmware.com>