diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java index 6ab65bebdca2e..a9d8dc43b35ba 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java @@ -34,6 +34,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.ArrayList; import java.util.List; @@ -176,7 +177,17 @@ public void commitRecord(SourceRecord record, RecordMetadata metadata) { return; } TopicPartition topicPartition = new TopicPartition(record.topic(), record.kafkaPartition()); - long latency = System.currentTimeMillis() - record.timestamp(); + long correctedTimestamp = record.timestamp(); + Header eventProxiedTimeHeader = (Header) record.headers().lastWithName("eventProxiedTime"); // TODO: change to standardized header name when available + if (eventProxiedTimeHeader != null) { + String headerString = new String(eventProxiedTimeHeader.value(), StandardCharsets.UTF_8); + try { + correctedTimestamp = Long.parseLong(headerString); + } catch (NumberFormatException e) { + log.error("Error parsing eventProxiedTime header value '{}' -- using record timestamp instead.", headerString, e); + } + } + long latency = System.currentTimeMillis() - correctedTimestamp; metrics.countRecord(topicPartition); metrics.replicationLatency(topicPartition, latency); // Queue offset syncs only when offsetWriter is available