Skip to content

Commit

Permalink
In memory timeout timer queue for speculative workflow task (#4254)
Browse files Browse the repository at this point in the history
  • Loading branch information
alexshtin committed May 9, 2023
1 parent 8bc74cb commit d0007b0
Show file tree
Hide file tree
Showing 31 changed files with 1,279 additions and 132 deletions.
87 changes: 46 additions & 41 deletions api/enums/v1/task.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions common/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,9 @@ const (
// RetentionTimerJitterDuration is a time duration jitter to distribute timer from T0 to T0 + jitter duration
RetentionTimerJitterDuration = "history.retentionTimerJitterDuration"

// MemoryTimerProcessorSchedulerWorkerCount is the number of workers in the task scheduler for in memory timer processor.
MemoryTimerProcessorSchedulerWorkerCount = "history.memoryTimerProcessorSchedulerWorkerCount"

// TransferTaskBatchSize is batch size for transferQueueProcessor
TransferTaskBatchSize = "history.transferTaskBatchSize"
// TransferProcessorFailoverMaxPollRPS is max poll rate per second for transferQueueProcessor
Expand Down
1 change: 1 addition & 0 deletions common/log/tag/values.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ var (
ComponentVisibilityQueue = component("visibility-queue-processor")
ComponentArchivalQueue = component("archival-queue-processor")
ComponentTimerQueue = component("timer-queue-processor")
ComponentMemoryScheduledQueue = component("memory-scheduled-queue-processor")
ComponentTimerBuilder = component("timer-builder")
ComponentReplicatorQueue = component("replicator-queue-processor")
ComponentShardController = component("shard-controller")
Expand Down
3 changes: 3 additions & 0 deletions common/metrics/metric_defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -1068,6 +1068,8 @@ const (
OperationVisibilityQueueProcessorScope = "VisibilityQueueProcessor"
// OperationArchivalQueueProcessorScope is a scope for archival queue processor
OperationArchivalQueueProcessorScope = "ArchivalQueueProcessor"
// OperationMemoryScheduledQueueProcessorScope is a scope for memory scheduled queue processor.
OperationMemoryScheduledQueueProcessorScope = "MemoryScheduledQueueProcessor"
)

// Matching Scope
Expand Down Expand Up @@ -1196,6 +1198,7 @@ const (
TaskTypeTimerStandbyTaskActivityRetryTimer = "TimerStandbyTaskActivityRetryTimer"
TaskTypeTimerStandbyTaskWorkflowBackoffTimer = "TimerStandbyTaskWorkflowBackoffTimer"
TaskTypeTimerStandbyTaskDeleteHistoryEvent = "TimerStandbyTaskDeleteHistoryEvent"
TaskTypeMemoryScheduledTaskWorkflowTaskTimeout = "MemoryScheduledTaskWorkflowTaskTimeout"
)

// Schedule action types
Expand Down
2 changes: 2 additions & 0 deletions proto/internal/temporal/server/api/enums/v1/task.proto
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ enum TaskCategory {
TASK_CATEGORY_VISIBILITY = 4;
// Archival is the task type for workflow archival tasks.
TASK_CATEGORY_ARCHIVAL = 5;
// Memory timer is the task type for in memory timer task. Currently used for speculative workflow task timeouts only.
TASK_CATEGORY_MEMORY_TIMER = 6;
}

enum TaskType {
Expand Down
13 changes: 6 additions & 7 deletions service/history/api/updateworkflow/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,7 @@ func Invoke(

if createNewWorkflowTask {
// This will try not to add an event but will create speculative WT in mutable state.
// Task generation will be skipped if WT is created as speculative.
wt, err := ms.AddWorkflowTaskScheduledEvent(true, enumsspb.WORKFLOW_TASK_TYPE_SPECULATIVE)
wt, err := ms.AddWorkflowTaskScheduledEvent(false, enumsspb.WORKFLOW_TASK_TYPE_SPECULATIVE)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -135,12 +134,12 @@ func addWorkflowTaskToMatching(
shardCtx shard.Context,
ms workflow.MutableState,
matchingClient matchingservice.MatchingServiceClient,
task *workflow.WorkflowTaskInfo,
wt *workflow.WorkflowTaskInfo,
nsID namespace.ID,
) error {
// TODO (alex-update): Timeout calculation is copied from somewhere else. Extract func instead?
// TODO (alex): Timeout calculation is copied from somewhere else. Extract func instead?
var taskScheduleToStartTimeout *time.Duration
if ms.TaskQueue().GetName() != task.TaskQueue.GetName() {
if ms.IsStickyTaskQueueEnabled() {
taskScheduleToStartTimeout = ms.GetExecutionInfo().StickyScheduleToStartTimeout
} else {
taskScheduleToStartTimeout = ms.GetExecutionInfo().WorkflowRunTimeout
Expand All @@ -158,8 +157,8 @@ func addWorkflowTaskToMatching(
WorkflowId: wfKey.WorkflowID,
RunId: wfKey.RunID,
},
TaskQueue: task.TaskQueue,
ScheduledEventId: task.ScheduledEventID,
TaskQueue: wt.TaskQueue,
ScheduledEventId: wt.ScheduledEventID,
ScheduleToStartTimeout: taskScheduleToStartTimeout,
Clock: clock,
})
Expand Down
4 changes: 4 additions & 0 deletions service/history/configs/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,8 @@ type Config struct {
TimerProcessorArchivalTimeLimit dynamicconfig.DurationPropertyFn
RetentionTimerJitterDuration dynamicconfig.DurationPropertyFn

MemoryTimerProcessorSchedulerWorkerCount dynamicconfig.IntPropertyFn

// TransferQueueProcessor settings
TransferTaskHighPriorityRPS dynamicconfig.IntPropertyFnWithNamespaceFilter
TransferTaskBatchSize dynamicconfig.IntPropertyFn
Expand Down Expand Up @@ -374,6 +376,8 @@ func NewConfig(
TimerProcessorArchivalTimeLimit: dc.GetDurationProperty(dynamicconfig.TimerProcessorArchivalTimeLimit, 1*time.Second),
RetentionTimerJitterDuration: dc.GetDurationProperty(dynamicconfig.RetentionTimerJitterDuration, 30*time.Minute),

MemoryTimerProcessorSchedulerWorkerCount: dc.GetIntProperty(dynamicconfig.MemoryTimerProcessorSchedulerWorkerCount, 64),

TransferTaskBatchSize: dc.GetIntProperty(dynamicconfig.TransferTaskBatchSize, 100),
TransferProcessorSchedulerWorkerCount: dc.GetIntProperty(dynamicconfig.TransferProcessorSchedulerWorkerCount, 512),
TransferProcessorSchedulerActiveRoundRobinWeights: dc.GetMapPropertyFnWithNamespaceFilter(dynamicconfig.TransferProcessorSchedulerActiveRoundRobinWeights, ConvertWeightsToDynamicConfigValue(DefaultActiveTaskPriorityWeight)),
Expand Down
4 changes: 4 additions & 0 deletions service/history/historyEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -668,6 +668,10 @@ func (e *historyEngineImpl) NotifyNewTasks(
}
}

func (e *historyEngineImpl) AddSpeculativeWorkflowTaskTimeoutTask(task *tasks.WorkflowTaskTimeoutTask) {
e.queueProcessors[tasks.CategoryMemoryTimer].NotifyNewTasks([]tasks.Task{task})
}

func (e *historyEngineImpl) GetReplicationMessages(
ctx context.Context,
pollingCluster string,
Expand Down
127 changes: 127 additions & 0 deletions service/history/memory_scheduled_queue_factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
// The MIT License
//
// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
//
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package history

import (
"go.uber.org/fx"

"go.temporal.io/server/common/clock"
"go.temporal.io/server/common/cluster"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/log/tag"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/namespace"
ctasks "go.temporal.io/server/common/tasks"
"go.temporal.io/server/service/history/configs"
"go.temporal.io/server/service/history/queues"
"go.temporal.io/server/service/history/shard"
wcache "go.temporal.io/server/service/history/workflow/cache"
)

type (
memoryScheduledQueueFactoryParams struct {
fx.In

NamespaceRegistry namespace.Registry
ClusterMetadata cluster.Metadata
Config *configs.Config
TimeSource clock.TimeSource
MetricsHandler metrics.Handler
Logger log.SnTaggedLogger
}

memoryScheduledQueueFactory struct {
scheduler ctasks.Scheduler[ctasks.Task]
priorityAssigner queues.PriorityAssigner

namespaceRegistry namespace.Registry
clusterMetadata cluster.Metadata
timeSource clock.TimeSource
metricsHandler metrics.Handler
logger log.SnTaggedLogger
}
)

func NewMemoryScheduledQueueFactory(
params memoryScheduledQueueFactoryParams,
) QueueFactory {
logger := log.With(params.Logger, tag.ComponentMemoryScheduledQueue)
metricsHandler := params.MetricsHandler.WithTags(metrics.OperationTag(metrics.OperationMemoryScheduledQueueProcessorScope))

hostScheduler := ctasks.NewFIFOScheduler[ctasks.Task](
&ctasks.FIFOSchedulerOptions{
QueueSize: 0, // Don't buffer tasks in scheduler. If all workers are busy memoryScheduledQueue reschedules tasks into itself.
WorkerCount: params.Config.MemoryTimerProcessorSchedulerWorkerCount,
},
logger,
)

return &memoryScheduledQueueFactory{
scheduler: hostScheduler,
priorityAssigner: queues.NewPriorityAssigner(),
namespaceRegistry: params.NamespaceRegistry,
clusterMetadata: params.ClusterMetadata,
timeSource: params.TimeSource,
metricsHandler: metricsHandler,
logger: logger,
}
}

func (f *memoryScheduledQueueFactory) Start() {
f.scheduler.Start()
}

func (f *memoryScheduledQueueFactory) Stop() {
f.scheduler.Stop()
}

func (f *memoryScheduledQueueFactory) CreateQueue(
shardCtx shard.Context,
workflowCache wcache.Cache,
) queues.Queue {

// Reuse TimerQueueActiveTaskExecutor only to executeWorkflowTaskTimeoutTask.
// Unused dependencies are nil.
speculativeWorkflowTaskTimeoutExecutor := newTimerQueueActiveTaskExecutor(
shardCtx,
workflowCache,
nil,
f.logger,
f.metricsHandler,
shardCtx.GetConfig(),
nil,
)

return queues.NewSpeculativeWorkflowTaskTimeoutQueue(
f.scheduler,
f.priorityAssigner,
speculativeWorkflowTaskTimeoutExecutor,
f.namespaceRegistry,
f.clusterMetadata,
f.timeSource,
f.metricsHandler,
f.logger,
)
}
3 changes: 2 additions & 1 deletion service/history/nDCTaskUtil.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,8 @@ func getTimerTaskEventIDAndRetryable(
retryable := true

if task, ok := timerTask.(*tasks.WorkflowTaskTimeoutTask); ok {
retryable = !(executionInfo.WorkflowTaskScheduledEventId == task.EventID && executionInfo.WorkflowTaskAttempt > 1)
retryable = !(executionInfo.WorkflowTaskScheduledEventId == task.EventID && executionInfo.WorkflowTaskAttempt > 1) &&
executionInfo.WorkflowTaskType != enumsspb.WORKFLOW_TASK_TYPE_SPECULATIVE
}

return eventID, retryable
Expand Down
4 changes: 4 additions & 0 deletions service/history/queueFactoryBase.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,10 @@ var QueueModule = fx.Options(
Group: QueueFactoryFxGroup,
Target: NewVisibilityQueueFactory,
},
fx.Annotated{
Group: QueueFactoryFxGroup,
Target: NewMemoryScheduledQueueFactory,
},
getOptionalQueueFactories,
),
fx.Invoke(QueueFactoryLifetimeHooks),
Expand Down
2 changes: 1 addition & 1 deletion service/history/queues/executable.go
Original file line number Diff line number Diff line change
Expand Up @@ -551,7 +551,7 @@ func (e *executableImpl) estimateTaskMetricTag() []metrics.Tag {
isActive = namespace.ActiveInCluster(e.clusterMetadata.GetCurrentClusterName())
}

taskType := getTaskTypeTagValue(e.Task, isActive)
taskType := getTaskTypeTagValue(e, isActive)
return []metrics.Tag{
namespaceTag,
metrics.TaskTypeTag(taskType),
Expand Down
Loading

0 comments on commit d0007b0

Please sign in to comment.