Skip to content

Commit

Permalink
Make archival iteration runtime into dynamic config (#1868)
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewjdawson2016 committed May 16, 2019
1 parent 93ba2c7 commit 2aa943b
Show file tree
Hide file tree
Showing 6 changed files with 24 additions and 3 deletions.
3 changes: 3 additions & 0 deletions common/service/dynamicconfig/constants.go
Expand Up @@ -187,6 +187,7 @@ var keys = map[Key]string{
WorkerArchiverConcurrency: "worker.ArchiverConcurrency",
WorkerArchivalsPerIteration: "worker.ArchivalsPerIteration",
WorkerDeterministicConstructionCheckProbability: "worker.DeterministicConstructionCheckProbability",
WorkerTimeLimitPerArchivalIteration: "worker.TimeLimitPerArchivalIteration",
WorkerThrottledLogRPS: "worker.throttledLogRPS",
ScannerPersistenceMaxQPS: "worker.scannerPersistenceMaxQPS",
}
Expand Down Expand Up @@ -482,6 +483,8 @@ const (
WorkerArchivalsPerIteration
// WorkerDeterministicConstructionCheckProbability controls the probability of running a deterministic construction check for any given archival
WorkerDeterministicConstructionCheckProbability
// WorkerTimeLimitPerArchivalIteration controls the time limit of each iteration of archival workflow
WorkerTimeLimitPerArchivalIteration
// WorkerThrottledLogRPS is the rate limit on number of log messages emitted per second for throttled logger
WorkerThrottledLogRPS
// ScannerPersistenceMaxQPS is the maximum rate of persistence calls from worker.Scanner
Expand Down
1 change: 1 addition & 0 deletions service/worker/archiver/client_worker.go
Expand Up @@ -73,6 +73,7 @@ type (
ArchiverConcurrency dynamicconfig.IntPropertyFn
ArchivalsPerIteration dynamicconfig.IntPropertyFn
DeterministicConstructionCheckProbability dynamicconfig.FloatPropertyFn
TimeLimitPerArchivalIteration dynamicconfig.DurationPropertyFn
}

contextKey int
Expand Down
6 changes: 6 additions & 0 deletions service/worker/archiver/util.go
Expand Up @@ -28,6 +28,7 @@ import (
"fmt"
"strconv"
"strings"
"time"

"github.com/dgryski/go-farm"
"github.com/uber/cadence/.gen/go/shared"
Expand Down Expand Up @@ -111,6 +112,11 @@ func IsLast(tags map[string]string) bool {
return ok && last == "true"
}

// MaxArchivalIterationTimeout returns the max allowed timeout for a single iteration of archival workflow
func MaxArchivalIterationTimeout() time.Duration {
return workflowStartToCloseTimeout / 2
}

func modifyBlobForConstCheck(historyBlob *HistoryBlob, existingTags map[string]string) {
historyBlob.Header.UploadCluster = common.StringPtr(existingTags["upload_cluster"])
historyBlob.Header.UploadDateTime = common.StringPtr(existingTags["upload_date_time"])
Expand Down
11 changes: 10 additions & 1 deletion service/worker/archiver/workflow.go
Expand Up @@ -21,6 +21,8 @@
package archiver

import (
"time"

"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/loggerimpl"
"github.com/uber/cadence/common/log/tag"
Expand All @@ -31,6 +33,7 @@ import (
type dynamicConfigResult struct {
ArchiverConcurrency int
ArchivalsPerIteration int
TimelimitPerIteration time.Duration
}

func archivalWorkflow(ctx workflow.Context, carryover []ArchiveRequest) error {
Expand Down Expand Up @@ -62,9 +65,15 @@ func archivalWorkflowHelper(
_ = workflow.SideEffect(
ctx,
func(ctx workflow.Context) interface{} {
timeLimit := config.TimeLimitPerArchivalIteration()
maxTimeLimit := MaxArchivalIterationTimeout()
if timeLimit > maxTimeLimit {
timeLimit = maxTimeLimit
}
return dynamicConfigResult{
ArchiverConcurrency: config.ArchiverConcurrency(),
ArchivalsPerIteration: config.ArchivalsPerIteration(),
TimelimitPerIteration: timeLimit,
}
}).Get(&dcResult)
requestCh := workflow.NewBufferedChannel(ctx, dcResult.ArchivalsPerIteration)
Expand All @@ -75,7 +84,7 @@ func archivalWorkflowHelper(
archiver.Start()
signalCh := workflow.GetSignalChannel(ctx, signalName)
if pump == nil {
pump = NewPump(ctx, logger, metricsClient, carryover, workflowStartToCloseTimeout/2, dcResult.ArchivalsPerIteration, requestCh, signalCh)
pump = NewPump(ctx, logger, metricsClient, carryover, dcResult.TimelimitPerIteration, dcResult.ArchivalsPerIteration, requestCh, signalCh)
}
pumpResult := pump.Run()
metricsClient.AddCounter(metrics.ArchiverArchivalWorkflowScope, metrics.ArchiverNumPumpedRequestsCount, int64(len(pumpResult.PumpedHashes)))
Expand Down
5 changes: 3 additions & 2 deletions service/worker/archiver/workflow_test.go
Expand Up @@ -63,8 +63,9 @@ func (s *workflowSuite) SetupTest() {
workflowTestArchiver = &MockArchiver{}
workflowTestPump = &PumpMock{}
workflowTestConfig = &Config{
ArchiverConcurrency: dynamicconfig.GetIntPropertyFn(0),
ArchivalsPerIteration: dynamicconfig.GetIntPropertyFn(0),
ArchiverConcurrency: dynamicconfig.GetIntPropertyFn(0),
ArchivalsPerIteration: dynamicconfig.GetIntPropertyFn(0),
TimeLimitPerArchivalIteration: dynamicconfig.GetDurationPropertyFn(MaxArchivalIterationTimeout()),
}
}

Expand Down
1 change: 1 addition & 0 deletions service/worker/service.go
Expand Up @@ -102,6 +102,7 @@ func NewConfig(params *service.BootstrapParams) *Config {
ArchiverConcurrency: dc.GetIntProperty(dynamicconfig.WorkerArchiverConcurrency, 50),
ArchivalsPerIteration: dc.GetIntProperty(dynamicconfig.WorkerArchivalsPerIteration, 1000),
DeterministicConstructionCheckProbability: dc.GetFloat64Property(dynamicconfig.WorkerDeterministicConstructionCheckProbability, 0.002),
TimeLimitPerArchivalIteration: dc.GetDurationProperty(dynamicconfig.WorkerTimeLimitPerArchivalIteration, archiver.MaxArchivalIterationTimeout()),
},
IndexerCfg: &indexer.Config{
IndexerConcurrency: dc.GetIntProperty(dynamicconfig.WorkerIndexerConcurrency, 1000),
Expand Down

0 comments on commit 2aa943b

Please sign in to comment.