Skip to content

Commit

Permalink
Merge pull request #107826 from smarterclayton/context_wait
Browse files Browse the repository at this point in the history
wait: Introduce new methods that allow detection of context cancellation

Kubernetes-commit: 5469b170fe8717cb9fae8f12498cd1afd5586891
  • Loading branch information
k8s-publishing-bot committed Mar 15, 2023
2 parents 1281665 + 831cf05 commit 2da2e3c
Show file tree
Hide file tree
Showing 10 changed files with 1,537 additions and 166 deletions.
213 changes: 188 additions & 25 deletions pkg/util/wait/backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package wait
import (
"context"
"math"
"sync"
"time"

"k8s.io/apimachinery/pkg/util/runtime"
Expand Down Expand Up @@ -51,33 +52,104 @@ type Backoff struct {
Cap time.Duration
}

// Step (1) returns an amount of time to sleep determined by the
// original Duration and Jitter and (2) mutates the provided Backoff
// to update its Steps and Duration.
// Step returns an amount of time to sleep determined by the original
// Duration and Jitter. The backoff is mutated to update its Steps and
// Duration. A nil Backoff always has a zero-duration step.
func (b *Backoff) Step() time.Duration {
if b.Steps < 1 {
if b.Jitter > 0 {
return Jitter(b.Duration, b.Jitter)
}
return b.Duration
if b == nil {
return 0
}
b.Steps--
var nextDuration time.Duration
nextDuration, b.Duration, b.Steps = delay(b.Steps, b.Duration, b.Cap, b.Factor, b.Jitter)
return nextDuration
}

// DelayFunc returns a function that will compute the next interval to
// wait given the arguments in b. It does not mutate the original backoff
// but the function is safe to use only from a single goroutine.
func (b Backoff) DelayFunc() DelayFunc {
steps := b.Steps
duration := b.Duration
cap := b.Cap
factor := b.Factor
jitter := b.Jitter

return func() time.Duration {
var nextDuration time.Duration
// jitter is applied per step and is not cumulative over multiple steps
nextDuration, duration, steps = delay(steps, duration, cap, factor, jitter)
return nextDuration
}
}

// calculate the next step
if b.Factor != 0 {
b.Duration = time.Duration(float64(b.Duration) * b.Factor)
if b.Cap > 0 && b.Duration > b.Cap {
b.Duration = b.Cap
b.Steps = 0
// Timer returns a timer implementation appropriate to this backoff's parameters
// for use with wait functions.
func (b Backoff) Timer() Timer {
if b.Steps > 1 || b.Jitter != 0 {
return &variableTimer{new: internalClock.NewTimer, fn: b.DelayFunc()}
}
if b.Duration > 0 {
return &fixedTimer{new: internalClock.NewTicker, interval: b.Duration}
}
return newNoopTimer()
}

// delay implements the core delay algorithm used in this package.
func delay(steps int, duration, cap time.Duration, factor, jitter float64) (_ time.Duration, next time.Duration, nextSteps int) {
// when steps is non-positive, do not alter the base duration
if steps < 1 {
if jitter > 0 {
return Jitter(duration, jitter), duration, 0
}
return duration, duration, 0
}
steps--

// calculate the next step's interval
if factor != 0 {
next = time.Duration(float64(duration) * factor)
if cap > 0 && next > cap {
next = cap
steps = 0
}
} else {
next = duration
}

if b.Jitter > 0 {
duration = Jitter(duration, b.Jitter)
// add jitter for this step
if jitter > 0 {
duration = Jitter(duration, jitter)
}
return duration

return duration, next, steps

}

// DelayWithReset returns a DelayFunc that will return the appropriate next interval to
// wait. Every resetInterval the backoff parameters are reset to their initial state.
// This method is safe to invoke from multiple goroutines, but all calls will advance
// the backoff state when Factor is set. If Factor is zero, this method is the same as
// invoking b.DelayFunc() since Steps has no impact without Factor. If resetInterval is
// zero no backoff will be performed as the same calling DelayFunc with a zero factor
// and steps.
func (b Backoff) DelayWithReset(c clock.Clock, resetInterval time.Duration) DelayFunc {
if b.Factor <= 0 {
return b.DelayFunc()
}
if resetInterval <= 0 {
b.Steps = 0
b.Factor = 0
return b.DelayFunc()
}
return (&backoffManager{
backoff: b,
initialBackoff: b,
resetInterval: resetInterval,

clock: c,
lastStart: c.Now(),
timer: nil,
}).Step
}

// Until loops until stop channel is closed, running f every period.
Expand Down Expand Up @@ -187,15 +259,65 @@ func JitterUntilWithContext(ctx context.Context, f func(context.Context), period
JitterUntil(func() { f(ctx) }, period, jitterFactor, sliding, ctx.Done())
}

// BackoffManager manages backoff with a particular scheme based on its underlying implementation. It provides
// an interface to return a timer for backoff, and caller shall backoff until Timer.C() drains. If the second Backoff()
// is called before the timer from the first Backoff() call finishes, the first timer will NOT be drained and result in
// undetermined behavior.
// The BackoffManager is supposed to be called in a single-threaded environment.
// backoffManager provides simple backoff behavior in a threadsafe manner to a caller.
type backoffManager struct {
backoff Backoff
initialBackoff Backoff
resetInterval time.Duration

clock clock.Clock

lock sync.Mutex
lastStart time.Time
timer clock.Timer
}

// Step returns the expected next duration to wait.
func (b *backoffManager) Step() time.Duration {
b.lock.Lock()
defer b.lock.Unlock()

switch {
case b.resetInterval == 0:
b.backoff = b.initialBackoff
case b.clock.Now().Sub(b.lastStart) > b.resetInterval:
b.backoff = b.initialBackoff
b.lastStart = b.clock.Now()
}
return b.backoff.Step()
}

// Backoff implements BackoffManager.Backoff, it returns a timer so caller can block on the timer
// for exponential backoff. The returned timer must be drained before calling Backoff() the second
// time.
func (b *backoffManager) Backoff() clock.Timer {
b.lock.Lock()
defer b.lock.Unlock()
if b.timer == nil {
b.timer = b.clock.NewTimer(b.Step())
} else {
b.timer.Reset(b.Step())
}
return b.timer
}

// Timer returns a new Timer instance that shares the clock and the reset behavior with all other
// timers.
func (b *backoffManager) Timer() Timer {
return DelayFunc(b.Step).Timer(b.clock)
}

// BackoffManager manages backoff with a particular scheme based on its underlying implementation.
type BackoffManager interface {
// Backoff returns a shared clock.Timer that is Reset on every invocation. This method is not
// safe for use from multiple threads. It returns a timer for backoff, and caller shall backoff
// until Timer.C() drains. If the second Backoff() is called before the timer from the first
// Backoff() call finishes, the first timer will NOT be drained and result in undetermined
// behavior.
Backoff() clock.Timer
}

// Deprecated: Will be removed when the legacy polling functions are removed.
type exponentialBackoffManagerImpl struct {
backoff *Backoff
backoffTimer clock.Timer
Expand All @@ -208,6 +330,27 @@ type exponentialBackoffManagerImpl struct {
// NewExponentialBackoffManager returns a manager for managing exponential backoff. Each backoff is jittered and
// backoff will not exceed the given max. If the backoff is not called within resetDuration, the backoff is reset.
// This backoff manager is used to reduce load during upstream unhealthiness.
//
// Deprecated: Will be removed when the legacy Poll methods are removed. Callers should construct a
// Backoff struct, use DelayWithReset() to get a DelayFunc that periodically resets itself, and then
// invoke Timer() when calling wait.BackoffUntil.
//
// Instead of:
//
// bm := wait.NewExponentialBackoffManager(init, max, reset, factor, jitter, clock)
// ...
// wait.BackoffUntil(..., bm.Backoff, ...)
//
// Use:
//
// delayFn := wait.Backoff{
// Duration: init,
// Cap: max,
// Steps: int(math.Ceil(float64(max) / float64(init))), // now a required argument
// Factor: factor,
// Jitter: jitter,
// }.DelayWithReset(reset, clock)
// wait.BackoffUntil(..., delayFn.Timer(), ...)
func NewExponentialBackoffManager(initBackoff, maxBackoff, resetDuration time.Duration, backoffFactor, jitter float64, c clock.Clock) BackoffManager {
return &exponentialBackoffManagerImpl{
backoff: &Backoff{
Expand Down Expand Up @@ -248,6 +391,7 @@ func (b *exponentialBackoffManagerImpl) Backoff() clock.Timer {
return b.backoffTimer
}

// Deprecated: Will be removed when the legacy polling functions are removed.
type jitteredBackoffManagerImpl struct {
clock clock.Clock
duration time.Duration
Expand All @@ -257,6 +401,19 @@ type jitteredBackoffManagerImpl struct {

// NewJitteredBackoffManager returns a BackoffManager that backoffs with given duration plus given jitter. If the jitter
// is negative, backoff will not be jittered.
//
// Deprecated: Will be removed when the legacy Poll methods are removed. Callers should construct a
// Backoff struct and invoke Timer() when calling wait.BackoffUntil.
//
// Instead of:
//
// bm := wait.NewJitteredBackoffManager(duration, jitter, clock)
// ...
// wait.BackoffUntil(..., bm.Backoff, ...)
//
// Use:
//
// wait.BackoffUntil(..., wait.Backoff{Duration: duration, Jitter: jitter}.Timer(), ...)
func NewJitteredBackoffManager(duration time.Duration, jitter float64, c clock.Clock) BackoffManager {
return &jitteredBackoffManagerImpl{
clock: c,
Expand Down Expand Up @@ -296,6 +453,9 @@ func (j *jitteredBackoffManagerImpl) Backoff() clock.Timer {
// 3. a sleep truncated by the cap on duration has been completed.
// In case (1) the returned error is what the condition function returned.
// In all other cases, ErrWaitTimeout is returned.
//
// Since backoffs are often subject to cancellation, we recommend using
// ExponentialBackoffWithContext and passing a context to the method.
func ExponentialBackoff(backoff Backoff, condition ConditionFunc) error {
for backoff.Steps > 0 {
if ok, err := runConditionWithCrashProtection(condition); err != nil || ok {
Expand All @@ -309,8 +469,11 @@ func ExponentialBackoff(backoff Backoff, condition ConditionFunc) error {
return ErrWaitTimeout
}

// ExponentialBackoffWithContext works with a request context and a Backoff. It ensures that the retry wait never
// exceeds the deadline specified by the request context.
// ExponentialBackoffWithContext repeats a condition check with exponential backoff.
// It immediately returns an error if the condition returns an error, the context is cancelled
// or hits the deadline, or if the maximum attempts defined in backoff is exceeded (ErrWaitTimeout).
// If an error is returned by the condition the backoff stops immediately. The condition will
// never be invoked more than backoff.Steps times.
func ExponentialBackoffWithContext(ctx context.Context, backoff Backoff, condition ConditionWithContextFunc) error {
for backoff.Steps > 0 {
select {
Expand Down
51 changes: 51 additions & 0 deletions pkg/util/wait/delay.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package wait

import (
"context"
"sync"
"time"

"k8s.io/utils/clock"
)

// DelayFunc returns the next time interval to wait.
type DelayFunc func() time.Duration

// Timer takes an arbitrary delay function and returns a timer that can handle arbitrary interval changes.
// Use Backoff{...}.Timer() for simple delays and more efficient timers.
func (fn DelayFunc) Timer(c clock.Clock) Timer {
return &variableTimer{fn: fn, new: c.NewTimer}
}

// Until takes an arbitrary delay function and runs until cancelled or the condition indicates exit. This
// offers all of the functionality of the methods in this package.
func (fn DelayFunc) Until(ctx context.Context, immediate, sliding bool, condition ConditionWithContextFunc) error {
return loopConditionUntilContext(ctx, &variableTimer{fn: fn, new: internalClock.NewTimer}, immediate, sliding, condition)
}

// Concurrent returns a version of this DelayFunc that is safe for use by multiple goroutines that
// wish to share a single delay timer.
func (fn DelayFunc) Concurrent() DelayFunc {
var lock sync.Mutex
return func() time.Duration {
lock.Lock()
defer lock.Unlock()
return fn()
}
}
Loading

0 comments on commit 2da2e3c

Please sign in to comment.