Skip to content

Commit

Permalink
Remove unneeded duration/seconds conversion (#4321)
Browse files Browse the repository at this point in the history
  • Loading branch information
alexshtin committed May 12, 2023
1 parent 6e7c669 commit 40e7627
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 18 deletions.
4 changes: 2 additions & 2 deletions service/history/nDCStandbyTaskUtil.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ type (
workflowTaskPostActionInfo struct {
*historyResendInfo

workflowTaskScheduleToStartTimeout int64
workflowTaskScheduleToStartTimeout *time.Duration
taskqueue taskqueuepb.TaskQueue
}

Expand Down Expand Up @@ -171,7 +171,7 @@ func newActivityRetryTimePostActionInfo(

func newWorkflowTaskPostActionInfo(
mutableState workflow.MutableState,
workflowTaskScheduleToStartTimeout int64,
workflowTaskScheduleToStartTimeout *time.Duration,
taskqueue taskqueuepb.TaskQueue,
) (*workflowTaskPostActionInfo, error) {
resendInfo, err := getHistoryResendInfo(mutableState)
Expand Down
12 changes: 6 additions & 6 deletions service/history/transferQueueActiveTaskExecutor.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,22 +243,22 @@ func (t *transferQueueActiveTaskExecutor) processWorkflowTask(
// the correct logic should check whether the workflow task is a sticky workflowTask
// task or not.
var taskQueue *taskqueuepb.TaskQueue
taskScheduleToStartTimeoutSeconds := int64(0)
var taskScheduleToStartTimeout *time.Duration
if mutableState.GetExecutionInfo().TaskQueue != task.TaskQueue {
// this workflowTask is an sticky workflowTask
// there shall already be an timer set
taskQueue = &taskqueuepb.TaskQueue{
Name: task.TaskQueue,
Kind: enumspb.TASK_QUEUE_KIND_STICKY,
}
taskScheduleToStartTimeoutSeconds = int64(timestamp.DurationValue(executionInfo.StickyScheduleToStartTimeout).Seconds())
taskScheduleToStartTimeout = executionInfo.StickyScheduleToStartTimeout
} else {
taskQueue = &taskqueuepb.TaskQueue{
Name: task.TaskQueue,
Kind: enumspb.TASK_QUEUE_KIND_NORMAL,
}
workflowRunTimeout := timestamp.DurationValue(executionInfo.WorkflowRunTimeout)
taskScheduleToStartTimeoutSeconds = int64(workflowRunTimeout.Round(time.Second).Seconds())
workflowRunTimeout := executionInfo.WorkflowRunTimeout
taskScheduleToStartTimeout = workflowRunTimeout
}

originalTaskQueue := mutableState.GetExecutionInfo().TaskQueue
Expand All @@ -267,7 +267,7 @@ func (t *transferQueueActiveTaskExecutor) processWorkflowTask(
// the rest of logic is making RPC call, which takes time.
release(nil)

err = t.pushWorkflowTask(ctx, task, taskQueue, timestamp.DurationFromSeconds(taskScheduleToStartTimeoutSeconds))
err = t.pushWorkflowTask(ctx, task, taskQueue, taskScheduleToStartTimeout)

if _, ok := err.(*serviceerrors.StickyWorkerUnavailable); ok {
// sticky worker is unavailable, switch to original task queue
Expand All @@ -282,7 +282,7 @@ func (t *transferQueueActiveTaskExecutor) processWorkflowTask(
// There is no need to reset sticky, because if this task is picked by new worker, the new worker will reset
// the sticky queue to a new one. However, if worker is completely down, that schedule_to_start timeout task
// will re-create a new non-sticky task and reset sticky.
err = t.pushWorkflowTask(ctx, task, taskQueue, timestamp.DurationFromSeconds(taskScheduleToStartTimeoutSeconds))
err = t.pushWorkflowTask(ctx, task, taskQueue, taskScheduleToStartTimeout)
}
return err
}
Expand Down
6 changes: 3 additions & 3 deletions service/history/transferQueueActiveTaskExecutor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2638,10 +2638,10 @@ func (s *transferQueueActiveTaskExecutorSuite) createAddWorkflowTaskRequest(
Kind: enumspb.TASK_QUEUE_KIND_NORMAL,
}
executionInfo := mutableState.GetExecutionInfo()
timeout := timestamp.DurationValue(executionInfo.WorkflowRunTimeout)
timeout := executionInfo.WorkflowRunTimeout
if mutableState.GetExecutionInfo().TaskQueue != task.TaskQueue {
taskQueue.Kind = enumspb.TASK_QUEUE_KIND_STICKY
timeout = timestamp.DurationValue(executionInfo.StickyScheduleToStartTimeout)
timeout = executionInfo.StickyScheduleToStartTimeout
}

return &matchingservice.AddWorkflowTaskRequest{
Expand All @@ -2652,7 +2652,7 @@ func (s *transferQueueActiveTaskExecutorSuite) createAddWorkflowTaskRequest(
},
TaskQueue: taskQueue,
ScheduledEventId: task.ScheduledEventID,
ScheduleToStartTimeout: &timeout,
ScheduleToStartTimeout: timeout,
Clock: vclock.NewVectorClock(s.mockClusterMetadata.GetClusterID(), s.mockShard.GetShardID(), task.TaskID),
}
}
Expand Down
13 changes: 6 additions & 7 deletions service/history/transferQueueStandbyTaskExecutor.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,6 @@ func (t *transferQueueStandbyTaskExecutor) processWorkflowTask(
ctx context.Context,
transferTask *tasks.WorkflowTask,
) error {
processTaskIfClosed := false
actionFn := func(_ context.Context, wfContext workflow.Context, mutableState workflow.MutableState) (interface{}, error) {
wtInfo := mutableState.GetWorkflowTaskByID(transferTask.ScheduledEventID)
if wtInfo == nil {
Expand All @@ -189,13 +188,13 @@ func (t *transferQueueStandbyTaskExecutor) processWorkflowTask(
Name: mutableState.GetExecutionInfo().TaskQueue,
Kind: enumspb.TASK_QUEUE_KIND_NORMAL,
}
workflowRunTimeout := timestamp.DurationValue(executionInfo.WorkflowRunTimeout)
taskScheduleToStartTimeoutSeconds := int64(workflowRunTimeout.Round(time.Second).Seconds())
workflowRunTimeout := executionInfo.WorkflowRunTimeout
taskScheduleToStartTimeout := workflowRunTimeout
if mutableState.GetExecutionInfo().TaskQueue != transferTask.TaskQueue {
// Experimental: try to push sticky task as regular task with sticky timeout as TTL.
// workflow might be sticky before namespace become standby
// there shall already be a schedule_to_start timer created
taskScheduleToStartTimeoutSeconds = int64(timestamp.DurationValue(executionInfo.StickyScheduleToStartTimeout).Seconds())
taskScheduleToStartTimeout = executionInfo.StickyScheduleToStartTimeout
}
err := CheckTaskVersion(t.shard, t.logger, mutableState.GetNamespaceEntry(), wtInfo.Version, transferTask.Version, transferTask)
if err != nil {
Expand All @@ -205,7 +204,7 @@ func (t *transferQueueStandbyTaskExecutor) processWorkflowTask(
if wtInfo.StartedEventID == common.EmptyEventID {
return newWorkflowTaskPostActionInfo(
mutableState,
taskScheduleToStartTimeoutSeconds,
taskScheduleToStartTimeout,
*taskQueue,
)
}
Expand All @@ -215,7 +214,7 @@ func (t *transferQueueStandbyTaskExecutor) processWorkflowTask(

return t.processTransfer(
ctx,
processTaskIfClosed,
false,
transferTask,
actionFn,
getStandbyPostActionFn(
Expand Down Expand Up @@ -575,7 +574,7 @@ func (t *transferQueueStandbyTaskExecutor) pushWorkflowTask(
ctx,
task.(*tasks.WorkflowTask),
&pushwtInfo.taskqueue,
timestamp.DurationFromSeconds(timeout),
timeout,
)
}

Expand Down

0 comments on commit 40e7627

Please sign in to comment.