Skip to content

Commit

Permalink
Retry attempts to delete open visibility records (#3402) (#3421)
Browse files Browse the repository at this point in the history
  • Loading branch information
MichaelSnowden committed Sep 22, 2022
1 parent 9dfdf75 commit ff40b89
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 0 deletions.
27 changes: 27 additions & 0 deletions service/history/visibilityQueueTaskExecutor.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,13 @@ import (
"go.temporal.io/api/serviceerror"

"go.temporal.io/server/common"
"go.temporal.io/server/common/definition"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/persistence/visibility/manager"
"go.temporal.io/server/common/primitives/timestamp"
"go.temporal.io/server/service/history/consts"
"go.temporal.io/server/service/history/queues"
"go.temporal.io/server/service/history/shard"
"go.temporal.io/server/service/history/tasks"
Expand Down Expand Up @@ -436,9 +438,34 @@ func (t *visibilityQueueTaskExecutor) processDeleteExecution(
StartTime: task.StartTime,
CloseTime: task.CloseTime,
}
if t.shard.GetConfig().VisibilityProcessorEnsureCloseBeforeDelete() {
if t.isCloseExecutionVisibilityTaskPending(task) {
return consts.ErrDependencyTaskNotCompleted
}
}
return t.visibilityMgr.DeleteWorkflowExecution(ctx, request)
}

func (t *visibilityQueueTaskExecutor) isCloseExecutionVisibilityTaskPending(task *tasks.DeleteExecutionVisibilityTask) bool {
CloseExecutionVisibilityTaskID := task.CloseExecutionVisibilityTaskID
// taskID == 0 if workflow still running in passive cluster or closed before this field was added (v1.17).
if CloseExecutionVisibilityTaskID == 0 {
return false
}
// check if close execution visibility task is completed
visibilityQueueState, ok := t.shard.GetQueueState(tasks.CategoryVisibility)
if !ok {
// !ok means multi-cursor is not available, so we have to revert to using acks
visibilityQueueAckLevel := t.shard.GetQueueAckLevel(tasks.CategoryVisibility).TaskID
return CloseExecutionVisibilityTaskID > visibilityQueueAckLevel
}
queryTask := &tasks.CloseExecutionVisibilityTask{
WorkflowKey: definition.NewWorkflowKey(task.GetNamespaceID(), task.GetWorkflowID(), task.GetRunID()),
TaskID: CloseExecutionVisibilityTaskID,
}
return !queues.IsTaskAcked(queryTask, visibilityQueueState)
}

func getWorkflowMemo(
memoFields map[string]*commonpb.Payload,
) *commonpb.Memo {
Expand Down
70 changes: 70 additions & 0 deletions service/history/visibilityQueueTaskExecutor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ import (
"go.temporal.io/server/common/persistence/versionhistory"
"go.temporal.io/server/common/persistence/visibility/manager"
"go.temporal.io/server/common/primitives/timestamp"
"go.temporal.io/server/service/history/consts"
"go.temporal.io/server/service/history/events"
"go.temporal.io/server/service/history/queues"
"go.temporal.io/server/service/history/shard"
Expand Down Expand Up @@ -410,6 +411,75 @@ func (s *visibilityQueueTaskExecutorSuite) TestProcessModifyWorkflowProperties()
s.NoError(err)
}

func (s *visibilityQueueTaskExecutorSuite) TestProcessorDeleteExecution() {
s.mockShard.GetConfig().VisibilityProcessorEnsureCloseBeforeDelete = func() bool {
return true
}
workflowKey := definition.WorkflowKey{
NamespaceID: s.namespaceID.String(),
}
s.Run("TaskID=0", func() {
s.mockVisibilityMgr.EXPECT().DeleteWorkflowExecution(gomock.Any(), gomock.Any())
err := s.execute(&tasks.DeleteExecutionVisibilityTask{
WorkflowKey: workflowKey,
CloseExecutionVisibilityTaskID: 0,
})
s.Assert().NoError(err)
})
s.Run("SingleCursorQueue", func() {
const ackLevel int64 = 5
s.mockShard.Resource.ShardMgr.EXPECT().UpdateShard(gomock.Any(), gomock.Any())
s.NoError(s.mockShard.UpdateQueueAckLevel(tasks.CategoryVisibility,
tasks.NewImmediateKey(ackLevel),
))
s.Run("NotAcked", func() {
err := s.execute(&tasks.DeleteExecutionVisibilityTask{
WorkflowKey: workflowKey,
CloseExecutionVisibilityTaskID: ackLevel + 1,
})
s.ErrorIs(err, consts.ErrDependencyTaskNotCompleted)
})
s.Run("Acked", func() {
s.mockVisibilityMgr.EXPECT().DeleteWorkflowExecution(gomock.Any(), gomock.Any())
err := s.execute(&tasks.DeleteExecutionVisibilityTask{
WorkflowKey: workflowKey,
CloseExecutionVisibilityTaskID: ackLevel - 1,
})
s.NoError(err)
})
})
s.Run("MultiCursorQueue", func() {
const highWatermark int64 = 5
s.NoError(s.mockShard.UpdateQueueState(tasks.CategoryVisibility, &persistencespb.QueueState{
ReaderStates: nil,
ExclusiveReaderHighWatermark: &persistencespb.TaskKey{
TaskId: highWatermark,
FireTime: timestamp.TimePtr(tasks.DefaultFireTime),
},
}))
s.Run("NotAcked", func() {
err := s.execute(&tasks.DeleteExecutionVisibilityTask{
WorkflowKey: workflowKey,
CloseExecutionVisibilityTaskID: highWatermark + 1,
})
s.ErrorIs(err, consts.ErrDependencyTaskNotCompleted)
})
s.Run("Acked", func() {
s.mockVisibilityMgr.EXPECT().DeleteWorkflowExecution(gomock.Any(), gomock.Any())
err := s.execute(&tasks.DeleteExecutionVisibilityTask{
WorkflowKey: workflowKey,
CloseExecutionVisibilityTaskID: highWatermark - 1,
})
s.NoError(err)
})
})
}

func (s *visibilityQueueTaskExecutorSuite) execute(task tasks.Task) error {
_, _, err := s.visibilityQueueTaskExecutor.Execute(context.Background(), s.newTaskExecutable(task))
return err
}

func (s *visibilityQueueTaskExecutorSuite) createRecordWorkflowExecutionStartedRequest(
namespaceName namespace.Name,
startEvent *historypb.HistoryEvent,
Expand Down

0 comments on commit ff40b89

Please sign in to comment.