Skip to content

Commit

Permalink
Core persistence priority rate limiting (#3139)
Browse files Browse the repository at this point in the history
  • Loading branch information
yycptt committed Jul 29, 2022
1 parent d186259 commit 7d9b262
Show file tree
Hide file tree
Showing 77 changed files with 1,189 additions and 375 deletions.
10 changes: 6 additions & 4 deletions common/cluster/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,10 @@ import (
"sync/atomic"
"time"

"go.temporal.io/server/common/dynamicconfig"

"go.temporal.io/server/common"
"go.temporal.io/server/common/collection"
"go.temporal.io/server/common/dynamicconfig"
"go.temporal.io/server/common/headers"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/log/tag"
"go.temporal.io/server/common/persistence"
Expand Down Expand Up @@ -227,11 +227,13 @@ func (m *metadataImpl) Start() {
return
}

err := m.refreshClusterMetadata(context.Background())
// TODO: specify a timeout for the context
ctx := headers.SetCallerInfo(context.TODO(), headers.NewCallerInfo(headers.CallerTypeBackground))
err := m.refreshClusterMetadata(ctx)
if err != nil {
m.logger.Fatal("Unable to initialize cluster metadata cache", tag.Error(err))
}
m.refresher = goro.Go(context.Background(), m.refreshLoop)
m.refresher = goro.Go(ctx, m.refreshLoop)
}

func (m *metadataImpl) Stop() {
Expand Down
8 changes: 8 additions & 0 deletions common/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,8 @@ const (
FrontendPersistenceMaxQPS = "frontend.persistenceMaxQPS"
// FrontendPersistenceGlobalMaxQPS is the max qps frontend cluster can query DB
FrontendPersistenceGlobalMaxQPS = "frontend.persistenceGlobalMaxQPS"
// FrontendEnablePersistencePriorityRateLimiting indicates if priority rate limiting is enabled in frontend persistence client
FrontendEnablePersistencePriorityRateLimiting = "frontend.enablePersistencePriorityRateLimiting"
// FrontendVisibilityMaxPageSize is default max size for ListWorkflowExecutions in one page
FrontendVisibilityMaxPageSize = "frontend.visibilityMaxPageSize"
// FrontendESIndexMaxResultWindow is ElasticSearch index setting max_result_window
Expand Down Expand Up @@ -239,6 +241,8 @@ const (
MatchingPersistenceMaxQPS = "matching.persistenceMaxQPS"
// MatchingPersistenceGlobalMaxQPS is the max qps matching cluster can query DB
MatchingPersistenceGlobalMaxQPS = "matching.persistenceGlobalMaxQPS"
// MatchingEnablePersistencePriorityRateLimiting indicates if priority rate limiting is enabled in matching persistence client
MatchingEnablePersistencePriorityRateLimiting = "matching.enablePersistencePriorityRateLimiting"
// MatchingMinTaskThrottlingBurstSize is the minimum burst size for task queue throttling
MatchingMinTaskThrottlingBurstSize = "matching.minTaskThrottlingBurstSize"
// MatchingGetTasksBatchSize is the maximum batch size to fetch from the task buffer
Expand Down Expand Up @@ -284,6 +288,8 @@ const (
HistoryPersistenceMaxQPS = "history.persistenceMaxQPS"
// HistoryPersistenceGlobalMaxQPS is the max qps history cluster can query DB
HistoryPersistenceGlobalMaxQPS = "history.persistenceGlobalMaxQPS"
// HistoryEnablePersistencePriorityRateLimiting indicates if priority rate limiting is enabled in history persistence client
HistoryEnablePersistencePriorityRateLimiting = "history.enablePersistencePriorityRateLimiting"
// HistoryLongPollExpirationInterval is the long poll expiration interval in the history service
HistoryLongPollExpirationInterval = "history.longPollExpirationInterval"
// HistoryCacheInitialSize is initial size of history cache
Expand Down Expand Up @@ -575,6 +581,8 @@ const (
WorkerPersistenceMaxQPS = "worker.persistenceMaxQPS"
// WorkerPersistenceGlobalMaxQPS is the max qps worker cluster can query DB
WorkerPersistenceGlobalMaxQPS = "worker.persistenceGlobalMaxQPS"
// WorkerEnablePersistencePriorityRateLimiting indicates if priority rate limiting is enabled in worker persistence client
WorkerEnablePersistencePriorityRateLimiting = "worker.enablePersistencePriorityRateLimiting"
// WorkerIndexerConcurrency is the max concurrent messages to be processed at any given time
WorkerIndexerConcurrency = "worker.indexerConcurrency"
// WorkerESProcessorNumOfWorkers is num of workers for esProcessor
Expand Down
79 changes: 79 additions & 0 deletions common/headers/caller_info.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
// The MIT License
//
// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
//
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package headers

import (
"context"

"google.golang.org/grpc/metadata"
)

const (
CallerTypeAPI = "api"
CallerTypeBackground = "background"
)

type CallerInfo struct {
CallerType string

// TODO: add fields for CallerName and CallerInitiation
}

func NewCallerInfo(
callerType string,
) CallerInfo {
return CallerInfo{
CallerType: callerType,
}
}

// SetCallerInfo sets callerName and callerType value in incoming context
// if not already exists.
// TODO: consider only set the caller info to golang context instead of grpc metadata
// and propagate to grpc outgoing context upon making an rpc call
func SetCallerInfo(
ctx context.Context,
info CallerInfo,
) context.Context {
mdIncoming, ok := metadata.FromIncomingContext(ctx)
if !ok {
mdIncoming = metadata.MD{}
}

if len(mdIncoming.Get(callerTypeHeaderName)) == 0 {
mdIncoming.Set(callerTypeHeaderName, string(info.CallerType))
}

return metadata.NewIncomingContext(ctx, mdIncoming)
}

func GetCallerInfo(
ctx context.Context,
) CallerInfo {
values := GetValues(ctx, callerTypeHeaderName)
return CallerInfo{
CallerType: values[0],
}
}
98 changes: 98 additions & 0 deletions common/headers/caller_info_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
// The MIT License
//
// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
//
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package headers

import (
"context"
"testing"

"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"google.golang.org/grpc/metadata"
)

type (
callerInfoSuite struct {
*require.Assertions
suite.Suite
}
)

func TestCallerInfoSuite(t *testing.T) {
suite.Run(t, &callerInfoSuite{})
}

func (s *callerInfoSuite) SetupTest() {
s.Assertions = require.New(s.T())
}

func (s *callerInfoSuite) TestSetCallerInfo_PreserveOtherValues() {
existingKey := "key"
existingValue := "value"
callerType := CallerTypeAPI

ctx := metadata.NewIncomingContext(
context.Background(),
metadata.Pairs(existingKey, existingValue),
)

ctx = SetCallerInfo(ctx, NewCallerInfo(callerType))

md, ok := metadata.FromIncomingContext(ctx)
s.True(ok)
s.Equal(existingValue, md.Get(existingKey)[0])
s.Equal(callerType, md.Get(callerTypeHeaderName)[0])
s.Len(md, 2)
}

func (s *callerInfoSuite) TestSetCallerInfo_NoExistingCallerInfo() {
callerType := CallerTypeAPI

ctx := SetCallerInfo(context.Background(), CallerInfo{
CallerType: callerType,
})

md, ok := metadata.FromIncomingContext(ctx)
s.True(ok)
s.Equal(callerType, md.Get(callerTypeHeaderName)[0])
s.Len(md, 1)
}

func (s *callerInfoSuite) TestSetCallerInfo_WithExistingCallerInfo() {
callerType := CallerTypeAPI

ctx := SetCallerInfo(context.Background(), CallerInfo{
CallerType: callerType,
})

ctx = SetCallerInfo(ctx, CallerInfo{
CallerType: CallerTypeBackground,
})

md, ok := metadata.FromIncomingContext(ctx)
s.True(ok)
s.Equal(callerType, md.Get(callerTypeHeaderName)[0])
s.Len(md, 1)
}
28 changes: 4 additions & 24 deletions common/headers/headers.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ const (
SupportedServerVersionsHeaderName = "supported-server-versions"
SupportedFeaturesHeaderName = "supported-features"
SupportedFeaturesHeaderDelim = ","

callerTypeHeaderName = "caller-type"
)

var (
Expand All @@ -45,14 +47,8 @@ var (
ClientVersionHeaderName,
SupportedServerVersionsHeaderName,
SupportedFeaturesHeaderName,
callerTypeHeaderName,
}

internalVersionHeaders = metadata.New(map[string]string{
ClientNameHeaderName: ClientNameServer,
ClientVersionHeaderName: ServerVersion,
SupportedServerVersionsHeaderName: SupportedServerVersions,
SupportedFeaturesHeaderName: AllFeatures,
})
)

// GetValues returns header values for passed header names.
Expand All @@ -70,7 +66,7 @@ func GetValues(ctx context.Context, headerNames ...string) []string {
}

// Propagate propagates version headers from incoming context to outgoing context.
// It copies all version headers to outgoing context only if they are exist in incoming context
// It copies all headers to outgoing context only if they are exist in incoming context
// and doesn't exist in outgoing context already.
func Propagate(ctx context.Context) context.Context {
if mdIncoming, ok := metadata.FromIncomingContext(ctx); ok {
Expand All @@ -97,22 +93,6 @@ func Propagate(ctx context.Context) context.Context {
return ctx
}

// SetVersions sets headers for internal communications.
func SetVersions(ctx context.Context) context.Context {
return metadata.NewOutgoingContext(ctx, internalVersionHeaders)
}

// SetVersionsForTests sets headers as they would be received from the client.
// Must be used in tests only.
func SetVersionsForTests(ctx context.Context, clientVersion, clientName, supportedServerVersions, supportedFeatures string) context.Context {
return metadata.NewIncomingContext(ctx, metadata.New(map[string]string{
ClientNameHeaderName: clientName,
ClientVersionHeaderName: clientVersion,
SupportedServerVersionsHeaderName: supportedServerVersions,
SupportedFeaturesHeaderName: supportedFeatures,
}))
}

func getSingleHeaderValue(md metadata.MD, headerName string) string {
values := md.Get(headerName)
if len(values) == 0 {
Expand Down
25 changes: 25 additions & 0 deletions common/headers/versionChecker.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ import (
"strings"

"github.com/blang/semver/v4"
"google.golang.org/grpc/metadata"

"go.temporal.io/api/serviceerror"
)

Expand Down Expand Up @@ -69,6 +71,13 @@ var (
ClientNameServer: "<2.0.0",
ClientNameUI: "<3.0.0",
}

internalVersionHeaderPairs = []string{
ClientNameHeaderName, ClientNameServer,
ClientVersionHeaderName, ServerVersion,
SupportedServerVersionsHeaderName, SupportedServerVersions,
SupportedFeaturesHeaderName, AllFeatures,
}
)

type (
Expand Down Expand Up @@ -107,6 +116,22 @@ func GetClientNameAndVersion(ctx context.Context) (string, string) {
return clientName, clientVersion
}

// SetVersions sets headers for internal communications.
func SetVersions(ctx context.Context) context.Context {
return metadata.AppendToOutgoingContext(ctx, internalVersionHeaderPairs...)
}

// SetVersionsForTests sets headers as they would be received from the client.
// Must be used in tests only.
func SetVersionsForTests(ctx context.Context, clientVersion, clientName, supportedServerVersions, supportedFeatures string) context.Context {
return metadata.NewIncomingContext(ctx, metadata.New(map[string]string{
ClientNameHeaderName: clientName,
ClientVersionHeaderName: clientVersion,
SupportedServerVersionsHeaderName: supportedServerVersions,
SupportedFeaturesHeaderName: supportedFeatures,
}))
}

// ClientSupported returns an error if client is unsupported, nil otherwise.
func (vc *versionChecker) ClientSupported(ctx context.Context, enableClientVersionCheck bool) error {
if !enableClientVersionCheck {
Expand Down

0 comments on commit 7d9b262

Please sign in to comment.