Skip to content
This repository has been archived by the owner on May 26, 2022. It is now read-only.

Fix race with reuse of randomness #54

Merged
merged 3 commits into from
Apr 23, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
28 changes: 25 additions & 3 deletions backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package discovery
import (
"math"
"math/rand"
"sync"
"time"
)

Expand Down Expand Up @@ -90,7 +91,8 @@ func (b *fixedBackoff) Reset() {}
// timeUnits are the units of time the polynomial is evaluated in
// polyCoefs is the array of polynomial coefficients from [c0, c1, ... cn]
func NewPolynomialBackoff(min, max time.Duration, jitter Jitter,
timeUnits time.Duration, polyCoefs []float64, rng *rand.Rand) BackoffFactory {
timeUnits time.Duration, polyCoefs []float64, rngSrc rand.Source) BackoffFactory {
rng := rand.New(&lockedSource{src: rngSrc})
return func() BackoffStrategy {
return &polynomialBackoff{
attemptBackoff: attemptBackoff{
Expand Down Expand Up @@ -138,7 +140,8 @@ func (b *polynomialBackoff) Delay() time.Duration {
// jitter is the function for adding randomness around the backoff
// timeUnits are the units of time the base^x is evaluated in
func NewExponentialBackoff(min, max time.Duration, jitter Jitter,
timeUnits time.Duration, base float64, offset time.Duration, rng *rand.Rand) BackoffFactory {
timeUnits time.Duration, base float64, offset time.Duration, rngSrc rand.Source) BackoffFactory {
rng := rand.New(&lockedSource{src: rngSrc})
return func() BackoffStrategy {
return &exponentialBackoff{
attemptBackoff: attemptBackoff{
Expand Down Expand Up @@ -173,7 +176,8 @@ func (b *exponentialBackoff) Delay() time.Duration {
// NewExponentialDecorrelatedJitter creates a BackoffFactory with backoff of the roughly of the form base^x where x is the attempt number.
// Delays start at the minimum duration and after each attempt delay = rand(min, delay * base), bounded by the max
// See https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/ for more information
func NewExponentialDecorrelatedJitter(min, max time.Duration, base float64, rng *rand.Rand) BackoffFactory {
func NewExponentialDecorrelatedJitter(min, max time.Duration, base float64, rngSrc rand.Source) BackoffFactory {
rng := rand.New(&lockedSource{src: rngSrc})
return func() BackoffStrategy {
return &exponentialDecorrelatedJitter{
randomizedBackoff: randomizedBackoff{
Expand Down Expand Up @@ -204,3 +208,21 @@ func (b *exponentialDecorrelatedJitter) Delay() time.Duration {
}

func (b *exponentialDecorrelatedJitter) Reset() { b.lastDelay = 0 }

type lockedSource struct {
lk sync.Mutex
src rand.Source
}

func (r *lockedSource) Int63() (n int64) {
r.lk.Lock()
n = r.src.Int63()
r.lk.Unlock()
return
}

func (r *lockedSource) Seed(seed int64) {
r.lk.Lock()
r.src.Seed(seed)
r.lk.Unlock()
}
74 changes: 70 additions & 4 deletions backoff_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package discovery

import (
"fmt"
"golang.org/x/sync/errgroup"
"math/rand"
"testing"
"time"
Expand Down Expand Up @@ -37,8 +39,7 @@ func TestFixedBackoff(t *testing.T) {
}

func TestPolynomialBackoff(t *testing.T) {
rng := rand.New(rand.NewSource(0))
bkf := NewPolynomialBackoff(time.Second, time.Second*33, NoJitter, time.Second, []float64{0.5, 2, 3}, rng)
bkf := NewPolynomialBackoff(time.Second, time.Second*33, NoJitter, time.Second, []float64{0.5, 2, 3}, rand.NewSource(0))
b1 := bkf()
b2 := bkf()

Expand All @@ -57,8 +58,7 @@ func TestPolynomialBackoff(t *testing.T) {
}

func TestExponentialBackoff(t *testing.T) {
rng := rand.New(rand.NewSource(0))
bkf := NewExponentialBackoff(time.Millisecond*650, time.Second*7, NoJitter, time.Second, 1.5, -time.Millisecond*400, rng)
bkf := NewExponentialBackoff(time.Millisecond*650, time.Second*7, NoJitter, time.Second, 1.5, -time.Millisecond*400, rand.NewSource(0))
b1 := bkf()
b2 := bkf()

Expand Down Expand Up @@ -123,3 +123,69 @@ func TestFullJitter(t *testing.T) {
t.Fatal("jitter increased overall time")
}
}

func TestManyBackoffFactory(t *testing.T) {
rngSource := rand.NewSource(0)
concurrent := 10

t.Run("Exponential", func(t *testing.T) {
testManyBackoffFactoryHelper(concurrent,
NewExponentialBackoff(time.Millisecond*650, time.Second*7, FullJitter, time.Second, 1.5, -time.Millisecond*400, rngSource),
)
})
t.Run("Polynomial", func(t *testing.T) {
testManyBackoffFactoryHelper(concurrent,
NewPolynomialBackoff(time.Second, time.Second*33, NoJitter, time.Second, []float64{0.5, 2, 3}, rngSource),
)
})
t.Run("Fixed", func(t *testing.T) {
testManyBackoffFactoryHelper(concurrent,
NewFixedBackoff(time.Second),
)
})
}

func testManyBackoffFactoryHelper(concurrent int, bkf BackoffFactory) {
backoffCh := make(chan BackoffStrategy, concurrent)

errGrp := errgroup.Group{}
for i := 0; i < concurrent; i++ {
errGrp.Go(func() (err error) {
defer func() {
if r := recover(); r != nil {
err = fmt.Errorf("panic %v", r)
}
}()
backoffCh <- bkf()
return
})
}
if err := errGrp.Wait(); err != nil {
panic(err)
}
close(backoffCh)

errGrp = errgroup.Group{}
for b := range backoffCh {
backoff := b
errGrp.Go(func() (err error) {
defer func() {
if r := recover(); r != nil {
err = fmt.Errorf("panic %v", r)
}
}()

for i := 0; i < 5; i++ {
for j := 0; j < 10; j++ {
backoff.Delay()
}
backoff.Reset()
}
return
})
}

if err := errGrp.Wait(); err != nil {
panic(err)
}
}
5 changes: 3 additions & 2 deletions backoffcache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package discovery

import (
"context"
"math/rand"
"testing"
"time"

Expand Down Expand Up @@ -67,7 +68,7 @@ func TestBackoffDiscoverySingleBackoff(t *testing.T) {
d2 := &mockDiscoveryClient{h2, discServer}

bkf := NewExponentialBackoff(time.Millisecond*100, time.Second*10, NoJitter,
time.Millisecond*100, 2.5, 0, nil)
time.Millisecond*100, 2.5, 0, rand.NewSource(0))
dCache, err := NewBackoffDiscovery(d1, bkf)
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -101,7 +102,7 @@ func TestBackoffDiscoveryMultipleBackoff(t *testing.T) {

// Startup delay is 0ms. First backoff after finding data is 100ms, second backoff is 250ms.
bkf := NewExponentialBackoff(time.Millisecond*100, time.Second*10, NoJitter,
time.Millisecond*100, 2.5, 0, nil)
time.Millisecond*100, 2.5, 0, rand.NewSource(0))
dCache, err := NewBackoffDiscovery(d1, bkf)
if err != nil {
t.Fatal(err)
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ require (
github.com/libp2p/go-libp2p-peerstore v0.2.2
github.com/libp2p/go-libp2p-swarm v0.2.3
github.com/multiformats/go-multihash v0.0.13
golang.org/x/sync v0.0.0-20190423024810-112230192c58
)

go 1.13
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,7 @@ golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f h1:wMNYb4v58l5UBM7MYRLPG6Zh
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58 h1:8gQV6CLnAEikrhgkHFbMAEhagSSnXWGV915qUMm9mrU=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
Expand Down