diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java index e0fc987a3da8..47269fb354ea 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java @@ -233,6 +233,7 @@ protected void finalOffsetCommit(boolean failed) { @Override public void removeMetrics() { Utils.closeQuietly(transactionMetrics, "source task transaction metrics tracker"); + super.removeMetrics(); } @Override diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java index 7882b862c848..9edaf08efe68 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java @@ -19,6 +19,7 @@ import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.MetricName; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.InvalidTopicException; import org.apache.kafka.common.errors.RecordTooLargeException; @@ -82,7 +83,9 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; +import java.util.stream.Collectors; +import static java.util.Collections.emptySet; import static org.apache.kafka.connect.integration.MonitorableSourceConnector.TOPIC_CONFIG; import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG; import static org.apache.kafka.connect.runtime.ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG; @@ -291,6 +294,23 @@ private void createWorkerTask(TargetState initialState, Converter keyConverter, sourceConfig, Runnable::run, preProducerCheck, postProducerCheck); } + @Test + public void testRemoveMetrics() { + createWorkerTask(); + + workerTask.removeMetrics(); + + assertEquals(emptySet(), filterToTaskMetrics(metrics.metrics().metrics().keySet())); + } + + private Set filterToTaskMetrics(Set metricNames) { + return metricNames + .stream() + .filter(m -> metrics.registry().taskGroupName().equals(m.group()) + || metrics.registry().sourceTaskGroupName().equals(m.group())) + .collect(Collectors.toSet()); + } + @Test public void testStartPaused() throws Exception { createWorkerTask(TargetState.PAUSED);