Skip to content

Commit

Permalink
Resend history for pending standby activity workflow task (#2796)
Browse files Browse the repository at this point in the history
  • Loading branch information
yycptt committed May 12, 2022
1 parent cf4153c commit 2b86009
Show file tree
Hide file tree
Showing 5 changed files with 139 additions and 132 deletions.
50 changes: 36 additions & 14 deletions service/history/nDCStandbyTaskUtil.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,12 +101,16 @@ type (
lastEventVersion int64
}

pushActivityTaskToMatchingInfo struct {
activityTaskPostActionInfo struct {
*historyResendInfo

taskQueue string
activityTaskScheduleToStartTimeout time.Duration
}

pushWorkflowTaskToMatchingInfo struct {
workflowTaskPostActionInfo struct {
*historyResendInfo

workflowTaskScheduleToStartTimeout int64
taskqueue taskqueuepb.TaskQueue
}
Expand Down Expand Up @@ -134,35 +138,53 @@ func newHistoryResendInfo(
}
}

func newPushActivityToMatchingInfo(
func newActivityTaskPostActionInfo(
mutableState workflow.MutableState,
activityScheduleToStartTimeout time.Duration,
) *pushActivityTaskToMatchingInfo {
) (*activityTaskPostActionInfo, error) {
resendInfo, err := getHistoryResendInfo(mutableState)
if err != nil {
return nil, err
}

return &pushActivityTaskToMatchingInfo{
return &activityTaskPostActionInfo{
historyResendInfo: resendInfo,
activityTaskScheduleToStartTimeout: activityScheduleToStartTimeout,
}
}, nil
}

func newActivityRetryTimerToMatchingInfo(
func newActivityRetryTimePostActionInfo(
mutableState workflow.MutableState,
taskQueue string,
activityScheduleToStartTimeout time.Duration,
) *pushActivityTaskToMatchingInfo {
) (*activityTaskPostActionInfo, error) {
resendInfo, err := getHistoryResendInfo(mutableState)
if err != nil {
return nil, err
}

return &pushActivityTaskToMatchingInfo{
return &activityTaskPostActionInfo{
historyResendInfo: resendInfo,
taskQueue: taskQueue,
activityTaskScheduleToStartTimeout: activityScheduleToStartTimeout,
}
}, nil
}

func newPushWorkflowTaskToMatchingInfo(
func newWorkflowTaskPostActionInfo(
mutableState workflow.MutableState,
workflowTaskScheduleToStartTimeout int64,
taskqueue taskqueuepb.TaskQueue,
) *pushWorkflowTaskToMatchingInfo {
) (*workflowTaskPostActionInfo, error) {
resendInfo, err := getHistoryResendInfo(mutableState)
if err != nil {
return nil, err
}

return &pushWorkflowTaskToMatchingInfo{
return &workflowTaskPostActionInfo{
historyResendInfo: resendInfo,
workflowTaskScheduleToStartTimeout: workflowTaskScheduleToStartTimeout,
taskqueue: taskqueue,
}
}, nil
}

func getHistoryResendInfo(
Expand Down
20 changes: 13 additions & 7 deletions service/history/timerQueueStandbyTaskExecutor.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ func (t *timerQueueStandbyTaskExecutor) executeActivityRetryTimerTask(
return nil, nil
}

return newActivityRetryTimerToMatchingInfo(activityInfo.TaskQueue, *activityInfo.ScheduleToStartTimeout), nil
return newActivityRetryTimePostActionInfo(mutableState, activityInfo.TaskQueue, *activityInfo.ScheduleToStartTimeout)
}

return t.processTimer(
Expand All @@ -302,7 +302,7 @@ func (t *timerQueueStandbyTaskExecutor) executeActivityRetryTimerTask(
t.getCurrentTime,
t.config.StandbyTaskMissingEventsResendDelay(),
t.config.StandbyTaskMissingEventsDiscardDelay(),
t.pushActivity,
t.fetchHistoryFromRemote,
t.pushActivity,
),
)
Expand Down Expand Up @@ -492,15 +492,21 @@ func (t *timerQueueStandbyTaskExecutor) fetchHistoryFromRemote(
ctx context.Context,
taskInfo tasks.Task,
postActionInfo interface{},
_ log.Logger,
logger log.Logger,
) error {

if postActionInfo == nil {
var resendInfo *historyResendInfo
switch postActionInfo := postActionInfo.(type) {
case nil:
return nil
case *historyResendInfo:
resendInfo = postActionInfo
case *activityTaskPostActionInfo:
resendInfo = postActionInfo.historyResendInfo
default:
logger.Fatal("unknown post action info for fetching remote history", tag.Value(postActionInfo))
}

resendInfo := postActionInfo.(*historyResendInfo)

t.metricsClient.IncCounter(metrics.HistoryRereplicationByTimerTaskScope, metrics.ClientRequests)
stopwatch := t.metricsClient.StartTimer(metrics.HistoryRereplicationByTimerTaskScope, metrics.ClientLatency)
defer stopwatch.Stop()
Expand Down Expand Up @@ -562,7 +568,7 @@ func (t *timerQueueStandbyTaskExecutor) pushActivity(
return nil
}

pushActivityInfo := postActionInfo.(*pushActivityTaskToMatchingInfo)
pushActivityInfo := postActionInfo.(*activityTaskPostActionInfo)
activityScheduleToStartTimeout := &pushActivityInfo.activityTaskScheduleToStartTimeout
activityTask := task.(*tasks.ActivityRetryTimerTask)

Expand Down
27 changes: 26 additions & 1 deletion service/history/timerQueueStandbyTaskExecutor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1410,7 +1410,7 @@ func (s *timerQueueStandbyTaskExecutorSuite) TestProcessActivityRetryTimer_Activ
s.Nil(err)
}

func (s *timerQueueStandbyTaskExecutorSuite) TestProcessActivityRetryTimer_PushToMatching() {
func (s *timerQueueStandbyTaskExecutorSuite) TestProcessActivityRetryTimer_Pending() {

execution := commonpb.WorkflowExecution{
WorkflowId: "some random workflow ID",
Expand Down Expand Up @@ -1472,7 +1472,32 @@ func (s *timerQueueStandbyTaskExecutorSuite) TestProcessActivityRetryTimer_PushT

persistenceMutableState := s.createPersistenceMutableState(mutableState, scheduledEvent.GetEventId(), scheduledEvent.GetVersion())
s.mockExecutionMgr.EXPECT().GetWorkflowExecution(gomock.Any(), gomock.Any()).Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil)

// no-op post action
s.mockShard.SetCurrentTime(s.clusterName, s.now)
err = s.timerQueueStandbyTaskExecutor.Execute(context.Background(), s.newTaskExecutable(timerTask))
s.Equal(consts.ErrTaskRetry, err)

// resend history post action
s.mockShard.SetCurrentTime(s.clusterName, s.now.Add(s.fetchHistoryDuration))
s.mockAdminClient.EXPECT().RefreshWorkflowTasks(gomock.Any(), &adminservice.RefreshWorkflowTasksRequest{
Namespace: s.namespaceEntry.Name().String(),
Execution: &execution,
}).Return(&adminservice.RefreshWorkflowTasksResponse{}, nil)
s.mockNDCHistoryResender.EXPECT().SendSingleWorkflowHistory(
s.namespaceID,
execution.WorkflowId,
execution.RunId,
scheduledEvent.GetEventId(),
s.version,
int64(0),
int64(0),
).Return(nil)
err = s.timerQueueStandbyTaskExecutor.Execute(context.Background(), s.newTaskExecutable(timerTask))
s.Equal(consts.ErrTaskRetry, err)

// push to matching post action
s.mockShard.SetCurrentTime(s.clusterName, s.now.Add(s.discardDuration))
s.mockMatchingClient.EXPECT().AddActivityTask(
gomock.Any(),
&matchingservice.AddActivityTaskRequest{
Expand Down
31 changes: 20 additions & 11 deletions service/history/transferQueueStandbyTaskExecutor.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ func (t *transferQueueStandbyTaskExecutor) processActivityTask(
}

if activityInfo.StartedId == common.EmptyEventID {
return newPushActivityToMatchingInfo(*activityInfo.ScheduleToStartTimeout), nil
return newActivityTaskPostActionInfo(mutableState, *activityInfo.ScheduleToStartTimeout)
}

return nil, nil
Expand All @@ -149,7 +149,7 @@ func (t *transferQueueStandbyTaskExecutor) processActivityTask(
t.getCurrentTime,
t.config.StandbyTaskMissingEventsResendDelay(),
t.config.StandbyTaskMissingEventsDiscardDelay(),
t.pushActivity,
t.fetchHistoryFromRemote,
t.pushActivity,
),
)
Expand Down Expand Up @@ -189,10 +189,11 @@ func (t *transferQueueStandbyTaskExecutor) processWorkflowTask(
}

if wtInfo.StartedID == common.EmptyEventID {
return newPushWorkflowTaskToMatchingInfo(
return newWorkflowTaskPostActionInfo(
mutableState,
taskScheduleToStartTimeoutSeconds,
*taskQueue,
), nil
)
}

return nil, nil
Expand All @@ -208,7 +209,7 @@ func (t *transferQueueStandbyTaskExecutor) processWorkflowTask(
t.getCurrentTime,
t.config.StandbyTaskMissingEventsResendDelay(),
t.config.StandbyTaskMissingEventsDiscardDelay(),
t.pushWorkflowTask,
t.fetchHistoryFromRemote,
t.pushWorkflowTask,
),
)
Expand Down Expand Up @@ -517,7 +518,7 @@ func (t *transferQueueStandbyTaskExecutor) pushActivity(
return nil
}

pushActivityInfo := postActionInfo.(*pushActivityTaskToMatchingInfo)
pushActivityInfo := postActionInfo.(*activityTaskPostActionInfo)
timeout := pushActivityInfo.activityTaskScheduleToStartTimeout
return t.transferQueueTaskExecutorBase.pushActivity(
ctx,
Expand All @@ -537,7 +538,7 @@ func (t *transferQueueStandbyTaskExecutor) pushWorkflowTask(
return nil
}

pushwtInfo := postActionInfo.(*pushWorkflowTaskToMatchingInfo)
pushwtInfo := postActionInfo.(*workflowTaskPostActionInfo)
timeout := pushwtInfo.workflowTaskScheduleToStartTimeout
return t.transferQueueTaskExecutorBase.pushWorkflowTask(
ctx,
Expand Down Expand Up @@ -569,15 +570,23 @@ func (t *transferQueueStandbyTaskExecutor) fetchHistoryFromRemote(
ctx context.Context,
taskInfo tasks.Task,
postActionInfo interface{},
log log.Logger,
logger log.Logger,
) error {

if postActionInfo == nil {
var resendInfo *historyResendInfo
switch postActionInfo := postActionInfo.(type) {
case nil:
return nil
case *historyResendInfo:
resendInfo = postActionInfo
case *activityTaskPostActionInfo:
resendInfo = postActionInfo.historyResendInfo
case *workflowTaskPostActionInfo:
resendInfo = postActionInfo.historyResendInfo
default:
logger.Fatal("unknown post action info for fetching remote history", tag.Value(postActionInfo))
}

resendInfo := postActionInfo.(*historyResendInfo)

t.metricsClient.IncCounter(metrics.HistoryRereplicationByTransferTaskScope, metrics.ClientRequests)
stopwatch := t.metricsClient.StartTimer(metrics.HistoryRereplicationByTransferTaskScope, metrics.ClientLatency)
defer stopwatch.Stop()
Expand Down

0 comments on commit 2b86009

Please sign in to comment.