Skip to content

Commit

Permalink
Merge branch 'master' into snowden/effective-resource-limit
Browse files Browse the repository at this point in the history
  • Loading branch information
MichaelSnowden committed Jul 18, 2023
2 parents b31f1cf + ba5b23d commit 0709b06
Show file tree
Hide file tree
Showing 33 changed files with 808 additions and 435 deletions.
5 changes: 5 additions & 0 deletions common/constants.go
Expand Up @@ -102,3 +102,8 @@ const (
// DefaultQueueReaderID is the default readerID when loading history tasks
DefaultQueueReaderID int64 = 0
)

const (
// DefaultOperatorRPSRatio is the default percentage of rate limit that should be used for operator priority requests
DefaultOperatorRPSRatio float64 = 0.2
)
3 changes: 3 additions & 0 deletions common/dynamicconfig/constants.go
Expand Up @@ -119,6 +119,9 @@ const (
// ShardPerNsRPSWarnPercent is the per-shard per-namespace RPS limit for warning as a percentage of ShardRPSWarnLimit
// these warning are not emitted if the value is set to 0 or less
ShardPerNsRPSWarnPercent = "system.shardPerNsRPSWarnPercent"
// OperatorRPSRatio is the percentage of the rate limit provided to priority rate limiters that should be used for
// operator API calls (highest priority). Should be >0.0 and <= 1.0 (defaults to 20% if not specified)
OperatorRPSRatio = "system.operatorRPSRatio"

// Whether the deadlock detector should dump goroutines
DeadlockDumpGoroutines = "system.deadlock.DumpGoroutines"
Expand Down
7 changes: 4 additions & 3 deletions common/headers/caller_info.go
Expand Up @@ -31,6 +31,7 @@ import (
)

const (
CallerTypeOperator = "operator"
CallerTypeAPI = "api"
CallerTypeBackground = "background"
CallerTypePreemptable = "preemptable"
Expand Down Expand Up @@ -113,7 +114,7 @@ func SetCallerInfo(
) context.Context {
return setIncomingMD(ctx, map[string]string{
callerNameHeaderName: info.CallerName,
callerTypeHeaderName: info.CallerType,
CallerTypeHeaderName: info.CallerType,
callOriginHeaderName: info.CallOrigin,
})
}
Expand All @@ -133,7 +134,7 @@ func SetCallerType(
ctx context.Context,
callerType string,
) context.Context {
return setIncomingMD(ctx, map[string]string{callerTypeHeaderName: callerType})
return setIncomingMD(ctx, map[string]string{CallerTypeHeaderName: callerType})
}

// SetOrigin set call origin in the context.
Expand Down Expand Up @@ -168,7 +169,7 @@ func setIncomingMD(
func GetCallerInfo(
ctx context.Context,
) CallerInfo {
values := GetValues(ctx, callerNameHeaderName, callerTypeHeaderName, callOriginHeaderName)
values := GetValues(ctx, callerNameHeaderName, CallerTypeHeaderName, callOriginHeaderName)
return CallerInfo{
CallerName: values[0],
CallerType: values[1],
Expand Down
8 changes: 4 additions & 4 deletions common/headers/caller_info_test.go
Expand Up @@ -127,7 +127,7 @@ func (s *callerInfoSuite) TestSetCallerInfo_PreserveOtherValues() {
s.True(ok)
s.Equal(existingValue, md.Get(existingKey)[0])
s.Equal(callerName, md.Get(callerNameHeaderName)[0])
s.Equal(callerType, md.Get(callerTypeHeaderName)[0])
s.Equal(callerType, md.Get(CallerTypeHeaderName)[0])
s.Equal(callOrigin, md.Get(callOriginHeaderName)[0])
s.Len(md, 4)
}
Expand All @@ -146,7 +146,7 @@ func (s *callerInfoSuite) TestSetCallerInfo_NoExistingCallerInfo() {
md, ok := metadata.FromIncomingContext(ctx)
s.True(ok)
s.Equal(callerName, md.Get(callerNameHeaderName)[0])
s.Equal(callerType, md.Get(callerTypeHeaderName)[0])
s.Equal(callerType, md.Get(CallerTypeHeaderName)[0])
s.Equal(callOrigin, md.Get(callOriginHeaderName)[0])
s.Len(md, 3)
}
Expand All @@ -169,7 +169,7 @@ func (s *callerInfoSuite) TestSetCallerInfo_WithExistingCallerInfo() {
md, ok := metadata.FromIncomingContext(ctx)
s.True(ok)
s.Equal(callerName, md.Get(callerNameHeaderName)[0])
s.Equal(callerType, md.Get(callerTypeHeaderName)[0])
s.Equal(callerType, md.Get(CallerTypeHeaderName)[0])
s.Equal(callOrigin, md.Get(callOriginHeaderName)[0])
s.Len(md, 3)
}
Expand All @@ -187,7 +187,7 @@ func (s *callerInfoSuite) TestSetCallerInfo_WithPartialCallerInfo() {
md, ok := metadata.FromIncomingContext(ctx)
s.True(ok)
s.Equal(callerName, md.Get(callerNameHeaderName)[0])
s.Equal(callerType, md.Get(callerTypeHeaderName)[0])
s.Equal(callerType, md.Get(CallerTypeHeaderName)[0])
s.Empty(md.Get(callOriginHeaderName))
s.Len(md, 2)
}
4 changes: 2 additions & 2 deletions common/headers/headers.go
Expand Up @@ -38,7 +38,7 @@ const (
SupportedFeaturesHeaderDelim = ","

callerNameHeaderName = "caller-name"
callerTypeHeaderName = "caller-type"
CallerTypeHeaderName = "caller-type"
callOriginHeaderName = "call-initiation"
)

Expand All @@ -50,7 +50,7 @@ var (
SupportedServerVersionsHeaderName,
SupportedFeaturesHeaderName,
callerNameHeaderName,
callerTypeHeaderName,
CallerTypeHeaderName,
callOriginHeaderName,
}
)
Expand Down
3 changes: 3 additions & 0 deletions common/persistence/client/fx.go
Expand Up @@ -45,6 +45,7 @@ type (
PersistenceNamespaceMaxQps dynamicconfig.IntPropertyFnWithNamespaceFilter
PersistencePerShardNamespaceMaxQPS dynamicconfig.IntPropertyFnWithNamespaceFilter
EnablePriorityRateLimiting dynamicconfig.BoolPropertyFn
OperatorRPSRatio dynamicconfig.FloatPropertyFn

DynamicRateLimitingParams dynamicconfig.MapPropertyFn

Expand All @@ -59,6 +60,7 @@ type (
PersistenceNamespaceMaxQPS PersistenceNamespaceMaxQps
PersistencePerShardNamespaceMaxQPS PersistencePerShardNamespaceMaxQPS
EnablePriorityRateLimiting EnablePriorityRateLimiting
OperatorRPSRatio OperatorRPSRatio
ClusterName ClusterName
ServiceName primitives.ServiceName
MetricsHandler metrics.Handler
Expand Down Expand Up @@ -92,6 +94,7 @@ func FactoryProvider(
params.PersistenceMaxQPS,
params.PersistencePerShardNamespaceMaxQPS,
RequestPriorityFn,
params.OperatorRPSRatio,
params.HealthSignals,
params.DynamicRateLimitingParams,
params.Logger,
Expand Down
74 changes: 49 additions & 25 deletions common/persistence/client/quotas.go
Expand Up @@ -41,29 +41,30 @@ type (

var (
CallerTypeDefaultPriority = map[string]int{
headers.CallerTypeAPI: 1,
headers.CallerTypeBackground: 3,
headers.CallerTypePreemptable: 4,
headers.CallerTypeOperator: 0,
headers.CallerTypeAPI: 2,
headers.CallerTypeBackground: 4,
headers.CallerTypePreemptable: 5,
}

APITypeCallOriginPriorityOverride = map[string]int{
"StartWorkflowExecution": 0,
"SignalWithStartWorkflowExecution": 0,
"SignalWorkflowExecution": 0,
"RequestCancelWorkflowExecution": 0,
"TerminateWorkflowExecution": 0,
"GetWorkflowExecutionHistory": 0,
"UpdateWorkflowExecution": 0,
"StartWorkflowExecution": 1,
"SignalWithStartWorkflowExecution": 1,
"SignalWorkflowExecution": 1,
"RequestCancelWorkflowExecution": 1,
"TerminateWorkflowExecution": 1,
"GetWorkflowExecutionHistory": 1,
"UpdateWorkflowExecution": 1,
}

BackgroundTypeAPIPriorityOverride = map[string]int{
"GetOrCreateShard": 0,
"UpdateShard": 0,
"GetOrCreateShard": 1,
"UpdateShard": 1,

// This is a preprequisite for checkpointing queue process progress
p.ConstructHistoryTaskAPI("RangeCompleteHistoryTasks", tasks.CategoryTransfer): 0,
p.ConstructHistoryTaskAPI("RangeCompleteHistoryTasks", tasks.CategoryTimer): 0,
p.ConstructHistoryTaskAPI("RangeCompleteHistoryTasks", tasks.CategoryVisibility): 0,
p.ConstructHistoryTaskAPI("RangeCompleteHistoryTasks", tasks.CategoryTransfer): 1,
p.ConstructHistoryTaskAPI("RangeCompleteHistoryTasks", tasks.CategoryTimer): 1,
p.ConstructHistoryTaskAPI("RangeCompleteHistoryTasks", tasks.CategoryVisibility): 1,

// Task resource isolation assumes task can always be loaded.
// When one namespace has high load, all task processing goroutines
Expand All @@ -73,19 +74,20 @@ var (
// NOTE: we also don't want task loading to consume all persistence request tokens,
// and blocks all other operations. This is done by setting the queue host rps limit
// dynamic config.
p.ConstructHistoryTaskAPI("GetHistoryTasks", tasks.CategoryTransfer): 2,
p.ConstructHistoryTaskAPI("GetHistoryTasks", tasks.CategoryTimer): 2,
p.ConstructHistoryTaskAPI("GetHistoryTasks", tasks.CategoryVisibility): 2,
p.ConstructHistoryTaskAPI("GetHistoryTasks", tasks.CategoryTransfer): 3,
p.ConstructHistoryTaskAPI("GetHistoryTasks", tasks.CategoryTimer): 3,
p.ConstructHistoryTaskAPI("GetHistoryTasks", tasks.CategoryVisibility): 3,
}

RequestPrioritiesOrdered = []int{0, 1, 2, 3, 4}
RequestPrioritiesOrdered = []int{0, 1, 2, 3, 4, 5}
)

func NewPriorityRateLimiter(
namespaceMaxQPS PersistenceNamespaceMaxQps,
hostMaxQPS PersistenceMaxQps,
perShardNamespaceMaxQPS PersistencePerShardNamespaceMaxQPS,
requestPriorityFn quotas.RequestPriorityFn,
operatorRPSRatio OperatorRPSRatio,
healthSignals p.HealthSignalAggregator,
dynamicParams DynamicRateLimitingParams,
logger log.Logger,
Expand All @@ -94,20 +96,21 @@ func NewPriorityRateLimiter(

return quotas.NewMultiRequestRateLimiter(
// per shardID+namespaceID rate limiters
newPerShardPerNamespacePriorityRateLimiter(perShardNamespaceMaxQPS, hostMaxQPS, requestPriorityFn),
newPerShardPerNamespacePriorityRateLimiter(perShardNamespaceMaxQPS, hostMaxQPS, requestPriorityFn, operatorRPSRatio),
// per namespaceID rate limiters
newPriorityNamespaceRateLimiter(namespaceMaxQPS, hostMaxQPS, requestPriorityFn),
newPriorityNamespaceRateLimiter(namespaceMaxQPS, hostMaxQPS, requestPriorityFn, operatorRPSRatio),
// host-level dynamic rate limiter
newPriorityDynamicRateLimiter(hostRateFn, requestPriorityFn, healthSignals, dynamicParams, logger),
newPriorityDynamicRateLimiter(hostRateFn, requestPriorityFn, operatorRPSRatio, healthSignals, dynamicParams, logger),
// basic host-level rate limiter
newPriorityRateLimiter(hostRateFn, requestPriorityFn),
newPriorityRateLimiter(hostRateFn, requestPriorityFn, operatorRPSRatio),
)
}

func newPerShardPerNamespacePriorityRateLimiter(
perShardNamespaceMaxQPS PersistencePerShardNamespaceMaxQPS,
hostMaxQPS PersistenceMaxQps,
requestPriorityFn quotas.RequestPriorityFn,
operatorRPSRatio OperatorRPSRatio,
) quotas.RequestRateLimiter {
return quotas.NewMapRequestRateLimiter(func(req quotas.Request) quotas.RequestRateLimiter {
if hasCaller(req) && hasCallerSegment(req) {
Expand All @@ -118,6 +121,7 @@ func newPerShardPerNamespacePriorityRateLimiter(
return float64(perShardNamespaceMaxQPS(req.Caller))
},
requestPriorityFn,
operatorRPSRatio,
)
}
return quotas.NoopRequestRateLimiter
Expand All @@ -137,6 +141,7 @@ func newPriorityNamespaceRateLimiter(
namespaceMaxQPS PersistenceNamespaceMaxQps,
hostMaxQPS PersistenceMaxQps,
requestPriorityFn quotas.RequestPriorityFn,
operatorRPSRatio OperatorRPSRatio,
) quotas.RequestRateLimiter {
return quotas.NewNamespaceRequestRateLimiter(func(req quotas.Request) quotas.RequestRateLimiter {
if hasCaller(req) {
Expand All @@ -154,6 +159,7 @@ func newPriorityNamespaceRateLimiter(
return namespaceQPS
},
requestPriorityFn,
operatorRPSRatio,
)
}
return quotas.NoopRequestRateLimiter
Expand All @@ -163,10 +169,15 @@ func newPriorityNamespaceRateLimiter(
func newPriorityRateLimiter(
rateFn quotas.RateFn,
requestPriorityFn quotas.RequestPriorityFn,
operatorRPSRatio OperatorRPSRatio,
) quotas.RequestRateLimiter {
rateLimiters := make(map[int]quotas.RequestRateLimiter)
for priority := range RequestPrioritiesOrdered {
rateLimiters[priority] = quotas.NewRequestRateLimiterAdapter(quotas.NewDefaultOutgoingRateLimiter(rateFn))
if priority == CallerTypeDefaultPriority[headers.CallerTypeOperator] {
rateLimiters[priority] = quotas.NewRequestRateLimiterAdapter(quotas.NewDefaultOutgoingRateLimiter(operatorRateFn(rateFn, operatorRPSRatio)))
} else {
rateLimiters[priority] = quotas.NewRequestRateLimiterAdapter(quotas.NewDefaultOutgoingRateLimiter(rateFn))
}
}

return quotas.NewPriorityRateLimiter(
Expand All @@ -178,14 +189,19 @@ func newPriorityRateLimiter(
func newPriorityDynamicRateLimiter(
rateFn quotas.RateFn,
requestPriorityFn quotas.RequestPriorityFn,
operatorRPSRatio OperatorRPSRatio,
healthSignals p.HealthSignalAggregator,
dynamicParams DynamicRateLimitingParams,
logger log.Logger,
) quotas.RequestRateLimiter {
rateLimiters := make(map[int]quotas.RequestRateLimiter)
for priority := range RequestPrioritiesOrdered {
// TODO: refactor this so dynamic rate adjustment is global for all priorities
rateLimiters[priority] = NewHealthRequestRateLimiterImpl(healthSignals, rateFn, dynamicParams, logger)
if priority == CallerTypeDefaultPriority[headers.CallerTypeOperator] {
rateLimiters[priority] = NewHealthRequestRateLimiterImpl(healthSignals, operatorRateFn(rateFn, operatorRPSRatio), dynamicParams, logger)
} else {
rateLimiters[priority] = NewHealthRequestRateLimiterImpl(healthSignals, rateFn, dynamicParams, logger)
}
}

return quotas.NewPriorityRateLimiter(
Expand All @@ -211,6 +227,8 @@ func NewNoopPriorityRateLimiter(

func RequestPriorityFn(req quotas.Request) int {
switch req.CallerType {
case headers.CallerTypeOperator:
return CallerTypeDefaultPriority[req.CallerType]
case headers.CallerTypeAPI:
if priority, ok := APITypeCallOriginPriorityOverride[req.Initiation]; ok {
return priority
Expand All @@ -229,6 +247,12 @@ func RequestPriorityFn(req quotas.Request) int {
}
}

func operatorRateFn(rateFn quotas.RateFn, operatorRPSRatio OperatorRPSRatio) quotas.RateFn {
return func() float64 {
return operatorRPSRatio() * rateFn()
}
}

func hasCaller(req quotas.Request) bool {
return req.Caller != "" && req.Caller != headers.CallerNameSystem
}
Expand Down
52 changes: 44 additions & 8 deletions common/persistence/client/quotas_test.go
Expand Up @@ -31,6 +31,7 @@ import (

"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"go.temporal.io/server/common/headers"
"go.temporal.io/server/common/quotas"
"golang.org/x/exp/slices"

Expand Down Expand Up @@ -104,12 +105,13 @@ func (s *quotasSuite) TestCallOriginDefined() {
}

func (s *quotasSuite) TestPriorityNamespaceRateLimiter_DoesLimit() {
var namespaceMaxRPS = func(namespace string) int { return 1 }
var hostMaxRPS = func() int { return 1 }
namespaceMaxRPS := func(namespace string) int { return 1 }
hostMaxRPS := func() int { return 1 }
operatorRPSRatioFn := func() float64 { return 0.2 }

var limiter = newPriorityNamespaceRateLimiter(namespaceMaxRPS, hostMaxRPS, RequestPriorityFn)
limiter := newPriorityNamespaceRateLimiter(namespaceMaxRPS, hostMaxRPS, RequestPriorityFn, operatorRPSRatioFn)

var request = quotas.NewRequest(
request := quotas.NewRequest(
"test-api",
1,
"test-namespace",
Expand All @@ -131,12 +133,13 @@ func (s *quotasSuite) TestPriorityNamespaceRateLimiter_DoesLimit() {
}

func (s *quotasSuite) TestPerShardNamespaceRateLimiter_DoesLimit() {
var perShardNamespaceMaxRPS = func(namespace string) int { return 1 }
var hostMaxRPS = func() int { return 1 }
perShardNamespaceMaxRPS := func(namespace string) int { return 1 }
hostMaxRPS := func() int { return 1 }
operatorRPSRatioFn := func() float64 { return 0.2 }

var limiter = newPerShardPerNamespacePriorityRateLimiter(perShardNamespaceMaxRPS, hostMaxRPS, RequestPriorityFn)
limiter := newPerShardPerNamespacePriorityRateLimiter(perShardNamespaceMaxRPS, hostMaxRPS, RequestPriorityFn, operatorRPSRatioFn)

var request = quotas.NewRequest(
request := quotas.NewRequest(
"test-api",
1,
"test-namespace",
Expand All @@ -156,3 +159,36 @@ func (s *quotasSuite) TestPerShardNamespaceRateLimiter_DoesLimit() {

s.True(wasLimited)
}

func (s *quotasSuite) TestOperatorPrioritized() {
rateFn := func() float64 { return 5 }
operatorRPSRatioFn := func() float64 { return 0.2 }
limiter := newPriorityRateLimiter(rateFn, RequestPriorityFn, operatorRPSRatioFn)

operatorRequest := quotas.NewRequest(
"DescribeWorkflowExecution",
1,
"test-namespace",
headers.CallerTypeOperator,
-1,
"DescribeWorkflowExecution")

apiRequest := quotas.NewRequest(
"DescribeWorkflowExecution",
1,
"test-namespace",
headers.CallerTypeAPI,
-1,
"DescribeWorkflowExecution")

requestTime := time.Now()
wasLimited := false

for i := 0; i < 6; i++ {
if !limiter.Allow(requestTime, apiRequest) {
wasLimited = true
s.True(limiter.Allow(requestTime, operatorRequest))
}
}
s.True(wasLimited)
}

0 comments on commit 0709b06

Please sign in to comment.