Skip to content

Commit

Permalink
correctly handle static relays
Browse files Browse the repository at this point in the history
  • Loading branch information
marten-seemann committed Mar 30, 2022
1 parent b4a94f7 commit 9c878c3
Show file tree
Hide file tree
Showing 6 changed files with 80 additions and 34 deletions.
2 changes: 1 addition & 1 deletion config/config.go
Expand Up @@ -101,7 +101,7 @@ type Config struct {

EnableAutoRelay bool
AutoNATConfig
StaticRelayOpt autorelay.StaticRelayOption
StaticRelayOpt autorelay.Option

EnableHolePunching bool
HolePunchingOptions []holepunch.Option
Expand Down
2 changes: 1 addition & 1 deletion options.go
Expand Up @@ -250,7 +250,7 @@ func EnableRelayService(opts ...relayv2.Option) Option {
//
// This subsystem performs automatic address rewriting to advertise relay addresses when it
// detects that the node is publicly unreachable (e.g. behind a NAT).
func EnableAutoRelay(opts ...autorelay.StaticRelayOption) Option {
func EnableAutoRelay(opts ...autorelay.Option) Option {
return func(cfg *Config) error {
if len(opts) > 0 {
if len(opts) > 1 {
Expand Down
24 changes: 23 additions & 1 deletion p2p/host/autorelay/autorelay.go
Expand Up @@ -22,6 +22,8 @@ type AutoRelay struct {
ctx context.Context
ctxCancel context.CancelFunc

conf *config

mx sync.Mutex
status network.Reachability

Expand Down Expand Up @@ -49,6 +51,7 @@ func NewAutoRelay(bhost *basic.BasicHost, peerChan <-chan peer.AddrInfo, opts ..
return nil, err
}
}
r.conf = &conf
r.relayFinder = newRelayFinder(bhost, r.peerChanOut, &conf)
bhost.AddrsFactory = r.hostAddrs

Expand All @@ -68,6 +71,25 @@ func (r *AutoRelay) background() {
}
defer subReachability.Close()

var peerChan <-chan peer.AddrInfo
if len(r.conf.staticRelays) == 0 {
peerChan = r.peerChanIn
} else {
pc := make(chan peer.AddrInfo)
peerChan = pc
r.refCount.Add(1)
go func() {
defer r.refCount.Done()
for _, sr := range r.conf.staticRelays {
select {
case pc <- sr:
case <-r.ctx.Done():
return
}
}
}()
}

for {
select {
case <-r.ctx.Done():
Expand All @@ -89,7 +111,7 @@ func (r *AutoRelay) background() {
r.mx.Lock()
r.status = evt.Reachability
r.mx.Unlock()
case pi := <-r.peerChanIn:
case pi := <-peerChan:
select {
case r.peerChanOut <- pi: // if there's space in the channel, great
default:
Expand Down
31 changes: 27 additions & 4 deletions p2p/host/autorelay/autorelay_test.go
Expand Up @@ -83,7 +83,6 @@ func newBrokenRelay(t *testing.T, workAfter int) host.Host {
require.NoError(t, err)
var n int32
h.SetStreamHandler(circuitv2_proto.ProtoIDv2Hop, func(str network.Stream) {
t.Log("rejecting reservation")
str.Reset()
num := atomic.AddInt32(&n, 1)
if int(num) >= workAfter {
Expand Down Expand Up @@ -201,12 +200,36 @@ func TestMaxBackoffs(t *testing.T) {
require.NoError(t, err)
defer ar.Close()

r1 := newBrokenRelay(t, 4)
t.Cleanup(func() { r1.Close() })
peerChan <- peer.AddrInfo{ID: r1.ID(), Addrs: r1.Addrs()}
r := newBrokenRelay(t, 4)
t.Cleanup(func() { r.Close() })
peerChan <- peer.AddrInfo{ID: r.ID(), Addrs: r.Addrs()}

// make sure we don't add any relays yet
require.Never(t, func() bool {
return len(ma.FilterAddrs(h.Addrs(), isRelayAddr)) > 0
}, 300*time.Millisecond, 50*time.Millisecond)
}

func TestStaticRelays(t *testing.T) {
const numRelays = 3
var staticRelays []peer.AddrInfo
for i := 0; i < numRelays; i++ {
r := newRelay(t)
t.Cleanup(func() { r.Close() })
staticRelays = append(staticRelays, peer.AddrInfo{ID: r.ID(), Addrs: r.Addrs()})
}

h := newPrivateNode(t)
ar, err := autorelay.NewAutoRelay(
h,
nil,
autorelay.WithStaticRelays(staticRelays),
autorelay.WithNumRelays(1),
)
require.NoError(t, err)
defer ar.Close()

require.Eventually(t, func() bool {
return len(ma.FilterAddrs(h.Addrs(), isRelayAddr)) > 0
}, 500*time.Millisecond, 50*time.Millisecond)
}
11 changes: 5 additions & 6 deletions p2p/host/autorelay/options.go
Expand Up @@ -57,19 +57,18 @@ func init() {
}

type Option func(*config) error
type StaticRelayOption Option

func WithStaticRelays(static []peer.AddrInfo) StaticRelayOption {
return func(r *config) error {
if len(r.staticRelays) > 0 {
func WithStaticRelays(static []peer.AddrInfo) Option {
return func(c *config) error {
if len(c.staticRelays) > 0 {
return errors.New("can't set static relays, static relays already configured")
}
r.staticRelays = static
c.staticRelays = static
return nil
}
}

func WithDefaultStaticRelays() StaticRelayOption {
func WithDefaultStaticRelays() Option {
return WithStaticRelays(defaultStaticRelays)
}

Expand Down
44 changes: 23 additions & 21 deletions p2p/host/autorelay/relay_finder.go
Expand Up @@ -95,13 +95,11 @@ func newRelayFinder(host *basic.BasicHost, peerChan <-chan peer.AddrInfo, conf *
}

func (rf *relayFinder) background(ctx context.Context) {
if len(rf.conf.staticRelays) == 0 {
rf.refCount.Add(1)
go func() {
defer rf.refCount.Done()
rf.findNodes(ctx)
}()
}
rf.refCount.Add(1)
go func() {
defer rf.refCount.Done()
rf.findNodes(ctx)
}()

subConnectedness, err := rf.host.EventBus().Subscribe(new(event.EvtPeerConnectednessChanged))
if err != nil {
Expand Down Expand Up @@ -138,10 +136,8 @@ func (rf *relayFinder) background(ctx context.Context) {
}
rf.relayMx.Unlock()
case <-rf.candidateFound:
log.Debugf("candidate found")
rf.handleNewCandidate(ctx)
case <-bootDelayTimer.C:
log.Debugf("boot delay timer")
rf.handleNewCandidate(ctx)
case <-rf.relayUpdated:
push = true
Expand All @@ -163,7 +159,7 @@ func (rf *relayFinder) background(ctx context.Context) {
}

// findNodes accepts nodes from the channel and tests if they support relaying.
// It is run on both public and private nodes (but not when static relays are set).
// It is run on both public and private nodes.
// It garbage collects old entries, so that nodes doesn't overflow.
// This makes sure that as soon as we need to find relay candidates, we have them available.
func (rf *relayFinder) findNodes(ctx context.Context) {
Expand All @@ -189,6 +185,13 @@ func (rf *relayFinder) findNodes(ctx context.Context) {
}
}

func (rf *relayFinder) notifyNewCandidate() {
select {
case rf.candidateFound <- struct{}{}:
default:
}
}

// handleNewNode tests if a peer supports circuit v1 or v2.
// This method is only run on private nodes.
// If a peer does, it is added to the candidates map.
Expand Down Expand Up @@ -220,13 +223,6 @@ func (rf *relayFinder) handleNewNode(ctx context.Context, pi peer.AddrInfo) {
rf.notifyNewCandidate()
}

func (rf *relayFinder) notifyNewCandidate() {
select {
case rf.candidateFound <- struct{}{}:
default:
}
}

// tryNode checks if a peer actually supports either circuit v1 or circuit v2.
// It does not modify any internal state.
func (rf *relayFinder) tryNode(ctx context.Context, pi peer.AddrInfo) (supportsRelayV1 bool, err error) {
Expand Down Expand Up @@ -293,10 +289,16 @@ func (rf *relayFinder) handleNewCandidate(ctx context.Context) {
if len(rf.relays) == rf.conf.desiredRelays {
return
}
// During the startup phase, we don't want to connect to the first candidate that we find.
// Instead, we wait until we've found at least minCandidates, and then select the best of those.
// However, if that takes too long (longer than bootDelay),
if len(rf.relays) == 0 && len(rf.candidates) < rf.conf.minCandidates && time.Since(rf.bootTime) < rf.conf.bootDelay {

if len(rf.conf.staticRelays) != 0 {
// make sure we read all static relays before continuing
if len(rf.peerChan) > 0 && len(rf.candidates) < rf.conf.minCandidates && time.Since(rf.bootTime) < rf.conf.bootDelay {
return
}
} else if len(rf.relays) == 0 && len(rf.candidates) < rf.conf.minCandidates && time.Since(rf.bootTime) < rf.conf.bootDelay {
// During the startup phase, we don't want to connect to the first candidate that we find.
// Instead, we wait until we've found at least minCandidates, and then select the best of those.
// However, if that takes too long (longer than bootDelay), we still go ahead.
return
}

Expand Down

0 comments on commit 9c878c3

Please sign in to comment.