Skip to content

Commit

Permalink
Update parent clock on child workflow during child workflow's first w…
Browse files Browse the repository at this point in the history
…orkflow task

NOTE this is a bug fix for rare event, which will only underload & shard movement

when parent start child:
case 1: child not created, to be created
1. parent start child & send parent’s clock, child record parent clock for step 4 || 5
2. start child resp will contain the child’s clock
3. child’s clock will be recorded by parent
4. parent schedule child’s 1st workflow task with child's clock (step 3, for consistency check)
case 2: child created & need to schedule the first workflow task for child:
5. parent schedule child’s 1st workflow task with child’s clock (step 3, for consistency check)

child respond back to parent when finished:
6. child will only be able to run after parent record start child success, then schedule first workflow task (see above 3 && (4 || 5))
7. when child respond finish, child need to include parent clock for parent shard consistency check

before this PR, step 7 will use parent clock from step 1, which does not guarantee parent has reached step 3 (if shard moves && stale cache)
with this PR, step 4 || 5 will let child update child's view of parent clock.
  • Loading branch information
wxing1292 committed Feb 21, 2023
1 parent b313b7f commit f13ef76
Show file tree
Hide file tree
Showing 10 changed files with 544 additions and 400 deletions.
787 changes: 428 additions & 359 deletions api/historyservice/v1/request_response.pb.go

Large diffs are not rendered by default.

Expand Up @@ -320,7 +320,8 @@ message ScheduleWorkflowTaskRequest {
string namespace_id = 1;
temporal.api.common.v1.WorkflowExecution workflow_execution = 2;
bool is_first_workflow_task = 3;
temporal.server.api.clock.v1.VectorClock clock = 4;
temporal.server.api.clock.v1.VectorClock child_clock = 4;
temporal.server.api.clock.v1.VectorClock parent_clock = 5;
}

message ScheduleWorkflowTaskResponse {
Expand Down
2 changes: 1 addition & 1 deletion service/history/api/create_workflow_util.go
Expand Up @@ -166,7 +166,7 @@ func GenerateFirstWorkflowTask(
) (int64, error) {
if parentInfo == nil {
// WorkflowTask is only created when it is not a Child Workflow and no backoff is needed
return mutableState.AddFirstWorkflowTaskScheduled(startEvent, bypassTaskGeneration)
return mutableState.AddFirstWorkflowTaskScheduled(nil, startEvent, bypassTaskGeneration)
}
return 0, nil
}
Expand Down
20 changes: 15 additions & 5 deletions service/history/transferQueueActiveTaskExecutor.go
Expand Up @@ -776,7 +776,12 @@ func (t *transferQueueActiveTaskExecutor) processStartChildExecution(
// release the context lock since we no longer need mutable state and
// the rest of logic is making RPC call, which takes time.
release(nil)
return t.createFirstWorkflowTask(ctx, task.TargetNamespaceID, childExecution, childClock)

parentClock, err := t.shard.NewVectorClock()
if err != nil {
return err
}
return t.createFirstWorkflowTask(ctx, task.TargetNamespaceID, childExecution, parentClock, childClock)
}

// remaining 2 cases:
Expand Down Expand Up @@ -865,10 +870,14 @@ func (t *transferQueueActiveTaskExecutor) processStartChildExecution(
// Release the context lock since we no longer need mutable state and
// the rest of logic is making RPC call, which takes time.
release(nil)
parentClock, err := t.shard.NewVectorClock()
if err != nil {
return err
}
return t.createFirstWorkflowTask(ctx, task.TargetNamespaceID, &commonpb.WorkflowExecution{
WorkflowId: task.TargetWorkflowID,
RunId: childRunID,
}, childClock)
}, parentClock, childClock)
}

func (t *transferQueueActiveTaskExecutor) processResetWorkflow(
Expand Down Expand Up @@ -1064,15 +1073,16 @@ func (t *transferQueueActiveTaskExecutor) createFirstWorkflowTask(
ctx context.Context,
namespaceID string,
execution *commonpb.WorkflowExecution,
clock *clockspb.VectorClock,
parentClock *clockspb.VectorClock,
childClock *clockspb.VectorClock,
) error {
_, err := t.historyClient.ScheduleWorkflowTask(ctx, &historyservice.ScheduleWorkflowTaskRequest{
NamespaceId: namespaceID,
WorkflowExecution: execution,
IsFirstWorkflowTask: true,
Clock: clock,
ParentClock: parentClock,
ChildClock: childClock,
})

return err
}

Expand Down
95 changes: 71 additions & 24 deletions service/history/transferQueueActiveTaskExecutor_test.go
Expand Up @@ -1996,6 +1996,7 @@ func (s *transferQueueActiveTaskExecutorSuite) TestProcessStartChildExecution_Su
VisibilityTimestamp: time.Now().UTC(),
}

childClock := vclock.NewVectorClock(rand.Int63(), rand.Int31(), rand.Int63())
persistenceMutableState := s.createPersistenceMutableState(mutableState, event.GetEventId(), event.GetVersion())
s.mockExecutionMgr.EXPECT().GetWorkflowExecution(gomock.Any(), gomock.Any()).Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil)
s.mockHistoryClient.EXPECT().StartWorkflowExecution(gomock.Any(), s.createChildWorkflowExecutionRequest(
Expand All @@ -2004,17 +2005,33 @@ func (s *transferQueueActiveTaskExecutorSuite) TestProcessStartChildExecution_Su
transferTask,
mutableState,
ci,
)).Return(&historyservice.StartWorkflowExecutionResponse{RunId: childRunID}, nil)
)).Return(&historyservice.StartWorkflowExecutionResponse{RunId: childRunID, Clock: childClock}, nil)
s.mockExecutionMgr.EXPECT().UpdateWorkflowExecution(gomock.Any(), gomock.Any()).Return(tests.UpdateWorkflowExecutionResponse, nil)
s.mockClusterMetadata.EXPECT().ClusterNameForFailoverVersion(s.namespaceEntry.IsGlobalNamespace(), s.version).Return(cluster.TestCurrentClusterName).AnyTimes()
s.mockHistoryClient.EXPECT().ScheduleWorkflowTask(gomock.Any(), &historyservice.ScheduleWorkflowTaskRequest{
NamespaceId: tests.ChildNamespaceID.String(),
WorkflowExecution: &commonpb.WorkflowExecution{
WorkflowId: childWorkflowID,
RunId: childRunID,
currentShardClock := s.mockShard.CurrentVectorClock()
s.mockHistoryClient.EXPECT().ScheduleWorkflowTask(gomock.Any(), gomock.Any()).DoAndReturn(
func(_ context.Context, request *historyservice.ScheduleWorkflowTaskRequest, _ ...grpc.CallOption) (*historyservice.ScheduleWorkflowTaskResponse, error) {
parentClock := request.ParentClock
request.ParentClock = nil
s.Equal(&historyservice.ScheduleWorkflowTaskRequest{
NamespaceId: tests.ChildNamespaceID.String(),
WorkflowExecution: &commonpb.WorkflowExecution{
WorkflowId: childWorkflowID,
RunId: childRunID,
},
IsFirstWorkflowTask: true,
ParentClock: nil,
ChildClock: childClock,
}, request)
cmpResult, err := vclock.Compare(currentShardClock, parentClock)
if err != nil {
return nil, err
}
s.NoError(err)
s.True(cmpResult <= 0)
return &historyservice.ScheduleWorkflowTaskResponse{}, nil
},
IsFirstWorkflowTask: true,
}).Return(nil, nil)
)

_, _, err = s.transferQueueActiveTaskExecutor.Execute(context.Background(), s.newTaskExecutable(transferTask))
s.Nil(err)
Expand Down Expand Up @@ -2250,15 +2267,30 @@ func (s *transferQueueActiveTaskExecutorSuite) TestProcessStartChildExecution_Su

persistenceMutableState := s.createPersistenceMutableState(mutableState, event.GetEventId(), event.GetVersion())
s.mockExecutionMgr.EXPECT().GetWorkflowExecution(gomock.Any(), gomock.Any()).Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil)
s.mockHistoryClient.EXPECT().ScheduleWorkflowTask(gomock.Any(), &historyservice.ScheduleWorkflowTaskRequest{
NamespaceId: tests.ChildNamespaceID.String(),
WorkflowExecution: &commonpb.WorkflowExecution{
WorkflowId: childWorkflowID,
RunId: childRunID,
currentShardClock := s.mockShard.CurrentVectorClock()
s.mockHistoryClient.EXPECT().ScheduleWorkflowTask(gomock.Any(), gomock.Any()).DoAndReturn(
func(_ context.Context, request *historyservice.ScheduleWorkflowTaskRequest, _ ...grpc.CallOption) (*historyservice.ScheduleWorkflowTaskResponse, error) {
parentClock := request.ParentClock
request.ParentClock = nil
s.Equal(&historyservice.ScheduleWorkflowTaskRequest{
NamespaceId: tests.ChildNamespaceID.String(),
WorkflowExecution: &commonpb.WorkflowExecution{
WorkflowId: childWorkflowID,
RunId: childRunID,
},
IsFirstWorkflowTask: true,
ParentClock: nil,
ChildClock: childClock,
}, request)
cmpResult, err := vclock.Compare(currentShardClock, parentClock)
if err != nil {
return nil, err
}
s.NoError(err)
s.True(cmpResult <= 0)
return &historyservice.ScheduleWorkflowTaskResponse{}, nil
},
IsFirstWorkflowTask: true,
Clock: childClock,
}).Return(nil, nil)
)

_, _, err = s.transferQueueActiveTaskExecutor.Execute(context.Background(), s.newTaskExecutable(transferTask))
s.Nil(err)
Expand Down Expand Up @@ -2428,15 +2460,30 @@ func (s *transferQueueActiveTaskExecutorSuite) TestProcessorStartChildExecution_

persistenceMutableState := s.createPersistenceMutableState(mutableState, event.GetEventId(), event.GetVersion())
s.mockExecutionMgr.EXPECT().GetWorkflowExecution(gomock.Any(), gomock.Any()).Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil)
s.mockHistoryClient.EXPECT().ScheduleWorkflowTask(gomock.Any(), &historyservice.ScheduleWorkflowTaskRequest{
NamespaceId: s.childNamespaceID.String(),
WorkflowExecution: &commonpb.WorkflowExecution{
WorkflowId: childExecution.WorkflowId,
RunId: childExecution.RunId,
currentShardClock := s.mockShard.CurrentVectorClock()
s.mockHistoryClient.EXPECT().ScheduleWorkflowTask(gomock.Any(), gomock.Any()).DoAndReturn(
func(_ context.Context, request *historyservice.ScheduleWorkflowTaskRequest, _ ...grpc.CallOption) (*historyservice.ScheduleWorkflowTaskResponse, error) {
parentClock := request.ParentClock
request.ParentClock = nil
s.Equal(&historyservice.ScheduleWorkflowTaskRequest{
NamespaceId: s.childNamespaceID.String(),
WorkflowExecution: &commonpb.WorkflowExecution{
WorkflowId: childExecution.WorkflowId,
RunId: childExecution.RunId,
},
IsFirstWorkflowTask: true,
ParentClock: nil,
ChildClock: childClock,
}, request)
cmpResult, err := vclock.Compare(currentShardClock, parentClock)
if err != nil {
return nil, err
}
s.NoError(err)
s.True(cmpResult <= 0)
return &historyservice.ScheduleWorkflowTaskResponse{}, nil
},
IsFirstWorkflowTask: true,
Clock: childClock,
}).Return(&historyservice.ScheduleWorkflowTaskResponse{}, nil).Times(1)
)

_, _, err = s.transferQueueActiveTaskExecutor.Execute(context.Background(), s.newTaskExecutable(transferTask))
s.Nil(err)
Expand Down
2 changes: 1 addition & 1 deletion service/history/workflow/mutable_state.go
Expand Up @@ -118,7 +118,7 @@ type (
AddWorkflowTaskCompletedEvent(*WorkflowTaskInfo, *workflowservice.RespondWorkflowTaskCompletedRequest, int) (*historypb.HistoryEvent, error)
AddWorkflowTaskFailedEvent(workflowTask *WorkflowTaskInfo, cause enumspb.WorkflowTaskFailedCause, failure *failurepb.Failure, identity, binChecksum, baseRunID, newRunID string, forkEventVersion int64) (*historypb.HistoryEvent, error)
AddWorkflowTaskScheduleToStartTimeoutEvent(int64) (*historypb.HistoryEvent, error)
AddFirstWorkflowTaskScheduled(event *historypb.HistoryEvent, bypassTaskGeneration bool) (int64, error)
AddFirstWorkflowTaskScheduled(parentClock *clockspb.VectorClock, event *historypb.HistoryEvent, bypassTaskGeneration bool) (int64, error)
AddWorkflowTaskScheduledEvent(bypassTaskGeneration bool, workflowTaskType enumsspb.WorkflowTaskType) (*WorkflowTaskInfo, error)
AddWorkflowTaskScheduledEventAsHeartbeat(bypassTaskGeneration bool, originalScheduledTimestamp *time.Time, workflowTaskType enumsspb.WorkflowTaskType) (*WorkflowTaskInfo, error)
AddWorkflowTaskStartedEvent(int64, string, *taskqueuepb.TaskQueue, string) (*historypb.HistoryEvent, *WorkflowTaskInfo, error)
Expand Down
16 changes: 14 additions & 2 deletions service/history/workflow/mutable_state_impl.go
Expand Up @@ -1521,7 +1521,11 @@ func (ms *MutableStateImpl) addWorkflowExecutionStartedEventForContinueAsNew(
if err != nil {
return nil, err
}
if _, err = ms.AddFirstWorkflowTaskScheduled(event, false); err != nil {
var parentClock *clockspb.VectorClock
if parentExecutionInfo != nil {
parentClock = parentExecutionInfo.Clock
}
if _, err = ms.AddFirstWorkflowTaskScheduled(parentClock, event, false); err != nil {
return nil, err
}

Expand Down Expand Up @@ -1727,14 +1731,22 @@ func (ms *MutableStateImpl) ReplicateWorkflowExecutionStartedEvent(
// by the startEvent's FirstWorkflowTaskBackoff.
// Returns the workflow task's scheduled event ID if a task was scheduled, 0 otherwise.
func (ms *MutableStateImpl) AddFirstWorkflowTaskScheduled(
parentClock *clockspb.VectorClock,
startEvent *historypb.HistoryEvent,
bypassTaskGeneration bool,
) (int64, error) {
opTag := tag.WorkflowActionWorkflowTaskScheduled
if err := ms.checkMutability(opTag); err != nil {
return common.EmptyEventID, err
}
return ms.workflowTaskManager.AddFirstWorkflowTaskScheduled(startEvent, bypassTaskGeneration)
scheduleEventID, err := ms.workflowTaskManager.AddFirstWorkflowTaskScheduled(startEvent, bypassTaskGeneration)
if err != nil {
return 0, err
}
if parentClock != nil {
ms.executionInfo.ParentClock = parentClock
}
return scheduleEventID, nil
}

func (ms *MutableStateImpl) AddWorkflowTaskScheduledEvent(
Expand Down
8 changes: 4 additions & 4 deletions service/history/workflow/mutable_state_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 6 additions & 1 deletion service/history/workflow/retry.go
Expand Up @@ -39,6 +39,7 @@ import (
"go.temporal.io/api/workflowservice/v1"
"golang.org/x/exp/slices"

"go.temporal.io/server/api/clock/v1"
"go.temporal.io/server/api/historyservice/v1"
workflowspb "go.temporal.io/server/api/workflow/v1"
"go.temporal.io/server/common"
Expand Down Expand Up @@ -266,7 +267,11 @@ func SetupNewWorkflowForRetryOrCron(
if err != nil {
return serviceerror.NewInternal("Failed to add workflow execution started event.")
}
if _, err = newMutableState.AddFirstWorkflowTaskScheduled(event, false); err != nil {
var parentClock *clock.VectorClock
if parentInfo != nil {
parentClock = parentInfo.Clock
}
if _, err = newMutableState.AddFirstWorkflowTaskScheduled(parentClock, event, false); err != nil {
return err
}

Expand Down
4 changes: 2 additions & 2 deletions service/history/workflowTaskHandlerCallbacks.go
Expand Up @@ -123,7 +123,7 @@ func (handler *workflowTaskHandlerCallbacksImpl) handleWorkflowTaskScheduled(

return api.GetAndUpdateWorkflowWithNew(
ctx,
req.Clock,
req.ChildClock,
api.BypassMutableStateConsistencyPredicate,
definition.NewWorkflowKey(
req.NamespaceId,
Expand All @@ -146,7 +146,7 @@ func (handler *workflowTaskHandlerCallbacksImpl) handleWorkflowTaskScheduled(
if err != nil {
return nil, err
}
if _, err := mutableState.AddFirstWorkflowTaskScheduled(startEvent, false); err != nil {
if _, err := mutableState.AddFirstWorkflowTaskScheduled(req.ParentClock, startEvent, false); err != nil {
return nil, err
}

Expand Down

0 comments on commit f13ef76

Please sign in to comment.