From 64fe53cb91419474dbcb20e857e0d71105b8880f Mon Sep 17 00:00:00 2001 From: Tim Deeb-Swihart <409226+tdeebswihart@users.noreply.github.com> Date: Thu, 1 Feb 2024 15:00:00 -0800 Subject: [PATCH] Backport code to drop internal errors encountered during task processing (#5385) ## What changed? Internal errors encountered during task processing will be dropped when this new config is enabled. ## Why? These errors represent unprocessable tasks, so should not block our task queues. ## How did you test it? ## Potential risks We're not 100% certain that we only return internal errors when a task is unprocessable, so this will be enabled by dynamicconfig for now. ## Is hotfix candidate? --- common/dynamicconfig/constants.go | 3 ++ common/metrics/metric_defs.go | 1 + .../archival_queue_task_executor_test.go | 1 + service/history/configs/config.go | 2 + service/history/queues/executable.go | 14 ++++++ service/history/queues/executable_test.go | 43 ++++++++++++++++++- .../queues/memory_scheduled_queue_test.go | 1 + service/history/queues/queue_base.go | 1 + service/history/queues/reader_test.go | 2 +- service/history/queues/slice_test.go | 2 +- ...speculative_workflow_task_timeout_queue.go | 1 + .../timer_queue_active_task_executor_test.go | 1 + .../timer_queue_standby_task_executor_test.go | 1 + ...ransfer_queue_active_task_executor_test.go | 1 + ...ansfer_queue_standby_task_executor_test.go | 1 + .../visibility_queue_task_executor_test.go | 1 + 16 files changed, 73 insertions(+), 3 deletions(-) diff --git a/common/dynamicconfig/constants.go b/common/dynamicconfig/constants.go index a042142cfe4..9af0e4a54b7 100644 --- a/common/dynamicconfig/constants.go +++ b/common/dynamicconfig/constants.go @@ -820,6 +820,9 @@ const ( ReplicationBypassCorruptedData = "history.ReplicationBypassCorruptedData" // ReplicationEnableDLQMetrics is the flag to emit DLQ metrics ReplicationEnableDLQMetrics = "history.ReplicationEnableDLQMetrics" + // HistoryTaskDLQInteralErrors causes history task processing to send tasks failing with serviceerror.Internal to + // the dlq (or will drop them if not enabled) + HistoryTaskDropInternalErrors = "history.TaskDLQInternalErrors" // ReplicationStreamSyncStatusDuration sync replication status duration ReplicationStreamSyncStatusDuration = "history.ReplicationStreamSyncStatusDuration" diff --git a/common/metrics/metric_defs.go b/common/metrics/metric_defs.go index 8468a3ee0c5..86314e78e1f 100644 --- a/common/metrics/metric_defs.go +++ b/common/metrics/metric_defs.go @@ -1298,6 +1298,7 @@ var ( ) TaskNotActiveCounter = NewCounterDef("task_errors_not_active_counter") TaskNamespaceHandoverCounter = NewCounterDef("task_errors_namespace_handover") + TaskInternalErrorCounter = NewCounterDef("task_errors_internal") TaskThrottledCounter = NewCounterDef( "task_errors_throttled", WithDescription("The number of history task processing errors caused by resource exhausted errors, excluding workflow busy case."), diff --git a/service/history/archival_queue_task_executor_test.go b/service/history/archival_queue_task_executor_test.go index 9d3ab1561c2..ede269dc9ac 100644 --- a/service/history/archival_queue_task_executor_test.go +++ b/service/history/archival_queue_task_executor_test.go @@ -520,6 +520,7 @@ func TestArchivalQueueTaskExecutor(t *testing.T) { mockMetadata, nil, metrics.NoopMetricsHandler, + func() bool { return false }, ) err := executable.Execute() if len(p.ExpectedErrorSubstrings) > 0 { diff --git a/service/history/configs/config.go b/service/history/configs/config.go index 5dfb32332fa..b65c3dff5f7 100644 --- a/service/history/configs/config.go +++ b/service/history/configs/config.go @@ -97,6 +97,7 @@ type Config struct { QueueCriticalSlicesCount dynamicconfig.IntPropertyFn QueuePendingTaskMaxCount dynamicconfig.IntPropertyFn QueueMaxReaderCount dynamicconfig.IntPropertyFn + TaskDropInternalErrors dynamicconfig.BoolPropertyFn TaskSchedulerEnableRateLimiter dynamicconfig.BoolPropertyFn TaskSchedulerEnableRateLimiterShadowMode dynamicconfig.BoolPropertyFn @@ -380,6 +381,7 @@ func NewConfig( QueueCriticalSlicesCount: dc.GetIntProperty(dynamicconfig.QueueCriticalSlicesCount, 50), QueuePendingTaskMaxCount: dc.GetIntProperty(dynamicconfig.QueuePendingTaskMaxCount, 10000), QueueMaxReaderCount: dc.GetIntProperty(dynamicconfig.QueueMaxReaderCount, 2), + TaskDropInternalErrors: dc.GetBoolProperty(dynamicconfig.HistoryTaskDropInternalErrors, false), TaskSchedulerEnableRateLimiter: dc.GetBoolProperty(dynamicconfig.TaskSchedulerEnableRateLimiter, false), TaskSchedulerEnableRateLimiterShadowMode: dc.GetBoolProperty(dynamicconfig.TaskSchedulerEnableRateLimiterShadowMode, true), diff --git a/service/history/queues/executable.go b/service/history/queues/executable.go index 87b8d5b7e46..82d3926c120 100644 --- a/service/history/queues/executable.go +++ b/service/history/queues/executable.go @@ -41,6 +41,7 @@ import ( "go.temporal.io/server/common/backoff" "go.temporal.io/server/common/clock" "go.temporal.io/server/common/cluster" + "go.temporal.io/server/common/dynamicconfig" "go.temporal.io/server/common/headers" "go.temporal.io/server/common/log" "go.temporal.io/server/common/log/tag" @@ -129,6 +130,7 @@ type ( lastActiveness bool resourceExhaustedCount int // does NOT include consts.ErrResourceExhaustedBusyWorkflow taggedMetricsHandler metrics.Handler + dropInternalErrors dynamicconfig.BoolPropertyFn } ) @@ -144,7 +146,11 @@ func NewExecutable( clusterMetadata cluster.Metadata, logger log.Logger, metricsHandler metrics.Handler, + dropInternalErrors dynamicconfig.BoolPropertyFn, ) Executable { + if dropInternalErrors == nil { + dropInternalErrors = func() bool { return false } + } executable := &executableImpl{ Task: task, state: ctasks.TaskStatePending, @@ -166,6 +172,7 @@ func NewExecutable( ), metricsHandler: metricsHandler, taggedMetricsHandler: metricsHandler, + dropInternalErrors: dropInternalErrors, } executable.updatePriority() return executable @@ -341,6 +348,13 @@ func (e *executableImpl) HandleErr(err error) (retErr error) { e.logger.Error("Drop task due to serialization error", tag.Error(err)) return nil } + if common.IsInternalError(err) { + e.logger.Error("Encountered internal error processing tasks", tag.Error(err)) + e.taggedMetricsHandler.Counter(metrics.TaskInternalErrorCounter.GetMetricName()).Record(1) + if e.dropInternalErrors() { + return nil + } + } e.taggedMetricsHandler.Counter(metrics.TaskFailures.GetMetricName()).Record(1) diff --git a/service/history/queues/executable_test.go b/service/history/queues/executable_test.go index 7ea1dbd099c..83657e05aa2 100644 --- a/service/history/queues/executable_test.go +++ b/service/history/queues/executable_test.go @@ -40,6 +40,7 @@ import ( "go.temporal.io/server/common/clock" "go.temporal.io/server/common/cluster" "go.temporal.io/server/common/definition" + "go.temporal.io/server/common/dynamicconfig" "go.temporal.io/server/common/headers" "go.temporal.io/server/common/log" "go.temporal.io/server/common/metrics" @@ -65,6 +66,11 @@ type ( timeSource *clock.EventTimeSource } + + params struct { + dropInternalErrors dynamicconfig.BoolPropertyFn + } + option func(*params) ) func TestExecutableSuite(t *testing.T) { @@ -297,6 +303,34 @@ func (s *executableSuite) TestExecuteHandleErr_Corrupted() { s.NoError(executable.HandleErr(err)) } +func (s *executableSuite) TestExecute_DropsInternalErrors_WhenEnabled() { + executable := s.newTestExecutable(func(p *params) { + p.dropInternalErrors = func() bool { return true } + }) + + s.mockExecutor.EXPECT().Execute(gomock.Any(), executable).DoAndReturn( + func(_ context.Context, _ Executable) ([]metrics.Tag, bool, error) { + panic(serviceerror.NewInternal("injected error")) + }, + ) + + s.NoError(executable.HandleErr(executable.Execute())) +} + +func (s *executableSuite) TestExecute_DoesntDropInternalErrors_WhenDisabled() { + executable := s.newTestExecutable(func(p *params) { + p.dropInternalErrors = func() bool { return false } + }) + + s.mockExecutor.EXPECT().Execute(gomock.Any(), executable).DoAndReturn( + func(_ context.Context, _ Executable) ([]metrics.Tag, bool, error) { + panic(serviceerror.NewInternal("injected error")) + }, + ) + + s.Error(executable.HandleErr(executable.Execute())) +} + func (s *executableSuite) TestHandleErr_EntityNotExists() { executable := s.newTestExecutable() @@ -408,7 +442,13 @@ func (s *executableSuite) TestTaskCancellation() { s.False(executable.IsRetryableError(errors.New("some random error"))) } -func (s *executableSuite) newTestExecutable() Executable { +func (s *executableSuite) newTestExecutable(opts ...option) Executable { + p := params{ + dropInternalErrors: func() bool { return false }, + } + for _, opt := range opts { + opt(&p) + } return NewExecutable( DefaultReaderId, tasks.NewFakeTask( @@ -429,5 +469,6 @@ func (s *executableSuite) newTestExecutable() Executable { s.mockClusterMetadata, log.NewTestLogger(), metrics.NoopMetricsHandler, + p.dropInternalErrors, ) } diff --git a/service/history/queues/memory_scheduled_queue_test.go b/service/history/queues/memory_scheduled_queue_test.go index c01c6e464fe..402368fd71d 100644 --- a/service/history/queues/memory_scheduled_queue_test.go +++ b/service/history/queues/memory_scheduled_queue_test.go @@ -184,6 +184,7 @@ func (s *memoryScheduledQueueSuite) newSpeculativeWorkflowTaskTimeoutTestExecuta nil, nil, nil, + func() bool { return false }, ), wttt, ) diff --git a/service/history/queues/queue_base.go b/service/history/queues/queue_base.go index 3afb2645a31..d7e61f428be 100644 --- a/service/history/queues/queue_base.go +++ b/service/history/queues/queue_base.go @@ -168,6 +168,7 @@ func newQueueBase( shard.GetClusterMetadata(), logger, metricsHandler, + shard.GetConfig().TaskDropInternalErrors, ) } diff --git a/service/history/queues/reader_test.go b/service/history/queues/reader_test.go index 759fde09096..12e4974a041 100644 --- a/service/history/queues/reader_test.go +++ b/service/history/queues/reader_test.go @@ -77,7 +77,7 @@ func (s *readerSuite) SetupTest() { s.metricsHandler = metrics.NoopMetricsHandler s.executableInitializer = func(readerID int64, t tasks.Task) Executable { - return NewExecutable(readerID, t, nil, nil, nil, NewNoopPriorityAssigner(), clock.NewRealTimeSource(), nil, nil, nil, metrics.NoopMetricsHandler) + return NewExecutable(readerID, t, nil, nil, nil, NewNoopPriorityAssigner(), clock.NewRealTimeSource(), nil, nil, nil, metrics.NoopMetricsHandler, func() bool { return false }) } s.monitor = newMonitor(tasks.CategoryTypeScheduled, clock.NewRealTimeSource(), &MonitorOptions{ PendingTasksCriticalCount: dynamicconfig.GetIntPropertyFn(1000), diff --git a/service/history/queues/slice_test.go b/service/history/queues/slice_test.go index 9cbfdcc7043..b2308466d42 100644 --- a/service/history/queues/slice_test.go +++ b/service/history/queues/slice_test.go @@ -69,7 +69,7 @@ func (s *sliceSuite) SetupTest() { s.controller = gomock.NewController(s.T()) s.executableInitializer = func(readerID int64, t tasks.Task) Executable { - return NewExecutable(readerID, t, nil, nil, nil, NewNoopPriorityAssigner(), clock.NewRealTimeSource(), nil, nil, nil, metrics.NoopMetricsHandler) + return NewExecutable(readerID, t, nil, nil, nil, NewNoopPriorityAssigner(), clock.NewRealTimeSource(), nil, nil, nil, metrics.NoopMetricsHandler, func() bool { return false }) } s.monitor = newMonitor(tasks.CategoryTypeScheduled, clock.NewRealTimeSource(), &MonitorOptions{ PendingTasksCriticalCount: dynamicconfig.GetIntPropertyFn(1000), diff --git a/service/history/queues/speculative_workflow_task_timeout_queue.go b/service/history/queues/speculative_workflow_task_timeout_queue.go index 61ac18953b6..0c18a3da091 100644 --- a/service/history/queues/speculative_workflow_task_timeout_queue.go +++ b/service/history/queues/speculative_workflow_task_timeout_queue.go @@ -106,6 +106,7 @@ func (q SpeculativeWorkflowTaskTimeoutQueue) NotifyNewTasks(ts []tasks.Task) { q.clusterMetadata, q.logger, q.metricsHandler, + func() bool { return false }, ), wttt) q.timeoutQueue.Add(executable) } diff --git a/service/history/timer_queue_active_task_executor_test.go b/service/history/timer_queue_active_task_executor_test.go index 311108c083c..b3f4a0911c5 100644 --- a/service/history/timer_queue_active_task_executor_test.go +++ b/service/history/timer_queue_active_task_executor_test.go @@ -1610,5 +1610,6 @@ func (s *timerQueueActiveTaskExecutorSuite) newTaskExecutable( s.mockClusterMetadata, nil, metrics.NoopMetricsHandler, + func() bool { return false }, ) } diff --git a/service/history/timer_queue_standby_task_executor_test.go b/service/history/timer_queue_standby_task_executor_test.go index 1aa0b862fbb..c8431df0af5 100644 --- a/service/history/timer_queue_standby_task_executor_test.go +++ b/service/history/timer_queue_standby_task_executor_test.go @@ -1505,5 +1505,6 @@ func (s *timerQueueStandbyTaskExecutorSuite) newTaskExecutable( s.mockClusterMetadata, nil, metrics.NoopMetricsHandler, + func() bool { return false }, ) } diff --git a/service/history/transfer_queue_active_task_executor_test.go b/service/history/transfer_queue_active_task_executor_test.go index 2d63fa0c789..0abf0511479 100644 --- a/service/history/transfer_queue_active_task_executor_test.go +++ b/service/history/transfer_queue_active_task_executor_test.go @@ -2805,5 +2805,6 @@ func (s *transferQueueActiveTaskExecutorSuite) newTaskExecutable( s.mockClusterMetadata, nil, metrics.NoopMetricsHandler, + func() bool { return false }, ) } diff --git a/service/history/transfer_queue_standby_task_executor_test.go b/service/history/transfer_queue_standby_task_executor_test.go index b94ff99aae5..ca146359364 100644 --- a/service/history/transfer_queue_standby_task_executor_test.go +++ b/service/history/transfer_queue_standby_task_executor_test.go @@ -1269,5 +1269,6 @@ func (s *transferQueueStandbyTaskExecutorSuite) newTaskExecutable( s.mockClusterMetadata, nil, metrics.NoopMetricsHandler, + func() bool { return false }, ) } diff --git a/service/history/visibility_queue_task_executor_test.go b/service/history/visibility_queue_task_executor_test.go index 2f642030c34..0a2aa81b158 100644 --- a/service/history/visibility_queue_task_executor_test.go +++ b/service/history/visibility_queue_task_executor_test.go @@ -617,5 +617,6 @@ func (s *visibilityQueueTaskExecutorSuite) newTaskExecutable( s.mockShard.GetClusterMetadata(), nil, metrics.NoopMetricsHandler, + func() bool { return false }, ) }