diff --git a/common/dynamicconfig/constants.go b/common/dynamicconfig/constants.go index 1ef74a7476f..4b7fc6fa727 100644 --- a/common/dynamicconfig/constants.go +++ b/common/dynamicconfig/constants.go @@ -823,6 +823,9 @@ const ( ReplicationBypassCorruptedData = "history.ReplicationBypassCorruptedData" // ReplicationEnableDLQMetrics is the flag to emit DLQ metrics ReplicationEnableDLQMetrics = "history.ReplicationEnableDLQMetrics" + // HistoryTaskDLQEnabled enables the history task DLQ. This applies to internal tasks like transfer and timer tasks. + // Do not turn this on if you aren't using Cassandra as the history task DLQ is not implemented for other databases. + HistoryTaskDLQEnabled = "history.TaskDLQEnabled" // ReplicationStreamSyncStatusDuration sync replication status duration ReplicationStreamSyncStatusDuration = "history.ReplicationStreamSyncStatusDuration" diff --git a/service/history/configs/config.go b/service/history/configs/config.go index e00ac392971..f2e82a09b88 100644 --- a/service/history/configs/config.go +++ b/service/history/configs/config.go @@ -100,6 +100,8 @@ type Config struct { QueuePendingTaskMaxCount dynamicconfig.IntPropertyFn QueueMaxReaderCount dynamicconfig.IntPropertyFn + TaskDLQEnabled dynamicconfig.BoolPropertyFn + TaskSchedulerEnableRateLimiter dynamicconfig.BoolPropertyFn TaskSchedulerEnableRateLimiterShadowMode dynamicconfig.BoolPropertyFn TaskSchedulerRateLimiterStartupDelay dynamicconfig.DurationPropertyFn @@ -383,6 +385,8 @@ func NewConfig( QueuePendingTaskMaxCount: dc.GetIntProperty(dynamicconfig.QueuePendingTaskMaxCount, 10000), QueueMaxReaderCount: dc.GetIntProperty(dynamicconfig.QueueMaxReaderCount, 2), + TaskDLQEnabled: dc.GetBoolProperty(dynamicconfig.HistoryTaskDLQEnabled, false), + TaskSchedulerEnableRateLimiter: dc.GetBoolProperty(dynamicconfig.TaskSchedulerEnableRateLimiter, false), TaskSchedulerEnableRateLimiterShadowMode: dc.GetBoolProperty(dynamicconfig.TaskSchedulerEnableRateLimiterShadowMode, true), 
TaskSchedulerRateLimiterStartupDelay: dc.GetDurationProperty(dynamicconfig.TaskSchedulerRateLimiterStartupDelay, 30*time.Second), diff --git a/service/history/dlq.go b/service/history/dlq.go new file mode 100644 index 00000000000..3fca85a6a24 --- /dev/null +++ b/service/history/dlq.go @@ -0,0 +1,88 @@ +// The MIT License +// +// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. +// +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. 
+
+package history
+
+import (
+	"go.temporal.io/server/common/clock"
+	"go.temporal.io/server/service/history/configs"
+	"go.uber.org/fx"
+
+	"go.temporal.io/server/common/cluster"
+	"go.temporal.io/server/common/dynamicconfig"
+	"go.temporal.io/server/common/persistence"
+	"go.temporal.io/server/service/history/queues"
+)
+
+type (
+	executableDLQWrapper struct { // queues.ExecutableWrapper that puts every executable it wraps behind a dlqToggle
+		historyTaskQueueManager persistence.HistoryTaskQueueManager // forwarded to queues.NewExecutableDLQ in Wrap
+		clusterName             string                              // current cluster name, resolved once in NewExecutableDLQWrapper
+		timeSource              clock.TimeSource                    // forwarded to queues.NewExecutableDLQ in Wrap
+		useDLQ                  dynamicconfig.BoolPropertyFn        // Config.TaskDLQEnabled; handed unchanged to each dlqToggle
+	}
+	executableDLQWrapperParams struct { // fx parameter object (embeds fx.In) listing what NewExecutableDLQWrapper needs
+		fx.In
+
+		HistoryTaskQueueManager persistence.HistoryTaskQueueManager
+		ClusterMetadata         cluster.Metadata // only GetCurrentClusterName is called on it
+		Config                  *configs.Config  // only TaskDLQEnabled is read from it
+		TimeSource              clock.TimeSource
+	}
+	dlqToggle struct { // executable whose Execute chooses between the DLQ path and the plain path
+		queues.Executable                              // the wrapped executable; supplies every method except Execute
+		executableDLQ     *queues.ExecutableDLQ        // DLQ executable built around the same wrapped executable
+		useDLQ            dynamicconfig.BoolPropertyFn // called on each Execute to pick the path
+	}
+)
+
+func NewExecutableDLQWrapper(params executableDLQWrapperParams) queues.ExecutableWrapper { // builds the wrapper from its fx-supplied dependencies
+	return executableDLQWrapper{
+		historyTaskQueueManager: params.HistoryTaskQueueManager,
+		clusterName:             params.ClusterMetadata.GetCurrentClusterName(), // read once here, not per task
+		timeSource:              params.TimeSource,
+		useDLQ:                  params.Config.TaskDLQEnabled, // stored as a function; not evaluated here
+	}
+}
+
+func (d executableDLQWrapper) Wrap(e queues.Executable) queues.Executable { // returns e behind a dlqToggle
+	executableDLQ := queues.NewExecutableDLQ( // always constructed, regardless of the flag's current value
+		e,
+		d.historyTaskQueueManager,
+		d.timeSource,
+		d.clusterName,
+	)
+	return &dlqToggle{
+		Executable:    e,
+		executableDLQ: executableDLQ,
+		useDLQ:        d.useDLQ,
+	}
+}
+
+func (t *dlqToggle) Execute() error { // takes precedence over the embedded Executable's Execute
+	if t.useDLQ() { // the flag is read on every call rather than cached
+		return t.executableDLQ.Execute()
+	}
+	return t.Executable.Execute() // flag off: run the wrapped executable directly
+}
diff --git a/service/history/dlq_test.go b/service/history/dlq_test.go
new file mode 100644
index 00000000000..d5312e6d231
--- /dev/null
+++ b/service/history/dlq_test.go
@@ -0,0 +1,134 @@
+// The MIT License
+//
+// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
+//
+// Copyright (c) 2020 Uber Technologies, Inc.
+// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. 
+
+package history_test
+
+import (
+	"context"
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+	"go.temporal.io/server/common/clock"
+	"go.temporal.io/server/service/history/configs"
+	"go.uber.org/fx"
+	"go.uber.org/fx/fxtest"
+
+	"go.temporal.io/server/common/cluster"
+	"go.temporal.io/server/common/dynamicconfig"
+	"go.temporal.io/server/common/log"
+	"go.temporal.io/server/common/persistence"
+	"go.temporal.io/server/common/persistence/serialization"
+	"go.temporal.io/server/service/history"
+	"go.temporal.io/server/service/history/queues"
+	"go.temporal.io/server/service/history/queues/queuestest"
+)
+
+type (
+	testHistoryTaskQueueManager struct { // fake queue manager that records every EnqueueTask request it receives
+		persistence.HistoryTaskQueueManager // left nil by the test; only EnqueueTask is implemented in this file
+		requests                            []*persistence.EnqueueTaskRequest // appended to by EnqueueTask, in call order
+	}
+	fakeMetadata struct { // cluster.Metadata stub; only GetCurrentClusterName is implemented in this file
+		cluster.Metadata
+	}
+)
+
+var errTerminal = new(serialization.DeserializationError) // error handed to the fake executable; the assertions below treat it as the task's failure
+
+func TestNewExecutableDLQWrapper(t *testing.T) { // builds the wrapper through fx and runs a failing executable with the DLQ flag on and off
+	t.Parallel()
+
+	for _, tc := range []struct {
+		name      string
+		enableDLQ bool // value served for dynamicconfig.HistoryTaskDLQEnabled in this case
+	}{
+		{
+			name:      "DLQ enabled",
+			enableDLQ: true,
+		},
+		{
+			name:      "DLQ disabled",
+			enableDLQ: false,
+		},
+	} {
+		tc := tc // per-iteration copy for the parallel subtest closure below
+		t.Run(tc.name, func(t *testing.T) {
+			t.Parallel()
+
+			tqm := &testHistoryTaskQueueManager{}
+			var w queues.ExecutableWrapper // filled in by fx.Populate below
+			fxtest.New(
+				t,
+				fx.Provide(
+					func() persistence.HistoryTaskQueueManager {
+						return tqm
+					},
+					func() cluster.Metadata {
+						return fakeMetadata{}
+					},
+					func() *configs.Config {
+						dc := dynamicconfig.NewCollection(dynamicconfig.StaticClient(map[dynamicconfig.Key]interface{}{
+							dynamicconfig.HistoryTaskDLQEnabled: tc.enableDLQ,
+						}), log.NewTestLogger())
+						return configs.NewConfig(dc, 1, false, false)
+					},
+					func() clock.TimeSource {
+						return clock.NewEventTimeSource()
+					},
+					fx.Annotate(history.NewExecutableDLQWrapper, fx.As(new(queues.ExecutableWrapper))),
+				),
+				fx.Populate(&w),
+			)
+			executable := w.Wrap(queuestest.NewFakeExecutable(nil, errTerminal)) // presumably a fake whose Execute returns errTerminal; confirm in queuestest
+
+			err := executable.Execute() // first attempt
+			if tc.enableDLQ {
+				assert.ErrorIs(t, err, queues.ErrTerminalTaskFailure)
+			} else {
+				assert.NotErrorIs(t, err, queues.ErrTerminalTaskFailure)
+				assert.ErrorIs(t, err, errTerminal)
+			}
+			err = executable.Execute() // second attempt on the same executable
+			if tc.enableDLQ {
+				assert.NoError(t, err)
+				assert.Len(t, tqm.requests, 1) // exactly one request reached the fake queue manager
+			} else {
+				assert.ErrorIs(t, err, errTerminal)
+				assert.Empty(t, tqm.requests) // nothing is enqueued when the flag is off
+			}
+		})
+	}
+}
+
+func (f fakeMetadata) GetCurrentClusterName() string { // fixed name; the assertions in this file never inspect it
+	return "test-cluster-name"
+}
+
+func (t *testHistoryTaskQueueManager) EnqueueTask( // records the request and reports success
+	_ context.Context,
+	request *persistence.EnqueueTaskRequest,
+) (*persistence.EnqueueTaskResponse, error) {
+	t.requests = append(t.requests, request)
+	return nil, nil
+}
diff --git a/service/history/queue_factory_base.go b/service/history/queue_factory_base.go
index 78865e66862..1201b6eeb9d 100644
--- a/service/history/queue_factory_base.go
+++ b/service/history/queue_factory_base.go
@@ -93,6 +93,7 @@ type (
 
 var QueueModule = fx.Options(
 	fx.Provide(QueueSchedulerRateLimiterProvider),
+	fx.Provide(NewExecutableDLQWrapper),
 	fx.Provide(
 		fx.Annotated{
 			Group: QueueFactoryFxGroup,
diff --git a/service/history/queues/executable_factory.go b/service/history/queues/executable_factory.go
index 3e0b8d7f5bd..fc47e1f83c9 100644
--- a/service/history/queues/executable_factory.go
+++ b/service/history/queues/executable_factory.go
@@ -44,6 +44,8 @@ type (
 	ExecutableWrapper interface {
 		Wrap(e Executable) Executable
 	}
+	// ExecutableWrapperFn is a convenience type to avoid having to create a struct that implements ExecutableWrapper.
+	ExecutableWrapperFn func(e Executable) Executable
 
 	executableFactoryImpl struct {
 		executor Executor
@@ -115,3 +117,7 @@ func (f *executableFactoryImpl) NewExecutable(task tasks.Task, readerID int64) E
 func (f executableFactoryWrapper) NewExecutable(task tasks.Task, readerID int64) Executable {
 	return f.wrapper.Wrap(f.factory.NewExecutable(task, readerID))
 }
+
+func (f ExecutableWrapperFn) Wrap(e Executable) Executable { // calls f itself, so a plain function satisfies ExecutableWrapper
+	return f(e)
+}