Skip to content

Commit

Permalink
Fix DeleteWorkflowExecution API when delete non current execution (#2484
Browse files Browse the repository at this point in the history
)
  • Loading branch information
alexshtin committed Feb 17, 2022
1 parent 1d33c3e commit cbd1b5e
Show file tree
Hide file tree
Showing 10 changed files with 105 additions and 58 deletions.
39 changes: 16 additions & 23 deletions service/history/historyEngine.go
Expand Up @@ -158,6 +158,7 @@ func NewEngineWithShardContext(
historyCache,
config,
archivalClient,
shard.GetTimeSource(),
)

historyEngImpl := &historyEngineImpl{
Expand Down Expand Up @@ -2267,31 +2268,23 @@ func (e *historyEngineImpl) TerminateWorkflowExecution(

func (e *historyEngineImpl) DeleteWorkflowExecution(
ctx context.Context,
deleteRequest *historyservice.DeleteWorkflowExecutionRequest,
) error {

return e.updateWorkflow(
ctx,
namespace.ID(deleteRequest.NamespaceId),
*deleteRequest.GetWorkflowExecution(),
func(weCtx workflow.Context, mutableState workflow.MutableState) (*updateWorkflowAction, error) {
if mutableState.IsWorkflowExecutionRunning() {
return nil, consts.ErrWorkflowNotCompleted // workflow is running, cannot be deleted
}

taskGenerator := workflow.NewTaskGenerator(
e.shard.GetNamespaceRegistry(),
e.logger,
mutableState,
)
request *historyservice.DeleteWorkflowExecutionRequest,
) (retError error) {
nsID := namespace.ID(request.GetNamespaceId())

err := taskGenerator.GenerateDeleteExecutionTask(e.timeSource.Now())
if err != nil {
return nil, err
}
wfCtx, err := e.loadWorkflow(ctx, nsID, request.GetWorkflowExecution().GetWorkflowId(), request.GetWorkflowExecution().GetRunId())
if err != nil {
return err
}
defer func() { wfCtx.getReleaseFn()(retError) }()

return updateWorkflowWithoutWorkflowTask, nil
})
return e.workflowDeleteManager.AddDeleteWorkflowExecutionTask(
nsID,
commonpb.WorkflowExecution{
WorkflowId: request.GetWorkflowExecution().GetWorkflowId(),
RunId: request.GetWorkflowExecution().GetRunId(),
},
wfCtx.getMutableState())
}

// RecordChildExecutionCompleted records the completion of child execution into parent execution history
Expand Down
2 changes: 1 addition & 1 deletion service/history/nDCHistoryReplicator.go
Expand Up @@ -176,7 +176,7 @@ func newNDCHistoryReplicator(
logger,
state,
func(mutableState workflow.MutableState) workflow.TaskGenerator {
return workflow.NewTaskGenerator(shard.GetNamespaceRegistry(), logger, mutableState)
return workflow.NewTaskGenerator(shard.GetNamespaceRegistry(), mutableState)
},
)
},
Expand Down
2 changes: 1 addition & 1 deletion service/history/nDCStateRebuilder.go
Expand Up @@ -213,7 +213,7 @@ func (r *nDCStateRebuilderImpl) initializeBuilders(
r.logger,
resetMutableStateBuilder,
func(mutableState workflow.MutableState) workflow.TaskGenerator {
return workflow.NewTaskGenerator(r.shard.GetNamespaceRegistry(), r.logger, mutableState)
return workflow.NewTaskGenerator(r.shard.GetNamespaceRegistry(), mutableState)
},
)
return resetMutableStateBuilder, stateBuilder
Expand Down
56 changes: 50 additions & 6 deletions service/history/workflow/delete_manager.go
Expand Up @@ -33,20 +33,24 @@ import (
enumspb "go.temporal.io/api/enums/v1"

"go.temporal.io/server/common"
"go.temporal.io/server/common/clock"
"go.temporal.io/server/common/definition"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/persistence"
"go.temporal.io/server/common/searchattribute"
"go.temporal.io/server/service/history/configs"
"go.temporal.io/server/service/history/consts"
"go.temporal.io/server/service/history/shard"
"go.temporal.io/server/service/history/tasks"
"go.temporal.io/server/service/worker/archiver"
)

type (
DeleteManager interface {
DeleteWorkflowExecution(namespaceID namespace.ID, we commonpb.WorkflowExecution, weCtx Context, ms MutableState, sourceTaskVersion int64) error
DeleteWorkflowExecutionByRetention(namespaceID namespace.ID, we commonpb.WorkflowExecution, weCtx Context, ms MutableState, sourceTaskVersion int64) error
AddDeleteWorkflowExecutionTask(nsID namespace.ID, we commonpb.WorkflowExecution, ms MutableState) error
DeleteWorkflowExecution(nsID namespace.ID, we commonpb.WorkflowExecution, weCtx Context, ms MutableState, sourceTaskVersion int64) error
DeleteWorkflowExecutionByRetention(nsID namespace.ID, we commonpb.WorkflowExecution, weCtx Context, ms MutableState, sourceTaskVersion int64) error
}

DeleteManagerImpl struct {
Expand All @@ -55,6 +59,7 @@ type (
config *configs.Config
metricsClient metrics.Client
archivalClient archiver.Client
timeSource clock.TimeSource
}
)

Expand All @@ -65,20 +70,59 @@ func NewDeleteManager(
cache Cache,
config *configs.Config,
archiverClient archiver.Client,
timeSource clock.TimeSource,
) *DeleteManagerImpl {
deleteManager := &DeleteManagerImpl{
shard: shard,
historyCache: cache,
metricsClient: shard.GetMetricsClient(),
config: config,
archivalClient: archiverClient,
timeSource: timeSource,
}

return deleteManager
}
func (m *DeleteManagerImpl) AddDeleteWorkflowExecutionTask(
nsID namespace.ID,
we commonpb.WorkflowExecution,
ms MutableState,
) error {

if ms.IsWorkflowExecutionRunning() {
// Running workflow cannot be deleted. Close or terminate it first.
return consts.ErrWorkflowNotCompleted
}

taskGenerator := NewTaskGenerator(
m.shard.GetNamespaceRegistry(),
ms,
)

deleteTask, err := taskGenerator.GenerateDeleteExecutionTask(m.timeSource.Now())
if err != nil {
return err
}

err = m.shard.AddTasks(&persistence.AddTasksRequest{
ShardID: m.shard.GetShardID(),
// RangeID is set by shard
NamespaceID: nsID.String(),
WorkflowID: we.GetWorkflowId(),
RunID: we.GetRunId(),
Tasks: map[tasks.Category][]tasks.Task{
tasks.CategoryTransfer: {deleteTask},
},
})
if err != nil {
return err
}

return nil
}

func (m *DeleteManagerImpl) DeleteWorkflowExecution(
namespaceID namespace.ID,
nsID namespace.ID,
we commonpb.WorkflowExecution,
weCtx Context,
ms MutableState,
Expand All @@ -94,7 +138,7 @@ func (m *DeleteManagerImpl) DeleteWorkflowExecution(
}

err := m.deleteWorkflowExecutionInternal(
namespaceID,
nsID,
we,
weCtx,
ms,
Expand All @@ -107,7 +151,7 @@ func (m *DeleteManagerImpl) DeleteWorkflowExecution(
}

func (m *DeleteManagerImpl) DeleteWorkflowExecutionByRetention(
namespaceID namespace.ID,
nsID namespace.ID,
we commonpb.WorkflowExecution,
weCtx Context,
ms MutableState,
Expand All @@ -122,7 +166,7 @@ func (m *DeleteManagerImpl) DeleteWorkflowExecutionByRetention(
}

err := m.deleteWorkflowExecutionInternal(
namespaceID,
nsID,
we,
weCtx,
ms,
Expand Down
30 changes: 22 additions & 8 deletions service/history/workflow/delete_manager_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions service/history/workflow/delete_manager_test.go
Expand Up @@ -41,6 +41,7 @@ import (
persistencespb "go.temporal.io/server/api/persistence/v1"
"go.temporal.io/server/common"
carchiver "go.temporal.io/server/common/archiver"
"go.temporal.io/server/common/clock"
"go.temporal.io/server/common/definition"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/namespace"
Expand All @@ -59,6 +60,7 @@ type (
mockCache *MockCache
mockArchivalClient *archiver.MockClient
mockShardContext *shard.MockContext
mockClock *clock.EventTimeSource

deleteManager DeleteManager
}
Expand All @@ -83,6 +85,7 @@ func (s *deleteManagerWorkflowSuite) SetupTest() {
s.controller = gomock.NewController(s.T())
s.mockCache = NewMockCache(s.controller)
s.mockArchivalClient = archiver.NewMockClient(s.controller)
s.mockClock = clock.NewEventTimeSource()

config := tests.NewDynamicConfig()
s.mockShardContext = shard.NewMockContext(s.controller)
Expand All @@ -93,6 +96,7 @@ func (s *deleteManagerWorkflowSuite) SetupTest() {
s.mockCache,
config,
s.mockArchivalClient,
s.mockClock,
)
}

Expand Down
2 changes: 1 addition & 1 deletion service/history/workflow/mutable_state_impl.go
Expand Up @@ -260,7 +260,7 @@ func NewMutableState(
common.FirstEventID,
s.bufferEventsInDB,
)
s.taskGenerator = NewTaskGenerator(shard.GetNamespaceRegistry(), s.logger, s)
s.taskGenerator = NewTaskGenerator(shard.GetNamespaceRegistry(), s)
s.workflowTaskManager = newWorkflowTaskStateMachine(s)

return s
Expand Down
20 changes: 6 additions & 14 deletions service/history/workflow/task_generator.go
Expand Up @@ -36,7 +36,6 @@ import (

enumsspb "go.temporal.io/server/api/enums/v1"
"go.temporal.io/server/common/clock"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/persistence/versionhistory"
"go.temporal.io/server/common/primitives/timestamp"
Expand All @@ -54,7 +53,7 @@ type (
) error
GenerateDeleteExecutionTask(
now time.Time,
) error
) (*tasks.DeleteExecutionTask, error)
GenerateRecordWorkflowStartedTasks(
now time.Time,
startEvent *historypb.HistoryEvent,
Expand Down Expand Up @@ -120,9 +119,7 @@ type (

TaskGeneratorImpl struct {
namespaceRegistry namespace.Registry
logger log.Logger

mutableState MutableState
mutableState MutableState
}
)

Expand All @@ -132,15 +129,12 @@ var _ TaskGenerator = (*TaskGeneratorImpl)(nil)

func NewTaskGenerator(
namespaceRegistry namespace.Registry,
logger log.Logger,
mutableState MutableState,
) *TaskGeneratorImpl {

mstg := &TaskGeneratorImpl{
namespaceRegistry: namespaceRegistry,
logger: logger,

mutableState: mutableState,
mutableState: mutableState,
}

return mstg
Expand Down Expand Up @@ -212,18 +206,16 @@ func (r *TaskGeneratorImpl) GenerateWorkflowCloseTasks(

func (r *TaskGeneratorImpl) GenerateDeleteExecutionTask(
now time.Time,
) error {
) (*tasks.DeleteExecutionTask, error) {

currentVersion := r.mutableState.GetCurrentVersion()

r.mutableState.AddTasks(&tasks.DeleteExecutionTask{
return &tasks.DeleteExecutionTask{
// TaskID is set by shard
WorkflowKey: r.mutableState.GetWorkflowKey(),
VisibilityTimestamp: now,
Version: currentVersion,
})

return nil
}, nil
}

func (r *TaskGeneratorImpl) GenerateDelayedWorkflowTasks(
Expand Down
7 changes: 4 additions & 3 deletions service/history/workflow/task_generator_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion service/history/workflow/task_refresher.go
Expand Up @@ -77,7 +77,6 @@ func (r *TaskRefresherImpl) RefreshTasks(

taskGenerator := NewTaskGenerator(
r.namespaceRegistry,
r.logger,
mutableState,
)

Expand Down

0 comments on commit cbd1b5e

Please sign in to comment.