diff --git a/service/history/historyEngine.go b/service/history/historyEngine.go index aadbb629f7a..7f073106e16 100644 --- a/service/history/historyEngine.go +++ b/service/history/historyEngine.go @@ -158,6 +158,7 @@ func NewEngineWithShardContext( historyCache, config, archivalClient, + shard.GetTimeSource(), ) historyEngImpl := &historyEngineImpl{ @@ -2267,31 +2268,23 @@ func (e *historyEngineImpl) TerminateWorkflowExecution( func (e *historyEngineImpl) DeleteWorkflowExecution( ctx context.Context, - deleteRequest *historyservice.DeleteWorkflowExecutionRequest, -) error { - - return e.updateWorkflow( - ctx, - namespace.ID(deleteRequest.NamespaceId), - *deleteRequest.GetWorkflowExecution(), - func(weCtx workflow.Context, mutableState workflow.MutableState) (*updateWorkflowAction, error) { - if mutableState.IsWorkflowExecutionRunning() { - return nil, consts.ErrWorkflowNotCompleted // workflow is running, cannot be deleted - } - - taskGenerator := workflow.NewTaskGenerator( - e.shard.GetNamespaceRegistry(), - e.logger, - mutableState, - ) + request *historyservice.DeleteWorkflowExecutionRequest, +) (retError error) { + nsID := namespace.ID(request.GetNamespaceId()) - err := taskGenerator.GenerateDeleteExecutionTask(e.timeSource.Now()) - if err != nil { - return nil, err - } + wfCtx, err := e.loadWorkflow(ctx, nsID, request.GetWorkflowExecution().GetWorkflowId(), request.GetWorkflowExecution().GetRunId()) + if err != nil { + return err + } + defer func() { wfCtx.getReleaseFn()(retError) }() - return updateWorkflowWithoutWorkflowTask, nil - }) + return e.workflowDeleteManager.AddDeleteWorkflowExecutionTask( + nsID, + commonpb.WorkflowExecution{ + WorkflowId: request.GetWorkflowExecution().GetWorkflowId(), + RunId: request.GetWorkflowExecution().GetRunId(), + }, + wfCtx.getMutableState()) } // RecordChildExecutionCompleted records the completion of child execution into parent execution history diff --git a/service/history/nDCHistoryReplicator.go b/service/history/nDCHistoryReplicator.go index 338cab1dda9..055af49493f 100644 --- a/service/history/nDCHistoryReplicator.go +++ b/service/history/nDCHistoryReplicator.go @@ -176,7 +176,7 @@ func newNDCHistoryReplicator( logger, state, func(mutableState workflow.MutableState) workflow.TaskGenerator { - return workflow.NewTaskGenerator(shard.GetNamespaceRegistry(), logger, mutableState) + return workflow.NewTaskGenerator(shard.GetNamespaceRegistry(), mutableState) }, ) }, diff --git a/service/history/nDCStateRebuilder.go b/service/history/nDCStateRebuilder.go index f5e3638b79b..1f615bb3090 100644 --- a/service/history/nDCStateRebuilder.go +++ b/service/history/nDCStateRebuilder.go @@ -213,7 +213,7 @@ func (r *nDCStateRebuilderImpl) initializeBuilders( r.logger, resetMutableStateBuilder, func(mutableState workflow.MutableState) workflow.TaskGenerator { - return workflow.NewTaskGenerator(r.shard.GetNamespaceRegistry(), r.logger, mutableState) + return workflow.NewTaskGenerator(r.shard.GetNamespaceRegistry(), mutableState) }, ) return resetMutableStateBuilder, stateBuilder diff --git a/service/history/workflow/delete_manager.go b/service/history/workflow/delete_manager.go index 462d0aa47b1..afdbd20bb9c 100644 --- a/service/history/workflow/delete_manager.go +++ b/service/history/workflow/delete_manager.go @@ -33,20 +33,24 @@ import ( enumspb "go.temporal.io/api/enums/v1" "go.temporal.io/server/common" + "go.temporal.io/server/common/clock" "go.temporal.io/server/common/definition" "go.temporal.io/server/common/metrics" "go.temporal.io/server/common/namespace" + "go.temporal.io/server/common/persistence" "go.temporal.io/server/common/searchattribute" "go.temporal.io/server/service/history/configs" "go.temporal.io/server/service/history/consts" "go.temporal.io/server/service/history/shard" + "go.temporal.io/server/service/history/tasks" "go.temporal.io/server/service/worker/archiver" ) type ( DeleteManager interface { - DeleteWorkflowExecution(namespaceID namespace.ID, we commonpb.WorkflowExecution, weCtx Context, ms MutableState, sourceTaskVersion int64) error - DeleteWorkflowExecutionByRetention(namespaceID namespace.ID, we commonpb.WorkflowExecution, weCtx Context, ms MutableState, sourceTaskVersion int64) error + AddDeleteWorkflowExecutionTask(nsID namespace.ID, we commonpb.WorkflowExecution, ms MutableState) error + DeleteWorkflowExecution(nsID namespace.ID, we commonpb.WorkflowExecution, weCtx Context, ms MutableState, sourceTaskVersion int64) error + DeleteWorkflowExecutionByRetention(nsID namespace.ID, we commonpb.WorkflowExecution, weCtx Context, ms MutableState, sourceTaskVersion int64) error } DeleteManagerImpl struct { @@ -55,6 +59,7 @@ type ( config *configs.Config metricsClient metrics.Client archivalClient archiver.Client + timeSource clock.TimeSource } ) @@ -65,6 +70,7 @@ func NewDeleteManager( cache Cache, config *configs.Config, archiverClient archiver.Client, + timeSource clock.TimeSource, ) *DeleteManagerImpl { deleteManager := &DeleteManagerImpl{ shard: shard, @@ -72,13 +78,51 @@ func NewDeleteManager( metricsClient: shard.GetMetricsClient(), config: config, archivalClient: archiverClient, + timeSource: timeSource, } return deleteManager } +func (m *DeleteManagerImpl) AddDeleteWorkflowExecutionTask( + nsID namespace.ID, + we commonpb.WorkflowExecution, + ms MutableState, +) error { + + if ms.IsWorkflowExecutionRunning() { + // Running workflow cannot be deleted. Close or terminate it first. + return consts.ErrWorkflowNotCompleted + } + + taskGenerator := NewTaskGenerator( + m.shard.GetNamespaceRegistry(), + ms, + ) + + deleteTask, err := taskGenerator.GenerateDeleteExecutionTask(m.timeSource.Now()) + if err != nil { + return err + } + + err = m.shard.AddTasks(&persistence.AddTasksRequest{ + ShardID: m.shard.GetShardID(), + // RangeID is set by shard + NamespaceID: nsID.String(), + WorkflowID: we.GetWorkflowId(), + RunID: we.GetRunId(), + Tasks: map[tasks.Category][]tasks.Task{ + tasks.CategoryTransfer: {deleteTask}, + }, + }) + if err != nil { + return err + } + + return nil +} func (m *DeleteManagerImpl) DeleteWorkflowExecution( - namespaceID namespace.ID, + nsID namespace.ID, we commonpb.WorkflowExecution, weCtx Context, ms MutableState, @@ -94,7 +138,7 @@ func (m *DeleteManagerImpl) DeleteWorkflowExecution( } err := m.deleteWorkflowExecutionInternal( - namespaceID, + nsID, we, weCtx, ms, @@ -107,7 +151,7 @@ func (m *DeleteManagerImpl) DeleteWorkflowExecution( } func (m *DeleteManagerImpl) DeleteWorkflowExecutionByRetention( - namespaceID namespace.ID, + nsID namespace.ID, we commonpb.WorkflowExecution, weCtx Context, ms MutableState, @@ -122,7 +166,7 @@ func (m *DeleteManagerImpl) DeleteWorkflowExecutionByRetention( } err := m.deleteWorkflowExecutionInternal( - namespaceID, + nsID, we, weCtx, ms, diff --git a/service/history/workflow/delete_manager_mock.go b/service/history/workflow/delete_manager_mock.go index 021d2182158..a638c4d6ec5 100644 --- a/service/history/workflow/delete_manager_mock.go +++ b/service/history/workflow/delete_manager_mock.go @@ -59,30 +59,44 @@ func (m *MockDeleteManager) EXPECT() *MockDeleteManagerMockRecorder { return m.recorder } +// AddDeleteWorkflowExecutionTask mocks base method. +func (m *MockDeleteManager) AddDeleteWorkflowExecutionTask(nsID namespace.ID, we v1.WorkflowExecution, ms MutableState) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "AddDeleteWorkflowExecutionTask", nsID, we, ms) + ret0, _ := ret[0].(error) + return ret0 +} + +// AddDeleteWorkflowExecutionTask indicates an expected call of AddDeleteWorkflowExecutionTask. +func (mr *MockDeleteManagerMockRecorder) AddDeleteWorkflowExecutionTask(nsID, we, ms interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddDeleteWorkflowExecutionTask", reflect.TypeOf((*MockDeleteManager)(nil).AddDeleteWorkflowExecutionTask), nsID, we, ms) +} + // DeleteWorkflowExecution mocks base method. -func (m *MockDeleteManager) DeleteWorkflowExecution(namespaceID namespace.ID, we v1.WorkflowExecution, weCtx Context, ms MutableState, sourceTaskVersion int64) error { +func (m *MockDeleteManager) DeleteWorkflowExecution(nsID namespace.ID, we v1.WorkflowExecution, weCtx Context, ms MutableState, sourceTaskVersion int64) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "DeleteWorkflowExecution", namespaceID, we, weCtx, ms, sourceTaskVersion) + ret := m.ctrl.Call(m, "DeleteWorkflowExecution", nsID, we, weCtx, ms, sourceTaskVersion) ret0, _ := ret[0].(error) return ret0 } // DeleteWorkflowExecution indicates an expected call of DeleteWorkflowExecution. -func (mr *MockDeleteManagerMockRecorder) DeleteWorkflowExecution(namespaceID, we, weCtx, ms, sourceTaskVersion interface{}) *gomock.Call { +func (mr *MockDeleteManagerMockRecorder) DeleteWorkflowExecution(nsID, we, weCtx, ms, sourceTaskVersion interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteWorkflowExecution", reflect.TypeOf((*MockDeleteManager)(nil).DeleteWorkflowExecution), namespaceID, we, weCtx, ms, sourceTaskVersion) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteWorkflowExecution", reflect.TypeOf((*MockDeleteManager)(nil).DeleteWorkflowExecution), nsID, we, weCtx, ms, sourceTaskVersion) } // DeleteWorkflowExecutionByRetention mocks base method. -func (m *MockDeleteManager) DeleteWorkflowExecutionByRetention(namespaceID namespace.ID, we v1.WorkflowExecution, weCtx Context, ms MutableState, sourceTaskVersion int64) error { +func (m *MockDeleteManager) DeleteWorkflowExecutionByRetention(nsID namespace.ID, we v1.WorkflowExecution, weCtx Context, ms MutableState, sourceTaskVersion int64) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "DeleteWorkflowExecutionByRetention", namespaceID, we, weCtx, ms, sourceTaskVersion) + ret := m.ctrl.Call(m, "DeleteWorkflowExecutionByRetention", nsID, we, weCtx, ms, sourceTaskVersion) ret0, _ := ret[0].(error) return ret0 } // DeleteWorkflowExecutionByRetention indicates an expected call of DeleteWorkflowExecutionByRetention. -func (mr *MockDeleteManagerMockRecorder) DeleteWorkflowExecutionByRetention(namespaceID, we, weCtx, ms, sourceTaskVersion interface{}) *gomock.Call { +func (mr *MockDeleteManagerMockRecorder) DeleteWorkflowExecutionByRetention(nsID, we, weCtx, ms, sourceTaskVersion interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteWorkflowExecutionByRetention", reflect.TypeOf((*MockDeleteManager)(nil).DeleteWorkflowExecutionByRetention), namespaceID, we, weCtx, ms, sourceTaskVersion) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteWorkflowExecutionByRetention", reflect.TypeOf((*MockDeleteManager)(nil).DeleteWorkflowExecutionByRetention), nsID, we, weCtx, ms, sourceTaskVersion) } diff --git a/service/history/workflow/delete_manager_test.go b/service/history/workflow/delete_manager_test.go index 6bece03e004..33502ad1c54 100644 --- a/service/history/workflow/delete_manager_test.go +++ b/service/history/workflow/delete_manager_test.go @@ -41,6 +41,7 @@ import ( persistencespb "go.temporal.io/server/api/persistence/v1" "go.temporal.io/server/common" carchiver "go.temporal.io/server/common/archiver" + "go.temporal.io/server/common/clock" "go.temporal.io/server/common/definition" "go.temporal.io/server/common/metrics" "go.temporal.io/server/common/namespace" @@ -59,6 +60,7 @@ type ( mockCache *MockCache mockArchivalClient *archiver.MockClient mockShardContext *shard.MockContext + mockClock *clock.EventTimeSource deleteManager DeleteManager } @@ -83,6 +85,7 @@ func (s *deleteManagerWorkflowSuite) SetupTest() { s.controller = gomock.NewController(s.T()) s.mockCache = NewMockCache(s.controller) s.mockArchivalClient = archiver.NewMockClient(s.controller) + s.mockClock = clock.NewEventTimeSource() config := tests.NewDynamicConfig() s.mockShardContext = shard.NewMockContext(s.controller) @@ -93,6 +96,7 @@ func (s *deleteManagerWorkflowSuite) SetupTest() { s.mockCache, config, s.mockArchivalClient, + s.mockClock, ) } diff --git a/service/history/workflow/mutable_state_impl.go b/service/history/workflow/mutable_state_impl.go index 0589663345e..58f7beb8359 100644 --- a/service/history/workflow/mutable_state_impl.go +++ b/service/history/workflow/mutable_state_impl.go @@ -260,7 +260,7 @@ func NewMutableState( common.FirstEventID, s.bufferEventsInDB, ) - s.taskGenerator = NewTaskGenerator(shard.GetNamespaceRegistry(), s.logger, s) + s.taskGenerator = NewTaskGenerator(shard.GetNamespaceRegistry(), s) s.workflowTaskManager = newWorkflowTaskStateMachine(s) return s diff --git a/service/history/workflow/task_generator.go b/service/history/workflow/task_generator.go index ba50843fa8a..95adcd43236 100644 --- a/service/history/workflow/task_generator.go +++ b/service/history/workflow/task_generator.go @@ -36,7 +36,6 @@ import ( enumsspb "go.temporal.io/server/api/enums/v1" "go.temporal.io/server/common/clock" - "go.temporal.io/server/common/log" "go.temporal.io/server/common/namespace" "go.temporal.io/server/common/persistence/versionhistory" "go.temporal.io/server/common/primitives/timestamp" @@ -54,7 +53,7 @@ type ( ) error GenerateDeleteExecutionTask( now time.Time, - ) error + ) (*tasks.DeleteExecutionTask, error) GenerateRecordWorkflowStartedTasks( now time.Time, startEvent *historypb.HistoryEvent, @@ -120,9 +119,7 @@ type ( TaskGeneratorImpl struct { namespaceRegistry namespace.Registry - logger log.Logger - - mutableState MutableState + mutableState MutableState } ) @@ -132,15 +129,12 @@ var _ TaskGenerator = (*TaskGeneratorImpl)(nil) func NewTaskGenerator( namespaceRegistry namespace.Registry, - logger log.Logger, mutableState MutableState, ) *TaskGeneratorImpl { mstg := &TaskGeneratorImpl{ namespaceRegistry: namespaceRegistry, - logger: logger, - - mutableState: mutableState, + mutableState: mutableState, } return mstg @@ -212,18 +206,16 @@ func (r *TaskGeneratorImpl) GenerateWorkflowCloseTasks( func (r *TaskGeneratorImpl) GenerateDeleteExecutionTask( now time.Time, -) error { +) (*tasks.DeleteExecutionTask, error) { currentVersion := r.mutableState.GetCurrentVersion() - r.mutableState.AddTasks(&tasks.DeleteExecutionTask{ + return &tasks.DeleteExecutionTask{ // TaskID is set by shard WorkflowKey: r.mutableState.GetWorkflowKey(), VisibilityTimestamp: now, Version: currentVersion, - }) - - return nil + }, nil } func (r *TaskGeneratorImpl) GenerateDelayedWorkflowTasks( diff --git a/service/history/workflow/task_generator_mock.go b/service/history/workflow/task_generator_mock.go index b0d0906d2db..1c6c70d01a6 100644 --- a/service/history/workflow/task_generator_mock.go +++ b/service/history/workflow/task_generator_mock.go @@ -131,11 +131,12 @@ func (mr *MockTaskGeneratorMockRecorder) GenerateDelayedWorkflowTasks(now, start } // GenerateDeleteExecutionTask mocks base method. -func (m *MockTaskGenerator) GenerateDeleteExecutionTask(now time.Time) error { +func (m *MockTaskGenerator) GenerateDeleteExecutionTask(now time.Time) (*tasks.DeleteExecutionTask, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "GenerateDeleteExecutionTask", now) - ret0, _ := ret[0].(error) - return ret0 + ret0, _ := ret[0].(*tasks.DeleteExecutionTask) + ret1, _ := ret[1].(error) + return ret0, ret1 } // GenerateDeleteExecutionTask indicates an expected call of GenerateDeleteExecutionTask. diff --git a/service/history/workflow/task_refresher.go b/service/history/workflow/task_refresher.go index bae7cc82412..541d8745d54 100644 --- a/service/history/workflow/task_refresher.go +++ b/service/history/workflow/task_refresher.go @@ -77,7 +77,6 @@ func (r *TaskRefresherImpl) RefreshTasks( taskGenerator := NewTaskGenerator( r.namespaceRegistry, - r.logger, mutableState, )