Skip to content

Commit

Permalink
Lock current execution on start workflow (#4066)
Browse files Browse the repository at this point in the history
  • Loading branch information
yycptt committed Mar 22, 2023
1 parent d53f1b6 commit e8be006
Show file tree
Hide file tree
Showing 4 changed files with 136 additions and 7 deletions.
24 changes: 24 additions & 0 deletions service/history/api/startworkflow/api.go
Expand Up @@ -46,6 +46,7 @@ import (
"go.temporal.io/server/service/history/consts"
"go.temporal.io/server/service/history/shard"
"go.temporal.io/server/service/history/workflow"
"go.temporal.io/server/service/history/workflow/cache"
)

type eagerStartDeniedReason metrics.ReasonString
Expand Down Expand Up @@ -166,6 +167,14 @@ func (s *Starter) Invoke(
if err != nil {
return nil, err
}

// grab current workflow context as a lock so that user latency can be computed
currentRelease, err := s.lockCurrentWorkflowExecution(ctx)
if err != nil {
return nil, err
}
defer func() { currentRelease(retError) }()

err = s.createBrandNew(ctx, creationParams)
if err == nil {
return s.generateResponse(creationParams.runID, creationParams.workflowTaskInfo, extractHistoryEvents(creationParams.workflowEventBatches))
Expand All @@ -178,6 +187,21 @@ func (s *Starter) Invoke(
return s.handleConflict(ctx, creationParams, currentWorkflowConditionFailedError)
}

func (s *Starter) lockCurrentWorkflowExecution(
ctx context.Context,
) (cache.ReleaseCacheFunc, error) {
_, currentRelease, err := s.workflowConsistencyChecker.GetWorkflowCache().GetOrCreateCurrentWorkflowExecution(
ctx,
s.namespace.ID(),
s.request.StartRequest.WorkflowId,
workflow.CallerTypeAPI,
)
if err != nil {
return nil, err
}
return currentRelease, nil
}

// createNewMutableState creates a new workflow context, and closes its mutable state transaction as snapshot.
// It returns the creationContext which can later be used to insert into the executions table.
func (s *Starter) createNewMutableState(ctx context.Context, workflowID string, runID string) (*creationParams, error) {
Expand Down
67 changes: 60 additions & 7 deletions service/history/workflow/cache/cache.go
Expand Up @@ -55,6 +55,13 @@ type (
ReleaseCacheFunc func(err error)

Cache interface {
GetOrCreateCurrentWorkflowExecution(
ctx context.Context,
namespaceID namespace.ID,
workflowID string,
caller workflow.CallerType,
) (workflow.Context, ReleaseCacheFunc, error)

GetOrCreateWorkflowExecution(
ctx context.Context,
namespaceID namespace.ID,
Expand Down Expand Up @@ -97,6 +104,42 @@ func NewCache(shard shard.Context) Cache {
}
}

func (c *CacheImpl) GetOrCreateCurrentWorkflowExecution(
ctx context.Context,
namespaceID namespace.ID,
workflowID string,
caller workflow.CallerType,
) (workflow.Context, ReleaseCacheFunc, error) {

if err := c.validateWorkflowID(workflowID); err != nil {
return nil, nil, err
}

handler := c.metricsHandler.WithTags(metrics.OperationTag(metrics.HistoryCacheGetOrCreateCurrentScope))
handler.Counter(metrics.CacheRequests.GetMetricName()).Record(1)
start := time.Now()
defer func() { handler.Timer(metrics.CacheLatency.GetMetricName()).Record(time.Since(start)) }()

execution := commonpb.WorkflowExecution{
WorkflowId: workflowID,
// using empty run ID as current workflow run ID
RunId: "",
}

weCtx, weReleaseFn, err := c.getOrCreateWorkflowExecutionInternal(
ctx,
namespaceID,
execution,
handler,
true,
caller,
)

metrics.ContextCounterAdd(ctx, metrics.HistoryWorkflowExecutionCacheLatency.GetMetricName(), time.Since(start).Nanoseconds())

return weCtx, weReleaseFn, err
}

func (c *CacheImpl) GetOrCreateWorkflowExecution(
ctx context.Context,
namespaceID namespace.ID,
Expand Down Expand Up @@ -197,13 +240,8 @@ func (c *CacheImpl) validateWorkflowExecutionInfo(
execution *commonpb.WorkflowExecution,
) error {

if execution.GetWorkflowId() == "" {
return serviceerror.NewInvalidArgument("Can't load workflow execution. WorkflowId not set.")
}

if !utf8.ValidString(execution.GetWorkflowId()) {
// We know workflow cannot exist with invalid utf8 string as WorkflowID.
return serviceerror.NewNotFound("Workflow not exists.")
if err := c.validateWorkflowID(execution.GetWorkflowId()); err != nil {
return err
}

// RunID is not provided, lets try to retrieve the RunID for current active execution
Expand All @@ -224,3 +262,18 @@ func (c *CacheImpl) validateWorkflowExecutionInfo(
}
return nil
}

func (c *CacheImpl) validateWorkflowID(
workflowID string,
) error {
if workflowID == "" {
return serviceerror.NewInvalidArgument("Can't load workflow execution. WorkflowId not set.")
}

if !utf8.ValidString(workflowID) {
// We know workflow cannot exist with invalid utf8 string as WorkflowID.
return serviceerror.NewNotFound("Workflow not exists.")
}

return nil
}
16 changes: 16 additions & 0 deletions service/history/workflow/cache/cache_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

36 changes: 36 additions & 0 deletions service/history/workflow/cache/cache_test.go
Expand Up @@ -41,6 +41,7 @@ import (

persistencespb "go.temporal.io/server/api/persistence/v1"
"go.temporal.io/server/common/dynamicconfig"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/service/history/shard"
"go.temporal.io/server/service/history/tests"
Expand Down Expand Up @@ -362,3 +363,38 @@ func (s *workflowCacheSuite) TestHistoryCacheConcurrentAccess_Pin() {
}
stopGroup.Wait()
}

func (s *workflowCacheSuite) TestHistoryCache_CacheLatencyMetricContext() {
s.cache = NewCache(s.mockShard)

ctx := metrics.AddMetricsContext(context.Background())
_, currentRelease, err := s.cache.GetOrCreateCurrentWorkflowExecution(
ctx,
tests.NamespaceID,
tests.WorkflowID,
workflow.CallerTypeAPI,
)
s.NoError(err)
defer currentRelease(nil)

latency1, ok := metrics.ContextCounterGet(ctx, metrics.HistoryWorkflowExecutionCacheLatency.GetMetricName())
s.True(ok)
s.NotZero(latency1)

_, release, err := s.cache.GetOrCreateWorkflowExecution(
ctx,
tests.NamespaceID,
commonpb.WorkflowExecution{
WorkflowId: tests.WorkflowID,
RunId: tests.RunID,
},
workflow.CallerTypeAPI,
)
s.Nil(err)
defer release(nil)

latency2, ok := metrics.ContextCounterGet(ctx, metrics.HistoryWorkflowExecutionCacheLatency.GetMetricName())
s.True(ok)
s.Greater(latency2, latency1)

}

0 comments on commit e8be006

Please sign in to comment.