diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 5dd9187bfe2c..bd53af1c5897 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -14,7 +14,7 @@
+ files="(ApiMessageType|FieldSpec|MessageDataGenerator|KafkaConsumerTest).java"/>
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ ... @@ public void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets) {
@Override
public void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets, final Duration timeout) {
acquireAndEnsureOpen();
+ long commitStart = time.nanoseconds();
try {
maybeThrowInvalidGroupIdException();
offsets.forEach(this::updateLastSeenEpochIfNewer);
@@ -1493,6 +1494,7 @@ public void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets, final Duration timeout) {
"committing offsets " + offsets);
}
} finally {
+ kafkaConsumerMetrics.recordCommitSync(time.nanoseconds() - commitStart);
release();
}
}
@@ -1871,9 +1873,11 @@ public Map<TopicPartition, OffsetAndMetadata> committed(final Set<TopicPartition> partitions) {
@Override
public Map<TopicPartition, OffsetAndMetadata> committed(final Set<TopicPartition> partitions, final Duration timeout) {
acquireAndEnsureOpen();
+ long start = time.nanoseconds();
try {
maybeThrowInvalidGroupIdException();
- Map<TopicPartition, OffsetAndMetadata> offsets = coordinator.fetchCommittedOffsets(partitions, time.timer(timeout));
+ final Map<TopicPartition, OffsetAndMetadata> offsets;
+ offsets = coordinator.fetchCommittedOffsets(partitions, time.timer(timeout));
if (offsets == null) {
throw new TimeoutException("Timeout of " + timeout.toMillis() + "ms expired before the last " +
"committed offset for partitions " + partitions + " could be determined. Try tuning default.api.timeout.ms " +
@@ -1883,6 +1887,7 @@ public Map<TopicPartition, OffsetAndMetadata> committed(final Set<TopicPartition> partitions, final Duration timeout) {
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ ... @@ public class KafkaProducer<K, V> implements Producer<K, V> {
private final String clientId;
// Visible for testing
final Metrics metrics;
+ private final KafkaProducerMetrics producerMetrics;
private final Partitioner partitioner;
private final int maxRequestSize;
private final long totalMemorySize;
@@ -356,6 +358,7 @@ public KafkaProducer(Properties properties, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
MetricsContext metricsContext = new KafkaMetricsContext(JMX_PREFIX,
config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX));
this.metrics = new Metrics(metricConfig, reporters, time, metricsContext);
+ this.producerMetrics = new KafkaProducerMetrics(metrics);
this.partitioner = config.getConfiguredInstance(
ProducerConfig.PARTITIONER_CLASS_CONFIG,
Partitioner.class,
@@ -590,9 +593,11 @@ else if (acks != -1)
public void initTransactions() {
throwIfNoTransactionManager();
throwIfProducerClosed();
+ long now = time.nanoseconds();
TransactionalRequestResult result = transactionManager.initializeTransactions();
sender.wakeup();
result.await(maxBlockTimeMs, TimeUnit.MILLISECONDS);
+ producerMetrics.recordInit(time.nanoseconds() - now);
}
/**
@@ -613,7 +618,9 @@ public void initTransactions() {
public void beginTransaction() throws ProducerFencedException {
throwIfNoTransactionManager();
throwIfProducerClosed();
+ long now = time.nanoseconds();
transactionManager.beginTransaction();
+ producerMetrics.recordBeginTxn(time.nanoseconds() - now);
}
/**
@@ -697,9 +704,11 @@ public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
throwIfInvalidGroupMetadata(groupMetadata);
throwIfNoTransactionManager();
throwIfProducerClosed();
+ long start = time.nanoseconds();
TransactionalRequestResult result = transactionManager.sendOffsetsToTransaction(offsets, groupMetadata);
sender.wakeup();
result.await(maxBlockTimeMs, TimeUnit.MILLISECONDS);
+ producerMetrics.recordSendOffsets(time.nanoseconds() - start);
}
/**
@@ -730,9 +739,11 @@ public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
public void commitTransaction() throws ProducerFencedException {
throwIfNoTransactionManager();
throwIfProducerClosed();
+ long commitStart = time.nanoseconds();
TransactionalRequestResult result = transactionManager.beginCommit();
sender.wakeup();
result.await(maxBlockTimeMs, TimeUnit.MILLISECONDS);
+ producerMetrics.recordCommitTxn(time.nanoseconds() - commitStart);
}
/**
@@ -761,9 +772,11 @@ public void abortTransaction() throws ProducerFencedException {
throwIfNoTransactionManager();
throwIfProducerClosed();
log.info("Aborting incomplete transaction");
+ long abortStart = time.nanoseconds();
TransactionalRequestResult result = transactionManager.beginAbort();
sender.wakeup();
result.await(maxBlockTimeMs, TimeUnit.MILLISECONDS);
+ producerMetrics.recordAbortTxn(time.nanoseconds() - abortStart);
}
/**
@@ -1124,12 +1137,16 @@ private void ensureValidRecordSize(int size) {
@Override
public void flush() {
log.trace("Flushing accumulated records in producer.");
+
+ long start = time.nanoseconds();
this.accumulator.beginFlush();
this.sender.wakeup();
try {
this.accumulator.awaitFlushCompletion();
} catch (InterruptedException e) {
throw new InterruptException("Flush interrupted.", e);
+ } finally {
+ producerMetrics.recordFlush(time.nanoseconds() - start);
}
}
@@ -1245,6 +1262,7 @@ private void close(Duration timeout, boolean swallowException) {
}
Utils.closeQuietly(interceptors, "producer interceptors", firstException);
+ Utils.closeQuietly(producerMetrics, "producer metrics wrapper", firstException);
Utils.closeQuietly(metrics, "producer metrics", firstException);
Utils.closeQuietly(keySerializer, "producer keySerializer", firstException);
Utils.closeQuietly(valueSerializer, "producer valueSerializer", firstException);
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/KafkaProducerMetrics.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/KafkaProducerMetrics.java
new file mode 100644
index 000000000000..b8ea762e27f5
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/KafkaProducerMetrics.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.producer.internals;
+
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.CumulativeSum;
+
+import java.util.Map;
+
+public class KafkaProducerMetrics implements AutoCloseable {
+
+ public static final String GROUP = "producer-metrics";
+ private static final String FLUSH = "flush";
+ private static final String TXN_INIT = "txn-init";
+ private static final String TXN_BEGIN = "txn-begin";
+ private static final String TXN_SEND_OFFSETS = "txn-send-offsets";
+ private static final String TXN_COMMIT = "txn-commit";
+ private static final String TXN_ABORT = "txn-abort";
+ private static final String TOTAL_TIME_SUFFIX = "-time-ns-total";
+
+ private final Map<String, String> tags;
+ private final Metrics metrics;
+ private final Sensor initTimeSensor;
+ private final Sensor beginTxnTimeSensor;
+ private final Sensor flushTimeSensor;
+ private final Sensor sendOffsetsSensor;
+ private final Sensor commitTxnSensor;
+ private final Sensor abortTxnSensor;
+
+ public KafkaProducerMetrics(Metrics metrics) {
+ this.metrics = metrics;
+ tags = this.metrics.config().tags();
+ flushTimeSensor = newLatencySensor(
+ FLUSH,
+ "Total time producer has spent in flush in nanoseconds."
+ );
+ initTimeSensor = newLatencySensor(
+ TXN_INIT,
+ "Total time producer has spent in initTransactions in nanoseconds."
+ );
+ beginTxnTimeSensor = newLatencySensor(
+ TXN_BEGIN,
+ "Total time producer has spent in beginTransaction in nanoseconds."
+ );
+ sendOffsetsSensor = newLatencySensor(
+ TXN_SEND_OFFSETS,
+ "Total time producer has spent in sendOffsetsToTransaction."
+ );
+ commitTxnSensor = newLatencySensor(
+ TXN_COMMIT,
+ "Total time producer has spent in commitTransaction."
+ );
+ abortTxnSensor = newLatencySensor(
+ TXN_ABORT,
+ "Total time producer has spent in abortTransaction."
+ );
+ }
+
+ @Override
+ public void close() {
+ removeMetric(FLUSH);
+ removeMetric(TXN_INIT);
+ removeMetric(TXN_BEGIN);
+ removeMetric(TXN_SEND_OFFSETS);
+ removeMetric(TXN_COMMIT);
+ removeMetric(TXN_ABORT);
+ }
+
+ public void recordFlush(long duration) {
+ flushTimeSensor.record(duration);
+ }
+
+ public void recordInit(long duration) {
+ initTimeSensor.record(duration);
+ }
+
+ public void recordBeginTxn(long duration) {
+ beginTxnTimeSensor.record(duration);
+ }
+
+ public void recordSendOffsets(long duration) {
+ sendOffsetsSensor.record(duration);
+ }
+
+ public void recordCommitTxn(long duration) {
+ commitTxnSensor.record(duration);
+ }
+
+ public void recordAbortTxn(long duration) {
+ abortTxnSensor.record(duration);
+ }
+
+ private Sensor newLatencySensor(String name, String description) {
+ Sensor sensor = metrics.sensor(name + TOTAL_TIME_SUFFIX);
+ sensor.add(metricName(name, description), new CumulativeSum());
+ return sensor;
+ }
+
+ private MetricName metricName(final String name, final String description) {
+ return metrics.metricName(name + TOTAL_TIME_SUFFIX, GROUP, description, tags);
+ }
+
+ private void removeMetric(final String name) {
+ metrics.removeSensor(name + TOTAL_TIME_SUFFIX);
+ }
+}
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/SenderMetricsRegistry.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/SenderMetricsRegistry.java
index 643897375a72..2ad2cba09e8d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/SenderMetricsRegistry.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/SenderMetricsRegistry.java
@@ -30,7 +30,6 @@
public class SenderMetricsRegistry {
- final static String METRIC_GROUP_NAME = "producer-metrics";
final static String TOPIC_METRIC_GROUP_NAME = "producer-topic-metrics";
private final List<MetricNameTemplate> allTemplates;
@@ -154,7 +153,7 @@ public SenderMetricsRegistry(Metrics metrics) {
}
private MetricName createMetricName(String name, String description) {
- return this.metrics.metricInstance(createTemplate(name, METRIC_GROUP_NAME, description, this.tags));
+ return this.metrics.metricInstance(createTemplate(name, KafkaProducerMetrics.GROUP, description, this.tags));
}
private MetricNameTemplate createTopicTemplate(String name, String description) {
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 57cd94231244..bc7d506275e9 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -34,6 +34,7 @@
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
@@ -1925,6 +1926,96 @@ public void testCommittedAuthenticationFailure() {
assertThrows(AuthenticationException.class, () -> consumer.committed(Collections.singleton(tp0)).get(tp0));
}
+ @Test
+ public void testMeasureCommitSyncDurationOnFailure() {
+ final KafkaConsumer<String, String> consumer
+ = consumerWithPendingError(new MockTime(Duration.ofSeconds(1).toMillis()));
+
+ try {
+ consumer.commitSync(Collections.singletonMap(tp0, new OffsetAndMetadata(10L)));
+ } catch (final RuntimeException e) {
+ }
+
+ final Metric metric = consumer.metrics()
+ .get(consumer.metrics.metricName("commit-sync-time-ns-total", "consumer-metrics"));
+ assertTrue((Double) metric.metricValue() >= Duration.ofMillis(999).toNanos());
+ }
+
+ @Test
+ public void testMeasureCommitSyncDuration() {
+ Time time = new MockTime(Duration.ofSeconds(1).toMillis());
+ SubscriptionState subscription = new SubscriptionState(new LogContext(),
+ OffsetResetStrategy.EARLIEST);
+ ConsumerMetadata metadata = createMetadata(subscription);
+ MockClient client = new MockClient(time, metadata);
+ initMetadata(client, Collections.singletonMap(topic, 2));
+ Node node = metadata.fetch().nodes().get(0);
+ ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
+ KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata,
+ assignor, true, groupInstanceId);
+ consumer.assign(singletonList(tp0));
+
+ client.prepareResponseFrom(
+ FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node), node);
+ Node coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());
+ client.prepareResponseFrom(
+ offsetCommitResponse(Collections.singletonMap(tp0, Errors.NONE)),
+ coordinator
+ );
+
+ consumer.commitSync(Collections.singletonMap(tp0, new OffsetAndMetadata(10L)));
+
+ final Metric metric = consumer.metrics()
+ .get(consumer.metrics.metricName("commit-sync-time-ns-total", "consumer-metrics"));
+ assertTrue((Double) metric.metricValue() >= Duration.ofMillis(999).toNanos());
+ }
+
+ @Test
+ public void testMeasureCommittedDurationOnFailure() {
+ final KafkaConsumer<String, String> consumer
+ = consumerWithPendingError(new MockTime(Duration.ofSeconds(1).toMillis()));
+
+ try {
+ consumer.committed(Collections.singleton(tp0));
+ } catch (final RuntimeException e) {
+ }
+
+ final Metric metric = consumer.metrics()
+ .get(consumer.metrics.metricName("committed-time-ns-total", "consumer-metrics"));
+ assertTrue((Double) metric.metricValue() >= Duration.ofMillis(999).toNanos());
+ }
+
+ @Test
+ public void testMeasureCommittedDuration() {
+ long offset1 = 10000;
+ Time time = new MockTime(Duration.ofSeconds(1).toMillis());
+ SubscriptionState subscription = new SubscriptionState(new LogContext(),
+ OffsetResetStrategy.EARLIEST);
+ ConsumerMetadata metadata = createMetadata(subscription);
+ MockClient client = new MockClient(time, metadata);
+ initMetadata(client, Collections.singletonMap(topic, 2));
+ Node node = metadata.fetch().nodes().get(0);
+ ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
+ KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata,
+ assignor, true, groupInstanceId);
+ consumer.assign(singletonList(tp0));
+
+ // lookup coordinator
+ client.prepareResponseFrom(
+ FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node), node);
+ Node coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());
+
+ // fetch offset for one topic
+ client.prepareResponseFrom(
+ offsetResponse(Collections.singletonMap(tp0, offset1), Errors.NONE), coordinator);
+
+ consumer.committed(Collections.singleton(tp0)).get(tp0).offset();
+
+ final Metric metric = consumer.metrics()
+ .get(consumer.metrics.metricName("committed-time-ns-total", "consumer-metrics"));
+ assertTrue((Double) metric.metricValue() >= Duration.ofMillis(999).toNanos());
+ }
+
@Test
public void testRebalanceException() {
Time time = new MockTime();
@@ -2247,8 +2338,7 @@ public void testListOffsetShouldUpateSubscriptions() {
consumer.close(Duration.ZERO);
}
- private KafkaConsumer<String, String> consumerWithPendingAuthenticationError() {
- Time time = new MockTime();
+ private KafkaConsumer<String, String> consumerWithPendingAuthenticationError(final Time time) {
SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
ConsumerMetadata metadata = createMetadata(subscription);
MockClient client = new MockClient(time, metadata);
@@ -2262,6 +2352,14 @@ private KafkaConsumer<String, String> consumerWithPendingAuthenticationError() {
return newConsumer(time, client, subscription, metadata, assignor, false, groupInstanceId);
}
+ private KafkaConsumer<String, String> consumerWithPendingAuthenticationError() {
+ return consumerWithPendingAuthenticationError(new MockTime());
+ }
+
+ private KafkaConsumer<String, String> consumerWithPendingError(final Time time) {
+ return consumerWithPendingAuthenticationError(time);
+ }
+
private ConsumerRebalanceListener getConsumerRebalanceListener(final KafkaConsumer<String, String> consumer) {
return new ConsumerRebalanceListener() {
@Override
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/KafkaConsumerMetricsTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/KafkaConsumerMetricsTest.java
new file mode 100644
index 000000000000..087f90b7efa3
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/KafkaConsumerMetricsTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.common.metrics.Metrics;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+class KafkaConsumerMetricsTest {
+ private static final long METRIC_VALUE = 123L;
+ private static final String CONSUMER_GROUP_PREFIX = "consumer";
+ private static final String CONSUMER_METRIC_GROUP = "consumer-metrics";
+ private static final String COMMIT_SYNC_TIME_TOTAL = "commit-sync-time-ns-total";
+ private static final String COMMITTED_TIME_TOTAL = "committed-time-ns-total";
+
+ private final Metrics metrics = new Metrics();
+ private final KafkaConsumerMetrics consumerMetrics
+ = new KafkaConsumerMetrics(metrics, CONSUMER_GROUP_PREFIX);
+
+ @Test
+ public void shouldRecordCommitSyncTime() {
+ // When:
+ consumerMetrics.recordCommitSync(METRIC_VALUE);
+
+ // Then:
+ assertMetricValue(COMMIT_SYNC_TIME_TOTAL);
+ }
+
+ @Test
+ public void shouldRecordCommittedTime() {
+ // When:
+ consumerMetrics.recordCommitted(METRIC_VALUE);
+
+ // Then:
+ assertMetricValue(COMMITTED_TIME_TOTAL);
+ }
+
+ @Test
+ public void shouldRemoveMetricsOnClose() {
+ // When:
+ consumerMetrics.close();
+
+ // Then:
+ assertMetricRemoved(COMMIT_SYNC_TIME_TOTAL);
+ assertMetricRemoved(COMMITTED_TIME_TOTAL);
+ }
+
+ private void assertMetricRemoved(final String name) {
+ assertNull(metrics.metric(metrics.metricName(name, CONSUMER_METRIC_GROUP)));
+ }
+
+ private void assertMetricValue(final String name) {
+ assertEquals(
+ metrics.metric(metrics.metricName(name, CONSUMER_METRIC_GROUP)).metricValue(),
+ (double) METRIC_VALUE
+ );
+ }
+}
\ No newline at end of file
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
index 2784f19c0254..c48c3feed609 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
@@ -27,6 +27,7 @@
import org.apache.kafka.clients.producer.internals.Sender;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
@@ -42,6 +43,7 @@
import org.apache.kafka.common.message.EndTxnResponseData;
import org.apache.kafka.common.message.InitProducerIdResponseData;
import org.apache.kafka.common.message.TxnOffsetCommitRequestData;
+import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.network.Selectable;
@@ -813,6 +815,41 @@ public void testFlushCompleteSendOfInflightBatches() {
}
}
+ private static Double getMetricValue(final KafkaProducer<?, ?> producer, final String name) {
+ Metrics metrics = producer.metrics;
+ Metric metric = metrics.metric(metrics.metricName(name, "producer-metrics"));
+ return (Double) metric.metricValue();
+ }
+
+ @Test
+ public void testFlushMeasureLatency() {
+ Map<String, Object> configs = new HashMap<>();
+ configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
+
+ Time time = new MockTime(1);
+ MetadataResponse initialUpdateResponse = RequestTestUtils.metadataUpdateWith(1, singletonMap("topic", 1));
+ ProducerMetadata metadata = newMetadata(0, Long.MAX_VALUE);
+
+ MockClient client = new MockClient(time, metadata);
+ client.updateMetadata(initialUpdateResponse);
+
+ try (KafkaProducer<String, String> producer = kafkaProducer(
+ configs,
+ new StringSerializer(),
+ new StringSerializer(),
+ metadata,
+ client,
+ null,
+ time
+ )) {
+ producer.flush();
+ double first = getMetricValue(producer, "flush-time-ns-total");
+ assertTrue(first > 0);
+ producer.flush();
+ assertTrue(getMetricValue(producer, "flush-time-ns-total") > first);
+ }
+ }
+
@Test
public void testMetricConfigRecordingLevel() {
Properties props = new Properties();
@@ -951,6 +988,36 @@ public void testAbortTransaction() {
}
}
+ @Test
+ public void testMeasureAbortTransactionDuration() {
+ Map<String, Object> configs = new HashMap<>();
+ configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "some.id");
+ configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
+ Time time = new MockTime(1);
+ MetadataResponse initialUpdateResponse = RequestTestUtils.metadataUpdateWith(1, singletonMap("topic", 1));
+ ProducerMetadata metadata = newMetadata(0, Long.MAX_VALUE);
+ MockClient client = new MockClient(time, metadata);
+ client.updateMetadata(initialUpdateResponse);
+ client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "some.id", host1));
+ client.prepareResponse(initProducerIdResponse(1L, (short) 5, Errors.NONE));
+
+ try (KafkaProducer<String, String> producer = kafkaProducer(configs, new StringSerializer(),
+ new StringSerializer(), metadata, client, null, time)) {
+ producer.initTransactions();
+
+ client.prepareResponse(endTxnResponse(Errors.NONE));
+ producer.beginTransaction();
+ producer.abortTransaction();
+ double first = getMetricValue(producer, "txn-abort-time-ns-total");
+ assertTrue(first > 0);
+
+ client.prepareResponse(endTxnResponse(Errors.NONE));
+ producer.beginTransaction();
+ producer.abortTransaction();
+ assertTrue(getMetricValue(producer, "txn-abort-time-ns-total") > first);
+ }
+ }
+
@Test
public void testSendTxnOffsetsWithGroupId() {
Map configs = new HashMap<>();
@@ -988,6 +1055,62 @@ public void testSendTxnOffsetsWithGroupId() {
}
}
+ private void assertDurationAtLeast(KafkaProducer<?, ?> producer, String name, double floor) {
+ getAndAssertDurationAtLeast(producer, name, floor);
+ }
+
+ private double getAndAssertDurationAtLeast(KafkaProducer<?, ?> producer, String name, double floor) {
+ double value = getMetricValue(producer, name);
+ assertTrue(value > floor);
+ return value;
+ }
+
+ @Test
+ public void testMeasureTransactionDurations() {
+ Map<String, Object> configs = new HashMap<>();
+ configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "some.id");
+ configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 10000);
+ configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
+ Duration tick = Duration.ofSeconds(1);
+ Time time = new MockTime(tick.toMillis());
+ MetadataResponse initialUpdateResponse = RequestTestUtils.metadataUpdateWith(1, singletonMap("topic", 1));
+ ProducerMetadata metadata = newMetadata(0, Long.MAX_VALUE);
+
+ MockClient client = new MockClient(time, metadata);
+ client.updateMetadata(initialUpdateResponse);
+ client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "some.id", host1));
+ client.prepareResponse(initProducerIdResponse(1L, (short) 5, Errors.NONE));
+
+ try (KafkaProducer<String, String> producer = kafkaProducer(configs, new StringSerializer(),
+ new StringSerializer(), metadata, client, null, time)) {
+ producer.initTransactions();
+ assertDurationAtLeast(producer, "txn-init-time-ns-total", tick.toNanos());
+
+ client.prepareResponse(addOffsetsToTxnResponse(Errors.NONE));
+ client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "some.id", host1));
+ client.prepareResponse(txnOffsetsCommitResponse(Collections.singletonMap(
+ new TopicPartition("topic", 0), Errors.NONE)));
+ client.prepareResponse(endTxnResponse(Errors.NONE));
+ producer.beginTransaction();
+ double beginFirst = getAndAssertDurationAtLeast(producer, "txn-begin-time-ns-total", tick.toNanos());
+ producer.sendOffsetsToTransaction(Collections.emptyMap(), new ConsumerGroupMetadata("group"));
+ double sendOffFirst = getAndAssertDurationAtLeast(producer, "txn-send-offsets-time-ns-total", tick.toNanos());
+ producer.commitTransaction();
+ double commitFirst = getAndAssertDurationAtLeast(producer, "txn-commit-time-ns-total", tick.toNanos());
+
+ client.prepareResponse(addOffsetsToTxnResponse(Errors.NONE));
+ client.prepareResponse(txnOffsetsCommitResponse(Collections.singletonMap(
+ new TopicPartition("topic", 0), Errors.NONE)));
+ client.prepareResponse(endTxnResponse(Errors.NONE));
+ producer.beginTransaction();
+ assertDurationAtLeast(producer, "txn-begin-time-ns-total", beginFirst + tick.toNanos());
+ producer.sendOffsetsToTransaction(Collections.emptyMap(), new ConsumerGroupMetadata("group"));
+ assertDurationAtLeast(producer, "txn-send-offsets-time-ns-total", sendOffFirst + tick.toNanos());
+ producer.commitTransaction();
+ assertDurationAtLeast(producer, "txn-commit-time-ns-total", commitFirst + tick.toNanos());
+ }
+ }
+
@Test
public void testSendTxnOffsetsWithGroupMetadata() {
final short maxVersion = (short) 3;
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/KafkaProducerMetricsTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/KafkaProducerMetricsTest.java
new file mode 100644
index 000000000000..e0688616b643
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/KafkaProducerMetricsTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.producer.internals;
+
+import org.apache.kafka.common.metrics.Metrics;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+class KafkaProducerMetricsTest {
+ private static final long METRIC_VALUE = 123L;
+ private static final String FLUSH_TIME_TOTAL = "flush-time-ns-total";
+ private static final String TXN_INIT_TIME_TOTAL = "txn-init-time-ns-total";
+ private static final String TXN_BEGIN_TIME_TOTAL = "txn-begin-time-ns-total";
+ private static final String TXN_COMMIT_TIME_TOTAL = "txn-commit-time-ns-total";
+ private static final String TXN_ABORT_TIME_TOTAL = "txn-abort-time-ns-total";
+ private static final String TXN_SEND_OFFSETS_TIME_TOTAL = "txn-send-offsets-time-ns-total";
+
+ private final Metrics metrics = new Metrics();
+ private final KafkaProducerMetrics producerMetrics = new KafkaProducerMetrics(metrics);
+
+ @Test
+ public void shouldRecordFlushTime() {
+ // When:
+ producerMetrics.recordFlush(METRIC_VALUE);
+
+ // Then:
+ assertMetricValue(FLUSH_TIME_TOTAL);
+ }
+
+ @Test
+ public void shouldRecordInitTime() {
+ // When:
+ producerMetrics.recordInit(METRIC_VALUE);
+
+ // Then:
+ assertMetricValue(TXN_INIT_TIME_TOTAL);
+ }
+
+ @Test
+ public void shouldRecordTxBeginTime() {
+ // When:
+ producerMetrics.recordBeginTxn(METRIC_VALUE);
+
+ // Then:
+ assertMetricValue(TXN_BEGIN_TIME_TOTAL);
+ }
+
+ @Test
+ public void shouldRecordTxCommitTime() {
+ // When:
+ producerMetrics.recordCommitTxn(METRIC_VALUE);
+
+ // Then:
+ assertMetricValue(TXN_COMMIT_TIME_TOTAL);
+ }
+
+ @Test
+ public void shouldRecordTxAbortTime() {
+ // When:
+ producerMetrics.recordAbortTxn(METRIC_VALUE);
+
+ // Then:
+ assertMetricValue(TXN_ABORT_TIME_TOTAL);
+ }
+
+ @Test
+ public void shouldRecordSendOffsetsTime() {
+ // When:
+ producerMetrics.recordSendOffsets(METRIC_VALUE);
+
+ // Then:
+ assertMetricValue(TXN_SEND_OFFSETS_TIME_TOTAL);
+ }
+
+ @Test
+ public void shouldRemoveMetricsOnClose() {
+ // When:
+ producerMetrics.close();
+
+ // Then:
+ assertMetricRemoved(FLUSH_TIME_TOTAL);
+ assertMetricRemoved(TXN_INIT_TIME_TOTAL);
+ assertMetricRemoved(TXN_BEGIN_TIME_TOTAL);
+ assertMetricRemoved(TXN_COMMIT_TIME_TOTAL);
+ assertMetricRemoved(TXN_ABORT_TIME_TOTAL);
+ assertMetricRemoved(TXN_SEND_OFFSETS_TIME_TOTAL);
+ }
+
+ private void assertMetricRemoved(final String name) {
+ assertNull(metrics.metric(metrics.metricName(name, KafkaProducerMetrics.GROUP)));
+ }
+
+ private void assertMetricValue(final String name) {
+ assertEquals(
+ metrics.metric(metrics.metricName(name, KafkaProducerMetrics.GROUP)).metricValue(),
+ (double) METRIC_VALUE
+ );
+ }
+}
\ No newline at end of file
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
index 06171d385098..2ed3bd85162b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
@@ -110,7 +110,8 @@ class ActiveTaskCreator {
clientSupplier,
null,
processId,
- logContext);
+ logContext,
+ time);
taskProducers = Collections.emptyMap();
}
}
@@ -243,7 +244,8 @@ private StreamTask createActiveTask(final TaskId taskId,
clientSupplier,
taskId,
null,
- logContext);
+ logContext,
+ time);
taskProducers.put(taskId, streamsProducer);
} else {
streamsProducer = threadProducer;
@@ -326,4 +328,12 @@ private LogContext getLogContext(final TaskId taskId) {
return new LogContext(logPrefix);
}
+ public double totalProducerBlockedTime() {
+ if (threadProducer != null) {
+ return threadProducer.totalBlockedTime();
+ }
+ return taskProducers.values().stream()
+ .mapToDouble(StreamsProducer::totalBlockedTime)
+ .sum();
+ }
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
index e55c5d7d1cb3..f45350d5c97d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
@@ -296,6 +296,7 @@ public void run() {
log.warn("Error happened during initialization of the global state store; this thread has shutdown");
streamsMetrics.removeAllThreadLevelSensors(getName());
+ streamsMetrics.removeAllThreadLevelMetrics(getName());
return;
}
@@ -338,6 +339,7 @@ public void run() {
}
streamsMetrics.removeAllThreadLevelSensors(getName());
+ streamsMetrics.removeAllThreadLevelMetrics(getName());
setState(DEAD);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index eb3fe79bf134..a1da37359a88 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -512,6 +512,21 @@ public StreamThread(final Time time,
ThreadMetrics.createTaskSensor(threadId, streamsMetrics);
ThreadMetrics.closeTaskSensor(threadId, streamsMetrics);
+ ThreadMetrics.addThreadStartTimeMetric(
+ threadId,
+ streamsMetrics,
+ time.milliseconds()
+ );
+ ThreadMetrics.addThreadBlockedTimeMetric(
+ threadId,
+ new StreamThreadTotalBlockedTime(
+ mainConsumer,
+ restoreConsumer,
+ taskManager::totalProducerBlockedTime
+ ),
+ streamsMetrics
+ );
+
this.time = time;
this.topologyMetadata = topologyMetadata;
this.logPrefix = logContext.logPrefix();
@@ -1127,6 +1142,7 @@ private void completeShutdown(final boolean cleanRun) {
log.error("Failed to close restore consumer due to the following error:", e);
}
streamsMetrics.removeAllThreadLevelSensors(getName());
+ streamsMetrics.removeAllThreadLevelMetrics(getName());
setState(State.DEAD);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThreadTotalBlockedTime.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThreadTotalBlockedTime.java
new file mode 100644
index 000000000000..cf37633f2bc7
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThreadTotalBlockedTime.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor.internals;
+
+import java.util.Map;
+import java.util.function.Supplier;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+
+public class StreamThreadTotalBlockedTime {
+ private final Consumer<?, ?> consumer;
+ private final Consumer<?, ?> restoreConsumer;
+ private final Supplier<Double> producerTotalBlockedTime;
+
+ StreamThreadTotalBlockedTime(
+ final Consumer<?, ?> consumer,
+ final Consumer<?, ?> restoreConsumer,
+ final Supplier<Double> producerTotalBlockedTime) {
+ this.consumer = consumer;
+ this.restoreConsumer = restoreConsumer;
+ this.producerTotalBlockedTime = producerTotalBlockedTime;
+ }
+
+ private double metricValue(
+ final Map<MetricName, ? extends Metric> metrics,
+ final String name) {
+ return metrics.keySet().stream()
+ .filter(n -> n.name().equals(name))
+ .findFirst()
+ .map(n -> (Double) metrics.get(n).metricValue())
+ .orElse(0.0);
+ }
+
+ public double compute() {
+ return metricValue(consumer.metrics(), "io-waittime-total")
+ + metricValue(consumer.metrics(), "iotime-total")
+ + metricValue(consumer.metrics(), "committed-time-ns-total")
+ + metricValue(consumer.metrics(), "commit-sync-time-ns-total")
+ + metricValue(restoreConsumer.metrics(), "io-waittime-total")
+ + metricValue(restoreConsumer.metrics(), "iotime-total")
+ + producerTotalBlockedTime.get();
+ }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java
index 8655f01838c6..23ee0b1072cd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.processor.internals;
+import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
@@ -35,6 +36,7 @@
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.UnknownProducerIdException;
import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.KafkaClientSupplier;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException;
@@ -67,22 +69,26 @@ public class StreamsProducer {
private final Map<String, Object> eosV2ProducerConfigs;
private final KafkaClientSupplier clientSupplier;
private final StreamThread.ProcessingMode processingMode;
+ private final Time time;
private Producer<byte[], byte[]> producer;
private boolean transactionInFlight = false;
private boolean transactionInitialized = false;
+ private double oldProducerTotalBlockedTime = 0;
public StreamsProducer(final StreamsConfig config,
final String threadId,
final KafkaClientSupplier clientSupplier,
final TaskId taskId,
final UUID processId,
- final LogContext logContext) {
+ final LogContext logContext,
+ final Time time) {
Objects.requireNonNull(config, "config cannot be null");
Objects.requireNonNull(threadId, "threadId cannot be null");
this.clientSupplier = Objects.requireNonNull(clientSupplier, "clientSupplier cannot be null");
log = Objects.requireNonNull(logContext, "logContext cannot be null").logger(getClass());
logPrefix = logContext.logPrefix().trim();
+ this.time = Objects.requireNonNull(time, "time");
processingMode = StreamThread.processingMode(config);
@@ -178,12 +184,50 @@ public void resetProducer() {
throw new IllegalStateException("Expected eos-v2 to be enabled, but the processing mode was " + processingMode);
}
+ oldProducerTotalBlockedTime += totalBlockedTime(producer);
+ final long start = time.nanoseconds();
producer.close();
+ final long closeTime = time.nanoseconds() - start;
+ oldProducerTotalBlockedTime += closeTime;
producer = clientSupplier.getProducer(eosV2ProducerConfigs);
transactionInitialized = false;
}
+ private double getMetricValue(final Map<MetricName, ? extends Metric> metrics,
+ final String name) {
+ final List<MetricName> found = metrics.keySet().stream()
+ .filter(n -> n.name().equals(name))
+ .collect(Collectors.toList());
+ if (found.isEmpty()) {
+ return 0.0;
+ }
+ if (found.size() > 1) {
+ final String err = String.format(
+ "found %d values for metric %s. total blocked time computation may be incorrect",
+ found.size(),
+ name
+ );
+ log.error(err);
+ throw new IllegalStateException(err);
+ }
+ return (Double) metrics.get(found.get(0)).metricValue();
+ }
+
+ private double totalBlockedTime(final Producer<?, ?> producer) {
+ return getMetricValue(producer.metrics(), "bufferpool-wait-time-total")
+ + getMetricValue(producer.metrics(), "flush-time-ns-total")
+ + getMetricValue(producer.metrics(), "txn-init-time-ns-total")
+ + getMetricValue(producer.metrics(), "txn-begin-time-ns-total")
+ + getMetricValue(producer.metrics(), "txn-send-offsets-time-ns-total")
+ + getMetricValue(producer.metrics(), "txn-commit-time-ns-total")
+ + getMetricValue(producer.metrics(), "txn-abort-time-ns-total");
+ }
+
+ public double totalBlockedTime() {
+ return oldProducerTotalBlockedTime + totalBlockedTime(producer);
+ }
+
private void maybeBeginTransaction() {
if (eosEnabled() && !transactionInFlight) {
try {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 24b90a62ca53..9269c9d22906 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -124,6 +124,10 @@ void setMainConsumer(final Consumer<byte[], byte[]> mainConsumer) {
tasks.setMainConsumer(mainConsumer);
}
+ public double totalProducerBlockedTime() {
+ return tasks.totalProducerBlockedTime();
+ }
+
public UUID processId() {
return processId;
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
index 35056ffbd082..96c0ee156178 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
@@ -101,6 +101,10 @@ void maybeCreateTasksFromNewTopologies() {
);
}
+ double totalProducerBlockedTime() {
+ return activeTaskCreator.totalProducerBlockedTime();
+ }
+
void createTasks(final Map<TaskId, Set<TopicPartition>> activeTasksToCreate,
final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate) {
for (final Map.Entry> taskToBeCreated : activeTasksToCreate.entrySet()) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
index 9a3689854ed2..dea23993d23a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
@@ -91,6 +91,7 @@ public int hashCode() {
private final Version version;
private final Deque<MetricName> clientLevelMetrics = new LinkedList<>();
private final Deque<String> clientLevelSensors = new LinkedList<>();
+ private final Map<String, Deque<MetricName>> threadLevelMetrics = new HashMap<>();
private final Map<String, Deque<String>> threadLevelSensors = new HashMap<>();
private final Map<String, Deque<String>> taskLevelSensors = new HashMap<>();
private final Map<String, Deque<String>> nodeLevelSensors = new HashMap<>();
@@ -200,6 +201,36 @@ public <T> void addClientLevelMutableMetric(final String name,
}
}
+ public <T> void addThreadLevelImmutableMetric(final String name,
+ final String description,
+ final String threadId,
+ final T value) {
+ final MetricName metricName = metrics.metricName(
+ name, THREAD_LEVEL_GROUP, description, threadLevelTagMap(threadId));
+ synchronized (threadLevelMetrics) {
+ threadLevelMetrics.computeIfAbsent(
+ threadSensorPrefix(threadId),
+ tid -> new LinkedList<>()
+ ).add(metricName);
+ metrics.addMetric(metricName, new ImmutableMetricValue<>(value));
+ }
+ }
+
+ public <T> void addThreadLevelMutableMetric(final String name,
+ final String description,
+ final String threadId,
+ final Gauge<T> valueProvider) {
+ final MetricName metricName = metrics.metricName(
+ name, THREAD_LEVEL_GROUP, description, threadLevelTagMap(threadId));
+ synchronized (threadLevelMetrics) {
+ threadLevelMetrics.computeIfAbsent(
+ threadSensorPrefix(threadId),
+ tid -> new LinkedList<>()
+ ).add(metricName);
+ metrics.addMetric(metricName, valueProvider);
+ }
+ }
+
public final Sensor clientLevelSensor(final String sensorName,
final RecordingLevel recordingLevel,
final Sensor... parents) {
@@ -271,6 +302,15 @@ public final void removeAllThreadLevelSensors(final String threadId) {
}
}
+ public final void removeAllThreadLevelMetrics(final String threadId) {
+ synchronized (threadLevelMetrics) {
+ final Deque<MetricName> names = threadLevelMetrics.remove(threadSensorPrefix(threadId));
+ while (names != null && !names.isEmpty()) {
+ metrics.removeMetric(names.pop());
+ }
+ }
+ }
+
public Map<String, String> taskLevelTagMap(final String threadId, final String taskId) {
final Map<String, String> tagMap = threadLevelTagMap(threadId);
tagMap.put(TASK_ID_TAG, taskId);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetrics.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetrics.java
index 28cb10f09f59..8912f6e58d52 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetrics.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetrics.java
@@ -18,6 +18,7 @@
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.Sensor.RecordingLevel;
+import org.apache.kafka.streams.processor.internals.StreamThreadTotalBlockedTime;
import java.util.Map;
@@ -45,6 +46,8 @@ private ThreadMetrics() {}
private static final String CREATE_TASK = "task-created";
private static final String CLOSE_TASK = "task-closed";
private static final String SKIP_RECORD = "skipped-records";
+ private static final String BLOCKED_TIME = "blocked-time-ns-total";
+ private static final String THREAD_START_TIME = "thread-start-time";
private static final String COMMIT_DESCRIPTION = "calls to commit";
private static final String COMMIT_TOTAL_DESCRIPTION = TOTAL_DESCRIPTION + COMMIT_DESCRIPTION;
@@ -91,6 +94,10 @@ private ThreadMetrics() {}
"The fraction of time the thread spent on polling records from consumer";
private static final String COMMIT_RATIO_DESCRIPTION =
"The fraction of time the thread spent on committing all tasks";
+ private static final String BLOCKED_TIME_DESCRIPTION =
+ "The total time the thread spent blocked on kafka";
+ private static final String THREAD_START_TIME_DESCRIPTION =
+ "The time that the thread was started";
public static Sensor createTaskSensor(final String threadId,
final StreamsMetricsImpl streamsMetrics) {
@@ -310,6 +317,28 @@ public static Sensor commitRatioSensor(final String threadId,
return sensor;
}
+ public static void addThreadStartTimeMetric(final String threadId,
+ final StreamsMetricsImpl streamsMetrics,
+ final long startTime) {
+ streamsMetrics.addThreadLevelImmutableMetric(
+ THREAD_START_TIME,
+ THREAD_START_TIME_DESCRIPTION,
+ threadId,
+ startTime
+ );
+ }
+
+ public static void addThreadBlockedTimeMetric(final String threadId,
+ final StreamThreadTotalBlockedTime blockedTime,
+ final StreamsMetricsImpl streamsMetrics) {
+ streamsMetrics.addThreadLevelMutableMetric(
+ BLOCKED_TIME,
+ BLOCKED_TIME_DESCRIPTION,
+ threadId,
+ (config, now) -> blockedTime.compute()
+ );
+ }
+
private static Sensor invocationRateAndCountSensor(final String threadId,
final String metricName,
final String descriptionOfRate,
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
index 84f5cfce4837..9ada60f40367 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
@@ -191,6 +191,8 @@ public static void closeCluster() {
private static final String TASK_CREATED_TOTAL = "task-created-total";
private static final String TASK_CLOSED_RATE = "task-closed-rate";
private static final String TASK_CLOSED_TOTAL = "task-closed-total";
+ private static final String BLOCKED_TIME_TOTAL = "blocked-time-ns-total";
+ private static final String THREAD_START_TIME = "thread-start-time";
private static final String ACTIVE_PROCESS_RATIO = "active-process-ratio";
private static final String ACTIVE_BUFFER_COUNT = "active-buffer-count";
private static final String SKIPPED_RECORDS_RATE = "skipped-records-rate";
@@ -503,6 +505,8 @@ private void checkThreadLevelMetrics() {
checkMetricByName(listMetricThread, TASK_CREATED_TOTAL, NUM_THREADS);
checkMetricByName(listMetricThread, TASK_CLOSED_RATE, NUM_THREADS);
checkMetricByName(listMetricThread, TASK_CLOSED_TOTAL, NUM_THREADS);
+ checkMetricByName(listMetricThread, BLOCKED_TIME_TOTAL, NUM_THREADS);
+ checkMetricByName(listMetricThread, THREAD_START_TIME, NUM_THREADS);
}
private void checkTaskLevelMetrics() {
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java
index b689dc18c51b..74d81bd7f3a3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.processor.internals;
+import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
@@ -55,6 +56,7 @@
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.closeTo;
import static org.hamcrest.core.IsNot.not;
import static org.junit.Assert.assertThrows;
@@ -117,6 +119,16 @@ public void shouldNoOpCloseTaskProducerIfEosDisabled() {
assertThat(mockClientSupplier.producers.get(0).closed(), is(false));
}
+ @Test
+ public void shouldReturnBlockedTimeWhenThreadProducer() {
+ final double blockedTime = 123.0;
+ createTasks();
+ final MockProducer<?, ?> producer = mockClientSupplier.producers.get(0);
+ addMetric(producer, "flush-time-ns-total", blockedTime);
+
+ assertThat(activeTaskCreator.totalProducerBlockedTime(), closeTo(blockedTime, 0.01));
+ }
+
// error handling
@Test
@@ -224,6 +236,23 @@ public void shouldCloseTaskProducersIfEosAlphaEnabled() {
activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(new TaskId(0, 0));
}
+ @SuppressWarnings("deprecation")
+ @Test
+ public void shouldReturnBlockedTimeWhenTaskProducers() {
+ properties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
+ mockClientSupplier.setApplicationIdForProducer("appId");
+ createTasks();
+ double total = 0.0;
+ double blocked = 1.0;
+ for (final MockProducer<?, ?> producer : mockClientSupplier.producers) {
+ addMetric(producer, "flush-time-ns-total", blocked);
+ total += blocked;
+ blocked += 1.0;
+ }
+
+ assertThat(activeTaskCreator.totalProducerBlockedTime(), closeTo(total, 0.01));
+ }
+
// error handling
@SuppressWarnings("deprecation")
@@ -289,7 +318,6 @@ public void shouldThrowStreamsExceptionOnErrorCloseTaskProducerIfEosAlphaEnabled
}
-
// eos-v2 test
// functional test
@@ -488,4 +516,26 @@ private void createTasks() {
equalTo(mkSet(task00, task01))
);
}
+
+ private void addMetric(
+ final MockProducer<?, ?> producer,
+ final String name,
+ final double value) {
+ final MetricName metricName = metricName(name);
+ producer.setMockMetrics(metricName, new Metric() {
+ @Override
+ public MetricName metricName() {
+ return metricName;
+ }
+
+ @Override
+ public Object metricValue() {
+ return value;
+ }
+ });
+ }
+
+ private MetricName metricName(final String name) {
+ return new MetricName(name, "", "", Collections.emptyMap());
+ }
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
index 5a83c6839d3d..48364f27db58 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
@@ -43,6 +43,7 @@
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.AlwaysContinueProductionExceptionHandler;
import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler;
@@ -133,7 +134,8 @@ public void setup() {
clientSupplier,
null,
processId,
- logContext
+ logContext,
+ Time.SYSTEM
);
mockProducer = clientSupplier.producers.get(0);
collector = new RecordCollectorImpl(
@@ -792,7 +794,8 @@ public void abortTransaction() {
},
taskId,
processId,
- logContext
+ logContext,
+ Time.SYSTEM
),
productionExceptionHandler,
streamsMetrics
@@ -823,7 +826,8 @@ public List<PartitionInfo> partitionsFor(final String topic) {
},
null,
null,
- logContext
+ logContext,
+ Time.SYSTEM
),
productionExceptionHandler,
streamsMetrics
@@ -857,7 +861,8 @@ public Producer<byte[], byte[]> getProducer(final Map<String, Object> config) {
},
taskId,
processId,
- logContext
+ logContext,
+ Time.SYSTEM
),
productionExceptionHandler,
streamsMetrics
@@ -895,7 +900,8 @@ public synchronized Future<RecordMetadata> send(final ProducerRecord<byte[], byte[]> record, final Callback callback) {
public List<PartitionInfo> partitionsFor(final String topic) {
},
null,
null,
- logContext
+ logContext,
+ Time.SYSTEM
);
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTotalBlockedTimeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTotalBlockedTimeTest.java
new file mode 100644
index 000000000000..c2f3f39c2cc6
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTotalBlockedTimeTest.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor.internals;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.mockito.Mockito.when;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Supplier;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnit;
+import org.mockito.junit.MockitoRule;
+
+public class StreamThreadTotalBlockedTimeTest {
+ private static final int IOTIME_TOTAL = 1;
+ private static final int IO_WATTIME_TOTAL = 2;
+ private static final int COMMITTED_TIME_TOTAL = 3;
+ private static final int COMMIT_SYNC_TIME_TOTAL = 4;
+ private static final int RESTORE_IOTIME_TOTAL = 5;
+ private static final int RESTORE_IO_WAITTIME_TOTAL = 6;
+ private static final double PRODUCER_BLOCKED_TIME = 7.0;
+
+ @Mock
+ Consumer<?, ?> consumer;
+ @Mock
+ Consumer<?, ?> restoreConsumer;
+ @Mock
+ Supplier<Double> producerBlocked;
+
+ private StreamThreadTotalBlockedTime blockedTime;
+
+ @Rule
+ public final MockitoRule mockitoRule = MockitoJUnit.rule();
+
+ @Before
+ public void setup() {
+ blockedTime = new StreamThreadTotalBlockedTime(consumer, restoreConsumer, producerBlocked);
+ when(consumer.metrics()).thenAnswer(a -> new MetricsBuilder()
+ .addMetric("iotime-total", IOTIME_TOTAL)
+ .addMetric("io-waittime-total", IO_WATTIME_TOTAL)
+ .addMetric("committed-time-ns-total", COMMITTED_TIME_TOTAL)
+ .addMetric("commit-sync-time-ns-total", COMMIT_SYNC_TIME_TOTAL)
+ .build()
+ );
+ when(restoreConsumer.metrics()).thenAnswer(a -> new MetricsBuilder()
+ .addMetric("iotime-total", RESTORE_IOTIME_TOTAL)
+ .addMetric("io-waittime-total", RESTORE_IO_WAITTIME_TOTAL)
+ .build()
+ );
+ when(producerBlocked.get()).thenReturn(PRODUCER_BLOCKED_TIME);
+ }
+
+ @Test
+ public void shouldComputeTotalBlockedTime() {
+ assertThat(
+ blockedTime.compute(),
+ equalTo(IOTIME_TOTAL + IO_WATTIME_TOTAL + COMMITTED_TIME_TOTAL
+ + COMMIT_SYNC_TIME_TOTAL + RESTORE_IOTIME_TOTAL + RESTORE_IO_WAITTIME_TOTAL
+ + PRODUCER_BLOCKED_TIME)
+ );
+ }
+
+ private static class MetricsBuilder {
+ private final HashMap<MetricName, Metric> metrics = new HashMap<>();
+
+ private MetricsBuilder addMetric(final String name, final double value) {
+ final MetricName metricName = new MetricName(name, "", "", Collections.emptyMap());
+ metrics.put(
+ metricName,
+ new Metric() {
+ @Override
+ public MetricName metricName() {
+ return metricName;
+ }
+
+ @Override
+ public Object metricValue() {
+ return value;
+ }
+ }
+ );
+ return this;
+ }
+
+ public Map<MetricName, Metric> build() {
+ return Collections.unmodifiableMap(metrics);
+ }
+ }
+}
\ No newline at end of file
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsProducerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsProducerTest.java
index f4dec8982946..5e074bf89515 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsProducerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsProducerTest.java
@@ -24,6 +24,8 @@
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
@@ -33,6 +35,7 @@
import org.apache.kafka.common.errors.UnknownProducerIdException;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.KafkaClientSupplier;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException;
@@ -57,6 +60,8 @@
import static org.easymock.EasyMock.reset;
import static org.easymock.EasyMock.verify;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.closeTo;
+import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.sameInstance;
import static org.junit.Assert.assertSame;
@@ -64,6 +69,13 @@
import static org.junit.Assert.assertTrue;
public class StreamsProducerTest {
+ private static final double BUFFER_POOL_WAIT_TIME = 1;
+ private static final double FLUSH_TIME = 2;
+ private static final double TXN_INIT_TIME = 3;
+ private static final double TXN_BEGIN_TIME = 4;
+ private static final double TXN_SEND_OFFSETS_TIME = 5;
+ private static final double TXN_COMMIT_TIME = 6;
+ private static final double TXN_ABORT_TIME = 7;
private final LogContext logContext = new LogContext("test ");
private final String topic = "topic";
@@ -93,6 +105,8 @@ public class StreamsProducerTest {
mkEntry(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2))
);
+ private final Time mockTime = mock(Time.class);
+
final Producer<byte[], byte[]> mockedProducer = mock(Producer.class);
final KafkaClientSupplier clientSupplier = new MockClientSupplier() {
@Override
@@ -106,7 +120,8 @@ public Producer getProducer(final Map config) {
clientSupplier,
null,
null,
- logContext
+ logContext,
+ mockTime
);
final StreamsProducer eosAlphaStreamsProducerWithMock = new StreamsProducer(
eosAlphaConfig,
@@ -114,7 +129,8 @@ public Producer getProducer(final Map config) {
clientSupplier,
new TaskId(0, 0),
null,
- logContext
+ logContext,
+ mockTime
);
private final MockClientSupplier mockClientSupplier = new MockClientSupplier();
@@ -136,8 +152,6 @@ public Producer getProducer(final Map config) {
mkEntry(new TopicPartition(topic, 0), new OffsetAndMetadata(0L, null))
);
-
-
@Before
public void before() {
mockClientSupplier.setCluster(cluster);
@@ -148,7 +162,8 @@ public void before() {
mockClientSupplier,
null,
null,
- logContext
+ logContext,
+ mockTime
);
nonEosMockProducer = mockClientSupplier.producers.get(0);
@@ -161,7 +176,8 @@ public void before() {
eosAlphaMockClientSupplier,
new TaskId(0, 0),
null,
- logContext
+ logContext,
+ mockTime
);
eosAlphaStreamsProducer.initTransaction();
eosAlphaMockProducer = eosAlphaMockClientSupplier.producers.get(0);
@@ -175,10 +191,13 @@ public void before() {
eosBetaMockClientSupplier,
null,
UUID.randomUUID(),
- logContext
+ logContext,
+ mockTime
);
eosBetaStreamsProducer.initTransaction();
eosBetaMockProducer = eosBetaMockClientSupplier.producers.get(0);
+ expect(mockTime.nanoseconds()).andAnswer(Time.SYSTEM::nanoseconds).anyTimes();
+ replay(mockTime);
}
@@ -251,7 +270,8 @@ public void shouldFailIfStreamsConfigIsNull() {
mockClientSupplier,
new TaskId(0, 0),
UUID.randomUUID(),
- logContext)
+ logContext,
+ mockTime)
);
assertThat(thrown.getMessage(), is("config cannot be null"));
@@ -267,7 +287,8 @@ public void shouldFailIfThreadIdIsNull() {
mockClientSupplier,
new TaskId(0, 0),
UUID.randomUUID(),
- logContext)
+ logContext,
+ mockTime)
);
assertThat(thrown.getMessage(), is("threadId cannot be null"));
@@ -283,7 +304,8 @@ public void shouldFailIfClientSupplierIsNull() {
null,
new TaskId(0, 0),
UUID.randomUUID(),
- logContext)
+ logContext,
+ mockTime)
);
assertThat(thrown.getMessage(), is("clientSupplier cannot be null"));
@@ -299,7 +321,8 @@ public void shouldFailIfLogContextIsNull() {
mockClientSupplier,
new TaskId(0, 0),
UUID.randomUUID(),
- null)
+ null,
+ mockTime)
);
assertThat(thrown.getMessage(), is("logContext cannot be null"));
@@ -343,7 +366,8 @@ public void shouldNotSetTransactionIdIfEosDisabled() {
mockClientSupplier,
null,
null,
- logContext
+ logContext,
+ mockTime
);
}
@@ -462,7 +486,8 @@ public void shouldSetTransactionIdUsingTaskIdIfEosAlphaEnabled() {
eosAlphaMockClientSupplier,
new TaskId(0, 0),
null,
- logContext
+ logContext,
+ mockTime
);
verify(mockMap);
@@ -489,7 +514,8 @@ public void shouldSetTransactionIdUsingProcessIdIfEosV2Enabled() {
eosAlphaMockClientSupplier,
null,
processId,
- logContext
+ logContext,
+ mockTime
);
verify(mockMap);
@@ -612,7 +638,8 @@ public void shouldCommitTxWithConsumerGroupMetadataOnEosBetaCommit() {
clientSupplier,
null,
UUID.randomUUID(),
- logContext
+ logContext,
+ mockTime
);
streamsProducer.initTransaction();
// call `send()` to start a transaction
@@ -665,7 +692,8 @@ public void shouldFailIfTaskIdIsNullForEosAlpha() {
mockClientSupplier,
null,
UUID.randomUUID(),
- logContext)
+ logContext,
+ mockTime)
);
assertThat(thrown.getMessage(), is("taskId cannot be null for exactly-once alpha"));
@@ -681,7 +709,8 @@ public void shouldFailIfProcessIdNullForEosBeta() {
mockClientSupplier,
new TaskId(0, 0),
null,
- logContext)
+ logContext,
+ mockTime)
);
assertThat(thrown.getMessage(), is("processId cannot be null for exactly-once v2"));
@@ -704,7 +733,8 @@ public Producer getProducer(final Map config) {
clientSupplier,
new TaskId(0, 0),
null,
- logContext
+ logContext,
+ mockTime
);
final TimeoutException thrown = assertThrows(
@@ -724,7 +754,8 @@ public void shouldFailOnMaybeBeginTransactionIfTransactionsNotInitializedForExac
eosAlphaMockClientSupplier,
new TaskId(0, 0),
null,
- logContext
+ logContext,
+ mockTime
);
final IllegalStateException thrown = assertThrows(
@@ -744,7 +775,8 @@ public void shouldFailOnMaybeBeginTransactionIfTransactionsNotInitializedForExac
eosBetaMockClientSupplier,
null,
UUID.randomUUID(),
- logContext
+ logContext,
+ mockTime
);
final IllegalStateException thrown = assertThrows(
@@ -772,7 +804,8 @@ public Producer getProducer(final Map config) {
clientSupplier,
new TaskId(0, 0),
null,
- logContext
+ logContext,
+ mockTime
);
final StreamsException thrown = assertThrows(
@@ -801,7 +834,8 @@ public Producer getProducer(final Map config) {
clientSupplier,
new TaskId(0, 0),
null,
- logContext
+ logContext,
+ mockTime
);
final RuntimeException thrown = assertThrows(
@@ -1105,7 +1139,8 @@ public void shouldResetTransactionInitializedOnResetProducer() {
clientSupplier,
null,
UUID.randomUUID(),
- logContext
+ logContext,
+ mockTime
);
streamsProducer.initTransaction();
@@ -1113,6 +1148,7 @@ public void shouldResetTransactionInitializedOnResetProducer() {
mockedProducer.close();
mockedProducer.initTransactions();
expectLastCall();
+ expect(mockedProducer.metrics()).andReturn(Collections.emptyMap()).anyTimes();
replay(mockedProducer);
streamsProducer.resetProducer();
@@ -1121,4 +1157,99 @@ public void shouldResetTransactionInitializedOnResetProducer() {
verify(mockedProducer);
}
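+ // totalBlockedTime() should sum the producer's blocked-time metrics (buffer-pool wait, flush, and the txn operations).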
+ @Test
+ public void shouldComputeTotalBlockedTime() {
+ setProducerMetrics(
+ nonEosMockProducer,
+ BUFFER_POOL_WAIT_TIME,
+ FLUSH_TIME,
+ TXN_INIT_TIME,
+ TXN_BEGIN_TIME,
+ TXN_SEND_OFFSETS_TIME,
+ TXN_COMMIT_TIME,
+ TXN_ABORT_TIME
+ );
+
+ final double expectedTotalBlocked = BUFFER_POOL_WAIT_TIME + FLUSH_TIME + TXN_INIT_TIME +
+ TXN_BEGIN_TIME + TXN_SEND_OFFSETS_TIME + TXN_COMMIT_TIME + TXN_ABORT_TIME;
+ assertThat(nonEosStreamsProducer.totalBlockedTime(), closeTo(expectedTotalBlocked, 0.01));
+ }
+
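+ // After resetProducer() the blocked time accumulated by the old producer, plus the time spent closing it, is carried over to the new producer's total.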
+ @Test
+ public void shouldComputeTotalBlockedTimeAfterReset() {
+ setProducerMetrics(
+ eosBetaMockProducer,
+ BUFFER_POOL_WAIT_TIME,
+ FLUSH_TME,
+ TXN_INIT_TIME,
+ TXN_BEGIN_TIME,
+ TXN_SEND_OFFSETS_TIME,
+ TXN_COMMIT_TIME,
+ TXN_ABORT_TIME
+ );
+ final double expectedTotalBlocked = BUFFER_POOL_WAIT_TIME + FLUSH_TIME + TXN_INIT_TIME +
+ TXN_BEGIN_TIME + TXN_SEND_OFFSETS_TIME + TXN_COMMIT_TIME + TXN_ABORT_TIME;
+ assertThat(eosBetaStreamsProducer.totalBlockedTime(), equalTo(expectedTotalBlocked));
+ reset(mockTime);
+ final long closeStart = 1L;
+ final long closeDelay = 1L;
+ expect(mockTime.nanoseconds()).andReturn(closeStart).andReturn(closeStart + closeDelay);
+ replay(mockTime);
+ eosBetaStreamsProducer.resetProducer();
+ setProducerMetrics(
+ eosBetaMockClientSupplier.producers.get(1),
+ BUFFER_POOL_WAIT_TIME,
+ FLUSH_TIME,
+ TXN_INIT_TIME,
+ TXN_BEGIN_TIME,
+ TXN_SEND_OFFSETS_TIME,
+ TXN_COMMIT_TIME,
+ TXN_ABORT_TIME
+ );
+
+ assertThat(
+ eosBetaStreamsProducer.totalBlockedTime(),
+ closeTo(2 * expectedTotalBlocked + closeDelay, 0.01)
+ );
+ }
+
+ private MetricName metricName(final String name) {
+ return new MetricName(name, "", "", Collections.emptyMap());
+ }
+
+ private void addMetric(
+ final MockProducer<?, ?> producer,
+ final String name,
+ final double value) {
+ final MetricName metricName = metricName(name);
+ producer.setMockMetrics(metricName, new Metric() {
+ @Override
+ public MetricName metricName() {
+ return metricName;
+ }
+
+ @Override
+ public Object metricValue() {
+ return value;
+ }
+ });
+ }
+
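+ // Populates the mock producer with the blocked-time metrics that StreamsProducer#totalBlockedTime adds up.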
+ private void setProducerMetrics(
+ final MockProducer<?, ?> producer,
+ final double bufferPoolWaitTime,
+ final double flushTime,
+ final double txnInitTime,
+ final double txnBeginTime,
+ final double txnSendOffsetsTime,
+ final double txnCommitTime,
+ final double txnAbortTime) {
+ addMetric(producer, "bufferpool-wait-time-total", bufferPoolWaitTime);
+ addMetric(producer, "flush-time-ns-total", flushTime);
+ addMetric(producer, "txn-init-time-ns-total", txnInitTime);
+ addMetric(producer, "txn-begin-time-ns-total", txnBeginTime);
+ addMetric(producer, "txn-send-offsets-time-ns-total", txnSendOffsetsTime);
+ addMetric(producer, "txn-commit-time-ns-total", txnCommitTime);
+ addMetric(producer, "txn-abort-time-ns-total", txnAbortTime);
+ }
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java
index bfe05a6fc48c..24cf8c7f1cc6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java
@@ -57,6 +57,7 @@
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.RATE_SUFFIX;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.ROLLUP_VALUE;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.STATE_STORE_LEVEL_GROUP;
+import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.THREAD_LEVEL_GROUP;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.TOTAL_SUFFIX;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addAvgAndMaxLatencyToSensor;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addInvocationRateAndCountToSensor;
@@ -75,6 +76,7 @@
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
@@ -1209,4 +1211,92 @@ public void shouldNotMeasureLatencyBecauseSensorHasNoMetrics() {
verify(sensor);
}
+
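+ // Thread-level mutable metrics are backed by a gauge that is re-evaluated on every read, whereas
+ // immutable metrics report a fixed value; both are removed by removeAllThreadLevelMetrics.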
+ @Test
+ public void shouldAddThreadLevelMutableMetric() {
+ final int measuredValue = 123;
+ final StreamsMetricsImpl streamsMetrics
+ = new StreamsMetricsImpl(metrics, THREAD_ID1, VERSION, time);
+
+ streamsMetrics.addThreadLevelMutableMetric(
+ "foobar",
+ "test metric",
+ "t1",
+ (c, t) -> measuredValue
+ );
+
+ final MetricName name = metrics.metricName(
+ "foobar",
+ THREAD_LEVEL_GROUP,
+ Collections.singletonMap("thread-id", "t1")
+ );
+ assertThat(metrics.metric(name), notNullValue());
+ assertThat(metrics.metric(name).metricValue(), equalTo(measuredValue));
+ }
+
+ @Test
+ public void shouldCleanupThreadLevelMutableMetric() {
+ final int measuredValue = 123;
+ final StreamsMetricsImpl streamsMetrics
+ = new StreamsMetricsImpl(metrics, THREAD_ID1, VERSION, time);
+ streamsMetrics.addThreadLevelMutableMetric(
+ "foobar",
+ "test metric",
+ "t1",
+ (c, t) -> measuredValue
+ );
+
+ streamsMetrics.removeAllThreadLevelMetrics("t1");
+
+ final MetricName name = metrics.metricName(
+ "foobar",
+ THREAD_LEVEL_GROUP,
+ Collections.singletonMap("thread-id", "t1")
+ );
+ assertThat(metrics.metric(name), nullValue());
+ }
+
+ @Test
+ public void shouldAddThreadLevelImmutableMetric() {
+ final int measuredValue = 123;
+ final StreamsMetricsImpl streamsMetrics
+ = new StreamsMetricsImpl(metrics, THREAD_ID1, VERSION, time);
+
+ streamsMetrics.addThreadLevelImmutableMetric(
+ "foobar",
+ "test metric",
+ "t1",
+ measuredValue
+ );
+
+ final MetricName name = metrics.metricName(
+ "foobar",
+ THREAD_LEVEL_GROUP,
+ Collections.singletonMap("thread-id", "t1")
+ );
+ assertThat(metrics.metric(name), notNullValue());
+ assertThat(metrics.metric(name).metricValue(), equalTo(measuredValue));
+ }
+
+ @Test
+ public void shouldCleanupThreadLevelImmutableMetric() {
+ final int measuredValue = 123;
+ final StreamsMetricsImpl streamsMetrics
+ = new StreamsMetricsImpl(metrics, THREAD_ID1, VERSION, time);
+ streamsMetrics.addThreadLevelImmutableMetric(
+ "foobar",
+ "test metric",
+ "t1",
+ measuredValue
+ );
+
+ streamsMetrics.removeAllThreadLevelMetrics("t1");
+
+ final MetricName name = metrics.metricName(
+ "foobar",
+ THREAD_LEVEL_GROUP,
+ Collections.singletonMap("thread-id", "t1")
+ );
+ assertThat(metrics.metric(name), nullValue());
+ }
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetricsTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetricsTest.java
index ae0eae43f6b3..0a486db812dc 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetricsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetricsTest.java
@@ -16,14 +16,20 @@
*/
package org.apache.kafka.streams.processor.internals.metrics;
+import org.apache.kafka.common.metrics.Gauge;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.Sensor.RecordingLevel;
+import org.apache.kafka.streams.processor.internals.StreamThreadTotalBlockedTime;
import org.junit.Test;
+
+import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.util.Collections;
import java.util.Map;
+import org.mockito.ArgumentCaptor;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.LATENCY_SUFFIX;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.RATE_SUFFIX;
@@ -387,4 +393,55 @@ public void shouldGetCloseTaskSensor() {
assertThat(sensor, is(expectedSensor));
}
+
+ @Test
+ public void shouldAddThreadStartTimeMetric() {
+ // Given:
+ final long startTime = 123L;
+
+ // When:
+ ThreadMetrics.addThreadStartTimeMetric(
+ "bongo",
+ streamsMetrics,
+ startTime
+ );
+
+ // Then:
+ verify(streamsMetrics).addThreadLevelImmutableMetric(
+ "thread-start-time",
+ "The time that the thread was started",
+ "bongo",
+ startTime
+ );
+ }
+
+ @Test
+ public void shouldAddTotalBlockedTimeMetric() {
+ // Given:
+ final double startTime = 123.45;
+ final StreamThreadTotalBlockedTime blockedTime = mock(StreamThreadTotalBlockedTime.class);
+ when(blockedTime.compute()).thenReturn(startTime);
+
+ // When:
+ ThreadMetrics.addThreadBlockedTimeMetric(
+ "burger",
+ blockedTime,
+ streamsMetrics
+ );
+
+ // Then:
+ final ArgumentCaptor<Gauge<Double>> captor = gaugeCaptor();
+ verify(streamsMetrics).addThreadLevelMutableMetric(
+ eq("blocked-time-ns-total"),
+ eq("The total time the thread spent blocked on kafka"),
+ eq("burger"),
+ captor.capture()
+ );
+ assertThat(captor.getValue().value(null, 678L), is(startTime));
+ }
+
+ @SuppressWarnings("unchecked")
+ private ArgumentCaptor<Gauge<Double>> gaugeCaptor() {
+ return ArgumentCaptor.forClass(Gauge.class);
+ }
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
index 3f438f939b82..6a95ccbd08a2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
@@ -22,6 +22,7 @@
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler;
@@ -208,7 +209,8 @@ private KeyValueStoreTestDriver(final StateSerdes serdes) {
new MockClientSupplier(),
null,
null,
- logContext),
+ logContext,
+ Time.SYSTEM),
new DefaultProductionExceptionHandler(),
new MockStreamsMetrics(new Metrics())
) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
index c8a320f8cfc0..c56f7bce9af1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
@@ -23,6 +23,7 @@
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.StreamsConfig;
@@ -423,7 +424,8 @@ private StreamTask createStreamsTask(final StreamsConfig streamsConfig,
clientSupplier,
new TaskId(0, 0),
UUID.randomUUID(),
- logContext
+ logContext,
+ Time.SYSTEM
),
streamsConfig.defaultProductionExceptionHandler(),
new MockStreamsMetrics(metrics));
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index c73ab3e07368..05f10e98f2ef 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -362,7 +362,8 @@ public Consumer getGlobalConsumer(final Map conf
throw new IllegalStateException();
}
},
- logContext
+ logContext,
+ mockWallClockTime
);
setupGlobalTask(mockWallClockTime, streamsConfig, streamsMetrics, cache);
@@ -1334,8 +1335,9 @@ private static class TestDriverProducer extends StreamsProducer {
public TestDriverProducer(final StreamsConfig config,
final KafkaClientSupplier clientSupplier,
- final LogContext logContext) {
- super(config, "TopologyTestDriver-StreamThread-1", clientSupplier, new TaskId(0, 0), UUID.randomUUID(), logContext);
+ final LogContext logContext,
+ final Time time) {
+ super(config, "TopologyTestDriver-StreamThread-1", clientSupplier, new TaskId(0, 0), UUID.randomUUID(), logContext, time);
}
@Override