Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Deprecate and replace MessageConverter with reactive IncomingMessageConverter #1772

Closed
wants to merge 3 commits into from

Conversation

kdubb
Copy link
Contributor

@kdubb kdubb commented Jun 16, 2022

Fixes #1769

IncomingMessageConverter / MessageConverter

Deprecate MesasgeConverter

MessageConverter's interface is synchronous which does not allow it to handle failures in any meaningful way. First, if message conversion fails the interface requires it to return something which cannot currently be null; meaning there is no current mechanism to "ignore" a failed message. Second, it is required that MessageConverters handle calling ack or nack for a failure. Unfortunately those methods return CompletionStages. Converters must ignore the results of ack/nack or perform some sort of blocking wait for them to complete.

After talking with @cescoffier we decided changing the interface to use a reactive method would be best.

Introducing IncomingMessageConverter

IncomingMessageConverter is the same interface but returns a Uni<Message<?>>. Changing its name serves two purposes. 1) The new name allows/prepares for outgoing message converters (as discussed in issues like #1162). 2) It allows continued use of (now deprecated) MessageConverter until such time as it is removed.

Adapter AnyMessageConverter

To facilitate using both IncomingMessageConverter and MessageConverter a new base interface AnyMessageConverter was added. It is marked deprecated along with MessageConverter and when it is decided to remove MessageConverter, AnyMessageConverter will be removed as well.

ConverterUtils Changes

Wrapping Legacy Converters

Calls to the deprecated MessageConverter.convert are now wrapped with failure handling that logs the failure, rejects (aka nacks) and ignores the message. This solves issues with stalling of message handling (e.g. #1769) since failures cannot be handled by these converters.

Wrapping is not done for IncomingMessageConverter, now that the interface provides the full capabilities to handle failures gracefully it is expected that they will. All failures from streams returned from IncomingMessageConverter are ignored without logging or ack/nack (this is explained in the interface's API docs).

canConvert

Previously due to caching of converters, the canCovert method was only ever called one time, when the converter was cached. This has the following problems:

  1. The interface's API docs say that it will be called every time before convert is called.
  2. The call to canConvert is passed an instance of Message<?> to make its decision but each call to convert will be passed a unique instance of Message<?> which may not be anything like the message passed to canCovert.

For these reasons canConvert is now always called (as the API docs already suggest) even after converter caching. If a converter cannot be used with the current message instance a new converter will be cached. For converters that see the same exact messages all the time there is little to no penalty. Additionally converters that may see a wide range of message types will now work correctly.

No Identity converter

The ConverterUtils.convert method checks for the ability to assign the payload type in the message to the type target payload type required by the injection target. There is no need to lookup and use an identity converter since the message payload is already in a form the injection target can use.

RabbitMQ Conversion

IncomingRabbitMQMessage's ad-hoc conversion in IncomingRabbitMQMessage.getPayload() has been replaced with a standard set of converts that replicate the previous behavior but with better error handling and more deterministic logic.

Additionally this removes the content-type-override for RabbitMQ incoming connectors. It was used as a hack to make conversion happen on unsupported content types. This should be done in an IncomingMessageConverter, with proper error handling, whenever this functionality is needed.

@kdubb
Copy link
Contributor Author

kdubb commented Jun 16, 2022

@cescoffier As discussed in the Zulip chat. There are a 5 or 6 failing Kafka tests that seem to be a result of the now required onItem().transformtoUniAndConcatenate() used during conversion.

@codecov-commenter
Copy link

Codecov Report

Merging #1772 (be40a56) into main (35fdf68) will decrease coverage by 61.33%.
The diff coverage is 0.00%.

Impacted file tree graph

@@              Coverage Diff              @@
##               main    #1772       +/-   ##
=============================================
- Coverage     73.83%   12.49%   -61.34%     
+ Complexity     3145      441     -2704     
=============================================
  Files           275      276        +1     
  Lines         11228    11238       +10     
  Branches       1437     1432        -5     
=============================================
- Hits           8290     1404     -6886     
- Misses         2249     9645     +7396     
+ Partials        689      189      -500     
Impacted Files Coverage Δ
...allrye/reactive/messaging/AnyMessageConverter.java 0.00% <0.00%> (ø)
.../smallrye/reactive/messaging/MessageConverter.java 0.00% <ø> (ø)
...reactive/messaging/providers/AbstractMediator.java 0.00% <ø> (-79.46%) ⬇️
...messaging/providers/extension/ChannelProducer.java 0.00% <ø> (-78.88%) ⬇️
...messaging/providers/extension/MediatorManager.java 0.00% <ø> (-82.56%) ⬇️
...ve/messaging/providers/helpers/ConverterUtils.java 0.00% <0.00%> (-63.64%) ⬇️
...tive/messaging/providers/i18n/ProviderLogging.java 0.00% <ø> (-100.00%) ⬇️
...allrye/reactive/messaging/health/HealthReport.java 0.00% <0.00%> (-100.00%) ⬇️
...ipse/microprofile/reactive/messaging/Metadata.java 0.00% <0.00%> (-100.00%) ⬇️
...tive/messaging/ce/impl/BaseCloudEventMetadata.java 0.00% <0.00%> (-100.00%) ⬇️
... and 221 more

This covers issues with message conversion failures related to both MANUAL and POST_PROCESSING ack strategies.
### IncomingMessageConverter / MessageConverter
* Introduces IncomingMessageConverter which as a reactive interface
* * Naming allows/prepares for outgoing message converters as well
* Deprecates MessageConverter

### ConverterUtils
* `canCovert` is now always called (as the API docs suggest) even after converter caching
*  Identity converter is no longer used; compatbile payloads and target types use go through no conversion
* Handles `MessageConverter.convert` failures
* * Wrapped with logging and automatic message rejection

Fixes smallrye#1769
The ad-hoc conversion that was happening in `IncomingRabbitMQMessage.getPayload()` has been removed and replaced with a standard set of `IncomingMessageConverter`s that support JSON, text and byte arrays; the same that was attempted in the getPayload.

This ensures conversion only happens once (instead of every time getPayload is called) and that errors don’t happen or are handled properly when the do.

The removes `content-type-override` option. It was a hack to get the ad-hoc conversion to treat messages as some other type. This should be handled in an `IncomingMessageConverter` with proper error handling (which wasn’t done before.
@ozangunalp
Copy link
Collaborator

Closing this as improvements were brought in with #1837 and #1856

@ozangunalp ozangunalp closed this Nov 7, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

MessageConverter failures stall processing indefinitely with manual/post-processing acknowledgment
3 participants