add poller autoscaling in activity and decision workers (#1186)
* add pollerAutoScalerOptions

* add tests for options

* change autoscaler interface to include Stop method

* add poller auto scaler to task worker

* gracefully stop autoscaler

* move up autoscaler in pollTask

* add unit tests
shijiesheng committed Sep 2, 2022
1 parent d94db89 commit 3a730da
Showing 8 changed files with 325 additions and 51 deletions.
6 changes: 3 additions & 3 deletions internal/common/autoscaler/autoscaler.go
@@ -31,8 +31,8 @@ type (
// GetCurrent ResourceUnit of resource
GetCurrent() ResourceUnit
// Start starts the autoscaler go routine that scales the ResourceUnit according to Estimator
Start() DoneFunc
Start()
// Stop stops the autoscaler if started or do nothing if not yet started
Stop()
}
// DoneFunc func to turn off auto scaler
DoneFunc func()
)
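
The interface change above replaces the DoneFunc returned by Start() with an explicit Stop() method, so the component that owns the autoscaler can shut it down symmetrically. A minimal caller-side sketch of the new lifecycle, using the constructor introduced later in this commit (the option values and wrapper function are illustrative, not defaults):

// Sketch only: option values and the wrapper are illustrative.
func startPollerScaling(logger *zap.Logger) (stop func()) {
	scaler := newPollerScaler(pollerAutoScalerOptions{
		Enabled:           true,
		InitCount:         8,
		MinCount:          1,
		MaxCount:          16,
		Cooldown:          time.Minute,
		TargetUtilization: 0.6,
	}, logger)
	if scaler == nil { // newPollerScaler returns nil when the feature is disabled
		return func() {}
	}
	scaler.Start()     // spawns the scaling goroutine
	return scaler.Stop // Stop cancels that goroutine and waits for it to exit
}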
54 changes: 37 additions & 17 deletions internal/internal_poller_autoscaler.go
@@ -27,12 +27,15 @@ import (
"go.uber.org/atomic"
"go.uber.org/cadence/internal/common/autoscaler"
"go.uber.org/zap"
"sync"
"time"
)

// defaultPollerScalerCooldownInSeconds
const (
defaultPollerScalerCooldownInSeconds = 120
defaultPollerAutoScalerCooldown = time.Minute
defaultPollerAutoScalerTargetUtilization = 0.6
defaultMinConcurrentPollerSize = 1
)

var (
@@ -50,6 +53,7 @@ type (
sem semaphore.Semaphore // resizable semaphore to control number of concurrent pollers
ctx context.Context
cancel context.CancelFunc
wg *sync.WaitGroup // graceful stop
recommender autoscaler.Recommender
onAutoScale []func() // hook functions that run post autoscale
}
@@ -60,31 +64,41 @@ type (
// This avoids unnecessary usage of CompareAndSwap
atomicBits *atomic.Uint64
}

pollerAutoScalerOptions struct {
Enabled bool
InitCount int
MinCount int
MaxCount int
Cooldown time.Duration
DryRun bool
TargetUtilization float64
}
)

func newPollerScaler(
initialPollerCount int,
minPollerCount int,
maxPollerCount int,
targetMilliUsage uint64,
isDryRun bool,
cooldownTime time.Duration,
options pollerAutoScalerOptions,
logger *zap.Logger,
hooks ...func()) *pollerAutoScaler {
ctx, cancel := context.WithCancel(context.Background())
if !options.Enabled {
return nil
}

return &pollerAutoScaler{
isDryRun: isDryRun,
cooldownTime: cooldownTime,
isDryRun: options.DryRun,
cooldownTime: options.Cooldown,
logger: logger,
sem: semaphore.New(initialPollerCount),
sem: semaphore.New(options.InitCount),
wg: &sync.WaitGroup{},
ctx: ctx,
cancel: cancel,
pollerUsageEstimator: pollerUsageEstimator{atomicBits: atomic.NewUint64(0)},
recommender: autoscaler.NewLinearRecommender(
autoscaler.ResourceUnit(minPollerCount),
autoscaler.ResourceUnit(maxPollerCount),
autoscaler.ResourceUnit(options.MinCount),
autoscaler.ResourceUnit(options.MaxCount),
autoscaler.Usages{
autoscaler.PollerUtilizationRate: autoscaler.MilliUsage(targetMilliUsage),
autoscaler.PollerUtilizationRate: autoscaler.MilliUsage(options.TargetUtilization * 1000),
},
),
onAutoScale: hooks,
@@ -107,9 +121,11 @@ func (p *pollerAutoScaler) GetCurrent() autoscaler.ResourceUnit {
}

// Start an auto-scaler go routine and returns a done to stop it
func (p *pollerAutoScaler) Start() autoscaler.DoneFunc {
func (p *pollerAutoScaler) Start() {
logger := p.logger.Sugar()
p.wg.Add(1)
go func() {
defer p.wg.Done()
for {
select {
case <-p.ctx.Done():
@@ -139,9 +155,13 @@ func (p *pollerAutoScaler) Start() autoscaler.DoneFunc {
}
}
}()
return func() {
p.cancel()
}
return
}

// Stop stops the poller autoscaler
func (p *pollerAutoScaler) Stop() {
p.cancel()
p.wg.Wait()
}

// Reset metrics from the start
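
newPollerScaler now takes a single options struct and converts the fractional TargetUtilization into the recommender's milli-usage scale (0.6 becomes 600). Every Cooldown interval the scaler asks the recommender for a new poller count, resizes the semaphore, and resets the collected usage. A rough sketch of the proportional idea, assuming the recommendation is simply clamped to [MinCount, MaxCount]; the real autoscaler.LinearRecommender in internal/common/autoscaler is the authoritative implementation:

// Illustrative approximation of proportional poller scaling.
func recommendPollers(current, min, max int, observedUtil, targetUtil float64) int {
	if targetUtil <= 0 {
		return current
	}
	recommended := int(float64(current) * observedUtil / targetUtil)
	if recommended < min {
		recommended = min
	}
	if recommended > max {
		recommended = max
	}
	return recommended
}

For example, 10 pollers observed at 90% utilization against a 60% target would be scaled toward 15, subject to MaxCount.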
29 changes: 21 additions & 8 deletions internal/internal_poller_autoscaler_test.go
@@ -33,6 +33,7 @@ import (

func Test_pollerAutoscaler(t *testing.T) {
type args struct {
disabled bool
noTaskPoll, taskPoll, unrelated int
initialPollerCount int
minPollerCount int
@@ -146,24 +147,36 @@ func Test_pollerAutoscaler(t *testing.T) {
},
want: 6,
},
{
name: "disabled",
args: args{disabled: true},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
autoscalerEpoch := atomic.NewUint64(0)
pollerScaler := newPollerScaler(
tt.args.initialPollerCount,
tt.args.minPollerCount,
tt.args.maxPollerCount,
tt.args.targetMilliUsage,
tt.args.isDryRun,
tt.args.cooldownTime,
pollerAutoScalerOptions{
Enabled: !tt.args.disabled,
InitCount: tt.args.initialPollerCount,
MinCount: tt.args.minPollerCount,
MaxCount: tt.args.maxPollerCount,
Cooldown: tt.args.cooldownTime,
DryRun: tt.args.isDryRun,
TargetUtilization: float64(tt.args.targetMilliUsage) / 1000,
},
zaptest.NewLogger(t),
// hook function that collects number of iterations
func() {
autoscalerEpoch.Add(1)
})
pollerScalerDone := pollerScaler.Start()
if tt.args.disabled {
assert.Nil(t, pollerScaler)
return
}

pollerScaler.Start()

// simulate concurrent polling
pollChan := generateRandomPollResults(tt.args.noTaskPoll, tt.args.taskPoll, tt.args.unrelated)
Expand All @@ -183,7 +196,7 @@ func Test_pollerAutoscaler(t *testing.T) {
assert.Eventually(t, func() bool {
return autoscalerEpoch.Load() == uint64(tt.args.autoScalerEpoch)
}, tt.args.cooldownTime+20*time.Millisecond, 10*time.Millisecond)
pollerScalerDone()
pollerScaler.Stop()
res := pollerScaler.GetCurrent()
assert.Equal(t, tt.want, int(res))
})
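
Because the scaling loop is time-driven, the test synchronizes on it through the variadic hook argument rather than sleeping: each scaling round bumps an atomic epoch counter, and assert.Eventually waits for the expected number of rounds before reading the result. A condensed sketch of that pattern (the counts, cooldown, and final assertion are arbitrary example values):

// Sketch of the epoch-hook synchronization used by the test.
epoch := atomic.NewUint64(0)
scaler := newPollerScaler(
	pollerAutoScalerOptions{
		Enabled:           true,
		InitCount:         2,
		MinCount:          1,
		MaxCount:          6,
		Cooldown:          50 * time.Millisecond,
		TargetUtilization: 0.6,
	},
	zaptest.NewLogger(t),
	func() { epoch.Add(1) }, // hook runs after every scaling round
)
scaler.Start()
assert.Eventually(t, func() bool { return epoch.Load() >= 3 },
	time.Second, 10*time.Millisecond)
scaler.Stop()
assert.GreaterOrEqual(t, int(scaler.GetCurrent()), 1)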
1 change: 1 addition & 0 deletions internal/internal_utils.go
@@ -71,6 +71,7 @@ const (
type (
FeatureFlags struct {
WorkflowExecutionAlreadyCompletedErrorEnabled bool
PollerAutoScalerEnabled bool
}
)

30 changes: 30 additions & 0 deletions internal/internal_worker.go
@@ -255,6 +255,15 @@ func newWorkflowTaskWorkerInternal(
params,
)
worker := newBaseWorker(baseWorkerOptions{
pollerAutoScaler: pollerAutoScalerOptions{
Enabled: params.FeatureFlags.PollerAutoScalerEnabled,
InitCount: params.MaxConcurrentDecisionTaskPollers,
MinCount: params.MinConcurrentDecisionTaskPollers,
MaxCount: params.MaxConcurrentDecisionTaskPollers,
Cooldown: params.PollerAutoScalerCooldown,
DryRun: params.PollerAutoScalerDryRun,
TargetUtilization: params.PollerAutoScalerTargetUtilization,
},
pollerCount: params.MaxConcurrentDecisionTaskPollers,
pollerRate: defaultPollerRate,
maxConcurrentTask: params.MaxConcurrentDecisionTaskExecutionSize,
@@ -443,6 +452,15 @@ func newActivityTaskWorker(
ensureRequiredParams(&workerParams)
base := newBaseWorker(
baseWorkerOptions{
pollerAutoScaler: pollerAutoScalerOptions{
Enabled: workerParams.FeatureFlags.PollerAutoScalerEnabled,
InitCount: workerParams.MaxConcurrentActivityTaskPollers,
MinCount: workerParams.MinConcurrentActivityTaskPollers,
MaxCount: workerParams.MaxConcurrentActivityTaskPollers,
Cooldown: workerParams.PollerAutoScalerCooldown,
DryRun: workerParams.PollerAutoScalerDryRun,
TargetUtilization: workerParams.PollerAutoScalerTargetUtilization,
},
pollerCount: workerParams.MaxConcurrentActivityTaskPollers,
pollerRate: defaultPollerRate,
maxConcurrentTask: workerParams.MaxConcurrentActivityExecutionSize,
@@ -1198,6 +1216,18 @@ func augmentWorkerOptions(options WorkerOptions) WorkerOptions {
if options.MaxConcurrentSessionExecutionSize == 0 {
options.MaxConcurrentSessionExecutionSize = defaultMaxConcurrentSessionExecutionSize
}
if options.MinConcurrentActivityTaskPollers == 0 {
options.MinConcurrentActivityTaskPollers = defaultMinConcurrentPollerSize
}
if options.MinConcurrentDecisionTaskPollers == 0 {
options.MinConcurrentDecisionTaskPollers = defaultMinConcurrentPollerSize
}
if options.PollerAutoScalerCooldown == 0 {
options.PollerAutoScalerCooldown = defaultPollerAutoScalerCooldown
}
if options.PollerAutoScalerTargetUtilization == 0 {
options.PollerAutoScalerTargetUtilization = defaultPollerAutoScalerTargetUtilization
}

// if the user passes in a tracer then add a tracing context propagator
if options.Tracer != nil {
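
augmentWorkerOptions backfills conservative defaults so the feature needs no extra tuning: minimum poller counts default to 1, the cooldown to one minute, and the target utilization to 0.6, while the existing MaxConcurrent*TaskPollers settings double as both the initial and the maximum poller counts. A worked example with hypothetical user options:

// Hypothetical input: the user only sets the poller ceilings.
options := WorkerOptions{
	MaxConcurrentActivityTaskPollers: 8,
	MaxConcurrentDecisionTaskPollers: 4,
}
options = augmentWorkerOptions(options)
// After defaulting (per this change):
//   MinConcurrentActivityTaskPollers  == 1           (defaultMinConcurrentPollerSize)
//   MinConcurrentDecisionTaskPollers  == 1
//   PollerAutoScalerCooldown          == time.Minute (defaultPollerAutoScalerCooldown)
//   PollerAutoScalerTargetUtilization == 0.6          (defaultPollerAutoScalerTargetUtilization)
// With PollerAutoScalerEnabled set, the activity worker then scales its
// pollers within [1, 8] and the decision worker within [1, 4].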
55 changes: 47 additions & 8 deletions internal/internal_worker_base.go
@@ -107,6 +107,7 @@ type (

// baseWorkerOptions options to configure base worker.
baseWorkerOptions struct {
pollerAutoScaler pollerAutoScalerOptions
pollerCount int
pollerRate int
maxConcurrentTask int
@@ -133,6 +134,7 @@ type (
metricsScope tally.Scope

pollerRequestCh chan struct{}
pollerAutoScaler *pollerAutoScaler
taskQueueCh chan interface{}
sessionTokenBucket *sessionTokenBucket
}
@@ -156,15 +158,25 @@ func createPollRetryPolicy() backoff.RetryPolicy {

func newBaseWorker(options baseWorkerOptions, logger *zap.Logger, metricsScope tally.Scope, sessionTokenBucket *sessionTokenBucket) *baseWorker {
ctx, cancel := context.WithCancel(context.Background())

var pollerAS *pollerAutoScaler
if pollerOptions := options.pollerAutoScaler; pollerOptions.Enabled {
pollerAS = newPollerScaler(
pollerOptions,
logger,
)
}

bw := &baseWorker{
options: options,
shutdownCh: make(chan struct{}),
taskLimiter: rate.NewLimiter(rate.Limit(options.maxTaskPerSecond), 1),
retrier: backoff.NewConcurrentRetrier(pollOperationRetryPolicy),
logger: logger.With(zapcore.Field{Key: tagWorkerType, Type: zapcore.StringType, String: options.workerType}),
metricsScope: tagScope(metricsScope, tagWorkerType, options.workerType),
pollerRequestCh: make(chan struct{}, options.maxConcurrentTask),
taskQueueCh: make(chan interface{}), // no buffer, so poller only able to poll new task after previous is dispatched.
options: options,
shutdownCh: make(chan struct{}),
taskLimiter: rate.NewLimiter(rate.Limit(options.maxTaskPerSecond), 1),
retrier: backoff.NewConcurrentRetrier(pollOperationRetryPolicy),
logger: logger.With(zapcore.Field{Key: tagWorkerType, Type: zapcore.StringType, String: options.workerType}),
metricsScope: tagScope(metricsScope, tagWorkerType, options.workerType),
pollerRequestCh: make(chan struct{}, options.maxConcurrentTask),
pollerAutoScaler: pollerAS,
taskQueueCh: make(chan interface{}), // no buffer, so poller only able to poll new task after previous is dispatched.

limiterContext: ctx,
limiterContextCancel: cancel,
@@ -185,6 +197,10 @@ func (bw *baseWorker) Start() {

bw.metricsScope.Counter(metrics.WorkerStartCounter).Inc(1)

if bw.pollerAutoScaler != nil {
bw.pollerAutoScaler.Start()
}

for i := 0; i < bw.options.pollerCount; i++ {
bw.shutdownWG.Add(1)
go bw.runPoller()
@@ -255,9 +271,24 @@ func (bw *baseWorker) runTaskDispatcher() {
}
}

/*
Three constraints gate task polling:
1. the poller autoscaler bounds the number of concurrent pollers
2. the retrier applies backoff after poll errors
3. the limiter enforces a per-second poll rate
*/
func (bw *baseWorker) pollTask() {
var err error
var task interface{}

if bw.pollerAutoScaler != nil {
if pErr := bw.pollerAutoScaler.Acquire(1); pErr == nil {
defer bw.pollerAutoScaler.Release(1)
} else {
bw.logger.Warn("poller auto scaler acquire error", zap.Error(pErr))
}
}

bw.retrier.Throttle()
if bw.pollLimiter == nil || bw.pollLimiter.Wait(bw.limiterContext) == nil {
task, err = bw.options.taskWorker.PollTask()
@@ -273,6 +304,11 @@ }
}
bw.retrier.Failed()
} else {
if bw.pollerAutoScaler != nil {
if pErr := bw.pollerAutoScaler.CollectUsage(task); pErr != nil {
bw.logger.Warn("poller auto scaler collect usage error", zap.Error(pErr))
}
}
bw.retrier.Succeeded()
}
}
@@ -347,6 +383,9 @@ func (bw *baseWorker) Stop() {
}
close(bw.shutdownCh)
bw.limiterContextCancel()
if bw.pollerAutoScaler != nil {
bw.pollerAutoScaler.Stop()
}

if success := util.AwaitWaitGroup(&bw.shutdownWG, bw.options.shutdownTimeout); !success {
traceLog(func() {
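
Putting the base-worker changes together: each pollTask call now passes three gates in order, the autoscaler's resizable semaphore, the error-backoff retrier, and the per-second rate limiter, and feeds a successful poll result back into CollectUsage so the next scaling round has fresh utilization data. A condensed sketch of that flow (dispatching and error handling omitted; the function name is hypothetical):

// Condensed sketch of the poll path after this change.
func (bw *baseWorker) pollTaskFlow() {
	// 1. Concurrency gate: acquire a slot from the autoscaler's semaphore.
	if bw.pollerAutoScaler != nil {
		if err := bw.pollerAutoScaler.Acquire(1); err == nil {
			defer bw.pollerAutoScaler.Release(1)
		}
	}
	// 2. Error-backoff gate.
	bw.retrier.Throttle()
	// 3. Rate gate, then the actual poll.
	if bw.pollLimiter == nil || bw.pollLimiter.Wait(bw.limiterContext) == nil {
		if task, err := bw.options.taskWorker.PollTask(); err == nil {
			if bw.pollerAutoScaler != nil {
				_ = bw.pollerAutoScaler.CollectUsage(task) // report poll outcome for scaling
			}
		}
	}
}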