Skip to content

Commit

Permalink
Always schedule first workflow task for started abandoned child (#2414)
Browse files Browse the repository at this point in the history
  • Loading branch information
yycptt committed May 5, 2022
1 parent aa257cd commit 3912a3e
Show file tree
Hide file tree
Showing 5 changed files with 177 additions and 23 deletions.
15 changes: 11 additions & 4 deletions service/history/historyEngine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5397,10 +5397,16 @@ func addSignaledEvent(builder workflow.MutableState, initiatedID int64, namespac
return event
}

func addStartChildWorkflowExecutionInitiatedEvent(builder workflow.MutableState, workflowTaskCompletedID int64,
createRequestID string, namespace namespace.Name, workflowID, workflowType, taskQueue string, input *commonpb.Payloads,
executionTimeout, runTimeout, taskTimeout time.Duration) (*historypb.HistoryEvent,
*persistencespb.ChildExecutionInfo) {
func addStartChildWorkflowExecutionInitiatedEvent(
builder workflow.MutableState,
workflowTaskCompletedID int64,
createRequestID string,
namespace namespace.Name,
workflowID, workflowType, taskQueue string,
input *commonpb.Payloads,
executionTimeout, runTimeout, taskTimeout time.Duration,
parentClosePolicy enumspb.ParentClosePolicy,
) (*historypb.HistoryEvent, *persistencespb.ChildExecutionInfo) {

event, cei, _ := builder.AddStartChildWorkflowExecutionInitiatedEvent(workflowTaskCompletedID, createRequestID,
&commandpb.StartChildWorkflowExecutionCommandAttributes{
Expand All @@ -5413,6 +5419,7 @@ func addStartChildWorkflowExecutionInitiatedEvent(builder workflow.MutableState,
WorkflowRunTimeout: &runTimeout,
WorkflowTaskTimeout: &taskTimeout,
Control: "",
ParentClosePolicy: parentClosePolicy,
})
return event, cei
}
Expand Down
64 changes: 49 additions & 15 deletions service/history/transferQueueActiveTaskExecutor.go
Original file line number Diff line number Diff line change
Expand Up @@ -632,7 +632,7 @@ func (t *transferQueueActiveTaskExecutor) processStartChildExecution(
if err != nil {
return err
}
if mutableState == nil || !mutableState.IsWorkflowExecutionRunning() {
if mutableState == nil {
return nil
}

Expand All @@ -645,6 +645,54 @@ func (t *transferQueueActiveTaskExecutor) processStartChildExecution(
return nil
}

// workflow running or not, child started or not, parent close policy is abandon or not
// 8 cases in total
workflowRunning := mutableState.IsWorkflowExecutionRunning()
childStarted := childInfo.StartedId != common.EmptyEventID
if !workflowRunning && (!childStarted || childInfo.ParentClosePolicy != enumspb.PARENT_CLOSE_POLICY_ABANDON) {
// three cases here:
// case 1: workflow not running, child started, parent close policy is not abandon
// case 2: workflow not running, child not started, parent close policy is not abandon
// case 3: workflow not running, child not started, parent close policy is abandon
//
// NOTE: ideally for case 3, we should continue to start child. However, with current start child
// and standby start child verification logic, we can't do that because:
// 1. Once workflow is closed, we can't update mutable state or record child started event.
// If the RPC call for scheduling the first workflow task times out but actually succeeds on the child workflow,
// the child workflow can run and complete, and another unrelated workflow can then reuse this workflowID.
// When the start child task retries, we can't rely on requestID to dedup the start child call. (We can use runID instead of requestID to dedup.)
// 2. No update to mutable state and child started event means we are not able to replicate the information
// to the standby cluster, so standby start child logic won't be able to verify the child has started.
// To resolve the issue above, we need to
// 1. Start child workflow and schedule the first workflow task in one transaction. Use runID to perform deduplication
// 2. Standby start child logic needs to verify whether the child workflow actually started instead of relying on the information
// in parent mutable state.
return nil
}

// ChildExecution already started, just create WorkflowTask and complete transfer task
// If parent already closed, since child workflow started event already written to history,
// still schedule the workflowTask if the parent close policy is Abandon.
// If parent close policy is cancel or terminate, the parent close policy will be applied in another
// transfer task.
// case 4, 5: workflow running, child started, parent close policy is or is not abandon
// case 6: workflow closed, child started, parent close policy is abandon
if childStarted {
childExecution := &commonpb.WorkflowExecution{
WorkflowId: childInfo.StartedWorkflowId,
RunId: childInfo.StartedRunId,
}
childClock := childInfo.Clock
// NOTE: do not access anything related to mutable state after this lock release
// release the context lock since we no longer need mutable state builder and
// the rest of logic is making RPC call, which takes time.
release(nil)
return t.createFirstWorkflowTask(ctx, task.TargetNamespaceID, childExecution, childClock)
}

// remaining 2 cases:
// case 7, 8: workflow running, child not started, parent close policy is or is not abandon

initiatedEvent, err := mutableState.GetChildExecutionInitiatedEvent(ctx, task.InitiatedID)
if err != nil {
return err
Expand Down Expand Up @@ -681,20 +729,6 @@ func (t *transferQueueActiveTaskExecutor) processStartChildExecution(
targetNamespaceName = namespaceEntry.Name()
}

// ChildExecution already started, just create WorkflowTask and complete transfer task
if childInfo.StartedId != common.EmptyEventID {
childExecution := &commonpb.WorkflowExecution{
WorkflowId: childInfo.StartedWorkflowId,
RunId: childInfo.StartedRunId,
}
childClock := childInfo.Clock
// NOTE: do not access anything related mutable state after this lock release
// release the context lock since we no longer need mutable state builder and
// the rest of logic is making RPC call, which takes time.
release(nil)
return t.createFirstWorkflowTask(ctx, task.TargetNamespaceID, childExecution, childClock)
}

childRunID, childClock, err := t.startWorkflowWithRetry(
ctx,
task,
Expand Down
116 changes: 114 additions & 2 deletions service/history/transferQueueActiveTaskExecutor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1828,8 +1828,20 @@ func (s *transferQueueActiveTaskExecutorSuite) TestProcessStartChildExecution_Su

taskID := int64(59)

event, ci := addStartChildWorkflowExecutionInitiatedEvent(mutableState, event.GetEventId(), uuid.New(),
s.childNamespace, childWorkflowID, childWorkflowType, childTaskQueueName, nil, 1*time.Second, 1*time.Second, 1*time.Second)
event, ci := addStartChildWorkflowExecutionInitiatedEvent(
mutableState,
event.GetEventId(),
uuid.New(),
s.childNamespace,
childWorkflowID,
childWorkflowType,
childTaskQueueName,
nil,
1*time.Second,
1*time.Second,
1*time.Second,
enumspb.PARENT_CLOSE_POLICY_TERMINATE,
)

transferTask := &tasks.StartChildExecutionTask{
WorkflowKey: definition.NewWorkflowKey(
Expand Down Expand Up @@ -1918,6 +1930,7 @@ func (s *transferQueueActiveTaskExecutorSuite) TestProcessStartChildExecution_Fa
1*time.Second,
1*time.Second,
1*time.Second,
enumspb.PARENT_CLOSE_POLICY_TERMINATE,
)

transferTask := &tasks.StartChildExecutionTask{
Expand Down Expand Up @@ -1999,6 +2012,7 @@ func (s *transferQueueActiveTaskExecutorSuite) TestProcessStartChildExecution_Fa
1*time.Second,
1*time.Second,
1*time.Second,
enumspb.PARENT_CLOSE_POLICY_TERMINATE,
)

transferTask := &tasks.StartChildExecutionTask{
Expand Down Expand Up @@ -2073,6 +2087,7 @@ func (s *transferQueueActiveTaskExecutorSuite) TestProcessStartChildExecution_Su
1*time.Second,
1*time.Second,
1*time.Second,
enumspb.PARENT_CLOSE_POLICY_TERMINATE,
)

transferTask := &tasks.StartChildExecutionTask{
Expand Down Expand Up @@ -2164,6 +2179,7 @@ func (s *transferQueueActiveTaskExecutorSuite) TestProcessStartChildExecution_Du
1*time.Second,
1*time.Second,
1*time.Second,
enumspb.PARENT_CLOSE_POLICY_TERMINATE,
)

transferTask := &tasks.StartChildExecutionTask{
Expand Down Expand Up @@ -2199,6 +2215,102 @@ func (s *transferQueueActiveTaskExecutorSuite) TestProcessStartChildExecution_Du
s.Nil(err)
}

// TestProcessorStartChildExecution_ChildStarted_ParentClosed checks that executing a
// StartChildExecutionTask still issues exactly one ScheduleWorkflowTask call for the child
// (with IsFirstWorkflowTask set) when the child-started event has already been recorded,
// the parent workflow has been completed, and the child was initiated with
// PARENT_CLOSE_POLICY_ABANDON. The task execution is expected to return a nil error.
func (s *transferQueueActiveTaskExecutorSuite) TestProcessorStartChildExecution_ChildStarted_ParentClosed() {
// Parent workflow identity.
execution := commonpb.WorkflowExecution{
WorkflowId: "some random workflow ID",
RunId: uuid.New(),
}
workflowType := "some random workflow type"
taskQueueName := "some random task queue"

// Child workflow identity; the same workflow ID / run ID are later expected in the
// ScheduleWorkflowTask request.
childExecution := commonpb.WorkflowExecution{
WorkflowId: "some random child workflow ID",
RunId: uuid.New(),
}
childWorkflowType := "some random child workflow type"
childTaskQueueName := "some random child task queue"

// Build the parent's mutable state and record its workflow-started event.
mutableState := workflow.TestGlobalMutableState(s.mockShard, s.mockShard.GetEventsCache(), s.logger, s.version, execution.GetRunId())
_, err := mutableState.AddWorkflowExecutionStartedEvent(
execution,
&historyservice.StartWorkflowExecutionRequest{
Attempt: 1,
NamespaceId: s.namespaceID.String(),
StartRequest: &workflowservice.StartWorkflowExecutionRequest{
WorkflowType: &commonpb.WorkflowType{Name: workflowType},
TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueueName},
WorkflowExecutionTimeout: timestamp.DurationPtr(2 * time.Second),
WorkflowTaskTimeout: timestamp.DurationPtr(1 * time.Second),
},
},
)
s.Nil(err)

// Drive one workflow task through scheduled -> started -> completed; the completed
// event's ID is passed to the child-initiated helper below.
di := addWorkflowTaskScheduledEvent(mutableState)
event := addWorkflowTaskStartedEvent(mutableState, di.ScheduleID, taskQueueName, uuid.New())
di.StartedID = event.GetEventId()
event = addWorkflowTaskCompletedEvent(mutableState, di.ScheduleID, di.StartedID, "some random identity")

taskID := int64(59)

// Initiate the child with the ABANDON parent close policy, the policy this test targets.
event, ci := addStartChildWorkflowExecutionInitiatedEvent(
mutableState,
event.GetEventId(),
uuid.New(),
s.childNamespace,
childExecution.GetWorkflowId(),
childWorkflowType,
childTaskQueueName,
nil,
1*time.Second,
1*time.Second,
1*time.Second,
enumspb.PARENT_CLOSE_POLICY_ABANDON,
)

// Transfer task under test; InitiatedID refers to the child-initiated event added above.
transferTask := &tasks.StartChildExecutionTask{
WorkflowKey: definition.NewWorkflowKey(
s.namespaceID.String(),
execution.GetWorkflowId(),
execution.GetRunId(),
),
Version: s.version,
TargetNamespaceID: tests.ChildNamespaceID.String(),
TargetWorkflowID: childExecution.GetWorkflowId(),
TaskID: taskID,
InitiatedID: event.GetEventId(),
VisibilityTimestamp: time.Now().UTC(),
}
// Randomized shard clock passed to the child-started helper and expected, unchanged,
// in the ScheduleWorkflowTask request below.
childClock := &clockpb.ShardClock{
Id: rand.Int31(),
Clock: rand.Int63(),
}
// Record the child as started, then run a second workflow task to completion and
// complete the parent workflow so that the parent is closed when the task executes.
event = addChildWorkflowExecutionStartedEvent(mutableState, event.GetEventId(), tests.ChildNamespace, childExecution.GetWorkflowId(), childExecution.GetRunId(), childWorkflowType, childClock)
ci.StartedId = event.GetEventId()
di = addWorkflowTaskScheduledEvent(mutableState)
event = addWorkflowTaskStartedEvent(mutableState, di.ScheduleID, taskQueueName, "some random identity")
di.StartedID = event.GetEventId()
event = addWorkflowTaskCompletedEvent(mutableState, di.ScheduleID, di.StartedID, "some random identity")
event = addCompleteWorkflowEvent(mutableState, event.EventId, nil)
// Flush buffered events so real IDs get assigned
mutableState.FlushBufferedEvents()

// Serve the prepared state from the mocked execution manager, and expect exactly one
// ScheduleWorkflowTask call targeting the started child run in the child namespace.
persistenceMutableState := s.createPersistenceMutableState(mutableState, event.GetEventId(), event.GetVersion())
s.mockExecutionMgr.EXPECT().GetWorkflowExecution(gomock.Any(), gomock.Any()).Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil)
s.mockHistoryClient.EXPECT().ScheduleWorkflowTask(gomock.Any(), &historyservice.ScheduleWorkflowTaskRequest{
NamespaceId: s.childNamespaceID.String(),
WorkflowExecution: &commonpb.WorkflowExecution{
WorkflowId: childExecution.WorkflowId,
RunId: childExecution.RunId,
},
IsFirstWorkflowTask: true,
Clock: childClock,
}).Return(&historyservice.ScheduleWorkflowTaskResponse{}, nil).Times(1)

// Execute the transfer task; it must succeed.
err = s.transferQueueActiveTaskExecutor.Execute(context.Background(), s.newTaskExecutable(transferTask))
s.Nil(err)
}

func (s *transferQueueActiveTaskExecutorSuite) TestCopySearchAttributes() {
var input map[string]*commonpb.Payload
s.Nil(copySearchAttributes(input))
Expand Down
1 change: 1 addition & 0 deletions service/history/transferQueueStandbyTaskExecutor.go
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,7 @@ func (t *transferQueueStandbyTaskExecutor) processStartChildExecution(
if childWorkflowInfo.StartedId != common.EmptyEventID {
return nil, nil
}
// TODO: standby logic should verify if first workflow task is scheduled or not as well?

return getHistoryResendInfo(mutableState)
}
Expand Down
4 changes: 2 additions & 2 deletions service/history/transferQueueStandbyTaskExecutor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -990,7 +990,7 @@ func (s *transferQueueStandbyTaskExecutorSuite) TestProcessStartChildExecution_P

taskID := int64(59)
event, _ = addStartChildWorkflowExecutionInitiatedEvent(mutableState, event.GetEventId(), uuid.New(),
tests.ChildNamespace, childWorkflowID, childWorkflowType, childTaskQueueName, nil, 1*time.Second, 1*time.Second, 1*time.Second)
tests.ChildNamespace, childWorkflowID, childWorkflowType, childTaskQueueName, nil, 1*time.Second, 1*time.Second, 1*time.Second, enumspb.PARENT_CLOSE_POLICY_ABANDON)
nextEventID := event.GetEventId()

now := time.Now().UTC()
Expand Down Expand Up @@ -1077,7 +1077,7 @@ func (s *transferQueueStandbyTaskExecutorSuite) TestProcessStartChildExecution_S

taskID := int64(59)
event, childInfo := addStartChildWorkflowExecutionInitiatedEvent(mutableState, event.GetEventId(), uuid.New(),
tests.ChildNamespace, childWorkflowID, childWorkflowType, childTaskQueueName, nil, 1*time.Second, 1*time.Second, 1*time.Second)
tests.ChildNamespace, childWorkflowID, childWorkflowType, childTaskQueueName, nil, 1*time.Second, 1*time.Second, 1*time.Second, enumspb.PARENT_CLOSE_POLICY_ABANDON)

now := time.Now().UTC()
transferTask := &tasks.StartChildExecutionTask{
Expand Down

0 comments on commit 3912a3e

Please sign in to comment.