From e0ccdbe9665d1b053ae31b57a07c45eb3c558119 Mon Sep 17 00:00:00 2001 From: Richard Ramos Date: Fri, 13 Jan 2023 19:58:22 -0400 Subject: [PATCH] refactor: peer discovery management for discv5 and peer exchange --- go.mod | 2 +- mobile/api.go | 4 +- tests/utils.go | 48 ++++ waku/node.go | 3 +- waku/v2/discovery_connector.go | 243 ++++++++++++++++++ waku/v2/discv5/discover.go | 222 ++++------------ waku/v2/discv5/discover_test.go | 94 +------ waku/v2/node/service.go | 6 + waku/v2/node/wakunode2.go | 45 +++- waku/v2/node/wakuoptions.go | 14 +- .../peer_exchange/waku_peer_exchange.go | 126 ++++----- .../peer_exchange/waku_peer_exchange_test.go | 15 +- 12 files changed, 485 insertions(+), 337 deletions(-) create mode 100644 waku/v2/discovery_connector.go diff --git a/go.mod b/go.mod index 7fd6cf0fe..223c95603 100644 --- a/go.mod +++ b/go.mod @@ -80,7 +80,7 @@ require ( github.com/gorilla/websocket v1.5.0 // indirect github.com/hashicorp/errwrap v1.1.0 // indirect github.com/hashicorp/go-multierror v1.1.1 // indirect - github.com/hashicorp/golang-lru v0.5.5-0.20210104140557-80c98217689d // indirect + github.com/hashicorp/golang-lru v0.5.5-0.20210104140557-80c98217689d github.com/holiman/bloomfilter/v2 v2.0.3 // indirect github.com/holiman/uint256 v1.2.0 // indirect github.com/huin/goupnp v1.0.3 // indirect diff --git a/mobile/api.go b/mobile/api.go index e92263b01..98cc46ae9 100644 --- a/mobile/api.go +++ b/mobile/api.go @@ -20,8 +20,6 @@ import ( "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/crypto/secp256k1" "github.com/ethereum/go-ethereum/p2p/enode" - pubsub "github.com/libp2p/go-libp2p-pubsub" - "github.com/libp2p/go-libp2p/core/discovery" "github.com/libp2p/go-libp2p/core/peer" "github.com/multiformats/go-multiaddr" "github.com/waku-org/go-waku/waku/v2/node" @@ -176,7 +174,7 @@ func NewNode(configJSON string) string { } bootnodes = append(bootnodes, bootnode) } - opts = append(opts, node.WithDiscoveryV5(*config.DiscV5UDPPort, bootnodes, true, pubsub.WithDiscoveryOpts(discovery.Limit(45), discovery.TTL(time.Duration(20)*time.Second)))) + opts = append(opts, node.WithDiscoveryV5(*config.DiscV5UDPPort, bootnodes, true)) } // for go-libp2p loggers diff --git a/tests/utils.go b/tests/utils.go index 5f0b2b76e..841f8d357 100644 --- a/tests/utils.go +++ b/tests/utils.go @@ -7,12 +7,14 @@ import ( "fmt" "io" "net" + "sync" "testing" "github.com/ethereum/go-ethereum/log" "github.com/libp2p/go-libp2p" "github.com/libp2p/go-libp2p/core/crypto" "github.com/libp2p/go-libp2p/core/host" + "github.com/libp2p/go-libp2p/core/peer" "github.com/multiformats/go-multiaddr" "github.com/waku-org/go-waku/waku/v2/protocol/pb" ) @@ -115,3 +117,49 @@ func RandomHex(n int) (string, error) { } return hex.EncodeToString(bytes), nil } + +type TestPeerDiscoverer struct { + sync.RWMutex + peerMap map[peer.ID]struct{} + peerCh chan peer.AddrInfo +} + +func NewTestPeerDiscoverer() *TestPeerDiscoverer { + result := &TestPeerDiscoverer{ + peerMap: make(map[peer.ID]struct{}), + peerCh: make(chan peer.AddrInfo, 10), + } + + go func() { + for p := range result.peerCh { + result.Lock() + result.peerMap[p.ID] = struct{}{} + result.Unlock() + } + }() + + return result +} + +func (t *TestPeerDiscoverer) PeerChannel() chan<- peer.AddrInfo { + return t.peerCh +} + +func (t *TestPeerDiscoverer) HasPeer(p peer.ID) bool { + t.RLock() + defer t.RUnlock() + _, ok := t.peerMap[p] + return ok +} + +func (t *TestPeerDiscoverer) PeerCount() int { + t.RLock() + defer t.RUnlock() + return len(t.peerMap) +} + +func (t *TestPeerDiscoverer) Clear() { + t.Lock() + defer t.Unlock() + t.peerMap = make(map[peer.ID]struct{}) +} diff --git a/waku/node.go b/waku/node.go index b378a2151..12dcff170 100644 --- a/waku/node.go +++ b/waku/node.go @@ -26,7 +26,6 @@ import ( "github.com/libp2p/go-libp2p" "github.com/libp2p/go-libp2p/config" - "github.com/libp2p/go-libp2p/core/discovery" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/p2p/transport/tcp" @@ -245,7 +244,7 @@ func Execute(options Options) { } } - nodeOpts = append(nodeOpts, node.WithDiscoveryV5(options.DiscV5.Port, bootnodes, options.DiscV5.AutoUpdate, pubsub.WithDiscoveryOpts(discovery.Limit(45), discovery.TTL(time.Duration(20)*time.Second)))) + nodeOpts = append(nodeOpts, node.WithDiscoveryV5(options.DiscV5.Port, bootnodes, options.DiscV5.AutoUpdate)) } if options.PeerExchange.Enable { diff --git a/waku/v2/discovery_connector.go b/waku/v2/discovery_connector.go new file mode 100644 index 000000000..6213ce466 --- /dev/null +++ b/waku/v2/discovery_connector.go @@ -0,0 +1,243 @@ +package v2 + +// Adapted from github.com/libp2p/go-libp2p@v0.23.2/p2p/discovery/backoff/backoffconnector.go + +import ( + "context" + "errors" + "sync" + "time" + + "github.com/libp2p/go-libp2p/core/host" + "github.com/libp2p/go-libp2p/core/network" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/libp2p/go-libp2p/p2p/discovery/backoff" + "github.com/waku-org/go-waku/logging" + "go.uber.org/zap" + + lru "github.com/hashicorp/golang-lru" +) + +// PeerConnectionStrategy is a utility to connect to peers, but only if we have not recently tried connecting to them already +type PeerConnectionStrategy struct { + sync.RWMutex + + cache *lru.TwoQueueCache + host host.Host + cancel context.CancelFunc + + paused bool + workerCtx context.Context + workerCancel context.CancelFunc + + wg sync.WaitGroup + minPeers int + dialTimeout time.Duration + peerCh chan peer.AddrInfo + dialCh chan peer.AddrInfo + + backoff backoff.BackoffFactory + mux sync.Mutex + logger *zap.Logger +} + +// NewPeerConnectionStrategy creates a utility to connect to peers, but only if we have not recently tried connecting to them already. +// cacheSize is the size of a TwoQueueCache +// dialTimeout is how long we attempt to connect to a peer before giving up +// minPeers is the minimum number of peers that the node should have +// backoff describes the strategy used to decide how long to backoff after previously attempting to connect to a peer +func NewPeerConnectionStrategy(h host.Host, cacheSize int, minPeers int, dialTimeout time.Duration, backoff backoff.BackoffFactory, logger *zap.Logger) (*PeerConnectionStrategy, error) { + cache, err := lru.New2Q(cacheSize) + if err != nil { + return nil, err + } + + return &PeerConnectionStrategy{ + cache: cache, + host: h, + wg: sync.WaitGroup{}, + minPeers: minPeers, + dialTimeout: dialTimeout, + backoff: backoff, + logger: logger.Named("discovery-connector"), + }, nil +} + +type connCacheData struct { + nextTry time.Time + strat backoff.BackoffStrategy +} + +// PeerChannel exposes the channel on which discovered peers should be pushed +func (c *PeerConnectionStrategy) PeerChannel() chan<- peer.AddrInfo { + return c.peerCh +} + +// Start attempts to connect to the peers passed in by peerCh. Will not connect to peers if they are within the backoff period. +func (c *PeerConnectionStrategy) Start(ctx context.Context) error { + if c.cancel != nil { + return errors.New("already started") + } + + ctx, cancel := context.WithCancel(ctx) + c.cancel = cancel + c.peerCh = make(chan peer.AddrInfo) + c.dialCh = make(chan peer.AddrInfo) + + c.wg.Add(3) + go c.shouldDialPeers(ctx) + go c.workPublisher(ctx) + go c.dialPeers(ctx) + + return nil +} + +func (c *PeerConnectionStrategy) Stop() { + if c.cancel == nil { + return + } + + c.cancel() + c.wg.Wait() + + close(c.peerCh) + close(c.dialCh) +} + +func (c *PeerConnectionStrategy) isPaused() bool { + c.RLock() + defer c.RUnlock() + return c.paused +} + +func (c *PeerConnectionStrategy) shouldDialPeers(ctx context.Context) { + defer c.wg.Done() + + ticker := time.NewTicker(1 * time.Second) + defer ticker.Stop() + + c.Lock() + c.workerCtx, c.workerCancel = context.WithCancel(ctx) + c.Unlock() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + isPaused := c.isPaused() + numPeers := len(c.host.Network().Peers()) + if numPeers >= c.minPeers && !isPaused { + c.Lock() + c.paused = true + c.workerCancel() + c.Unlock() + } else if numPeers < c.minPeers && isPaused { + c.Lock() + c.paused = false + c.workerCtx, c.workerCancel = context.WithCancel(ctx) + c.Unlock() + } + } + } +} + +func (c *PeerConnectionStrategy) publishWork(ctx context.Context, p peer.AddrInfo) { + select { + case c.dialCh <- p: + case <-ctx.Done(): + return + case <-time.After(1 * time.Second): + // This timeout is to not lock the goroutine + return + } +} + +func (c *PeerConnectionStrategy) workPublisher(ctx context.Context) { + defer c.wg.Done() + + for { + select { + case <-ctx.Done(): + return + default: + isPaused := c.isPaused() + if !isPaused { + select { + case <-ctx.Done(): + return + case p := <-c.peerCh: + c.publishWork(ctx, p) + case <-time.After(1 * time.Second): + // This timeout is to not lock the goroutine + break + } + } else { + // Check if paused again + time.Sleep(1 * time.Second) + } + } + } +} + +func (c *PeerConnectionStrategy) dialPeers(ctx context.Context) { + defer c.wg.Done() + + maxGoRoutines := c.minPeers + if maxGoRoutines > 15 { + maxGoRoutines = 15 + } + + sem := make(chan struct{}, maxGoRoutines) + for { + select { + case pi, ok := <-c.dialCh: + if !ok { + return + } + + if pi.ID == c.host.ID() || pi.ID == "" { + continue + } + + c.mux.Lock() + val, ok := c.cache.Get(pi.ID) + var cachedPeer *connCacheData + if ok { + tv := val.(*connCacheData) + now := time.Now() + if now.Before(tv.nextTry) { + c.mux.Unlock() + continue + } + + tv.nextTry = now.Add(tv.strat.Delay()) + } else { + cachedPeer = &connCacheData{strat: c.backoff()} + cachedPeer.nextTry = time.Now().Add(cachedPeer.strat.Delay()) + c.cache.Add(pi.ID, cachedPeer) + } + c.mux.Unlock() + + if c.host.Network().Connectedness(pi.ID) == network.Connected { + continue + } + + sem <- struct{}{} + c.wg.Add(1) + go func(pi peer.AddrInfo) { + defer c.wg.Done() + + ctx, cancel := context.WithTimeout(c.workerCtx, c.dialTimeout) + defer cancel() + err := c.host.Connect(ctx, pi) + if err != nil && !errors.Is(err, context.Canceled) { + c.logger.Info("connecting to peer", logging.HostID("peerID", pi.ID), zap.Error(err)) + } + <-sem + }(pi) + case <-ctx.Done(): + return + } + } +} diff --git a/waku/v2/discv5/discover.go b/waku/v2/discv5/discover.go index e0f0d238a..920cdf5ba 100644 --- a/waku/v2/discv5/discover.go +++ b/waku/v2/discv5/discover.go @@ -3,12 +3,10 @@ package discv5 import ( "context" "crypto/ecdsa" - "math/rand" + "errors" "net" "sync" - "time" - "github.com/libp2p/go-libp2p/core/discovery" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/peer" "github.com/waku-org/go-discover/discover" @@ -17,42 +15,26 @@ import ( "go.uber.org/zap" "github.com/ethereum/go-ethereum/p2p/enode" - "github.com/ethereum/go-ethereum/p2p/enr" "github.com/ethereum/go-ethereum/p2p/nat" ) type DiscoveryV5 struct { sync.RWMutex - discovery.Discovery - - params *discV5Parameters - host host.Host - config discover.Config - udpAddr *net.UDPAddr - listener *discover.UDPv5 - localnode *enode.LocalNode - NAT nat.Interface + params *discV5Parameters + host host.Host + config discover.Config + udpAddr *net.UDPAddr + listener *discover.UDPv5 + localnode *enode.LocalNode + peerConnector PeerConnector + NAT nat.Interface log *zap.Logger started bool cancel context.CancelFunc wg *sync.WaitGroup - - peerCache peerCache -} - -type peerCache struct { - sync.RWMutex - recs map[peer.ID]PeerRecord - rng *rand.Rand -} - -type PeerRecord struct { - expire int64 - Peer peer.AddrInfo - Node *enode.Node } type discV5Parameters struct { @@ -96,9 +78,11 @@ func DefaultOptions() []DiscoveryV5Option { } } -const MaxPeersToDiscover = 600 +type PeerConnector interface { + PeerChannel() chan<- peer.AddrInfo +} -func NewDiscoveryV5(host host.Host, priv *ecdsa.PrivateKey, localnode *enode.LocalNode, log *zap.Logger, opts ...DiscoveryV5Option) (*DiscoveryV5, error) { +func NewDiscoveryV5(host host.Host, priv *ecdsa.PrivateKey, localnode *enode.LocalNode, peerConnector PeerConnector, log *zap.Logger, opts ...DiscoveryV5Option) (*DiscoveryV5, error) { params := new(discV5Parameters) optList := DefaultOptions() optList = append(optList, opts...) @@ -114,15 +98,12 @@ func NewDiscoveryV5(host host.Host, priv *ecdsa.PrivateKey, localnode *enode.Loc } return &DiscoveryV5{ - host: host, - params: params, - NAT: NAT, - wg: &sync.WaitGroup{}, - peerCache: peerCache{ - rng: rand.New(rand.NewSource(rand.Int63())), - recs: make(map[peer.ID]PeerRecord), - }, - localnode: localnode, + host: host, + peerConnector: peerConnector, + params: params, + NAT: NAT, + wg: &sync.WaitGroup{}, + localnode: localnode, config: discover.Config{ PrivateKey: priv, Bootnodes: params.bootnodes, @@ -190,6 +171,7 @@ func (d *DiscoveryV5) Start(ctx context.Context) error { return err } + d.wg.Add(1) go d.runDiscoveryV5Loop(ctx) return nil @@ -237,29 +219,15 @@ func isWakuNode(node *enode.Node) bool { } */ -func hasTCPPort(node *enode.Node) bool { - enrTCP := new(enr.TCP) - if err := node.Record().Load(enr.WithEntry(enrTCP.ENRKey(), enrTCP)); err != nil { - if !enr.IsNotFound(err) { - utils.Logger().Named("discv5").Error("retrieving port for enr", logging.ENode("enr", node)) - } - return false - } - - return true -} - func evaluateNode(node *enode.Node) bool { - // TODO: track https://github.com/status-im/nim-waku/issues/770 for improvements over validation func - - if node == nil || node.IP() == nil { + if node == nil { return false } // TODO: consider node filtering based on ENR; we do not filter based on ENR in the first waku discv5 beta stage - if /*!isWakuNode(node) ||*/ !hasTCPPort(node) { + /*if !isWakuNode(node) { return false - } + }*/ _, err := utils.EnodeToPeerInfo(node) @@ -271,27 +239,25 @@ func evaluateNode(node *enode.Node) bool { return true } -func (d *DiscoveryV5) Advertise(ctx context.Context, ns string, opts ...discovery.Option) (time.Duration, error) { - // Get options - var options discovery.Options - err := options.Apply(opts...) - if err != nil { - return 0, err +func (d *DiscoveryV5) Iterator() (enode.Iterator, error) { + if d.listener == nil { + return nil, errors.New("no discv5 listener") } - // TODO: once discv5 spec introduces capability and topic discovery, implement this function - - return 20 * time.Minute, nil + iterator := d.listener.RandomNodes() + return enode.Filter(iterator, evaluateNode), nil } -func (d *DiscoveryV5) iterate(ctx context.Context, iterator enode.Iterator, limit int) { - defer d.wg.Done() +func (d *DiscoveryV5) iterate(ctx context.Context) { + iterator, err := d.Iterator() + if err != nil { + d.log.Debug("obtaining iterator", zap.Error(err)) + return + } - for { - if len(d.peerCache.recs) >= limit { - time.Sleep(1 * time.Minute) - } + defer iterator.Close() + for { if ctx.Err() != nil { break } @@ -313,111 +279,37 @@ func (d *DiscoveryV5) iterate(ctx context.Context, iterator enode.Iterator, limi continue } - d.peerCache.Lock() - for _, p := range peerAddrs { - _, ok := d.peerCache.recs[p.ID] - if ok { - continue + if len(peerAddrs) != 0 { + select { + case <-ctx.Done(): + return + case d.peerConnector.PeerChannel() <- peerAddrs[0]: } - - d.peerCache.recs[p.ID] = PeerRecord{ - expire: time.Now().Unix() + 3600, // Expires in 1hr - Peer: p, - Node: iterator.Node(), - } - } - d.peerCache.Unlock() - } -} - -func (d *DiscoveryV5) removeExpiredPeers() int { - // Remove all expired entries from cache - currentTime := time.Now().Unix() - newCacheSize := len(d.peerCache.recs) - - for p := range d.peerCache.recs { - rec := d.peerCache.recs[p] - if rec.expire < currentTime { - newCacheSize-- - delete(d.peerCache.recs, p) } } - - return newCacheSize } func (d *DiscoveryV5) runDiscoveryV5Loop(ctx context.Context) { - iterator := d.listener.RandomNodes() - iterator = enode.Filter(iterator, evaluateNode) - defer iterator.Close() - - d.wg.Add(1) - - go d.iterate(ctx, iterator, MaxPeersToDiscover) - - <-ctx.Done() - - d.log.Warn("Discv5 loop stopped") -} - -func (d *DiscoveryV5) FindNodes(ctx context.Context, topic string, opts ...discovery.Option) ([]PeerRecord, error) { - // Get options - var options discovery.Options - err := options.Apply(opts...) - if err != nil { - return nil, err - } - - limit := options.Limit - if limit == 0 || limit > MaxPeersToDiscover { - limit = MaxPeersToDiscover - } - - // We are ignoring the topic. Future versions might use a map[string]*peerCache instead where the string represents the pubsub topic - - d.peerCache.Lock() - defer d.peerCache.Unlock() + defer d.wg.Done() - d.removeExpiredPeers() + ch := make(chan struct{}, 1) + ch <- struct{}{} // Initial execution - // Randomize and fill channel with available records - count := len(d.peerCache.recs) - if limit < count { - count = limit - } - - perm := d.peerCache.rng.Perm(len(d.peerCache.recs))[0:count] - permSet := make(map[int]int) - for i, v := range perm { - permSet[v] = i - } - - sendLst := make([]PeerRecord, count) - iter := 0 - for k := range d.peerCache.recs { - if sendIndex, ok := permSet[iter]; ok { - sendLst[sendIndex] = d.peerCache.recs[k] +restartLoop: + for { + select { + case <-ch: + if d.listener == nil { + break + } + d.iterate(ctx) + ch <- struct{}{} + case <-ctx.Done(): + close(ch) + break restartLoop } - iter++ - } - - return sendLst, err -} - -func (d *DiscoveryV5) FindPeers(ctx context.Context, topic string, opts ...discovery.Option) (<-chan peer.AddrInfo, error) { - records, err := d.FindNodes(ctx, topic, opts...) - if err != nil { - return nil, err } - - chPeer := make(chan peer.AddrInfo, len(records)) - for _, r := range records { - chPeer <- r.Peer - } - - close(chPeer) - - return chPeer, err + d.log.Warn("Discv5 loop stopped") } func (d *DiscoveryV5) IsStarted() bool { diff --git a/waku/v2/discv5/discover_test.go b/waku/v2/discv5/discover_test.go index 8b91324e6..92f7f0047 100644 --- a/waku/v2/discv5/discover_test.go +++ b/waku/v2/discv5/discover_test.go @@ -21,7 +21,6 @@ import ( "github.com/libp2p/go-libp2p" libp2pcrypto "github.com/libp2p/go-libp2p/core/crypto" - "github.com/libp2p/go-libp2p/core/discovery" "github.com/libp2p/go-libp2p/core/host" ) @@ -106,7 +105,8 @@ func TestDiscV5(t *testing.T) { ip1, _ := extractIP(host1.Addrs()[0]) l1, err := newLocalnode(prvKey1, ip1, udpPort1, utils.NewWakuEnrBitfield(true, true, true, true), nil, utils.Logger()) require.NoError(t, err) - d1, err := NewDiscoveryV5(host1, prvKey1, l1, utils.Logger(), WithUDPPort(uint(udpPort1))) + peerconn1 := tests.NewTestPeerDiscoverer() + d1, err := NewDiscoveryV5(host1, prvKey1, l1, peerconn1, utils.Logger(), WithUDPPort(uint(udpPort1))) require.NoError(t, err) // H2 @@ -116,7 +116,8 @@ func TestDiscV5(t *testing.T) { require.NoError(t, err) l2, err := newLocalnode(prvKey2, ip2, udpPort2, utils.NewWakuEnrBitfield(true, true, true, true), nil, utils.Logger()) require.NoError(t, err) - d2, err := NewDiscoveryV5(host2, prvKey2, l2, utils.Logger(), WithUDPPort(uint(udpPort2)), WithBootnodes([]*enode.Node{d1.localnode.Node()})) + peerconn2 := tests.NewTestPeerDiscoverer() + d2, err := NewDiscoveryV5(host2, prvKey2, l2, peerconn2, utils.Logger(), WithUDPPort(uint(udpPort2)), WithBootnodes([]*enode.Node{d1.localnode.Node()})) require.NoError(t, err) // H3 @@ -126,7 +127,8 @@ func TestDiscV5(t *testing.T) { require.NoError(t, err) l3, err := newLocalnode(prvKey3, ip3, udpPort3, utils.NewWakuEnrBitfield(true, true, true, true), nil, utils.Logger()) require.NoError(t, err) - d3, err := NewDiscoveryV5(host3, prvKey3, l3, utils.Logger(), WithUDPPort(uint(udpPort3)), WithBootnodes([]*enode.Node{d2.localnode.Node()})) + peerconn3 := tests.NewTestPeerDiscoverer() + d3, err := NewDiscoveryV5(host3, prvKey3, l3, peerconn3, utils.Logger(), WithUDPPort(uint(udpPort3)), WithBootnodes([]*enode.Node{d2.localnode.Node()})) require.NoError(t, err) defer d1.Stop() @@ -142,92 +144,18 @@ func TestDiscV5(t *testing.T) { err = d3.Start(context.Background()) require.NoError(t, err) - time.Sleep(3 * time.Second) // Wait for nodes to be discovered + time.Sleep(2 * time.Second) // Wait for nodes to be discovered - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - - peerChan, err := d3.FindPeers(ctx, "", discovery.Limit(2)) - require.NoError(t, err) - - foundHost1 := false - foundHost2 := false - for p := range peerChan { - if p.Addrs[0].String() == host1.Addrs()[0].String() { - foundHost1 = true - } - - if p.Addrs[0].String() == host2.Addrs()[0].String() { - foundHost2 = true - - } - } - - require.True(t, foundHost1 && foundHost2) - - // Should return nodes from the cache + require.True(t, peerconn3.HasPeer(host1.ID()) && peerconn3.HasPeer(host2.ID())) d3.Stop() - - foundHost1 = false - foundHost2 = false - - ctx1, cancel1 := context.WithTimeout(context.Background(), 1*time.Second) - defer cancel1() - - peerChan, err = d3.FindPeers(ctx1, "", discovery.Limit(2)) - require.NoError(t, err) - for p := range peerChan { - if p.Addrs[0].String() == host1.Addrs()[0].String() { - foundHost1 = true - } - - if p.Addrs[0].String() == host2.Addrs()[0].String() { - foundHost2 = true - } - } - - require.True(t, foundHost1 && foundHost2) - - // Simulate empty cache - - for i := range d3.peerCache.recs { - delete(d3.peerCache.recs, i) - } - - ctx2, cancel2 := context.WithTimeout(context.Background(), 1*time.Second) - defer cancel2() - - peerChan, err = d3.FindPeers(ctx2, "", discovery.Limit(2)) - require.NoError(t, err) - for range peerChan { - require.Fail(t, "Should not have peers") - } + peerconn3.Clear() // Restart peer search err = d3.Start(context.Background()) require.NoError(t, err) - time.Sleep(3 * time.Second) // Wait for nodes to be discovered - - foundHost1 = false - foundHost2 = false - - ctx3, cancel3 := context.WithTimeout(context.Background(), 1*time.Second) - defer cancel3() - - peerChan, err = d3.FindPeers(ctx3, "", discovery.Limit(2)) - require.NoError(t, err) - for p := range peerChan { - if p.Addrs[0].String() == host1.Addrs()[0].String() { - foundHost1 = true - } - - if p.Addrs[0].String() == host2.Addrs()[0].String() { - foundHost2 = true - } - } - - require.True(t, foundHost1 && foundHost2) + time.Sleep(2 * time.Second) // Wait for nodes to be discovered + require.True(t, peerconn3.HasPeer(host1.ID()) && peerconn3.HasPeer(host2.ID())) } diff --git a/waku/v2/node/service.go b/waku/v2/node/service.go index e67b76e16..ef0e95e5a 100644 --- a/waku/v2/node/service.go +++ b/waku/v2/node/service.go @@ -3,6 +3,7 @@ package node import ( "context" + "github.com/libp2p/go-libp2p/core/peer" "github.com/waku-org/go-waku/waku/v2/protocol" ) @@ -15,3 +16,8 @@ type ReceptorService interface { Service MessageChannel() chan *protocol.Envelope } + +type PeerConnectorService interface { + Service + PeerChannel() chan<- peer.AddrInfo +} diff --git a/waku/v2/node/wakunode2.go b/waku/v2/node/wakunode2.go index 56ff06631..4b3c263ee 100644 --- a/waku/v2/node/wakunode2.go +++ b/waku/v2/node/wakunode2.go @@ -4,12 +4,12 @@ import ( "context" "errors" "fmt" + "math/rand" "net" "sync" "time" "github.com/libp2p/go-libp2p" - pubsub "github.com/libp2p/go-libp2p-pubsub" "go.uber.org/zap" "github.com/ethereum/go-ethereum/common" @@ -21,6 +21,7 @@ import ( "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/peerstore" + "github.com/libp2p/go-libp2p/p2p/discovery/backoff" ws "github.com/libp2p/go-libp2p/p2p/transport/websocket" ma "github.com/multiformats/go-multiaddr" "go.opencensus.io/stats" @@ -70,14 +71,15 @@ type WakuNode struct { log *zap.Logger timesource timesource.Timesource - relay Service - lightPush Service - swap Service - discoveryV5 Service - peerExchange Service - filter ReceptorService - store ReceptorService - rlnRelay RLNRelay + relay Service + lightPush Service + swap Service + peerConnector PeerConnectorService + discoveryV5 Service + peerExchange Service + filter ReceptorService + store ReceptorService + rlnRelay RLNRelay wakuFlag utils.WakuEnrBitfield @@ -178,16 +180,24 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) { w.log.Error("creating localnode", zap.Error(err)) } + // Setup peer connection strategy + cacheSize := 600 + rngSrc := rand.NewSource(rand.Int63()) + minBackoff, maxBackoff := time.Second*30, time.Hour + bkf := backoff.NewExponentialBackoff(minBackoff, maxBackoff, backoff.FullJitter, time.Second, 5.0, 0, rand.New(rngSrc)) + w.peerConnector, err = v2.NewPeerConnectionStrategy(host, cacheSize, w.opts.discoveryMinPeers, network.DialPeerTimeout, bkf, w.log) + if err != nil { + w.log.Error("creating peer connection strategy", zap.Error(err)) + } + if w.opts.enableDiscV5 { err := w.mountDiscV5() if err != nil { return nil, err } - - w.opts.wOpts = append(w.opts.wOpts, pubsub.WithDiscovery(w.DiscV5(), w.opts.discV5Opts...)) } - w.peerExchange, err = peer_exchange.NewWakuPeerExchange(w.host, w.DiscV5(), w.log) + w.peerExchange, err = peer_exchange.NewWakuPeerExchange(w.host, w.DiscV5(), w.peerConnector, w.log) if err != nil { return nil, err } @@ -280,6 +290,11 @@ func (w *WakuNode) Start(ctx context.Context) error { go w.startKeepAlive(ctx, w.opts.keepAliveInterval) } + err := w.peerConnector.Start(ctx) + if err != nil { + return err + } + if w.opts.enableNTP { err := w.timesource.Start(ctx) if err != nil { @@ -328,7 +343,7 @@ func (w *WakuNode) Start(ctx context.Context) error { w.bcaster.Register(nil, w.filter.MessageChannel()) } - err := w.setupENR(ctx, w.ListenAddresses()) + err = w.setupENR(ctx, w.ListenAddresses()) if err != nil { return err } @@ -375,6 +390,8 @@ func (w *WakuNode) Stop() { w.discoveryV5.Stop() } + w.peerConnector.Stop() + _ = w.stopRlnRelay() w.timesource.Stop() @@ -531,7 +548,7 @@ func (w *WakuNode) mountDiscV5() error { } var err error - w.discoveryV5, err = discv5.NewDiscoveryV5(w.Host(), w.opts.privKey, w.localNode, w.log, discV5Options...) + w.discoveryV5, err = discv5.NewDiscoveryV5(w.Host(), w.opts.privKey, w.localNode, w.peerConnector, w.log, discV5Options...) return err } diff --git a/waku/v2/node/wakuoptions.go b/waku/v2/node/wakuoptions.go index 7d9f93504..71cdd6d33 100644 --- a/waku/v2/node/wakuoptions.go +++ b/waku/v2/node/wakuoptions.go @@ -74,10 +74,11 @@ type WakuNodeParameters struct { swapDisconnectThreshold int swapPaymentThreshold int + discoveryMinPeers int + enableDiscV5 bool udpPort uint discV5bootnodes []*enode.Node - discV5Opts []pubsub.DiscoverOpt discV5autoUpdate bool enablePeerExchange bool @@ -108,6 +109,7 @@ type WakuNodeOption func(*WakuNodeParameters) error // Default options used in the libp2p node var DefaultWakuNodeOptions = []WakuNodeOption{ + WithDiscoverParams(150), WithLogger(utils.Logger()), } @@ -281,13 +283,19 @@ func WithWakuRelayAndMinPeers(minRelayPeersToPublish int, opts ...pubsub.Option) } } +func WithDiscoverParams(minPeers int) WakuNodeOption { + return func(params *WakuNodeParameters) error { + params.discoveryMinPeers = minPeers + return nil + } +} + // WithDiscoveryV5 is a WakuOption used to enable DiscV5 peer discovery -func WithDiscoveryV5(udpPort uint, bootnodes []*enode.Node, autoUpdate bool, discoverOpts ...pubsub.DiscoverOpt) WakuNodeOption { +func WithDiscoveryV5(udpPort uint, bootnodes []*enode.Node, autoUpdate bool) WakuNodeOption { return func(params *WakuNodeParameters) error { params.enableDiscV5 = true params.udpPort = udpPort params.discV5bootnodes = bootnodes - params.discV5Opts = discoverOpts params.discV5autoUpdate = autoUpdate return nil } diff --git a/waku/v2/protocol/peer_exchange/waku_peer_exchange.go b/waku/v2/protocol/peer_exchange/waku_peer_exchange.go index a1c34ff47..d6a815904 100644 --- a/waku/v2/protocol/peer_exchange/waku_peer_exchange.go +++ b/waku/v2/protocol/peer_exchange/waku_peer_exchange.go @@ -18,7 +18,6 @@ import ( "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" libp2pProtocol "github.com/libp2p/go-libp2p/core/protocol" - "github.com/libp2p/go-libp2p/p2p/discovery/backoff" "github.com/libp2p/go-msgio/protoio" "github.com/waku-org/go-waku/logging" "github.com/waku-org/go-waku/waku/v2/discv5" @@ -33,7 +32,6 @@ import ( const PeerExchangeID_v20alpha1 = libp2pProtocol.ID("/vac/waku/peer-exchange/2.0.0-alpha1") const MaxCacheSize = 1000 const CacheCleanWindow = 200 -const dialTimeout = 30 * time.Second var ( ErrNoPeersAvailable = errors.New("no suitable remote peers") @@ -51,41 +49,35 @@ type WakuPeerExchange struct { log *zap.Logger - cancel context.CancelFunc - started bool + cancel context.CancelFunc + wg sync.WaitGroup - connector *backoff.BackoffConnector + peerConnector PeerConnector + peerCh chan peer.AddrInfo enrCache map[enode.ID]peerRecord // todo: next step: ring buffer; future: implement cache satisfying https://rfc.vac.dev/spec/34/ enrCacheMutex sync.RWMutex rng *rand.Rand } +type PeerConnector interface { + PeerChannel() chan<- peer.AddrInfo +} + // NewWakuPeerExchange returns a new instance of WakuPeerExchange struct -func NewWakuPeerExchange(h host.Host, disc *discv5.DiscoveryV5, log *zap.Logger) (*WakuPeerExchange, error) { +func NewWakuPeerExchange(h host.Host, disc *discv5.DiscoveryV5, peerConnector PeerConnector, log *zap.Logger) (*WakuPeerExchange, error) { wakuPX := new(WakuPeerExchange) wakuPX.h = h wakuPX.disc = disc wakuPX.log = log.Named("wakupx") wakuPX.enrCache = make(map[enode.ID]peerRecord) wakuPX.rng = rand.New(rand.NewSource(rand.Int63())) - - cacheSize := 600 - rngSrc := rand.NewSource(rand.Int63()) - minBackoff, maxBackoff := time.Second*30, time.Hour - bkf := backoff.NewExponentialBackoff(minBackoff, maxBackoff, backoff.FullJitter, time.Second, 5.0, 0, rand.New(rngSrc)) - connector, err := backoff.NewBackoffConnector(h, cacheSize, dialTimeout, bkf) - - if err != nil { - return nil, err - } - wakuPX.connector = connector - + wakuPX.peerConnector = peerConnector return wakuPX, nil } // Start inits the peer exchange protocol func (wakuPX *WakuPeerExchange) Start(ctx context.Context) error { - if wakuPX.started { + if wakuPX.cancel != nil { return errors.New("peer exchange already started") } @@ -93,14 +85,13 @@ func (wakuPX *WakuPeerExchange) Start(ctx context.Context) error { ctx, cancel := context.WithCancel(ctx) wakuPX.cancel = cancel - wakuPX.started = true + wakuPX.peerCh = make(chan peer.AddrInfo) wakuPX.h.SetStreamHandlerMatch(PeerExchangeID_v20alpha1, protocol.PrefixTextMatch(string(PeerExchangeID_v20alpha1)), wakuPX.onRequest(ctx)) wakuPX.log.Info("Peer exchange protocol started") wakuPX.wg.Add(1) go wakuPX.runPeerExchangeDiscv5Loop(ctx) - return nil } @@ -128,20 +119,22 @@ func (wakuPX *WakuPeerExchange) handleResponse(ctx context.Context, response *pb return err } - if wakuPX.h.Network().Connectedness(peerInfo.ID) != network.Connected { - peers = append(peers, *peerInfo) - } + peers = append(peers, *peerInfo) } if len(peers) != 0 { log.Info("connecting to newly discovered peers", zap.Int("count", len(peers))) - - ch := make(chan peer.AddrInfo, len(peers)) - for _, p := range peers { - ch <- p - } - - wakuPX.connector.Connect(ctx, ch) + wakuPX.wg.Add(1) + go func() { + defer wakuPX.wg.Done() + for _, p := range peers { + select { + case <-ctx.Done(): + return + case wakuPX.peerConnector.PeerChannel() <- p: + } + } + }() } return nil @@ -212,8 +205,9 @@ func (wakuPX *WakuPeerExchange) Stop() { if wakuPX.cancel == nil { return } - wakuPX.cancel() wakuPX.h.RemoveStreamHandler(PeerExchangeID_v20alpha1) + wakuPX.cancel() + close(wakuPX.peerCh) wakuPX.wg.Wait() } @@ -317,28 +311,41 @@ func (wakuPX *WakuPeerExchange) cleanCache() { wakuPX.enrCache = r } -func (wakuPX *WakuPeerExchange) findPeers(ctx context.Context) { - ctx, cancel := context.WithTimeout(ctx, 2*time.Second) - defer cancel() - peerRecords, err := wakuPX.disc.FindNodes(ctx, "") +func (wakuPX *WakuPeerExchange) iterate(ctx context.Context) { + iterator, err := wakuPX.disc.Iterator() if err != nil { - wakuPX.log.Error("finding peers", zap.Error(err)) + wakuPX.log.Debug("obtaining iterator", zap.Error(err)) + return } + defer iterator.Close() - cnt := 0 - wakuPX.enrCacheMutex.Lock() - for _, p := range peerRecords { - cnt++ - wakuPX.enrCache[p.Node.ID()] = peerRecord{ - idx: len(wakuPX.enrCache), - node: p.Node, + for { + if ctx.Err() != nil { + break } - } - wakuPX.enrCacheMutex.Unlock() - wakuPX.log.Info("discovered px peers via discv5", zap.Int("count", cnt)) + exists := iterator.Next() + if !exists { + break + } - wakuPX.cleanCache() + addresses, err := utils.Multiaddress(iterator.Node()) + if err != nil { + wakuPX.log.Error("extracting multiaddrs from enr", zap.Error(err)) + continue + } + + if len(addresses) == 0 { + continue + } + + wakuPX.enrCacheMutex.Lock() + wakuPX.enrCache[iterator.Node().ID()] = peerRecord{ + idx: len(wakuPX.enrCache), + node: iterator.Node(), + } + wakuPX.enrCacheMutex.Unlock() + } } func (wakuPX *WakuPeerExchange) runPeerExchangeDiscv5Loop(ctx context.Context) { @@ -350,24 +357,23 @@ func (wakuPX *WakuPeerExchange) runPeerExchangeDiscv5Loop(ctx context.Context) { return } - wakuPX.log.Info("starting peer exchange discovery v5 loop") + ch := make(chan struct{}, 1) + ch <- struct{}{} // Initial execution - ticker := time.NewTicker(30 * time.Second) + ticker := time.NewTicker(5 * time.Second) defer ticker.Stop() - // This loop "competes" with the loop in wakunode2 - // For the purpose of collecting px peers, 30 sec intervals should be enough - - wakuPX.findPeers(ctx) - +restartLoop: for { select { - case <-ctx.Done(): - return - + case <-ch: + wakuPX.iterate(ctx) + ch <- struct{}{} case <-ticker.C: - wakuPX.findPeers(ctx) + wakuPX.cleanCache() + case <-ctx.Done(): + close(ch) + break restartLoop } - } } diff --git a/waku/v2/protocol/peer_exchange/waku_peer_exchange_test.go b/waku/v2/protocol/peer_exchange/waku_peer_exchange_test.go index 93b933fe0..2e22bd758 100644 --- a/waku/v2/protocol/peer_exchange/waku_peer_exchange_test.go +++ b/waku/v2/protocol/peer_exchange/waku_peer_exchange_test.go @@ -105,7 +105,8 @@ func TestRetrieveProvidePeerExchangePeers(t *testing.T) { ip1, _ := extractIP(host1.Addrs()[0]) l1, err := newLocalnode(prvKey1, ip1, udpPort1, utils.NewWakuEnrBitfield(false, false, false, true), nil, utils.Logger()) require.NoError(t, err) - d1, err := discv5.NewDiscoveryV5(host1, prvKey1, l1, utils.Logger(), discv5.WithUDPPort(uint(udpPort1))) + discv5PeerConn1 := tests.NewTestPeerDiscoverer() + d1, err := discv5.NewDiscoveryV5(host1, prvKey1, l1, discv5PeerConn1, utils.Logger(), discv5.WithUDPPort(uint(udpPort1))) require.NoError(t, err) // H2 @@ -115,7 +116,8 @@ func TestRetrieveProvidePeerExchangePeers(t *testing.T) { require.NoError(t, err) l2, err := newLocalnode(prvKey2, ip2, udpPort2, utils.NewWakuEnrBitfield(false, false, false, true), nil, utils.Logger()) require.NoError(t, err) - d2, err := discv5.NewDiscoveryV5(host2, prvKey2, l2, utils.Logger(), discv5.WithUDPPort(uint(udpPort2)), discv5.WithBootnodes([]*enode.Node{d1.Node()})) + discv5PeerConn2 := tests.NewTestPeerDiscoverer() + d2, err := discv5.NewDiscoveryV5(host2, prvKey2, l2, discv5PeerConn2, utils.Logger(), discv5.WithUDPPort(uint(udpPort2)), discv5.WithBootnodes([]*enode.Node{d1.Node()})) require.NoError(t, err) // H3 @@ -136,10 +138,12 @@ func TestRetrieveProvidePeerExchangePeers(t *testing.T) { time.Sleep(3 * time.Second) // Wait some time for peers to be discovered // mount peer exchange - px1, err := NewWakuPeerExchange(host1, d1, utils.Logger()) + pxPeerConn1 := tests.NewTestPeerDiscoverer() + px1, err := NewWakuPeerExchange(host1, d1, pxPeerConn1, utils.Logger()) require.NoError(t, err) - px3, err := NewWakuPeerExchange(host3, nil, utils.Logger()) + pxPeerConn3 := tests.NewTestPeerDiscoverer() + px3, err := NewWakuPeerExchange(host3, nil, pxPeerConn3, utils.Logger()) require.NoError(t, err) err = px1.Start(context.Background()) @@ -157,8 +161,7 @@ func TestRetrieveProvidePeerExchangePeers(t *testing.T) { time.Sleep(3 * time.Second) // Give the algorithm some time to work its magic - peer2Info := host3.Peerstore().PeerInfo(host2.ID()) - require.Equal(t, host2.Addrs()[0], peer2Info.Addrs[0]) + require.True(t, pxPeerConn3.HasPeer(host2.ID())) px1.Stop() px3.Stop()