Skip to content

Commit

Permalink
Do not let API call timeout if workflow can't be locked (#4341)
Browse files Browse the repository at this point in the history
* Use a context with timeout

* Add new error and tail time

* Refactor and add unit test

* do not override small timeouts

* define error in consts

* mark workflow busy as non-transient error to prevent retries
  • Loading branch information
samanbarghi committed May 23, 2023
1 parent b191203 commit 68856fe
Show file tree
Hide file tree
Showing 6 changed files with 123 additions and 12 deletions.
9 changes: 6 additions & 3 deletions common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,9 +330,12 @@ func IsServiceClientTransientError(err error) bool {
return true
}

switch err.(type) {
case *serviceerror.ResourceExhausted,
*serviceerrors.ShardOwnershipLost:
switch err := err.(type) {
case *serviceerror.ResourceExhausted:
if err.Cause != enumspb.RESOURCE_EXHAUSTED_CAUSE_BUSY_WORKFLOW {
return true
}
case *serviceerrors.ShardOwnershipLost:
return true
}
return false
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ require (
go.opentelemetry.io/otel/metric v0.36.0
go.opentelemetry.io/otel/sdk v1.13.0
go.opentelemetry.io/otel/sdk/metric v0.36.0
go.temporal.io/api v1.19.1-0.20230511202036-4dee2bb54577
go.temporal.io/api v1.19.1-0.20230515221100-0caa7c878f66
go.temporal.io/sdk v1.22.2
go.temporal.io/version v0.3.0
go.uber.org/atomic v1.10.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1123,8 +1123,8 @@ go.opentelemetry.io/proto/otlp v0.15.0/go.mod h1:H7XAot3MsfNsj7EXtrA2q5xSNQ10UqI
go.opentelemetry.io/proto/otlp v0.19.0 h1:IVN6GR+mhC4s5yfcTbmzHYODqvWAp3ZedA2SJPI1Nnw=
go.opentelemetry.io/proto/otlp v0.19.0/go.mod h1:H7XAot3MsfNsj7EXtrA2q5xSNQ10UqI405h3+duxN4U=
go.temporal.io/api v1.19.1-0.20230322213042-07fb271d475b/go.mod h1:PLQJqp1YZZikmtGm9jIbzWpP3p6zS39WQjhsO/Hiw30=
go.temporal.io/api v1.19.1-0.20230511202036-4dee2bb54577 h1:BDU+5DlZuQicarZIXLhwXtup1dj8WUk+7XiK6m0brvA=
go.temporal.io/api v1.19.1-0.20230511202036-4dee2bb54577/go.mod h1:uITFvsxpTQT/ZRGMHTzmEXhdDkfA9o8Ik4cgw91TlM4=
go.temporal.io/api v1.19.1-0.20230515221100-0caa7c878f66 h1:nLBDjkSXTJO/aoptKUSGmhVu78qiNIupn0j0RQGTs5M=
go.temporal.io/api v1.19.1-0.20230515221100-0caa7c878f66/go.mod h1:uITFvsxpTQT/ZRGMHTzmEXhdDkfA9o8Ik4cgw91TlM4=
go.temporal.io/sdk v1.22.2 h1:4bGxYekEN+FHAGXkRAxZcHs9k+fNO3RUmBRf97WH3So=
go.temporal.io/sdk v1.22.2/go.mod h1:LqYtPesETgMHktpH98Vk7WegNcikxErmmuaZPNWEnPw=
go.temporal.io/version v0.3.0 h1:dMrei9l9NyHt8nG6EB8vAwDLLTwx2SvRyucCSumAiig=
Expand Down
3 changes: 3 additions & 0 deletions service/history/consts/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ package consts
import (
"errors"

"go.temporal.io/api/enums/v1"
enumspb "go.temporal.io/api/enums/v1"
"go.temporal.io/api/serviceerror"

Expand Down Expand Up @@ -94,6 +95,8 @@ var (
ErrNamespaceHandover = common.ErrNamespaceHandover
// ErrWorkflowTaskStateInconsistent is error indicating workflow task state is inconsistent, for example there was no workflow task scheduled but buffered events are present.
ErrWorkflowTaskStateInconsistent = serviceerror.NewUnavailable("Workflow task state is inconsistent.")
// ErrResourceExhaustedBusyWorkflow is an error indicating workflow resource is exhausted and should not be retried by service handler and client
ErrResourceExhaustedBusyWorkflow = serviceerror.NewResourceExhausted(enums.RESOURCE_EXHAUSTED_CAUSE_BUSY_WORKFLOW, "Workflow is busy.")

// FailedWorkflowStatuses is a set of failed workflow close states, used for start workflow policy
// for start workflow execution API
Expand Down
44 changes: 39 additions & 5 deletions service/history/workflow/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,14 @@ import (

"go.temporal.io/server/common/cache"
"go.temporal.io/server/common/definition"
"go.temporal.io/server/common/headers"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/log/tag"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/persistence"
"go.temporal.io/server/service/history/configs"
"go.temporal.io/server/service/history/consts"
"go.temporal.io/server/service/history/shard"
"go.temporal.io/server/service/history/workflow"
)
Expand Down Expand Up @@ -85,8 +87,10 @@ type (
var NoopReleaseFn ReleaseCacheFunc = func(err error) {}

const (
cacheNotReleased int32 = 0
cacheReleased int32 = 1
cacheNotReleased int32 = 0
cacheReleased int32 = 1
workflowLockTimeoutTailTime = 500 * time.Millisecond
nonApiContextLockTimeout = 500 * time.Millisecond
)

func NewCache(shard shard.Context) Cache {
Expand Down Expand Up @@ -198,16 +202,46 @@ func (c *CacheImpl) getOrCreateWorkflowExecutionInternal(
// Consider revisiting this if it causes too much GC activity
releaseFunc := c.makeReleaseFunc(key, workflowCtx, forceClearContext, lockPriority)

if err := workflowCtx.Lock(ctx, lockPriority); err != nil {
// ctx is done before lock can be acquired
c.Release(key)
if err := c.lockWorkflowExecution(ctx, workflowCtx, key, lockPriority); err != nil {
handler.Counter(metrics.CacheFailures.GetMetricName()).Record(1)
handler.Counter(metrics.AcquireLockFailedCounter.GetMetricName()).Record(1)
return nil, nil, err
}

return workflowCtx, releaseFunc, nil
}

func (c *CacheImpl) lockWorkflowExecution(ctx context.Context,
	workflowCtx workflow.Context,
	key definition.WorkflowKey,
	lockPriority workflow.LockPriority) error {

	// Tighten the lock-acquisition deadline so a busy workflow cannot consume
	// the caller's entire timeout. Contexts with no deadline are left as-is.
	if deadline, hasDeadline := ctx.Deadline(); hasDeadline {
		var lockDeadline time.Time
		var shrink bool
		if headers.GetCallerInfo(ctx).CallerType == headers.CallerTypeAPI {
			// API callers: reserve tail time so the handler can still build a
			// response before the RPC deadline fires.
			lockDeadline = deadline.Add(-workflowLockTimeoutTailTime)
			shrink = lockDeadline.After(time.Now())
		} else {
			// Background callers: cap the wait at a short fixed timeout, but
			// never extend a deadline that is already shorter.
			lockDeadline = time.Now().Add(nonApiContextLockTimeout)
			shrink = lockDeadline.Before(deadline)
		}
		if shrink {
			var cancel context.CancelFunc
			ctx, cancel = context.WithDeadline(ctx, lockDeadline)
			defer cancel()
		}
	}

	if err := workflowCtx.Lock(ctx, lockPriority); err != nil {
		// The context expired before the lock was acquired; release the cache
		// entry and report a non-transient busy-workflow error so neither the
		// service handler nor the client retries.
		c.Release(key)
		return consts.ErrResourceExhaustedBusyWorkflow
	}
	return nil
}

func (c *CacheImpl) makeReleaseFunc(
key definition.WorkflowKey,
context workflow.Context,
Expand Down
73 changes: 72 additions & 1 deletion service/history/workflow/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,10 @@ import (
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
commonpb "go.temporal.io/api/common/v1"

persistencespb "go.temporal.io/server/api/persistence/v1"
"go.temporal.io/server/common/definition"
"go.temporal.io/server/common/dynamicconfig"
"go.temporal.io/server/common/headers"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/service/history/shard"
Expand Down Expand Up @@ -398,3 +399,73 @@ func (s *workflowCacheSuite) TestHistoryCache_CacheLatencyMetricContext() {
s.Greater(latency2, latency1)

}

// TestCacheImpl_lockWorkflowExecution verifies that lockWorkflowExecution
// succeeds when the workflow lock is free and returns an error when the lock
// is already held, for both API and non-API caller types.
//
// Every case runs under a 5-second context timeout; the dimensions that vary
// are the caller type and whether the lock is held beforehand. (The previous
// version of this test carried duplicate cases, an unused withTimeout field,
// and case names that contradicted their wantErr expectations.)
func (s *workflowCacheSuite) TestCacheImpl_lockWorkflowExecution() {
	testSets := []struct {
		name             string
		shouldLockBefore bool
		callerType       string
		wantErr          bool
	}{
		{
			name:       "API caller without locking beforehand should not return an error",
			callerType: headers.CallerTypeAPI,
		},
		{
			name:             "API caller with locking beforehand should return an error",
			shouldLockBefore: true,
			callerType:       headers.CallerTypeAPI,
			wantErr:          true,
		},
		{
			name:       "Non API caller without locking beforehand should not return an error",
			callerType: headers.CallerTypeBackground,
		},
		{
			name:             "Non API caller with locking beforehand should return an error",
			shouldLockBefore: true,
			callerType:       headers.CallerTypeBackground,
			wantErr:          true,
		},
	}
	for _, tt := range testSets {
		s.Run(tt.name, func() {
			c := NewCache(s.mockShard).(*CacheImpl)
			namespaceID := namespace.ID("test_namespace_id")
			execution := commonpb.WorkflowExecution{
				WorkflowId: "some random workflow id",
				RunId:      uuid.New(),
			}
			key := definition.NewWorkflowKey(namespaceID.String(), execution.GetWorkflowId(), execution.GetRunId())
			workflowCtx := workflow.NewContext(c.shard, key, c.logger)
			ctx := headers.SetCallerType(context.Background(), tt.callerType)
			ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
			defer cancel()

			if tt.shouldLockBefore {
				// Hold the lock so the subsequent attempt must time out.
				err := workflowCtx.Lock(ctx, workflow.LockPriorityHigh)
				s.Nil(err)
			}

			if err := c.lockWorkflowExecution(ctx, workflowCtx, key, workflow.LockPriorityHigh); (err != nil) != tt.wantErr {
				s.T().Errorf("CacheImpl.lockWorkflowExecution() error = %v, wantErr %v", err, tt.wantErr)
			}
		})
	}
}

0 comments on commit 68856fe

Please sign in to comment.