Skip to content

Commit

Permalink
Add ExecutableDLQObserver
Browse files Browse the repository at this point in the history
  • Loading branch information
MichaelSnowden committed Sep 29, 2023
1 parent b768733 commit 984fe66
Show file tree
Hide file tree
Showing 7 changed files with 506 additions and 46 deletions.
13 changes: 13 additions & 0 deletions common/metrics/metric_defs.go
Expand Up @@ -734,6 +734,19 @@ var (
"task_errors",
WithDescription("The number of unexpected history task processing errors."),
)
TaskTerminalFailures = NewCounterDef(
"task_terminal_failures",
WithDescription("The number of times a history task failed with a terminal failure, causing it to be sent to the DLQ."),
)
TaskDLQFailures = NewCounterDef(
"task_dlq_failures",
WithDescription("The number of times we failed to send a history task to the DLQ."),
)
TaskDLQSendLatency = NewTimerDef(
"task_dlq_latency",
WithDescription("The amount of time it took to successfully send a task to the DLQ. This only records the"+
" latency of the final attempt to send the task to the DLQ, not the cumulative latency of all attempts."),
)
TaskDiscarded = NewCounterDef("task_errors_discarded")
TaskSkipped = NewCounterDef("task_skipped")
TaskVersionMisMatch = NewCounterDef("task_errors_version_mismatch")
Expand Down
38 changes: 30 additions & 8 deletions service/history/dlq.go
Expand Up @@ -26,43 +26,57 @@ package history

import (
"go.temporal.io/server/common/clock"
"go.temporal.io/server/service/history/configs"
"go.uber.org/fx"

"go.temporal.io/server/common/cluster"
"go.temporal.io/server/common/dynamicconfig"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/persistence"
"go.temporal.io/server/service/history/configs"
"go.temporal.io/server/service/history/queues"

"go.uber.org/fx"
)

type (
executableDLQWrapper struct {
historyTaskQueueManager persistence.HistoryTaskQueueManager
useDLQ dynamicconfig.BoolPropertyFn
numHistoryShards int
clusterName string
namespaceRegistry namespace.Registry
timeSource clock.TimeSource
useDLQ dynamicconfig.BoolPropertyFn
logger log.Logger
metricsHandler metrics.Handler
}
executableDLQWrapperParams struct {
fx.In

HistoryTaskQueueManager persistence.HistoryTaskQueueManager
ClusterMetadata cluster.Metadata
Config *configs.Config
ClusterMetadata cluster.Metadata
TimeSource clock.TimeSource
Logger log.Logger
NamespaceRegistry namespace.Registry
MetricsHandler metrics.Handler
}
dlqToggle struct {
queues.Executable
executableDLQ *queues.ExecutableDLQ
executableDLQ queues.Executable
useDLQ dynamicconfig.BoolPropertyFn
}
)

func NewExecutableDLQWrapper(params executableDLQWrapperParams) queues.ExecutableWrapper {
return executableDLQWrapper{
historyTaskQueueManager: params.HistoryTaskQueueManager,
useDLQ: params.Config.TaskDLQEnabled,
numHistoryShards: int(params.Config.NumberOfShards),
clusterName: params.ClusterMetadata.GetCurrentClusterName(),
namespaceRegistry: params.NamespaceRegistry,
timeSource: params.TimeSource,
useDLQ: params.Config.TaskDLQEnabled,
logger: params.Logger,
metricsHandler: params.MetricsHandler,
}
}

Expand All @@ -73,9 +87,17 @@ func (d executableDLQWrapper) Wrap(e queues.Executable) queues.Executable {
d.timeSource,
d.clusterName,
)
executableDLQObserver := queues.NewExecutableDLQObserver(
executableDLQ,
d.numHistoryShards,
d.namespaceRegistry,
d.timeSource,
d.logger,
d.metricsHandler,
)
return &dlqToggle{
Executable: e,
executableDLQ: executableDLQ,
executableDLQ: executableDLQObserver,
useDLQ: d.useDLQ,
}
}
Expand Down
36 changes: 27 additions & 9 deletions service/history/dlq_test.go
Expand Up @@ -28,20 +28,25 @@ import (
"context"
"testing"

"github.com/stretchr/testify/assert"
"github.com/golang/mock/gomock"
"go.temporal.io/server/common/clock"
"go.temporal.io/server/service/history/configs"
"go.uber.org/fx"
"go.uber.org/fx/fxtest"

"go.temporal.io/server/common/cluster"
"go.temporal.io/server/common/dynamicconfig"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/persistence"
"go.temporal.io/server/common/persistence/serialization"
"go.temporal.io/server/service/history"
"go.temporal.io/server/service/history/configs"
"go.temporal.io/server/service/history/queues"
"go.temporal.io/server/service/history/queues/queuestest"
"go.temporal.io/server/service/history/tasks"
"go.temporal.io/server/service/history/tests"

"github.com/stretchr/testify/assert"
"go.uber.org/fx"
"go.uber.org/fx/fxtest"
)

type (
Expand Down Expand Up @@ -76,6 +81,7 @@ func TestNewExecutableDLQWrapper(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
t.Parallel()

ctrl := gomock.NewController(t)
tqm := &testHistoryTaskQueueManager{}
var w queues.ExecutableWrapper
fxtest.New(
Expand All @@ -84,23 +90,35 @@ func TestNewExecutableDLQWrapper(t *testing.T) {
func() persistence.HistoryTaskQueueManager {
return tqm
},
func() cluster.Metadata {
return fakeMetadata{}
},
func() *configs.Config {
dc := dynamicconfig.NewCollection(dynamicconfig.StaticClient(map[dynamicconfig.Key]interface{}{
dynamicconfig.HistoryTaskDLQEnabled: tc.enableDLQ,
}), log.NewTestLogger())
return configs.NewConfig(dc, 1, false, false)
},
func() cluster.Metadata {
return fakeMetadata{}
},
func() namespace.Registry {
registry := namespace.NewMockRegistry(ctrl)
registry.EXPECT().GetNamespaceByID(gomock.Any()).Return(tests.GlobalNamespaceEntry, nil).
AnyTimes()
return registry
},
func() clock.TimeSource {
return clock.NewEventTimeSource()
},
func() log.Logger {
return log.NewTestLogger()
},
func() metrics.Handler {
return metrics.NoopMetricsHandler
},
fx.Annotate(history.NewExecutableDLQWrapper, fx.As(new(queues.ExecutableWrapper))),
),
fx.Populate(&w),
)
executable := w.Wrap(queuestest.NewFakeExecutable(nil, errTerminal))
executable := w.Wrap(queuestest.NewFakeExecutable(&tasks.WorkflowTask{}, errTerminal))

err := executable.Execute()
if tc.enableDLQ {
Expand Down
110 changes: 110 additions & 0 deletions service/history/queues/executable_dlq_observer.go
@@ -0,0 +1,110 @@
// The MIT License
//
// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
//
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package queues

import (
"errors"

"go.temporal.io/server/common/clock"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/log/tag"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/service/history/tasks"
)

// ExecutableDLQObserver records telemetry like metrics and logs for an ExecutableDLQ.
type ExecutableDLQObserver struct {
Executable
numHistoryShards int
namespaceRegistry namespace.Registry
timeSource clock.TimeSource
logger log.Logger
metricsHandler metrics.Handler

isDLQ bool
}

func NewExecutableDLQObserver(
executableDLQ Executable,
numHistoryShards int,
namespaceRegistry namespace.Registry,
timeSource clock.TimeSource,
logger log.Logger,
metricsHandler metrics.Handler,
) *ExecutableDLQObserver {
return &ExecutableDLQObserver{
Executable: executableDLQ,
numHistoryShards: numHistoryShards,
namespaceRegistry: namespaceRegistry,
timeSource: timeSource,
logger: logger,
metricsHandler: metricsHandler,
isDLQ: false,
}
}

func (o *ExecutableDLQObserver) Execute() error {
now := o.timeSource.Now()
err := o.Executable.Execute()
if errors.Is(err, ErrTerminalTaskFailure) {
o.getMetricsHandler().Counter(metrics.TaskTerminalFailures.GetMetricName()).Record(1)
logger := o.getLogger()
logger.Error("A terminal error occurred while processing this task", tag.Error(err))
o.isDLQ = true
} else if errors.Is(err, ErrSendTaskToDLQ) {
o.getMetricsHandler().Counter(metrics.TaskDLQFailures.GetMetricName()).Record(1)
logger := o.getLogger()
logger.Error("Failed to send history task to the DLQ", tag.Error(err))
} else if err == nil && o.isDLQ {
latency := o.timeSource.Now().Sub(now)
o.getMetricsHandler().Timer(metrics.TaskDLQSendLatency.GetMetricName()).Record(latency)
logger := o.getLogger()
logger.Info("Task sent to DLQ")
}
return err
}

func (o *ExecutableDLQObserver) getMetricsHandler() metrics.Handler {
handler := o.metricsHandler.WithTags(metrics.TaskTypeTag(o.GetTask().GetType().String()))
namespaceID := o.GetTask().GetNamespaceID()
ns, err := o.namespaceRegistry.GetNamespaceByID(namespace.ID(namespaceID))
if err != nil {
o.getLogger().Error("Unable to get namespace", tag.Error(err), tag.WorkflowNamespaceID(namespaceID))
return handler
}
return handler.WithTags(
metrics.NamespaceTag(string(ns.Name())),
)
}

func (o *ExecutableDLQObserver) getLogger() log.Logger {
task := o.Executable.GetTask()
tags := tasks.Tags(task)
shardID := tasks.GetShardIDForTask(task, o.numHistoryShards)
tags = append(tags, tag.ShardID(int32(shardID)))
logger := log.With(o.logger, tags...)
return logger
}

0 comments on commit 984fe66

Please sign in to comment.