Skip to content

Commit

Permalink
refactor: peer discovery management for discv5 and peer exchange
Browse files Browse the repository at this point in the history
  • Loading branch information
richard-ramos committed Jan 17, 2023
1 parent 25486eb commit e0ccdbe
Show file tree
Hide file tree
Showing 12 changed files with 485 additions and 337 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ require (
github.com/gorilla/websocket v1.5.0 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/hashicorp/golang-lru v0.5.5-0.20210104140557-80c98217689d // indirect
github.com/hashicorp/golang-lru v0.5.5-0.20210104140557-80c98217689d
github.com/holiman/bloomfilter/v2 v2.0.3 // indirect
github.com/holiman/uint256 v1.2.0 // indirect
github.com/huin/goupnp v1.0.3 // indirect
Expand Down
4 changes: 1 addition & 3 deletions mobile/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ import (
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/crypto/secp256k1"
"github.com/ethereum/go-ethereum/p2p/enode"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/discovery"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/multiformats/go-multiaddr"
"github.com/waku-org/go-waku/waku/v2/node"
Expand Down Expand Up @@ -176,7 +174,7 @@ func NewNode(configJSON string) string {
}
bootnodes = append(bootnodes, bootnode)
}
opts = append(opts, node.WithDiscoveryV5(*config.DiscV5UDPPort, bootnodes, true, pubsub.WithDiscoveryOpts(discovery.Limit(45), discovery.TTL(time.Duration(20)*time.Second))))
opts = append(opts, node.WithDiscoveryV5(*config.DiscV5UDPPort, bootnodes, true))
}

// for go-libp2p loggers
Expand Down
48 changes: 48 additions & 0 deletions tests/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,14 @@ import (
"fmt"
"io"
"net"
"sync"
"testing"

"github.com/ethereum/go-ethereum/log"
"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/multiformats/go-multiaddr"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
)
Expand Down Expand Up @@ -115,3 +117,49 @@ func RandomHex(n int) (string, error) {
}
return hex.EncodeToString(bytes), nil
}

type TestPeerDiscoverer struct {
sync.RWMutex
peerMap map[peer.ID]struct{}
peerCh chan peer.AddrInfo
}

func NewTestPeerDiscoverer() *TestPeerDiscoverer {
result := &TestPeerDiscoverer{
peerMap: make(map[peer.ID]struct{}),
peerCh: make(chan peer.AddrInfo, 10),
}

go func() {
for p := range result.peerCh {
result.Lock()
result.peerMap[p.ID] = struct{}{}
result.Unlock()
}
}()

return result
}

func (t *TestPeerDiscoverer) PeerChannel() chan<- peer.AddrInfo {
return t.peerCh
}

func (t *TestPeerDiscoverer) HasPeer(p peer.ID) bool {
t.RLock()
defer t.RUnlock()
_, ok := t.peerMap[p]
return ok
}

func (t *TestPeerDiscoverer) PeerCount() int {
t.RLock()
defer t.RUnlock()
return len(t.peerMap)
}

func (t *TestPeerDiscoverer) Clear() {
t.Lock()
defer t.Unlock()
t.peerMap = make(map[peer.ID]struct{})
}
3 changes: 1 addition & 2 deletions waku/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (

"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p/config"
"github.com/libp2p/go-libp2p/core/discovery"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/p2p/transport/tcp"

Expand Down Expand Up @@ -245,7 +244,7 @@ func Execute(options Options) {
}
}

nodeOpts = append(nodeOpts, node.WithDiscoveryV5(options.DiscV5.Port, bootnodes, options.DiscV5.AutoUpdate, pubsub.WithDiscoveryOpts(discovery.Limit(45), discovery.TTL(time.Duration(20)*time.Second))))
nodeOpts = append(nodeOpts, node.WithDiscoveryV5(options.DiscV5.Port, bootnodes, options.DiscV5.AutoUpdate))
}

if options.PeerExchange.Enable {
Expand Down
243 changes: 243 additions & 0 deletions waku/v2/discovery_connector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,243 @@
package v2

// Adapted from github.com/libp2p/go-libp2p@v0.23.2/p2p/discovery/backoff/backoffconnector.go

import (
"context"
"errors"
"sync"
"time"

"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/p2p/discovery/backoff"
"github.com/waku-org/go-waku/logging"
"go.uber.org/zap"

lru "github.com/hashicorp/golang-lru"
)

// PeerConnectionStrategy is a utility to connect to peers, but only if we have not recently tried connecting to them already
type PeerConnectionStrategy struct {
sync.RWMutex

cache *lru.TwoQueueCache
host host.Host
cancel context.CancelFunc

paused bool
workerCtx context.Context
workerCancel context.CancelFunc

wg sync.WaitGroup
minPeers int
dialTimeout time.Duration
peerCh chan peer.AddrInfo
dialCh chan peer.AddrInfo

backoff backoff.BackoffFactory
mux sync.Mutex
logger *zap.Logger
}

// NewPeerConnectionStrategy creates a utility to connect to peers, but only if we have not recently tried connecting to them already.
// cacheSize is the size of a TwoQueueCache
// dialTimeout is how long we attempt to connect to a peer before giving up
// minPeers is the minimum number of peers that the node should have
// backoff describes the strategy used to decide how long to backoff after previously attempting to connect to a peer
func NewPeerConnectionStrategy(h host.Host, cacheSize int, minPeers int, dialTimeout time.Duration, backoff backoff.BackoffFactory, logger *zap.Logger) (*PeerConnectionStrategy, error) {
cache, err := lru.New2Q(cacheSize)
if err != nil {
return nil, err
}

return &PeerConnectionStrategy{
cache: cache,
host: h,
wg: sync.WaitGroup{},
minPeers: minPeers,
dialTimeout: dialTimeout,
backoff: backoff,
logger: logger.Named("discovery-connector"),
}, nil
}

type connCacheData struct {
nextTry time.Time
strat backoff.BackoffStrategy
}

// PeerChannel exposes the channel on which discovered peers should be pushed
func (c *PeerConnectionStrategy) PeerChannel() chan<- peer.AddrInfo {
return c.peerCh
}

// Start attempts to connect to the peers passed in by peerCh. Will not connect to peers if they are within the backoff period.
func (c *PeerConnectionStrategy) Start(ctx context.Context) error {
if c.cancel != nil {
return errors.New("already started")
}

ctx, cancel := context.WithCancel(ctx)
c.cancel = cancel
c.peerCh = make(chan peer.AddrInfo)
c.dialCh = make(chan peer.AddrInfo)

c.wg.Add(3)
go c.shouldDialPeers(ctx)
go c.workPublisher(ctx)
go c.dialPeers(ctx)

return nil
}

func (c *PeerConnectionStrategy) Stop() {
if c.cancel == nil {
return
}

c.cancel()
c.wg.Wait()

close(c.peerCh)
close(c.dialCh)
}

func (c *PeerConnectionStrategy) isPaused() bool {
c.RLock()
defer c.RUnlock()
return c.paused
}

func (c *PeerConnectionStrategy) shouldDialPeers(ctx context.Context) {
defer c.wg.Done()

ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()

c.Lock()
c.workerCtx, c.workerCancel = context.WithCancel(ctx)
c.Unlock()

for {
select {
case <-ctx.Done():
return
case <-ticker.C:
isPaused := c.isPaused()
numPeers := len(c.host.Network().Peers())
if numPeers >= c.minPeers && !isPaused {
c.Lock()
c.paused = true
c.workerCancel()
c.Unlock()
} else if numPeers < c.minPeers && isPaused {
c.Lock()
c.paused = false
c.workerCtx, c.workerCancel = context.WithCancel(ctx)
c.Unlock()
}
}
}
}

func (c *PeerConnectionStrategy) publishWork(ctx context.Context, p peer.AddrInfo) {
select {
case c.dialCh <- p:
case <-ctx.Done():
return
case <-time.After(1 * time.Second):
// This timeout is to not lock the goroutine
return
}
}

func (c *PeerConnectionStrategy) workPublisher(ctx context.Context) {
defer c.wg.Done()

for {
select {
case <-ctx.Done():
return
default:
isPaused := c.isPaused()
if !isPaused {
select {
case <-ctx.Done():
return
case p := <-c.peerCh:
c.publishWork(ctx, p)
case <-time.After(1 * time.Second):
// This timeout is to not lock the goroutine
break
}
} else {
// Check if paused again
time.Sleep(1 * time.Second)
}
}
}
}

func (c *PeerConnectionStrategy) dialPeers(ctx context.Context) {
defer c.wg.Done()

maxGoRoutines := c.minPeers
if maxGoRoutines > 15 {
maxGoRoutines = 15
}

sem := make(chan struct{}, maxGoRoutines)
for {
select {
case pi, ok := <-c.dialCh:
if !ok {
return
}

if pi.ID == c.host.ID() || pi.ID == "" {
continue
}

c.mux.Lock()
val, ok := c.cache.Get(pi.ID)
var cachedPeer *connCacheData
if ok {
tv := val.(*connCacheData)
now := time.Now()
if now.Before(tv.nextTry) {
c.mux.Unlock()
continue
}

tv.nextTry = now.Add(tv.strat.Delay())
} else {
cachedPeer = &connCacheData{strat: c.backoff()}
cachedPeer.nextTry = time.Now().Add(cachedPeer.strat.Delay())
c.cache.Add(pi.ID, cachedPeer)
}
c.mux.Unlock()

if c.host.Network().Connectedness(pi.ID) == network.Connected {
continue
}

sem <- struct{}{}
c.wg.Add(1)
go func(pi peer.AddrInfo) {
defer c.wg.Done()

ctx, cancel := context.WithTimeout(c.workerCtx, c.dialTimeout)
defer cancel()
err := c.host.Connect(ctx, pi)
if err != nil && !errors.Is(err, context.Canceled) {
c.logger.Info("connecting to peer", logging.HostID("peerID", pi.ID), zap.Error(err))
}
<-sem
}(pi)
case <-ctx.Done():
return
}
}
}

0 comments on commit e0ccdbe

Please sign in to comment.