Bump Smallrye Reactive Messaging version from 4.12.0 to 4.13.0 #37899

Merged 2 commits on Jan 2, 2024

2 changes: 1 addition & 1 deletion bom/application/pom.xml
@@ -62,7 +62,7 @@
        <smallrye-reactive-streams-operators.version>1.0.13</smallrye-reactive-streams-operators.version>
        <smallrye-reactive-types-converter.version>3.0.1</smallrye-reactive-types-converter.version>
        <smallrye-mutiny-vertx-binding.version>3.7.2</smallrye-mutiny-vertx-binding.version>
        <smallrye-reactive-messaging.version>4.12.0</smallrye-reactive-messaging.version>
        <smallrye-reactive-messaging.version>4.13.0</smallrye-reactive-messaging.version>
        <smallrye-stork.version>2.4.0</smallrye-stork.version>
        <jakarta.activation.version>2.1.2</jakarta.activation.version>
        <jakarta.annotation-api.version>2.1.1</jakarta.annotation-api.version>
135 changes: 132 additions & 3 deletions docs/src/main/asciidoc/kafka.adoc
@@ -492,11 +492,19 @@

. *Multiple consumer threads inside a consumer group*
+
For a given application instance, the number of consumers inside the consumer group can be configured using `mp.messaging.incoming.$channel.partitions` property.
For a given application instance, the number of consumers inside the consumer group can be configured using the `mp.messaging.incoming.$channel.concurrency` property.
The partitions of the subscribed topic will be divided among the consumer threads.
Note that if the `partitions` value exceed the number of partitions of the topic, some consumer threads won't be assigned any partitions.
Note that if the `concurrency` value exceeds the number of partitions of the topic, some consumer threads won't be assigned any partitions; see the configuration sketch after the note below.

+
image::kafka-one-app-two-consumers.png[alt=Architecture, width=60%, align=center]
+
[NOTE]
.Deprecation
====
The https://smallrye.io/smallrye-reactive-messaging/latest/concepts/incoming-concurrency/[concurrency attribute]
provides a connector-agnostic way to configure non-blocking concurrent channels and replaces the Kafka connector-specific `partitions` attribute.
The `partitions` attribute is therefore deprecated and will be removed in future releases.
====
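+
A configuration sketch for the `concurrency` attribute, assuming a channel named `data` consumed by three threads (the channel name and thread count are illustrative):
+
[source, properties]
----
mp.messaging.incoming.data.concurrency=3
----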

. *Multiple consumer applications inside a consumer group*
+
@@ -658,6 +666,36 @@

IMPORTANT: If the `group.id` attribute is not set, it defaults to the `quarkus.application.name` configuration property.
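
For example, to set the consumer group explicitly (the channel and group names are illustrative):

[source, properties]
----
mp.messaging.incoming.data.group.id=my-consumer-group
----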

==== Manual topic-partition assignment

The `assign-seek` channel attribute allows manually assigning topic-partitions to a Kafka incoming channel,
and optionally seeking to a specified offset in each partition to start consuming records.
If `assign-seek` is used, the consumer is not dynamically subscribed to topics,
but instead is statically assigned the described partitions.
With manual topic-partition assignment, rebalancing doesn't happen and rebalance listeners are therefore never called.

The attribute takes a list of triplets separated by commas: `<topic>:<partition>:<offset>`.

For example, the configuration

[source, properties]
----
mp.messaging.incoming.data.assign-seek=topic1:0:10, topic2:1:20
----

assigns the consumer to:

- Partition 0 of topic 'topic1', setting the initial position at offset 10.
- Partition 1 of topic 'topic2', setting the initial position at offset 20.

The topic, partition, and offset in each triplet can have the following variations:

- If the topic is omitted, the configured topic will be used.
- If the offset is omitted, partitions are assigned to the consumer but the consumer won't seek to an offset.
- If the offset is 0, the consumer seeks to the beginning of the topic-partition.
- If the offset is -1, the consumer seeks to the end of the topic-partition, as shown in the sketch below.
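
For instance, assuming a channel named `data` bound to a topic `logs` (both names are illustrative), the following re-reads partition 0 from the beginning while consuming only new records from partition 1:

[source, properties]
----
mp.messaging.incoming.data.assign-seek=logs:0:0, logs:1:-1
----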


=== Receiving Kafka Records in Batches

By default, incoming methods receive each Kafka record individually.
@@ -1112,7 +1150,7 @@
====

[NOTE]
.Depreciation
.Deprecation
====
The `MutinyEmitter#send(Message msg)` method is deprecated in favor of the following methods receiving `Message` for emitting:

@@ -1376,9 +1414,58 @@
mp.messaging.incoming.prices-in.isolation.level=read_committed
----

== Kafka Request-Reply

The Kafka Request-Reply pattern allows publishing a request record to a Kafka topic and then awaiting a reply record that responds to the initial request.
The Kafka connector provides the `KafkaRequestReply` custom emitter that implements the requester (or client) side of the request-reply pattern for Kafka outbound channels:

It can be injected like a regular `@Channel` emitter:

[source, java]
----
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;

import org.eclipse.microprofile.reactive.messaging.Channel;

import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.kafka.reply.KafkaRequestReply;

@ApplicationScoped
@Path("/kafka")
public class KafkaRequestReplyEmitter {

    @Channel("request-reply")
    KafkaRequestReply<Integer, String> requestReply;

    @POST
    @Path("/req-rep")
    @Produces(MediaType.TEXT_PLAIN)
    public Uni<String> post(Integer request) {
        return requestReply.request(request);
    }

}
----

The request method publishes the record to the configured target topic of the outgoing channel,
and polls a reply topic (by default, the target topic with the `-replies` suffix) for a reply record.
When the reply is received, the returned `Uni` is completed with the record value.
The request send operation generates a **correlation id** and sets a header (by default `REPLY_CORRELATION_ID`),
which it expects to be sent back in the reply record.

The replier can be implemented using a Reactive Messaging processor (see <<processing-messages>>).
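
A sketch of such a replier, assuming an incoming channel bound to the requester's target topic and an outgoing channel bound to the reply topic; the channel names and payload logic are illustrative, and correlation id handling is described in the SmallRye documentation linked below:

[source, java]
----
import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

@ApplicationScoped
public class KafkaRequestReplyReplier {

    // Consume the request payload and return the reply payload;
    // the reply is written to the topic configured on the outgoing channel.
    @Incoming("request")
    @Outgoing("reply")
    public String reply(Integer request) {
        return "Processed " + request;
    }

}
----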

For more information about the Kafka Request-Reply feature and advanced configuration options,
see the https://smallrye.io/smallrye-reactive-messaging/latest/kafka/request-reply/[SmallRye Reactive Messaging documentation].

[[processing-messages]]
== Processing Messages

Applications streaming data often need to consume some events from a topic, process them and publish the result to a different topic.

A processor method can be implemented simply by using both the `@Incoming` and `@Outgoing` annotations:

[source, java]
@@ -1942,8 +2029,36 @@
and for an outgoing channel, checks that the topic used by the producer exists in the broker.

Note that to achieve this, an _admin connection_ is required.
You can adjust the timeout for topic verification calls to the broker by using the `health-topic-verification-timeout` configuration property.
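
For example (the channel name is illustrative and the timeout value is in milliseconds):

[source, properties]
----
mp.messaging.incoming.data.health-topic-verification-timeout=5000
----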

== Observability

If the xref:opentelemetry.adoc[OpenTelemetry extension] is present,
then the Kafka connector channels work out of the box with OpenTelemetry tracing.
Messages written to Kafka topics propagate the current tracing span.
On incoming channels, if a consumed Kafka record contains tracing information, the message processing inherits the message span as its parent.

Tracing can be disabled explicitly per channel:

[source, properties]
----
mp.messaging.incoming.data.tracing-enabled=false
----

If the xref:telemetry-micrometer.adoc[Micrometer extension] is present,
then Kafka producer and consumer client metrics are exposed as Micrometer meters.

Per-channel metrics are also exposed as Micrometer meters.

The number of messages produced or received per channel, acknowledgments, and the duration of processing are exposed.
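
For reference, the per-channel meters registered by `MicrometerObservationCollector` (added later in this diff), each tagged with the channel name, are:

[source, text]
----
quarkus.messaging.message.count      # number of messages observed
quarkus.messaging.message.duration   # duration of the message processing
quarkus.messaging.message.acks       # messages processed successfully
quarkus.messaging.message.failures   # messages processed with failures
----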

The messaging meters can be disabled:

[source, properties]
----
quarkus.micrometer.binder.messaging.enabled=false
----


== Kafka Streams

This is described in a dedicated guide: xref:kafka-streams.adoc[Using Apache Kafka Streams].
@@ -2204,6 +2319,20 @@
Note that different in-memory channels are independent, and switching the channel connector to in-memory does not simulate message delivery between channels configured to the same Kafka topic.
====

==== Context propagation with InMemoryConnector

By default, in-memory channels dispatch messages on the caller thread, which would be the main thread in unit tests.

The `quarkus-test-vertx` dependency provides the `@io.quarkus.test.vertx.RunOnVertxContext` annotation,
which, when used on a test method, executes the test on a Vert.x context.

However, most other connectors handle context propagation by dispatching messages on separate duplicated Vert.x contexts.

If your tests rely on context propagation,
you can configure the in-memory connector channels with the `run-on-vertx-context` attribute to dispatch events,
including messages and acknowledgments, on a Vert.x context.
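
A configuration sketch, assuming an incoming channel named `orders-in` that is switched to the in-memory connector for the test profile (the channel name is illustrative):

[source, properties]
----
%test.mp.messaging.incoming.orders-in.connector=smallrye-in-memory
%test.mp.messaging.incoming.orders-in.run-on-vertx-context=true
----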

Alternatively, you can switch this behavior by using the `InMemorySource#runOnVertxContext` method.
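
A sketch of the programmatic variant; the channel name, import locations, and the boolean toggle are assumptions based on the SmallRye 4.x in-memory connector:

[source, java]
----
import jakarta.enterprise.inject.Any;
import jakarta.inject.Inject;

import io.smallrye.reactive.messaging.memory.InMemoryConnector;
import io.smallrye.reactive.messaging.memory.InMemorySource;

public class OrderProcessingTest { // e.g. annotated with @QuarkusTest

    @Inject
    @Any
    InMemoryConnector connector;

    void sendOnVertxContext() {
        InMemorySource<Integer> ordersIn = connector.source("orders-in");
        // dispatch subsequent events (messages and acknowledgments) on a Vert.x context
        ordersIn.runOnVertxContext(true);
        ordersIn.send(1);
    }
}
----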

=== Testing using a Kafka broker

If you are using <<kafka-dev-services>>, a Kafka broker will be started and available throughout the tests, unless it is disabled in the `%test` profile.
@@ -0,0 +1,33 @@
package io.quarkus.micrometer.deployment.binder;

import java.util.function.BooleanSupplier;

import io.quarkus.arc.deployment.AdditionalBeanBuildItem;
import io.quarkus.deployment.annotations.BuildStep;
import io.quarkus.micrometer.runtime.MicrometerRecorder;
import io.quarkus.micrometer.runtime.config.MicrometerConfig;

public class ReactiveMessagingProcessor {

    static final String MESSAGE_OBSERVATION_COLLECTOR = "io.smallrye.reactive.messaging.observation.MessageObservationCollector";
    static final String METRICS_BEAN_CLASS = "io.quarkus.micrometer.runtime.binder.reactivemessaging.MicrometerObservationCollector";
    static final Class<?> MESSAGE_OBSERVATION_COLLECTOR_CLASS = MicrometerRecorder
            .getClassForName(MESSAGE_OBSERVATION_COLLECTOR);

    static class ReactiveMessagingSupportEnabled implements BooleanSupplier {
        MicrometerConfig mConfig;

        public boolean getAsBoolean() {
            return MESSAGE_OBSERVATION_COLLECTOR_CLASS != null &&
                    mConfig.checkBinderEnabledWithDefault(mConfig.binder.messaging);
        }
    }

    @BuildStep(onlyIf = ReactiveMessagingSupportEnabled.class)
    AdditionalBeanBuildItem createCDIEventConsumer() {
        return AdditionalBeanBuildItem.builder()
                .addBeanClass(METRICS_BEAN_CLASS)
                .setUnremovable()
                .build();
    }
}
6 changes: 6 additions & 0 deletions extensions/micrometer/runtime/pom.xml
@@ -90,6 +90,12 @@
            <optional>true</optional>
        </dependency>

        <dependency>
            <groupId>io.smallrye.reactive</groupId>
            <artifactId>smallrye-reactive-messaging-api</artifactId>
            <optional>true</optional>
        </dependency>

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
@@ -0,0 +1,77 @@
package io.quarkus.micrometer.runtime.binder.reactivemessaging;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;

import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.eclipse.microprofile.reactive.messaging.Message;

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Tags;
import io.micrometer.core.instrument.Timer;
import io.smallrye.reactive.messaging.observation.DefaultMessageObservation;
import io.smallrye.reactive.messaging.observation.MessageObservation;
import io.smallrye.reactive.messaging.observation.MessageObservationCollector;
import io.smallrye.reactive.messaging.observation.ObservationContext;

@ApplicationScoped
public class MicrometerObservationCollector
        implements MessageObservationCollector<MicrometerObservationCollector.MicrometerContext> {

    @Inject
    @ConfigProperty(name = "quarkus.messaging.observation.micrometer.enabled", defaultValue = "true")
    boolean enabled;

    @Override
    public MicrometerContext initObservation(String channel, boolean incoming, boolean emitter) {
        if (enabled) {
            return new MicrometerContext(channel);
        }
        return null;
    }

    @Override
    public MessageObservation onNewMessage(String channel, Message<?> message, MicrometerContext ctx) {
        ctx.msgCount.increment();
        return new DefaultMessageObservation(channel);
    }

    public static class MicrometerContext implements ObservationContext {
        final Counter msgCount;
        final Timer duration;
        final Counter acks;
        final Counter nacks;

        public MicrometerContext(String channel) {
            Tags tags = Tags.of(Tag.of("channel", channel));
            this.msgCount = Counter.builder("quarkus.messaging.message.count")
                    .description("The number of messages observed")
                    .tags(tags)
                    .register(Metrics.globalRegistry);
            this.duration = Timer.builder("quarkus.messaging.message.duration")
                    .description("The duration of the message processing")
                    .tags(tags)
                    .register(Metrics.globalRegistry);
            this.acks = Counter.builder("quarkus.messaging.message.acks")
                    .description("The number of messages processed successfully")
                    .tags(tags)
                    .register(Metrics.globalRegistry);
            this.nacks = Counter.builder("quarkus.messaging.message.failures")
                    .description("The number of messages processed with failures")
                    .tags(tags)
                    .register(Metrics.globalRegistry);
        }

        @Override
        public void complete(MessageObservation observation) {
            if (observation.getReason() == null) {
                acks.increment();
            } else {
                nacks.increment();
            }
            duration.record(observation.getCompletionDuration());
        }
    }
}
@@ -105,6 +105,8 @@ public static class BinderConfig {

        public GrpcClientConfigGroup grpcClient;

        public ReactiveMessagingConfigGroup messaging;

        public MPMetricsConfigGroup mpMetrics;

/**
@@ -0,0 +1,35 @@
package io.quarkus.micrometer.runtime.config;

import java.util.Optional;

import io.quarkus.runtime.annotations.ConfigGroup;
import io.quarkus.runtime.annotations.ConfigItem;

/**
* Build / static runtime config for Reactive Messaging Binders
*/
@ConfigGroup
public class ReactiveMessagingConfigGroup implements MicrometerConfig.CapabilityEnabled {
    /**
     * Reactive Messaging metrics support.
     * <p>
     * Support for Reactive Messaging metrics will be enabled if Micrometer support is enabled,
     * the MessageObservationCollector interface is on the classpath
     * and either this value is true, or this value is unset and
     * {@code quarkus.micrometer.binder-enabled-default} is true.
     */
    @ConfigItem
    public Optional<Boolean> enabled;

    @Override
    public Optional<Boolean> getEnabled() {
        return enabled;
    }

    @Override
    public String toString() {
        return this.getClass().getSimpleName()
                + "{enabled=" + enabled
                + '}';
    }
}
@@ -12,7 +12,8 @@ final class DotNames {

    static final DotName EMITTER = DotName.createSimple(org.eclipse.microprofile.reactive.messaging.Emitter.class.getName());
    static final DotName MUTINY_EMITTER = DotName.createSimple(io.smallrye.reactive.messaging.MutinyEmitter.class.getName());
    static final DotName KAFKA_EMITTER = DotName.createSimple(io.smallrye.reactive.messaging.kafka.transactions.KafkaTransactions.class.getName());
    static final DotName KAFKA_TRANSACTIONS_EMITTER = DotName.createSimple(io.smallrye.reactive.messaging.kafka.transactions.KafkaTransactions.class.getName());
    static final DotName KAFKA_REQUEST_REPLY_EMITTER = DotName.createSimple(io.smallrye.reactive.messaging.kafka.reply.KafkaRequestReply.class.getName());

    static final DotName TARGETED = DotName.createSimple(io.smallrye.reactive.messaging.Targeted.class.getName());
    static final DotName TARGETED_MESSAGES = DotName.createSimple(io.smallrye.reactive.messaging.TargetedMessages.class.getName());