Skip to content

Commit

Permalink
Introduce SubscriberDecorator and move decorator interfaces to the ap…
Browse files Browse the repository at this point in the history
…i module

OutgoingInterceptor for intercepting outgoing messages
  • Loading branch information
ozangunalp committed Sep 1, 2022
1 parent 2e0735b commit 49d45d8
Show file tree
Hide file tree
Showing 35 changed files with 838 additions and 75 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@
* To register a converter, expose a, generally {@code ApplicationScoped} bean, implementing this interface.
* <p>
* When multiple converters are available, implementation should override the {@link #getPriority()} method.
* The default priority is {@link #CONVERTER_DEFAULT_PRIORITY}. Converters with higher priority are executed first.
* Converters with higher priority (lesser value) are executed first.
* The default priority is {@link #CONVERTER_DEFAULT_PRIORITY}.
*/
@Experimental("SmallRye only feature")
public interface MessageConverter extends Prioritized {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package io.smallrye.reactive.messaging;

import javax.enterprise.inject.spi.Prioritized;

import org.eclipse.microprofile.reactive.messaging.Message;

import io.smallrye.common.annotation.Experimental;

/**
* Interceptor for outgoing messages on connector channels.
* <p>
* To register an outgoing interceptor, expose a managed bean, implementing this interface,
* and qualified with {@code @Identifier} with the targeted channel name.
* <p>
* Only one interceptor is allowed to be bound for interception per outgoing channel.
* When multiple interceptors are available, implementation should override the {@link #getPriority()} method.
*/
@Experimental("Smallrye-only feature")
public interface OutgoingInterceptor extends Prioritized {

@Override
default int getPriority() {
return -1;
}

/**
* Called before message transmission
*
* @param message message to send
* @return the message to send, possibly mutated
*/
default Message<?> onMessage(Message<?> message) {
return message;
}

/**
* Called after message acknowledgment
*
* @param message acknowledged message
*/
void onMessageAck(Message<?> message);

/**
* Called after message negative-acknowledgement
*
* @param message message to negative-acknowledge
* @param failure failure
*/
void onMessageNack(Message<?> message, Throwable failure);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package io.smallrye.reactive.messaging;

import org.eclipse.microprofile.reactive.messaging.Message;

/**
* Metadata injected for holding the result of outgoing connector's transmission operation
* <p>
* Connector implementations are responsible for searching for this metadata on the outgoing message
* and setting the transmission result on that metadata object.
* <p>
* Implementations of {@link OutgoingInterceptor} can access the result
* on {@link OutgoingInterceptor#onMessageAck(Message)} callback.
*
* @param <T> type of the transmission result
*/
public class OutgoingMessageMetadata<T> {

private T result;

public static void setResultOnMessage(Message<?> message, Object result) {
message.getMetadata(OutgoingMessageMetadata.class).ifPresent(m -> m.setResult(result));
}

public void setResult(T result) {
this.result = result;
}

public T getResult() {
return result;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package io.smallrye.reactive.messaging;

import javax.enterprise.inject.spi.Prioritized;

import org.eclipse.microprofile.reactive.messaging.Message;

import io.smallrye.common.annotation.Experimental;
import io.smallrye.mutiny.Multi;

/**
* SPI to allow extension of publishers (Multi) included in the final graph.
*
* {@code PublisherDecorator}s are invoked higher priority first (from the least value to the greatest).
*
* The decorator priority is obtained with the {@link #getPriority()} method.
* The default priority is {@link #DEFAULT_PRIORITY}.
*/
@Experimental("SmallRye only feature")
public interface PublisherDecorator extends Prioritized {

/**
* Default priority
*/
int DEFAULT_PRIORITY = 1000;

/**
* Decorate a Multi
*
* @param publisher the multi to decorate
* @param channelName the name of the channel to which this publisher publishes
* @param isConnector {@code true} if decorated channel is connector
* @return the extended multi
*/
Multi<? extends Message<?>> decorate(Multi<? extends Message<?>> publisher, String channelName,
boolean isConnector);

@Override
default int getPriority() {
return DEFAULT_PRIORITY;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package io.smallrye.reactive.messaging;

import java.util.List;

import javax.enterprise.inject.spi.Prioritized;

import org.eclipse.microprofile.reactive.messaging.Message;

import io.smallrye.common.annotation.Experimental;
import io.smallrye.mutiny.Multi;

/**
* SPI to allow extension of subscription targets (Multi) included in the final graph.
*
* {@code SubscriberDecorator}s are invoked higher priority first (from the least value to the greatest).
*
* The decorator priority is obtained with the {@link #getPriority()} method.
* The default priority is {@link #DEFAULT_PRIORITY}.
*/
@Experimental("SmallRye only feature")
public interface SubscriberDecorator extends Prioritized {

/**
* Default priority
*/
int DEFAULT_PRIORITY = 1000;

/**
* Decorate a Multi
*
* @param toBeSubscribed the multi to decorate which will be subscribed by this channel
* @param channelName the list of channel names from which this subscriber consumes
* @param isConnector {@code true} if decorated channel is connector
* @return the extended multi
*/
Multi<? extends Message<?>> decorate(Multi<? extends Message<?>> toBeSubscribed, List<String> channelName,
boolean isConnector);

@Override
default int getPriority() {
return DEFAULT_PRIORITY;
}

}
1 change: 1 addition & 0 deletions documentation/mkdocs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ nav:
- 'Method Signatures': concepts/signatures.md
- 'Skipping Messages': concepts/skipping.md
- 'Message Converters': concepts/converters.md
- 'Channel Decorators': concepts/decorators.md
- 'Broadcast' : concepts/broadcast.md
- 'Merge channels' : concepts/merge.md
- '@Incomings' : concepts/incomings.md
Expand Down
2 changes: 1 addition & 1 deletion documentation/src/main/docs/concepts/converters.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,4 @@ message is passed as received.
If multiple suitable converters are present, implementations should
override the `getPriority` method returning the priority. The default
priority is `100`. The converter lookup invokes converters with higher
priority first.
priority (from the least value to the greatest) first.
59 changes: 59 additions & 0 deletions documentation/src/main/docs/concepts/decorators.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
# Channel Decorators

SmallRye Reactive Messaging supports decorating reactive streams
of incoming and outgoing channels for implementing cross-cutting
concerns such as monitoring, tracing or message interception.

Two symmetrical APIs are proposed for decorating publisher and subscriber channels,
{{ javadoc('io.smallrye.reactive.messaging.PublisherDecorator') }}
and {{ javadoc('io.smallrye.reactive.messaging.SubscriberDecorator') }} respectively.

!!!Important
`@Incoming` channels and channels bound to an outbound connector are both `Subscriber`s.
Conversely `@Outgoing` channels and channels bound to an inbound connector are `Publisher`s.

For example, to provide a decorator which counts consumed messages from incoming connector,
implement a bean exposing the interface `PublisherDecorator`:

``` java
{{ insert('decorators/ConsumedMessageDecorator.java', 'code') }}
```

Decorators' `decorate` method is called only once per channel at application deployment when graph wiring is taking place.
Decorators are very powerful because they receive the stream of messages (Mutiny `Multi<Message<?>>`)
and potentially return a new stream of messages.

Note that if a decorator is available it will be called on every channel.
The `decorate` method receives the channel name and whether the channel is a connector or not as parameters.
Decorators are called ordered from highest to lowest priority (from the least value to the greatest),
obtained using the `javax.enterprise.inject.spi.Prioritized#getPriority` method.

!!!Note
The `SubscriberDecorator` receive a list of channel names because `@Incoming` annotation is repeatable
and consuming methods can be linked to multiple channels.

## Intercepting Outgoing Messages

Decorators can be used to intercept and alter messages, both on incoming and outgoing channels.
Smallrye Reactive Messaging provides a `SubscriberDecorator` by default to allow intercepting outgoing messages for a specific channel.

To provide an outgoing interceptor implement a bean exposing the interface {{ javadoc('io.smallrye.reactive.messaging.OutgoingInterceptor') }}, qualified with a `@Identifier` with the channel name to intercept.
Only one interceptor is allowed to be bound for interception per outgoing channel.
If no interceptors are found with a `@Identifier` but a `@Default` one is available, it is used.
When multiple interceptors are available, the bean with the highest priority is used.

``` java
{{ insert('interceptors/MyInterceptor.java') }}
```

An `OutgoingInterceptor` can implement these three methods:

- `Message<?> onMessage(Message<?> message)` : Called before passing the message to the outgoing connector for transmission.
The message can be altered by returning a new message from this method.
- `void onMessageAck(Message<?> message)` : Called after message acknowledgment.
This callback can access `OutgoingMessageMetadata` which will hold the result of the message transmission to the broker, if supported by the connector. This is only supported by MQTT and Kafka connectors.
- `void onMessageNack(Message<?> message, Throwable failure)` : Called before message negative-acknowledgment.

!!!Note
If you are willing to adapt an incoming message payload to fit a consuming method receiving type,
you can use `MessageConverter`s.
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package decorators;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

import javax.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Message;

import io.smallrye.mutiny.Multi;
import io.smallrye.reactive.messaging.PublisherDecorator;

// <code>
@ApplicationScoped
public class ConsumedMessageDecorator implements PublisherDecorator {

private final Map<String, AtomicLong> counters = new HashMap<>();

@Override
public Multi<? extends Message<?>> decorate(Multi<? extends Message<?>> publisher, String channelName,
boolean isConnector) {
if (isConnector) {
AtomicLong counter = new AtomicLong();
counters.put(channelName, counter);
return publisher.onItem().invoke(counter::incrementAndGet);
} else {
return publisher;
}
}

@Override
public int getPriority() {
return 10;
}

public long getMessageCount(String channel) {
return counters.get(channel).get();
}
}
// </code>
30 changes: 30 additions & 0 deletions documentation/src/main/java/interceptors/MyInterceptor.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package interceptors;

import javax.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Message;

import io.smallrye.common.annotation.Identifier;
import io.smallrye.reactive.messaging.OutgoingInterceptor;
import io.smallrye.reactive.messaging.OutgoingMessageMetadata;

@Identifier("channel-a")
@ApplicationScoped
public class MyInterceptor implements OutgoingInterceptor {

@Override
public Message<?> onMessage(Message<?> message) {
return message.withPayload("changed " + message.getPayload());
}

@Override
public void onMessageAck(Message<?> message) {
message.getMetadata(OutgoingMessageMetadata.class)
.ifPresent(m -> m.getResult());
}

@Override
public void onMessageNack(Message<?> message, Throwable failure) {

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import io.opentelemetry.context.Scope;
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.OutgoingMessageMetadata;
import io.smallrye.reactive.messaging.TracingMetadata;
import io.smallrye.reactive.messaging.ce.OutgoingCloudEventMetadata;
import io.smallrye.reactive.messaging.health.HealthReport;
Expand Down Expand Up @@ -199,7 +200,8 @@ record = getProducerRecord(message, outgoingMetadata, incomingMetadata, actualTo
@SuppressWarnings({ "unchecked", "rawtypes" })
Uni<RecordMetadata> sendUni = client.send((ProducerRecord) record);

Uni<Void> uni = sendUni.onItem().transformToUni(ignored -> {
Uni<Void> uni = sendUni.onItem().transformToUni(recordMetadata -> {
OutgoingMessageMetadata.setResultOnMessage(message, recordMetadata);
log.successfullyToTopic(message, record.topic());
return Uni.createFrom().completionStage(message.ack());
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@
import org.junit.jupiter.api.Test;

import io.smallrye.metrics.setup.MetricCdiInjectionExtension;
import io.smallrye.reactive.messaging.PublisherDecorator;
import io.smallrye.reactive.messaging.kafka.base.WeldTestBase;
import io.smallrye.reactive.messaging.providers.PublisherDecorator;
import io.smallrye.reactive.messaging.providers.metrics.MetricDecorator;
import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import io.netty.handler.codec.mqtt.MqttQoS;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.vertx.AsyncResultUni;
import io.smallrye.reactive.messaging.OutgoingMessageMetadata;
import io.smallrye.reactive.messaging.mqtt.session.MqttClientSession;
import io.smallrye.reactive.messaging.mqtt.session.MqttClientSessionOptions;
import io.vertx.core.json.Json;
Expand Down Expand Up @@ -91,6 +92,7 @@ private CompletionStage<?> send(AtomicReference<Clients.ClientHolder> reference,
if (f != null) {
return Uni.createFrom().completionStage(msg.nack(f).thenApply(x -> msg));
} else {
OutgoingMessageMetadata.setResultOnMessage(msg, s);
return Uni.createFrom().completionStage(msg.ack().thenApply(x -> msg));
}
})
Expand Down
15 changes: 15 additions & 0 deletions smallrye-reactive-messaging-provider/revapi.json
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,21 @@
"new": "parameter void io.smallrye.reactive.messaging.providers.wiring.Wiring::prepare(boolean, io.smallrye.reactive.messaging.ChannelRegistry, ===java.util.List<io.smallrye.reactive.messaging.EmitterConfiguration>===, java.util.List<io.smallrye.reactive.messaging.providers.extension.ChannelConfiguration>, java.util.List<io.smallrye.reactive.messaging.MediatorConfiguration>)",
"parameterIndex": "2",
"justification": "EmitterConfiguration moved to api module"
},
{
"ignore": true,
"code": "java.method.numberOfParametersChanged",
"old": "method io.smallrye.mutiny.Multi<? extends org.eclipse.microprofile.reactive.messaging.Message<?>> io.smallrye.reactive.messaging.providers.locals.ContextDecorator::decorate(io.smallrye.mutiny.Multi<? extends org.eclipse.microprofile.reactive.messaging.Message<?>>, java.lang.String)",
"new": "method io.smallrye.mutiny.Multi<? extends org.eclipse.microprofile.reactive.messaging.Message<?>> io.smallrye.reactive.messaging.providers.locals.ContextDecorator::decorate(io.smallrye.mutiny.Multi<? extends org.eclipse.microprofile.reactive.messaging.Message<?>>, java.lang.String, boolean)",
"justification": "PublisherDecorator adds 3rd parameter boolean isConnector"
},
{
"ignore": true,
"code": "java.class.noLongerImplementsInterface",
"old": "class io.smallrye.reactive.messaging.providers.locals.ContextDecorator",
"new": "class io.smallrye.reactive.messaging.providers.locals.ContextDecorator",
"interface": "io.smallrye.reactive.messaging.providers.PublisherDecorator",
"justification": "PublisherDecorator moved to api module"
}
]
}
Expand Down
Loading

0 comments on commit 49d45d8

Please sign in to comment.