-
Notifications
You must be signed in to change notification settings - Fork 841
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Backport code to drop internal errors encountered during task processing #5385
Backport code to drop internal errors encountered during task processing #5385
Conversation
…ed (#5382) Internal errors encountered during task processing will be dropped (or sent to the DLQ) when this new config is enabled. These errors represent unprocessable tasks, so should not block our task queues. We're not 100% certain that we only return internal errors when a task is unprocessable, so this will be enabled by dynamicconfig for now.
This is more accurate for the behavior in 1.22.x but I want to use the same DC property
@@ -341,6 +348,13 @@ func (e *executableImpl) HandleErr(err error) (retErr error) { | |||
e.logger.Error("Drop task due to serialization error", tag.Error(err)) | |||
return nil | |||
} | |||
if common.IsInternalError(err) { | |||
e.logger.Error("Encountered internal error processing tasks", tag.Error(err)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should not this log.Error be inside the if e.dropInternalErrors()
? if you are not returning you log the error anyway in line 361.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch! Looks like we missed this in my patch to v1.23 as well. I'll follow up with 1.23 to fix that when I next have a chance
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think idea here is that even when dropInternalErrors
is false
, we still log and emit metric for the internal errors so that we can be confident that they are safe to drop. and then enable the flag. It's a shadow mode basically.
Having this log outside dropInternalErrors
makes it easier to filter logs and only look at internal error logs. With only the log on L361, I don't think there's a good way to audit just the internal errors. The Error() method of an InternalError doesn't say it's an internal error 🤦.
We can add extra tags, if double logging is a concern. The volume of InternalError should be ~0 though.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We could add a tags.ErrorType tag to the main log message here. That'd give us a single log that handles both cases.
I'm going to do so in a separate PR and port over the code from https://github.com/temporalio/temporal/pull/5234/files#diff-3045f1928c46472037eb67e844c90f8745e01cd321e058ba212e3ea1a6b5147fR51 to unwrap things like fmt.wrapErr
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Put up a pr at #5400
@@ -297,6 +303,34 @@ func (s *executableSuite) TestExecuteHandleErr_Corrupted() { | |||
s.NoError(executable.HandleErr(err)) | |||
} | |||
|
|||
func (s *executableSuite) TestExecute_DropsInternalErrors_WhenEnabled() { | |||
executable := s.newTestExecutable(func(p *params) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would call it dropErrorsExecutable
}, | ||
) | ||
|
||
s.NoError(executable.HandleErr(executable.Execute())) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would introduce variable for executable.HandleErr result. This way action will be separated from assertion.
@@ -184,6 +184,7 @@ func (s *memoryScheduledQueueSuite) newSpeculativeWorkflowTaskTimeoutTestExecuta | |||
nil, | |||
nil, | |||
nil, | |||
func() bool { return false }, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is "drop internal errors", right?
@@ -69,7 +69,7 @@ func (s *sliceSuite) SetupTest() { | |||
s.controller = gomock.NewController(s.T()) | |||
|
|||
s.executableInitializer = func(readerID int64, t tasks.Task) Executable { | |||
return NewExecutable(readerID, t, nil, nil, nil, NewNoopPriorityAssigner(), clock.NewRealTimeSource(), nil, nil, nil, metrics.NoopMetricsHandler) | |||
return NewExecutable(readerID, t, nil, nil, nil, NewNoopPriorityAssigner(), clock.NewRealTimeSource(), nil, nil, nil, metrics.NoopMetricsHandler, func() bool { return false }) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can create a function doDropInternalErrors
or something like this.
What changed?
Internal errors encountered during task processing will be dropped when this new config is enabled.
Why?
These errors represent unprocessable tasks, so should not block our task
queues.
How did you test it?
Potential risks
We're not 100% certain that we only return internal errors when a task
is unprocessable, so this will be enabled by dynamicconfig for now.
Is hotfix candidate?