Skip to content

Commit

Permalink
Merge d42cb73 into 2e0735b
Browse files Browse the repository at this point in the history
  • Loading branch information
ozangunalp committed Aug 26, 2022
2 parents 2e0735b + d42cb73 commit a9651d9
Show file tree
Hide file tree
Showing 32 changed files with 758 additions and 55 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package io.smallrye.reactive.messaging;

import javax.enterprise.inject.spi.Prioritized;

import org.eclipse.microprofile.reactive.messaging.Message;

import io.smallrye.common.annotation.Experimental;

/**
* Interceptor for outgoing messages on connector channels.
* <p>
* To register an outgoing interceptor, expose a managed bean, implementing this interface,
* and qualified with {@code @Identifier} with the targeted channel name.
* <p>
* Only one interceptor is allowed to be bound for interception per outgoing channel.
* When multiple interceptors are available, implementation should override the {@link #getPriority()} method.
*/
@Experimental("Smallrye-only feature")
public interface OutgoingInterceptor extends Prioritized {

@Override
default int getPriority() {
return -1;
}

/**
* Called before message transmission
*
* @param message message to send
* @return the message to send, possibly mutated
*/
default Message<?> onMessage(Message<?> message) {
return message;
}

/**
* Called after message acknowledgment
*
* @param message acknowledged message
*/
void onMessageAck(Message<?> message);

/**
* Called after message negative-acknowledgement
*
* @param message message to negative-acknowledge
* @param failure failure
*/
void onMessageNack(Message<?> message, Throwable failure);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package io.smallrye.reactive.messaging;

import org.eclipse.microprofile.reactive.messaging.Message;

/**
* Metadata injected for holding the result of outgoing connector's transmission operation
* <p>
* Connector implementations are responsible for searching for this metadata on the outgoing message
* and setting the transmission result on that metadata object.
* <p>
* Implementations of {@link OutgoingInterceptor} can access the result
* on {@link OutgoingInterceptor#onMessageAck(Message)} callback.
*
* @param <T> type of the transmission result
*/
public class OutgoingMessageMetadata<T> {

private T result;

public static void setResultOnMessage(Message<?> message, Object result) {
message.getMetadata(OutgoingMessageMetadata.class).ifPresent(m -> m.setResult(result));
}

public void setResult(T result) {
this.result = result;
}

public T getResult() {
return result;
}
}
Original file line number Diff line number Diff line change
@@ -1,21 +1,32 @@
package io.smallrye.reactive.messaging.providers;
package io.smallrye.reactive.messaging;

import javax.enterprise.inject.spi.Prioritized;

import org.eclipse.microprofile.reactive.messaging.Message;

import io.smallrye.common.annotation.Experimental;
import io.smallrye.mutiny.Multi;

/**
* SPI to allow extension of publishers (Multi) included in the final graph
*/
public interface PublisherDecorator {
@Experimental("SmallRye only feature")
public interface PublisherDecorator extends Prioritized {

@Override
default int getPriority() {
return -1;
}

/**
* Decorate a Multi
*
* @param publisher the multi to decorate
* @param channelName the name of the channel to which this publisher publishes
* @param isConnector {@code true} if decorated channel is connector
* @return the extended multi
*/
Multi<? extends Message<?>> decorate(Multi<? extends Message<?>> publisher, String channelName);
Multi<? extends Message<?>> decorate(Multi<? extends Message<?>> publisher, String channelName,
boolean isConnector);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package io.smallrye.reactive.messaging;

import java.util.List;

import javax.enterprise.inject.spi.Prioritized;

import org.eclipse.microprofile.reactive.messaging.Message;

import io.smallrye.common.annotation.Experimental;
import io.smallrye.mutiny.Multi;

/**
* SPI to allow extension of subscription targets (Multi) included in the final graph
*/
@Experimental("SmallRye only feature")
public interface SubscriberDecorator extends Prioritized {

@Override
default int getPriority() {
return -1;
}

/**
* Decorate a Multi
*
* @param toBeSubscribed the multi to decorate which will be subscribed by this channel
* @param channelName the list of channel names from which this subscriber consumes
* @param isConnector {@code true} if decorated channel is connector
* @return the extended multi
*/
Multi<? extends Message<?>> decorate(Multi<? extends Message<?>> toBeSubscribed, List<String> channelName,
boolean isConnector);

}
1 change: 1 addition & 0 deletions documentation/mkdocs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ nav:
- 'Method Signatures': concepts/signatures.md
- 'Skipping Messages': concepts/skipping.md
- 'Message Converters': concepts/converters.md
- 'Channel Decorators': concepts/decorators.md
- 'Broadcast' : concepts/broadcast.md
- 'Merge channels' : concepts/merge.md
- '@Incomings' : concepts/incomings.md
Expand Down
57 changes: 57 additions & 0 deletions documentation/src/main/docs/concepts/decorators.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
# Channel Decorators

SmallRye Reactive Messaging supports decorating reactive streams
of incoming and outgoing channels for implementing cross-cutting
concerns such as monitoring, tracing or message interception.

Two symmetrical APIs are proposed for decorating publisher and subscriber channels,
{{ javadoc('io.smallrye.reactive.messaging.PublisherDecorator') }}
and {{ javadoc('io.smallrye.reactive.messaging.SubscriberDecorator') }} respectively.

!!!Important
`@Incoming` channels and channels bound to an outbound connector are both `Subscriber`s.
Conversely `@Outgoing` channels and channels bound to an inbound connector are `Publisher`s.

For example, to provide a decorator which counts consumed messages from incoming connector,
implement a bean exposing the interface `PublisherDecorator`:

``` java
{{ insert('decorators/ConsumedMessageDecorator.java', 'code') }}
```

Decorators' `decorate` method is called only once at application deployment when graph wiring is taking place.
Decorators are very powerful because they receive the stream of messages (Mutiny `Multi<Message<?>>`)
and potentially return a new stream of messages.

Note that if a decorator is available it will be called on every channel.
The `decorate` method receives the channel name and whether the channel is a connector or not as parameters.

!!!Note
The `SubscriberDecorator` receive a list of channel names because `@Incoming` annotation is repeatable
and consuming methods can be linked to multiple channels.

## Intercepting Outgoing Messages

Decorators can be used to intercept and alter messages, both on incoming and outgoing channels.
Smallrye Reactive Messaging provides a `SubscriberDecorator` by default to allow intercepting outgoing messages for a specific channel.

To provide an outgoing interceptor implement a bean exposing the interface {{ javadoc('io.smallrye.reactive.messaging.OutgoingInterceptor') }}, qualified with a `@Identifier` with the channel name to intercept.
Only one interceptor is allowed to be bound for interception per outgoing channel.
If no interceptors are found with a `@Identifier` but a `@Default` one is available, it is used.
When multiple interceptors are available, the bean with the highest priority is used.

``` java
{{ insert('interceptors/MyInterceptor.java') }}
```

An `OutgoingInterceptor` can implement these three methods:

- `Message<?> onMessage(Message<?> message)` : Called before passing the message to the outgoing connector for transmission.
The message can be altered by returning a new message from this method.
- `void onMessageAck(Message<?> message)` : Called after message acknowledgment.
This callback can access `OutgoingMessageMetadata` which will hold the result of the message transmission to the broker, if supported by the connector. This is only supported by MQTT and Kafka connectors.
- `void onMessageNack(Message<?> message, Throwable failure)` : Called before message negative-acknowledgment.

!!!Note
If you are willing to adapt an incoming message payload to fit a consuming method receiving type,
you can use `MessageConverter`s.
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package decorators;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

import javax.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Message;

import io.smallrye.mutiny.Multi;
import io.smallrye.reactive.messaging.PublisherDecorator;

// <code>
@ApplicationScoped
public class ConsumedMessageDecorator implements PublisherDecorator {

private final Map<String, AtomicLong> counters = new HashMap<>();

@Override
public Multi<? extends Message<?>> decorate(Multi<? extends Message<?>> publisher, String channelName,
boolean isConnector) {
if (isConnector) {
AtomicLong counter = new AtomicLong();
counters.put(channelName, counter);
return publisher.onItem().invoke(counter::incrementAndGet);
} else {
return publisher;
}
}

@Override
public int getPriority() {
return 10;
}

public long getMessageCount(String channel) {
return counters.get(channel).get();
}
}
// </code>
30 changes: 30 additions & 0 deletions documentation/src/main/java/interceptors/MyInterceptor.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package interceptors;

import javax.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Message;

import io.smallrye.common.annotation.Identifier;
import io.smallrye.reactive.messaging.OutgoingInterceptor;
import io.smallrye.reactive.messaging.OutgoingMessageMetadata;

@Identifier("channel-a")
@ApplicationScoped
public class MyInterceptor implements OutgoingInterceptor {

@Override
public Message<?> onMessage(Message<?> message) {
return message.withPayload("changed " + message.getPayload());
}

@Override
public void onMessageAck(Message<?> message) {
message.getMetadata(OutgoingMessageMetadata.class)
.ifPresent(m -> m.getResult());
}

@Override
public void onMessageNack(Message<?> message, Throwable failure) {

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import io.opentelemetry.context.Scope;
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.OutgoingMessageMetadata;
import io.smallrye.reactive.messaging.TracingMetadata;
import io.smallrye.reactive.messaging.ce.OutgoingCloudEventMetadata;
import io.smallrye.reactive.messaging.health.HealthReport;
Expand Down Expand Up @@ -199,7 +200,8 @@ record = getProducerRecord(message, outgoingMetadata, incomingMetadata, actualTo
@SuppressWarnings({ "unchecked", "rawtypes" })
Uni<RecordMetadata> sendUni = client.send((ProducerRecord) record);

Uni<Void> uni = sendUni.onItem().transformToUni(ignored -> {
Uni<Void> uni = sendUni.onItem().transformToUni(recordMetadata -> {
OutgoingMessageMetadata.setResultOnMessage(message, recordMetadata);
log.successfullyToTopic(message, record.topic());
return Uni.createFrom().completionStage(message.ack());
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@
import org.junit.jupiter.api.Test;

import io.smallrye.metrics.setup.MetricCdiInjectionExtension;
import io.smallrye.reactive.messaging.PublisherDecorator;
import io.smallrye.reactive.messaging.kafka.base.WeldTestBase;
import io.smallrye.reactive.messaging.providers.PublisherDecorator;
import io.smallrye.reactive.messaging.providers.metrics.MetricDecorator;
import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import io.netty.handler.codec.mqtt.MqttQoS;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.vertx.AsyncResultUni;
import io.smallrye.reactive.messaging.OutgoingMessageMetadata;
import io.smallrye.reactive.messaging.mqtt.session.MqttClientSession;
import io.smallrye.reactive.messaging.mqtt.session.MqttClientSessionOptions;
import io.vertx.core.json.Json;
Expand Down Expand Up @@ -91,6 +92,7 @@ private CompletionStage<?> send(AtomicReference<Clients.ClientHolder> reference,
if (f != null) {
return Uni.createFrom().completionStage(msg.nack(f).thenApply(x -> msg));
} else {
OutgoingMessageMetadata.setResultOnMessage(msg, s);
return Uni.createFrom().completionStage(msg.ack().thenApply(x -> msg));
}
})
Expand Down
15 changes: 15 additions & 0 deletions smallrye-reactive-messaging-provider/revapi.json
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,21 @@
"new": "parameter void io.smallrye.reactive.messaging.providers.wiring.Wiring::prepare(boolean, io.smallrye.reactive.messaging.ChannelRegistry, ===java.util.List<io.smallrye.reactive.messaging.EmitterConfiguration>===, java.util.List<io.smallrye.reactive.messaging.providers.extension.ChannelConfiguration>, java.util.List<io.smallrye.reactive.messaging.MediatorConfiguration>)",
"parameterIndex": "2",
"justification": "EmitterConfiguration moved to api module"
},
{
"ignore": true,
"code": "java.method.numberOfParametersChanged",
"old": "method io.smallrye.mutiny.Multi<? extends org.eclipse.microprofile.reactive.messaging.Message<?>> io.smallrye.reactive.messaging.providers.locals.ContextDecorator::decorate(io.smallrye.mutiny.Multi<? extends org.eclipse.microprofile.reactive.messaging.Message<?>>, java.lang.String)",
"new": "method io.smallrye.mutiny.Multi<? extends org.eclipse.microprofile.reactive.messaging.Message<?>> io.smallrye.reactive.messaging.providers.locals.ContextDecorator::decorate(io.smallrye.mutiny.Multi<? extends org.eclipse.microprofile.reactive.messaging.Message<?>>, java.lang.String, boolean)",
"justification": "PublisherDecorator adds 3rd parameter boolean isConnector"
},
{
"ignore": true,
"code": "java.class.noLongerImplementsInterface",
"old": "class io.smallrye.reactive.messaging.providers.locals.ContextDecorator",
"new": "class io.smallrye.reactive.messaging.providers.locals.ContextDecorator",
"interface": "io.smallrye.reactive.messaging.providers.PublisherDecorator",
"justification": "PublisherDecorator moved to api module"
}
]
}
Expand Down
Loading

0 comments on commit a9651d9

Please sign in to comment.