Skip to content

Commit

Permalink
Fix mutable state stale check for recordChildExecutionCompleted (#2821)
Browse files Browse the repository at this point in the history
  • Loading branch information
yycptt committed May 10, 2022
1 parent c118f6b commit 2f43b88
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 1 deletion.
22 changes: 21 additions & 1 deletion service/history/historyEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -2143,7 +2143,25 @@ func (e *historyEngineImpl) RecordChildExecutionCompleted(
return e.updateWorkflow(
ctx,
completionRequest.Clock,
api.HistoryEventConsistencyPredicate(parentInitiatedID, parentInitiatedVersion),
func(mutableState workflow.MutableState) bool {
if !mutableState.IsWorkflowExecutionRunning() {
// current branch already closed, we won't perform any operation, pass the check
return true
}

onCurrentBranch, err := historyEventOnCurrentBranch(mutableState, parentInitiatedID, parentInitiatedVersion)
if err != nil {
// can't find initiated event, potential stale mutable, fail the predicate check
return false
}
if !onCurrentBranch {
// found on different branch, since we don't record completion on a different branch, pass the check
return true
}

ci, isRunning := mutableState.GetChildExecutionInfo(parentInitiatedID)
return !(isRunning && ci.StartedId == common.EmptyEventID) // !(potential stale)
},
definition.NewWorkflowKey(
completionRequest.NamespaceId,
completionRequest.WorkflowExecution.WorkflowId,
Expand All @@ -2163,6 +2181,8 @@ func (e *historyEngineImpl) RecordChildExecutionCompleted(
// Check mutable state to make sure child execution is in pending child executions
ci, isRunning := mutableState.GetChildExecutionInfo(parentInitiatedID)
if !isRunning || ci.StartedId == common.EmptyEventID {
// note we already checked if startedID is empty (in consistency predicate)
// and reloaded mutable state
return nil, consts.ErrChildExecutionNotFound
}

Expand Down
69 changes: 69 additions & 0 deletions service/history/historyEngine2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1641,6 +1641,75 @@ func (s *engine2Suite) TestSignalWithStartWorkflowExecution_Start_WorkflowAlread
s.NotNil(err)
}

func (s *engine2Suite) TestRecordChildExecutionCompleted() {
childWorkflowID := "some random child workflow ID"
childRunID := uuid.New()
childWorkflowType := "some random child workflow type"
childTaskQueueName := "some random child task queue"

request := &historyservice.RecordChildExecutionCompletedRequest{
NamespaceId: tests.NamespaceID.String(),
WorkflowExecution: &commonpb.WorkflowExecution{
WorkflowId: tests.WorkflowID,
RunId: tests.RunID,
},
CompletedExecution: &commonpb.WorkflowExecution{
WorkflowId: childWorkflowID,
RunId: childRunID,
},
CompletionEvent: &historypb.HistoryEvent{
EventId: 456,
EventType: enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED,
Attributes: &historypb.HistoryEvent_WorkflowExecutionCompletedEventAttributes{
WorkflowExecutionCompletedEventAttributes: &historypb.WorkflowExecutionCompletedEventAttributes{},
},
},
ParentInitiatedId: 123,
ParentInitiatedVersion: 100,
}

msBuilder := workflow.TestGlobalMutableState(s.historyEngine.shard, s.mockEventsCache, log.NewTestLogger(), tests.Version, tests.RunID)
addWorkflowExecutionStartedEvent(msBuilder, commonpb.WorkflowExecution{
WorkflowId: tests.WorkflowID,
RunId: tests.RunID,
}, "wType", "testTaskQueue", payloads.EncodeString("input"), 25*time.Second, 20*time.Second, 200*time.Second, "identity")
ms := workflow.TestCloneToProto(msBuilder)
gwmsResponse := &persistence.GetWorkflowExecutionResponse{State: ms}

// reload mutable state due to potential stale mutable state (initiated event not found)
s.mockExecutionMgr.EXPECT().GetWorkflowExecution(gomock.Any(), gomock.Any()).Return(gwmsResponse, nil).Times(2)
err := s.historyEngine.RecordChildExecutionCompleted(metrics.AddMetricsContext(context.Background()), request)
s.IsType(&serviceerror.NotFound{}, err)

// add child init event
di := addWorkflowTaskScheduledEvent(msBuilder)
workflowTasksStartEvent := addWorkflowTaskStartedEvent(msBuilder, di.ScheduleID, "testTaskQueue", uuid.New())
di.StartedID = workflowTasksStartEvent.GetEventId()
workflowTaskCompletedEvent := addWorkflowTaskCompletedEvent(msBuilder, di.ScheduleID, di.StartedID, "some random identity")

initiatedEvent, _ := addStartChildWorkflowExecutionInitiatedEvent(msBuilder, workflowTaskCompletedEvent.GetEventId(), uuid.New(),
tests.ChildNamespace, childWorkflowID, childWorkflowType, childTaskQueueName, nil, 1*time.Second, 1*time.Second, 1*time.Second, enumspb.PARENT_CLOSE_POLICY_TERMINATE)
request.ParentInitiatedId = initiatedEvent.GetEventId()
request.ParentInitiatedVersion = initiatedEvent.GetVersion()

// reload mutable state due to potential stale mutable state (started event not found)
ms = workflow.TestCloneToProto(msBuilder)
gwmsResponse = &persistence.GetWorkflowExecutionResponse{State: ms}
s.mockExecutionMgr.EXPECT().GetWorkflowExecution(gomock.Any(), gomock.Any()).Return(gwmsResponse, nil).Times(2)
err = s.historyEngine.RecordChildExecutionCompleted(metrics.AddMetricsContext(context.Background()), request)
s.IsType(&serviceerror.NotFound{}, err)

// add child started event
addChildWorkflowExecutionStartedEvent(msBuilder, initiatedEvent.GetEventId(), tests.ChildNamespace, childWorkflowID, childRunID, childWorkflowType, nil)

ms = workflow.TestCloneToProto(msBuilder)
gwmsResponse = &persistence.GetWorkflowExecutionResponse{State: ms}
s.mockExecutionMgr.EXPECT().GetWorkflowExecution(gomock.Any(), gomock.Any()).Return(gwmsResponse, nil)
s.mockExecutionMgr.EXPECT().UpdateWorkflowExecution(gomock.Any(), gomock.Any()).Return(tests.UpdateWorkflowExecutionResponse, nil)
err = s.historyEngine.RecordChildExecutionCompleted(metrics.AddMetricsContext(context.Background()), request)
s.NoError(err)
}

func (s *engine2Suite) TestVerifyChildExecutionCompletionRecorded_WorkflowNotExist() {

request := &historyservice.VerifyChildExecutionCompletionRecordedRequest{
Expand Down

0 comments on commit 2f43b88

Please sign in to comment.