Add ExecutableDLQObserver #4881
Conversation
Force-pushed from 73c1363 to 5804a83
	logger.Error("Failed to send history task to the DLQ", tag.Error(err))
} else if err == nil && o.terminalFailureTime != nil {
	latency := o.timeSource.Now().Sub(*o.terminalFailureTime)
	o.metricsHandler.Timer(metrics.TaskDLQSendLatency.GetMetricName()).Record(latency)
Won't this latency also include the retry backoff? What exactly does "latency for send to DLQ" mean here?
Yeah, probably not that useful. I think I was trying to track the E2E time from detection until it's actually DLQ'd, but that doesn't seem as important as the latency of the actual work. I changed it to just record the latency of the final execution, which successfully sends the task to the DLQ.
func (o *ExecutableDLQObserver) Execute() error {
	err := o.ExecutableDLQ.Execute()
	if errors.Is(err, ErrTerminalTaskFailure) {
		o.metricsHandler.Counter(metrics.TaskTerminalFailures.GetMetricName()).Record(1)
nit: shall we also tag namespace and task type?
Added, and I added tests for them
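The suggested tagging can be sketched like this: attach namespace and task-type tags before recording the counter, so alerts can be scoped per namespace and task type. This is a hedged illustration; `metricTags` is a hypothetical helper, whereas the real code would use the tag constructors from the temporal `metrics` package.

```go
package main

import "fmt"

// metricTags builds the tag set attached to the terminal-failure counter.
// (Illustrative stand-in for the real metrics tag constructors.)
func metricTags(namespace, taskType string) map[string]string {
	return map[string]string{
		"namespace": namespace, // which namespace's task failed terminally
		"taskType":  taskType,  // which kind of history task it was
	}
}

func main() {
	tags := metricTags("default", "TransferWorkflowTask")
	fmt.Println(tags["namespace"], tags["taskType"])
}
```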
service/history/dlq.go (Outdated)
"go.temporal.io/server/common/clock" | ||
"go.temporal.io/server/common/log" | ||
"go.temporal.io/server/common/metrics" | ||
"go.temporal.io/server/service/history/configs" |
nit: please group this with the other server imports, and fix the other files as well.
Force-pushed from 8d0cfa0 to 3912b0b
Force-pushed from 3912b0b to 984fe66
	return err
}

func (o *ExecutableDLQObserver) getMetricsHandler() metrics.Handler {
We just need to tag the metrics handler once and reuse the tagged handler, I think. Same for the logger. I remember I implemented a lazyLogger and used it in executableImpl; you might want to take a look.
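The "tag once, reuse" suggestion can be sketched with `sync.Once`: compute the tagged handler lazily on first use and cache it, in the same spirit as the lazyLogger mentioned in executableImpl. This is an illustrative sketch; the tagged handler is modeled as a plain string rather than a real `metrics.Handler`.

```go
package main

import (
	"fmt"
	"sync"
)

// observer lazily builds its tagged handler exactly once and reuses it
// on every subsequent call, instead of re-tagging per metric emission.
type observer struct {
	once    sync.Once
	tagged  string
	makeTag func() string // hypothetical "build tagged handler" step
}

func (o *observer) handler() string {
	o.once.Do(func() { o.tagged = o.makeTag() }) // tag exactly once
	return o.tagged
}

func main() {
	calls := 0
	o := &observer{makeTag: func() string { calls++; return "handler+tags" }}
	o.handler()
	o.handler()
	fmt.Println(o.handler(), calls)
}
```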
	}
}

func (o *ExecutableDLQObserver) Execute() error {
Not related to this PR, but it just occurred to me: if the State of the underlying task is no longer Pending (e.g. the queue is shutting down, or the multi-cursor implementation wants to offload/delete some tasks from memory), the execution should be skipped.
https://github.com/temporalio/temporal/blob/main/service/history/queues/executable.go#L188
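The skip-if-not-Pending idea can be sketched as a simple state gate before execution. The state values here are illustrative, not the real constants from the linked executable.go.

```go
package main

import "fmt"

// state models a task's lifecycle; only Pending tasks should execute.
// (Illustrative; the real code has its own state enum.)
type state int

const (
	statePending state = iota
	stateCancelled
	stateAcked
)

// shouldExecute gates execution on the task still being Pending, so
// offloaded or cancelled tasks are skipped instead of run.
func shouldExecute(s state) bool {
	return s == statePending
}

func main() {
	fmt.Println(shouldExecute(statePending), shouldExecute(stateCancelled))
}
```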
Force-pushed from 984fe66 to 67021ec
What changed?
I added queues.ExecutableDLQObserver, which adds logging and metrics to the history task DLQ.
Why?
So that we can alert on things like high DLQ latency, high DLQ rate, or high DLQ failure rate.
How did you test it?
There's 100% test coverage for the new code.
Potential risks
Is hotfix candidate?