Skip to content

Commit

Permalink
Handle NDC active -> passive transition with transient workflow & no …
Browse files Browse the repository at this point in the history
…buffered event
  • Loading branch information
wxing1292 authored and meiliang86 committed Jun 1, 2022
1 parent bd87aa3 commit 79102bf
Show file tree
Hide file tree
Showing 5 changed files with 121 additions and 0 deletions.
6 changes: 6 additions & 0 deletions service/history/nDCBranchMgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,12 @@ func (r *nDCBranchMgrImpl) flushBufferedEvents(
// check whether there are buffered events, if so, flush it
// NOTE: buffered events does not show in version history or next event id
if !r.mutableState.HasBufferedEvents() {
if r.mutableState.HasTransientWorkflowTask() {
if err := r.mutableState.ClearTransientWorkflowTask(); err != nil {
return nil, 0, err
}
// now transient task is gone
}
return lcaVersionHistoryItem, versionHistoryIndex, nil
}

Expand Down
45 changes: 45 additions & 0 deletions service/history/nDCBranchMgr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/stretchr/testify/suite"
enumspb "go.temporal.io/api/enums/v1"
historypb "go.temporal.io/api/history/v1"

historyspb "go.temporal.io/server/api/history/v1"
persistencespb "go.temporal.io/server/api/persistence/v1"
"go.temporal.io/server/common/cluster"
Expand Down Expand Up @@ -173,6 +174,46 @@ func (s *nDCBranchMgrSuite) TestCreateNewBranch() {
s.True(compareVersionHistory.Equal(newVersionHistory))
}

func (s *nDCBranchMgrSuite) TestClearTransientWorkflowTask() {

lastWriteVersion := int64(300)
versionHistory := versionhistory.NewVersionHistory([]byte("some random base branch token"), []*historyspb.VersionHistoryItem{
versionhistory.NewVersionHistoryItem(10, 0),
versionhistory.NewVersionHistoryItem(50, 100),
versionhistory.NewVersionHistoryItem(100, 200),
versionhistory.NewVersionHistoryItem(150, 300),
})
versionHistories := versionhistory.NewVersionHistories(versionHistory)

incomingVersionHistory := versionhistory.CopyVersionHistory(versionHistory)
err := versionhistory.AddOrUpdateVersionHistoryItem(
incomingVersionHistory,
versionhistory.NewVersionHistoryItem(200, 300),
)
s.NoError(err)

s.mockMutableState.EXPECT().GetLastWriteVersion().Return(lastWriteVersion, nil).AnyTimes()
s.mockMutableState.EXPECT().HasBufferedEvents().Return(false).AnyTimes()
s.mockMutableState.EXPECT().HasTransientWorkflowTask().Return(true).AnyTimes()
s.mockMutableState.EXPECT().ClearTransientWorkflowTask().Return(nil).AnyTimes()

s.mockMutableState.EXPECT().GetExecutionInfo().Return(&persistencespb.WorkflowExecutionInfo{
NamespaceId: s.namespaceID,
WorkflowId: s.workflowID,
VersionHistories: versionHistories,
}).AnyTimes()
s.mockMutableState.EXPECT().GetExecutionState().Return(&persistencespb.WorkflowExecutionState{
RunId: s.runID,
}).AnyTimes()

_, _, err = s.nDCBranchMgr.prepareVersionHistory(
context.Background(),
incomingVersionHistory,
150+2,
300)
s.IsType(&serviceerrors.RetryReplication{}, err)
}

func (s *nDCBranchMgrSuite) TestFlushBufferedEvents() {

lastWriteVersion := int64(300)
Expand Down Expand Up @@ -244,6 +285,7 @@ func (s *nDCBranchMgrSuite) TestPrepareVersionHistory_BranchAppendable_NoMissing

s.mockMutableState.EXPECT().GetExecutionInfo().Return(&persistencespb.WorkflowExecutionInfo{VersionHistories: versionHistories}).AnyTimes()
s.mockMutableState.EXPECT().HasBufferedEvents().Return(false).AnyTimes()
s.mockMutableState.EXPECT().HasTransientWorkflowTask().Return(false).AnyTimes()

doContinue, index, err := s.nDCBranchMgr.prepareVersionHistory(
context.Background(),
Expand Down Expand Up @@ -274,6 +316,7 @@ func (s *nDCBranchMgrSuite) TestPrepareVersionHistory_BranchAppendable_MissingEv
s.NoError(err)

s.mockMutableState.EXPECT().HasBufferedEvents().Return(false).AnyTimes()
s.mockMutableState.EXPECT().HasTransientWorkflowTask().Return(false).AnyTimes()
s.mockMutableState.EXPECT().GetExecutionInfo().Return(&persistencespb.WorkflowExecutionInfo{
NamespaceId: s.namespaceID,
WorkflowId: s.workflowID,
Expand Down Expand Up @@ -314,6 +357,7 @@ func (s *nDCBranchMgrSuite) TestPrepareVersionHistory_BranchNotAppendable_NoMiss
newBranchToken := []byte("some random new branch token")

s.mockMutableState.EXPECT().HasBufferedEvents().Return(false).AnyTimes()
s.mockMutableState.EXPECT().HasTransientWorkflowTask().Return(false).AnyTimes()
s.mockMutableState.EXPECT().GetExecutionInfo().Return(&persistencespb.WorkflowExecutionInfo{
NamespaceId: s.namespaceID,
WorkflowId: s.workflowID,
Expand Down Expand Up @@ -372,6 +416,7 @@ func (s *nDCBranchMgrSuite) TestPrepareVersionHistory_BranchNotAppendable_Missin
})

s.mockMutableState.EXPECT().HasBufferedEvents().Return(false).AnyTimes()
s.mockMutableState.EXPECT().HasTransientWorkflowTask().Return(false).AnyTimes()
s.mockMutableState.EXPECT().GetExecutionInfo().Return(&persistencespb.WorkflowExecutionInfo{
NamespaceId: s.namespaceID,
WorkflowId: s.workflowID,
Expand Down
2 changes: 2 additions & 0 deletions service/history/workflow/mutable_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,8 @@ type (
GetWorkflowType() *commonpb.WorkflowType
GetWorkflowStateStatus() (enumsspb.WorkflowExecutionState, enumspb.WorkflowExecutionStatus)
GetQueryRegistry() QueryRegistry
HasTransientWorkflowTask() bool
ClearTransientWorkflowTask() error
HasBufferedEvents() bool
HasInFlightWorkflowTask() bool
HasParentExecution() bool
Expand Down
40 changes: 40 additions & 0 deletions service/history/workflow/mutable_state_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -1254,6 +1254,46 @@ func (e *MutableStateImpl) GetInFlightWorkflowTask() (*WorkflowTaskInfo, bool) {
return e.workflowTaskManager.GetInFlightWorkflowTask()
}

func (e *MutableStateImpl) HasTransientWorkflowTask() bool {
workflowTask, ok := e.GetInFlightWorkflowTask()
if !ok {
return false
}
return workflowTask.ScheduleID >= e.GetNextEventID()
}

func (e *MutableStateImpl) ClearTransientWorkflowTask() error {
workflowTask, ok := e.GetInFlightWorkflowTask()
if !ok {
return serviceerror.NewInternal("cannot clear transient workflow task when task is missing")
}

if workflowTask.ScheduleID < e.GetNextEventID() {
return serviceerror.NewInternal("cannot clear transient workflow task when task is not transient")
}
// workflowTask.ScheduleID >= e.GetNextEventID()
// this is transient workflow
if e.HasBufferedEvents() {
return serviceerror.NewInternal("cannot clear transient workflow task when there are buffered events")
}
// no buffered event
resetWorkflowTaskInfo := &WorkflowTaskInfo{
Version: common.EmptyVersion,
ScheduleID: common.EmptyEventID,
StartedID: common.EmptyEventID,
RequestID: emptyUUID,
WorkflowTaskTimeout: timestamp.DurationFromSeconds(0),
Attempt: 1,
StartedTime: timestamp.UnixOrZeroTimePtr(0),
ScheduledTime: timestamp.UnixOrZeroTimePtr(0),

TaskQueue: nil,
OriginalScheduledTime: timestamp.UnixOrZeroTimePtr(0),
}
e.workflowTaskManager.UpdateWorkflowTask(resetWorkflowTaskInfo)
return nil
}

func (e *MutableStateImpl) HasBufferedEvents() bool {
return e.hBuilder.HasBufferEvents()
}
Expand Down
28 changes: 28 additions & 0 deletions service/history/workflow/mutable_state_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 79102bf

Please sign in to comment.