From be727b37a157e97f300579177af47c435043ed9b Mon Sep 17 00:00:00 2001
From: Michael Snowden
Date: Tue, 20 Sep 2022 12:50:54 -0700
Subject: [PATCH] Retry attempts to delete open visibility records (#3402)

---
 .../history/visibilityQueueTaskExecutor.go | 27 +++++++
 .../visibilityQueueTaskExecutor_test.go    | 71 +++++++++++++++++++
 2 files changed, 98 insertions(+)

diff --git a/service/history/visibilityQueueTaskExecutor.go b/service/history/visibilityQueueTaskExecutor.go
index eb8a9dd570a..5becdb13208 100644
--- a/service/history/visibilityQueueTaskExecutor.go
+++ b/service/history/visibilityQueueTaskExecutor.go
@@ -33,11 +33,13 @@ import (
 	"go.temporal.io/api/serviceerror"
 
 	"go.temporal.io/server/common"
+	"go.temporal.io/server/common/definition"
 	"go.temporal.io/server/common/log"
 	"go.temporal.io/server/common/metrics"
 	"go.temporal.io/server/common/namespace"
 	"go.temporal.io/server/common/persistence/visibility/manager"
 	"go.temporal.io/server/common/primitives/timestamp"
+	"go.temporal.io/server/service/history/consts"
 	"go.temporal.io/server/service/history/queues"
 	"go.temporal.io/server/service/history/shard"
 	"go.temporal.io/server/service/history/tasks"
@@ -436,9 +438,34 @@ func (t *visibilityQueueTaskExecutor) processDeleteExecution(
 		StartTime:   task.StartTime,
 		CloseTime:   task.CloseTime,
 	}
+	if t.shard.GetConfig().VisibilityProcessorEnsureCloseBeforeDelete() {
+		if t.isCloseExecutionVisibilityTaskPending(task) {
+			return consts.ErrDependencyTaskNotCompleted
+		}
+	}
 	return t.visibilityMgr.DeleteWorkflowExecution(ctx, request)
 }
 
+func (t *visibilityQueueTaskExecutor) isCloseExecutionVisibilityTaskPending(task *tasks.DeleteExecutionVisibilityTask) bool {
+	closeVisibilityTaskId := task.CloseVisibilityTaskID
+	// taskID == 0 if workflow still running in passive cluster or closed before this field was added (v1.17).
+	if closeVisibilityTaskId == 0 {
+		return false
+	}
+	// check if close execution visibility task is completed
+	visibilityQueueState, ok := t.shard.GetQueueState(tasks.CategoryVisibility)
+	if !ok {
+		// !ok means multi-cursor is not available, so we have to revert to using acks
+		visibilityQueueAckLevel := t.shard.GetQueueAckLevel(tasks.CategoryVisibility).TaskID
+		return closeVisibilityTaskId > visibilityQueueAckLevel
+	}
+	queryTask := &tasks.CloseExecutionVisibilityTask{
+		WorkflowKey: definition.NewWorkflowKey(task.GetNamespaceID(), task.GetWorkflowID(), task.GetRunID()),
+		TaskID:      closeVisibilityTaskId,
+	}
+	return !queues.IsTaskAcked(queryTask, visibilityQueueState)
+}
+
 func getWorkflowMemo(
 	memoFields map[string]*commonpb.Payload,
 ) *commonpb.Memo {
diff --git a/service/history/visibilityQueueTaskExecutor_test.go b/service/history/visibilityQueueTaskExecutor_test.go
index 4c4935965f8..af4346c09a4 100644
--- a/service/history/visibilityQueueTaskExecutor_test.go
+++ b/service/history/visibilityQueueTaskExecutor_test.go
@@ -29,6 +29,8 @@ import (
 	"testing"
 	"time"
 
+	"go.temporal.io/server/service/history/consts"
+
 	"github.com/golang/mock/gomock"
 	"github.com/pborman/uuid"
 	"github.com/stretchr/testify/require"
@@ -410,6 +412,75 @@ func (s *visibilityQueueTaskExecutorSuite) TestProcessModifyWorkflowProperties()
 	s.NoError(err)
 }
 
+func (s *visibilityQueueTaskExecutorSuite) TestProcessorDeleteExecution() {
+	s.mockShard.GetConfig().VisibilityProcessorEnsureCloseBeforeDelete = func() bool {
+		return true
+	}
+	workflowKey := definition.WorkflowKey{
+		NamespaceID: s.namespaceID.String(),
+	}
+	s.Run("TaskID=0", func() {
+		s.mockVisibilityMgr.EXPECT().DeleteWorkflowExecution(gomock.Any(), gomock.Any())
+		err := s.execute(&tasks.DeleteExecutionVisibilityTask{
+			WorkflowKey:           workflowKey,
+			CloseVisibilityTaskID: 0,
+		})
+		s.Assert().NoError(err)
+	})
+	s.Run("SingleCursorQueue", func() {
+		const ackLevel int64 = 5
+		s.mockShard.Resource.ShardMgr.EXPECT().UpdateShard(gomock.Any(), gomock.Any())
+		s.NoError(s.mockShard.UpdateQueueAckLevel(tasks.CategoryVisibility,
+			tasks.NewImmediateKey(ackLevel),
+		))
+		s.Run("NotAcked", func() {
+			err := s.execute(&tasks.DeleteExecutionVisibilityTask{
+				WorkflowKey:           workflowKey,
+				CloseVisibilityTaskID: ackLevel + 1,
+			})
+			s.ErrorIs(err, consts.ErrDependencyTaskNotCompleted)
+		})
+		s.Run("Acked", func() {
+			s.mockVisibilityMgr.EXPECT().DeleteWorkflowExecution(gomock.Any(), gomock.Any())
+			err := s.execute(&tasks.DeleteExecutionVisibilityTask{
+				WorkflowKey:           workflowKey,
+				CloseVisibilityTaskID: ackLevel - 1,
+			})
+			s.NoError(err)
+		})
+	})
+	s.Run("MultiCursorQueue", func() {
+		const highWatermark int64 = 5
+		s.NoError(s.mockShard.UpdateQueueState(tasks.CategoryVisibility, &persistencespb.QueueState{
+			ReaderStates: nil,
+			ExclusiveReaderHighWatermark: &persistencespb.TaskKey{
+				TaskId:   highWatermark,
+				FireTime: timestamp.TimePtr(tasks.DefaultFireTime),
+			},
+		}))
+		s.Run("NotAcked", func() {
+			err := s.execute(&tasks.DeleteExecutionVisibilityTask{
+				WorkflowKey:           workflowKey,
+				CloseVisibilityTaskID: highWatermark + 1,
+			})
+			s.ErrorIs(err, consts.ErrDependencyTaskNotCompleted)
+		})
+		s.Run("Acked", func() {
+			s.mockVisibilityMgr.EXPECT().DeleteWorkflowExecution(gomock.Any(), gomock.Any())
+			err := s.execute(&tasks.DeleteExecutionVisibilityTask{
+				WorkflowKey:           workflowKey,
+				CloseVisibilityTaskID: highWatermark - 1,
+			})
+			s.NoError(err)
+		})
+	})
+}
+
+func (s *visibilityQueueTaskExecutorSuite) execute(task tasks.Task) error {
+	_, _, err := s.visibilityQueueTaskExecutor.Execute(context.Background(), s.newTaskExecutable(task))
+	return err
+}
+
 func (s *visibilityQueueTaskExecutorSuite) createRecordWorkflowExecutionStartedRequest(
 	namespaceName namespace.Name,
 	startEvent *historypb.HistoryEvent,