Skip to content

Commit

Permalink
Check workflow task after reapply events (#2840)
Browse files Browse the repository at this point in the history
* Check workflow task after reapply events
  • Loading branch information
yux0 authored and meiliang86 committed Jun 1, 2022
1 parent 3fff3f0 commit b7562a4
Show file tree
Hide file tree
Showing 6 changed files with 56 additions and 4 deletions.
5 changes: 2 additions & 3 deletions service/history/historyEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -1976,9 +1976,8 @@ func (e *historyEngineImpl) SignalWorkflowExecution(

executionInfo := mutableState.GetExecutionInfo()
createWorkflowTask := true
// Do not create workflow task when the workflow has first workflow task backoff and execution is not started yet
workflowTaskBackoff := timestamp.TimeValue(executionInfo.GetExecutionTime()).After(timestamp.TimeValue(executionInfo.GetStartTime()))
if workflowTaskBackoff && !mutableState.HasProcessedOrPendingWorkflowTask() {
if mutableState.IsWorkflowPendingOnWorkflowTaskBackoff() {
// Do not create workflow task when the workflow has first workflow task backoff and execution is not started yet
createWorkflowTask = false
}

Expand Down
15 changes: 14 additions & 1 deletion service/history/nDCEventsReapplier.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func newNDCEventsReapplier(
}

func (r *nDCEventsReapplierImpl) reapplyEvents(
_ context.Context,
ctx context.Context,
msBuilder workflow.MutableState,
historyEvents []*historypb.HistoryEvent,
runID string,
Expand Down Expand Up @@ -108,5 +108,18 @@ func (r *nDCEventsReapplierImpl) reapplyEvents(
deDupResource := definition.NewEventReappliedID(runID, event.GetEventId(), event.GetVersion())
msBuilder.UpdateDuplicatedResource(deDupResource)
}

// After reapply event, checking if we should schedule a workflow task
if msBuilder.IsWorkflowPendingOnWorkflowTaskBackoff() {
// Do not create workflow task when the workflow has first workflow task backoff and execution is not started yet
return reappliedEvents, nil
}
if !msBuilder.HasPendingWorkflowTask() {
if _, err := msBuilder.AddWorkflowTaskScheduledEvent(
false,
); err != nil {
return nil, err
}
}
return reappliedEvents, nil
}
2 changes: 2 additions & 0 deletions service/history/nDCEventsReapplier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ func (s *nDCEventReapplicationSuite) TestReapplyEvents_AppliedEvent() {
attr.GetIdentity(),
attr.GetHeader(),
).Return(event, nil)
msBuilderCurrent.EXPECT().IsWorkflowPendingOnWorkflowTaskBackoff().Return(true)
dedupResource := definition.NewEventReappliedID(runID, event.GetEventId(), event.GetVersion())
msBuilderCurrent.EXPECT().IsResourceDuplicated(dedupResource).Return(false)
msBuilderCurrent.EXPECT().UpdateDuplicatedResource(dedupResource)
Expand Down Expand Up @@ -178,6 +179,7 @@ func (s *nDCEventReapplicationSuite) TestReapplyEvents_PartialAppliedEvent() {
attr1.GetIdentity(),
attr1.GetHeader(),
).Return(event1, nil)
msBuilderCurrent.EXPECT().IsWorkflowPendingOnWorkflowTaskBackoff().Return(true)
dedupResource1 := definition.NewEventReappliedID(runID, event1.GetEventId(), event1.GetVersion())
msBuilderCurrent.EXPECT().IsResourceDuplicated(dedupResource1).Return(false)
dedupResource2 := definition.NewEventReappliedID(runID, event2.GetEventId(), event2.GetVersion())
Expand Down
1 change: 1 addition & 0 deletions service/history/workflow/mutable_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ type (
IsStickyTaskQueueEnabled() bool
IsWorkflowExecutionRunning() bool
IsResourceDuplicated(resourceDedupKey definition.DeduplicationID) bool
IsWorkflowPendingOnWorkflowTaskBackoff() bool
UpdateDuplicatedResource(resourceDedupKey definition.DeduplicationID)
ReplicateActivityInfo(*historyservice.SyncActivityRequest, bool) error
ReplicateActivityTaskCancelRequestedEvent(*historypb.HistoryEvent) error
Expand Down
9 changes: 9 additions & 0 deletions service/history/workflow/mutable_state_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -1322,6 +1322,15 @@ func (e *MutableStateImpl) IsSignalRequested(
return false
}

func (e *MutableStateImpl) IsWorkflowPendingOnWorkflowTaskBackoff() bool {

workflowTaskBackoff := timestamp.TimeValue(e.executionInfo.GetExecutionTime()).After(timestamp.TimeValue(e.executionInfo.GetStartTime()))
if workflowTaskBackoff && !e.HasProcessedOrPendingWorkflowTask() {
return true
}
return false
}

func (e *MutableStateImpl) AddSignalRequested(
requestID string,
) {
Expand Down
28 changes: 28 additions & 0 deletions service/history/workflow/mutable_state_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit b7562a4

Please sign in to comment.