Skip to content

Commit

Permalink
Fix parent close policy processor for deleted namespace (#2664)
Browse files Browse the repository at this point in the history
  • Loading branch information
alexshtin committed Mar 29, 2022
1 parent c6a0cfc commit b87a3ea
Show file tree
Hide file tree
Showing 2 changed files with 122 additions and 6 deletions.
16 changes: 10 additions & 6 deletions service/history/transferQueueActiveTaskExecutor.go
Expand Up @@ -1396,15 +1396,17 @@ func (t *transferQueueActiveTaskExecutor) applyParentClosePolicy(
return nil

case enumspb.PARENT_CLOSE_POLICY_TERMINATE:
childNamespaceId, err := t.registry.GetNamespaceID(namespace.Name(childInfo.GetNamespace()))
childNamespaceID, err := t.registry.GetNamespaceID(namespace.Name(childInfo.GetNamespace()))
switch err.(type) {
case nil, *serviceerror.NotFound:
case nil:
case *serviceerror.NotFound:
// If child namespace is deleted there is nothing to close.
return nil
default:
return err
}
_, err = t.historyClient.TerminateWorkflowExecution(ctx, &historyservice.TerminateWorkflowExecutionRequest{
NamespaceId: childNamespaceId.String(),
NamespaceId: childNamespaceID.String(),
TerminateRequest: &workflowservice.TerminateWorkflowExecutionRequest{
Namespace: childInfo.GetNamespace(),
WorkflowExecution: &commonpb.WorkflowExecution{
Expand All @@ -1422,16 +1424,18 @@ func (t *transferQueueActiveTaskExecutor) applyParentClosePolicy(
return err

case enumspb.PARENT_CLOSE_POLICY_REQUEST_CANCEL:
childNamespaceId, err := t.registry.GetNamespaceID(namespace.Name(childInfo.GetNamespace()))
childNamespaceID, err := t.registry.GetNamespaceID(namespace.Name(childInfo.GetNamespace()))
switch err.(type) {
case nil, *serviceerror.NotFound:
case nil:
case *serviceerror.NotFound:
// If child namespace is deleted there is nothing to close.
return nil
default:
return err
}

_, err = t.historyClient.RequestCancelWorkflowExecution(ctx, &historyservice.RequestCancelWorkflowExecutionRequest{
NamespaceId: childNamespaceId.String(),
NamespaceId: childNamespaceID.String(),
CancelRequest: &workflowservice.RequestCancelWorkflowExecutionRequest{
Namespace: childInfo.GetNamespace(),
WorkflowExecution: &commonpb.WorkflowExecution{
Expand Down
112 changes: 112 additions & 0 deletions service/history/transferQueueActiveTaskExecutor_test.go
Expand Up @@ -1123,6 +1123,118 @@ func (s *transferQueueActiveTaskExecutorSuite) TestProcessCloseExecution_NoParen
s.Nil(err)
}

func (s *transferQueueActiveTaskExecutorSuite) TestProcessCloseExecution_NoParent_ChildInDeletedNamespace() {
execution := commonpb.WorkflowExecution{
WorkflowId: "some random workflow ID",
RunId: uuid.New(),
}
workflowType := "some random workflow type"
taskQueueName := "some random task queue"

s.mockNamespaceCache.EXPECT().GetNamespace(namespace.Name("child namespace1")).Return(tests.GlobalNamespaceEntry, nil).AnyTimes()

mutableState := workflow.TestGlobalMutableState(s.mockShard, s.mockShard.GetEventsCache(), s.logger, s.version, execution.GetRunId())
_, err := mutableState.AddWorkflowExecutionStartedEvent(
execution,
&historyservice.StartWorkflowExecutionRequest{
Attempt: 1,
NamespaceId: s.namespaceID.String(),
StartRequest: &workflowservice.StartWorkflowExecutionRequest{
WorkflowType: &commonpb.WorkflowType{Name: workflowType},
TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueueName},
WorkflowExecutionTimeout: timestamp.DurationPtr(2 * time.Second),
WorkflowTaskTimeout: timestamp.DurationPtr(1 * time.Second),
},
},
)
s.NoError(err)

wt := addWorkflowTaskScheduledEvent(mutableState)
event := addWorkflowTaskStartedEvent(mutableState, wt.ScheduleID, taskQueueName, uuid.New())
wt.StartedID = event.GetEventId()

event, _ = mutableState.AddWorkflowTaskCompletedEvent(wt.ScheduleID, wt.StartedID, &workflowservice.RespondWorkflowTaskCompletedRequest{
Identity: "some random identity",
Commands: []*commandpb.Command{
{
CommandType: enumspb.COMMAND_TYPE_START_CHILD_WORKFLOW_EXECUTION,
Attributes: &commandpb.Command_StartChildWorkflowExecutionCommandAttributes{StartChildWorkflowExecutionCommandAttributes: &commandpb.StartChildWorkflowExecutionCommandAttributes{
Namespace: "child namespace1",
WorkflowId: "child workflow1",
WorkflowType: &commonpb.WorkflowType{
Name: "child workflow type",
},
TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueueName},
Input: payloads.EncodeString("random input"),
ParentClosePolicy: enumspb.PARENT_CLOSE_POLICY_TERMINATE,
}},
},
{
CommandType: enumspb.COMMAND_TYPE_START_CHILD_WORKFLOW_EXECUTION,
Attributes: &commandpb.Command_StartChildWorkflowExecutionCommandAttributes{StartChildWorkflowExecutionCommandAttributes: &commandpb.StartChildWorkflowExecutionCommandAttributes{
Namespace: "child namespace1",
WorkflowId: "child workflow2",
WorkflowType: &commonpb.WorkflowType{
Name: "child workflow type",
},
TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueueName},
Input: payloads.EncodeString("random input"),
ParentClosePolicy: enumspb.PARENT_CLOSE_POLICY_REQUEST_CANCEL,
}},
},
},
}, configs.DefaultHistoryMaxAutoResetPoints)

_, _, err = mutableState.AddStartChildWorkflowExecutionInitiatedEvent(event.GetEventId(), uuid.New(), &commandpb.StartChildWorkflowExecutionCommandAttributes{
Namespace: "child namespace1",
WorkflowId: "child workflow1",
WorkflowType: &commonpb.WorkflowType{
Name: "child workflow type",
},
TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueueName},
Input: payloads.EncodeString("random input"),
ParentClosePolicy: enumspb.PARENT_CLOSE_POLICY_TERMINATE,
})
s.NoError(err)

_, _, err = mutableState.AddStartChildWorkflowExecutionInitiatedEvent(event.GetEventId(), uuid.New(), &commandpb.StartChildWorkflowExecutionCommandAttributes{
Namespace: "child namespace1",
WorkflowId: "child workflow2",
WorkflowType: &commonpb.WorkflowType{
Name: "child workflow type",
},
TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueueName},
Input: payloads.EncodeString("random input"),
ParentClosePolicy: enumspb.PARENT_CLOSE_POLICY_REQUEST_CANCEL,
})
s.NoError(err)

mutableState.FlushBufferedEvents()

taskID := int64(22)
event = addCompleteWorkflowEvent(mutableState, event.GetEventId(), nil)

transferTask := &tasks.CloseExecutionTask{
WorkflowKey: definition.NewWorkflowKey(
s.namespaceID.String(),
execution.GetWorkflowId(),
execution.GetRunId(),
),
Version: s.version,
TaskID: taskID,
VisibilityTimestamp: time.Now().UTC(),
}

persistenceMutableState := s.createPersistenceMutableState(mutableState, event.GetEventId(), event.GetVersion())
s.mockExecutionMgr.EXPECT().GetWorkflowExecution(gomock.Any(), gomock.Any()).Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil)
s.mockArchivalMetadata.EXPECT().GetVisibilityConfig().Return(archiver.NewDisabledArchvialConfig())

s.mockNamespaceCache.EXPECT().GetNamespaceID(namespace.Name("child namespace1")).Return(namespace.EmptyID, serviceerror.NewNotFound("namespace not found")).AnyTimes()

err = s.transferQueueActiveTaskExecutor.execute(context.Background(), transferTask, true)
s.NoError(err)
}

func (s *transferQueueActiveTaskExecutorSuite) TestProcessCancelExecution_Success() {

execution := commonpb.WorkflowExecution{
Expand Down

0 comments on commit b87a3ea

Please sign in to comment.