KAFKA-13229: add total blocked time metric to streams (KIP-761) (apache#11149)

* Add the following producer metrics:
flush-time-ns-total: cumulative sum of time elapsed during flush.
txn-init-time-ns-total: cumulative sum of time elapsed during initTransactions.
txn-begin-time-ns-total: cumulative sum of time elapsed during beginTransaction.
txn-send-offsets-time-ns-total: cumulative sum of time elapsed during sendOffsetsToTransaction.
txn-commit-time-ns-total: cumulative sum of time elapsed during commitTransaction.
txn-abort-time-ns-total: cumulative sum of time elapsed during abortTransaction.

* Add the following consumer metrics (see the read-back sketch after this list):
committed-time-ns-total: cumulative sum of time elapsed during committed.
commit-sync-time-ns-total: cumulative sum of time elapsed during commitSync.
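The new totals are exposed through the consumer's public metrics() map. A minimal read-back sketch, assuming a live consumer held in a hypothetical `consumer` variable; the group name "consumer-metrics" follows from the "consumer" metric group prefix used in KafkaConsumerMetrics below:

    import java.util.Map;
    import org.apache.kafka.common.Metric;
    import org.apache.kafka.common.MetricName;

    // Print the two new cumulative commit timers off a running consumer.
    for (Map.Entry<MetricName, ? extends Metric> entry : consumer.metrics().entrySet()) {
        MetricName name = entry.getKey();
        if ("consumer-metrics".equals(name.group())
                && ("commit-sync-time-ns-total".equals(name.name())
                    || "committed-time-ns-total".equals(name.name()))) {
            System.out.printf("%s = %s ns%n", name.name(), entry.getValue().metricValue());
        }
    }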

* Add a total blocked time metric (blocked-time-ns-total) to streams that is the sum of (a sketch of the summation follows this list):
consumer's io-waittime-total
consumer's iotime-total
consumer's committed-time-ns-total
consumer's commit-sync-time-ns-total
restore consumer's io-waittime-total
restore consumer's iotime-total
admin client's io-waittime-total
admin client's iotime-total
producer's bufferpool-wait-time-total
producer's flush-time-ns-total
producer's txn-init-time-ns-total
producer's txn-begin-time-ns-total
producer's txn-send-offsets-time-ns-total
producer's txn-commit-time-ns-total
producer's txn-abort-time-ns-total
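Streams computes this sum internally per thread, but it can be reproduced by hand from the client-level metrics maps. A minimal sketch, assuming handles to the thread's clients (`consumer`, `restoreConsumer`, `adminClient`, and `producer` are illustrative names) and a helper `sumNamed` that is not part of any Kafka API; all of the summed totals are nanosecond counters:

    import java.util.Map;
    import org.apache.kafka.common.Metric;
    import org.apache.kafka.common.MetricName;

    // Illustrative helper: sum every metric in the map whose name matches.
    static double sumNamed(Map<MetricName, ? extends Metric> metrics, String... names) {
        double total = 0.0;
        for (Map.Entry<MetricName, ? extends Metric> e : metrics.entrySet()) {
            for (String n : names) {
                if (e.getKey().name().equals(n)) {
                    total += ((Number) e.getValue().metricValue()).doubleValue();
                }
            }
        }
        return total;
    }

    // Blocked time = consumer waits + commit calls + admin waits + producer flush/txn calls.
    double blockedNs =
        sumNamed(consumer.metrics(), "io-waittime-total", "iotime-total",
                 "committed-time-ns-total", "commit-sync-time-ns-total")
      + sumNamed(restoreConsumer.metrics(), "io-waittime-total", "iotime-total")
      + sumNamed(adminClient.metrics(), "io-waittime-total", "iotime-total")
      + sumNamed(producer.metrics(), "bufferpool-wait-time-total",
                 "flush-time-ns-total", "txn-init-time-ns-total", "txn-begin-time-ns-total",
                 "txn-send-offsets-time-ns-total", "txn-commit-time-ns-total",
                 "txn-abort-time-ns-total");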

Reviewers: Bruno Cadonna <cadonna@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
rodesai authored and Ralph Debusmann committed Dec 22, 2021
1 parent a1f2cb9 commit ac89b59
Showing 29 changed files with 1,302 additions and 44 deletions.
2 changes: 1 addition & 1 deletion checkstyle/suppressions.xml
@@ -14,7 +14,7 @@
<suppress checks="NPathComplexity"
files="(MessageDataGenerator|FieldSpec|WorkerSinkTask).java"/>
<suppress checks="JavaNCSS"
files="(ApiMessageType|FieldSpec|MessageDataGenerator).java"/>
files="(ApiMessageType|FieldSpec|MessageDataGenerator|KafkaConsumerTest).java"/>
<suppress checks="MethodLength"
files="(FieldSpec|MessageDataGenerator).java"/>
<suppress id="dontUseSystemExit"
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -1485,6 +1485,7 @@ public void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets) {
@Override
public void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets, final Duration timeout) {
acquireAndEnsureOpen();
long commitStart = time.nanoseconds();
try {
maybeThrowInvalidGroupIdException();
offsets.forEach(this::updateLastSeenEpochIfNewer);
@@ -1493,6 +1494,7 @@ public void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets, final Duration timeout) {
"committing offsets " + offsets);
}
} finally {
kafkaConsumerMetrics.recordCommitSync(time.nanoseconds() - commitStart);
release();
}
}
@@ -1871,9 +1873,11 @@ public Map<TopicPartition, OffsetAndMetadata> committed(final Set<TopicPartition> partitions) {
@Override
public Map<TopicPartition, OffsetAndMetadata> committed(final Set<TopicPartition> partitions, final Duration timeout) {
acquireAndEnsureOpen();
long start = time.nanoseconds();
try {
maybeThrowInvalidGroupIdException();
-            Map<TopicPartition, OffsetAndMetadata> offsets = coordinator.fetchCommittedOffsets(partitions, time.timer(timeout));
+            final Map<TopicPartition, OffsetAndMetadata> offsets;
+            offsets = coordinator.fetchCommittedOffsets(partitions, time.timer(timeout));
if (offsets == null) {
throw new TimeoutException("Timeout of " + timeout.toMillis() + "ms expired before the last " +
"committed offset for partitions " + partitions + " could be determined. Try tuning default.api.timeout.ms " +
@@ -1883,6 +1887,7 @@ public Map<TopicPartition, OffsetAndMetadata> committed(final Set<TopicPartition> partitions, final Duration timeout) {
return offsets;
}
} finally {
kafkaConsumerMetrics.recordCommitted(time.nanoseconds() - start);
release();
}
}
clients/src/main/java/org/apache/kafka/clients/consumer/internals/KafkaConsumerMetrics.java
@@ -21,6 +21,7 @@
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.CumulativeSum;
import org.apache.kafka.common.metrics.stats.Max;

import java.util.concurrent.TimeUnit;
@@ -29,6 +30,8 @@ public class KafkaConsumerMetrics implements AutoCloseable {
private final MetricName lastPollMetricName;
private final Sensor timeBetweenPollSensor;
private final Sensor pollIdleSensor;
private final Sensor committedSensor;
private final Sensor commitSyncSensor;
private final Metrics metrics;
private long lastPollMs;
private long pollStartMs;
@@ -63,6 +66,26 @@ public KafkaConsumerMetrics(Metrics metrics, String metricGrpPrefix) {
metricGroupName,
"The average fraction of time the consumer's poll() is idle as opposed to waiting for the user code to process records."),
new Avg());

this.commitSyncSensor = metrics.sensor("commit-sync-time-ns-total");
this.commitSyncSensor.add(
metrics.metricName(
"commit-sync-time-ns-total",
metricGroupName,
"The total time the consumer has spent in commitSync in nanoseconds"
),
new CumulativeSum()
);

this.committedSensor = metrics.sensor("committed-time-ns-total");
this.committedSensor.add(
metrics.metricName(
"committed-time-ns-total",
metricGroupName,
"The total time the consumer has spent in committed in nanoseconds"
),
new CumulativeSum()
);
}

public void recordPollStart(long pollStartMs) {
@@ -78,10 +101,20 @@ public void recordPollEnd(long pollEndMs) {
this.pollIdleSensor.record(pollIdleRatio);
}

public void recordCommitSync(long duration) {
this.commitSyncSensor.record(duration);
}

public void recordCommitted(long duration) {
this.committedSensor.record(duration);
}

@Override
public void close() {
metrics.removeMetric(lastPollMetricName);
metrics.removeSensor(timeBetweenPollSensor.name());
metrics.removeSensor(pollIdleSensor.name());
metrics.removeSensor(commitSyncSensor.name());
metrics.removeSensor(committedSensor.name());
}
}
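For reference, a self-contained sketch of the CumulativeSum behavior relied on above (the "demo-" names are made up): each record() call adds to a monotonically increasing total, which is what makes these *-time-ns-total metrics safe to subtract between two scrapes.

    import org.apache.kafka.common.MetricName;
    import org.apache.kafka.common.metrics.Metrics;
    import org.apache.kafka.common.metrics.Sensor;
    import org.apache.kafka.common.metrics.stats.CumulativeSum;

    Metrics metrics = new Metrics();
    Sensor sensor = metrics.sensor("demo-time-ns-total");
    MetricName name = metrics.metricName("demo-time-ns-total", "demo-group", "Demo cumulative timer");
    sensor.add(name, new CumulativeSum());
    sensor.record(5_000_000);  // 5 ms, recorded in nanoseconds
    sensor.record(7_000_000);  // 7 ms
    // metrics.metric(name).metricValue() now reports 1.2E7, and only ever grows.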
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -26,6 +26,7 @@
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.clients.producer.internals.BufferPool;
import org.apache.kafka.clients.producer.internals.KafkaProducerMetrics;
import org.apache.kafka.clients.producer.internals.ProducerInterceptors;
import org.apache.kafka.clients.producer.internals.ProducerMetadata;
import org.apache.kafka.clients.producer.internals.ProducerMetrics;
@@ -241,6 +242,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
private final String clientId;
// Visible for testing
final Metrics metrics;
private final KafkaProducerMetrics producerMetrics;
private final Partitioner partitioner;
private final int maxRequestSize;
private final long totalMemorySize;
@@ -356,6 +358,7 @@ public KafkaProducer(Properties properties, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
MetricsContext metricsContext = new KafkaMetricsContext(JMX_PREFIX,
config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX));
this.metrics = new Metrics(metricConfig, reporters, time, metricsContext);
this.producerMetrics = new KafkaProducerMetrics(metrics);
this.partitioner = config.getConfiguredInstance(
ProducerConfig.PARTITIONER_CLASS_CONFIG,
Partitioner.class,
@@ -590,9 +593,11 @@ else if (acks != -1)
public void initTransactions() {
throwIfNoTransactionManager();
throwIfProducerClosed();
long now = time.nanoseconds();
TransactionalRequestResult result = transactionManager.initializeTransactions();
sender.wakeup();
result.await(maxBlockTimeMs, TimeUnit.MILLISECONDS);
producerMetrics.recordInit(time.nanoseconds() - now);
}

/**
@@ -613,7 +618,9 @@ public void initTransactions() {
public void beginTransaction() throws ProducerFencedException {
throwIfNoTransactionManager();
throwIfProducerClosed();
long now = time.nanoseconds();
transactionManager.beginTransaction();
producerMetrics.recordBeginTxn(time.nanoseconds() - now);
}

/**
@@ -697,9 +704,11 @@ public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, ConsumerGroupMetadata groupMetadata) {
throwIfInvalidGroupMetadata(groupMetadata);
throwIfNoTransactionManager();
throwIfProducerClosed();
long start = time.nanoseconds();
TransactionalRequestResult result = transactionManager.sendOffsetsToTransaction(offsets, groupMetadata);
sender.wakeup();
result.await(maxBlockTimeMs, TimeUnit.MILLISECONDS);
producerMetrics.recordSendOffsets(time.nanoseconds() - start);
}

/**
@@ -730,9 +739,11 @@ public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, ConsumerGroupMetadata groupMetadata) {
public void commitTransaction() throws ProducerFencedException {
throwIfNoTransactionManager();
throwIfProducerClosed();
long commitStart = time.nanoseconds();
TransactionalRequestResult result = transactionManager.beginCommit();
sender.wakeup();
result.await(maxBlockTimeMs, TimeUnit.MILLISECONDS);
producerMetrics.recordCommitTxn(time.nanoseconds() - commitStart);
}

/**
@@ -761,9 +772,11 @@ public void abortTransaction() throws ProducerFencedException {
throwIfNoTransactionManager();
throwIfProducerClosed();
log.info("Aborting incomplete transaction");
long abortStart = time.nanoseconds();
TransactionalRequestResult result = transactionManager.beginAbort();
sender.wakeup();
result.await(maxBlockTimeMs, TimeUnit.MILLISECONDS);
producerMetrics.recordAbortTxn(time.nanoseconds() - abortStart);
}

/**
@@ -1124,12 +1137,16 @@ private void ensureValidRecordSize(int size) {
@Override
public void flush() {
log.trace("Flushing accumulated records in producer.");

long start = time.nanoseconds();
this.accumulator.beginFlush();
this.sender.wakeup();
try {
this.accumulator.awaitFlushCompletion();
} catch (InterruptedException e) {
throw new InterruptException("Flush interrupted.", e);
} finally {
producerMetrics.recordFlush(time.nanoseconds() - start);
}
}
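Note the asymmetry: flush() records its elapsed time in a finally block, so an interrupted flush still contributes to the metric, whereas the transactional methods above record only after await() returns. The shared shape of the instrumentation, sketched generically (assuming a Time field named time as in KafkaProducer; `timeBlockingCall` and `recordFn` are illustrative, not part of the producer API):

    import java.util.function.LongConsumer;

    // Generic form of the timing pattern this commit threads through the producer.
    void timeBlockingCall(Runnable blockingCall, LongConsumer recordFn) {
        long start = time.nanoseconds();
        try {
            blockingCall.run();
        } finally {
            recordFn.accept(time.nanoseconds() - start);
        }
    }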

@@ -1245,6 +1262,7 @@ private void close(Duration timeout, boolean swallowException) {
}

Utils.closeQuietly(interceptors, "producer interceptors", firstException);
Utils.closeQuietly(producerMetrics, "producer metrics wrapper", firstException);
Utils.closeQuietly(metrics, "producer metrics", firstException);
Utils.closeQuietly(keySerializer, "producer keySerializer", firstException);
Utils.closeQuietly(valueSerializer, "producer valueSerializer", firstException);
clients/src/main/java/org/apache/kafka/clients/producer/internals/KafkaProducerMetrics.java
@@ -0,0 +1,123 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.clients.producer.internals;

import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.CumulativeSum;

import java.util.Map;

public class KafkaProducerMetrics implements AutoCloseable {

public static final String GROUP = "producer-metrics";
private static final String FLUSH = "flush";
private static final String TXN_INIT = "txn-init";
private static final String TXN_BEGIN = "txn-begin";
private static final String TXN_SEND_OFFSETS = "txn-send-offsets";
private static final String TXN_COMMIT = "txn-commit";
private static final String TXN_ABORT = "txn-abort";
private static final String TOTAL_TIME_SUFFIX = "-time-ns-total";

private final Map<String, String> tags;
private final Metrics metrics;
private final Sensor initTimeSensor;
private final Sensor beginTxnTimeSensor;
private final Sensor flushTimeSensor;
private final Sensor sendOffsetsSensor;
private final Sensor commitTxnSensor;
private final Sensor abortTxnSensor;

public KafkaProducerMetrics(Metrics metrics) {
this.metrics = metrics;
tags = this.metrics.config().tags();
flushTimeSensor = newLatencySensor(
FLUSH,
"Total time producer has spent in flush in nanoseconds."
);
initTimeSensor = newLatencySensor(
TXN_INIT,
"Total time producer has spent in initTransactions in nanoseconds."
);
beginTxnTimeSensor = newLatencySensor(
TXN_BEGIN,
"Total time producer has spent in beginTransaction in nanoseconds."
);
sendOffsetsSensor = newLatencySensor(
TXN_SEND_OFFSETS,
"Total time producer has spent in sendOffsetsToTransaction in nanoseconds."
);
commitTxnSensor = newLatencySensor(
TXN_COMMIT,
"Total time producer has spent in commitTransaction in nanoseconds."
);
abortTxnSensor = newLatencySensor(
TXN_ABORT,
"Total time producer has spent in abortTransaction in nanoseconds."
);
}

@Override
public void close() {
removeMetric(FLUSH);
removeMetric(TXN_INIT);
removeMetric(TXN_BEGIN);
removeMetric(TXN_SEND_OFFSETS);
removeMetric(TXN_COMMIT);
removeMetric(TXN_ABORT);
}

public void recordFlush(long duration) {
flushTimeSensor.record(duration);
}

public void recordInit(long duration) {
initTimeSensor.record(duration);
}

public void recordBeginTxn(long duration) {
beginTxnTimeSensor.record(duration);
}

public void recordSendOffsets(long duration) {
sendOffsetsSensor.record(duration);
}

public void recordCommitTxn(long duration) {
commitTxnSensor.record(duration);
}

public void recordAbortTxn(long duration) {
abortTxnSensor.record(duration);
}

private Sensor newLatencySensor(String name, String description) {
Sensor sensor = metrics.sensor(name + TOTAL_TIME_SUFFIX);
sensor.add(metricName(name, description), new CumulativeSum());
return sensor;
}

private MetricName metricName(final String name, final String description) {
return metrics.metricName(name + TOTAL_TIME_SUFFIX, GROUP, description, tags);
}

private void removeMetric(final String name) {
metrics.removeSensor(name + TOTAL_TIME_SUFFIX);
}
}
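The names registered here compose as the operation constant plus "-time-ns-total", all in the "producer-metrics" group (e.g. flush-time-ns-total). A sketch of reading one back, assuming a producer instance named `producer`:

    // Cumulative nanoseconds the producer has spent blocked in flush().
    double flushNs = producer.metrics().entrySet().stream()
        .filter(e -> KafkaProducerMetrics.GROUP.equals(e.getKey().group())
                && "flush-time-ns-total".equals(e.getKey().name()))
        .mapToDouble(e -> ((Number) e.getValue().metricValue()).doubleValue())
        .sum();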
clients/src/main/java/org/apache/kafka/clients/producer/internals/SenderMetricsRegistry.java
@@ -30,7 +30,6 @@

public class SenderMetricsRegistry {

-    final static String METRIC_GROUP_NAME = "producer-metrics";
final static String TOPIC_METRIC_GROUP_NAME = "producer-topic-metrics";

private final List<MetricNameTemplate> allTemplates;
@@ -154,7 +153,7 @@ public SenderMetricsRegistry(Metrics metrics) {
}

private MetricName createMetricName(String name, String description) {
-        return this.metrics.metricInstance(createTemplate(name, METRIC_GROUP_NAME, description, this.tags));
+        return this.metrics.metricInstance(createTemplate(name, KafkaProducerMetrics.GROUP, description, this.tags));
}

private MetricNameTemplate createTopicTemplate(String name, String description) {
(Diffs for the remaining 23 changed files are not shown.)
