Skip to content

Commit

Permalink
Update parent clock on child during creation of first workflow task (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
wxing1292 committed Feb 21, 2023
1 parent b313b7f commit 4d5b279
Show file tree
Hide file tree
Showing 10 changed files with 544 additions and 400 deletions.
787 changes: 428 additions & 359 deletions api/historyservice/v1/request_response.pb.go

Large diffs are not rendered by default.

Expand Up @@ -320,7 +320,8 @@ message ScheduleWorkflowTaskRequest {
string namespace_id = 1;
temporal.api.common.v1.WorkflowExecution workflow_execution = 2;
bool is_first_workflow_task = 3;
temporal.server.api.clock.v1.VectorClock clock = 4;
temporal.server.api.clock.v1.VectorClock child_clock = 4;
temporal.server.api.clock.v1.VectorClock parent_clock = 5;
}

message ScheduleWorkflowTaskResponse {
Expand Down
2 changes: 1 addition & 1 deletion service/history/api/create_workflow_util.go
Expand Up @@ -166,7 +166,7 @@ func GenerateFirstWorkflowTask(
) (int64, error) {
if parentInfo == nil {
// WorkflowTask is only created when it is not a Child Workflow and no backoff is needed
return mutableState.AddFirstWorkflowTaskScheduled(startEvent, bypassTaskGeneration)
return mutableState.AddFirstWorkflowTaskScheduled(nil, startEvent, bypassTaskGeneration)
}
return 0, nil
}
Expand Down
20 changes: 15 additions & 5 deletions service/history/transferQueueActiveTaskExecutor.go
Expand Up @@ -776,7 +776,12 @@ func (t *transferQueueActiveTaskExecutor) processStartChildExecution(
// release the context lock since we no longer need mutable state and
// the rest of logic is making RPC call, which takes time.
release(nil)
return t.createFirstWorkflowTask(ctx, task.TargetNamespaceID, childExecution, childClock)

parentClock, err := t.shard.NewVectorClock()
if err != nil {
return err
}
return t.createFirstWorkflowTask(ctx, task.TargetNamespaceID, childExecution, parentClock, childClock)
}

// remaining 2 cases:
Expand Down Expand Up @@ -865,10 +870,14 @@ func (t *transferQueueActiveTaskExecutor) processStartChildExecution(
// Release the context lock since we no longer need mutable state and
// the rest of logic is making RPC call, which takes time.
release(nil)
parentClock, err := t.shard.NewVectorClock()
if err != nil {
return err
}
return t.createFirstWorkflowTask(ctx, task.TargetNamespaceID, &commonpb.WorkflowExecution{
WorkflowId: task.TargetWorkflowID,
RunId: childRunID,
}, childClock)
}, parentClock, childClock)
}

func (t *transferQueueActiveTaskExecutor) processResetWorkflow(
Expand Down Expand Up @@ -1064,15 +1073,16 @@ func (t *transferQueueActiveTaskExecutor) createFirstWorkflowTask(
ctx context.Context,
namespaceID string,
execution *commonpb.WorkflowExecution,
clock *clockspb.VectorClock,
parentClock *clockspb.VectorClock,
childClock *clockspb.VectorClock,
) error {
_, err := t.historyClient.ScheduleWorkflowTask(ctx, &historyservice.ScheduleWorkflowTaskRequest{
NamespaceId: namespaceID,
WorkflowExecution: execution,
IsFirstWorkflowTask: true,
Clock: clock,
ParentClock: parentClock,
ChildClock: childClock,
})

return err
}

Expand Down
95 changes: 71 additions & 24 deletions service/history/transferQueueActiveTaskExecutor_test.go
Expand Up @@ -1996,6 +1996,7 @@ func (s *transferQueueActiveTaskExecutorSuite) TestProcessStartChildExecution_Su
VisibilityTimestamp: time.Now().UTC(),
}

childClock := vclock.NewVectorClock(rand.Int63(), rand.Int31(), rand.Int63())
persistenceMutableState := s.createPersistenceMutableState(mutableState, event.GetEventId(), event.GetVersion())
s.mockExecutionMgr.EXPECT().GetWorkflowExecution(gomock.Any(), gomock.Any()).Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil)
s.mockHistoryClient.EXPECT().StartWorkflowExecution(gomock.Any(), s.createChildWorkflowExecutionRequest(
Expand All @@ -2004,17 +2005,33 @@ func (s *transferQueueActiveTaskExecutorSuite) TestProcessStartChildExecution_Su
transferTask,
mutableState,
ci,
)).Return(&historyservice.StartWorkflowExecutionResponse{RunId: childRunID}, nil)
)).Return(&historyservice.StartWorkflowExecutionResponse{RunId: childRunID, Clock: childClock}, nil)
s.mockExecutionMgr.EXPECT().UpdateWorkflowExecution(gomock.Any(), gomock.Any()).Return(tests.UpdateWorkflowExecutionResponse, nil)
s.mockClusterMetadata.EXPECT().ClusterNameForFailoverVersion(s.namespaceEntry.IsGlobalNamespace(), s.version).Return(cluster.TestCurrentClusterName).AnyTimes()
s.mockHistoryClient.EXPECT().ScheduleWorkflowTask(gomock.Any(), &historyservice.ScheduleWorkflowTaskRequest{
NamespaceId: tests.ChildNamespaceID.String(),
WorkflowExecution: &commonpb.WorkflowExecution{
WorkflowId: childWorkflowID,
RunId: childRunID,
currentShardClock := s.mockShard.CurrentVectorClock()
s.mockHistoryClient.EXPECT().ScheduleWorkflowTask(gomock.Any(), gomock.Any()).DoAndReturn(
func(_ context.Context, request *historyservice.ScheduleWorkflowTaskRequest, _ ...grpc.CallOption) (*historyservice.ScheduleWorkflowTaskResponse, error) {
parentClock := request.ParentClock
request.ParentClock = nil
s.Equal(&historyservice.ScheduleWorkflowTaskRequest{
NamespaceId: tests.ChildNamespaceID.String(),
WorkflowExecution: &commonpb.WorkflowExecution{
WorkflowId: childWorkflowID,
RunId: childRunID,
},
IsFirstWorkflowTask: true,
ParentClock: nil,
ChildClock: childClock,
}, request)
cmpResult, err := vclock.Compare(currentShardClock, parentClock)
if err != nil {
return nil, err
}
s.NoError(err)
s.True(cmpResult <= 0)
return &historyservice.ScheduleWorkflowTaskResponse{}, nil
},
IsFirstWorkflowTask: true,
}).Return(nil, nil)
)

_, _, err = s.transferQueueActiveTaskExecutor.Execute(context.Background(), s.newTaskExecutable(transferTask))
s.Nil(err)
Expand Down Expand Up @@ -2250,15 +2267,30 @@ func (s *transferQueueActiveTaskExecutorSuite) TestProcessStartChildExecution_Su

persistenceMutableState := s.createPersistenceMutableState(mutableState, event.GetEventId(), event.GetVersion())
s.mockExecutionMgr.EXPECT().GetWorkflowExecution(gomock.Any(), gomock.Any()).Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil)
s.mockHistoryClient.EXPECT().ScheduleWorkflowTask(gomock.Any(), &historyservice.ScheduleWorkflowTaskRequest{
NamespaceId: tests.ChildNamespaceID.String(),
WorkflowExecution: &commonpb.WorkflowExecution{
WorkflowId: childWorkflowID,
RunId: childRunID,
currentShardClock := s.mockShard.CurrentVectorClock()
s.mockHistoryClient.EXPECT().ScheduleWorkflowTask(gomock.Any(), gomock.Any()).DoAndReturn(
func(_ context.Context, request *historyservice.ScheduleWorkflowTaskRequest, _ ...grpc.CallOption) (*historyservice.ScheduleWorkflowTaskResponse, error) {
parentClock := request.ParentClock
request.ParentClock = nil
s.Equal(&historyservice.ScheduleWorkflowTaskRequest{
NamespaceId: tests.ChildNamespaceID.String(),
WorkflowExecution: &commonpb.WorkflowExecution{
WorkflowId: childWorkflowID,
RunId: childRunID,
},
IsFirstWorkflowTask: true,
ParentClock: nil,
ChildClock: childClock,
}, request)
cmpResult, err := vclock.Compare(currentShardClock, parentClock)
if err != nil {
return nil, err
}
s.NoError(err)
s.True(cmpResult <= 0)
return &historyservice.ScheduleWorkflowTaskResponse{}, nil
},
IsFirstWorkflowTask: true,
Clock: childClock,
}).Return(nil, nil)
)

_, _, err = s.transferQueueActiveTaskExecutor.Execute(context.Background(), s.newTaskExecutable(transferTask))
s.Nil(err)
Expand Down Expand Up @@ -2428,15 +2460,30 @@ func (s *transferQueueActiveTaskExecutorSuite) TestProcessorStartChildExecution_

persistenceMutableState := s.createPersistenceMutableState(mutableState, event.GetEventId(), event.GetVersion())
s.mockExecutionMgr.EXPECT().GetWorkflowExecution(gomock.Any(), gomock.Any()).Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil)
s.mockHistoryClient.EXPECT().ScheduleWorkflowTask(gomock.Any(), &historyservice.ScheduleWorkflowTaskRequest{
NamespaceId: s.childNamespaceID.String(),
WorkflowExecution: &commonpb.WorkflowExecution{
WorkflowId: childExecution.WorkflowId,
RunId: childExecution.RunId,
currentShardClock := s.mockShard.CurrentVectorClock()
s.mockHistoryClient.EXPECT().ScheduleWorkflowTask(gomock.Any(), gomock.Any()).DoAndReturn(
func(_ context.Context, request *historyservice.ScheduleWorkflowTaskRequest, _ ...grpc.CallOption) (*historyservice.ScheduleWorkflowTaskResponse, error) {
parentClock := request.ParentClock
request.ParentClock = nil
s.Equal(&historyservice.ScheduleWorkflowTaskRequest{
NamespaceId: s.childNamespaceID.String(),
WorkflowExecution: &commonpb.WorkflowExecution{
WorkflowId: childExecution.WorkflowId,
RunId: childExecution.RunId,
},
IsFirstWorkflowTask: true,
ParentClock: nil,
ChildClock: childClock,
}, request)
cmpResult, err := vclock.Compare(currentShardClock, parentClock)
if err != nil {
return nil, err
}
s.NoError(err)
s.True(cmpResult <= 0)
return &historyservice.ScheduleWorkflowTaskResponse{}, nil
},
IsFirstWorkflowTask: true,
Clock: childClock,
}).Return(&historyservice.ScheduleWorkflowTaskResponse{}, nil).Times(1)
)

_, _, err = s.transferQueueActiveTaskExecutor.Execute(context.Background(), s.newTaskExecutable(transferTask))
s.Nil(err)
Expand Down
2 changes: 1 addition & 1 deletion service/history/workflow/mutable_state.go
Expand Up @@ -118,7 +118,7 @@ type (
AddWorkflowTaskCompletedEvent(*WorkflowTaskInfo, *workflowservice.RespondWorkflowTaskCompletedRequest, int) (*historypb.HistoryEvent, error)
AddWorkflowTaskFailedEvent(workflowTask *WorkflowTaskInfo, cause enumspb.WorkflowTaskFailedCause, failure *failurepb.Failure, identity, binChecksum, baseRunID, newRunID string, forkEventVersion int64) (*historypb.HistoryEvent, error)
AddWorkflowTaskScheduleToStartTimeoutEvent(int64) (*historypb.HistoryEvent, error)
AddFirstWorkflowTaskScheduled(event *historypb.HistoryEvent, bypassTaskGeneration bool) (int64, error)
AddFirstWorkflowTaskScheduled(parentClock *clockspb.VectorClock, event *historypb.HistoryEvent, bypassTaskGeneration bool) (int64, error)
AddWorkflowTaskScheduledEvent(bypassTaskGeneration bool, workflowTaskType enumsspb.WorkflowTaskType) (*WorkflowTaskInfo, error)
AddWorkflowTaskScheduledEventAsHeartbeat(bypassTaskGeneration bool, originalScheduledTimestamp *time.Time, workflowTaskType enumsspb.WorkflowTaskType) (*WorkflowTaskInfo, error)
AddWorkflowTaskStartedEvent(int64, string, *taskqueuepb.TaskQueue, string) (*historypb.HistoryEvent, *WorkflowTaskInfo, error)
Expand Down
16 changes: 14 additions & 2 deletions service/history/workflow/mutable_state_impl.go
Expand Up @@ -1521,7 +1521,11 @@ func (ms *MutableStateImpl) addWorkflowExecutionStartedEventForContinueAsNew(
if err != nil {
return nil, err
}
if _, err = ms.AddFirstWorkflowTaskScheduled(event, false); err != nil {
var parentClock *clockspb.VectorClock
if parentExecutionInfo != nil {
parentClock = parentExecutionInfo.Clock
}
if _, err = ms.AddFirstWorkflowTaskScheduled(parentClock, event, false); err != nil {
return nil, err
}

Expand Down Expand Up @@ -1727,14 +1731,22 @@ func (ms *MutableStateImpl) ReplicateWorkflowExecutionStartedEvent(
// by the startEvent's FirstWorkflowTaskBackoff.
// Returns the workflow task's scheduled event ID if a task was scheduled, 0 otherwise.
func (ms *MutableStateImpl) AddFirstWorkflowTaskScheduled(
parentClock *clockspb.VectorClock,
startEvent *historypb.HistoryEvent,
bypassTaskGeneration bool,
) (int64, error) {
opTag := tag.WorkflowActionWorkflowTaskScheduled
if err := ms.checkMutability(opTag); err != nil {
return common.EmptyEventID, err
}
return ms.workflowTaskManager.AddFirstWorkflowTaskScheduled(startEvent, bypassTaskGeneration)
scheduleEventID, err := ms.workflowTaskManager.AddFirstWorkflowTaskScheduled(startEvent, bypassTaskGeneration)
if err != nil {
return 0, err
}
if parentClock != nil {
ms.executionInfo.ParentClock = parentClock
}
return scheduleEventID, nil
}

func (ms *MutableStateImpl) AddWorkflowTaskScheduledEvent(
Expand Down
8 changes: 4 additions & 4 deletions service/history/workflow/mutable_state_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 6 additions & 1 deletion service/history/workflow/retry.go
Expand Up @@ -39,6 +39,7 @@ import (
"go.temporal.io/api/workflowservice/v1"
"golang.org/x/exp/slices"

"go.temporal.io/server/api/clock/v1"
"go.temporal.io/server/api/historyservice/v1"
workflowspb "go.temporal.io/server/api/workflow/v1"
"go.temporal.io/server/common"
Expand Down Expand Up @@ -266,7 +267,11 @@ func SetupNewWorkflowForRetryOrCron(
if err != nil {
return serviceerror.NewInternal("Failed to add workflow execution started event.")
}
if _, err = newMutableState.AddFirstWorkflowTaskScheduled(event, false); err != nil {
var parentClock *clock.VectorClock
if parentInfo != nil {
parentClock = parentInfo.Clock
}
if _, err = newMutableState.AddFirstWorkflowTaskScheduled(parentClock, event, false); err != nil {
return err
}

Expand Down
4 changes: 2 additions & 2 deletions service/history/workflowTaskHandlerCallbacks.go
Expand Up @@ -123,7 +123,7 @@ func (handler *workflowTaskHandlerCallbacksImpl) handleWorkflowTaskScheduled(

return api.GetAndUpdateWorkflowWithNew(
ctx,
req.Clock,
req.ChildClock,
api.BypassMutableStateConsistencyPredicate,
definition.NewWorkflowKey(
req.NamespaceId,
Expand All @@ -146,7 +146,7 @@ func (handler *workflowTaskHandlerCallbacksImpl) handleWorkflowTaskScheduled(
if err != nil {
return nil, err
}
if _, err := mutableState.AddFirstWorkflowTaskScheduled(startEvent, false); err != nil {
if _, err := mutableState.AddFirstWorkflowTaskScheduled(req.ParentClock, startEvent, false); err != nil {
return nil, err
}

Expand Down

0 comments on commit 4d5b279

Please sign in to comment.