Skip to content

Commit

Permalink
Add version check for delete workflow transfer task (#3159)
Browse files Browse the repository at this point in the history
* Add version check to delete workflow transfer task
  • Loading branch information
yux0 authored and yycptt committed Aug 12, 2022
1 parent bd22047 commit cdb9ae0
Show file tree
Hide file tree
Showing 14 changed files with 510 additions and 309 deletions.
645 changes: 366 additions & 279 deletions api/historyservice/v1/request_response.pb.go

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions common/persistence/serialization/task_serializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -572,6 +572,7 @@ func (s *TaskSerializer) transferDeleteExecutionTaskToProto(
WorkflowId: deleteExecutionTask.WorkflowKey.WorkflowID,
RunId: deleteExecutionTask.WorkflowKey.RunID,
TaskType: enumsspb.TASK_TYPE_TRANSFER_DELETE_EXECUTION,
Version: deleteExecutionTask.Version,
TaskId: deleteExecutionTask.TaskID,
VisibilityTime: timestamp.TimePtr(deleteExecutionTask.VisibilityTimestamp),
}
Expand All @@ -588,6 +589,7 @@ func (s *TaskSerializer) transferDeleteExecutionTaskFromProto(
),
VisibilityTimestamp: *deleteExecutionTask.VisibilityTime,
TaskID: deleteExecutionTask.TaskId,
Version: deleteExecutionTask.Version,
}
}

Expand Down Expand Up @@ -929,6 +931,7 @@ func (s *TaskSerializer) visibilityDeleteTaskToProto(
WorkflowId: deleteVisibilityTask.WorkflowKey.WorkflowID,
RunId: deleteVisibilityTask.WorkflowKey.RunID,
TaskType: enumsspb.TASK_TYPE_VISIBILITY_DELETE_EXECUTION,
Version: deleteVisibilityTask.Version,
TaskId: deleteVisibilityTask.TaskID,
VisibilityTime: &deleteVisibilityTask.VisibilityTimestamp,
StartTime: deleteVisibilityTask.StartTime,
Expand All @@ -947,6 +950,7 @@ func (s *TaskSerializer) visibilityDeleteTaskFromProto(
),
VisibilityTimestamp: *deleteVisibilityTask.VisibilityTime,
TaskID: deleteVisibilityTask.TaskId,
Version: deleteVisibilityTask.Version,
StartTime: deleteVisibilityTask.StartTime,
CloseTime: deleteVisibilityTask.CloseTime,
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,8 @@ message TerminateWorkflowExecutionResponse {
message DeleteWorkflowExecutionRequest {
string namespace_id = 1;
temporal.api.common.v1.WorkflowExecution workflow_execution = 2;
int64 workflow_version = 3;
bool closed_workflow_only = 4;
}

message DeleteWorkflowExecutionResponse {
Expand Down
6 changes: 4 additions & 2 deletions service/frontend/operator_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -358,8 +358,10 @@ func (h *OperatorHandlerImpl) DeleteWorkflowExecution(ctx context.Context, reque
}

_, err = h.historyClient.DeleteWorkflowExecution(ctx, &historyservice.DeleteWorkflowExecutionRequest{
NamespaceId: namespaceID.String(),
WorkflowExecution: request.GetWorkflowExecution(),
NamespaceId: namespaceID.String(),
WorkflowExecution: request.GetWorkflowExecution(),
WorkflowVersion: common.EmptyVersion,
ClosedWorkflowOnly: false,
})
if err != nil {
return nil, err
Expand Down
5 changes: 5 additions & 0 deletions service/history/historyEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -2087,6 +2087,10 @@ func (e *historyEngineImpl) DeleteWorkflowExecution(
// In passive cluster, workflow executions are just deleted in regardless of its state.

if weCtx.GetMutableState().IsWorkflowExecutionRunning() {
if request.GetClosedWorkflowOnly() {
// skip delete open workflow
return nil
}
ns, err := e.shard.GetNamespaceRegistry().GetNamespaceByID(namespace.ID(request.GetNamespaceId()))
if err != nil {
return err
Expand Down Expand Up @@ -2127,6 +2131,7 @@ func (e *historyEngineImpl) DeleteWorkflowExecution(
e.shard.GetQueueClusterAckLevel(tasks.CategoryTransfer, e.shard.GetClusterMetadata().GetCurrentClusterName()).TaskID,
// Use global ack level visibility queue ack level because cluster level is not updated.
e.shard.GetQueueAckLevel(tasks.CategoryVisibility).TaskID,
request.GetWorkflowVersion(),
)
}

Expand Down
9 changes: 4 additions & 5 deletions service/history/tasks/delete_execution_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"time"

enumsspb "go.temporal.io/server/api/enums/v1"
"go.temporal.io/server/common"
"go.temporal.io/server/common/definition"
)

Expand All @@ -39,6 +38,7 @@ type (
definition.WorkflowKey
VisibilityTimestamp time.Time
TaskID int64
Version int64
}
)

Expand All @@ -47,12 +47,11 @@ func (a *DeleteExecutionTask) GetKey() Key {
}

func (a *DeleteExecutionTask) GetVersion() int64 {
// Version is not used for DeleteExecutionTask transfer task because it is created only for
// explicit API call, and in this case execution needs to be deleted regardless of the version.
return common.EmptyVersion
return a.Version
}

func (a *DeleteExecutionTask) SetVersion(_ int64) {
func (a *DeleteExecutionTask) SetVersion(version int64) {
a.Version = version
}

func (a *DeleteExecutionTask) GetTaskID() int64 {
Expand Down
11 changes: 4 additions & 7 deletions service/history/tasks/delete_visibility_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"time"

enumsspb "go.temporal.io/server/api/enums/v1"
"go.temporal.io/server/common"
"go.temporal.io/server/common/definition"
)

Expand All @@ -39,6 +38,7 @@ type (
definition.WorkflowKey
VisibilityTimestamp time.Time
TaskID int64
Version int64
// These two fields are needed for cassandra standard visibility.
// TODO (alex): Remove them when cassandra standard visibility is removed.
StartTime *time.Time
Expand All @@ -51,14 +51,11 @@ func (t *DeleteExecutionVisibilityTask) GetKey() Key {
}

func (t *DeleteExecutionVisibilityTask) GetVersion() int64 {
// Version is not used for DeleteExecutionVisibilityTask visibility task because:
// 1. It is created from parent task which either check version itself (DeleteHistoryEventTask) or
// doesn't check version at all (DeleteExecutionTask).
// 2. Delete visibility task processor doesn't have access to mutable state (it is already gone).
return common.EmptyVersion
return t.Version
}

func (t *DeleteExecutionVisibilityTask) SetVersion(_ int64) {
func (t *DeleteExecutionVisibilityTask) SetVersion(version int64) {
t.Version = version
}

func (t *DeleteExecutionVisibilityTask) GetTaskID() int64 {
Expand Down
18 changes: 11 additions & 7 deletions service/history/transferQueueActiveTaskExecutor.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,13 +286,17 @@ func (t *transferQueueActiveTaskExecutor) processCloseExecution(
return nil
}

lastWriteVersion, err := mutableState.GetLastWriteVersion()
if err != nil {
return err
}
ok := VerifyTaskVersion(t.shard, t.logger, mutableState.GetNamespaceEntry(), lastWriteVersion, task.Version, task)
if !ok {
return nil
// DeleteAfterClose is set to true when this close execution task was generated as part of delete open workflow execution procedure.
// Delete workflow execution is started by user API call and should be done regardless of current workflow version.
if !task.DeleteAfterClose {
lastWriteVersion, err := mutableState.GetLastWriteVersion()
if err != nil {
return err
}
ok := VerifyTaskVersion(t.shard, t.logger, mutableState.GetNamespaceEntry(), lastWriteVersion, task.Version, task)
if !ok {
return nil
}
}

workflowExecution := commonpb.WorkflowExecution{
Expand Down
58 changes: 58 additions & 0 deletions service/history/transferQueueActiveTaskExecutor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1241,6 +1241,64 @@ func (s *transferQueueActiveTaskExecutorSuite) TestProcessCloseExecution_NoParen
s.NoError(err)
}

func (s *transferQueueActiveTaskExecutorSuite) TestProcessCloseExecution_DeleteAfterClose() {
execution := commonpb.WorkflowExecution{
WorkflowId: "some random workflow ID",
RunId: uuid.New(),
}
workflowType := "some random workflow type"
taskQueueName := "some random task queue"

mutableState := workflow.TestGlobalMutableState(s.mockShard, s.mockShard.GetEventsCache(), s.logger, s.version, execution.GetRunId())
_, err := mutableState.AddWorkflowExecutionStartedEvent(
execution,
&historyservice.StartWorkflowExecutionRequest{
Attempt: 1,
NamespaceId: s.namespaceID.String(),
StartRequest: &workflowservice.StartWorkflowExecutionRequest{
WorkflowType: &commonpb.WorkflowType{Name: workflowType},
TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueueName},
WorkflowExecutionTimeout: timestamp.DurationPtr(2 * time.Second),
WorkflowTaskTimeout: timestamp.DurationPtr(1 * time.Second),
},
},
)
s.Nil(err)

wt := addWorkflowTaskScheduledEvent(mutableState)
event := addWorkflowTaskStartedEvent(mutableState, wt.ScheduledEventID, taskQueueName, uuid.New())
wt.StartedEventID = event.GetEventId()
event = addWorkflowTaskCompletedEvent(mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity")

taskID := int64(59)
event = addCompleteWorkflowEvent(mutableState, event.GetEventId(), nil)

transferTask := &tasks.CloseExecutionTask{
WorkflowKey: definition.NewWorkflowKey(
s.namespaceID.String(),
execution.GetWorkflowId(),
execution.GetRunId(),
),
Version: s.version + 1,
TaskID: taskID,
VisibilityTimestamp: time.Now().UTC(),
DeleteAfterClose: true,
}

persistenceMutableState := s.createPersistenceMutableState(mutableState, event.GetEventId(), event.GetVersion())
s.mockExecutionMgr.EXPECT().GetWorkflowExecution(gomock.Any(), gomock.Any()).Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil)
s.mockArchivalMetadata.EXPECT().GetVisibilityConfig().Return(archiver.NewArchivalConfig("enabled", dc.GetStringPropertyFn("enabled"), dc.GetBoolPropertyFn(true), "disabled", "random URI"))
s.mockArchivalClient.EXPECT().Archive(gomock.Any(), gomock.Any()).Return(nil, nil)
s.mockSearchAttributesProvider.EXPECT().GetSearchAttributes(gomock.Any(), false)

_, err = s.transferQueueActiveTaskExecutor.Execute(context.Background(), s.newTaskExecutable(transferTask))
s.NoError(err)

transferTask.DeleteAfterClose = false
_, err = s.transferQueueActiveTaskExecutor.Execute(context.Background(), s.newTaskExecutable(transferTask))
s.NoError(err)
}

func (s *transferQueueActiveTaskExecutorSuite) TestProcessCancelExecution_Success() {
execution := commonpb.WorkflowExecution{
WorkflowId: "some random workflow ID",
Expand Down
15 changes: 13 additions & 2 deletions service/history/transferQueueTaskExecutorBase.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,8 +260,19 @@ func (t *transferQueueTaskExecutorBase) deleteExecution(
return err
}

// Do not validate version here because DeleteExecutionTask transfer task is created only for
// explicit API call, and in this case execution needs to be deleted regardless of the version.
// If task version is EmptyVersion it means "don't check task version".
// This can happen when task was created from explicit user API call.
// Or the namespace is a local namespace which will not have version conflict.
if task.GetVersion() != common.EmptyVersion {
lastWriteVersion, err := mutableState.GetLastWriteVersion()
if err != nil {
return err
}
ok := VerifyTaskVersion(t.shard, t.logger, mutableState.GetNamespaceEntry(), lastWriteVersion, task.GetVersion(), task)
if !ok {
return nil
}
}

return t.workflowDeleteManager.DeleteWorkflowExecution(
ctx,
Expand Down
29 changes: 26 additions & 3 deletions service/history/workflow/delete_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,30 @@ import (

type (
DeleteManager interface {
AddDeleteWorkflowExecutionTask(ctx context.Context, nsID namespace.ID, we commonpb.WorkflowExecution, ms MutableState, transferQueueAckLevel int64, visibilityQueueAckLevel int64) error
DeleteWorkflowExecution(ctx context.Context, nsID namespace.ID, we commonpb.WorkflowExecution, weCtx Context, ms MutableState, forceDeleteFromOpenVisibility bool) error
DeleteWorkflowExecutionByRetention(ctx context.Context, nsID namespace.ID, we commonpb.WorkflowExecution, weCtx Context, ms MutableState) error
AddDeleteWorkflowExecutionTask(
ctx context.Context,
nsID namespace.ID,
we commonpb.WorkflowExecution,
ms MutableState,
transferQueueAckLevel int64,
visibilityQueueAckLevel int64,
workflowClosedVersion int64,
) error
DeleteWorkflowExecution(
ctx context.Context,
nsID namespace.ID,
we commonpb.WorkflowExecution,
weCtx Context,
ms MutableState,
forceDeleteFromOpenVisibility bool,
) error
DeleteWorkflowExecutionByRetention(
ctx context.Context,
nsID namespace.ID,
we commonpb.WorkflowExecution,
weCtx Context,
ms MutableState,
) error
}

DeleteManagerImpl struct {
Expand Down Expand Up @@ -94,6 +115,7 @@ func (m *DeleteManagerImpl) AddDeleteWorkflowExecutionTask(
ms MutableState,
transferQueueAckLevel int64,
visibilityQueueAckLevel int64,
workflowClosedVersion int64,
) error {

// In active cluster, create `DeleteWorkflowExecutionTask` only if workflow is closed successfully
Expand Down Expand Up @@ -123,6 +145,7 @@ func (m *DeleteManagerImpl) AddDeleteWorkflowExecutionTask(
return err
}

deleteTask.Version = workflowClosedVersion
return m.shard.AddTasks(ctx, &persistence.AddHistoryTasksRequest{
ShardID: m.shard.GetShardID(),
// RangeID is set by shard
Expand Down
8 changes: 4 additions & 4 deletions service/history/workflow/delete_manager_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions service/history/workflow/delete_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,7 @@ func (s *deleteManagerWorkflowSuite) TestAddDeleteWorkflowExecutionTask() {
mockMutableState,
1000,
1001,
0,
)
s.NoError(err)

Expand All @@ -254,6 +255,7 @@ func (s *deleteManagerWorkflowSuite) TestAddDeleteWorkflowExecutionTask() {
mockMutableState,
1000,
1001,
0,
)
s.NoError(err)

Expand All @@ -269,6 +271,7 @@ func (s *deleteManagerWorkflowSuite) TestAddDeleteWorkflowExecutionTask() {
mockMutableState,
1000,
1001,
0,
)
s.NoError(err)

Expand All @@ -286,6 +289,7 @@ func (s *deleteManagerWorkflowSuite) TestAddDeleteWorkflowExecutionTask() {
mockMutableState,
200,
201,
0,
)
s.ErrorIs(err, consts.ErrWorkflowNotReady)

Expand All @@ -301,6 +305,7 @@ func (s *deleteManagerWorkflowSuite) TestAddDeleteWorkflowExecutionTask() {
mockMutableState,
1000,
1000,
0,
)
s.ErrorIs(err, consts.ErrWorkflowNotReady)

Expand All @@ -318,6 +323,7 @@ func (s *deleteManagerWorkflowSuite) TestAddDeleteWorkflowExecutionTask() {
mockMutableState,
999,
1001,
0,
)
s.ErrorIs(err, consts.ErrWorkflowNotReady)

Expand All @@ -335,6 +341,7 @@ func (s *deleteManagerWorkflowSuite) TestAddDeleteWorkflowExecutionTask() {
mockMutableState,
999,
1001,
0,
)
s.NoError(err)

Expand All @@ -352,6 +359,7 @@ func (s *deleteManagerWorkflowSuite) TestAddDeleteWorkflowExecutionTask() {
mockMutableState,
999,
1000,
0,
)
s.ErrorIs(err, consts.ErrWorkflowNotReady)
}
Expand Down
1 change: 1 addition & 0 deletions service/history/workflow/task_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ func (r *TaskGeneratorImpl) GenerateWorkflowCloseTasks(
if err != nil {
return err
}

closeTasks := []tasks.Task{
&tasks.CloseExecutionTask{
// TaskID is set by shard
Expand Down

0 comments on commit cdb9ae0

Please sign in to comment.