-
Notifications
You must be signed in to change notification settings - Fork 0
[KAFKA-192] Change latency calculations in kafka fork #2
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[KAFKA-192] Change latency calculations in kafka fork #2
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR modifies latency calculations in Kafka's MirrorSourceTask to use a custom eventProxiedTime
header when available, falling back to the record's timestamp. This change aims to provide more accurate latency measurements by accounting for additional processing time indicated by the header.
- Adds logic to check for and parse the
eventProxiedTime
header for latency calculations - Includes error handling for header parsing failures with fallback to record timestamp
- Imports StandardCharsets for proper string encoding
Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java
Show resolved
Hide resolved
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java
Outdated
Show resolved
Hide resolved
try { | ||
correctedTimestamp = Long.parseLong(new String(eventProxiedTimeHeader.value(), StandardCharsets.UTF_8)); | ||
} catch (Exception e) { | ||
log.error("Error parsing eventProxiedTime header value '{}' -- using record timestamp instead.", eventProxiedTimeHeader.value(), e); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The error message logs the raw byte array from eventProxiedTimeHeader.value(), which will not display the actual string value. Consider logging the parsed string instead for better debugging.
log.error("Error parsing eventProxiedTime header value '{}' -- using record timestamp instead.", eventProxiedTimeHeader.value(), e); | |
log.error("Error parsing eventProxiedTime header value '{}' -- using record timestamp instead.", new String(eventProxiedTimeHeader.value(), StandardCharsets.UTF_8), e); |
Copilot uses AI. Check for mistakes.
f3fbdf6
to
6d73367
Compare
The approval(s) from tkornai do(es)n't fullfill the approvers requirements because:
|
b8cee59
to
7914712
Compare
The approval(s) from tkornai do(es)n't fullfill the approvers requirements because:
|
dbdb2c6
into
4.0-connect_sni_host_check_disable
Context
If present, use
eventProxiedTime
header. Otherwise, userecord.timestamp()
.eventProxiedTime
header should be replaced by standardized header name when available.