Skip to content

Commit

Permalink
Revert "Retry attempts to delete open visibility records" (#3420)
Browse files Browse the repository at this point in the history
Revert "Retry attempts to delete open visibility records (#3402)"

This reverts commit be727b3.
  • Loading branch information
MichaelSnowden committed Sep 21, 2022
1 parent be727b3 commit 8f24d1f
Show file tree
Hide file tree
Showing 2 changed files with 0 additions and 98 deletions.
27 changes: 0 additions & 27 deletions service/history/visibilityQueueTaskExecutor.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,11 @@ import (
"go.temporal.io/api/serviceerror"

"go.temporal.io/server/common"
"go.temporal.io/server/common/definition"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/persistence/visibility/manager"
"go.temporal.io/server/common/primitives/timestamp"
"go.temporal.io/server/service/history/consts"
"go.temporal.io/server/service/history/queues"
"go.temporal.io/server/service/history/shard"
"go.temporal.io/server/service/history/tasks"
Expand Down Expand Up @@ -438,34 +436,9 @@ func (t *visibilityQueueTaskExecutor) processDeleteExecution(
StartTime: task.StartTime,
CloseTime: task.CloseTime,
}
if t.shard.GetConfig().VisibilityProcessorEnsureCloseBeforeDelete() {
if t.isCloseExecutionVisibilityTaskPending(task) {
return consts.ErrDependencyTaskNotCompleted
}
}
return t.visibilityMgr.DeleteWorkflowExecution(ctx, request)
}

func (t *visibilityQueueTaskExecutor) isCloseExecutionVisibilityTaskPending(task *tasks.DeleteExecutionVisibilityTask) bool {
closeVisibilityTaskId := task.CloseVisibilityTaskID
// taskID == 0 if workflow still running in passive cluster or closed before this field was added (v1.17).
if closeVisibilityTaskId == 0 {
return false
}
// check if close execution visibility task is completed
visibilityQueueState, ok := t.shard.GetQueueState(tasks.CategoryVisibility)
if !ok {
// !ok means multi-cursor is not available, so we have to revert to using acks
visibilityQueueAckLevel := t.shard.GetQueueAckLevel(tasks.CategoryVisibility).TaskID
return closeVisibilityTaskId > visibilityQueueAckLevel
}
queryTask := &tasks.CloseExecutionVisibilityTask{
WorkflowKey: definition.NewWorkflowKey(task.GetNamespaceID(), task.GetWorkflowID(), task.GetRunID()),
TaskID: closeVisibilityTaskId,
}
return !queues.IsTaskAcked(queryTask, visibilityQueueState)
}

func getWorkflowMemo(
memoFields map[string]*commonpb.Payload,
) *commonpb.Memo {
Expand Down
71 changes: 0 additions & 71 deletions service/history/visibilityQueueTaskExecutor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@ import (
"testing"
"time"

"go.temporal.io/server/service/history/consts"

"github.com/golang/mock/gomock"
"github.com/pborman/uuid"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -412,75 +410,6 @@ func (s *visibilityQueueTaskExecutorSuite) TestProcessModifyWorkflowProperties()
s.NoError(err)
}

func (s *visibilityQueueTaskExecutorSuite) TestProcessorDeleteExecution() {
s.mockShard.GetConfig().VisibilityProcessorEnsureCloseBeforeDelete = func() bool {
return true
}
workflowKey := definition.WorkflowKey{
NamespaceID: s.namespaceID.String(),
}
s.Run("TaskID=0", func() {
s.mockVisibilityMgr.EXPECT().DeleteWorkflowExecution(gomock.Any(), gomock.Any())
err := s.execute(&tasks.DeleteExecutionVisibilityTask{
WorkflowKey: workflowKey,
CloseVisibilityTaskID: 0,
})
s.Assert().NoError(err)
})
s.Run("SingleCursorQueue", func() {
const ackLevel int64 = 5
s.mockShard.Resource.ShardMgr.EXPECT().UpdateShard(gomock.Any(), gomock.Any())
s.NoError(s.mockShard.UpdateQueueAckLevel(tasks.CategoryVisibility,
tasks.NewImmediateKey(ackLevel),
))
s.Run("NotAcked", func() {
err := s.execute(&tasks.DeleteExecutionVisibilityTask{
WorkflowKey: workflowKey,
CloseVisibilityTaskID: ackLevel + 1,
})
s.ErrorIs(err, consts.ErrDependencyTaskNotCompleted)
})
s.Run("Acked", func() {
s.mockVisibilityMgr.EXPECT().DeleteWorkflowExecution(gomock.Any(), gomock.Any())
err := s.execute(&tasks.DeleteExecutionVisibilityTask{
WorkflowKey: workflowKey,
CloseVisibilityTaskID: ackLevel - 1,
})
s.NoError(err)
})
})
s.Run("MultiCursorQueue", func() {
const highWatermark int64 = 5
s.NoError(s.mockShard.UpdateQueueState(tasks.CategoryVisibility, &persistencespb.QueueState{
ReaderStates: nil,
ExclusiveReaderHighWatermark: &persistencespb.TaskKey{
TaskId: highWatermark,
FireTime: timestamp.TimePtr(tasks.DefaultFireTime),
},
}))
s.Run("NotAcked", func() {
err := s.execute(&tasks.DeleteExecutionVisibilityTask{
WorkflowKey: workflowKey,
CloseVisibilityTaskID: highWatermark + 1,
})
s.ErrorIs(err, consts.ErrDependencyTaskNotCompleted)
})
s.Run("Acked", func() {
s.mockVisibilityMgr.EXPECT().DeleteWorkflowExecution(gomock.Any(), gomock.Any())
err := s.execute(&tasks.DeleteExecutionVisibilityTask{
WorkflowKey: workflowKey,
CloseVisibilityTaskID: highWatermark - 1,
})
s.NoError(err)
})
})
}

func (s *visibilityQueueTaskExecutorSuite) execute(task tasks.Task) error {
_, _, err := s.visibilityQueueTaskExecutor.Execute(context.Background(), s.newTaskExecutable(task))
return err
}

func (s *visibilityQueueTaskExecutorSuite) createRecordWorkflowExecutionStartedRequest(
namespaceName namespace.Name,
startEvent *historypb.HistoryEvent,
Expand Down

0 comments on commit 8f24d1f

Please sign in to comment.