Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions spring-kafka-docs/src/main/asciidoc/kafka.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -2532,6 +2532,14 @@ Also see `idleBeforeDataMultiplier`.
|`true`
|Whether or not to maintain Micrometer timers for the consumer threads.

|[[micrometerTags]]<<micrometerTags,`micrometerTags`>>
|empty
|A map of static tags to be added to micrometer metrics.

|[[micrometerTagsProvider]]<<micrometerTagsProvider,`micrometerTagsProvider`>>
|`null`
|A function that provides dynamic tags, based on the consumer record.

|[[missingTopicsFatal]]<<missingTopicsFatal,`missingTopicsFatal`>>
|`false`
|When true, prevents the container from starting if the configured topic(s) are not present on the broker.
Expand Down Expand Up @@ -3350,6 +3358,8 @@ The timers are named `spring.kafka.listener` and have the following tags:

You can add additional tags using the `ContainerProperties` `micrometerTags` property.

Starting with versions 2.9.8 and 3.0.6, you can provide a function in the `ContainerProperties` `micrometerTagsProvider` property; the function receives the `ConsumerRecord<?, ?>` and returns tags that can be based on that record; these are merged with any static tags in `micrometerTags`.

NOTE: With the concurrent container, timers are created for each thread and the `name` tag is suffixed with `-n` where n is `0` to `concurrency-1`.

===== Monitoring KafkaTemplate Performance
Expand All @@ -3367,6 +3377,8 @@ The timers are named `spring.kafka.template` and have the following tags:

You can add additional tags using the template's `micrometerTags` property.

Starting with versions 2.9.8 and 3.0.6, you can set a tags provider via `KafkaTemplate.setMicrometerTagsProvider(Function<ProducerRecord<?, ?>, Map<String, String>>)`; the function receives the `ProducerRecord<?, ?>` and returns tags that can be based on that record; these are merged with any static tags in `micrometerTags`.

[[micrometer-native]]
===== Micrometer Native Metrics

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.function.Function;

import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.admin.AdminClientConfig;
Expand Down Expand Up @@ -148,6 +149,9 @@ public class KafkaTemplate<K, V> implements KafkaOperations<K, V>, ApplicationCo

private ObservationRegistry observationRegistry = ObservationRegistry.NOOP;

@Nullable
private Function<ProducerRecord<?, ?>, Map<String, String>> micrometerTagsProvider;

@Nullable
private KafkaAdmin kafkaAdmin;

Expand Down Expand Up @@ -362,6 +366,33 @@ public void setMicrometerTags(Map<String, String> tags) {
}
}

/**
 * Set a function to provide dynamic tags based on the producer record. These tags
 * will be added to any static tags provided in {@link #setMicrometerTags(Map)
 * micrometerTags}. Does not apply if observation is enabled; in that case the
 * observation infrastructure is used instead of the Micrometer timer.
 * Pass {@code null} to remove a previously configured provider.
 * @param micrometerTagsProvider the micrometerTagsProvider.
 * @since 2.9.8
 * @see #setMicrometerEnabled(boolean)
 * @see #setMicrometerTags(Map)
 * @see #setObservationEnabled(boolean)
 */
public void setMicrometerTagsProvider(
		@Nullable Function<ProducerRecord<?, ?>, Map<String, String>> micrometerTagsProvider) {

	this.micrometerTagsProvider = micrometerTagsProvider;
}

/**
 * Return the Micrometer tags provider, if configured.
 * @return the micrometerTagsProvider, or {@code null} if none has been set.
 * @since 2.9.8
 * @see #setMicrometerTagsProvider(Function)
 */
@Nullable
public Function<ProducerRecord<?, ?>, Map<String, String>> getMicrometerTagsProvider() {
	return this.micrometerTagsProvider;
}

/**
* Return the producer factory used by this template.
* @return the factory.
Expand Down Expand Up @@ -784,9 +815,7 @@ private Callback buildCallback(final ProducerRecord<K, V> producerRecord, final
}
try {
if (exception == null) {
if (sample != null) {
this.micrometerHolder.success(sample);
}
successTimer(sample, producerRecord);
observation.stop();
future.complete(new SendResult<>(producerRecord, metadata));
if (KafkaTemplate.this.producerListener != null) {
Expand All @@ -796,9 +825,7 @@ private Callback buildCallback(final ProducerRecord<K, V> producerRecord, final
+ ", metadata: " + metadata);
}
else {
if (sample != null) {
this.micrometerHolder.failure(sample, exception.getClass().getSimpleName());
}
failureTimer(sample, exception, producerRecord);
observation.error(exception);
observation.stop();
future.completeExceptionally(
Expand All @@ -818,6 +845,28 @@ private Callback buildCallback(final ProducerRecord<K, V> producerRecord, final
};
}

/**
 * Record a successful send on the Micrometer timer, including dynamic tags from
 * the {@code micrometerTagsProvider} when one is configured.
 * @param sample the timer sample started before the send; ignored when {@code null}
 * (i.e. when Micrometer timing is not active for this send).
 * @param record the producer record, used to compute dynamic tags.
 */
private void successTimer(@Nullable Object sample, ProducerRecord<?, ?> record) {
	if (sample == null) {
		return;
	}
	if (this.micrometerTagsProvider != null) {
		// dynamic tags requested - let the holder resolve tags from the record
		this.micrometerHolder.success(sample, record);
	}
	else {
		this.micrometerHolder.success(sample);
	}
}

/**
 * Record a failed send on the Micrometer timer, tagging the failure with the
 * exception's simple class name and, when a {@code micrometerTagsProvider} is
 * configured, dynamic tags derived from the record.
 * @param sample the timer sample started before the send; ignored when {@code null}.
 * @param exception the send failure cause; its simple class name becomes a tag value.
 * @param record the producer record, used to compute dynamic tags.
 */
private void failureTimer(@Nullable Object sample, Exception exception, ProducerRecord<?, ?> record) {
	if (sample == null) {
		return;
	}
	String exceptionName = exception.getClass().getSimpleName();
	if (this.micrometerTagsProvider != null) {
		this.micrometerHolder.failure(sample, exceptionName, record);
	}
	else {
		this.micrometerHolder.failure(sample, exceptionName);
	}
}


/**
* Return true if the template is currently running in a transaction on the calling
Expand Down Expand Up @@ -875,9 +924,18 @@ private MicrometerHolder obtainMicrometerHolder() {
MicrometerHolder holder = null;
try {
if (KafkaUtils.MICROMETER_PRESENT) {
Function<Object, Map<String, String>> mergedProvider = cr -> this.micrometerTags;
if (this.micrometerTagsProvider != null) {
mergedProvider = cr -> {
Map<String, String> tags = new HashMap<>(this.micrometerTags);
if (cr != null) {
tags.putAll(this.micrometerTagsProvider.apply((ProducerRecord<?, ?>) cr));
}
return tags;
};
}
holder = new MicrometerHolder(this.applicationContext, this.beanName,
"spring.kafka.template", "KafkaTemplate Timer",
this.micrometerTags);
"spring.kafka.template", "KafkaTemplate Timer", mergedProvider);
}
}
catch (@SuppressWarnings("unused") IllegalStateException ex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.regex.Pattern;

import org.aopalliance.aop.Advice;
import org.apache.kafka.clients.consumer.ConsumerRecord;

import org.springframework.aop.framework.Advised;
import org.springframework.aop.framework.ProxyFactory;
Expand Down Expand Up @@ -183,6 +185,9 @@ public enum EOSMode {

private final List<Advice> adviceChain = new ArrayList<>();

@Nullable
private Function<ConsumerRecord<?, ?>, Map<String, String>> micrometerTagsProvider;

/**
* The ack mode to use when auto ack (in the configuration properties) is false.
* <ul>
Expand Down Expand Up @@ -660,7 +665,8 @@ public boolean isObservationEnabled() {
}

/**
* Set to true to enable observation via Micrometer.
* Set to true to enable observation via Micrometer. When false (default)
* basic Micrometer timers are used instead (when enabled).
* @param observationEnabled true to enable.
* @since 3.0
* @see #setMicrometerEnabled(boolean)
Expand All @@ -680,10 +686,42 @@ public void setMicrometerTags(Map<String, String> tags) {
}
}

/**
 * Return the static Micrometer tags.
 * The returned map is an unmodifiable view of the configured tags; mutate them
 * via {@link #setMicrometerTags(Map)} instead.
 * @return the tags (never {@code null}; possibly empty).
 * @since 2.3
 */
public Map<String, String> getMicrometerTags() {
	return Collections.unmodifiableMap(this.micrometerTags);
}

/**
 * Set a function to provide dynamic tags based on the consumer record. These tags
 * will be added to any static tags provided in {@link #setMicrometerTags(Map)
 * micrometerTags}. Only applies to record listeners, ignored for batch listeners.
 * Does not apply if observation is enabled.
 * Pass {@code null} to remove a previously configured provider.
 * @param micrometerTagsProvider the micrometerTagsProvider.
 * @since 2.9.8
 * @see #setMicrometerEnabled(boolean)
 * @see #setMicrometerTags(Map)
 * @see #setObservationEnabled(boolean)
 */
public void setMicrometerTagsProvider(
		@Nullable Function<ConsumerRecord<?, ?>, Map<String, String>> micrometerTagsProvider) {

	this.micrometerTagsProvider = micrometerTagsProvider;
}

/**
 * Return the Micrometer tags provider, if configured.
 * @return the micrometerTagsProvider, or {@code null} if none has been set.
 * @since 2.9.8
 * @see #setMicrometerTagsProvider(Function)
 */
@Nullable
public Function<ConsumerRecord<?, ?>, Map<String, String>> getMicrometerTagsProvider() {
	return this.micrometerTagsProvider;
}

/**
 * Return the consumer start timeout.
 * NOTE(review): the default value and the exact semantics (how long the container
 * waits for the consumer thread to start) are set elsewhere in this class — confirm
 * against the corresponding setter's javadoc.
 * @return the consumer start timeout.
 */
public Duration getConsumerStartTimeout() {
	return this.consumerStartTimeout;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -800,6 +801,10 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume

private final Object bootstrapServers;

@Nullable
private final Function<ConsumerRecord<?, ?>, Map<String, String>> micrometerTagsProvider =
this.containerProperties.getMicrometerTagsProvider();

private String clusterId;

private Map<TopicPartition, OffsetMetadata> definedPartitions;
Expand Down Expand Up @@ -1326,9 +1331,19 @@ private MicrometerHolder obtainMicrometerHolder() {
if (KafkaUtils.MICROMETER_PRESENT && this.containerProperties.isMicrometerEnabled()
&& !this.containerProperties.isObservationEnabled()) {

Function<Object, Map<String, String>> mergedProvider =
cr -> this.containerProperties.getMicrometerTags();
if (this.micrometerTagsProvider != null) {
mergedProvider = cr -> {
Map<String, String> tags = new HashMap<>(this.containerProperties.getMicrometerTags());
if (cr != null) {
tags.putAll(this.micrometerTagsProvider.apply((ConsumerRecord<?, ?>) cr));
}
return tags;
};
}
holder = new MicrometerHolder(getApplicationContext(), getBeanName(),
"spring.kafka.listener", "Kafka Listener Timer",
this.containerProperties.getMicrometerTags());
"spring.kafka.listener", "Kafka Listener Timer", mergedProvider);
}
}
catch (@SuppressWarnings(UNUSED) IllegalStateException ex) {
Expand Down Expand Up @@ -2309,7 +2324,7 @@ private RuntimeException doInvokeBatchListener(final ConsumerRecords<K, V> recor
try {
invokeBatchOnMessage(records, recordList);
batchInterceptAfter(records, null);
successTimer(sample);
successTimer(sample, null);
if (this.batchFailed) {
this.batchFailed = false;
if (this.commonErrorHandler != null) {
Expand All @@ -2322,7 +2337,7 @@ private RuntimeException doInvokeBatchListener(final ConsumerRecords<K, V> recor
}
}
catch (RuntimeException e) {
failureTimer(sample);
failureTimer(sample, null);
batchInterceptAfter(records, e);
if (this.commonErrorHandler == null) {
throw e;
Expand Down Expand Up @@ -2400,15 +2415,25 @@ private Object startMicrometerSample() {
return null;
}

private void successTimer(@Nullable Object sample) {
/**
 * Record a successful listener invocation on the Micrometer timer.
 * Dynamic tags are applied only when a {@code micrometerTagsProvider} is
 * configured AND a record is available (batch listeners pass {@code null}).
 * @param sample the timer sample; ignored when {@code null}.
 * @param record the consumed record, or {@code null} for batch listeners.
 */
private void successTimer(@Nullable Object sample, @Nullable ConsumerRecord<?, ?> record) {
	if (sample == null) {
		return;
	}
	boolean dynamicTags = this.micrometerTagsProvider != null && record != null;
	if (dynamicTags) {
		this.micrometerHolder.success(sample, record);
	}
	else {
		this.micrometerHolder.success(sample);
	}
}

private void failureTimer(@Nullable Object sample) {
/**
 * Record a failed listener invocation on the Micrometer timer, tagged with the
 * fixed exception name {@code ListenerExecutionFailedException}.
 * Dynamic tags are applied only when a {@code micrometerTagsProvider} is
 * configured AND a record is available (batch listeners pass {@code null}).
 * @param sample the timer sample; ignored when {@code null}.
 * @param record the consumed record, or {@code null} for batch listeners.
 */
private void failureTimer(@Nullable Object sample, @Nullable ConsumerRecord<?, ?> record) {
	if (sample == null) {
		return;
	}
	boolean dynamicTags = this.micrometerTagsProvider != null && record != null;
	if (dynamicTags) {
		this.micrometerHolder.failure(sample, "ListenerExecutionFailedException", record);
	}
	else {
		this.micrometerHolder.failure(sample, "ListenerExecutionFailedException");
	}
}

Expand Down Expand Up @@ -2795,11 +2820,11 @@ private RuntimeException doInvokeRecordListener(final ConsumerRecord<K, V> cReco
return observation.observe(() -> {
try {
invokeOnMessage(cRecord);
successTimer(sample);
successTimer(sample, cRecord);
recordInterceptAfter(cRecord, null);
}
catch (RuntimeException e) {
failureTimer(sample);
failureTimer(sample, cRecord);
recordInterceptAfter(cRecord, e);
if (this.commonErrorHandler == null) {
throw e;
Expand Down
Loading