Skip to content

Commit

Permalink
Refactor task queue related methods of mutable state (#4325)
Browse files Browse the repository at this point in the history
  • Loading branch information
alexshtin authored May 12, 2023
1 parent a343bea commit 62611e2
Show file tree
Hide file tree
Showing 13 changed files with 198 additions and 177 deletions.
2 changes: 1 addition & 1 deletion service/history/api/get_workflow_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ func MutableStateToGetResponse(
CurrentBranchToken: currentBranchToken,
WorkflowState: workflowState,
WorkflowStatus: workflowStatus,
IsStickyTaskQueueEnabled: mutableState.IsStickyTaskQueueEnabled(),
IsStickyTaskQueueEnabled: mutableState.IsStickyTaskQueueSet(),
VersionHistories: versionhistory.CopyVersionHistories(
mutableState.GetExecutionInfo().GetVersionHistories(),
),
Expand Down
2 changes: 1 addition & 1 deletion service/history/api/resetstickytaskqueue/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func Invoke(
return nil, consts.ErrWorkflowCompleted
}

mutableState.ClearStickyness()
mutableState.ClearStickyTaskQueue()
return &api.UpdateWorkflowAction{
Noop: true,
CreateWorkflowTask: false,
Expand Down
13 changes: 3 additions & 10 deletions service/history/api/updateworkflow/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,14 @@ package updateworkflow
import (
"context"
"fmt"
"time"

commonpb "go.temporal.io/api/common/v1"
"go.temporal.io/api/serviceerror"
updatepb "go.temporal.io/api/update/v1"
"go.temporal.io/api/workflowservice/v1"

enumspb "go.temporal.io/api/enums/v1"

enumsspb "go.temporal.io/server/api/enums/v1"
"go.temporal.io/server/api/historyservice/v1"
"go.temporal.io/server/api/matchingservice/v1"
Expand Down Expand Up @@ -169,14 +169,7 @@ func addWorkflowTaskToMatching(
wt *workflow.WorkflowTaskInfo,
nsID namespace.ID,
) error {
// TODO (alex): Timeout calculation is copied from somewhere else. Extract func instead?
var taskScheduleToStartTimeout *time.Duration
if ms.IsStickyTaskQueueEnabled() {
taskScheduleToStartTimeout = ms.GetExecutionInfo().StickyScheduleToStartTimeout
} else {
taskScheduleToStartTimeout = ms.GetExecutionInfo().WorkflowRunTimeout
}

_, scheduleToStartTimeout := ms.TaskQueueScheduleToStartTimeout(wt.TaskQueue.Name)
wfKey := ms.GetWorkflowKey()
clock, err := shardCtx.NewVectorClock()
if err != nil {
Expand All @@ -191,7 +184,7 @@ func addWorkflowTaskToMatching(
},
TaskQueue: wt.TaskQueue,
ScheduledEventId: wt.ScheduledEventID,
ScheduleToStartTimeout: taskScheduleToStartTimeout,
ScheduleToStartTimeout: scheduleToStartTimeout,
Clock: clock,
})
if err != nil {
Expand Down
58 changes: 19 additions & 39 deletions service/history/transferQueueActiveTaskExecutor.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ package history
import (
"context"
"fmt"
"time"

"github.com/pborman/uuid"
commonpb "go.temporal.io/api/common/v1"
Expand Down Expand Up @@ -207,73 +206,54 @@ func (t *transferQueueActiveTaskExecutor) processActivityTask(

func (t *transferQueueActiveTaskExecutor) processWorkflowTask(
ctx context.Context,
task *tasks.WorkflowTask,
transferTask *tasks.WorkflowTask,
) (retError error) {
ctx, cancel := context.WithTimeout(ctx, taskTimeout)
defer cancel()

weContext, release, err := getWorkflowExecutionContextForTask(ctx, t.cache, task)
weContext, release, err := getWorkflowExecutionContextForTask(ctx, t.cache, transferTask)
if err != nil {
return err
}
defer func() { release(retError) }()

mutableState, err := loadMutableStateForTransferTask(ctx, weContext, task, t.metricHandler, t.logger)
mutableState, err := loadMutableStateForTransferTask(ctx, weContext, transferTask, t.metricHandler, t.logger)
if err != nil {
return err
}
if mutableState == nil || !mutableState.IsWorkflowExecutionRunning() {
return nil
}

workflowTask := mutableState.GetWorkflowTaskByID(task.ScheduledEventID)
workflowTask := mutableState.GetWorkflowTaskByID(transferTask.ScheduledEventID)
if workflowTask == nil {
return nil
}
err = CheckTaskVersion(t.shard, t.logger, mutableState.GetNamespaceEntry(), workflowTask.Version, task.Version, task)
err = CheckTaskVersion(t.shard, t.logger, mutableState.GetNamespaceEntry(), workflowTask.Version, transferTask.Version, transferTask)
if err != nil {
return err
}

executionInfo := mutableState.GetExecutionInfo()
// Task queue from transfer task (not current one from mutable state) must be used here.
// If current task queue becomes sticky since this transfer task was created,
// it can't be used here, because timeout timer was not created for it,
// because it used to be non-sticky when this transfer task was created .
taskQueue, scheduleToStartTimeout := mutableState.TaskQueueScheduleToStartTimeout(transferTask.TaskQueue)

// NOTE: previously this section check whether mutable state has enabled
// sticky workflowTask, if so convert the workflowTask to a sticky workflowTask.
// that logic has a bug which timer task for that sticky workflowTask is not generated
// the correct logic should check whether the workflow task is a sticky workflowTask
// task or not.
var taskQueue *taskqueuepb.TaskQueue
var taskScheduleToStartTimeout *time.Duration
if mutableState.GetExecutionInfo().TaskQueue != task.TaskQueue {
// this workflowTask is an sticky workflowTask
// there shall already be an timer set
taskQueue = &taskqueuepb.TaskQueue{
Name: task.TaskQueue,
Kind: enumspb.TASK_QUEUE_KIND_STICKY,
}
taskScheduleToStartTimeout = executionInfo.StickyScheduleToStartTimeout
} else {
taskQueue = &taskqueuepb.TaskQueue{
Name: task.TaskQueue,
Kind: enumspb.TASK_QUEUE_KIND_NORMAL,
}
workflowRunTimeout := executionInfo.WorkflowRunTimeout
taskScheduleToStartTimeout = workflowRunTimeout
}
normalTaskQueueName := mutableState.GetExecutionInfo().TaskQueue

originalTaskQueue := mutableState.GetExecutionInfo().TaskQueue
// NOTE: do not access anything related mutable state after this lock release
// release the context lock since we no longer need mutable state and
// the rest of logic is making RPC call, which takes time.
// NOTE: Do not access mutableState after this lock is released.
// It is important to release the workflow lock here, because pushWorkflowTask will call matching,
// which will call history back (with RecordWorkflowTaskStarted), and it will try to get workflow lock again.
release(nil)

err = t.pushWorkflowTask(ctx, task, taskQueue, taskScheduleToStartTimeout)
err = t.pushWorkflowTask(ctx, transferTask, taskQueue, scheduleToStartTimeout)

if _, ok := err.(*serviceerrors.StickyWorkerUnavailable); ok {
// sticky worker is unavailable, switch to original task queue
// sticky worker is unavailable, switch to original normal task queue
taskQueue = &taskqueuepb.TaskQueue{
// do not use task.TaskQueue which is sticky, use original task queue from mutable state
Name: originalTaskQueue,
// do not use task.TaskQueue which is sticky, use original normal task queue from mutable state
Name: normalTaskQueueName,
Kind: enumspb.TASK_QUEUE_KIND_NORMAL,
}

Expand All @@ -282,7 +262,7 @@ func (t *transferQueueActiveTaskExecutor) processWorkflowTask(
// There is no need to reset sticky, because if this task is picked by new worker, the new worker will reset
// the sticky queue to a new one. However, if worker is completely down, that schedule_to_start timeout task
// will re-create a new non-sticky task and reset sticky.
err = t.pushWorkflowTask(ctx, task, taskQueue, taskScheduleToStartTimeout)
err = t.pushWorkflowTask(ctx, transferTask, taskQueue, scheduleToStartTimeout)
}
return err
}
Expand Down
23 changes: 9 additions & 14 deletions service/history/transferQueueStandbyTaskExecutor.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,21 +181,17 @@ func (t *transferQueueStandbyTaskExecutor) processWorkflowTask(
return nil, nil
}

executionInfo := mutableState.GetExecutionInfo()

_, scheduleToStartTimeout := mutableState.TaskQueueScheduleToStartTimeout(transferTask.TaskQueue)
// Task queue is ignored here because at standby, always use original normal task queue,
// disregards the transferTask.TaskQueue which could be sticky.
// NOTE: scheduleToStart timeout is respected. If workflow was sticky before namespace become standby,
// transferTask.TaskQueue is sticky, and there is timer already created for this timeout.
// Use this sticky timeout as TTL.
taskQueue := &taskqueuepb.TaskQueue{
// at standby, always use original task queue, disregards the task.TaskQueue which could be sticky
Name: mutableState.GetExecutionInfo().TaskQueue,
Kind: enumspb.TASK_QUEUE_KIND_NORMAL,
}
workflowRunTimeout := executionInfo.WorkflowRunTimeout
taskScheduleToStartTimeout := workflowRunTimeout
if mutableState.GetExecutionInfo().TaskQueue != transferTask.TaskQueue {
// Experimental: try to push sticky task as regular task with sticky timeout as TTL.
// workflow might be sticky before namespace become standby
// there shall already be a schedule_to_start timer created
taskScheduleToStartTimeout = executionInfo.StickyScheduleToStartTimeout
}

err := CheckTaskVersion(t.shard, t.logger, mutableState.GetNamespaceEntry(), wtInfo.Version, transferTask.Version, transferTask)
if err != nil {
return nil, err
Expand All @@ -204,7 +200,7 @@ func (t *transferQueueStandbyTaskExecutor) processWorkflowTask(
if wtInfo.StartedEventID == common.EmptyEventID {
return newWorkflowTaskPostActionInfo(
mutableState,
taskScheduleToStartTimeout,
scheduleToStartTimeout,
*taskQueue,
)
}
Expand Down Expand Up @@ -569,12 +565,11 @@ func (t *transferQueueStandbyTaskExecutor) pushWorkflowTask(
}

pushwtInfo := postActionInfo.(*workflowTaskPostActionInfo)
timeout := pushwtInfo.workflowTaskScheduleToStartTimeout
return t.transferQueueTaskExecutorBase.pushWorkflowTask(
ctx,
task.(*tasks.WorkflowTask),
&pushwtInfo.taskqueue,
timeout,
pushwtInfo.workflowTaskScheduleToStartTimeout,
)
}

Expand Down
10 changes: 7 additions & 3 deletions service/history/workflow/mutable_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,6 @@ type (
AddWorkflowExecutionUpdateAcceptedEvent(protocolInstanceID string, updAcceptance *updatepb.Acceptance) (*historypb.HistoryEvent, error)
AddWorkflowExecutionUpdateCompletedEvent(updResp *updatepb.Response) (*historypb.HistoryEvent, error)
RejectWorkflowExecutionUpdate(protocolInstanceID string, updRejection *updatepb.Rejection) error
ClearStickyness()
CheckResettable() error
CloneToProto() *persistencespb.WorkflowMutableState
RetryActivity(ai *persistencespb.ActivityInfo, failure *failurepb.Failure) (enumspb.RetryState, error)
Expand Down Expand Up @@ -212,8 +211,13 @@ type (
IsCancelRequested() bool
IsCurrentWorkflowGuaranteed() bool
IsSignalRequested(requestID string) bool
IsStickyTaskQueueEnabled() bool
TaskQueue() *taskqueuepb.TaskQueue

CurrentTaskQueue() *taskqueuepb.TaskQueue
SetStickyTaskQueue(name string, scheduleToStartTimeout *time.Duration)
ClearStickyTaskQueue()
IsStickyTaskQueueSet() bool
TaskQueueScheduleToStartTimeout(name string) (*taskqueuepb.TaskQueue, *time.Duration)

IsWorkflowExecutionRunning() bool
IsResourceDuplicated(resourceDedupKey definition.DeduplicationID) bool
IsWorkflowPendingOnWorkflowTaskBackoff() bool
Expand Down
53 changes: 37 additions & 16 deletions service/history/workflow/mutable_state_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -632,21 +632,47 @@ func (ms *MutableStateImpl) GetNamespaceEntry() *namespace.Namespace {
return ms.namespaceEntry
}

func (ms *MutableStateImpl) IsStickyTaskQueueEnabled() bool {
func (ms *MutableStateImpl) CurrentTaskQueue() *taskqueuepb.TaskQueue {
if ms.IsStickyTaskQueueSet() {
return &taskqueuepb.TaskQueue{
Name: ms.executionInfo.StickyTaskQueue,
Kind: enumspb.TASK_QUEUE_KIND_STICKY,
}
}
return &taskqueuepb.TaskQueue{
Name: ms.executionInfo.TaskQueue,
Kind: enumspb.TASK_QUEUE_KIND_NORMAL,
}
}

func (ms *MutableStateImpl) SetStickyTaskQueue(name string, scheduleToStartTimeout *time.Duration) {
ms.executionInfo.StickyTaskQueue = name
ms.executionInfo.StickyScheduleToStartTimeout = scheduleToStartTimeout
}

func (ms *MutableStateImpl) ClearStickyTaskQueue() {
ms.executionInfo.StickyTaskQueue = ""
ms.executionInfo.StickyScheduleToStartTimeout = nil
}

func (ms *MutableStateImpl) IsStickyTaskQueueSet() bool {
return ms.executionInfo.StickyTaskQueue != ""
}

func (ms *MutableStateImpl) TaskQueue() *taskqueuepb.TaskQueue {
if ms.IsStickyTaskQueueEnabled() {
// TaskQueueScheduleToStartTimeout returns TaskQueue struct and corresponding StartToClose timeout.
// Task queue kind (sticky or normal) and timeout are set based on comparison of normal task queue name
// in mutable state and provided name.
func (ms *MutableStateImpl) TaskQueueScheduleToStartTimeout(name string) (*taskqueuepb.TaskQueue, *time.Duration) {
if ms.executionInfo.TaskQueue != name {
return &taskqueuepb.TaskQueue{
Name: ms.executionInfo.StickyTaskQueue,
Kind: enumspb.TASK_QUEUE_KIND_STICKY,
}
}, ms.executionInfo.StickyScheduleToStartTimeout
}
return &taskqueuepb.TaskQueue{
Name: ms.executionInfo.TaskQueue,
Kind: enumspb.TASK_QUEUE_KIND_NORMAL,
}
}, ms.executionInfo.WorkflowRunTimeout // No WT ScheduleToStart timeout for normal task queue.
}

func (ms *MutableStateImpl) GetWorkflowType() *commonpb.WorkflowType {
Expand Down Expand Up @@ -1409,11 +1435,6 @@ func (ms *MutableStateImpl) DeleteWorkflowTask() {
ms.workflowTaskManager.DeleteWorkflowTask()
}

func (ms *MutableStateImpl) ClearStickyness() {
ms.executionInfo.StickyTaskQueue = ""
ms.executionInfo.StickyScheduleToStartTimeout = timestamp.DurationFromSeconds(0)
}

// GetLastFirstEventIDTxnID returns last first event ID and corresponding transaction ID
// first event ID is the ID of a batch of events in a single history events record
func (ms *MutableStateImpl) GetLastFirstEventIDTxnID() (int64, int64) {
Expand Down Expand Up @@ -2537,7 +2558,7 @@ func (ms *MutableStateImpl) ReplicateWorkflowExecutionCompletedEvent(
ms.executionInfo.CompletionEventBatchId = firstEventID // Used when completion event needs to be loaded from database
ms.executionInfo.NewExecutionRunId = event.GetWorkflowExecutionCompletedEventAttributes().GetNewExecutionRunId()
ms.executionInfo.CloseTime = event.GetEventTime()
ms.ClearStickyness()
ms.ClearStickyTaskQueue()
ms.writeEventToCache(event)
return nil
}
Expand Down Expand Up @@ -2582,7 +2603,7 @@ func (ms *MutableStateImpl) ReplicateWorkflowExecutionFailedEvent(
ms.executionInfo.CompletionEventBatchId = firstEventID // Used when completion event needs to be loaded from database
ms.executionInfo.NewExecutionRunId = event.GetWorkflowExecutionFailedEventAttributes().GetNewExecutionRunId()
ms.executionInfo.CloseTime = event.GetEventTime()
ms.ClearStickyness()
ms.ClearStickyTaskQueue()
ms.writeEventToCache(event)
return nil
}
Expand Down Expand Up @@ -2626,7 +2647,7 @@ func (ms *MutableStateImpl) ReplicateWorkflowExecutionTimedoutEvent(
ms.executionInfo.CompletionEventBatchId = firstEventID // Used when completion event needs to be loaded from database
ms.executionInfo.NewExecutionRunId = event.GetWorkflowExecutionTimedOutEventAttributes().GetNewExecutionRunId()
ms.executionInfo.CloseTime = event.GetEventTime()
ms.ClearStickyness()
ms.ClearStickyTaskQueue()
ms.writeEventToCache(event)
return nil
}
Expand Down Expand Up @@ -2706,7 +2727,7 @@ func (ms *MutableStateImpl) ReplicateWorkflowExecutionCanceledEvent(
ms.executionInfo.CompletionEventBatchId = firstEventID // Used when completion event needs to be loaded from database
ms.executionInfo.NewExecutionRunId = ""
ms.executionInfo.CloseTime = event.GetEventTime()
ms.ClearStickyness()
ms.ClearStickyTaskQueue()
ms.writeEventToCache(event)
return nil
}
Expand Down Expand Up @@ -3326,7 +3347,7 @@ func (ms *MutableStateImpl) ReplicateWorkflowExecutionTerminatedEvent(
ms.executionInfo.CompletionEventBatchId = firstEventID // Used when completion event needs to be loaded from database
ms.executionInfo.NewExecutionRunId = ""
ms.executionInfo.CloseTime = event.GetEventTime()
ms.ClearStickyness()
ms.ClearStickyTaskQueue()
ms.writeEventToCache(event)
return nil
}
Expand Down Expand Up @@ -3488,7 +3509,7 @@ func (ms *MutableStateImpl) ReplicateWorkflowExecutionContinuedAsNewEvent(
ms.executionInfo.CompletionEventBatchId = firstEventID // Used when completion event needs to be loaded from database
ms.executionInfo.NewExecutionRunId = continueAsNewEvent.GetWorkflowExecutionContinuedAsNewEventAttributes().GetNewExecutionRunId()
ms.executionInfo.CloseTime = continueAsNewEvent.GetEventTime()
ms.ClearStickyness()
ms.ClearStickyTaskQueue()
ms.writeEventToCache(continueAsNewEvent)
return nil
}
Expand Down
Loading

0 comments on commit 62611e2

Please sign in to comment.