Skip to content

Commit

Permalink
Check visibility ack level in standby cluster for DeleteWorkflowExecu…
Browse files Browse the repository at this point in the history
…tion (#2870)
  • Loading branch information
alexshtin committed May 20, 2022
1 parent faa215f commit 548dd6c
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 20 deletions.
4 changes: 2 additions & 2 deletions service/history/historyEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -2027,9 +2027,9 @@ func (e *historyEngineImpl) DeleteWorkflowExecution(
// Open and Close workflow executions are deleted differently.
// Open workflow execution is deleted by terminating with special flag `deleteAfterTerminate` set to true.
// This flag will be carried over with CloseExecutionTask and workflow will be deleted as the last step while processing the task.

//
// Close workflow execution is deleted using DeleteExecutionTask.

//
// DeleteWorkflowExecution is not replicated automatically. Workflow executions must be deleted separately in each cluster.
// Although running workflows in active cluster are terminated first and the termination event might be replicated.
// In passive cluster, workflow executions are just deleted in regardless of its state.
Expand Down
27 changes: 15 additions & 12 deletions service/history/workflow/delete_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,21 +95,24 @@ func (m *DeleteManagerImpl) AddDeleteWorkflowExecutionTask(
visibilityQueueAckLevel int64,
) error {

// Create `DeleteWorkflowExecutionTask` only if workflow is closed successfully and all pending tasks are completed (if in active cluster).
// Otherwise, mutable state might be deleted before close tasks are executed (race condition between close and delete tasks).
// In active cluster, create `DeleteWorkflowExecutionTask` only if workflow is closed successfully
// and all pending transfer and visibility tasks are completed.
// This check is required to avoid race condition between close and delete tasks.
// Otherwise, mutable state might be deleted before close task is executed, and therefore close task will be dropped.
//
// In passive cluster, transfer task queue check can be ignored but not visibility task queue.
// If visibility close task is executed after visibility record is deleted then it will resurrect record in closed state.
//
// Unfortunately, queue ack levels are updated with delay (default 30s),
// therefore this API will return error if workflow is deleted within 30 seconds after close.
// The check is on API call side, not on task processor side, because delete visibility task doesn't have access to mutable state.
if (ms.GetExecutionInfo().CloseTransferTaskId != 0 && // Workflow execution still might be running in passive cluster.
ms.GetExecutionInfo().CloseTransferTaskId > transferQueueAckLevel) || // Transfer close task wasn't executed.
(ms.GetExecutionInfo().CloseVisibilityTaskId != 0 && // Workflow execution still might be running in passive cluster.
ms.GetExecutionInfo().CloseVisibilityTaskId > visibilityQueueAckLevel) {

// The logic above is only for active cluster. Standby cluster bypasses ack level check,
// because race condition doesn't break anything there.
if ms.GetNamespaceEntry().ActiveInCluster(m.shard.GetClusterMetadata().GetCurrentClusterName()) {
return consts.ErrWorkflowNotReady
}
if (ms.GetExecutionInfo().CloseTransferTaskId != 0 && // Workflow execution still might be running in passive cluster or closed before this field was added (v1.17).
ms.GetExecutionInfo().CloseTransferTaskId > transferQueueAckLevel && // Transfer close task wasn't executed.
ms.GetNamespaceEntry().ActiveInCluster(m.shard.GetClusterMetadata().GetCurrentClusterName())) ||
(ms.GetExecutionInfo().CloseVisibilityTaskId != 0 && // Workflow execution still might be running in passive cluster or closed before this field was added (v1.17).
ms.GetExecutionInfo().CloseVisibilityTaskId > visibilityQueueAckLevel) { // Visibility close task wasn't executed.

return consts.ErrWorkflowNotReady
}

taskGenerator := taskGeneratorProvider.NewTaskGenerator(m.shard, ms)
Expand Down
50 changes: 44 additions & 6 deletions service/history/workflow/delete_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,7 @@ func (s *deleteManagerWorkflowSuite) TestAddDeleteWorkflowExecutionTask() {
s.mockShardContext.EXPECT().GetShardID().Return(int32(1)).AnyTimes()
s.mockShardContext.EXPECT().AddTasks(gomock.Any(), gomock.Any()).Return(nil).AnyTimes()

// Both queues are right at the minimum level.
mockMutableState.EXPECT().GetExecutionInfo().Return(&persistencespb.WorkflowExecutionInfo{
CloseTransferTaskId: 1000,
CloseVisibilityTaskId: 1001}).
Expand All @@ -259,6 +260,7 @@ func (s *deleteManagerWorkflowSuite) TestAddDeleteWorkflowExecutionTask() {
)
s.NoError(err)

// Workflow execution is not closed.
mockMutableState.EXPECT().GetExecutionInfo().Return(&persistencespb.WorkflowExecutionInfo{
CloseTransferTaskId: 0,
CloseVisibilityTaskId: 0}).
Expand All @@ -268,11 +270,12 @@ func (s *deleteManagerWorkflowSuite) TestAddDeleteWorkflowExecutionTask() {
tests.NamespaceID,
we,
mockMutableState,
2000,
2000,
1000,
1001,
)
s.NoError(err)

// Visibility close task is not processed.
mockMutableState.EXPECT().GetExecutionInfo().Return(&persistencespb.WorkflowExecutionInfo{
CloseTransferTaskId: 1000,
CloseVisibilityTaskId: 0}).
Expand All @@ -282,11 +285,12 @@ func (s *deleteManagerWorkflowSuite) TestAddDeleteWorkflowExecutionTask() {
tests.NamespaceID,
we,
mockMutableState,
1000,
1001,
2000,
)
s.NoError(err)

// Both queues are behind in active cluster.
mockMutableState.EXPECT().GetExecutionInfo().Return(&persistencespb.WorkflowExecutionInfo{
CloseTransferTaskId: 1000,
CloseVisibilityTaskId: 1001}).
Expand All @@ -303,12 +307,11 @@ func (s *deleteManagerWorkflowSuite) TestAddDeleteWorkflowExecutionTask() {
)
s.ErrorIs(err, consts.ErrWorkflowNotReady)

// Only visibility queue is behind (cluster doesn't matter).
mockMutableState.EXPECT().GetExecutionInfo().Return(&persistencespb.WorkflowExecutionInfo{
CloseTransferTaskId: 1000,
CloseVisibilityTaskId: 1001}).
Times(4)
mockMutableState.EXPECT().GetNamespaceEntry().Return(tests.GlobalNamespaceEntry)
s.mockMetadata.EXPECT().GetCurrentClusterName().Return(cluster.TestCurrentClusterName)
err = s.deleteManager.AddDeleteWorkflowExecutionTask(
context.Background(),
tests.NamespaceID,
Expand All @@ -319,21 +322,56 @@ func (s *deleteManagerWorkflowSuite) TestAddDeleteWorkflowExecutionTask() {
)
s.ErrorIs(err, consts.ErrWorkflowNotReady)

// Only transfer queue is behind in active cluster.
mockMutableState.EXPECT().GetExecutionInfo().Return(&persistencespb.WorkflowExecutionInfo{
CloseTransferTaskId: 1000,
CloseVisibilityTaskId: 1001}).
Times(2)
mockMutableState.EXPECT().GetNamespaceEntry().Return(tests.GlobalNamespaceEntry)
s.mockMetadata.EXPECT().GetCurrentClusterName().Return(cluster.TestCurrentClusterName)
err = s.deleteManager.AddDeleteWorkflowExecutionTask(
context.Background(),
tests.NamespaceID,
we,
mockMutableState,
999,
1001,
)
s.ErrorIs(err, consts.ErrWorkflowNotReady)

// Only transfer queue is behind in standby cluster.
mockMutableState.EXPECT().GetExecutionInfo().Return(&persistencespb.WorkflowExecutionInfo{
CloseTransferTaskId: 1000,
CloseVisibilityTaskId: 1001}).
Times(4)
mockMutableState.EXPECT().GetNamespaceEntry().Return(tests.GlobalNamespaceEntry)
s.mockMetadata.EXPECT().GetCurrentClusterName().Return(cluster.TestAlternativeClusterName)
err = s.deleteManager.AddDeleteWorkflowExecutionTask(
context.Background(),
tests.NamespaceID,
we,
mockMutableState,
999,
1000,
1001,
)
s.NoError(err)

// Both queues are behind in standby cluster.
mockMutableState.EXPECT().GetExecutionInfo().Return(&persistencespb.WorkflowExecutionInfo{
CloseTransferTaskId: 1000,
CloseVisibilityTaskId: 1001}).
Times(4)
mockMutableState.EXPECT().GetNamespaceEntry().Return(tests.GlobalNamespaceEntry)
s.mockMetadata.EXPECT().GetCurrentClusterName().Return(cluster.TestAlternativeClusterName)
err = s.deleteManager.AddDeleteWorkflowExecutionTask(
context.Background(),
tests.NamespaceID,
we,
mockMutableState,
999,
1000,
)
s.ErrorIs(err, consts.ErrWorkflowNotReady)
}

func (s *deleteManagerWorkflowSuite) TestDeleteWorkflowExecutionRetention_ArchivalNotInline() {
Expand Down

0 comments on commit 548dd6c

Please sign in to comment.