Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

autorelay: fix busy loop bug and flaky tests in relay finder #2208

Merged
merged 13 commits into from
Mar 22, 2023
190 changes: 173 additions & 17 deletions p2p/host/autorelay/autorelay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ package autorelay_test

import (
"context"
"fmt"
"sort"
"strings"
"sync"
"sync/atomic"
"testing"
"time"
Expand All @@ -14,7 +17,6 @@ import (
"github.com/libp2p/go-libp2p/p2p/host/autorelay"
circuitv2_proto "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/proto"

"github.com/benbjohnson/clock"
ma "github.com/multiformats/go-multiaddr"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -182,7 +184,7 @@ func TestWaitForCandidates(t *testing.T) {

func TestBackoff(t *testing.T) {
const backoff = 20 * time.Second
cl := clock.NewMock()
cl := newMockClock()
r, err := libp2p.New(
libp2p.DisableRelay(),
libp2p.ForceReachabilityPublic(),
Expand Down Expand Up @@ -228,10 +230,10 @@ func TestBackoff(t *testing.T) {
}, 10*time.Second, 20*time.Millisecond, "reservations load should be 1 was %d", reservations.Load())
// make sure we don't add any relays yet
for i := 0; i < 2; i++ {
cl.Add(backoff / 3)
cl.advanceBy(backoff / 3)
require.Equal(t, 1, int(reservations.Load()))
}
cl.Add(backoff)
cl.advanceBy(backoff)
require.Eventually(t, func() bool {
return reservations.Load() == 2
}, 10*time.Second, 100*time.Millisecond, "reservations load should be 2 was %d", reservations.Load())
Expand Down Expand Up @@ -294,8 +296,122 @@ func TestConnectOnDisconnect(t *testing.T) {
require.NotEqualf(t, oldRelay, relaysInUse[0], "old relay should not be used again")
}

type mockClock struct {
mu sync.Mutex
now time.Time
timers []*mockInstantTimer
}

var _ autorelay.ClockWithInstantTimer = &mockClock{}

type mockInstantTimer struct {
MarcoPolo marked this conversation as resolved.
Show resolved Hide resolved
c *mockClock
mu sync.Mutex
when time.Time
active bool
ch chan time.Time
}

// Ch implements autorelay.InstantTimer
func (t *mockInstantTimer) Ch() <-chan time.Time {
return t.ch
}

// Reset implements autorelay.InstantTimer
func (t *mockInstantTimer) Reset(d time.Time) bool {
t.mu.Lock()
defer t.mu.Unlock()
wasActive := t.active
t.active = true
t.when = d

// Schedule any timers that need to run. This will run this timer if t.when is before c.now
go t.c.advanceBy(0)

return wasActive
}

// Stop implements autorelay.InstantTimer
func (t *mockInstantTimer) Stop() bool {
t.mu.Lock()
defer t.mu.Unlock()
wasActive := t.active
t.active = false
return wasActive
}

var _ autorelay.InstantTimer = &mockInstantTimer{}

func newMockClock() *mockClock {
return &mockClock{now: time.Unix(0, 0)}
}

// InstantTimer implements autorelay.ClockWithInstantTimer
func (c *mockClock) InstantTimer(when time.Time) autorelay.InstantTimer {
c.mu.Lock()
defer c.mu.Unlock()
t := &mockInstantTimer{
c: c,
when: when,
ch: make(chan time.Time, 1),
active: true,
}
c.timers = append(c.timers, t)
return t
}

// Since implements autorelay.ClockWithInstantTimer
func (c *mockClock) Since(t time.Time) time.Duration {
c.mu.Lock()
defer c.mu.Unlock()
return c.now.Sub(t)
}

func (c *mockClock) Now() time.Time {
c.mu.Lock()
defer c.mu.Unlock()
return c.now
}

func (c *mockClock) advanceBy(dur time.Duration) {
MarcoPolo marked this conversation as resolved.
Show resolved Hide resolved
c.mu.Lock()
defer c.mu.Unlock()
endTime := c.now.Add(dur)

// sort timers by when
if len(c.timers) > 1 {
sort.Slice(c.timers, func(i, j int) bool {
return c.timers[i].when.Before(c.timers[j].when)
})
}

for _, t := range c.timers {
t.mu.Lock()
if !t.active {
t.mu.Unlock()
continue
}
if !t.when.After(c.now) {
t.active = false
t.mu.Unlock()
// This may block if the channel is full, but that's intended. This way our mock clock never gets too far ahead of consumer.
// This also prevents us from dropping times because we're advancing too fast.
t.ch <- c.now
} else if !t.when.After(endTime) {
c.now = t.when
t.active = false
t.mu.Unlock()
// This may block if the channel is full, but that's intended. See comment above
t.ch <- c.now
} else {
t.mu.Unlock()
}
}
c.now = endTime
}

func TestMaxAge(t *testing.T) {
cl := clock.NewMock()
cl := newMockClock()

const num = 4
peerChan1 := make(chan peer.AddrInfo, num)
Expand Down Expand Up @@ -330,7 +446,7 @@ func TestMaxAge(t *testing.T) {
autorelay.WithBootDelay(0),
autorelay.WithMaxCandidateAge(20*time.Minute),
autorelay.WithClock(cl),
autorelay.WithMinInterval(time.Second),
autorelay.WithMinInterval(30*time.Second),
)
defer h.Close()

Expand All @@ -340,17 +456,16 @@ func TestMaxAge(t *testing.T) {
relays := usedRelays(h)
require.Len(t, relays, 1)

cl.advanceBy(time.Minute)
require.Eventually(t, func() bool {
// we don't know exactly when the timer is reset, just advance our timer multiple times if necessary
cl.Add(30 * time.Second)
return len(peerChans) == 0
}, 10*time.Second, 100*time.Millisecond)

cl.Add(10 * time.Minute)
cl.advanceBy(10 * time.Minute)
for _, r := range relays2 {
peerChan2 <- peer.AddrInfo{ID: r.ID(), Addrs: r.Addrs()}
}
cl.Add(11 * time.Minute)
cl.advanceBy(11 * time.Minute)

require.Eventually(t, func() bool {
relays = usedRelays(h)
Expand Down Expand Up @@ -381,11 +496,21 @@ func TestMaxAge(t *testing.T) {
for _, r := range relays2 {
ids = append(ids, r.ID())
}

require.Eventually(t, func() bool {
for _, id := range ids {
if id == relays[0] {
return true
}
}
fmt.Println("waiting for", ids, "to contain", relays[0])
return false
}, 3*time.Second, 100*time.Millisecond)
require.Contains(t, ids, relays[0])
}

func TestReconnectToStaticRelays(t *testing.T) {
cl := clock.NewMock()
cl := newMockClock()
var staticRelays []peer.AddrInfo
const numStaticRelays = 1
relays := make([]host.Host, 0, numStaticRelays)
Expand All @@ -403,7 +528,7 @@ func TestReconnectToStaticRelays(t *testing.T) {
)
defer h.Close()

cl.Add(time.Minute)
cl.advanceBy(time.Minute)
require.Eventually(t, func() bool {
return numRelays(h) == 1
}, 10*time.Second, 100*time.Millisecond)
Expand All @@ -419,14 +544,14 @@ func TestReconnectToStaticRelays(t *testing.T) {
return numRelays(h) == 0
}, 10*time.Second, 100*time.Millisecond)

cl.Add(time.Hour)
cl.advanceBy(time.Hour)
require.Eventually(t, func() bool {
return numRelays(h) == 1
}, 10*time.Second, 100*time.Millisecond)
}

func TestMinInterval(t *testing.T) {
cl := clock.NewMock()
cl := newMockClock()
h := newPrivateNode(t,
func(context.Context, int) <-chan peer.AddrInfo {
peerChan := make(chan peer.AddrInfo, 1)
Expand All @@ -444,9 +569,40 @@ func TestMinInterval(t *testing.T) {
)
defer h.Close()

cl.Add(500 * time.Millisecond)
cl.advanceBy(400 * time.Millisecond)
// The second call to peerSource should happen after 1 second
require.Never(t, func() bool { return numRelays(h) > 0 }, 500*time.Millisecond, 100*time.Millisecond)
cl.Add(500 * time.Millisecond)
require.Eventually(t, func() bool { return numRelays(h) > 0 }, 10*time.Second, 100*time.Millisecond)
cl.advanceBy(600 * time.Millisecond)
require.Eventually(t, func() bool { return numRelays(h) > 0 }, 3*time.Second, 100*time.Millisecond)
}

func TestNoBusyLoop0MinInterval(t *testing.T) {
var calledTimes uint64
cl := newMockClock()
h := newPrivateNode(t,
func(context.Context, int) <-chan peer.AddrInfo {
atomic.AddUint64(&calledTimes, 1)
peerChan := make(chan peer.AddrInfo, 1)
defer close(peerChan)
r1 := newRelay(t)
t.Cleanup(func() { r1.Close() })
peerChan <- peer.AddrInfo{ID: r1.ID(), Addrs: r1.Addrs()}
return peerChan
},
autorelay.WithClock(cl),
autorelay.WithMinCandidates(1),
autorelay.WithMaxCandidates(1),
autorelay.WithNumRelays(0),
autorelay.WithBootDelay(time.Hour),
autorelay.WithMinInterval(time.Millisecond),
)
defer h.Close()

require.Never(t, func() bool {
cl.advanceBy(time.Second)
val := atomic.LoadUint64(&calledTimes)
return val >= 2
}, 500*time.Millisecond, 100*time.Millisecond)
val := atomic.LoadUint64(&calledTimes)
require.Less(t, val, uint64(2))
}
54 changes: 49 additions & 5 deletions p2p/host/autorelay/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ import (
"time"

"github.com/libp2p/go-libp2p/core/peer"

"github.com/benbjohnson/clock"
)

// AutoRelay will call this function when it needs new candidates because it is
Expand All @@ -24,7 +22,7 @@ import (
type PeerSource func(ctx context.Context, num int) <-chan peer.AddrInfo

type config struct {
clock clock.Clock
clock ClockWithInstantTimer
peerSource PeerSource
// minimum interval used to call the peerSource callback
minInterval time.Duration
Expand All @@ -45,7 +43,7 @@ type config struct {
}

var defaultConfig = config{
clock: clock.New(),
clock: RealClock{},
minCandidates: 4,
maxCandidates: 20,
bootDelay: 3 * time.Minute,
Expand Down Expand Up @@ -162,7 +160,53 @@ func WithMaxCandidateAge(d time.Duration) Option {
}
}

func WithClock(cl clock.Clock) Option {
// InstantTimer is a timer that triggers at some instant rather than some duration
type InstantTimer interface {
Reset(d time.Time) bool
Stop() bool
Ch() <-chan time.Time
}

// ClockWithInstantTimer is a clock that can create timers that trigger at some
// instant rather than some duration
type ClockWithInstantTimer interface {
Now() time.Time
Since(t time.Time) time.Duration
InstantTimer(when time.Time) InstantTimer
}

type RealTimer struct{ t *time.Timer }

var _ InstantTimer = (*RealTimer)(nil)

func (t RealTimer) Ch() <-chan time.Time {
return t.t.C
}

func (t RealTimer) Reset(d time.Time) bool {
return t.t.Reset(time.Until(d))
}

func (t RealTimer) Stop() bool {
return t.t.Stop()
}

type RealClock struct{}

var _ ClockWithInstantTimer = RealClock{}

func (RealClock) Now() time.Time {
return time.Now()
}
func (RealClock) Since(t time.Time) time.Duration {
return time.Since(t)
}
func (RealClock) InstantTimer(when time.Time) InstantTimer {
t := time.NewTimer(time.Until(when))
return &RealTimer{t}
}

func WithClock(cl ClockWithInstantTimer) Option {
return func(c *config) error {
c.clock = cl
return nil
Expand Down
Loading