Skip to content

Commit

Permalink
Rate limit archive request (#1861)
Browse files Browse the repository at this point in the history
  • Loading branch information
yycptt committed May 17, 2019
1 parent 56823be commit c0dfc4f
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 1 deletion.
3 changes: 3 additions & 0 deletions common/service/dynamicconfig/constants.go
Expand Up @@ -165,6 +165,7 @@ var keys = map[Key]string{
AdminOperationToken: "history.adminOperationToken",
EnableEventsV2: "history.enableEventsV2",
NumArchiveSystemWorkflows: "history.numArchiveSystemWorkflows",
ArchiveRequestRPS: "history.archiveRequestRPS",
EmitShardDiffLog: "history.emitShardDiffLog",
HistoryThrottledLogRPS: "history.throttledLogRPS",

Expand Down Expand Up @@ -430,6 +431,8 @@ const (
DefaultEventEncoding
// NumArchiveSystemWorkflows is key for number of archive system workflows running in total
NumArchiveSystemWorkflows
// ArchiveRequestRPS is the rate limit on the number of archive request per second
ArchiveRequestRPS

// EnableAdminProtection is whether to enable admin checking
EnableAdminProtection
Expand Down
2 changes: 1 addition & 1 deletion service/history/historyEngine.go
Expand Up @@ -174,7 +174,7 @@ func NewEngineWithShardContext(
metricsClient: shard.GetMetricsClient(),
historyEventNotifier: historyEventNotifier,
config: config,
archivalClient: archiver.NewClient(shard.GetMetricsClient(), shard.GetLogger(), publicClient, shard.GetConfig().NumArchiveSystemWorkflows),
archivalClient: archiver.NewClient(shard.GetMetricsClient(), shard.GetLogger(), publicClient, shard.GetConfig().NumArchiveSystemWorkflows, shard.GetConfig().ArchiveRequestRPS()),
}

txProcessor := newTransferQueueProcessor(shard, historyEngImpl, visibilityMgr, matching, historyClient, logger)
Expand Down
2 changes: 2 additions & 0 deletions service/history/service.go
Expand Up @@ -135,6 +135,7 @@ type Config struct {
EnableEventsV2 dynamicconfig.BoolPropertyFnWithDomainFilter

NumArchiveSystemWorkflows dynamicconfig.IntPropertyFn
ArchiveRequestRPS dynamicconfig.IntPropertyFn

BlobSizeLimitError dynamicconfig.IntPropertyFnWithDomainFilter
BlobSizeLimitWarn dynamicconfig.IntPropertyFnWithDomainFilter
Expand Down Expand Up @@ -223,6 +224,7 @@ func NewConfig(dc *dynamicconfig.Collection, numberOfShards int, enableVisibilit
EnableEventsV2: dc.GetBoolPropertyFnWithDomainFilter(dynamicconfig.EnableEventsV2, true),

NumArchiveSystemWorkflows: dc.GetIntProperty(dynamicconfig.NumArchiveSystemWorkflows, 1000),
ArchiveRequestRPS: dc.GetIntProperty(dynamicconfig.ArchiveRequestRPS, 300), // should be much smaller than frontend RPS

BlobSizeLimitError: dc.GetIntPropertyFilteredByDomain(dynamicconfig.BlobSizeLimitError, 2*1024*1024),
BlobSizeLimitWarn: dc.GetIntPropertyFilteredByDomain(dynamicconfig.BlobSizeLimitError, 256*1024),
Expand Down
15 changes: 15 additions & 0 deletions service/worker/archiver/client.go
Expand Up @@ -22,14 +22,17 @@ package archiver

import (
"context"
"errors"
"fmt"
"math/rand"

"github.com/uber/cadence/common"
"github.com/uber/cadence/common/clock"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
"github.com/uber/cadence/common/metrics"
"github.com/uber/cadence/common/service/dynamicconfig"
"github.com/uber/cadence/common/tokenbucket"
"go.uber.org/cadence/.gen/go/cadence/workflowserviceclient"
cclient "go.uber.org/cadence/client"
)
Expand Down Expand Up @@ -57,27 +60,39 @@ type (
logger log.Logger
cadenceClient cclient.Client
numWorkflows dynamicconfig.IntPropertyFn
rateLimiter tokenbucket.TokenBucket
}
)

const tooManyRequestsErrMsg = "Too many requests to archival workflow"

// NewClient creates a new Client
func NewClient(
metricsClient metrics.Client,
logger log.Logger,
publicClient workflowserviceclient.Interface,
numWorkflows dynamicconfig.IntPropertyFn,
requestRPS int,
) Client {
return &client{
metricsClient: metricsClient,
logger: logger,
cadenceClient: cclient.NewClient(publicClient, common.SystemDomainName, &cclient.Options{}),
numWorkflows: numWorkflows,
rateLimiter: tokenbucket.New(requestRPS, clock.NewRealTimeSource()),
}
}

// Archive starts an archival task
func (c *client) Archive(request *ArchiveRequest) error {
c.metricsClient.IncCounter(metrics.ArchiverClientScope, metrics.CadenceRequests)

if ok, _ := c.rateLimiter.TryConsume(1); !ok {
c.logger.Error(tooManyRequestsErrMsg)
c.metricsClient.IncCounter(metrics.ArchiverClientScope, metrics.CadenceErrServiceBusyCounter)
return errors.New(tooManyRequestsErrMsg)
}

workflowID := fmt.Sprintf("%v-%v", workflowIDPrefix, rand.Intn(c.numWorkflows()))
workflowOptions := cclient.StartWorkflowOptions{
ID: workflowID,
Expand Down

0 comments on commit c0dfc4f

Please sign in to comment.