Skip to content

Commit

Permalink
implement backoff manager
Browse files Browse the repository at this point in the history
  • Loading branch information
zhan849 committed Feb 4, 2020
1 parent 4b29407 commit fc8a39d
Show file tree
Hide file tree
Showing 3 changed files with 144 additions and 26 deletions.
10 changes: 8 additions & 2 deletions staging/src/k8s.io/apimachinery/pkg/util/wait/BUILD
Expand Up @@ -10,7 +10,10 @@ go_test(
name = "go_default_test",
srcs = ["wait_test.go"],
embed = [":go_default_library"],
deps = ["//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library"],
deps = [
"//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
],
)

go_library(
Expand All @@ -21,7 +24,10 @@ go_library(
],
importmap = "k8s.io/kubernetes/vendor/k8s.io/apimachinery/pkg/util/wait",
importpath = "k8s.io/apimachinery/pkg/util/wait",
deps = ["//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library"],
deps = [
"//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
],
)

filegroup(
Expand Down
123 changes: 99 additions & 24 deletions staging/src/k8s.io/apimachinery/pkg/util/wait/wait.go
Expand Up @@ -19,10 +19,12 @@ package wait
import (
"context"
"errors"
"math"
"math/rand"
"sync"
"time"

"k8s.io/apimachinery/pkg/util/clock"
"k8s.io/apimachinery/pkg/util/runtime"
)

Expand Down Expand Up @@ -128,23 +130,24 @@ func NonSlidingUntilWithContext(ctx context.Context, f func(context.Context), pe
// Close stopCh to stop. f may not be invoked if stop channel is already
// closed. Pass NeverStop to if you don't want it stop.
func JitterUntil(f func(), period time.Duration, jitterFactor float64, sliding bool, stopCh <-chan struct{}) {
var t *time.Timer
var sawTimeout bool
BackoffUntil(f, NewJitteredBackoffManager(period, jitterFactor, &clock.RealClock{}), sliding, stopCh)
}

// BackoffUntil loops until stop channel is closed, run f every duration given by BackoffManager.
//
// If sliding is true, the period is computed after f runs. If it is false then
// period includes the runtime for f.
func BackoffUntil(f func(), backoff BackoffManager, sliding bool, stopCh <-chan struct{}) {
var t clock.Timer
for {
select {
case <-stopCh:
return
default:
}

jitteredPeriod := period
if jitterFactor > 0.0 {
jitteredPeriod = Jitter(period, jitterFactor)
}

if !sliding {
t = resetOrReuseTimer(t, jitteredPeriod, sawTimeout)
t = backoff.Backoff()
}

func() {
Expand All @@ -153,7 +156,7 @@ func JitterUntil(f func(), period time.Duration, jitterFactor float64, sliding b
}()

if sliding {
t = resetOrReuseTimer(t, jitteredPeriod, sawTimeout)
t = backoff.Backoff()
}

// NOTE: b/c there is no priority selection in golang
Expand All @@ -164,8 +167,7 @@ func JitterUntil(f func(), period time.Duration, jitterFactor float64, sliding b
select {
case <-stopCh:
return
case <-t.C:
sawTimeout = true
case <-t.C():
}
}
}
Expand Down Expand Up @@ -283,6 +285,92 @@ func contextForChannel(parentCh <-chan struct{}) (context.Context, context.Cance
return ctx, cancel
}

// BackoffManager manages backoff with a particular scheme based on its underlying implementation. It provides
// an interface to return a timer for backoff, and caller shall backoff until Timer.C returns. If the second Backoff()
// is called before the timer from the first Backoff() call finishes, the first timer will NOT be drained.
// The BackoffManager is supposed to be called in a single-threaded environment.
type BackoffManager interface {
Backoff() clock.Timer
}

type exponentialBackoffManagerImpl struct {
backoff *Backoff
backoffTimer clock.Timer
lastBackoffStart time.Time
initialBackoff time.Duration
backoffResetDuration time.Duration
clock clock.Clock
}

// NewExponentialBackoffManager returns a manager for managing exponential backoff. Each backoff is jittered and
// backoff will not exceed the given max. If the backoff is not called within resetDuration, the backoff is reset.
// This backoff manager is used to reduce load during upstream unhealthiness.
func NewExponentialBackoffManager(initBackoff, maxBackoff, resetDuration time.Duration, backoffFactor, jitter float64, c clock.Clock) BackoffManager {
return &exponentialBackoffManagerImpl{
backoff: &Backoff{
Duration: initBackoff,
Factor: backoffFactor,
Jitter: jitter,

// the current impl of wait.Backoff returns Backoff.Duration once steps are used up, which is not
// what we ideally need here, we set it to max int and assume we will never use up the steps
Steps: math.MaxInt32,
Cap: maxBackoff,
},
backoffTimer: c.NewTimer(0),
initialBackoff: initBackoff,
lastBackoffStart: c.Now(),
backoffResetDuration: resetDuration,
clock: c,
}
}

func (b *exponentialBackoffManagerImpl) getNextBackoff() time.Duration {
if b.clock.Now().Sub(b.lastBackoffStart) > b.backoffResetDuration {
b.backoff.Steps = math.MaxInt32
b.backoff.Duration = b.initialBackoff
}
b.lastBackoffStart = b.clock.Now()
return b.backoff.Step()
}

// Backoff implements BackoffManager.Backoff, it returns a timer so caller can block on the timer for backoff.
func (b *exponentialBackoffManagerImpl) Backoff() clock.Timer {
b.backoffTimer.Reset(b.getNextBackoff())
return b.backoffTimer
}

type jitteredBackoffManagerImpl struct {
clock clock.Clock
duration time.Duration
jitter float64
backoffTimer clock.Timer
}

// NewJitteredBackoffManager returns a BackoffManager that backoffs with given duration plus given jitter. If the jitter
// is negative, backoff will not be jittered.
func NewJitteredBackoffManager(duration time.Duration, jitter float64, c clock.Clock) BackoffManager {
return &jitteredBackoffManagerImpl{
clock: c,
duration: duration,
jitter: jitter,
backoffTimer: c.NewTimer(0),
}
}

func (j *jitteredBackoffManagerImpl) getNextBackoff() time.Duration {
jitteredPeriod := j.duration
if j.jitter > 0.0 {
jitteredPeriod = Jitter(j.duration, j.jitter)
}
return jitteredPeriod
}

func (j *jitteredBackoffManagerImpl) Backoff() clock.Timer {
j.backoffTimer.Reset(j.getNextBackoff())
return j.backoffTimer
}

// ExponentialBackoff repeats a condition check with exponential backoff.
//
// It repeatedly checks the condition and then sleeps, using `backoff.Step()`
Expand Down Expand Up @@ -503,16 +591,3 @@ func poller(interval, timeout time.Duration) WaitFunc {
return ch
})
}

// resetOrReuseTimer avoids allocating a new timer if one is already in use.
// Not safe for multiple threads.
func resetOrReuseTimer(t *time.Timer, d time.Duration, sawTimeout bool) *time.Timer {
if t == nil {
return time.NewTimer(d)
}
if !t.Stop() && !sawTimeout {
<-t.C
}
t.Reset(d)
return t
}
37 changes: 37 additions & 0 deletions staging/src/k8s.io/apimachinery/pkg/util/wait/wait_test.go
Expand Up @@ -26,6 +26,7 @@ import (
"testing"
"time"

"k8s.io/apimachinery/pkg/util/clock"
"k8s.io/apimachinery/pkg/util/runtime"
)

Expand Down Expand Up @@ -694,3 +695,39 @@ func TestContextForChannel(t *testing.T) {
t.Errorf("unexepcted timeout waiting for parent to cancel child contexts")
}
}

func TestExponentialBackoffManagerGetNextBackoff(t *testing.T) {
fc := clock.NewFakeClock(time.Now())
backoff := NewExponentialBackoffManager(1, 10, 10, 2.0, 0.0, fc)
durations := []time.Duration{1, 2, 4, 8, 10, 10, 10}
for i := 0; i < len(durations); i++ {
generatedBackoff := backoff.(*exponentialBackoffManagerImpl).getNextBackoff()
if generatedBackoff != durations[i] {
t.Errorf("unexpected %d-th backoff: %d, expecting %d", i, generatedBackoff, durations[i])
}
}

fc.Step(11)
resetDuration := backoff.(*exponentialBackoffManagerImpl).getNextBackoff()
if resetDuration != 1 {
t.Errorf("after reset, backoff should be 1, but got %d", resetDuration)
}
}

func TestJitteredBackoffManagerGetNextBackoff(t *testing.T) {
// positive jitter
backoffMgr := NewJitteredBackoffManager(1, 1, clock.NewFakeClock(time.Now()))
for i := 0; i < 5; i++ {
backoff := backoffMgr.(*jitteredBackoffManagerImpl).getNextBackoff()
if backoff < 1 || backoff > 2 {
t.Errorf("backoff out of range: %d", backoff)
}
}

// negative jitter, shall be a fixed backoff
backoffMgr = NewJitteredBackoffManager(1, -1, clock.NewFakeClock(time.Now()))
backoff := backoffMgr.(*jitteredBackoffManagerImpl).getNextBackoff()
if backoff != 1 {
t.Errorf("backoff should be 1, but got %d", backoff)
}
}

0 comments on commit fc8a39d

Please sign in to comment.