Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Decorator API for Publisher & Subscribers, Outgoing Message Interceptors #1856

Merged
merged 1 commit into from
Sep 2, 2022

Conversation

ozangunalp
Copy link
Collaborator

There have been previous discussions on message interceptors #1141

  • Using CDI interceptors for Reactive Messaging is not possible as consuming methods callbacks or stream processors are not called through CDI.
  • Using CDI decorators was discussed for intercepting calls to Emitters.
  • Another use case is to intercept outgoing messages for adapting outgoing payload and intercepting transmission result Support intercepting and converting outgoing messages #1162.
  • Interceptors need a way to access some sort of call context: i.e incoming message trace or @RequestScoped beans. More recently there is a need to improve OpenTelemetry support by propagating traces through RM streams OpenTelemetry support enhancement #1784.

An internal PublisherDecorator extension point already exists for incoming channels, which is used for metrics and Vert.x context association.

This PR

  • Moves the existing PublisherDecorator to the API module
  • Adds a SubscriberDecorator for decorating Multis that are sources for subscribers in the channel graph.
  • Both decorators are prioritized and the graph wiring will call them from highest to lowest priority.
  • They receive the channel name and isConnector flag for distinguishing channels bound to a connector.
  • Introduces a default implementation of SubscriberDecorator which discovers OutgoingInterceptor beans for attaching interceptors to channels.
  • OutgoingInterceptors can alter outgoing messages with onMessage callback, and when supported by the connector (Kafka and MQTT) can access transmission results withonMessageAck callback.

@codecov-commenter
Copy link

codecov-commenter commented Aug 26, 2022

Codecov Report

Merging #1856 (a9651d9) into main (9800690) will increase coverage by 3.33%.
The diff coverage is 84.79%.

❗ Current head a9651d9 differs from pull request most recent head d42cb73. Consider uploading reports for the commit d42cb73 to get more accurate results

Impacted file tree graph

@@             Coverage Diff              @@
##               main    #1856      +/-   ##
============================================
+ Coverage     73.69%   77.03%   +3.33%     
- Complexity     3147     3214      +67     
============================================
  Files           276      280       +4     
  Lines         11261    10951     -310     
  Branches       1437     1405      -32     
============================================
+ Hits           8299     8436     +137     
+ Misses         2271     1834     -437     
+ Partials        691      681      -10     
Impacted Files Coverage Δ
...allrye/reactive/messaging/OutgoingInterceptor.java 0.00% <0.00%> (ø)
...ye/reactive/messaging/OutgoingMessageMetadata.java 0.00% <0.00%> (ø)
...mallrye/reactive/messaging/PublisherDecorator.java 0.00% <0.00%> (ø)
...allrye/reactive/messaging/SubscriberDecorator.java 0.00% <0.00%> (ø)
...ye/reactive/messaging/kafka/i18n/KafkaLogging.java 100.00% <ø> (ø)
...ive/messaging/kafka/impl/ConfigurationCleaner.java 94.44% <ø> (+26.44%) ⬆️
...e/messaging/providers/metrics/MetricDecorator.java 88.88% <ø> (ø)
...ssaging/providers/metrics/MicrometerDecorator.java 100.00% <ø> (ø)
...reactive/messaging/providers/helpers/CDIUtils.java 72.72% <72.72%> (ø)
...active/messaging/kafka/impl/KafkaRecordHelper.java 78.57% <78.57%> (ø)
... and 49 more

@Ladicek
Copy link
Collaborator

Ladicek commented Aug 26, 2022

I didn't review properly, but I'd suggest to avoid extending Prioritized. Instead, users should annotate an interceptor with @Priority. That's consistent with how CDI interceptors are declared. Also, invocation order of interceptors should be from lowest priority value to highest priority value, to be consistent with CDI interceptors (see https://jakarta.ee/specifications/interceptors/2.0/interceptors-spec-2.0.html#use_of_the_priority_annotation_in_ordering_interceptors).

EDIT: my bad, I understood that multiple interceptors can apply. If only one of them can apply, it should indeed be the one with the highest priority value.

EDIT2: ok so these classes are actually all beans, so perhaps it would be best to leave instance selection to CDI completely. It has means to resolve ambiguity based on priority already (though having to declare @Alternative may be seen as annoying).

@cescoffier cescoffier changed the title Decorator API for Publisher & Subscibers, Outgoing Message Interceptors Decorator API for Publisher & Subscribers, Outgoing Message Interceptors Aug 31, 2022
@ozangunalp ozangunalp marked this pull request as ready for review September 1, 2022 08:37
…i module

OutgoingInterceptor for intercepting outgoing messages
@ozangunalp
Copy link
Collaborator Author

During my tests I couldn't reproduce CDI ordering the instances by default from the @Inject @Any Instance<A> instances injection.
I changed the doc to match the implementation, PublisherDecorators, SubscriberDecorators and OutgoingInterceptors invoked ordered from highest priority to lowest (from the least priority value to the greatest). It is the case for MessageConverters, I added a clarification to the documentation as well.

I see that WeldInstance does come with an instance comparator taking into account both Prioritized and @Priority. But we can't use it. Therefore for the sake of simplicity I am keeping only the Prioritized to describe the bean priority.

@Ladicek
Copy link
Collaborator

Ladicek commented Sep 1, 2022

Ah, right, iterating through Instance doesn't consider priority. Weld has proprietary means to do so, and ArC does guarantee order, but that's also a proprietary extension. You can use CDI mechanisms directly only when you need a single instance out of multiple. Oh well.

Copy link
Contributor

@cescoffier cescoffier left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants