Skip to content

Commit

Permalink
Log NotFound error for transfer task processing (#2723)
Browse files Browse the repository at this point in the history
  • Loading branch information
yycptt committed Apr 14, 2022
1 parent 490d18f commit d541fc0
Show file tree
Hide file tree
Showing 6 changed files with 111 additions and 25 deletions.
4 changes: 4 additions & 0 deletions common/log/tag/tags.go
Original file line number Diff line number Diff line change
Expand Up @@ -560,6 +560,10 @@ func TaskVersion(taskVersion int64) ZapTag {
return NewInt64("queue-task-version", taskVersion)
}

// TaskType returns tag for queue task type, using the string form of the
// given enumsspb.TaskType as the tag value
func TaskType(taskType enumsspb.TaskType) ZapTag {
return NewStringTag("queue-task-type", taskType.String())
}

// TaskVisibilityTimestamp returns tag for task visibilityTimestamp
func TaskVisibilityTimestamp(timestamp time.Time) ZapTag {
return NewTimeTag("queue-task-visibility-timestamp", timestamp)
Expand Down
81 changes: 63 additions & 18 deletions service/history/nDCTaskUtil.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"context"
"time"

"go.temporal.io/api/serviceerror"
persistencespb "go.temporal.io/server/api/persistence/v1"
"go.temporal.io/server/common"
"go.temporal.io/server/common/log"
Expand Down Expand Up @@ -78,14 +79,22 @@ func loadMutableStateForTransferTask(
metricsClient metrics.Client,
logger log.Logger,
) (workflow.MutableState, error) {
return LoadMutableStateForTask(
logger = initializeLoggerForTask(transferTask, logger)
mutableState, err := LoadMutableStateForTask(
ctx,
wfContext,
transferTask,
getTransferTaskEventIDAndRetryable,
metricsClient.Scope(metrics.TransferQueueProcessorScope),
logger,
)
if _, ok := err.(*serviceerror.NotFound); ok {
// NotFound error will be ignored by task error handling logic, so log it here
// for transfer tasks, mutable state should always be available
logger.Error("Transfer Task Processor: workflow mutable state not found, skip.")
}

return mutableState, err
}

// load mutable state, if mutable state's next event ID <= task ID, will attempt to refresh
Expand All @@ -97,6 +106,7 @@ func loadMutableStateForTimerTask(
metricsClient metrics.Client,
logger log.Logger,
) (workflow.MutableState, error) {
logger = initializeLoggerForTask(timerTask, logger)
return LoadMutableStateForTask(
ctx,
wfContext,
Expand Down Expand Up @@ -140,10 +150,6 @@ func LoadMutableStateForTask(
if eventID >= mutableState.GetNextEventID() {
scope.IncCounter(metrics.TaskSkipped)
logger.Info("Task Processor: task event ID >= MS NextEventID, skip.",
tag.WorkflowNamespaceID(task.GetNamespaceID()),
tag.WorkflowID(task.GetWorkflowID()),
tag.WorkflowRunID(task.GetRunID()),
tag.WorkflowEventID(eventID),
tag.WorkflowNextEventID(mutableState.GetNextEventID()),
)
return nil, nil
Expand All @@ -152,33 +158,47 @@ func LoadMutableStateForTask(
}

// initializeLoggerForTask returns a logger derived from the given logger via
// log.With, decorated with tags that identify the task: namespace ID,
// workflow ID, run ID, task ID, visibility timestamp, task type, the task
// itself and the task's event ID.
//
// The event ID is resolved per task category: transfer tasks use
// getTransferTaskEventID, timer tasks use getTimerTaskEventID, and visibility
// tasks always report 0. Any other category causes a panic with an Internal
// service error.
//
// NOTE(review): this text appears to be rendered from a diff view. The
// shardID parameter and the tag.ShardID line are shown here, while visible
// call sites also appear in a form that passes only (task, logger) — confirm
// against the committed file which signature is current.
func initializeLoggerForTask(
shardID int32,
task tasks.Task,
logger log.Logger,
) log.Logger {
// taskEventID extracts the event ID to log; the implementation depends on
// the task category selected below
var taskEventID func(task tasks.Task) int64
taskCategory := task.GetCategory()
switch taskCategory.ID() {
case tasks.CategoryIDTransfer:
taskEventID = getTransferTaskEventID
case tasks.CategoryIDTimer:
taskEventID = getTimerTaskEventID
case tasks.CategoryIDVisibility:
// visibility tasks don't have task eventID
taskEventID = func(task tasks.Task) int64 { return 0 }
default:
// replication task won't reach here
panic(serviceerror.NewInternal("unknown task category"))
}

// attach all identifying tags once so callers do not repeat them per log line
taskLogger := log.With(
logger,
tag.ShardID(shardID),
tag.WorkflowNamespaceID(task.GetNamespaceID()),
tag.WorkflowID(task.GetWorkflowID()),
tag.WorkflowRunID(task.GetRunID()),
tag.TaskID(task.GetTaskID()),
tag.TaskVisibilityTimestamp(task.GetVisibilityTime()),
tag.TaskType(task.GetType()),
tag.Task(task),
tag.WorkflowEventID(taskEventID(task)),
)
return taskLogger
}

func getTransferTaskEventIDAndRetryable(
func getTransferTaskEventID(
transferTask tasks.Task,
executionInfo *persistencespb.WorkflowExecutionInfo,
) (int64, bool) {
) int64 {
eventID := int64(0)
retryable := true

switch task := transferTask.(type) {
case *tasks.ActivityTask:
eventID = task.ScheduleID
case *tasks.WorkflowTask:
eventID = task.ScheduleID
retryable = !(executionInfo.WorkflowTaskScheduleId == task.ScheduleID && executionInfo.WorkflowTaskAttempt > 1)
case *tasks.CloseExecutionTask:
eventID = common.FirstEventID
case *tasks.DeleteExecutionTask:
Expand All @@ -194,24 +214,35 @@ func getTransferTaskEventIDAndRetryable(
default:
panic(errUnknownTransferTask)
}
return eventID, retryable
return eventID
}

func getTimerTaskEventIDAndRetryable(
timerTask tasks.Task,
func getTransferTaskEventIDAndRetryable(
transferTask tasks.Task,
executionInfo *persistencespb.WorkflowExecutionInfo,
) (int64, bool) {
eventID := int64(0)
eventID := getTransferTaskEventID(transferTask)
retryable := true

if task, ok := transferTask.(*tasks.WorkflowTask); ok {
retryable = !(executionInfo.WorkflowTaskScheduleId == task.ScheduleID && executionInfo.WorkflowTaskAttempt > 1)
}

return eventID, retryable
}

func getTimerTaskEventID(
timerTask tasks.Task,
) int64 {
eventID := int64(0)

switch task := timerTask.(type) {
case *tasks.UserTimerTask:
eventID = task.EventID
case *tasks.ActivityTimeoutTask:
eventID = task.EventID
case *tasks.WorkflowTaskTimeoutTask:
eventID = task.EventID
retryable = !(executionInfo.WorkflowTaskScheduleId == task.EventID && executionInfo.WorkflowTaskAttempt > 1)
case *tasks.WorkflowBackoffTimerTask:
eventID = common.FirstEventID
case *tasks.ActivityRetryTimerTask:
Expand All @@ -223,5 +254,19 @@ func getTimerTaskEventIDAndRetryable(
default:
panic(errUnknownTimerTask)
}
return eventID
}

// getTimerTaskEventIDAndRetryable returns the event ID that
// getTimerTaskEventID reports for timerTask, together with a retryable flag.
//
// The flag is true for every task type except *tasks.WorkflowTaskTimeoutTask,
// where it is false when executionInfo.WorkflowTaskScheduleId equals the
// task's EventID and executionInfo.WorkflowTaskAttempt is greater than 1.
//
// NOTE(review): how the retryable flag is consumed is not visible here;
// presumably it controls whether the mutable state loader retries/refreshes —
// confirm against the caller.
func getTimerTaskEventIDAndRetryable(
timerTask tasks.Task,
executionInfo *persistencespb.WorkflowExecutionInfo,
) (int64, bool) {
eventID := getTimerTaskEventID(timerTask)
retryable := true

// only workflow task timeout tasks can turn the retryable flag off
if task, ok := timerTask.(*tasks.WorkflowTaskTimeoutTask); ok {
retryable = !(executionInfo.WorkflowTaskScheduleId == task.EventID && executionInfo.WorkflowTaskAttempt > 1)
}

return eventID, retryable
}
2 changes: 1 addition & 1 deletion service/history/queueProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ func (p *queueProcessorBase) submitTask(
NewTaskInfo(
p.processor,
taskInfo,
initializeLoggerForTask(p.shard.GetShardID(), taskInfo, p.logger),
initializeLoggerForTask(taskInfo, p.logger),
),
)
}
Expand Down
2 changes: 1 addition & 1 deletion service/history/timerQueueProcessorBase.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ func (t *timerQueueProcessorBase) submitTask(
NewTaskInfo(
t.timerProcessor,
taskInfo,
initializeLoggerForTask(t.shard.GetShardID(), taskInfo, t.logger),
initializeLoggerForTask(taskInfo, t.logger),
),
)
}
Expand Down
13 changes: 13 additions & 0 deletions service/history/transferQueueTaskExecutorBase.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,13 @@ import (

commonpb "go.temporal.io/api/common/v1"
enumspb "go.temporal.io/api/enums/v1"
"go.temporal.io/api/serviceerror"
taskqueuepb "go.temporal.io/api/taskqueue/v1"

"go.temporal.io/server/api/matchingservice/v1"
"go.temporal.io/server/common"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/log/tag"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/searchattribute"
Expand Down Expand Up @@ -121,6 +123,11 @@ func (t *transferQueueTaskExecutorBase) pushActivity(
ScheduleId: task.ScheduleID,
ScheduleToStartTimeout: activityScheduleToStartTimeout,
})
if _, ok := err.(*serviceerror.NotFound); ok {
// NotFound error is not expected for AddTasks calls
// but will be ignored by task error handling logic, so log it here
initializeLoggerForTask(task, t.logger).Error("Matching returned not found error for AddActivityTask", tag.Error(err))
}

return err
}
Expand All @@ -144,6 +151,12 @@ func (t *transferQueueTaskExecutorBase) pushWorkflowTask(
ScheduleId: task.ScheduleID,
ScheduleToStartTimeout: workflowTaskScheduleToStartTimeout,
})
if _, ok := err.(*serviceerror.NotFound); ok {
// NotFound error is not expected for AddTasks calls
// but will be ignored by task error handling logic, so log it here
initializeLoggerForTask(task, t.logger).Error("Matching returned not found error for AddWorkflowTask", tag.Error(err))
}

return err
}

Expand Down
34 changes: 29 additions & 5 deletions service/matching/matchingEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,9 +413,21 @@ pollLoop:
resp, err := e.recordWorkflowTaskStarted(hCtx.Context, request, task)
if err != nil {
switch err.(type) {
case *serviceerror.NotFound, *serviceerrors.TaskAlreadyStarted:
e.logger.Debug(fmt.Sprintf("Duplicated workflow task taskQueue=%v, taskID=%v",
taskQueueName, task.event.GetTaskId()))
case *serviceerror.NotFound: // mutable state not found, workflow not running or workflow task not found
e.logger.Info("Workflow task not found",
tag.WorkflowTaskQueueName(taskQueueName),
tag.WorkflowNamespaceID(task.event.Data.GetNamespaceId()),
tag.WorkflowID(task.event.Data.GetWorkflowId()),
tag.WorkflowRunID(task.event.Data.GetRunId()),
tag.WorkflowTaskQueueName(taskQueueName),
tag.TaskID(task.event.GetTaskId()),
tag.TaskVisibilityTimestamp(timestamp.TimeValue(task.event.Data.GetCreateTime())),
tag.WorkflowEventID(task.event.Data.GetScheduleId()),
tag.Error(err),
)
task.finish(nil)
case *serviceerrors.TaskAlreadyStarted:
e.logger.Debug("Duplicated workflow task", tag.WorkflowTaskQueueName(taskQueueName), tag.TaskID(task.event.GetTaskId()))
task.finish(nil)
default:
task.finish(err)
Expand Down Expand Up @@ -480,8 +492,20 @@ pollLoop:
resp, err := e.recordActivityTaskStarted(hCtx.Context, request, task)
if err != nil {
switch err.(type) {
case *serviceerror.NotFound, *serviceerrors.TaskAlreadyStarted:
e.logger.Debug("Duplicated activity task", tag.Name(taskQueueName), tag.TaskID(task.event.GetTaskId()))
case *serviceerror.NotFound: // mutable state not found, workflow not running or activity info not found
e.logger.Info("Activity task not found",
tag.WorkflowNamespaceID(task.event.Data.GetNamespaceId()),
tag.WorkflowID(task.event.Data.GetWorkflowId()),
tag.WorkflowRunID(task.event.Data.GetRunId()),
tag.WorkflowTaskQueueName(taskQueueName),
tag.TaskID(task.event.GetTaskId()),
tag.TaskVisibilityTimestamp(timestamp.TimeValue(task.event.Data.GetCreateTime())),
tag.WorkflowEventID(task.event.Data.GetScheduleId()),
tag.Error(err),
)
task.finish(nil)
case *serviceerrors.TaskAlreadyStarted:
e.logger.Debug("Duplicated activity task", tag.WorkflowTaskQueueName(taskQueueName), tag.TaskID(task.event.GetTaskId()))
task.finish(nil)
default:
task.finish(err)
Expand Down

0 comments on commit d541fc0

Please sign in to comment.