From fc8a39d439ea08b006fb7ea1db1058175cc7adf5 Mon Sep 17 00:00:00 2001 From: Harry Zhang Date: Tue, 4 Feb 2020 11:34:15 -0800 Subject: [PATCH] implement backoff manager --- .../k8s.io/apimachinery/pkg/util/wait/BUILD | 10 +- .../k8s.io/apimachinery/pkg/util/wait/wait.go | 123 ++++++++++++++---- .../apimachinery/pkg/util/wait/wait_test.go | 37 ++++++ 3 files changed, 144 insertions(+), 26 deletions(-) diff --git a/staging/src/k8s.io/apimachinery/pkg/util/wait/BUILD b/staging/src/k8s.io/apimachinery/pkg/util/wait/BUILD index 7f24e9243193..de7957e34761 100644 --- a/staging/src/k8s.io/apimachinery/pkg/util/wait/BUILD +++ b/staging/src/k8s.io/apimachinery/pkg/util/wait/BUILD @@ -10,7 +10,10 @@ go_test( name = "go_default_test", srcs = ["wait_test.go"], embed = [":go_default_library"], - deps = ["//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library"], + deps = [ + "//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library", + ], ) go_library( @@ -21,7 +24,10 @@ go_library( ], importmap = "k8s.io/kubernetes/vendor/k8s.io/apimachinery/pkg/util/wait", importpath = "k8s.io/apimachinery/pkg/util/wait", - deps = ["//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library"], + deps = [ + "//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library", + ], ) filegroup( diff --git a/staging/src/k8s.io/apimachinery/pkg/util/wait/wait.go b/staging/src/k8s.io/apimachinery/pkg/util/wait/wait.go index 76c8c3d23cb4..4cb0c122c0cf 100644 --- a/staging/src/k8s.io/apimachinery/pkg/util/wait/wait.go +++ b/staging/src/k8s.io/apimachinery/pkg/util/wait/wait.go @@ -19,10 +19,12 @@ package wait import ( "context" "errors" + "math" "math/rand" "sync" "time" + "k8s.io/apimachinery/pkg/util/clock" "k8s.io/apimachinery/pkg/util/runtime" ) @@ -128,9 +130,15 @@ func NonSlidingUntilWithContext(ctx 
context.Context, f func(context.Context), pe // Close stopCh to stop. f may not be invoked if stop channel is already // closed. Pass NeverStop to if you don't want it stop. func JitterUntil(f func(), period time.Duration, jitterFactor float64, sliding bool, stopCh <-chan struct{}) { - var t *time.Timer - var sawTimeout bool + BackoffUntil(f, NewJitteredBackoffManager(period, jitterFactor, &clock.RealClock{}), sliding, stopCh) +} +// BackoffUntil loops until stop channel is closed, run f every duration given by BackoffManager. +// +// If sliding is true, the period is computed after f runs. If it is false then +// period includes the runtime for f. +func BackoffUntil(f func(), backoff BackoffManager, sliding bool, stopCh <-chan struct{}) { + var t clock.Timer for { select { case <-stopCh: @@ -138,13 +146,8 @@ func JitterUntil(f func(), period time.Duration, jitterFactor float64, sliding b default: } - jitteredPeriod := period - if jitterFactor > 0.0 { - jitteredPeriod = Jitter(period, jitterFactor) - } - if !sliding { - t = resetOrReuseTimer(t, jitteredPeriod, sawTimeout) + t = backoff.Backoff() } func() { @@ -153,7 +156,7 @@ func JitterUntil(f func(), period time.Duration, jitterFactor float64, sliding b }() if sliding { - t = resetOrReuseTimer(t, jitteredPeriod, sawTimeout) + t = backoff.Backoff() } // NOTE: b/c there is no priority selection in golang @@ -164,8 +167,7 @@ func JitterUntil(f func(), period time.Duration, jitterFactor float64, sliding b select { case <-stopCh: return - case <-t.C: - sawTimeout = true + case <-t.C(): } } } @@ -283,6 +285,92 @@ func contextForChannel(parentCh <-chan struct{}) (context.Context, context.Cance return ctx, cancel } +// BackoffManager manages backoff with a particular scheme based on its underlying implementation. It provides +// an interface to return a timer for backoff, and caller shall backoff until Timer.C returns. 
If the second Backoff() +// is called before the timer from the first Backoff() call finishes, the first timer will NOT be drained. +// The BackoffManager is supposed to be called in a single-threaded environment. +type BackoffManager interface { + Backoff() clock.Timer +} + +type exponentialBackoffManagerImpl struct { + backoff *Backoff + backoffTimer clock.Timer + lastBackoffStart time.Time + initialBackoff time.Duration + backoffResetDuration time.Duration + clock clock.Clock +} + +// NewExponentialBackoffManager returns a manager for managing exponential backoff. Each backoff is jittered and +// backoff will not exceed the given max. If the backoff is not called within resetDuration, the backoff is reset. +// This backoff manager is used to reduce load during upstream unhealthiness. +func NewExponentialBackoffManager(initBackoff, maxBackoff, resetDuration time.Duration, backoffFactor, jitter float64, c clock.Clock) BackoffManager { + return &exponentialBackoffManagerImpl{ + backoff: &Backoff{ + Duration: initBackoff, + Factor: backoffFactor, + Jitter: jitter, + + // the current impl of wait.Backoff returns Backoff.Duration once steps are used up, which is not + // what we ideally need here, we set it to max int and assume we will never use up the steps + Steps: math.MaxInt32, + Cap: maxBackoff, + }, + backoffTimer: c.NewTimer(0), + initialBackoff: initBackoff, + lastBackoffStart: c.Now(), + backoffResetDuration: resetDuration, + clock: c, + } +} + +func (b *exponentialBackoffManagerImpl) getNextBackoff() time.Duration { + if b.clock.Now().Sub(b.lastBackoffStart) > b.backoffResetDuration { + b.backoff.Steps = math.MaxInt32 + b.backoff.Duration = b.initialBackoff + } + b.lastBackoffStart = b.clock.Now() + return b.backoff.Step() +} + +// Backoff implements BackoffManager.Backoff, it returns a timer so caller can block on the timer for backoff. 
+func (b *exponentialBackoffManagerImpl) Backoff() clock.Timer { + b.backoffTimer.Reset(b.getNextBackoff()) + return b.backoffTimer +} + +type jitteredBackoffManagerImpl struct { + clock clock.Clock + duration time.Duration + jitter float64 + backoffTimer clock.Timer +} + +// NewJitteredBackoffManager returns a BackoffManager that backoffs with given duration plus given jitter. If the jitter +// is zero or negative, backoff will not be jittered. +func NewJitteredBackoffManager(duration time.Duration, jitter float64, c clock.Clock) BackoffManager { + return &jitteredBackoffManagerImpl{ + clock: c, + duration: duration, + jitter: jitter, + backoffTimer: c.NewTimer(0), + } +} + +func (j *jitteredBackoffManagerImpl) getNextBackoff() time.Duration { + jitteredPeriod := j.duration + if j.jitter > 0.0 { + jitteredPeriod = Jitter(j.duration, j.jitter) + } + return jitteredPeriod +} + +func (j *jitteredBackoffManagerImpl) Backoff() clock.Timer { + j.backoffTimer.Reset(j.getNextBackoff()) + return j.backoffTimer +} + // ExponentialBackoff repeats a condition check with exponential backoff. // // It repeatedly checks the condition and then sleeps, using `backoff.Step()` @@ -503,16 +591,3 @@ func poller(interval, timeout time.Duration) WaitFunc { return ch }) } - -// resetOrReuseTimer avoids allocating a new timer if one is already in use. -// Not safe for multiple threads. 
-func resetOrReuseTimer(t *time.Timer, d time.Duration, sawTimeout bool) *time.Timer { - if t == nil { - return time.NewTimer(d) - } - if !t.Stop() && !sawTimeout { - <-t.C - } - t.Reset(d) - return t -} diff --git a/staging/src/k8s.io/apimachinery/pkg/util/wait/wait_test.go b/staging/src/k8s.io/apimachinery/pkg/util/wait/wait_test.go index 8cad6eaf9392..2e07df8cbdbd 100644 --- a/staging/src/k8s.io/apimachinery/pkg/util/wait/wait_test.go +++ b/staging/src/k8s.io/apimachinery/pkg/util/wait/wait_test.go @@ -26,6 +26,7 @@ import ( "testing" "time" + "k8s.io/apimachinery/pkg/util/clock" "k8s.io/apimachinery/pkg/util/runtime" ) @@ -694,3 +695,39 @@ func TestContextForChannel(t *testing.T) { t.Errorf("unexepcted timeout waiting for parent to cancel child contexts") } } + +func TestExponentialBackoffManagerGetNextBackoff(t *testing.T) { + fc := clock.NewFakeClock(time.Now()) + backoff := NewExponentialBackoffManager(1, 10, 10, 2.0, 0.0, fc) + durations := []time.Duration{1, 2, 4, 8, 10, 10, 10} + for i := 0; i < len(durations); i++ { + generatedBackoff := backoff.(*exponentialBackoffManagerImpl).getNextBackoff() + if generatedBackoff != durations[i] { + t.Errorf("unexpected %d-th backoff: %d, expecting %d", i, generatedBackoff, durations[i]) + } + } + + fc.Step(11) + resetDuration := backoff.(*exponentialBackoffManagerImpl).getNextBackoff() + if resetDuration != 1 { + t.Errorf("after reset, backoff should be 1, but got %d", resetDuration) + } +} + +func TestJitteredBackoffManagerGetNextBackoff(t *testing.T) { + // positive jitter + backoffMgr := NewJitteredBackoffManager(1, 1, clock.NewFakeClock(time.Now())) + for i := 0; i < 5; i++ { + backoff := backoffMgr.(*jitteredBackoffManagerImpl).getNextBackoff() + if backoff < 1 || backoff > 2 { + t.Errorf("backoff out of range: %d", backoff) + } + } + + // negative jitter, shall be a fixed backoff + backoffMgr = NewJitteredBackoffManager(1, -1, clock.NewFakeClock(time.Now())) + backoff := 
backoffMgr.(*jitteredBackoffManagerImpl).getNextBackoff() + if backoff != 1 { + t.Errorf("backoff should be 1, but got %d", backoff) + } +}