Skip to content

Commit

Permalink
Schedule a new workflow task after force flush buffer events (#4490)
Browse files Browse the repository at this point in the history
* Schedule a new workflow task after force flush buffer events
  • Loading branch information
wxing1292 committed Jun 14, 2023
1 parent 8795c6c commit 2db855d
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 0 deletions.
1 change: 1 addition & 0 deletions service/history/ndc/branch_manager.go
Expand Up @@ -190,6 +190,7 @@ func (r *BranchMgrImpl) flushBufferedEvents(
if err := targetWorkflow.FlushBufferedEvents(); err != nil {
return nil, 0, err
}

// the workflow must be updated as active, to send out replication tasks
if err := targetWorkflow.context.UpdateWorkflowExecutionAsActive(
ctx,
Expand Down
5 changes: 5 additions & 0 deletions service/history/ndc/branch_manager_test.go
Expand Up @@ -35,6 +35,7 @@ import (
enumspb "go.temporal.io/api/enums/v1"
historypb "go.temporal.io/api/history/v1"

enumsspb "go.temporal.io/server/api/enums/v1"
historyspb "go.temporal.io/server/api/history/v1"
persistencespb "go.temporal.io/server/api/persistence/v1"
"go.temporal.io/server/common/cluster"
Expand Down Expand Up @@ -254,6 +255,10 @@ func (s *branchMgrSuite) TestFlushBufferedEvents() {
"",
int64(0),
).Return(&historypb.HistoryEvent{}, nil)
s.mockMutableState.EXPECT().AddWorkflowTaskScheduledEvent(
false,
enumsspb.WORKFLOW_TASK_TYPE_NORMAL,
).Return(&workflow.WorkflowTaskInfo{}, nil)
s.mockMutableState.EXPECT().FlushBufferedEvents()
s.mockClusterMetadata.EXPECT().ClusterNameForFailoverVersion(true, lastWriteVersion).Return(cluster.TestCurrentClusterName).AnyTimes()
s.mockClusterMetadata.EXPECT().GetCurrentClusterName().Return(cluster.TestCurrentClusterName).AnyTimes()
Expand Down
6 changes: 6 additions & 0 deletions service/history/ndc/workflow.go
Expand Up @@ -222,6 +222,12 @@ func (r *WorkflowImpl) FlushBufferedEvents() error {
}

_, err = r.failWorkflowTask(lastWriteVersion)
if _, err := r.mutableState.AddWorkflowTaskScheduledEvent(
false,
enumsspb.WORKFLOW_TASK_TYPE_NORMAL,
); err != nil {
return err
}
return err
}

Expand Down

0 comments on commit 2db855d

Please sign in to comment.