Skip to content

Commit

Permalink
Use priority based API throttling (#1512)
Browse files Browse the repository at this point in the history
* APIs are divided into 3 groups
  * Execution APIs Group
  * Visibility APIs Group
  * Other APIs Group
* Execution APIs Group contains the following APIs
  * Priority 0 (highest priority)
    * StartWorkflowExecution
    * SignalWithStartWorkflowExecution
    * SignalWorkflowExecution
    * RequestCancelWorkflowExecution
    * TerminateWorkflowExecution
    * GetWorkflowExecutionHistory
  * Priority 1
    * RecordActivityTaskHeartbeat
    * RecordActivityTaskHeartbeatById
    * RespondActivityTaskCanceled
    * RespondActivityTaskCanceledById
    * RespondActivityTaskFailed
    * RespondActivityTaskFailedById
    * RespondActivityTaskCompleted
    * RespondActivityTaskCompletedById
    * RespondWorkflowTaskCompleted
  * Priority 2
    * ResetWorkflowExecution
    * DescribeWorkflowExecution
    * RespondWorkflowTaskFailed
    * QueryWorkflow
    * RespondQueryTaskCompleted
    * PollWorkflowTaskQueue
    * PollActivityTaskQueue
  * Priority 3 (lowest priority)
    * ResetStickyTaskQueue
    * DescribeTaskQueue
    * ListTaskQueuePartitions
* Visibility APIs Group contains the following APIs
    * CountWorkflowExecutions
    * ScanWorkflowExecutions
    * ListOpenWorkflowExecutions
    * ListClosedWorkflowExecutions
    * ListWorkflowExecutions
    * ListArchivedWorkflowExecutions
* Other APIs Group contains the following APIs
    * GetClusterInfo
    * GetSearchAttributes
    * RegisterNamespace
    * UpdateNamespace
    * DescribeNamespace
    * ListNamespaces
    * DeprecateNamespace
* Introduce new noop reservation
* Introduce new routing rate limiter
  • Loading branch information
wxing1292 committed May 10, 2021
1 parent 26cbc5f commit 76f55e6
Show file tree
Hide file tree
Showing 23 changed files with 1,132 additions and 234 deletions.
6 changes: 3 additions & 3 deletions common/persistence/visibilitySamplingClient.go
Expand Up @@ -33,9 +33,9 @@ import (
)

type visibilitySamplingClient struct {
rateLimitersForOpen quotas.NamespaceRateLimiter
rateLimitersForClosed quotas.NamespaceRateLimiter
rateLimitersForList quotas.NamespaceRateLimiter
rateLimitersForOpen *quotas.NamespaceMultiStageRateLimiterImpl
rateLimitersForClosed *quotas.NamespaceMultiStageRateLimiterImpl
rateLimitersForList *quotas.NamespaceMultiStageRateLimiterImpl
persistence VisibilityManager
config *config.VisibilityConfig
metricClient metrics.Client
Expand Down
6 changes: 2 additions & 4 deletions common/quotas/namespace_multi_stage_rate_limiter_impl.go
Expand Up @@ -34,18 +34,16 @@ type (
// NamespaceMultiStageRateLimiterImpl is a multi stage rate limiter
// special built for multi-tenancy
NamespaceMultiStageRateLimiterImpl struct {
namespaceRateLimiterFn NamespaceRateLimiterFn
namespaceRateLimiterFn func(namespaceID string) RateLimiter
sharedRateLimiters []RateLimiter

sync.RWMutex
namespaceRateLimiters map[string]RateLimiter
}
)

var _ NamespaceRateLimiter = (*NamespaceMultiStageRateLimiterImpl)(nil)

func NewNamespaceMultiStageRateLimiter(
namespaceRateLimiterFn NamespaceRateLimiterFn,
namespaceRateLimiterFn func(namespaceID string) RateLimiter,
sharedRateLimiters []RateLimiter,
) *NamespaceMultiStageRateLimiterImpl {
return &NamespaceMultiStageRateLimiterImpl{
Expand Down
75 changes: 20 additions & 55 deletions common/quotas/namespace_rate_limiter_impl.go
Expand Up @@ -33,109 +33,74 @@ import (
type (
// NamespaceRateLimiterImpl is a rate limiter special built for multi-tenancy
NamespaceRateLimiterImpl struct {
namespaceRateLimiterFn NamespaceRateLimiterFn
namespaceRateLimiterFn RequestRateLimiterFn

sync.RWMutex
namespaceRateLimiters map[string]RateLimiter
namespaceRateLimiters map[string]RequestRateLimiter
}
)

var _ NamespaceRateLimiter = (*NamespaceRateLimiterImpl)(nil)
var _ RequestRateLimiter = (*NamespaceRateLimiterImpl)(nil)

func NewNamespaceRateLimiter(
namespaceRateLimiterFn NamespaceRateLimiterFn,
namespaceRateLimiterFn RequestRateLimiterFn,
) *NamespaceRateLimiterImpl {
return &NamespaceRateLimiterImpl{
namespaceRateLimiterFn: namespaceRateLimiterFn,

namespaceRateLimiters: make(map[string]RateLimiter),
namespaceRateLimiters: make(map[string]RequestRateLimiter),
}
}

// Allow attempts to allow a request to go through. The method returns
// immediately with a true or false indicating if the request can make
// progress
func (r *NamespaceRateLimiterImpl) Allow(
namespaceID string,
) bool {

return r.AllowN(namespaceID, time.Now(), 1)
}

// AllowN attempts to allow a request to go through. The method returns
// immediately with a true or false indicating if the request can make
// progress
func (r *NamespaceRateLimiterImpl) AllowN(
namespaceID string,
now time.Time,
numToken int,
request Request,
) bool {

rateLimiter := r.getOrInitRateLimiter(namespaceID)
return rateLimiter.AllowN(now, numToken)
rateLimiter := r.getOrInitRateLimiter(request)
return rateLimiter.Allow(now, request)
}

// Reserve returns a Reservation that indicates how long the caller
// must wait before event happen.
func (r *NamespaceRateLimiterImpl) Reserve(
namespaceID string,
) Reservation {

return r.ReserveN(namespaceID, time.Now(), 1)
}

// ReserveN returns a Reservation that indicates how long the caller
// must wait before event happen.
func (r *NamespaceRateLimiterImpl) ReserveN(
namespaceID string,
now time.Time,
numToken int,
request Request,
) Reservation {

rateLimiter := r.getOrInitRateLimiter(namespaceID)
return rateLimiter.ReserveN(now, numToken)
rateLimiter := r.getOrInitRateLimiter(request)
return rateLimiter.Reserve(now, request)
}

// Wait waits till the deadline for a rate limit token to allow the request
// to go through.
func (r *NamespaceRateLimiterImpl) Wait(
ctx context.Context,
namespaceID string,
) error {

return r.WaitN(ctx, namespaceID, 1)
}

// WaitN waits till the deadline for a rate limit token to allow the request
// to go through.
func (r *NamespaceRateLimiterImpl) WaitN(
ctx context.Context,
namespaceID string,
numToken int,
request Request,
) error {

rateLimiter := r.getOrInitRateLimiter(namespaceID)
return rateLimiter.WaitN(ctx, numToken)
rateLimiter := r.getOrInitRateLimiter(request)
return rateLimiter.Wait(ctx, request)
}

func (r *NamespaceRateLimiterImpl) getOrInitRateLimiter(
namespaceID string,
) RateLimiter {
req Request,
) RequestRateLimiter {
r.RLock()
rateLimiter, ok := r.namespaceRateLimiters[namespaceID]
rateLimiter, ok := r.namespaceRateLimiters[req.Caller]
r.RUnlock()
if ok {
return rateLimiter
}

newRateLimiter := r.namespaceRateLimiterFn(namespaceID)
newRateLimiter := r.namespaceRateLimiterFn(req)
r.Lock()
defer r.Unlock()

rateLimiter, ok = r.namespaceRateLimiters[namespaceID]
rateLimiter, ok = r.namespaceRateLimiters[req.Caller]
if ok {
return rateLimiter
}
r.namespaceRateLimiters[namespaceID] = newRateLimiter
r.namespaceRateLimiters[req.Caller] = newRateLimiter
return newRateLimiter
}
69 changes: 69 additions & 0 deletions common/quotas/noop_reservation_impl.go
@@ -0,0 +1,69 @@
// The MIT License
//
// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
//
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package quotas

import (
"time"
)

type (
NoopReservationImpl struct {
}
)

var _ Reservation = (*NoopReservationImpl)(nil)

func NewNoopReservation() *NoopReservationImpl {
return &NoopReservationImpl{}
}

// OK returns whether the limiter can provide the requested number of tokens
func (r *NoopReservationImpl) OK() bool {
return true
}

// Cancel indicates that the reservation holder will not perform the reserved action
// and reverses the effects of this Reservation on the rate limit as much as possible
func (r *NoopReservationImpl) Cancel() {
// noop
}

// CancelAt indicates that the reservation holder will not perform the reserved action
// and reverses the effects of this Reservation on the rate limit as much as possible
func (r *NoopReservationImpl) CancelAt(_ time.Time) {
// noop
}

// Delay returns the duration for which the reservation holder must wait
// before taking the reserved action. Zero duration means act immediately.
func (r *NoopReservationImpl) Delay() time.Duration {
return time.Duration(0) // no delay
}

// DelayFrom returns the duration for which the reservation holder must wait
// before taking the reserved action. Zero duration means act immediately.
func (r *NoopReservationImpl) DelayFrom(_ time.Time) time.Duration {
return time.Duration(0) // no delay
}
2 changes: 2 additions & 0 deletions common/quotas/priority_rate_limiter_impl.go
Expand Up @@ -44,6 +44,8 @@ type (
}
)

var _ RequestRateLimiter = (*PriorityRateLimiterImpl)(nil)

// NewPriorityRateLimiter returns a new rate limiter that can handle dynamic
// configuration updates
func NewPriorityRateLimiter(
Expand Down
22 changes: 0 additions & 22 deletions common/quotas/rate_limiter.go
Expand Up @@ -66,26 +66,4 @@ type (
// Burst returns the burst for this rate limiter
Burst() int
}

// Reservation holds information about events that are permitted by a Limiter to happen after a delay
Reservation interface {
// OK returns whether the limiter can provide the requested number of tokens
OK() bool

// Cancel indicates that the reservation holder will not perform the reserved action
// and reverses the effects of this Reservation on the rate limit as much as possible
Cancel()

// CancelAt indicates that the reservation holder will not perform the reserved action
// and reverses the effects of this Reservation on the rate limit as much as possible
CancelAt(now time.Time)

// Delay returns the duration for which the reservation holder must wait
// before taking the reserved action. Zero duration means act immediately.
Delay() time.Duration

// DelayFrom returns the duration for which the reservation holder must wait
// before taking the reserved action. Zero duration means act immediately.
DelayFrom(now time.Time) time.Duration
}
)
89 changes: 0 additions & 89 deletions common/quotas/rate_limiter_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 76f55e6

Please sign in to comment.