util/retry: always run at least one iteration
This commit updates the Retry utility to guarantee at least one
iteration of the retry loop. This fixes a buggy pattern that appears
throughout the codebase, where we assumed a retry loop would execute at
least once and returned the error observed in its last iteration. If
the loop never ran (for example, because the context was already
canceled), no error was returned at all, which tricked upper levels
into thinking the retry loop had succeeded.

One instance of this bug was fixed in cockroachdb#43789.

Here are five other instances of the pattern that I believe are currently
susceptible to the bug:
- https://github.com/cockroachdb/cockroach/blob/d0c79625eda85d3aa38afad5b0254d419a9bc4cd/pkg/ccl/changefeedccl/changefeed_stmt.go#L568
- https://github.com/cockroachdb/cockroach/blob/d0c79625eda85d3aa38afad5b0254d419a9bc4cd/pkg/ccl/workloadccl/fixture.go#L136
- https://github.com/cockroachdb/cockroach/blob/d0c79625eda85d3aa38afad5b0254d419a9bc4cd/pkg/kv/kvserver/replica_command.go#L529
- https://github.com/cockroachdb/cockroach/blob/d0c79625eda85d3aa38afad5b0254d419a9bc4cd/pkg/sql/schema_changer.go#L1701
- https://github.com/cockroachdb/cockroach/blob/d0c79625eda85d3aa38afad5b0254d419a9bc4cd/pkg/sqlmigrations/migrations.go#L580

And a sixth was almost introduced in cockroachdb#51227.
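
For concreteness, a minimal sketch of the pattern in question; doWithRetry,
doWork, and retryOpts are hypothetical stand-ins, not code from any of the
files touched below (the usual "context" and util/retry imports are assumed):

func doWithRetry(ctx context.Context, retryOpts retry.Options, doWork func(context.Context) error) error {
	var lastErr error
	for r := retry.StartWithCtx(ctx, retryOpts); r.Next(); {
		lastErr = doWork(ctx)
		if lastErr == nil {
			return nil
		}
	}
	// Old behavior: if ctx was already canceled, Next() returned false on its
	// very first call, the body never ran, and lastErr stayed nil, so the
	// caller reported success without a single attempt being made.
	// New behavior: the body is guaranteed to run at least once, so lastErr
	// reflects a real attempt by the time execution reaches this point.
	return lastErr
}
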
nvanbenschoten committed Jul 10, 2020
1 parent cff2542 commit ff7dc82
Showing 6 changed files with 51 additions and 44 deletions.
pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go (2 changes: 1 addition & 1 deletion)
@@ -199,7 +199,7 @@ func (ds *DistSender) partialRangeFeed(
}
}
}
return nil
return ctx.Err()
}

// singleRangeFeed gathers and rearranges the replicas, and makes a RangeFeed
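
Assuming the rangefeed retry options do not cap the number of attempts, the
only way to fall out of partialRangeFeed's retry loop is cancellation of the
context (or the retry Closer firing), so returning ctx.Err() surfaces that
condition instead of a misleading nil. Heavily condensed, the resulting shape
is roughly the following (a sketch with a hypothetical rangefeedRetryOptions,
not the actual function body):

for r := retry.StartWithCtx(ctx, rangefeedRetryOptions); r.Next(); {
	// Try to serve the rangefeed; transient errors fall through and retry,
	// while terminal errors return from inside the loop.
}
// Falling out of the loop means the context was canceled, so report that to
// the caller rather than pretending the rangefeed completed cleanly.
return ctx.Err()
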
pkg/kv/kvnemesis/applier_test.go (18 changes: 9 additions & 9 deletions)
@@ -61,9 +61,9 @@ func TestApplier(t *testing.T) {
check(t, step(get(`b`)), `db0.Get(ctx, "b") // ("2", nil)`)
check(t, step(scan(`a`, `c`)), `db1.Scan(ctx, "a", "c", 0) // (["a":"1", "b":"2"], nil)`)

checkErr(t, step(get(`a`)), `db0.Get(ctx, "a") // (nil, aborted in distSender: context canceled)`)
checkErr(t, step(put(`a`, `1`)), `db1.Put(ctx, "a", 1) // aborted in distSender: context canceled`)
checkErr(t, step(scanForUpdate(`a`, `c`)), `db0.ScanForUpdate(ctx, "a", "c", 0) // (nil, aborted in distSender: context canceled)`)
checkErr(t, step(get(`a`)), `db0.Get(ctx, "a") // (nil, aborted during DistSender.Send: context canceled)`)
checkErr(t, step(put(`a`, `1`)), `db1.Put(ctx, "a", 1) // aborted during DistSender.Send: context canceled`)
checkErr(t, step(scanForUpdate(`a`, `c`)), `db0.ScanForUpdate(ctx, "a", "c", 0) // (nil, aborted during DistSender.Send: context canceled)`)

// Batch
check(t, step(batch(put(`b`, `2`), get(`a`), scan(`a`, `c`))), `
@@ -78,10 +78,10 @@ func TestApplier(t *testing.T) {
checkErr(t, step(batch(put(`b`, `2`), get(`a`), scanForUpdate(`a`, `c`))), `
{
b := &Batch{}
b.Put(ctx, "b", 2) // aborted in distSender: context canceled
b.Get(ctx, "a") // (nil, aborted in distSender: context canceled)
b.ScanForUpdate(ctx, "a", "c") // (nil, aborted in distSender: context canceled)
db0.Run(ctx, b) // aborted in distSender: context canceled
b.Put(ctx, "b", 2) // aborted during DistSender.Send: context canceled
b.Get(ctx, "a") // (nil, aborted during DistSender.Send: context canceled)
b.ScanForUpdate(ctx, "a", "c") // (nil, aborted during DistSender.Send: context canceled)
db0.Run(ctx, b) // aborted during DistSender.Send: context canceled
}
`)

@@ -130,7 +130,7 @@ db0.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
check(t, step(split(`foo`)), `db1.AdminSplit(ctx, "foo") // nil`)
check(t, step(merge(`foo`)), `db0.AdminMerge(ctx, "foo") // nil`)
checkErr(t, step(split(`foo`)),
`db1.AdminSplit(ctx, "foo") // aborted in distSender: context canceled`)
`db1.AdminSplit(ctx, "foo") // aborted during DistSender.Send: context canceled`)
checkErr(t, step(merge(`foo`)),
`db0.AdminMerge(ctx, "foo") // aborted in distSender: context canceled`)
`db0.AdminMerge(ctx, "foo") // aborted during DistSender.Send: context canceled`)
}
pkg/kv/kvserver/replica_command.go (2 changes: 1 addition & 1 deletion)
@@ -525,7 +525,7 @@ func (r *Replica) executeAdminCommandWithDescriptor(
// in a retry loop. Note that this is speculative; there wasn't an incident
// that suggested this.
retryOpts.RandomizationFactor = 0.5
lastErr := ctx.Err()
var lastErr error
for retryable := retry.StartWithCtx(ctx, retryOpts); retryable.Next(); {
// The replica may have been destroyed since the start of the retry loop.
// We need to explicitly check this condition. Having a valid lease, as we
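
The lastErr := ctx.Err() initialization deleted here was itself a defensive
workaround for this bug class (compare cockroachdb#43789): it made sure the
post-loop return reported something when the loop never ran. With Next now
guaranteed to return true on its first call, the plain declaration suffices.
Condensed to its essentials (helper names are hypothetical, for illustration
only):

var lastErr error
for retryable := retry.StartWithCtx(ctx, retryOpts); retryable.Next(); {
	lastErr = tryAdminCommand(ctx) // hypothetical stand-in for the loop body
	if lastErr == nil || !isRetryableAdminError(lastErr) {
		break
	}
}
// Safe even if ctx was canceled before the loop was entered: the body above
// has run at least once, so lastErr reflects an actual attempt.
return lastErr
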
pkg/storage/cloudimpl/http_storage.go (16 changes: 5 additions & 11 deletions)
@@ -188,25 +188,19 @@ func checkHTTPContentRangeHeader(h string, pos int64) error {
func (r *resumingHTTPReader) sendRequest(
reqHeaders map[string]string,
) (resp *http.Response, err error) {
// Initialize err to the context.Canceled: if our context is canceled, we will
// never enter the loop below; in this case we want to return "nil, canceled"
err = context.Canceled
for attempt, retries := 0,
retry.StartWithCtx(r.ctx, HTTPRetryOptions); retries.Next(); attempt++ {
resp, err = r.client.req(r.ctx, "GET", r.url, nil, reqHeaders)

for attempt, retries := 0, retry.StartWithCtx(r.ctx, HTTPRetryOptions); retries.Next(); attempt++ {
resp, err := r.client.req(r.ctx, "GET", r.url, nil, reqHeaders)
if err == nil {
return
return resp, nil
}

log.Errorf(r.ctx, "HTTP:Req error: err=%s (attempt %d)", err, attempt)

if !errors.HasType(err, (*retryableHTTPError)(nil)) {
return
return nil, err
}
}

return
return nil, r.ctx.Err()
}

// requestNextRanges issues additional http request
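
Two things change together in sendRequest. First, resp, err := inside the loop
declares fresh variables that shadow the function's named return values, so
every exit now uses an explicit return and the bare return statements are gone.
Second, the old trick of pre-seeding err = context.Canceled, which existed only
to cover the case where the loop never ran, becomes return nil, r.ctx.Err()
after the loop; the contract for callers is essentially unchanged, but it is
now stated at the point where it applies. A hypothetical call site, purely for
illustration:

resp, err := r.sendRequest(reqHeaders)
if err != nil {
	// A canceled r.ctx surfaces here as context.Canceled (via r.ctx.Err()),
	// never as a nil error with a nil response.
	return err
}
defer resp.Body.Close()
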
pkg/util/retry/retry.go (43 changes: 26 additions & 17 deletions)
@@ -12,6 +12,7 @@ package retry

import (
"context"
"log"
"math"
"math/rand"
"time"
@@ -27,7 +28,7 @@ type Options struct {
Multiplier float64 // Default backoff constant
MaxRetries int // Maximum number of attempts (0 for infinite)
RandomizationFactor float64 // Randomize the backoff interval by constant
Closer <-chan struct{} // Optionally end retry loop channel close.
Closer <-chan struct{} // Optionally end retry loop channel close
}

// Retry implements the public methods necessary to control an exponential-
@@ -47,7 +48,8 @@ func Start(opts Options) Retry {

// StartWithCtx returns a new Retry initialized to some default values. The
// Retry can then be used in an exponential-backoff retry loop. If the provided
// context is canceled (see Context.Done), the retry loop ends early.
// context is canceled (see Context.Done), the retry loop ends early, but will
// always run at least once.
func StartWithCtx(ctx context.Context, opts Options) Retry {
if opts.InitialBackoff == 0 {
opts.InitialBackoff = 50 * time.Millisecond
@@ -62,26 +64,31 @@ func StartWithCtx(ctx context.Context, opts Options) Retry {
opts.Multiplier = 2
}

r := Retry{opts: opts}
var r Retry
r.opts = opts
r.ctxDoneChan = ctx.Done()
r.Reset()
r.mustReset()
return r
}

// Reset resets the Retry to its initial state, meaning that the next call to
// Next will return true immediately and subsequent calls will behave as if
// they had followed the very first attempt (i.e. their backoffs will be
// short).
// Next will return true immediately and subsequent calls will behave as if they
// had followed the very first attempt (i.e. their backoffs will be short). The
// exception to this is if the provided Closer has fired or context has been
// canceled, in which case subsequent calls to Next will still return false
// immediately.
func (r *Retry) Reset() {
select {
case <-r.opts.Closer:
// When the closer has fired, you can't keep going.
return
case <-r.ctxDoneChan:
// When the context was canceled, you can't keep going.
return
default:
r.mustReset()
}
}

func (r *Retry) mustReset() {
r.currentAttempt = 0
r.isReset = true
}
@@ -100,8 +107,13 @@ func (r Retry) retryIn() time.Duration {
}

// Next returns whether the retry loop should continue, and blocks for the
// appropriate length of time before yielding back to the caller. If a stopper
// is present, Next will eagerly return false when the stopper is stopped.
// appropriate length of time before yielding back to the caller.
//
// Next is guaranteed to return true on its first call. As such, a retry loop
// can be thought of as implementing do-while semantics (i.e. always running at
// least once). Otherwise, if a context and/or closer is present, Next will
// return false if the context is canceled and/or the closer fires while the
// method is waiting.
func (r *Retry) Next() bool {
if r.isReset {
r.isReset = false
@@ -147,7 +159,8 @@ func (r *Retry) NextCh() <-chan time.Time {
}

// WithMaxAttempts is a helper that runs fn N times and collects the last err.
// It guarantees fn will run at least once. Otherwise, an error will be returned.
// The function will terminate early if the provided context is canceled, but it
// guarantees that fn will run at least once.
func WithMaxAttempts(ctx context.Context, opts Options, n int, fn func() error) error {
if n <= 0 {
return errors.Errorf("max attempts should not be 0 or below, got: %d", n)
@@ -162,11 +175,7 @@ func WithMaxAttempts(ctx context.Context, opts Options, n int, fn func() error)
}
}
if err == nil {
if ctx.Err() != nil {
err = errors.Wrap(ctx.Err(), "did not run function due to context completion")
} else {
err = errors.New("did not run function due to closed opts.Closer")
}
log.Fatal(ctx, "never ran function in WithMaxAttempts")
}
return err
}
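
The new guarantee is easy to demonstrate in isolation. The following small
program is not part of the commit; it is a sketch that only relies on the
Options fields and the StartWithCtx/Next behavior shown above, and it starts a
retry loop with a context that is already canceled:

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/cockroachdb/cockroach/pkg/util/retry"
)

func main() {
	opts := retry.Options{
		InitialBackoff: time.Millisecond,
		MaxBackoff:     10 * time.Millisecond,
		MaxRetries:     3,
	}

	ctx, cancel := context.WithCancel(context.Background())
	cancel() // canceled before the loop ever starts

	attempts := 0
	var lastErr error
	for r := retry.StartWithCtx(ctx, opts); r.Next(); {
		attempts++
		lastErr = fmt.Errorf("attempt %d failed", attempts)
	}

	// Old semantics: prints "0 <nil>". With the at-least-one-iteration
	// guarantee: prints "1 attempt 1 failed".
	fmt.Println(attempts, lastErr)
}

In other words, callers may treat the loop as do-while: the post-loop error
handling in the files above is sound because it always reflects at least one
real attempt.
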
pkg/util/retry/retry_test.go (14 changes: 9 additions & 5 deletions)
@@ -144,6 +144,10 @@ func TestRetryNextCh(t *testing.T) {
func TestRetryWithMaxAttempts(t *testing.T) {
expectedErr := errors.New("placeholder")
attempts := 0
noErrFunc := func() error {
attempts++
return nil
}
errWithAttemptsCounterFunc := func() error {
attempts++
return expectedErr
@@ -183,11 +187,11 @@ func TestRetryWithMaxAttempts(t *testing.T) {
Multiplier: 2,
MaxRetries: 1,
},
retryFunc: func() error { return nil },
retryFunc: noErrFunc,
maxAttempts: 3,

minNumAttempts: 0,
maxNumAttempts: 0,
minNumAttempts: 1,
maxNumAttempts: 1,
},
{
desc: "succeeds after one faked error",
@@ -235,7 +239,7 @@ func TestRetryWithMaxAttempts(t *testing.T) {
cancelCtxFunc()
},

minNumAttempts: 0,
minNumAttempts: 1,
maxNumAttempts: 3,
expectedErrText: "did not run function due to context completion: context canceled",
},
@@ -255,7 +259,7 @@ func TestRetryWithMaxAttempts(t *testing.T) {
close(closeCh)
},

minNumAttempts: 0,
minNumAttempts: 1,
maxNumAttempts: 3,
expectedErrText: "did not run function due to closed opts.Closer",
},
