Skip to content

Commit

Permalink
Handle NDC active -> passive transition with transient workflow & no …
Browse files Browse the repository at this point in the history
…buffered event (#2845)

* Handle NDC active -> passive transition with transient workflow task & no buffered event

NOTE: When workflow transient from active to passive while workflow has transient workflow task & no buffered event, the the transient workflow task should be cleared since <- is not persisted in history.
  • Loading branch information
wxing1292 committed May 25, 2022
1 parent c39cfb7 commit 4ac6e3f
Show file tree
Hide file tree
Showing 5 changed files with 120 additions and 0 deletions.
6 changes: 6 additions & 0 deletions service/history/nDCBranchMgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,12 @@ func (r *nDCBranchMgrImpl) flushBufferedEvents(
// check whether there are buffered events, if so, flush it
// NOTE: buffered events does not show in version history or next event id
if !r.mutableState.HasBufferedEvents() {
if r.mutableState.HasTransientWorkflowTask() {
if err := r.mutableState.ClearTransientWorkflowTask(); err != nil {
return nil, 0, err
}
// now transient task is gone
}
return lcaVersionHistoryItem, versionHistoryIndex, nil
}

Expand Down
44 changes: 44 additions & 0 deletions service/history/nDCBranchMgr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,46 @@ func (s *nDCBranchMgrSuite) TestCreateNewBranch() {
s.True(compareVersionHistory.Equal(newVersionHistory))
}

func (s *nDCBranchMgrSuite) TestClearTransientWorkflowTask() {

lastWriteVersion := int64(300)
versionHistory := versionhistory.NewVersionHistory([]byte("some random base branch token"), []*historyspb.VersionHistoryItem{
versionhistory.NewVersionHistoryItem(10, 0),
versionhistory.NewVersionHistoryItem(50, 100),
versionhistory.NewVersionHistoryItem(100, 200),
versionhistory.NewVersionHistoryItem(150, 300),
})
versionHistories := versionhistory.NewVersionHistories(versionHistory)

incomingVersionHistory := versionhistory.CopyVersionHistory(versionHistory)
err := versionhistory.AddOrUpdateVersionHistoryItem(
incomingVersionHistory,
versionhistory.NewVersionHistoryItem(200, 300),
)
s.NoError(err)

s.mockMutableState.EXPECT().GetLastWriteVersion().Return(lastWriteVersion, nil).AnyTimes()
s.mockMutableState.EXPECT().HasBufferedEvents().Return(false).AnyTimes()
s.mockMutableState.EXPECT().HasTransientWorkflowTask().Return(true).AnyTimes()
s.mockMutableState.EXPECT().ClearTransientWorkflowTask().Return(nil).AnyTimes()

s.mockMutableState.EXPECT().GetExecutionInfo().Return(&persistencespb.WorkflowExecutionInfo{
NamespaceId: s.namespaceID,
WorkflowId: s.workflowID,
VersionHistories: versionHistories,
}).AnyTimes()
s.mockMutableState.EXPECT().GetExecutionState().Return(&persistencespb.WorkflowExecutionState{
RunId: s.runID,
}).AnyTimes()

_, _, err = s.nDCBranchMgr.prepareVersionHistory(
context.Background(),
incomingVersionHistory,
150+2,
300)
s.IsType(&serviceerrors.RetryReplication{}, err)
}

func (s *nDCBranchMgrSuite) TestFlushBufferedEvents() {

lastWriteVersion := int64(300)
Expand Down Expand Up @@ -244,6 +284,7 @@ func (s *nDCBranchMgrSuite) TestPrepareVersionHistory_BranchAppendable_NoMissing

s.mockMutableState.EXPECT().GetExecutionInfo().Return(&persistencespb.WorkflowExecutionInfo{VersionHistories: versionHistories}).AnyTimes()
s.mockMutableState.EXPECT().HasBufferedEvents().Return(false).AnyTimes()
s.mockMutableState.EXPECT().HasTransientWorkflowTask().Return(false).AnyTimes()

doContinue, index, err := s.nDCBranchMgr.prepareVersionHistory(
context.Background(),
Expand Down Expand Up @@ -274,6 +315,7 @@ func (s *nDCBranchMgrSuite) TestPrepareVersionHistory_BranchAppendable_MissingEv
s.NoError(err)

s.mockMutableState.EXPECT().HasBufferedEvents().Return(false).AnyTimes()
s.mockMutableState.EXPECT().HasTransientWorkflowTask().Return(false).AnyTimes()
s.mockMutableState.EXPECT().GetExecutionInfo().Return(&persistencespb.WorkflowExecutionInfo{
NamespaceId: s.namespaceID,
WorkflowId: s.workflowID,
Expand Down Expand Up @@ -314,6 +356,7 @@ func (s *nDCBranchMgrSuite) TestPrepareVersionHistory_BranchNotAppendable_NoMiss
newBranchToken := []byte("some random new branch token")

s.mockMutableState.EXPECT().HasBufferedEvents().Return(false).AnyTimes()
s.mockMutableState.EXPECT().HasTransientWorkflowTask().Return(false).AnyTimes()
s.mockMutableState.EXPECT().GetExecutionInfo().Return(&persistencespb.WorkflowExecutionInfo{
NamespaceId: s.namespaceID,
WorkflowId: s.workflowID,
Expand Down Expand Up @@ -372,6 +415,7 @@ func (s *nDCBranchMgrSuite) TestPrepareVersionHistory_BranchNotAppendable_Missin
})

s.mockMutableState.EXPECT().HasBufferedEvents().Return(false).AnyTimes()
s.mockMutableState.EXPECT().HasTransientWorkflowTask().Return(false).AnyTimes()
s.mockMutableState.EXPECT().GetExecutionInfo().Return(&persistencespb.WorkflowExecutionInfo{
NamespaceId: s.namespaceID,
WorkflowId: s.workflowID,
Expand Down
2 changes: 2 additions & 0 deletions service/history/workflow/mutable_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,8 @@ type (
GetWorkflowType() *commonpb.WorkflowType
GetWorkflowStateStatus() (enumsspb.WorkflowExecutionState, enumspb.WorkflowExecutionStatus)
GetQueryRegistry() QueryRegistry
HasTransientWorkflowTask() bool
ClearTransientWorkflowTask() error
HasBufferedEvents() bool
HasInFlightWorkflowTask() bool
HasParentExecution() bool
Expand Down
40 changes: 40 additions & 0 deletions service/history/workflow/mutable_state_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -1267,6 +1267,46 @@ func (e *MutableStateImpl) GetInFlightWorkflowTask() (*WorkflowTaskInfo, bool) {
return e.workflowTaskManager.GetInFlightWorkflowTask()
}

func (e *MutableStateImpl) HasTransientWorkflowTask() bool {
workflowTask, ok := e.GetInFlightWorkflowTask()
if !ok {
return false
}
return workflowTask.ScheduleID >= e.GetNextEventID()
}

func (e *MutableStateImpl) ClearTransientWorkflowTask() error {
workflowTask, ok := e.GetInFlightWorkflowTask()
if !ok {
return serviceerror.NewInternal("cannot clear transient workflow task when task is missing")
}

if workflowTask.ScheduleID < e.GetNextEventID() {
return serviceerror.NewInternal("cannot clear transient workflow task when task is not transient")
}
// workflowTask.ScheduleID >= e.GetNextEventID()
// this is transient workflow
if e.HasBufferedEvents() {
return serviceerror.NewInternal("cannot clear transient workflow task when there are buffered events")
}
// no buffered event
resetWorkflowTaskInfo := &WorkflowTaskInfo{
Version: common.EmptyVersion,
ScheduleID: common.EmptyEventID,
StartedID: common.EmptyEventID,
RequestID: emptyUUID,
WorkflowTaskTimeout: timestamp.DurationFromSeconds(0),
Attempt: 1,
StartedTime: timestamp.UnixOrZeroTimePtr(0),
ScheduledTime: timestamp.UnixOrZeroTimePtr(0),

TaskQueue: nil,
OriginalScheduledTime: timestamp.UnixOrZeroTimePtr(0),
}
e.workflowTaskManager.UpdateWorkflowTask(resetWorkflowTaskInfo)
return nil
}

func (e *MutableStateImpl) HasBufferedEvents() bool {
return e.hBuilder.HasBufferEvents()
}
Expand Down
28 changes: 28 additions & 0 deletions service/history/workflow/mutable_state_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 4ac6e3f

Please sign in to comment.