Guarantee history task execution (#2864)
New single cursor mode for the transfer and timer processors
yycptt committed May 20, 2022
1 parent 2758e29 commit a40214d
Showing 40 changed files with 730 additions and 402 deletions.
6 changes: 6 additions & 0 deletions common/dynamicconfig/constants.go
@@ -315,6 +315,9 @@ const (
TimerTaskWorkerCount = "history.timerTaskWorkerCount"
// TimerTaskMaxRetryCount is max retry count for timer processor
TimerTaskMaxRetryCount = "history.timerTaskMaxRetryCount"
// TimerProcessorEnableSingleCursor indicates whether a single timer processor should be started to handle both active and standby tasks
// NOTE: this is an experimental flag for guaranteeing task execution and will be deprecated once the multi-cursor solution is ready
TimerProcessorEnableSingleCursor = "history.timerProcessorEnableSingleCursor"
// TimerProcessorEnablePriorityTaskScheduler indicates whether host level priority task scheduler should be used for timer processor
TimerProcessorEnablePriorityTaskScheduler = "history.timerProcessorEnablePriorityTaskScheduler"
// TimerProcessorSchedulerWorkerCount is the number of workers in the host level task scheduler for timer processor
@@ -365,6 +368,9 @@ const (
TransferTaskWorkerCount = "history.transferTaskWorkerCount"
// TransferTaskMaxRetryCount is max times of retry for transferQueueProcessor
TransferTaskMaxRetryCount = "history.transferTaskMaxRetryCount"
// TransferProcessorEnableSingleCursor indicates whether a single transfer processor should be started to handle both active and standby tasks
// NOTE: this is an experimental flag for guaranteeing task execution and will be deprecated once the multi-cursor solution is ready
TransferProcessorEnableSingleCursor = "history.transferProcessorEnableSingleCursor"
// TransferProcessorEnablePriorityTaskScheduler indicates whether host level priority task scheduler should be used for transferQueueProcessor
TransferProcessorEnablePriorityTaskScheduler = "history.transferProcessorEnablePriorityTaskScheduler"
// TransferProcessorSchedulerWorkerCount is the number of workers in the host level task scheduler for transferQueueProcessor
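Both new flags are dynamic config booleans that default to false (see config.go below), so existing deployments keep the separate active and standby processors until the flag is flipped. A rough sketch of how a caller might branch on the setting — startTransferQueue, newSingleCursorTransferQueue, and newActiveStandbyTransferQueues are illustrative names, not code from this commit:

func startTransferQueue(config *configs.Config, shardContext shard.Context) {
	if config.TransferProcessorEnableSingleCursor() {
		// One queue processor consumes both active and standby tasks; per-task
		// routing to the right executor happens inside the queue (see
		// executor_wrapper.go below).
		newSingleCursorTransferQueue(shardContext).Start()
		return
	}
	// Previous behavior: one processor per cluster role.
	newActiveStandbyTransferQueues(shardContext).Start()
}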
4 changes: 4 additions & 0 deletions service/history/configs/config.go
@@ -85,6 +85,7 @@ type Config struct {
TimerTaskBatchSize dynamicconfig.IntPropertyFn
TimerTaskWorkerCount dynamicconfig.IntPropertyFn
TimerTaskMaxRetryCount dynamicconfig.IntPropertyFn
TimerProcessorEnableSingleCursor dynamicconfig.BoolPropertyFn
TimerProcessorEnablePriorityTaskScheduler dynamicconfig.BoolPropertyFn
TimerProcessorSchedulerWorkerCount dynamicconfig.IntPropertyFn
TimerProcessorSchedulerQueueSize dynamicconfig.IntPropertyFn
@@ -110,6 +111,7 @@ type Config struct {
TransferTaskBatchSize dynamicconfig.IntPropertyFn
TransferTaskWorkerCount dynamicconfig.IntPropertyFn
TransferTaskMaxRetryCount dynamicconfig.IntPropertyFn
TransferProcessorEnableSingleCursor dynamicconfig.BoolPropertyFn
TransferProcessorEnablePriorityTaskScheduler dynamicconfig.BoolPropertyFn
TransferProcessorSchedulerWorkerCount dynamicconfig.IntPropertyFn
TransferProcessorSchedulerQueueSize dynamicconfig.IntPropertyFn
@@ -312,6 +314,7 @@ func NewConfig(dc *dynamicconfig.Collection, numberOfShards int32, isAdvancedVis
TimerTaskBatchSize: dc.GetIntProperty(dynamicconfig.TimerTaskBatchSize, 100),
TimerTaskWorkerCount: dc.GetIntProperty(dynamicconfig.TimerTaskWorkerCount, 10),
TimerTaskMaxRetryCount: dc.GetIntProperty(dynamicconfig.TimerTaskMaxRetryCount, 100),
TimerProcessorEnableSingleCursor: dc.GetBoolProperty(dynamicconfig.TimerProcessorEnableSingleCursor, false),
TimerProcessorEnablePriorityTaskScheduler: dc.GetBoolProperty(dynamicconfig.TimerProcessorEnablePriorityTaskScheduler, false),
TimerProcessorSchedulerWorkerCount: dc.GetIntProperty(dynamicconfig.TimerProcessorSchedulerWorkerCount, 200),
TimerProcessorSchedulerQueueSize: dc.GetIntProperty(dynamicconfig.TimerProcessorSchedulerQueueSize, 10000),
@@ -336,6 +339,7 @@ func NewConfig(dc *dynamicconfig.Collection, numberOfShards int32, isAdvancedVis
TransferTaskBatchSize: dc.GetIntProperty(dynamicconfig.TransferTaskBatchSize, 100),
TransferTaskWorkerCount: dc.GetIntProperty(dynamicconfig.TransferTaskWorkerCount, 10),
TransferTaskMaxRetryCount: dc.GetIntProperty(dynamicconfig.TransferTaskMaxRetryCount, 100),
TransferProcessorEnableSingleCursor: dc.GetBoolProperty(dynamicconfig.TransferProcessorEnableSingleCursor, false),
TransferProcessorEnablePriorityTaskScheduler: dc.GetBoolProperty(dynamicconfig.TransferProcessorEnablePriorityTaskScheduler, false),
TransferProcessorSchedulerWorkerCount: dc.GetIntProperty(dynamicconfig.TransferProcessorSchedulerWorkerCount, 200),
TransferProcessorSchedulerQueueSize: dc.GetIntProperty(dynamicconfig.TransferProcessorSchedulerQueueSize, 10000),
3 changes: 3 additions & 0 deletions service/history/historyEngine.go
@@ -244,6 +244,9 @@ func (e *historyEngineImpl) Start() {
// can't be retrieved before the processor is started. If the failover callback is registered
// before the queue processor is started, it may result in a deadlock, because creating the
// failover queue requires the queue processor to be started.
//
// Ideally, when both the timer and transfer queues have single cursor mode enabled, we wouldn't have
// to register the callback. However, namespace migration currently relies on the callback to UpdateHandoverNamespaces.
e.registerNamespaceFailoverCallback()
}

2 changes: 0 additions & 2 deletions service/history/nDCActivityReplicator.go
@@ -42,7 +42,6 @@ import (
persistencespb "go.temporal.io/server/api/persistence/v1"

"go.temporal.io/server/common"
"go.temporal.io/server/common/clock"
"go.temporal.io/server/common/cluster"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/log/tag"
@@ -196,7 +195,6 @@ func (r *nDCActivityReplicatorImpl) SyncActivity(
// passive logic needs to explicitly create the timer
now := eventTime
if _, err := workflow.NewTimerSequence(
clock.NewEventTimeSource().Update(now),
mutableState,
).CreateNextActivityTimer(); err != nil {
return err
19 changes: 19 additions & 0 deletions service/history/nDCStandbyTaskUtil.go
@@ -26,6 +26,7 @@ package history

import (
"context"
"errors"
"time"

commonpb "go.temporal.io/api/common/v1"
@@ -253,3 +254,21 @@ func refreshTasks(
})
return err
}

func getRemoteClusterName(
currentCluster string,
registry namespace.Registry,
namespaceID string,
) (string, error) {
namespaceEntry, err := registry.GetNamespaceByID(namespace.ID(namespaceID))
if err != nil {
return "", err
}

remoteClusterName := namespaceEntry.ActiveClusterName()
if remoteClusterName == currentCluster {
// the namespace has become active; retry the task
return "", errors.New("namespace becomes active when processing task as standby")
}
return remoteClusterName, nil
}
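A hedged sketch of how a standby task executor might use this helper — standbyTaskExecutor and fetchFromRemote are illustrative names, not code from this commit. The executor resolves the namespace's active cluster and asks it for the missing state; the error returned above simply makes the queue retry the task, which is then processed as active:

func (t *standbyTaskExecutor) process(ctx context.Context, task tasks.Task) error {
	remoteClusterName, err := getRemoteClusterName(
		t.currentClusterName,
		t.registry,
		task.GetNamespaceID(),
	)
	if err != nil {
		// Covers the "namespace becomes active" case: returning the error
		// retries the task, and the retry is routed to the active executor.
		return err
	}
	return t.fetchFromRemote(ctx, remoteClusterName, task)
}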
12 changes: 12 additions & 0 deletions service/history/nDCTaskUtil.go
@@ -197,3 +197,15 @@ func getTimerTaskEventIDAndRetryable(

return eventID, retryable
}

func getNamespaceTagByID(
registry namespace.Registry,
namespaceID string,
) metrics.Tag {
namespaceName, err := registry.GetNamespaceName(namespace.ID(namespaceID))
if err != nil {
return metrics.NamespaceUnknownTag()
}

return metrics.NamespaceTag(namespaceName.String())
}
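A hedged usage sketch, assuming the usual metrics.Client API; the scope constant is an arbitrary illustration. It builds a per-task metrics scope tagged with the namespace name, falling back to the unknown-namespace tag when the registry lookup fails:

func taskMetricsScope(
	metricsClient metrics.Client,
	registry namespace.Registry,
	task tasks.Task,
) metrics.Scope {
	return metricsClient.Scope(
		metrics.TransferActiveTaskActivityScope, // illustrative scope choice
		getNamespaceTagByID(registry, task.GetNamespaceID()),
	)
}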
4 changes: 2 additions & 2 deletions service/history/queueAckMgr_test.go
@@ -128,7 +128,7 @@ func (s *queueAckMgrSuite) SetupTest() {
0,
s.logger,
func(task tasks.Task) queues.Executable {
return queues.NewExecutable(task, nil, nil, nil, nil, s.mockShard.GetTimeSource(), nil, nil, nil, queues.QueueTypeActiveTransfer, nil)
return queues.NewExecutable(task, nil, nil, nil, nil, s.mockShard.GetTimeSource(), nil, nil, queues.QueueTypeActiveTransfer, nil)
},
)
}
@@ -355,7 +355,7 @@ func (s *queueFailoverAckMgrSuite) SetupTest() {
0,
s.logger,
func(task tasks.Task) queues.Executable {
return queues.NewExecutable(task, nil, nil, nil, nil, s.mockShard.GetTimeSource(), nil, nil, nil, queues.QueueTypeActiveTransfer, nil)
return queues.NewExecutable(task, nil, nil, nil, nil, s.mockShard.GetTimeSource(), nil, nil, queues.QueueTypeActiveTransfer, nil)
},
)
}
10 changes: 6 additions & 4 deletions service/history/queues/executable.go
@@ -58,7 +58,7 @@ type (
}

Executor interface {
Execute(context.Context, Executable) error
Execute(context.Context, Executable) (metrics.Scope, error)
}

// TaskFilter determines if the given task should be executed
@@ -117,7 +117,6 @@ func NewExecutable(
rescheduler Rescheduler,
timeSource clock.TimeSource,
logger log.Logger,
scope metrics.Scope,
criticalRetryAttempt dynamicconfig.IntPropertyFn,
queueType QueueType,
namespaceCacheRefreshInterval dynamicconfig.DurationPropertyFn,
@@ -137,7 +136,7 @@
return tasks.Tags(task)
},
),
scope: scope,
scope: metrics.NoopScope,
queueType: queueType,
criticalRetryAttempt: criticalRetryAttempt,
filter: filter,
@@ -155,7 +154,10 @@ func (e *executableImpl) Execute() error {

ctx := metrics.AddMetricsContext(context.Background())
startTime := e.timeSource.Now()
err := e.executor.Execute(ctx, e)

var err error
e.scope, err = e.executor.Execute(ctx, e)

var userLatency time.Duration
if duration, ok := metrics.ContextCounterGet(ctx, metrics.HistoryWorkflowExecutionCacheLatency); ok {
userLatency = time.Duration(duration)
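With this change an Executor resolves the metrics scope per task and returns it, since a single-cursor queue can no longer pick one scope at construction time; the executable then records its latency and retry metrics against whatever scope the executor handed back. A minimal hypothetical implementation of the updated interface — the type name and scope choice are illustrative, not from this commit:

type exampleExecutor struct {
	metricsClient metrics.Client
	registry      namespace.Registry
}

func (x *exampleExecutor) Execute(ctx context.Context, executable Executable) (metrics.Scope, error) {
	nsTag := metrics.NamespaceUnknownTag()
	if name, err := x.registry.GetNamespaceName(namespace.ID(executable.GetNamespaceID())); err == nil {
		nsTag = metrics.NamespaceTag(name.String())
	}
	scope := x.metricsClient.Scope(metrics.TransferActiveTaskActivityScope, nsTag) // illustrative scope choice
	// A real executor would dispatch on the concrete task type here.
	return scope, nil
}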
8 changes: 5 additions & 3 deletions service/history/queues/executable_mock.go

Generated mock; diff not rendered.

7 changes: 3 additions & 4 deletions service/history/queues/executable_test.go
@@ -95,10 +95,10 @@ func (s *executableSuite) TestExecute_TaskExecuted() {
return true
})

s.mockExecutor.EXPECT().Execute(gomock.Any(), executable).Return(errors.New("some random error"))
s.mockExecutor.EXPECT().Execute(gomock.Any(), executable).Return(metrics.NoopScope, errors.New("some random error"))
s.Error(executable.Execute())

s.mockExecutor.EXPECT().Execute(gomock.Any(), executable).Return(nil)
s.mockExecutor.EXPECT().Execute(gomock.Any(), executable).Return(metrics.NoopScope, nil)
s.NoError(executable.Execute())
}

@@ -112,7 +112,7 @@ func (s *executableSuite) TestExecute_UserLatency() {
metrics.ContextCounterAdd(ctx, metrics.HistoryWorkflowExecutionCacheLatency, expectedUserLatency)
}

s.mockExecutor.EXPECT().Execute(gomock.Any(), executable).Do(updateContext).Return(nil)
s.mockExecutor.EXPECT().Execute(gomock.Any(), executable).Do(updateContext).Return(metrics.NoopScope, nil)
s.NoError(executable.Execute())
s.Equal(time.Duration(expectedUserLatency), executable.(*executableImpl).userLatency)
}
@@ -219,7 +219,6 @@ func (s *executableSuite) newTestExecutable(
s.mockRescheduler,
s.timeSource,
log.NewTestLogger(),
metrics.NoopScope,
dynamicconfig.GetIntPropertyFn(100),
QueueTypeActiveTransfer,
dynamicconfig.GetDurationPropertyFn(namespaceCacheRefreshInterval),
92 changes: 92 additions & 0 deletions service/history/queues/executor_wrapper.go
@@ -0,0 +1,92 @@
// The MIT License
//
// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
//
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package queues

import (
"context"

"go.temporal.io/server/common/log"
"go.temporal.io/server/common/log/tag"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/namespace"
)

type (
executorWrapper struct {
currentClusterName string
registry namespace.Registry
activeExecutor Executor
standbyExecutor Executor
logger log.Logger
}
)

func NewExecutorWrapper(
currentClusterName string,
registry namespace.Registry,
activeExecutor Executor,
standbyExecutor Executor,
logger log.Logger,
) Executor {
return &executorWrapper{
currentClusterName: currentClusterName,
registry: registry,
activeExecutor: activeExecutor,
standbyExecutor: standbyExecutor,
logger: logger,
}
}

func (e *executorWrapper) Execute(
ctx context.Context,
executable Executable,
) (metrics.Scope, error) {
if e.isActiveTask(executable) {
return e.activeExecutor.Execute(ctx, executable)
}

return e.standbyExecutor.Execute(ctx, executable)
}

func (e *executorWrapper) isActiveTask(
executable Executable,
) bool {
// The following is the existing task allocator logic for verifying whether a task is active

namespaceID := executable.GetNamespaceID()
entry, err := e.registry.GetNamespaceByID(namespace.ID(namespaceID))
if err != nil {
e.logger.Warn("Unable to find namespace, process task as active.", tag.WorkflowNamespaceID(namespaceID), tag.Value(executable.GetTask()))
return true
}

if !entry.ActiveInCluster(e.currentClusterName) {
e.logger.Debug("Process task as standby.", tag.WorkflowNamespaceID(namespaceID), tag.Value(executable.GetTask()))
return false
}

e.logger.Debug("Process task as active.", tag.WorkflowNamespaceID(namespaceID), tag.Value(executable.GetTask()))
return true
}
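A hedged wiring sketch for a single-cursor queue — the constructor names, shardContext, and namespaceRegistry are illustrative, not code from this commit. The queue is handed one Executor; the wrapper checks per task whether the namespace is active in this cluster and forwards accordingly:

executor := queues.NewExecutorWrapper(
	clusterMetadata.GetCurrentClusterName(),
	namespaceRegistry,
	newTransferActiveTaskExecutor(shardContext),  // tasks for namespaces active in this cluster
	newTransferStandbyTaskExecutor(shardContext), // tasks for namespaces active elsewhere
	logger,
)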
11 changes: 9 additions & 2 deletions service/history/queues/priority_assigner.go
@@ -99,8 +99,15 @@ func (a *priorityAssignerImpl) Assign(executable Executable) error {
// TODO: remove QueueType() and the special logic for assigning high priority to no-op tasks
// after merging active/standby queue processor or performing task filtering before submitting
// tasks to worker pool
taskActive := executable.QueueType() != QueueTypeStandbyTransfer &&
executable.QueueType() != QueueTypeStandbyTimer
var taskActive bool
switch executable.QueueType() {
case QueueTypeActiveTransfer, QueueTypeActiveTimer:
taskActive = true
case QueueTypeStandbyTransfer, QueueTypeStandbyTimer:
taskActive = false
default:
taskActive = namespaceActive
}

if !taskActive && !namespaceActive {
// standby tasks
6 changes: 6 additions & 0 deletions service/history/queues/queue.go
@@ -66,8 +66,14 @@ type (

const (
QueueTypeUnknown QueueType = iota
// QueueTypeTransfer is used by the single cursor transfer queue, which
// processes both active and standby tasks
QueueTypeTransfer
QueueTypeActiveTransfer
QueueTypeStandbyTransfer
// QueueTypeTimer is used by the single cursor timer queue, which
// processes both active and standby tasks
QueueTypeTimer
QueueTypeActiveTimer
QueueTypeStandbyTimer
QueueTypeVisibility
