Skip to content

Commit

Permalink
Bump Smallrye RM from 4.12.0 to 4.13.0
Browse files Browse the repository at this point in the history
  • Loading branch information
ozangunalp committed Dec 22, 2023
1 parent c7efbbe commit c4c8b6e
Show file tree
Hide file tree
Showing 14 changed files with 463 additions and 15 deletions.
2 changes: 1 addition & 1 deletion bom/application/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@
<smallrye-reactive-streams-operators.version>1.0.13</smallrye-reactive-streams-operators.version>
<smallrye-reactive-types-converter.version>3.0.1</smallrye-reactive-types-converter.version>
<smallrye-mutiny-vertx-binding.version>3.7.2</smallrye-mutiny-vertx-binding.version>
<smallrye-reactive-messaging.version>4.12.0</smallrye-reactive-messaging.version>
<smallrye-reactive-messaging.version>4.13.0</smallrye-reactive-messaging.version>
<smallrye-stork.version>2.4.0</smallrye-stork.version>
<jakarta.activation.version>2.1.2</jakarta.activation.version>
<jakarta.annotation-api.version>2.1.1</jakarta.annotation-api.version>
Expand Down
124 changes: 124 additions & 0 deletions docs/src/main/asciidoc/kafka.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -497,6 +497,9 @@ The partitions of the subscribed topic will be divided among the consumer thread
Note that if the `partitions` value exceed the number of partitions of the topic, some consumer threads won't be assigned any partitions.

Check warning on line 497 in docs/src/main/asciidoc/kafka.adoc

View workflow job for this annotation

GitHub Actions / Linting with Vale

[vale] reported by reviewdog 🐶 [Quarkus.TermsSuggestions] Depending on the context, consider using 'because' or 'while' rather than 'as'. Raw Output: {"message": "[Quarkus.TermsSuggestions] Depending on the context, consider using 'because' or 'while' rather than 'as'.", "location": {"path": "docs/src/main/asciidoc/kafka.adoc", "range": {"start": {"line": 497, "column": 114}}}, "severity": "INFO"}
+
image::kafka-one-app-two-consumers.png[alt=Architecture, width=60%, align=center]
+
An alternative way to achieve concurrent consumers is using the

Check warning on line 501 in docs/src/main/asciidoc/kafka.adoc

View workflow job for this annotation

GitHub Actions / Linting with Vale

[vale] reported by reviewdog 🐶 [Quarkus.TermsSuggestions] Depending on the context, consider using 'by using' or 'that uses' rather than 'using'. Raw Output: {"message": "[Quarkus.TermsSuggestions] Depending on the context, consider using 'by using' or 'that uses' rather than 'using'.", "location": {"path": "docs/src/main/asciidoc/kafka.adoc", "range": {"start": {"line": 501, "column": 54}}}, "severity": "INFO"}
https://smallrye.io/smallrye-reactive-messaging/latest/concepts/incoming-concurrency/[concurrency attribute].

. *Multiple consumer applications inside a consumer group*
+
Expand Down Expand Up @@ -658,6 +661,36 @@ mp.messaging.incoming.your-channel.group.id=${quarkus.uuid}

IMPORTANT: If the `group.id` attribute is not set, it defaults the `quarkus.application.name` configuration property.

==== Manual topic-partition assignment

The `assign-seek` channel attribute allows manually assigning topic-partitions to a Kafka incoming channel,
and optionally seek to a specified offset in the partition to start consuming records.
If `assign-seek` is used, the consumer will not be dynamically subscribed to topics,
but instead will statically assign the described partitions.
In manual topic-partition rebalancing doesn't happen and therefore rebalance listeners are never called.

The attribute takes a list of triplets separated by commas: `<topic>:<partition>:<offset>`.

For example, the configuration

[source, properties]
----
mp.messaging.incoming.data.assign-seek=topic1:0:10, topic2:1:20
----

assigns the consumer to:

- Partition 0 of topic 'topic1', setting the initial position at offset 10.
- Partition 1 of topic 'topic2', setting the initial position at offset 20.

The topic, partition, and offset in each triplet can have the following variations:

- If the topic is omitted, the configured topic will be used.
- If the offset is omitted, partitions are assigned to the consumer but won't be seeked to offset.

Check warning on line 689 in docs/src/main/asciidoc/kafka.adoc

View workflow job for this annotation

GitHub Actions / Linting with Vale

[vale] reported by reviewdog 🐶 [Quarkus.Spelling] Use correct American English spelling. Did you really mean 'seeked'? Raw Output: {"message": "[Quarkus.Spelling] Use correct American English spelling. Did you really mean 'seeked'?", "location": {"path": "docs/src/main/asciidoc/kafka.adoc", "range": {"start": {"line": 689, "column": 71}}}, "severity": "WARNING"}
- If offset is 0, it seeks to the beginning of the topic-partition.
- If offset is -1, it seeks to the end of the topic-partition.


=== Receiving Kafka Records in Batches

By default, incoming methods receive each Kafka record individually.
Expand Down Expand Up @@ -1376,6 +1409,55 @@ If you'd like to consume records only written and committed inside a Kafka trans
mp.messaging.incoming.prices-in.isolation.level=read_committed
----

== Kafka Request-Reply

The Kafka Request-Reply pattern allows to publish a request record to a Kafka topic and then await for a reply record that responds to the initial request.
The Kafka connector provides the `KafkaRequestReply` custom emitter that implements the requestor (or the client) of the request-reply pattern for Kafka outbound channels:

Check warning on line 1415 in docs/src/main/asciidoc/kafka.adoc

View workflow job for this annotation

GitHub Actions / Linting with Vale

[vale] reported by reviewdog 🐶 [Quarkus.Spelling] Use correct American English spelling. Did you really mean 'requestor'? Raw Output: {"message": "[Quarkus.Spelling] Use correct American English spelling. Did you really mean 'requestor'?", "location": {"path": "docs/src/main/asciidoc/kafka.adoc", "range": {"start": {"line": 1415, "column": 78}}}, "severity": "WARNING"}

It can be injected as a regular emitter `@Channel`:

Check warning on line 1417 in docs/src/main/asciidoc/kafka.adoc

View workflow job for this annotation

GitHub Actions / Linting with Vale

[vale] reported by reviewdog 🐶 [Quarkus.TermsSuggestions] Depending on the context, consider using 'because' or 'while' rather than 'as'. Raw Output: {"message": "[Quarkus.TermsSuggestions] Depending on the context, consider using 'because' or 'while' rather than 'as'.", "location": {"path": "docs/src/main/asciidoc/kafka.adoc", "range": {"start": {"line": 1417, "column": 20}}}, "severity": "INFO"}

[source, java]
----
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;
import org.eclipse.microprofile.reactive.messaging.Channel;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.kafka.reply.KafkaRequestReply;
@ApplicationScoped
@Path("/kafka")
public class KafkaRequestReplyEmitter {
@Channel("request-reply")
KafkaRequestReply<Integer, String> requestReply;
@POST
@Path("/req-rep")
@Produces(MediaType.TEXT_PLAIN)
public Uni<String> post(Integer request) {
return requestReply.request(request);
}
}
----

The request method publishes the record to the configured target topic of the outgoing channel,
and polls a reply topic (by default, the target topic with `-replies` suffix) for a reply record.
When the reply is received the returned `Uni` is completed with the record value.
The request send operation generates a **correlation id** and sets a header (by default `REPLY_CORRELATION_ID`),
which it expects to be sent back in the reply record.

The replier can be implemented using a Reactive Messaging processor (see <<processing-messages>>).

Check warning on line 1455 in docs/src/main/asciidoc/kafka.adoc

View workflow job for this annotation

GitHub Actions / Linting with Vale

[vale] reported by reviewdog 🐶 [Quarkus.TermsSuggestions] Depending on the context, consider using 'by using' or 'that uses' rather than 'using'. Raw Output: {"message": "[Quarkus.TermsSuggestions] Depending on the context, consider using 'by using' or 'that uses' rather than 'using'.", "location": {"path": "docs/src/main/asciidoc/kafka.adoc", "range": {"start": {"line": 1455, "column": 31}}}, "severity": "INFO"}

Check warning on line 1455 in docs/src/main/asciidoc/kafka.adoc

View workflow job for this annotation

GitHub Actions / Linting with Vale

[vale] reported by reviewdog 🐶 [Quarkus.TermsWarnings] Consider using 'information about' rather than 'information on' unless updating existing content that uses the term. Raw Output: {"message": "[Quarkus.TermsWarnings] Consider using 'information about' rather than 'information on' unless updating existing content that uses the term.", "location": {"path": "docs/src/main/asciidoc/kafka.adoc", "range": {"start": {"line": 1455, "column": 99}}}, "severity": "WARNING"}

For more information on Kafka Request Reply feature and advanced configuration options,
see the https://smallrye.io/smallrye-reactive-messaging/latest/kafka/request-reply/[Smallrye Reactive Messaging Documentation].

[[processing-messages]]
== Processing Messages

Applications streaming data often need to consume some events from a topic, process them and publish the result to a different topic.

Check warning on line 1463 in docs/src/main/asciidoc/kafka.adoc

View workflow job for this annotation

GitHub Actions / Linting with Vale

[vale] reported by reviewdog 🐶 [Quarkus.Fluff] Depending on the context, consider using 'Rewrite the sentence, or use 'must', instead of' rather than 'need to'. Raw Output: {"message": "[Quarkus.Fluff] Depending on the context, consider using 'Rewrite the sentence, or use 'must', instead of' rather than 'need to'.", "location": {"path": "docs/src/main/asciidoc/kafka.adoc", "range": {"start": {"line": 1463, "column": 24}}}, "severity": "INFO"}
Expand Down Expand Up @@ -1938,6 +2020,34 @@ and for an outgoing channel checks that the topic used by the producer exist in
Note that to achieve this, an _admin connection_ is required.
You can adjust the timeout for topic verification calls to the broker using the `health-topic-verification-timeout` configuration.

Check warning on line 2021 in docs/src/main/asciidoc/kafka.adoc

View workflow job for this annotation

GitHub Actions / Linting with Vale

[vale] reported by reviewdog 🐶 [Quarkus.TermsSuggestions] Depending on the context, consider using 'by using' or 'that uses' rather than 'using'. Raw Output: {"message": "[Quarkus.TermsSuggestions] Depending on the context, consider using 'by using' or 'that uses' rather than 'using'.", "location": {"path": "docs/src/main/asciidoc/kafka.adoc", "range": {"start": {"line": 2021, "column": 48}}}, "severity": "INFO"}

== Observability

If the xref:opentelemetry.adoc[OpenTelemetry extension] is present,
then the Kafka connector channels work out-of-the-box with the OpenTelemetry Tracing.
Messages written to Kafka topics propagate the current tracing span.
On incoming channels, if a consumed Kafka record contains tracing information the message processing inherits the message span as parent.

Tracing can be disabled explicitly per channel:

[source, properties]
----
mp.messaging.incoming.data.tracing-enabled=false
----

If the xref:telemetry-micrometer.adoc[Micrometer extension] is present,
then Kafka producer and consumer clients metrics are exposed as Micrometer meters.

Per channel metrics are also exposed as Micrometer meters.

Check warning on line 2040 in docs/src/main/asciidoc/kafka.adoc

View workflow job for this annotation

GitHub Actions / Linting with Vale

[vale] reported by reviewdog 🐶 [Quarkus.TermsSuggestions] Depending on the context, consider using 'because' or 'while' rather than 'as'. Raw Output: {"message": "[Quarkus.TermsSuggestions] Depending on the context, consider using 'because' or 'while' rather than 'as'.", "location": {"path": "docs/src/main/asciidoc/kafka.adoc", "range": {"start": {"line": 2040, "column": 16}}}, "severity": "INFO"}
The number of messages produced or received per channel, acknowledgments and duration of processing are exposed.

The messaging meters can be disabled:

[source, properties]
----
quarkus.micrometer.binder.messaging.enabled=false
----


== Kafka Streams

This is described in a dedicated guide: xref:kafka-streams.adoc[Using Apache Kafka Streams].
Expand Down Expand Up @@ -2198,6 +2308,20 @@ With in-memory channels we were able to test application code processing message
Note that different in-memory channels are independent, and switching channel connector to in-memory does not simulate message delivery between channels configured to the same Kafka topic.
====

==== Context propagation with InMemoryConnector

By default, in-memory channels dispatch messages on the caller thread, which would be the main thread in unit tests.

The `quarkus-test-vertx` dependency provides the `@io.quarkus.test.vertx.RunOnVertxContext` annotation,
which when used on a test method, executes the test on a Vert.x context.

However, most of the other connectors handle context propagation dispatching messages on separate duplicated Vert.x contexts.

If your tests are dependent on context propagation,
you can configure the in-memory connector channels with the `run-on-vertx-context` attribute to dispatch events,
including messages and acknowledgements, on a Vert.x context.
Alternatively you can switch this behaviour using the `InMemorySource#runOnVertxContext` method.

=== Testing using a Kafka broker

If you are using <<kafka-dev-services>>, a Kafka broker will be started and available throughout the tests, unless it is disabled in `%test` profile.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package io.quarkus.micrometer.deployment.binder;

import java.util.function.BooleanSupplier;

import io.quarkus.arc.deployment.AdditionalBeanBuildItem;
import io.quarkus.deployment.annotations.BuildStep;
import io.quarkus.micrometer.runtime.MicrometerRecorder;
import io.quarkus.micrometer.runtime.config.MicrometerConfig;

public class ReactiveMessagingProcessor {

static final String MESSAGE_OBSERVATION_COLLECTOR = "io.smallrye.reactive.messaging.observation.MessageObservationCollector";
static final String METRICS_BEAN_CLASS = "io.quarkus.micrometer.runtime.binder.reactivemessaging.MicrometerObservationCollector";
static final Class<?> MESSAGE_OBSERVATION_COLLECTOR_CLASS = MicrometerRecorder
.getClassForName(MESSAGE_OBSERVATION_COLLECTOR);

static class ReactiveMessagingSupportEnabled implements BooleanSupplier {
MicrometerConfig mConfig;

public boolean getAsBoolean() {
return MESSAGE_OBSERVATION_COLLECTOR_CLASS != null &&
mConfig.checkBinderEnabledWithDefault(mConfig.binder.messaging);
}
}

@BuildStep(onlyIf = ReactiveMessagingSupportEnabled.class)
AdditionalBeanBuildItem createCDIEventConsumer() {
return AdditionalBeanBuildItem.builder()
.addBeanClass(METRICS_BEAN_CLASS)
.setUnremovable()
.build();
}
}
6 changes: 6 additions & 0 deletions extensions/micrometer/runtime/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,12 @@
<optional>true</optional>
</dependency>

<dependency>
<groupId>io.smallrye.reactive</groupId>
<artifactId>smallrye-reactive-messaging-api</artifactId>
<optional>true</optional>
</dependency>

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package io.quarkus.micrometer.runtime.binder.reactivemessaging;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;

import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.eclipse.microprofile.reactive.messaging.Message;

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Tags;
import io.micrometer.core.instrument.Timer;
import io.smallrye.reactive.messaging.observation.DefaultMessageObservation;
import io.smallrye.reactive.messaging.observation.MessageObservation;
import io.smallrye.reactive.messaging.observation.MessageObservationCollector;
import io.smallrye.reactive.messaging.observation.ObservationContext;

@ApplicationScoped
public class MicrometerObservationCollector
implements MessageObservationCollector<MicrometerObservationCollector.MicrometerContext> {

@Inject
@ConfigProperty(name = "quarkus.messaging.observation.micrometer.enabled", defaultValue = "true")
boolean enabled;

@Override
public MicrometerContext initObservation(String channel, boolean incoming, boolean emitter) {
if (enabled) {
return new MicrometerContext(channel);
}
return null;
}

@Override
public MessageObservation onNewMessage(String channel, Message<?> message, MicrometerContext ctx) {
ctx.msgCount.increment();
return new DefaultMessageObservation(channel);
}

public static class MicrometerContext implements ObservationContext {
final Counter msgCount;
final Timer duration;
final Counter acks;
final Counter nacks;

public MicrometerContext(String channel) {
Tags tags = Tags.of(Tag.of("channel", channel));
this.msgCount = Counter.builder("quarkus.messaging.message.count")
.description("The number of messages observed")
.tags(tags)
.register(Metrics.globalRegistry);
this.duration = Timer.builder("quarkus.messaging.message.duration")
.description("The duration of the message processing")
.tags(tags)
.register(Metrics.globalRegistry);
this.acks = Counter.builder("quarkus.messaging.message.acks")
.description("The number of messages processed successfully")
.tags(tags)
.register(Metrics.globalRegistry);
this.nacks = Counter.builder("quarkus.messaging.message.failures")
.description("The number of messages processed with failures")
.tags(tags)
.register(Metrics.globalRegistry);
}

@Override
public void complete(MessageObservation observation) {
if (observation.getReason() == null) {
acks.increment();
} else {
nacks.increment();
}
duration.record(observation.getCompletionDuration());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ public static class BinderConfig {

public GrpcClientConfigGroup grpcClient;

public ReactiveMessagingConfigGroup messaging;

public MPMetricsConfigGroup mpMetrics;

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package io.quarkus.micrometer.runtime.config;

import java.util.Optional;

import io.quarkus.runtime.annotations.ConfigGroup;
import io.quarkus.runtime.annotations.ConfigItem;

/**
* Build / static runtime config for Reactive Messaging Binders
*/
@ConfigGroup
public class ReactiveMessagingConfigGroup implements MicrometerConfig.CapabilityEnabled {
/**
* Kafka metrics support.
* <p>
* Support for Reactive Messaging metrics will be enabled if Micrometer support is enabled,
* MessageObservationCollector interface is on the classpath
* and either this value is true, or this value is unset and
* {@code quarkus.micrometer.binder-enabled-default} is true.
*/
@ConfigItem
public Optional<Boolean> enabled;

@Override
public Optional<Boolean> getEnabled() {
return enabled;
}

@Override
public String toString() {
return this.getClass().getSimpleName()
+ "{enabled=" + enabled
+ '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ final class DotNames {

static final DotName EMITTER = DotName.createSimple(org.eclipse.microprofile.reactive.messaging.Emitter.class.getName());
static final DotName MUTINY_EMITTER = DotName.createSimple(io.smallrye.reactive.messaging.MutinyEmitter.class.getName());
static final DotName KAFKA_EMITTER = DotName.createSimple(io.smallrye.reactive.messaging.kafka.transactions.KafkaTransactions.class.getName());
static final DotName KAFKA_TRANSACTIONS_EMITTER = DotName.createSimple(io.smallrye.reactive.messaging.kafka.transactions.KafkaTransactions.class.getName());
static final DotName KAFKA_REQUEST_REPLY_EMITTER = DotName.createSimple(io.smallrye.reactive.messaging.kafka.reply.KafkaRequestReply.class.getName());

static final DotName TARGETED = DotName.createSimple(io.smallrye.reactive.messaging.Targeted.class.getName());
static final DotName TARGETED_MESSAGES = DotName.createSimple(io.smallrye.reactive.messaging.TargetedMessages.class.getName());
Expand Down

0 comments on commit c4c8b6e

Please sign in to comment.