Skip to content

Commit

Permalink
Validation API should use low priority lock (#4140)
Browse files Browse the repository at this point in the history
* Use low priority lock for VerifyFirstWorkflowTaskScheduled API
* Use low priority lock for VerifyChildExecutionCompletionRecorded API
* Rename caller type to lock priority since some internal API use "low priority lock"
  • Loading branch information
wxing1292 committed Apr 13, 2023
1 parent 55f1589 commit a89012e
Show file tree
Hide file tree
Showing 37 changed files with 130 additions and 94 deletions.
13 changes: 11 additions & 2 deletions service/history/api/consistency_checker.go
Expand Up @@ -59,6 +59,7 @@ type (
reqClock *clockspb.VectorClock,
consistencyPredicate MutableStateConsistencyPredicate,
workflowKey definition.WorkflowKey,
lockPriority workflow.LockPriority,
) (WorkflowContext, error)
}

Expand Down Expand Up @@ -108,6 +109,7 @@ func (c *WorkflowConsistencyCheckerImpl) GetWorkflowContext(
reqClock *clockspb.VectorClock,
consistencyPredicate MutableStateConsistencyPredicate,
workflowKey definition.WorkflowKey,
lockPriority workflow.LockPriority,
) (WorkflowContext, error) {
if reqClock != nil {
currentClock := c.shardContext.CurrentVectorClock()
Expand All @@ -117,6 +119,7 @@ func (c *WorkflowConsistencyCheckerImpl) GetWorkflowContext(
reqClock,
currentClock,
workflowKey,
lockPriority,
)
}
// request vector clock cannot is not comparable with current shard vector clock
Expand All @@ -133,6 +136,7 @@ func (c *WorkflowConsistencyCheckerImpl) GetWorkflowContext(
&shardOwnershipAsserted,
consistencyPredicate,
workflowKey,
lockPriority,
)
}
return c.getCurrentWorkflowContext(
Expand All @@ -141,6 +145,7 @@ func (c *WorkflowConsistencyCheckerImpl) GetWorkflowContext(
consistencyPredicate,
workflowKey.NamespaceID,
workflowKey.WorkflowID,
lockPriority,
)
}

Expand All @@ -149,6 +154,7 @@ func (c *WorkflowConsistencyCheckerImpl) getWorkflowContextValidatedByClock(
reqClock *clockspb.VectorClock,
currentClock *clockspb.VectorClock,
workflowKey definition.WorkflowKey,
lockPriority workflow.LockPriority,
) (WorkflowContext, error) {
cmpResult, err := vclock.Compare(reqClock, currentClock)
if err != nil {
Expand All @@ -170,7 +176,7 @@ func (c *WorkflowConsistencyCheckerImpl) getWorkflowContextValidatedByClock(
WorkflowId: workflowKey.WorkflowID,
RunId: workflowKey.RunID,
},
workflow.CallerTypeAPI,
lockPriority,
)
if err != nil {
return nil, err
Expand All @@ -189,6 +195,7 @@ func (c *WorkflowConsistencyCheckerImpl) getWorkflowContextValidatedByCheck(
shardOwnershipAsserted *bool,
consistencyPredicate MutableStateConsistencyPredicate,
workflowKey definition.WorkflowKey,
lockPriority workflow.LockPriority,
) (WorkflowContext, error) {
if len(workflowKey.RunID) == 0 {
return nil, serviceerror.NewInternal(fmt.Sprintf(
Expand All @@ -203,7 +210,7 @@ func (c *WorkflowConsistencyCheckerImpl) getWorkflowContextValidatedByCheck(
WorkflowId: workflowKey.WorkflowID,
RunId: workflowKey.RunID,
},
workflow.CallerTypeAPI,
lockPriority,
)
if err != nil {
return nil, err
Expand Down Expand Up @@ -245,6 +252,7 @@ func (c *WorkflowConsistencyCheckerImpl) getCurrentWorkflowContext(
consistencyPredicate MutableStateConsistencyPredicate,
namespaceID string,
workflowID string,
lockPriority workflow.LockPriority,
) (WorkflowContext, error) {
runID, err := c.getCurrentRunID(
ctx,
Expand All @@ -260,6 +268,7 @@ func (c *WorkflowConsistencyCheckerImpl) getCurrentWorkflowContext(
shardOwnershipAsserted,
consistencyPredicate,
definition.NewWorkflowKey(namespaceID, workflowID, runID),
lockPriority,
)
if err != nil {
return nil, err
Expand Down
15 changes: 10 additions & 5 deletions service/history/api/consistency_checker_test.go
Expand Up @@ -115,7 +115,7 @@ func (s *workflowConsistencyCheckerSuite) TestGetWorkflowContextValidatedByCheck
WorkflowId: s.workflowID,
RunId: s.currentRunID,
},
workflow.CallerTypeAPI,
workflow.LockPriorityHigh,
).Return(wfContext, releaseFn, nil)
wfContext.EXPECT().LoadMutableState(ctx).Return(mutableState, nil)

Expand All @@ -124,6 +124,7 @@ func (s *workflowConsistencyCheckerSuite) TestGetWorkflowContextValidatedByCheck
&shardOwnershipAsserted,
BypassMutableStateConsistencyPredicate,
definition.NewWorkflowKey(s.namespaceID, s.workflowID, s.currentRunID),
workflow.LockPriorityHigh,
)
s.NoError(err)
s.Equal(mutableState, workflowContext.GetMutableState())
Expand All @@ -147,7 +148,7 @@ func (s *workflowConsistencyCheckerSuite) TestGetWorkflowContextValidatedByCheck
WorkflowId: s.workflowID,
RunId: s.currentRunID,
},
workflow.CallerTypeAPI,
workflow.LockPriorityHigh,
).Return(wfContext, releaseFn, nil)
gomock.InOrder(
wfContext.EXPECT().LoadMutableState(ctx).Return(mutableState1, nil),
Expand All @@ -160,6 +161,7 @@ func (s *workflowConsistencyCheckerSuite) TestGetWorkflowContextValidatedByCheck
&shardOwnershipAsserted,
FailMutableStateConsistencyPredicate,
definition.NewWorkflowKey(s.namespaceID, s.workflowID, s.currentRunID),
workflow.LockPriorityHigh,
)
s.NoError(err)
s.Equal(mutableState2, workflowContext.GetMutableState())
Expand All @@ -181,7 +183,7 @@ func (s *workflowConsistencyCheckerSuite) TestGetWorkflowContextValidatedByCheck
WorkflowId: s.workflowID,
RunId: s.currentRunID,
},
workflow.CallerTypeAPI,
workflow.LockPriorityHigh,
).Return(wfContext, releaseFn, nil)
wfContext.EXPECT().LoadMutableState(ctx).Return(nil, serviceerror.NewNotFound(""))

Expand All @@ -192,6 +194,7 @@ func (s *workflowConsistencyCheckerSuite) TestGetWorkflowContextValidatedByCheck
&shardOwnershipAsserted,
FailMutableStateConsistencyPredicate,
definition.NewWorkflowKey(s.namespaceID, s.workflowID, s.currentRunID),
workflow.LockPriorityHigh,
)
s.IsType(&serviceerror.NotFound{}, err)
s.Nil(workflowContext)
Expand All @@ -213,7 +216,7 @@ func (s *workflowConsistencyCheckerSuite) TestGetWorkflowContextValidatedByCheck
WorkflowId: s.workflowID,
RunId: s.currentRunID,
},
workflow.CallerTypeAPI,
workflow.LockPriorityHigh,
).Return(wfContext, releaseFn, nil)
wfContext.EXPECT().LoadMutableState(ctx).Return(nil, serviceerror.NewNotFound(""))

Expand All @@ -224,6 +227,7 @@ func (s *workflowConsistencyCheckerSuite) TestGetWorkflowContextValidatedByCheck
&shardOwnershipAsserted,
FailMutableStateConsistencyPredicate,
definition.NewWorkflowKey(s.namespaceID, s.workflowID, s.currentRunID),
workflow.LockPriorityHigh,
)
s.IsType(&persistence.ShardOwnershipLostError{}, err)
s.Nil(workflowContext)
Expand All @@ -245,7 +249,7 @@ func (s *workflowConsistencyCheckerSuite) TestGetWorkflowContextValidatedByCheck
WorkflowId: s.workflowID,
RunId: s.currentRunID,
},
workflow.CallerTypeAPI,
workflow.LockPriorityHigh,
).Return(wfContext, releaseFn, nil)
wfContext.EXPECT().LoadMutableState(ctx).Return(nil, serviceerror.NewUnavailable(""))

Expand All @@ -254,6 +258,7 @@ func (s *workflowConsistencyCheckerSuite) TestGetWorkflowContextValidatedByCheck
&shardOwnershipAsserted,
FailMutableStateConsistencyPredicate,
definition.NewWorkflowKey(s.namespaceID, s.workflowID, s.currentRunID),
workflow.LockPriorityHigh,
)
s.IsType(&serviceerror.Unavailable{}, err)
s.Nil(workflowContext)
Expand Down
1 change: 1 addition & 0 deletions service/history/api/deleteworkflow/api.go
Expand Up @@ -55,6 +55,7 @@ func Invoke(
request.WorkflowExecution.WorkflowId,
request.WorkflowExecution.RunId,
),
workflow.LockPriorityLow,
)
if err != nil {
return nil, err
Expand Down
1 change: 1 addition & 0 deletions service/history/api/describemutablestate/api.go
Expand Up @@ -56,6 +56,7 @@ func Invoke(
req.Execution.WorkflowId,
req.Execution.RunId,
),
workflow.LockPriorityHigh,
)
if err != nil {
return nil, err
Expand Down
1 change: 1 addition & 0 deletions service/history/api/describeworkflow/api.go
Expand Up @@ -68,6 +68,7 @@ func Invoke(
req.Request.Execution.WorkflowId,
req.Request.Execution.RunId,
),
workflow.LockPriorityHigh,
)
if err != nil {
return nil, err
Expand Down
1 change: 1 addition & 0 deletions service/history/api/get_workflow_util.go
Expand Up @@ -162,6 +162,7 @@ func GetMutableState(
nil,
BypassMutableStateConsistencyPredicate,
workflowKey,
workflow.LockPriorityHigh,
)
if err != nil {
return nil, err
Expand Down
1 change: 1 addition & 0 deletions service/history/api/queryworkflow/api.go
Expand Up @@ -86,6 +86,7 @@ func Invoke(
nil,
api.BypassMutableStateConsistencyPredicate,
workflowKey,
workflow.LockPriorityHigh,
)
if err != nil {
return nil, err
Expand Down
1 change: 1 addition & 0 deletions service/history/api/refreshworkflow/api.go
Expand Up @@ -51,6 +51,7 @@ func Invoke(
nil,
api.BypassMutableStateConsistencyPredicate,
workflowKey,
workflow.LockPriorityLow,
)
if err != nil {
return err
Expand Down
2 changes: 2 additions & 0 deletions service/history/api/replication/generate_task.go
Expand Up @@ -34,6 +34,7 @@ import (
"go.temporal.io/server/service/history/api"
"go.temporal.io/server/service/history/shard"
"go.temporal.io/server/service/history/tasks"
"go.temporal.io/server/service/history/workflow"
)

func GenerateTask(
Expand All @@ -57,6 +58,7 @@ func GenerateTask(
request.Execution.WorkflowId,
request.Execution.RunId,
),
workflow.LockPriorityHigh,
)
if err != nil {
return nil, err
Expand Down
3 changes: 3 additions & 0 deletions service/history/api/resetworkflow/api.go
Expand Up @@ -39,6 +39,7 @@ import (
"go.temporal.io/server/service/history/api"
"go.temporal.io/server/service/history/ndc"
"go.temporal.io/server/service/history/shard"
"go.temporal.io/server/service/history/workflow"
)

func Invoke(
Expand Down Expand Up @@ -66,6 +67,7 @@ func Invoke(
workflowID,
baseRunID,
),
workflow.LockPriorityHigh,
)
if err != nil {
return nil, err
Expand Down Expand Up @@ -104,6 +106,7 @@ func Invoke(
workflowID,
currentRunID,
),
workflow.LockPriorityHigh,
)
if err != nil {
return nil, err
Expand Down
2 changes: 2 additions & 0 deletions service/history/api/signalwithstartworkflow/api.go
Expand Up @@ -35,6 +35,7 @@ import (
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/service/history/api"
"go.temporal.io/server/service/history/shard"
"go.temporal.io/server/service/history/workflow"
)

func Invoke(
Expand All @@ -59,6 +60,7 @@ func Invoke(
signalWithStartRequest.SignalWithStartRequest.WorkflowId,
"",
),
workflow.LockPriorityHigh,
)
switch err.(type) {
case nil:
Expand Down
2 changes: 1 addition & 1 deletion service/history/api/startworkflow/api.go
Expand Up @@ -419,7 +419,7 @@ func (s *Starter) getMutableStateInfo(ctx context.Context, runID string) (*mutab
ctx,
s.namespace.ID(),
commonpb.WorkflowExecution{WorkflowId: s.request.StartRequest.WorkflowId, RunId: runID},
workflow.CallerTypeAPI,
workflow.LockPriorityHigh,
)
if err != nil {
return nil, err
Expand Down
1 change: 1 addition & 0 deletions service/history/api/update_workflow_util.go
Expand Up @@ -49,6 +49,7 @@ func GetAndUpdateWorkflowWithNew(
reqClock,
consistencyCheckFn,
workflowKey,
workflow.LockPriorityHigh,
)
if err != nil {
return err
Expand Down
1 change: 1 addition & 0 deletions service/history/api/updateworkflow/api.go
Expand Up @@ -60,6 +60,7 @@ func Invoke(
req.Request.WorkflowExecution.WorkflowId,
req.Request.WorkflowExecution.RunId,
),
workflow.LockPriorityHigh,
)
if err != nil {
return nil, err
Expand Down
Expand Up @@ -35,6 +35,7 @@ import (
"go.temporal.io/server/service/history/api"
"go.temporal.io/server/service/history/consts"
"go.temporal.io/server/service/history/shard"
"go.temporal.io/server/service/history/workflow"
)

func Invoke(
Expand All @@ -61,6 +62,7 @@ func Invoke(
request.ParentExecution.WorkflowId,
request.ParentExecution.RunId,
),
workflow.LockPriorityLow,
)
if err != nil {
return nil, err
Expand Down
4 changes: 2 additions & 2 deletions service/history/historyEngine2_test.go
Expand Up @@ -466,7 +466,7 @@ func (s *engine2Suite) TestRecordWorkflowTaskStartedSuccess() {
metrics.AddMetricsContext(context.Background()),
tests.NamespaceID,
workflowExecution,
workflow.CallerTypeAPI,
workflow.LockPriorityHigh,
)
s.NoError(err)
loadedMS, err := ctx.LoadMutableState(context.Background())
Expand Down Expand Up @@ -1946,7 +1946,7 @@ func (s *engine2Suite) getMutableState(namespaceID namespace.ID, we commonpb.Wor
metrics.AddMetricsContext(context.Background()),
namespaceID,
we,
workflow.CallerTypeAPI,
workflow.LockPriorityHigh,
)
if err != nil {
return nil
Expand Down
6 changes: 3 additions & 3 deletions service/history/historyEngine_test.go
Expand Up @@ -653,7 +653,7 @@ func (s *engineSuite) TestQueryWorkflow_ConsistentQueryBufferFull() {
context.Background(),
tests.NamespaceID,
execution,
workflow.CallerTypeAPI,
workflow.LockPriorityHigh,
)
s.NoError(err)
loadedMS, err := ctx.LoadMutableState(context.Background())
Expand Down Expand Up @@ -4365,7 +4365,7 @@ func (s *engineSuite) TestRequestCancel_RespondWorkflowTaskCompleted_SuccessWith
context.Background(),
tests.NamespaceID,
we,
workflow.CallerTypeAPI,
workflow.LockPriorityHigh,
)
s.NoError(err)
loadedMS, err := ctx.LoadMutableState(context.Background())
Expand Down Expand Up @@ -5230,7 +5230,7 @@ func (s *engineSuite) getMutableState(testNamespaceID namespace.ID, we commonpb.
context.Background(),
tests.NamespaceID,
we,
workflow.CallerTypeAPI,
workflow.LockPriorityHigh,
)
if err != nil {
return nil
Expand Down
2 changes: 1 addition & 1 deletion service/history/ndc/activity_replicator.go
Expand Up @@ -105,7 +105,7 @@ func (r *ActivityReplicatorImpl) SyncActivity(
ctx,
namespaceID,
execution,
workflow.CallerTypeAPI,
workflow.LockPriorityHigh,
)
if err != nil {
// for get workflow execution context, with valid run id
Expand Down

0 comments on commit a89012e

Please sign in to comment.