Skip to content

Commit

Permalink
Backport code to drop internal errors encountered during task process…
Browse files Browse the repository at this point in the history
…ing (#5385)

## What changed?

Internal errors encountered during task processing will be dropped when
this new config is enabled.

## Why?

These errors represent unprocessable tasks, so should not block our task
queues.

## How did you test it?


## Potential risks

We're not 100% certain that we only return internal errors when a task
is unprocessable, so this will be enabled by dynamicconfig for now.

## Is hotfix candidate?
  • Loading branch information
tdeebswihart committed Feb 1, 2024
1 parent 2647b36 commit 64fe53c
Show file tree
Hide file tree
Showing 16 changed files with 73 additions and 3 deletions.
3 changes: 3 additions & 0 deletions common/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -820,6 +820,9 @@ const (
ReplicationBypassCorruptedData = "history.ReplicationBypassCorruptedData"
// ReplicationEnableDLQMetrics is the flag to emit DLQ metrics
ReplicationEnableDLQMetrics = "history.ReplicationEnableDLQMetrics"
// HistoryTaskDropInternalErrors causes history task processing to drop tasks failing with serviceerror.Internal
// when enabled (default is disabled). Note: the dynamic config key is "history.TaskDLQInternalErrors".
HistoryTaskDropInternalErrors = "history.TaskDLQInternalErrors"

// ReplicationStreamSyncStatusDuration sync replication status duration
ReplicationStreamSyncStatusDuration = "history.ReplicationStreamSyncStatusDuration"
Expand Down
1 change: 1 addition & 0 deletions common/metrics/metric_defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -1298,6 +1298,7 @@ var (
)
TaskNotActiveCounter = NewCounterDef("task_errors_not_active_counter")
TaskNamespaceHandoverCounter = NewCounterDef("task_errors_namespace_handover")
TaskInternalErrorCounter = NewCounterDef("task_errors_internal")
TaskThrottledCounter = NewCounterDef(
"task_errors_throttled",
WithDescription("The number of history task processing errors caused by resource exhausted errors, excluding workflow busy case."),
Expand Down
1 change: 1 addition & 0 deletions service/history/archival_queue_task_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -520,6 +520,7 @@ func TestArchivalQueueTaskExecutor(t *testing.T) {
mockMetadata,
nil,
metrics.NoopMetricsHandler,
func() bool { return false },
)
err := executable.Execute()
if len(p.ExpectedErrorSubstrings) > 0 {
Expand Down
2 changes: 2 additions & 0 deletions service/history/configs/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ type Config struct {
QueueCriticalSlicesCount dynamicconfig.IntPropertyFn
QueuePendingTaskMaxCount dynamicconfig.IntPropertyFn
QueueMaxReaderCount dynamicconfig.IntPropertyFn
TaskDropInternalErrors dynamicconfig.BoolPropertyFn

TaskSchedulerEnableRateLimiter dynamicconfig.BoolPropertyFn
TaskSchedulerEnableRateLimiterShadowMode dynamicconfig.BoolPropertyFn
Expand Down Expand Up @@ -380,6 +381,7 @@ func NewConfig(
QueueCriticalSlicesCount: dc.GetIntProperty(dynamicconfig.QueueCriticalSlicesCount, 50),
QueuePendingTaskMaxCount: dc.GetIntProperty(dynamicconfig.QueuePendingTaskMaxCount, 10000),
QueueMaxReaderCount: dc.GetIntProperty(dynamicconfig.QueueMaxReaderCount, 2),
TaskDropInternalErrors: dc.GetBoolProperty(dynamicconfig.HistoryTaskDropInternalErrors, false),

TaskSchedulerEnableRateLimiter: dc.GetBoolProperty(dynamicconfig.TaskSchedulerEnableRateLimiter, false),
TaskSchedulerEnableRateLimiterShadowMode: dc.GetBoolProperty(dynamicconfig.TaskSchedulerEnableRateLimiterShadowMode, true),
Expand Down
14 changes: 14 additions & 0 deletions service/history/queues/executable.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"go.temporal.io/server/common/backoff"
"go.temporal.io/server/common/clock"
"go.temporal.io/server/common/cluster"
"go.temporal.io/server/common/dynamicconfig"
"go.temporal.io/server/common/headers"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/log/tag"
Expand Down Expand Up @@ -129,6 +130,7 @@ type (
lastActiveness bool
resourceExhaustedCount int // does NOT include consts.ErrResourceExhaustedBusyWorkflow
taggedMetricsHandler metrics.Handler
dropInternalErrors dynamicconfig.BoolPropertyFn
}
)

Expand All @@ -144,7 +146,11 @@ func NewExecutable(
clusterMetadata cluster.Metadata,
logger log.Logger,
metricsHandler metrics.Handler,
dropInternalErrors dynamicconfig.BoolPropertyFn,
) Executable {
if dropInternalErrors == nil {
dropInternalErrors = func() bool { return false }
}
executable := &executableImpl{
Task: task,
state: ctasks.TaskStatePending,
Expand All @@ -166,6 +172,7 @@ func NewExecutable(
),
metricsHandler: metricsHandler,
taggedMetricsHandler: metricsHandler,
dropInternalErrors: dropInternalErrors,
}
executable.updatePriority()
return executable
Expand Down Expand Up @@ -341,6 +348,13 @@ func (e *executableImpl) HandleErr(err error) (retErr error) {
e.logger.Error("Drop task due to serialization error", tag.Error(err))
return nil
}
if common.IsInternalError(err) {
e.logger.Error("Encountered internal error processing tasks", tag.Error(err))
e.taggedMetricsHandler.Counter(metrics.TaskInternalErrorCounter.GetMetricName()).Record(1)
if e.dropInternalErrors() {
return nil
}
}

e.taggedMetricsHandler.Counter(metrics.TaskFailures.GetMetricName()).Record(1)

Expand Down
43 changes: 42 additions & 1 deletion service/history/queues/executable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"go.temporal.io/server/common/clock"
"go.temporal.io/server/common/cluster"
"go.temporal.io/server/common/definition"
"go.temporal.io/server/common/dynamicconfig"
"go.temporal.io/server/common/headers"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/metrics"
Expand All @@ -65,6 +66,11 @@ type (

timeSource *clock.EventTimeSource
}

params struct {
dropInternalErrors dynamicconfig.BoolPropertyFn
}
option func(*params)
)

func TestExecutableSuite(t *testing.T) {
Expand Down Expand Up @@ -297,6 +303,34 @@ func (s *executableSuite) TestExecuteHandleErr_Corrupted() {
s.NoError(executable.HandleErr(err))
}

// TestExecute_DropsInternalErrors_WhenEnabled checks that, with the
// dropInternalErrors flag returning true, HandleErr reports no error for the
// result of an Execute call whose mocked executor panics with a
// serviceerror.Internal value.
func (s *executableSuite) TestExecute_DropsInternalErrors_WhenEnabled() {
	// Option that switches the drop-internal-errors flag on for this executable.
	withDropEnabled := func(p *params) {
		p.dropInternalErrors = func() bool { return true }
	}
	executable := s.newTestExecutable(withDropEnabled)

	// Mocked executor body: panic with an internal service error.
	// NOTE(review): presumably Execute recovers this panic and surfaces it as
	// an error value — confirm against executableImpl.Execute.
	panickingExecute := func(_ context.Context, _ Executable) ([]metrics.Tag, bool, error) {
		panic(serviceerror.NewInternal("injected error"))
	}
	s.mockExecutor.EXPECT().Execute(gomock.Any(), executable).DoAndReturn(panickingExecute)

	execErr := executable.Execute()
	s.NoError(executable.HandleErr(execErr))
}

// TestExecute_DoesntDropInternalErrors_WhenDisabled checks that, with the
// dropInternalErrors flag returning false, HandleErr still reports an error
// for the result of an Execute call whose mocked executor panics with a
// serviceerror.Internal value.
func (s *executableSuite) TestExecute_DoesntDropInternalErrors_WhenDisabled() {
	// Option that explicitly keeps the drop-internal-errors flag off.
	withDropDisabled := func(p *params) {
		p.dropInternalErrors = func() bool { return false }
	}
	executable := s.newTestExecutable(withDropDisabled)

	// Mocked executor body: panic with an internal service error.
	// NOTE(review): presumably Execute recovers this panic and surfaces it as
	// an error value — confirm against executableImpl.Execute.
	panickingExecute := func(_ context.Context, _ Executable) ([]metrics.Tag, bool, error) {
		panic(serviceerror.NewInternal("injected error"))
	}
	s.mockExecutor.EXPECT().Execute(gomock.Any(), executable).DoAndReturn(panickingExecute)

	execErr := executable.Execute()
	s.Error(executable.HandleErr(execErr))
}

func (s *executableSuite) TestHandleErr_EntityNotExists() {
executable := s.newTestExecutable()

Expand Down Expand Up @@ -408,7 +442,13 @@ func (s *executableSuite) TestTaskCancellation() {
s.False(executable.IsRetryableError(errors.New("some random error")))
}

func (s *executableSuite) newTestExecutable() Executable {
func (s *executableSuite) newTestExecutable(opts ...option) Executable {
p := params{
dropInternalErrors: func() bool { return false },
}
for _, opt := range opts {
opt(&p)
}
return NewExecutable(
DefaultReaderId,
tasks.NewFakeTask(
Expand All @@ -429,5 +469,6 @@ func (s *executableSuite) newTestExecutable() Executable {
s.mockClusterMetadata,
log.NewTestLogger(),
metrics.NoopMetricsHandler,
p.dropInternalErrors,
)
}
1 change: 1 addition & 0 deletions service/history/queues/memory_scheduled_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ func (s *memoryScheduledQueueSuite) newSpeculativeWorkflowTaskTimeoutTestExecuta
nil,
nil,
nil,
func() bool { return false },
),
wttt,
)
Expand Down
1 change: 1 addition & 0 deletions service/history/queues/queue_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ func newQueueBase(
shard.GetClusterMetadata(),
logger,
metricsHandler,
shard.GetConfig().TaskDropInternalErrors,
)
}

Expand Down
2 changes: 1 addition & 1 deletion service/history/queues/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func (s *readerSuite) SetupTest() {
s.metricsHandler = metrics.NoopMetricsHandler

s.executableInitializer = func(readerID int64, t tasks.Task) Executable {
return NewExecutable(readerID, t, nil, nil, nil, NewNoopPriorityAssigner(), clock.NewRealTimeSource(), nil, nil, nil, metrics.NoopMetricsHandler)
return NewExecutable(readerID, t, nil, nil, nil, NewNoopPriorityAssigner(), clock.NewRealTimeSource(), nil, nil, nil, metrics.NoopMetricsHandler, func() bool { return false })
}
s.monitor = newMonitor(tasks.CategoryTypeScheduled, clock.NewRealTimeSource(), &MonitorOptions{
PendingTasksCriticalCount: dynamicconfig.GetIntPropertyFn(1000),
Expand Down
2 changes: 1 addition & 1 deletion service/history/queues/slice_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func (s *sliceSuite) SetupTest() {
s.controller = gomock.NewController(s.T())

s.executableInitializer = func(readerID int64, t tasks.Task) Executable {
return NewExecutable(readerID, t, nil, nil, nil, NewNoopPriorityAssigner(), clock.NewRealTimeSource(), nil, nil, nil, metrics.NoopMetricsHandler)
return NewExecutable(readerID, t, nil, nil, nil, NewNoopPriorityAssigner(), clock.NewRealTimeSource(), nil, nil, nil, metrics.NoopMetricsHandler, func() bool { return false })
}
s.monitor = newMonitor(tasks.CategoryTypeScheduled, clock.NewRealTimeSource(), &MonitorOptions{
PendingTasksCriticalCount: dynamicconfig.GetIntPropertyFn(1000),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ func (q SpeculativeWorkflowTaskTimeoutQueue) NotifyNewTasks(ts []tasks.Task) {
q.clusterMetadata,
q.logger,
q.metricsHandler,
func() bool { return false },
), wttt)
q.timeoutQueue.Add(executable)
}
Expand Down
1 change: 1 addition & 0 deletions service/history/timer_queue_active_task_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1610,5 +1610,6 @@ func (s *timerQueueActiveTaskExecutorSuite) newTaskExecutable(
s.mockClusterMetadata,
nil,
metrics.NoopMetricsHandler,
func() bool { return false },
)
}
1 change: 1 addition & 0 deletions service/history/timer_queue_standby_task_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1505,5 +1505,6 @@ func (s *timerQueueStandbyTaskExecutorSuite) newTaskExecutable(
s.mockClusterMetadata,
nil,
metrics.NoopMetricsHandler,
func() bool { return false },
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -2805,5 +2805,6 @@ func (s *transferQueueActiveTaskExecutorSuite) newTaskExecutable(
s.mockClusterMetadata,
nil,
metrics.NoopMetricsHandler,
func() bool { return false },
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -1269,5 +1269,6 @@ func (s *transferQueueStandbyTaskExecutorSuite) newTaskExecutable(
s.mockClusterMetadata,
nil,
metrics.NoopMetricsHandler,
func() bool { return false },
)
}
1 change: 1 addition & 0 deletions service/history/visibility_queue_task_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -617,5 +617,6 @@ func (s *visibilityQueueTaskExecutorSuite) newTaskExecutable(
s.mockShard.GetClusterMetadata(),
nil,
metrics.NoopMetricsHandler,
func() bool { return false },
)
}

0 comments on commit 64fe53c

Please sign in to comment.