GH-321: Add Observation propagation #325

Merged: 11 commits, Oct 26, 2023
8 changes: 8 additions & 0 deletions build.gradle
@@ -175,6 +175,14 @@ configure(rootProject) {
archivesBaseName = 'reactor-kafka'
description = 'Reactor Kafka: A reactive API for Apache Kafka'

dependencies {
api libs.micrometer.observation

testImplementation (libs.micrometer.tracing.test) {
exclude group: 'org.mockito'
}
}

jar {
manifest {
attributes 'Automatic-Module-Name': 'reactor.kafka'
7 changes: 5 additions & 2 deletions gradle/libs.versions.toml
@@ -5,9 +5,10 @@ reactorCore = "3.4.33-SNAPSHOT"
asciidoctor = "3.3.2"
kafka-doc = "28"
log4j = "2.17.1"
micrometer = "1.10.4"
micrometer = "1.10.10"
powermock = "2.0.9"
reactiveStreams = "1.0.3"
micrometerTracing = '1.1.4'

[libraries]
kafka = "org.apache.kafka:kafka-clients:3.6.0"
@@ -28,7 +29,9 @@ powermock-core = { module = "org.powermock:powermock-core", version.ref = "powermock" }
powermock-junit = { module = "org.powermock:powermock-module-junit4", version.ref = "powermock" }
powermock-mockito = { module = "org.powermock:powermock-api-mockito2", version.ref = "powermock" }
slf4j = "org.slf4j:slf4j-api:1.7.36"
testcontainers = "org.testcontainers:kafka:1.16.3"
testcontainers = "org.testcontainers:kafka:1.19.0"
micrometer-observation = { module = "io.micrometer:micrometer-observation", version.ref = "micrometer" }
micrometer-tracing-test = { module = "io.micrometer:micrometer-tracing-integration-test", version.ref = "micrometerTracing" }

[plugins]
artifactory = { id = "com.jfrog.artifactory", version = "4.27.1" }
36 changes: 36 additions & 0 deletions src/docs/asciidoc/api-guide.adoc
@@ -608,3 +608,39 @@ methods can be invoked after the receive Flux corresponding to the last receive

To enable micrometer metrics for the underlying Kafka Consumers and Producers, add a `MicrometerConsumerListener` to the `ReceiverOptions` or a `MicrometerProducerListener` to the `SenderOptions` respectively.
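
The listener registration can be sketched as follows; `consumerProps` and `producerProps` are assumed to be pre-built configuration maps, and `SimpleMeterRegistry` stands in for whatever `MeterRegistry` the application already exposes:

[source,java]
--------
MeterRegistry meterRegistry = new SimpleMeterRegistry();

// Bind Kafka client metrics for every consumer created from these options
ReceiverOptions<Integer, String> receiverOptions =
        ReceiverOptions.<Integer, String>create(consumerProps)
                .consumerListener(new MicrometerConsumerListener(meterRegistry));

// Bind Kafka client metrics for every producer created from these options
SenderOptions<Integer, String> senderOptions =
        SenderOptions.<Integer, String>create(producerProps)
                .producerListener(new MicrometerProducerListener(meterRegistry));
--------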

=== Micrometer Observation

To enable Micrometer observation for produced and consumed records, add an `ObservationRegistry` to the `SenderOptions` and `ReceiverOptions` using the `withObservation()` API.
A custom `KafkaSenderObservationConvention` (and `KafkaReceiverObservationConvention`) can also be set.
See their default implementations in `KafkaSenderObservation` and `KafkaReceiverObservation`, respectively.
The `DefaultKafkaSenderObservationConvention` exposes two low-cardinality tags: `reactor.kafka.type = sender` and `reactor.kafka.client.id`, whose value is the `ProducerConfig.CLIENT_ID_CONFIG` option or, when that is not set, the identity hash code of the `DefaultKafkaSender` instance prefixed with `reactor-kafka-sender-`.
The `DefaultKafkaReceiverObservationConvention` exposes two low-cardinality tags: `reactor.kafka.type = receiver` and `reactor.kafka.client.id`, whose value is the `ConsumerConfig.CLIENT_ID_CONFIG` option or, when that is not set, the identity hash code of the `DefaultKafkaReceiver` instance prefixed with `reactor-kafka-receiver-`.
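
A minimal configuration sketch; the property maps `producerProps` and `consumerProps` are assumed, and in a real application the `ObservationRegistry` would typically come from the observability setup rather than be created inline:

[source,java]
--------
ObservationRegistry observationRegistry = ObservationRegistry.create();

// Enable observation for every record produced through these options
SenderOptions<Integer, String> senderOptions =
        SenderOptions.<Integer, String>create(producerProps)
                .withObservation(observationRegistry);

// Enable observation for every record consumed through these options
ReceiverOptions<Integer, String> receiverOptions =
        ReceiverOptions.<Integer, String>create(consumerProps)
                .withObservation(observationRegistry);
--------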

If a `PropagatingSenderTracingObservationHandler` is configured on the `ObservationRegistry`, the tracing information from the observation context around a producer record is stored in the record headers before the record is published to the Kafka topic.
If a `PropagatingReceiverTracingObservationHandler` is configured on the `ObservationRegistry`, the tracing information is restored from those Kafka record headers into the context on the receiver side, and a child span is started.
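
Wiring these handlers can be sketched as follows; the `tracer` and `propagator` instances are assumed to come from a Micrometer Tracing bridge (Brave or OpenTelemetry):

[source,java]
--------
ObservationRegistry observationRegistry = ObservationRegistry.create();
observationRegistry.observationConfig()
        // Injects tracing information into producer record headers
        .observationHandler(
                new PropagatingSenderTracingObservationHandler<>(tracer, propagator))
        // Restores tracing information from record headers as a child span
        .observationHandler(
                new PropagatingReceiverTracingObservationHandler<>(tracer, propagator));
--------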

Because of the reverse-order nature of the Reactor context, the observation functionality on the `KafkaReceiver` is limited to a single `trace` logging message per received record.
The restored tracing information is correlated into logs when the logging system is configured for it.
If an observation needs to be continued on the consumer side, the `KafkaReceiverObservation.RECEIVER_OBSERVATION` API must be used manually in the record processing operator:

[source,java]
--------
KafkaReceiver.create(receiverOptions.subscription(List.of(topic)))
        .receive()
        .flatMap(record -> {
            // Start a receiver observation manually; the KafkaRecordReceiverContext
            // restores the tracing information from the record headers
            Observation receiverObservation =
                KafkaReceiverObservation.RECEIVER_OBSERVATION.start(null,
                    KafkaReceiverObservation.DefaultKafkaReceiverObservationConvention.INSTANCE,
                    () -> new KafkaRecordReceiverContext(
                            record, "user.receiver", receiverOptions.bootstrapServers()),
                    observationRegistry);

            return Mono.just(record)
                    .flatMap(TARGET_RECORD_HANDLER)
                    // Record a failure on the observation before it is stopped
                    .doOnError(receiverObservation::error)
                    .doOnTerminate(receiverObservation::stop)
                    // Expose the observation to downstream operators via the context
                    .contextWrite(context ->
                            context.put(ObservationThreadLocalAccessor.KEY, receiverObservation));
        })
        .subscribe();
--------