Skip to content

Commit

Permalink
add backoff
Browse files Browse the repository at this point in the history
Signed-off-by: husharp <jinhao.hu@pingcap.com>
  • Loading branch information
HuSharp committed Aug 23, 2023
1 parent 346e771 commit eb18105
Show file tree
Hide file tree
Showing 8 changed files with 450 additions and 8 deletions.
166 changes: 166 additions & 0 deletions client/backoff/backoff.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
// Copyright 2023 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package backoff

import (
"context"
"fmt"
"time"

"github.com/opentracing/opentracing-go"
"github.com/pingcap/log"
"github.com/pkg/errors"
"go.uber.org/zap"
)

// Backoffer is a utility for retrying queries.
type Backoffer struct {
ctx context.Context

fn map[string]backoffFn
maxSleep int
totalSleep int

errors []error
configs []*Config
backoffSleepMS map[string]int
backoffTimes map[string]int
}

// NewBackoffer (Deprecated) creates a Backoffer with maximum sleep time(in ms).
func NewBackoffer(ctx context.Context, maxSleep int) *Backoffer {
return &Backoffer{
ctx: ctx,
maxSleep: maxSleep,
}
}

// Backoff sleeps a while base on the Config and records the error message.
// It returns a retryable error if total sleep time exceeds maxSleep.
func (b *Backoffer) Backoff(cfg *Config, err error) error {
if span := opentracing.SpanFromContext(b.ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan(fmt.Sprintf("tikv.backoff.%s", cfg), opentracing.ChildOf(span.Context()))
defer span1.Finish()
opentracing.ContextWithSpan(b.ctx, span1)

Check warning on line 56 in client/backoff/backoff.go

View check run for this annotation

Codecov / codecov/patch

client/backoff/backoff.go#L54-L56

Added lines #L54 - L56 were not covered by tests
}
return b.BackoffWithCfgAndMaxSleep(cfg, -1, err)
}

// BackoffWithCfgAndMaxSleep sleeps a while base on the Config and records the error message
// and never sleep more than maxSleepMs for each sleep.
func (b *Backoffer) BackoffWithCfgAndMaxSleep(cfg *Config, maxSleepMs int, err error) error {
select {
case <-b.ctx.Done():
return errors.WithStack(err)
default:
}
b.errors = append(b.errors, errors.Errorf("%s at %s", err.Error(), time.Now().Format(time.RFC3339Nano)))
b.configs = append(b.configs, cfg)

// Lazy initialize.
if b.fn == nil {
b.fn = make(map[string]backoffFn)
}
f, ok := b.fn[cfg.name]
if !ok {
f = cfg.createBackoffFn()
b.fn[cfg.name] = f
}
realSleep := f(b.ctx, maxSleepMs)

b.totalSleep += realSleep
if b.backoffSleepMS == nil {
b.backoffSleepMS = make(map[string]int)
}
b.backoffSleepMS[cfg.name] += realSleep
if b.backoffTimes == nil {
b.backoffTimes = make(map[string]int)
}
b.backoffTimes[cfg.name]++

log.Debug("retry later",
zap.Error(err),
zap.Int("totalSleep", b.totalSleep),
zap.Int("maxSleep", b.maxSleep),
zap.Stringer("type", cfg))
return nil
}

func (b *Backoffer) String() string {
if b.totalSleep == 0 {
return ""

Check warning on line 103 in client/backoff/backoff.go

View check run for this annotation

Codecov / codecov/patch

client/backoff/backoff.go#L102-L103

Added lines #L102 - L103 were not covered by tests
}
return fmt.Sprintf(" backoff(%dms %v)", b.totalSleep, b.configs)

Check warning on line 105 in client/backoff/backoff.go

View check run for this annotation

Codecov / codecov/patch

client/backoff/backoff.go#L105

Added line #L105 was not covered by tests
}

// GetTotalSleep returns total sleep time.
func (b *Backoffer) GetTotalSleep() int {
return b.totalSleep

Check warning on line 110 in client/backoff/backoff.go

View check run for this annotation

Codecov / codecov/patch

client/backoff/backoff.go#L110

Added line #L110 was not covered by tests
}

// GetCtx returns the bound context.
func (b *Backoffer) GetCtx() context.Context {
return b.ctx

Check warning on line 115 in client/backoff/backoff.go

View check run for this annotation

Codecov / codecov/patch

client/backoff/backoff.go#L115

Added line #L115 was not covered by tests
}

// SetCtx sets the bound context to ctx.
func (b *Backoffer) SetCtx(ctx context.Context) {
b.ctx = ctx

Check warning on line 120 in client/backoff/backoff.go

View check run for this annotation

Codecov / codecov/patch

client/backoff/backoff.go#L120

Added line #L120 was not covered by tests
}

// GetBackoffTimes returns a map contains backoff time count by type.
func (b *Backoffer) GetBackoffTimes() map[string]int {
return b.backoffTimes

Check warning on line 125 in client/backoff/backoff.go

View check run for this annotation

Codecov / codecov/patch

client/backoff/backoff.go#L125

Added line #L125 was not covered by tests
}

// GetBackoffTime returns backoff time count by specific type.
func (b *Backoffer) GetBackoffTime(s string) int {
return b.backoffTimes[s]
}

// GetTotalBackoffTimes returns the total backoff times of the backoffer.
func (b *Backoffer) GetTotalBackoffTimes() int {
total := 0

Check warning on line 135 in client/backoff/backoff.go

View check run for this annotation

Codecov / codecov/patch

client/backoff/backoff.go#L135

Added line #L135 was not covered by tests
for _, t := range b.backoffTimes {
total += t

Check warning on line 137 in client/backoff/backoff.go

View check run for this annotation

Codecov / codecov/patch

client/backoff/backoff.go#L137

Added line #L137 was not covered by tests
}
return total

Check warning on line 139 in client/backoff/backoff.go

View check run for this annotation

Codecov / codecov/patch

client/backoff/backoff.go#L139

Added line #L139 was not covered by tests
}

// GetBackoffSleepMS returns a map contains backoff sleep time by type.
func (b *Backoffer) GetBackoffSleepMS() map[string]int {
return b.backoffSleepMS

Check warning on line 144 in client/backoff/backoff.go

View check run for this annotation

Codecov / codecov/patch

client/backoff/backoff.go#L144

Added line #L144 was not covered by tests
}

// ErrorsNum returns the number of errors.
func (b *Backoffer) ErrorsNum() int {
return len(b.errors)

Check warning on line 149 in client/backoff/backoff.go

View check run for this annotation

Codecov / codecov/patch

client/backoff/backoff.go#L149

Added line #L149 was not covered by tests
}

// Reset resets the sleep state of the backoffer, so that following backoff
// can sleep shorter. The reason why we don't create a new backoffer is that
// backoffer is similar to context, and it records some metrics that we
// want to record for an entire process which is composed of serveral stages.
func (b *Backoffer) Reset() {
b.fn = nil
b.totalSleep = 0

Check warning on line 158 in client/backoff/backoff.go

View check run for this annotation

Codecov / codecov/patch

client/backoff/backoff.go#L157-L158

Added lines #L157 - L158 were not covered by tests
}

// ResetMaxSleep resets the sleep state and max sleep limit of the backoffer.
// It's used when switches to the next stage of the process.
func (b *Backoffer) ResetMaxSleep(maxSleep int) {
b.Reset()
b.maxSleep = maxSleep

Check warning on line 165 in client/backoff/backoff.go

View check run for this annotation

Codecov / codecov/patch

client/backoff/backoff.go#L164-L165

Added lines #L164 - L165 were not covered by tests
}
29 changes: 29 additions & 0 deletions client/backoff/backoff_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Copyright 2023 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package backoff

import (
"context"
"errors"
"testing"

"github.com/stretchr/testify/assert"
)

func TestBackoffErrorType(t *testing.T) {
b := NewBackoffer(context.TODO(), 800)
err := b.Backoff(BoMemberUpdate, errors.New("no leader")) // 100 ms
assert.Nil(t, err)
}
141 changes: 141 additions & 0 deletions client/backoff/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
// Copyright 2023 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package backoff

import (
"context"
"math"
"math/rand"
"time"

"github.com/pingcap/log"
"go.uber.org/zap"
)

// Config is the configuration of the Backoff function.
type Config struct {
name string
fnCfg *BackoffFnCfg
err error
}

// Backoff Config variables
var (
// BoMemberUpdate is for member change events.
BoMemberUpdate = NewConfig("memberUpdate", NewBackoffFnCfg(100, 2000, EqualJitter), nil)
)

// backoffFn is the backoff function which compute the sleep time and do sleep.
type backoffFn func(ctx context.Context, maxSleepMs int) int

func (c *Config) createBackoffFn() backoffFn {
return newBackoffFn(c.fnCfg.base, c.fnCfg.cap, c.fnCfg.jitter)
}

// BackoffFnCfg is the configuration for the backoff func which implements exponential backoff with
// optional jitters.
// See http://www.awsarchitectureblog.com/2015/03/backoff.html
type BackoffFnCfg struct {
base int
cap int
jitter int
}

// NewBackoffFnCfg creates the config for BackoffFn.
func NewBackoffFnCfg(base, cap, jitter int) *BackoffFnCfg {
return &BackoffFnCfg{
base,
cap,
jitter,
}
}

// NewConfig creates a new Config for the Backoff operation.
func NewConfig(name string, backoffFnCfg *BackoffFnCfg, err error) *Config {
return &Config{
name: name,
fnCfg: backoffFnCfg,
err: err,
}
}

func (c *Config) String() string {
return c.name
}

// SetErrors sets a more detailed error instead of the default bo config.
func (c *Config) SetErrors(err error) {
c.err = err

Check warning on line 80 in client/backoff/config.go

View check run for this annotation

Codecov / codecov/patch

client/backoff/config.go#L80

Added line #L80 was not covered by tests
}

const (
// NoJitter makes the backoff sequence strict exponential.
NoJitter = 1 + iota
// FullJitter applies random factors to strict exponential.
FullJitter
// EqualJitter is also randomized, but prevents very short sleeps.
EqualJitter
// DecorrJitter increases the maximum jitter based on the last random value.
DecorrJitter
)

// newBackoffFn creates a backoff func which implements exponential backoff with
// optional jitters.
// See http://www.awsarchitectureblog.com/2015/03/backoff.html
func newBackoffFn(base, cap, jitter int) backoffFn {
if base < 2 {
// Top prevent panic in 'rand.Intn'.
base = 2

Check warning on line 100 in client/backoff/config.go

View check run for this annotation

Codecov / codecov/patch

client/backoff/config.go#L100

Added line #L100 was not covered by tests
}
attempts := 0
lastSleep := base
return func(ctx context.Context, maxSleepMs int) int {
var sleep int
switch jitter {
case NoJitter:
sleep = expo(base, cap, attempts)
case FullJitter:
v := expo(base, cap, attempts)
sleep = rand.Intn(v)

Check warning on line 111 in client/backoff/config.go

View check run for this annotation

Codecov / codecov/patch

client/backoff/config.go#L107-L111

Added lines #L107 - L111 were not covered by tests
case EqualJitter:
v := expo(base, cap, attempts)
sleep = v/2 + rand.Intn(v/2)
case DecorrJitter:
sleep = int(math.Min(float64(cap), float64(base+rand.Intn(lastSleep*3-base))))

Check warning on line 116 in client/backoff/config.go

View check run for this annotation

Codecov / codecov/patch

client/backoff/config.go#L115-L116

Added lines #L115 - L116 were not covered by tests
}
log.Debug("backoff",
zap.Int("base", base),
zap.Int("sleep", sleep),
zap.Int("attempts", attempts))

realSleep := sleep
// when set maxSleepMs >= 0 will force sleep maxSleepMs milliseconds.
if maxSleepMs >= 0 && realSleep > maxSleepMs {
realSleep = maxSleepMs

Check warning on line 126 in client/backoff/config.go

View check run for this annotation

Codecov / codecov/patch

client/backoff/config.go#L126

Added line #L126 was not covered by tests
}
select {
case <-time.After(time.Duration(realSleep) * time.Millisecond):
attempts++
lastSleep = sleep
return realSleep
case <-ctx.Done():
return 0
}
}
}

func expo(base, cap, n int) int {
return int(math.Min(float64(cap), float64(base)*math.Pow(2.0, float64(n))))
}
2 changes: 1 addition & 1 deletion client/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ require (
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00
github.com/pingcap/kvproto v0.0.0-20230727073445-53e1f8730c30
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.11.1
github.com/stretchr/testify v1.8.2
go.uber.org/goleak v1.1.11
Expand All @@ -25,7 +26,6 @@ require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_model v0.2.0 // indirect
github.com/prometheus/common v0.26.0 // indirect
Expand Down

0 comments on commit eb18105

Please sign in to comment.