diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java index a9d8dc43b35ba..f20f2c8fe3ba9 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java @@ -178,6 +178,7 @@ public void commitRecord(SourceRecord record, RecordMetadata metadata) { } TopicPartition topicPartition = new TopicPartition(record.topic(), record.kafkaPartition()); long correctedTimestamp = record.timestamp(); + long currentTimeMillis = System.currentTimeMillis(); Header eventProxiedTimeHeader = (Header) record.headers().lastWithName("eventProxiedTime"); // TODO: change to standardized header name when available if (eventProxiedTimeHeader != null) { String headerString = new String(eventProxiedTimeHeader.value(), StandardCharsets.UTF_8); @@ -187,7 +188,7 @@ public void commitRecord(SourceRecord record, RecordMetadata metadata) { log.error("Error parsing eventProxiedTime header value '{}' -- using record timestamp instead.", headerString, e); } } - long latency = System.currentTimeMillis() - correctedTimestamp; + long latency = currentTimeMillis - correctedTimestamp; metrics.countRecord(topicPartition); metrics.replicationLatency(topicPartition, latency); // Queue offset syncs only when offsetWriter is available