Skip to content

Commit

Permalink
Allow delete open workflows from passive sides (#2636)
Browse files Browse the repository at this point in the history
* Allow delete open workflows
  • Loading branch information
yux0 committed Mar 24, 2022
1 parent 8a8f0aa commit f0d20df
Show file tree
Hide file tree
Showing 14 changed files with 423 additions and 300 deletions.
607 changes: 337 additions & 270 deletions api/persistence/v1/executions.pb.go

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions common/persistence/serialization/task_serializer.go
Expand Up @@ -929,6 +929,7 @@ func (s *TaskSerializer) visibilityDeleteTaskToProto(
Version: deleteVisibilityTask.Version,
TaskId: deleteVisibilityTask.TaskID,
VisibilityTime: &deleteVisibilityTask.VisibilityTimestamp,
StartTime: deleteVisibilityTask.StartTime,
CloseTime: deleteVisibilityTask.CloseTime,
}
}
Expand All @@ -945,6 +946,7 @@ func (s *TaskSerializer) visibilityDeleteTaskFromProto(
VisibilityTimestamp: *deleteVisibilityTask.VisibilityTime,
TaskID: deleteVisibilityTask.TaskId,
Version: deleteVisibilityTask.Version,
StartTime: deleteVisibilityTask.StartTime,
CloseTime: deleteVisibilityTask.CloseTime,
}
}
Expand Down
3 changes: 2 additions & 1 deletion common/persistence/visibility/manager/visibility_manager.go
Expand Up @@ -168,7 +168,8 @@ type (
RunID string
WorkflowID string
TaskID int64
CloseTime time.Time
StartTime *time.Time // if start time is not empty, delete record from open_execution for cassandra db
CloseTime *time.Time // if end time is not empty, delete record from closed_execution for cassandra db
}
)

Expand Down
Expand Up @@ -514,16 +514,19 @@ func (s *VisibilityPersistenceSuite) TestFilteringByStatus() {
}

// TestDelete test
func (s *VisibilityPersistenceSuite) TestDelete() {
nRows := 5
func (s *VisibilityPersistenceSuite) TestDeleteWorkflow() {
openRows := 10
closedRows := 5
testNamespaceUUID := namespace.ID(uuid.New())
closeTime := time.Now().UTC()
startTime := closeTime.Add(-5 * time.Second)
for i := 0; i < nRows; i++ {
var pendingExecutions []commonpb.WorkflowExecution
for i := 0; i < openRows; i++ {
workflowExecution := commonpb.WorkflowExecution{
WorkflowId: uuid.New(),
RunId: uuid.New(),
}
pendingExecutions = append(pendingExecutions, workflowExecution)
err0 := s.VisibilityMgr.RecordWorkflowExecutionStarted(s.ctx, &manager.RecordWorkflowExecutionStartedRequest{
VisibilityRequestBase: &manager.VisibilityRequestBase{
NamespaceID: testNamespaceUUID,
Expand All @@ -533,10 +536,13 @@ func (s *VisibilityPersistenceSuite) TestDelete() {
},
})
s.Nil(err0)
}

for i := 0; i < closedRows; i++ {
closeReq := &manager.RecordWorkflowExecutionClosedRequest{
VisibilityRequestBase: &manager.VisibilityRequestBase{
NamespaceID: testNamespaceUUID,
Execution: workflowExecution,
Execution: pendingExecutions[i],
WorkflowTypeName: "visibility-workflow",
StartTime: startTime,
Status: enumspb.WORKFLOW_EXECUTION_STATUS_FAILED,
Expand All @@ -555,27 +561,53 @@ func (s *VisibilityPersistenceSuite) TestDelete() {
PageSize: 10,
})
s.Nil(err3)
s.Equal(nRows, len(resp.Executions))
s.Equal(closedRows, len(resp.Executions))

remaining := nRows
// Delete closed workflow
for _, row := range resp.Executions {
err4 := s.VisibilityMgr.DeleteWorkflowExecution(s.ctx, &manager.VisibilityDeleteWorkflowExecutionRequest{
NamespaceID: testNamespaceUUID,
WorkflowID: row.GetExecution().GetWorkflowId(),
RunID: row.GetExecution().GetRunId(),
CloseTime: closeTime,
CloseTime: &closeTime,
})
s.Nil(err4)
remaining--
resp, err5 := s.VisibilityMgr.ListClosedWorkflowExecutions(s.ctx, &manager.ListWorkflowExecutionsRequest{
NamespaceID: testNamespaceUUID,
EarliestStartTime: startTime,
LatestStartTime: closeTime,
PageSize: 10,
}
resp, err5 := s.VisibilityMgr.ListClosedWorkflowExecutions(s.ctx, &manager.ListWorkflowExecutionsRequest{
NamespaceID: testNamespaceUUID,
EarliestStartTime: startTime,
LatestStartTime: closeTime,
PageSize: 10,
})
s.Nil(err5)
s.Equal(0, len(resp.Executions))

resp, err6 := s.VisibilityMgr.ListOpenWorkflowExecutions(s.ctx, &manager.ListWorkflowExecutionsRequest{
NamespaceID: testNamespaceUUID,
EarliestStartTime: startTime,
LatestStartTime: closeTime,
PageSize: 10,
})
s.Nil(err6)
s.Equal(openRows-closedRows, len(resp.Executions))
// Delete open workflow
for _, row := range resp.Executions {
err7 := s.VisibilityMgr.DeleteWorkflowExecution(s.ctx, &manager.VisibilityDeleteWorkflowExecutionRequest{
NamespaceID: testNamespaceUUID,
WorkflowID: row.GetExecution().GetWorkflowId(),
RunID: row.GetExecution().GetRunId(),
StartTime: &startTime,
})
s.Nil(err5)
s.Equal(remaining, len(resp.Executions))
s.Nil(err7)
}
resp, err8 := s.VisibilityMgr.ListOpenWorkflowExecutions(s.ctx, &manager.ListWorkflowExecutionsRequest{
NamespaceID: testNamespaceUUID,
EarliestStartTime: startTime,
LatestStartTime: closeTime,
PageSize: 10,
})
s.Nil(err8)
s.Equal(0, len(resp.Executions))
}

// TestUpsertWorkflowExecution test
Expand Down
Expand Up @@ -433,13 +433,24 @@ func (v *visibilityStore) ListClosedWorkflowExecutionsByStatus(
}

func (v *visibilityStore) DeleteWorkflowExecution(request *manager.VisibilityDeleteWorkflowExecutionRequest) error {
query := v.session.Query(templateDeleteWorkflowExecutionClosed,
request.NamespaceID.String(),
namespacePartition,
persistence.UnixMilliseconds(request.CloseTime),
request.RunID).
Consistency(v.lowConslevel)
if err := query.Exec(); err != nil {
var query gocql.Query
if request.StartTime != nil {
query = v.session.Query(templateDeleteWorkflowExecutionStarted,
request.NamespaceID.String(),
namespacePartition,
persistence.UnixMilliseconds(*request.StartTime),
request.RunID)
} else if request.CloseTime != nil {
query = v.session.Query(templateDeleteWorkflowExecutionClosed,
request.NamespaceID.String(),
namespacePartition,
persistence.UnixMilliseconds(*request.CloseTime),
request.RunID)
} else {
panic("both StartTime and CloseTime are nil")
}

if err := query.Consistency(v.lowConslevel).Exec(); err != nil {
return gocql.ConvertError("DeleteWorkflowExecution", err)
}
return nil
Expand Down
2 changes: 1 addition & 1 deletion proto/api
Expand Up @@ -191,6 +191,7 @@ message VisibilityTaskInfo {
int64 task_id = 6;
google.protobuf.Timestamp visibility_time = 7 [(gogoproto.stdtime) = true];
google.protobuf.Timestamp close_time = 8 [(gogoproto.stdtime) = true];
google.protobuf.Timestamp start_time = 9 [(gogoproto.stdtime) = true];
}

// timer column
Expand Down
2 changes: 1 addition & 1 deletion service/history/shard/context.go
Expand Up @@ -104,7 +104,7 @@ type (
SetWorkflowExecution(ctx context.Context, request *persistence.SetWorkflowExecutionRequest) (*persistence.SetWorkflowExecutionResponse, error)
// DeleteWorkflowExecution deletes workflow execution, current workflow execution, and add task to delete visibility.
// If branchToken != nil, then delete history also, otherwise leave history.
DeleteWorkflowExecution(ctx context.Context, workflowKey definition.WorkflowKey, branchToken []byte, version int64, closeTime *time.Time) error
DeleteWorkflowExecution(ctx context.Context, workflowKey definition.WorkflowKey, branchToken []byte, version int64, startTime *time.Time, closeTime *time.Time) error

GetRemoteAdminClient(cluster string) adminservice.AdminServiceClient
GetHistoryClient() historyservice.HistoryServiceClient
Expand Down
2 changes: 2 additions & 0 deletions service/history/shard/context_impl.go
Expand Up @@ -819,6 +819,7 @@ func (s *ContextImpl) DeleteWorkflowExecution(
key definition.WorkflowKey,
branchToken []byte,
newTaskVersion int64,
startTime *time.Time,
closeTime *time.Time,
) error {
// DeleteWorkflowExecution is a 4-steps process (order is very important and should not be changed):
Expand Down Expand Up @@ -871,6 +872,7 @@ func (s *ContextImpl) DeleteWorkflowExecution(
WorkflowKey: key,
VisibilityTimestamp: s.timeSource.Now(),
Version: newTaskVersion,
StartTime: startTime,
CloseTime: closeTime,
},
},
Expand Down
8 changes: 4 additions & 4 deletions service/history/shard/context_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions service/history/tasks/delete_visibility_task.go
Expand Up @@ -39,6 +39,7 @@ type (
VisibilityTimestamp time.Time
TaskID int64
Version int64
StartTime *time.Time
CloseTime *time.Time
}
)
Expand Down
3 changes: 2 additions & 1 deletion service/history/visibilityQueueTaskExecutor.go
Expand Up @@ -462,7 +462,8 @@ func (t *visibilityQueueTaskExecutor) processDeleteExecution(
WorkflowID: task.WorkflowID,
RunID: task.RunID,
TaskID: task.TaskID,
CloseTime: *task.CloseTime,
StartTime: task.StartTime,
CloseTime: task.CloseTime,
}
return t.visibilityMgr.DeleteWorkflowExecution(ctx, request)
}
Expand Down
2 changes: 2 additions & 0 deletions service/history/workflow/delete_manager.go
Expand Up @@ -83,6 +83,7 @@ func NewDeleteManager(

return deleteManager
}

func (m *DeleteManagerImpl) AddDeleteWorkflowExecutionTask(
ctx context.Context,
nsID namespace.ID,
Expand Down Expand Up @@ -223,6 +224,7 @@ func (m *DeleteManagerImpl) deleteWorkflowExecutionInternal(
},
currentBranchToken,
newTaskVersion,
nil,
completionEvent.GetEventTime(),
); err != nil {
return err
Expand Down
3 changes: 3 additions & 0 deletions service/history/workflow/delete_manager_test.go
Expand Up @@ -132,6 +132,7 @@ func (s *deleteManagerWorkflowSuite) TestDeleteDeletedWorkflowExecution() {
},
[]byte{22, 8, 78},
int64(1),
nil,
&closeTime,
).Return(nil)
mockWeCtx.EXPECT().Clear()
Expand Down Expand Up @@ -174,6 +175,7 @@ func (s *deleteManagerWorkflowSuite) TestDeleteDeletedWorkflowExecution_Error()
},
[]byte{22, 8, 78},
int64(1),
nil,
&closeTime,
).Return(serviceerror.NewInternal("test error"))

Expand Down Expand Up @@ -248,6 +250,7 @@ func (s *deleteManagerWorkflowSuite) TestDeleteWorkflowExecutionRetention_Archiv
},
nil,
int64(1),
nil,
&closeTime,
).Return(nil)
mockWeCtx.EXPECT().Clear()
Expand Down

0 comments on commit f0d20df

Please sign in to comment.