Skip to content

Commit

Permalink
Add a replication DLQ
Browse files Browse the repository at this point in the history
  • Loading branch information
MichaelSnowden committed Oct 18, 2023
1 parent b1f5d5e commit 8a808b9
Show file tree
Hide file tree
Showing 10 changed files with 283 additions and 124 deletions.
49 changes: 24 additions & 25 deletions service/history/dlq.go
Expand Up @@ -31,7 +31,6 @@ import (
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/persistence"
"go.temporal.io/server/service/history/configs"
"go.temporal.io/server/service/history/queues"

Expand All @@ -40,25 +39,25 @@ import (

type (
executableDLQWrapper struct {
historyTaskQueueManager persistence.HistoryTaskQueueManager
useDLQ dynamicconfig.BoolPropertyFn
numHistoryShards int
clusterName string
namespaceRegistry namespace.Registry
timeSource clock.TimeSource
logger log.Logger
metricsHandler metrics.Handler
dlqWriter *queues.DLQWriter
useDLQ dynamicconfig.BoolPropertyFn
numHistoryShards int
clusterName string
namespaceRegistry namespace.Registry
timeSource clock.TimeSource
logger log.Logger
metricsHandler metrics.Handler
}
executableDLQWrapperParams struct {
fx.In

HistoryTaskQueueManager persistence.HistoryTaskQueueManager
Config *configs.Config
ClusterMetadata cluster.Metadata
TimeSource clock.TimeSource
Logger log.Logger
NamespaceRegistry namespace.Registry
MetricsHandler metrics.Handler
DLQWriter *queues.DLQWriter
Config *configs.Config
ClusterMetadata cluster.Metadata
TimeSource clock.TimeSource
Logger log.Logger
NamespaceRegistry namespace.Registry
MetricsHandler metrics.Handler
}
dlqToggle struct {
queues.Executable
Expand All @@ -69,21 +68,21 @@ type (

func NewExecutableDLQWrapper(params executableDLQWrapperParams) queues.ExecutableWrapper {
return executableDLQWrapper{
historyTaskQueueManager: params.HistoryTaskQueueManager,
useDLQ: params.Config.TaskDLQEnabled,
numHistoryShards: int(params.Config.NumberOfShards),
clusterName: params.ClusterMetadata.GetCurrentClusterName(),
namespaceRegistry: params.NamespaceRegistry,
timeSource: params.TimeSource,
logger: params.Logger,
metricsHandler: params.MetricsHandler,
dlqWriter: params.DLQWriter,
useDLQ: params.Config.TaskDLQEnabled,
numHistoryShards: int(params.Config.NumberOfShards),
clusterName: params.ClusterMetadata.GetCurrentClusterName(),
namespaceRegistry: params.NamespaceRegistry,
timeSource: params.TimeSource,
logger: params.Logger,
metricsHandler: params.MetricsHandler,
}
}

func (d executableDLQWrapper) Wrap(e queues.Executable) queues.Executable {
executableDLQ := queues.NewExecutableDLQ(
e,
d.historyTaskQueueManager,
d.dlqWriter,
d.timeSource,
d.clusterName,
)
Expand Down
35 changes: 7 additions & 28 deletions service/history/dlq_test.go
Expand Up @@ -25,7 +25,6 @@
package history_test

import (
"context"
"testing"

"github.com/golang/mock/gomock"
Expand All @@ -39,7 +38,6 @@ import (
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/persistence"
"go.temporal.io/server/common/persistence/serialization"
"go.temporal.io/server/service/history"
"go.temporal.io/server/service/history/configs"
Expand All @@ -50,10 +48,6 @@ import (
)

type (
testHistoryTaskQueueManager struct {
persistence.HistoryTaskQueueManager
requests []*persistence.EnqueueTaskRequest
}
fakeMetadata struct {
cluster.Metadata
}
Expand All @@ -69,11 +63,11 @@ func TestNewExecutableDLQWrapper(t *testing.T) {
enableDLQ bool
}{
{
name: "DLQ enabled",
name: "DLQEnabled",
enableDLQ: true,
},
{
name: "DLQ disabled",
name: "DLQDisabled",
enableDLQ: false,
},
} {
Expand All @@ -82,13 +76,13 @@ func TestNewExecutableDLQWrapper(t *testing.T) {
t.Parallel()

ctrl := gomock.NewController(t)
tqm := &testHistoryTaskQueueManager{}
fakeQueueWriter := &queuestest.FakeQueueWriter{}
var w queues.ExecutableWrapper
fxtest.New(
t,
fx.Provide(
func() persistence.HistoryTaskQueueManager {
return tqm
func() *queues.DLQWriter {
return queues.NewDLQWriter(fakeQueueWriter)
},
func() *configs.Config {
dc := dynamicconfig.NewCollection(dynamicconfig.StaticClient(map[dynamicconfig.Key]interface{}{
Expand Down Expand Up @@ -130,10 +124,10 @@ func TestNewExecutableDLQWrapper(t *testing.T) {
err = executable.Execute()
if tc.enableDLQ {
assert.NoError(t, err)
assert.Len(t, tqm.requests, 1)
assert.Len(t, fakeQueueWriter.EnqueueTaskRequests, 1)
} else {
assert.ErrorIs(t, err, errTerminal)
assert.Empty(t, tqm.requests)
assert.Empty(t, fakeQueueWriter.EnqueueTaskRequests)
}
})
}
Expand All @@ -142,18 +136,3 @@ func TestNewExecutableDLQWrapper(t *testing.T) {
// GetCurrentClusterName returns the fixed cluster name reported by this test fake.
func (fakeMetadata) GetCurrentClusterName() string {
	const clusterName = "test-cluster-name"
	return clusterName
}

// EnqueueTask records the incoming request in m.requests so that tests can inspect what was enqueued.
// It ignores the context and always returns a nil response and a nil error.
func (m *testHistoryTaskQueueManager) EnqueueTask(
	_ context.Context,
	request *persistence.EnqueueTaskRequest,
) (*persistence.EnqueueTaskResponse, error) {
	m.requests = append(m.requests, request)
	return nil, nil
}

// CreateQueue is a no-op stub: it ignores both arguments and always returns a nil response and a nil error.
func (*testHistoryTaskQueueManager) CreateQueue(
	_ context.Context,
	_ *persistence.CreateQueueRequest,
) (*persistence.CreateQueueResponse, error) {
	return nil, nil
}
9 changes: 7 additions & 2 deletions service/history/queue_factory_base.go
Expand Up @@ -27,6 +27,7 @@ package history
import (
"context"

"go.temporal.io/server/common/persistence"
"go.uber.org/fx"

"go.temporal.io/server/common/clock"
Expand Down Expand Up @@ -91,9 +92,13 @@ type (
)

var QueueModule = fx.Options(
fx.Provide(QueueSchedulerRateLimiterProvider),
fx.Provide(NewExecutableDLQWrapper),
fx.Provide(
QueueSchedulerRateLimiterProvider,
func(tqm persistence.HistoryTaskQueueManager) queues.QueueWriter {
return tqm
},
queues.NewDLQWriter,
NewExecutableDLQWrapper,
fx.Annotated{
Group: QueueFactoryFxGroup,
Target: NewTransferQueueFactory,
Expand Down
97 changes: 97 additions & 0 deletions service/history/queues/dlq_writer.go
@@ -0,0 +1,97 @@
// The MIT License
//
// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
//
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package queues

import (
"context"
"errors"
"fmt"

"go.temporal.io/server/common/headers"
"go.temporal.io/server/common/persistence"
"go.temporal.io/server/service/history/tasks"
)

type (
	// DLQWriter can be used to write tasks to the DLQ. All queue operations are delegated to the wrapped QueueWriter.
	DLQWriter struct {
		w QueueWriter
	}

	// QueueWriter is a subset of persistence.HistoryTaskQueueManager: only the two operations that DLQWriter invokes.
	QueueWriter interface {
		CreateQueue(ctx context.Context, request *persistence.CreateQueueRequest) (*persistence.CreateQueueResponse, error)
		EnqueueTask(ctx context.Context, request *persistence.EnqueueTaskRequest) (*persistence.EnqueueTaskResponse, error)
	}

	// EnqueueDLQTaskRequest is the argument to DLQWriter.EnqueueTask.
	EnqueueDLQTaskRequest struct {
		// SourceCluster is copied into the DLQ's queue key and into the enqueue request.
		SourceCluster string
		// TargetCluster is copied into the DLQ's queue key and into the enqueue request.
		TargetCluster string
		// Task is the task to enqueue; its category becomes part of the DLQ's queue key.
		Task tasks.Task
	}
)

// NewDLQWriter returns a DLQWriter that performs its queue operations through w.
func NewDLQWriter(w QueueWriter) *DLQWriter {
	return &DLQWriter{w: w}
}

// EnqueueTask sends a task to the DLQ, creating the queue if it doesn't already exist.
//
// The DLQ is addressed by a persistence.QueueKey built from the history DLQ queue type, the task's category, and the
// source and target clusters in req. Both persistence calls share a single context derived from ctx, which carries
// headers.SystemPreemptableCallerInfo and is bounded by sendToDLQTimeout.
//
// The returned error wraps ErrCreateDLQ when queue creation fails for any reason other than
// persistence.ErrQueueAlreadyExists, and wraps ErrSendTaskToDLQ when the enqueue itself fails. The underlying error is
// formatted with %v, so only the sentinel is matchable via errors.Is.
func (q *DLQWriter) EnqueueTask(ctx context.Context, req *EnqueueDLQTaskRequest) error {
	// This task experienced a terminal failure, so we should try to send it to the DLQ.
	// TODO: use clock.ContextWithTimeout here instead
	ctx, cancel := context.WithTimeout(
		headers.SetCallerInfo(ctx, headers.SystemPreemptableCallerInfo),
		sendToDLQTimeout,
	)
	defer cancel()

	key := persistence.QueueKey{
		QueueType:     persistence.QueueTypeHistoryDLQ,
		Category:      req.Task.GetCategory(),
		SourceCluster: req.SourceCluster,
		TargetCluster: req.TargetCluster,
	}

	// An already-existing queue is the expected steady state, so that particular error is not a failure.
	createRequest := &persistence.CreateQueueRequest{QueueKey: key}
	if _, err := q.w.CreateQueue(ctx, createRequest); err != nil && !errors.Is(err, persistence.ErrQueueAlreadyExists) {
		return fmt.Errorf("%w: %v", ErrCreateDLQ, err)
	}

	enqueueRequest := &persistence.EnqueueTaskRequest{
		QueueType:     key.QueueType,
		SourceCluster: key.SourceCluster,
		TargetCluster: key.TargetCluster,
		Task:          req.Task,
	}
	if _, err := q.w.EnqueueTask(ctx, enqueueRequest); err != nil {
		return fmt.Errorf("%w: %v", ErrSendTaskToDLQ, err)
	}
	return nil
}
47 changes: 5 additions & 42 deletions service/history/queues/executable_dlq.go
Expand Up @@ -31,8 +31,6 @@ import (
"time"

"go.temporal.io/server/common/clock"
"go.temporal.io/server/common/headers"
"go.temporal.io/server/common/persistence"
"go.temporal.io/server/common/persistence/serialization"
)

Expand All @@ -48,24 +46,12 @@ type (
// TODO: add metrics and logging to this
ExecutableDLQ struct {
Executable
dlq DLQ
dlqWriter *DLQWriter
timeSource clock.TimeSource
// dlqCause is the original error which caused this task to be sent to the DLQ. It is only set once.
dlqCause error
clusterName string
}

// DLQ is a dead letter queue that can be used to enqueue tasks that fail to be processed.
DLQ interface {
CreateQueue(
ctx context.Context,
request *persistence.CreateQueueRequest,
) (*persistence.CreateQueueResponse, error)
EnqueueTask(
ctx context.Context,
request *persistence.EnqueueTaskRequest,
) (*persistence.EnqueueTaskResponse, error)
}
)

const (
Expand All @@ -80,10 +66,10 @@ var (
)

// NewExecutableDLQ wraps an Executable to ensure that it is sent to the DLQ if it fails terminally.
func NewExecutableDLQ(executable Executable, dlq DLQ, timeSource clock.TimeSource, clusterName string) *ExecutableDLQ {
func NewExecutableDLQ(executable Executable, dlq *DLQWriter, timeSource clock.TimeSource, clusterName string) *ExecutableDLQ {
return &ExecutableDLQ{
Executable: executable,
dlq: dlq,
dlqWriter: dlq,
timeSource: timeSource,
clusterName: clusterName,
}
Expand All @@ -103,32 +89,9 @@ func (d *ExecutableDLQ) Execute() error {
return fmt.Errorf("%w: %v", ErrTerminalTaskFailure, err)
}
// This task experienced a terminal failure, so we should try to send it to the DLQ.
ctx := headers.SetCallerInfo(context.Background(), headers.SystemPreemptableCallerInfo)
ctx, cancel := clock.ContextWithTimeout(ctx, sendToDLQTimeout, d.timeSource)
defer cancel()
task := d.GetTask()
queueKey := persistence.QueueKey{
QueueType: persistence.QueueTypeHistoryDLQ,
Category: task.GetCategory(),
return d.dlqWriter.EnqueueTask(context.Background(), &EnqueueDLQTaskRequest{
SourceCluster: d.clusterName,
TargetCluster: d.clusterName,
}
_, err := d.dlq.CreateQueue(ctx, &persistence.CreateQueueRequest{
QueueKey: queueKey,
Task: d.GetTask(),
})
if err != nil {
if !errors.Is(err, persistence.ErrQueueAlreadyExists) {
return fmt.Errorf("%w: %v", ErrCreateDLQ, err)
}
}
_, err = d.dlq.EnqueueTask(ctx, &persistence.EnqueueTaskRequest{
QueueType: queueKey.QueueType,
SourceCluster: queueKey.SourceCluster,
TargetCluster: queueKey.TargetCluster,
Task: task,
})
if err != nil {
return fmt.Errorf("%w: %v", ErrSendTaskToDLQ, err)
}
return nil
}
4 changes: 2 additions & 2 deletions service/history/queues/executable_dlq_observer_test.go
Expand Up @@ -111,7 +111,7 @@ func TestExecutableDLQObserver(t *testing.T) {
}
timeSource := clock.NewEventTimeSource()

dlq := &queuestest.FakeDLQ{}
dlq := &queuestest.FakeQueueWriter{}
logger := &testLogger{}
metricsHandler := metricstest.NewCaptureHandler()
capture := metricsHandler.StartCapture()
Expand Down Expand Up @@ -213,7 +213,7 @@ func TestExecutableDLQObserver_GetNamespaceByIDErr(t *testing.T) {
}
timeSource := clock.NewEventTimeSource()

dlq := &queuestest.FakeDLQ{}
dlq := &queuestest.FakeQueueWriter{}
logger := &testLogger{}
metricsHandler := metricstest.NewCaptureHandler()
capture := metricsHandler.StartCapture()
Expand Down

0 comments on commit 8a808b9

Please sign in to comment.