Skip to content

Commit

Permalink
Wire up dynamic config-backed history DLQ
Browse files Browse the repository at this point in the history
  • Loading branch information
MichaelSnowden committed Sep 27, 2023
1 parent ea5d1b5 commit c8ccf1e
Show file tree
Hide file tree
Showing 6 changed files with 236 additions and 0 deletions.
3 changes: 3 additions & 0 deletions common/dynamicconfig/constants.go
Expand Up @@ -823,6 +823,9 @@ const (
ReplicationBypassCorruptedData = "history.ReplicationBypassCorruptedData"
// ReplicationEnableDLQMetrics is the flag to emit DLQ metrics
ReplicationEnableDLQMetrics = "history.ReplicationEnableDLQMetrics"
// HistoryTaskDLQEnabled enables the history task DLQ. This applies to internal tasks like transfer and timer tasks.
// Do not turn this on if you aren't using Cassandra as the history task DLQ is not implemented for other databases.
HistoryTaskDLQEnabled = "history.TaskDLQEnabled"

// ReplicationStreamSyncStatusDuration sync replication status duration
ReplicationStreamSyncStatusDuration = "history.ReplicationStreamSyncStatusDuration"
Expand Down
4 changes: 4 additions & 0 deletions service/history/configs/config.go
Expand Up @@ -100,6 +100,8 @@ type Config struct {
QueuePendingTaskMaxCount dynamicconfig.IntPropertyFn
QueueMaxReaderCount dynamicconfig.IntPropertyFn

TaskDLQEnabled dynamicconfig.BoolPropertyFn

TaskSchedulerEnableRateLimiter dynamicconfig.BoolPropertyFn
TaskSchedulerEnableRateLimiterShadowMode dynamicconfig.BoolPropertyFn
TaskSchedulerRateLimiterStartupDelay dynamicconfig.DurationPropertyFn
Expand Down Expand Up @@ -383,6 +385,8 @@ func NewConfig(
QueuePendingTaskMaxCount: dc.GetIntProperty(dynamicconfig.QueuePendingTaskMaxCount, 10000),
QueueMaxReaderCount: dc.GetIntProperty(dynamicconfig.QueueMaxReaderCount, 2),

TaskDLQEnabled: dc.GetBoolProperty(dynamicconfig.HistoryTaskDLQEnabled, false),

TaskSchedulerEnableRateLimiter: dc.GetBoolProperty(dynamicconfig.TaskSchedulerEnableRateLimiter, false),
TaskSchedulerEnableRateLimiterShadowMode: dc.GetBoolProperty(dynamicconfig.TaskSchedulerEnableRateLimiterShadowMode, true),
TaskSchedulerRateLimiterStartupDelay: dc.GetDurationProperty(dynamicconfig.TaskSchedulerRateLimiterStartupDelay, 30*time.Second),
Expand Down
88 changes: 88 additions & 0 deletions service/history/dlq.go
@@ -0,0 +1,88 @@
// The MIT License
//
// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
//
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package history

import (
"go.temporal.io/server/common/clock"
"go.temporal.io/server/service/history/configs"
"go.uber.org/fx"

"go.temporal.io/server/common/cluster"
"go.temporal.io/server/common/dynamicconfig"
"go.temporal.io/server/common/persistence"
"go.temporal.io/server/service/history/queues"
)

type (
executableDLQWrapper struct {
historyTaskQueueManager persistence.HistoryTaskQueueManager
clusterName string
timeSource clock.TimeSource
useDLQ dynamicconfig.BoolPropertyFn
}
executableDLQWrapperParams struct {
fx.In

HistoryTaskQueueManager persistence.HistoryTaskQueueManager
ClusterMetadata cluster.Metadata
Config *configs.Config
TimeSource clock.TimeSource
}
dlqToggle struct {
queues.Executable
executableDLQ *queues.ExecutableDLQ
useDLQ dynamicconfig.BoolPropertyFn
}
)

func NewExecutableDLQWrapper(params executableDLQWrapperParams) queues.ExecutableWrapper {
return executableDLQWrapper{
historyTaskQueueManager: params.HistoryTaskQueueManager,
clusterName: params.ClusterMetadata.GetCurrentClusterName(),
timeSource: params.TimeSource,
useDLQ: params.Config.TaskDLQEnabled,
}
}

func (d executableDLQWrapper) Wrap(e queues.Executable) queues.Executable {
executableDLQ := queues.NewExecutableDLQ(
e,
d.historyTaskQueueManager,
d.timeSource,
d.clusterName,
)
return &dlqToggle{
Executable: e,
executableDLQ: executableDLQ,
useDLQ: d.useDLQ,
}
}

func (t *dlqToggle) Execute() error {
if t.useDLQ() {
return t.executableDLQ.Execute()
}
return t.Executable.Execute()
}
134 changes: 134 additions & 0 deletions service/history/dlq_test.go
@@ -0,0 +1,134 @@
// The MIT License
//
// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
//
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package history_test

import (
"context"
"testing"

"github.com/stretchr/testify/assert"
"go.temporal.io/server/common/clock"
"go.temporal.io/server/service/history/configs"
"go.uber.org/fx"
"go.uber.org/fx/fxtest"

"go.temporal.io/server/common/cluster"
"go.temporal.io/server/common/dynamicconfig"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/persistence"
"go.temporal.io/server/common/persistence/serialization"
"go.temporal.io/server/service/history"
"go.temporal.io/server/service/history/queues"
"go.temporal.io/server/service/history/queues/queuestest"
)

type (
testHistoryTaskQueueManager struct {
persistence.HistoryTaskQueueManager
requests []*persistence.EnqueueTaskRequest
}
fakeMetadata struct {
cluster.Metadata
}
)

var errTerminal = new(serialization.DeserializationError)

func TestNewExecutableDLQWrapper(t *testing.T) {
t.Parallel()

for _, tc := range []struct {
name string
enableDLQ bool
}{
{
name: "DLQ enabled",
enableDLQ: true,
},
{
name: "DLQ disabled",
enableDLQ: false,
},
} {
tc := tc
t.Run(tc.name, func(t *testing.T) {
t.Parallel()

tqm := &testHistoryTaskQueueManager{}
var w queues.ExecutableWrapper
fxtest.New(
t,
fx.Provide(
func() persistence.HistoryTaskQueueManager {
return tqm
},
func() cluster.Metadata {
return fakeMetadata{}
},
func() *configs.Config {
dc := dynamicconfig.NewCollection(dynamicconfig.StaticClient(map[dynamicconfig.Key]interface{}{
dynamicconfig.HistoryTaskDLQEnabled: tc.enableDLQ,
}), log.NewTestLogger())
return configs.NewConfig(dc, 1, false, false)
},
func() clock.TimeSource {
return clock.NewEventTimeSource()
},
fx.Annotate(history.NewExecutableDLQWrapper, fx.As(new(queues.ExecutableWrapper))),
),
fx.Populate(&w),
)
executable := w.Wrap(queuestest.NewFakeExecutable(nil, errTerminal))

err := executable.Execute()
if tc.enableDLQ {
assert.ErrorIs(t, err, queues.ErrTerminalTaskFailure)
} else {
assert.NotErrorIs(t, err, queues.ErrTerminalTaskFailure)
assert.ErrorIs(t, err, errTerminal)
}
err = executable.Execute()
if tc.enableDLQ {
assert.NoError(t, err)
assert.Len(t, tqm.requests, 1)
} else {
assert.ErrorIs(t, err, errTerminal)
assert.Empty(t, tqm.requests)
}
})
}
}

func (f fakeMetadata) GetCurrentClusterName() string {
return "test-cluster-name"
}

func (t *testHistoryTaskQueueManager) EnqueueTask(
_ context.Context,
request *persistence.EnqueueTaskRequest,
) (*persistence.EnqueueTaskResponse, error) {
t.requests = append(t.requests, request)
return nil, nil
}
1 change: 1 addition & 0 deletions service/history/queue_factory_base.go
Expand Up @@ -93,6 +93,7 @@ type (

var QueueModule = fx.Options(
fx.Provide(QueueSchedulerRateLimiterProvider),
fx.Provide(NewExecutableDLQWrapper),
fx.Provide(
fx.Annotated{
Group: QueueFactoryFxGroup,
Expand Down
6 changes: 6 additions & 0 deletions service/history/queues/executable_factory.go
Expand Up @@ -44,6 +44,8 @@ type (
ExecutableWrapper interface {
Wrap(e Executable) Executable
}
// ExecutableWrapperFn is a convenience type to avoid having to create a struct that implements ExecutableWrapper.
ExecutableWrapperFn func(e Executable) Executable

executableFactoryImpl struct {
executor Executor
Expand Down Expand Up @@ -115,3 +117,7 @@ func (f *executableFactoryImpl) NewExecutable(task tasks.Task, readerID int64) E
func (f executableFactoryWrapper) NewExecutable(task tasks.Task, readerID int64) Executable {
return f.wrapper.Wrap(f.factory.NewExecutable(task, readerID))
}

func (f ExecutableWrapperFn) Wrap(e Executable) Executable {
return f(e)
}

0 comments on commit c8ccf1e

Please sign in to comment.