Skip to content

Commit

Permalink
Merge pull request #5010 from tonistiigi/flightcontrol-timeout-fix
Browse files Browse the repository at this point in the history
flightcontrol: protect contention timeouts
  • Loading branch information
tonistiigi committed Jun 10, 2024
2 parents 37ab8e8 + dce25cc commit 5760de7
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 5 deletions.
11 changes: 6 additions & 5 deletions util/flightcontrol/flightcontrol.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package flightcontrol
import (
"context"
"io"
"runtime"
"math/rand"
"sort"
"sync"
"time"
Expand Down Expand Up @@ -43,13 +43,14 @@ func (g *Group[T]) Do(ctx context.Context, key string, fn func(ctx context.Conte
err = errors.Wrapf(errRetryTimeout, "flightcontrol")
return v, err
}
runtime.Gosched()
if backoff > 0 {
time.Sleep(backoff)
backoff *= 2
backoff = time.Duration(float64(backoff) * 1.2)
} else {
backoff = time.Millisecond
// randomize initial backoff to avoid all goroutines retrying at once
//nolint:gosec // using math/rand pseudo-randomness is acceptable here
backoff = time.Millisecond + time.Duration(rand.Intn(1e7))*time.Nanosecond
}
time.Sleep(backoff)
}
}

Expand Down
22 changes: 22 additions & 0 deletions util/flightcontrol/flightcontrol_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,28 @@ func TestContention(t *testing.T) {
wg.Wait()
}

func TestMassiveParallel(t *testing.T) {
var retryCount int64
g := &Group[string]{}
eg, ctx := errgroup.WithContext(context.Background())
for i := 0; i < 1000; i++ {
eg.Go(func() error {
_, err := g.Do(ctx, "key", func(ctx context.Context) (string, error) {
return "", errors.Errorf("always fail")
})
if errors.Is(err, errRetryTimeout) {
atomic.AddInt64(&retryCount, 1)
}
return err
})
// magic numbers to increase contention
time.Sleep(5 * time.Microsecond)
}
err := eg.Wait()
assert.Error(t, err)
assert.Equal(t, int64(0), retryCount)
}

func testFunc(wait time.Duration, ret string, counter *int64) func(ctx context.Context) (string, error) {
return func(ctx context.Context) (string, error) {
atomic.AddInt64(counter, 1)
Expand Down

0 comments on commit 5760de7

Please sign in to comment.