Skip to content

Commit

Permalink
remove import cycle loop and set discv5 in peermanager
Browse files Browse the repository at this point in the history
  • Loading branch information
chaitanyaprem committed Oct 26, 2023
1 parent b07334e commit 0eb933b
Show file tree
Hide file tree
Showing 6 changed files with 24 additions and 12 deletions.
8 changes: 4 additions & 4 deletions waku/v2/discv5/discover_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/enr"
"github.com/prometheus/client_golang/prometheus"
"github.com/waku-org/go-waku/waku/v2/peermanager"

wenr "github.com/waku-org/go-waku/waku/v2/protocol/enr"

"github.com/multiformats/go-multiaddr"
Expand Down Expand Up @@ -109,7 +109,7 @@ func TestDiscV5(t *testing.T) {
ip1, _ := extractIP(host1.Addrs()[0])
l1, err := newLocalnode(prvKey1, ip1, udpPort1, wenr.NewWakuEnrBitfield(true, true, true, true), nil, utils.Logger())
require.NoError(t, err)
peerconn1 := peermanager.NewTestPeerDiscoverer()
peerconn1 := NewTestPeerDiscoverer()
d1, err := NewDiscoveryV5(prvKey1, l1, peerconn1, prometheus.DefaultRegisterer, utils.Logger(), WithUDPPort(uint(udpPort1)))
require.NoError(t, err)
d1.SetHost(host1)
Expand All @@ -121,7 +121,7 @@ func TestDiscV5(t *testing.T) {
require.NoError(t, err)
l2, err := newLocalnode(prvKey2, ip2, udpPort2, wenr.NewWakuEnrBitfield(true, true, true, true), nil, utils.Logger())
require.NoError(t, err)
peerconn2 := peermanager.NewTestPeerDiscoverer()
peerconn2 := NewTestPeerDiscoverer()
d2, err := NewDiscoveryV5(prvKey2, l2, peerconn2, prometheus.DefaultRegisterer, utils.Logger(), WithUDPPort(uint(udpPort2)), WithBootnodes([]*enode.Node{d1.localnode.Node()}))
require.NoError(t, err)
d2.SetHost(host2)
Expand All @@ -133,7 +133,7 @@ func TestDiscV5(t *testing.T) {
require.NoError(t, err)
l3, err := newLocalnode(prvKey3, ip3, udpPort3, wenr.NewWakuEnrBitfield(true, true, true, true), nil, utils.Logger())
require.NoError(t, err)
peerconn3 := peermanager.NewTestPeerDiscoverer()
peerconn3 := NewTestPeerDiscoverer()
d3, err := NewDiscoveryV5(prvKey3, l3, peerconn3, prometheus.DefaultRegisterer, utils.Logger(), WithUDPPort(uint(udpPort3)), WithBootnodes([]*enode.Node{d2.localnode.Node()}))
require.NoError(t, err)
d3.SetHost(host3)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package peermanager
package discv5

import (
"context"
Expand Down
4 changes: 3 additions & 1 deletion waku/v2/node/wakunode2.go
Original file line number Diff line number Diff line change
Expand Up @@ -687,7 +687,9 @@ func (w *WakuNode) mountDiscV5() error {
}

var err error
w.discoveryV5, err = discv5.NewDiscoveryV5(w.opts.privKey, w.localNode, w.peerConnector, w.opts.prometheusReg, w.log, discV5Options...)
discv5Inst, err := discv5.NewDiscoveryV5(w.opts.privKey, w.localNode, w.peerConnector, w.opts.prometheusReg, w.log, discV5Options...)
w.discoveryV5 = discv5Inst
w.peermanager.SetDiscv5(discv5Inst)

return err
}
Expand Down
12 changes: 12 additions & 0 deletions waku/v2/peermanager/peer_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/libp2p/go-libp2p/p2p/protocol/ping"
ma "github.com/multiformats/go-multiaddr"
"github.com/waku-org/go-waku/logging"
"github.com/waku-org/go-waku/waku/v2/discv5"
wps "github.com/waku-org/go-waku/waku/v2/peerstore"
waku_proto "github.com/waku-org/go-waku/waku/v2/protocol"
wenr "github.com/waku-org/go-waku/waku/v2/protocol/enr"
Expand Down Expand Up @@ -45,6 +46,7 @@ type PeerManager struct {
sub event.Subscription
topicMutex sync.RWMutex
subRelayTopics map[string]*NodeTopicDetails
discoveryService *discv5.DiscoveryV5
}

// PeerSelection provides various options based on which Peer is selected from a list of peers.
Expand Down Expand Up @@ -106,6 +108,10 @@ func NewPeerManager(maxConnections int, maxPeers int, logger *zap.Logger) *PeerM
return pm
}

func (pm *PeerManager) SetDiscv5(discv5 *discv5.DiscoveryV5) {
pm.discoveryService = discv5
}

// SetHost sets the host to be used in order to access the peerStore.
func (pm *PeerManager) SetHost(host host.Host) {
pm.host = host
Expand Down Expand Up @@ -183,6 +189,10 @@ func (pm *PeerManager) getRelayPeers(specificPeers ...peer.ID) (inRelayPeers pee
return
}

/* func (pm *PeerManager) OnDemandPeerDiscovery(cluster int16, shard int16, wakuServices []string) {
} */

// ensureMinRelayConnsPerTopic makes sure there are min of D conns per pubsubTopic.
// If not it will look into peerStore to initiate more connections.
// If peerStore doesn't have enough peers, will wait for discv5 to find more and try in next cycle
Expand Down Expand Up @@ -491,6 +501,8 @@ func (pm *PeerManager) selectServicePeer(proto protocol.ID, pubSubTopic string,
if err == nil {
peerIDPtr = &peerID
} else {
//TODO: Trigger on-demand discovery for this topic.

pm.logger.Debug("could not select random peer", zap.Error(err))
}
}
Expand Down
1 change: 0 additions & 1 deletion waku/v2/peermanager/peer_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,6 @@ func TestConnectToRelayPeers(t *testing.T) {
ctx, pm, deferFn := initTest(t)
pc, err := NewPeerConnectionStrategy(pm, 120*time.Second, pm.logger)
require.NoError(t, err)
pm.SetPeerConnector(pc)
err = pc.Start(ctx)
require.NoError(t, err)
pm.Start(ctx)
Expand Down
9 changes: 4 additions & 5 deletions waku/v2/protocol/peer_exchange/waku_peer_exchange_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"github.com/stretchr/testify/require"
"github.com/waku-org/go-waku/tests"
"github.com/waku-org/go-waku/waku/v2/discv5"
"github.com/waku-org/go-waku/waku/v2/peermanager"
wenr "github.com/waku-org/go-waku/waku/v2/protocol/enr"
"github.com/waku-org/go-waku/waku/v2/utils"
"go.uber.org/zap"
Expand Down Expand Up @@ -108,7 +107,7 @@ func TestRetrieveProvidePeerExchangePeers(t *testing.T) {
ip1, _ := extractIP(host1.Addrs()[0])
l1, err := newLocalnode(prvKey1, ip1, udpPort1, wenr.NewWakuEnrBitfield(false, false, false, true), nil, utils.Logger())
require.NoError(t, err)
discv5PeerConn1 := peermanager.NewTestPeerDiscoverer()
discv5PeerConn1 := discv5.NewTestPeerDiscoverer()
d1, err := discv5.NewDiscoveryV5(prvKey1, l1, discv5PeerConn1, prometheus.DefaultRegisterer, utils.Logger(), discv5.WithUDPPort(uint(udpPort1)))
require.NoError(t, err)
d1.SetHost(host1)
Expand All @@ -120,7 +119,7 @@ func TestRetrieveProvidePeerExchangePeers(t *testing.T) {
require.NoError(t, err)
l2, err := newLocalnode(prvKey2, ip2, udpPort2, wenr.NewWakuEnrBitfield(false, false, false, true), nil, utils.Logger())
require.NoError(t, err)
discv5PeerConn2 := peermanager.NewTestPeerDiscoverer()
discv5PeerConn2 := discv5.NewTestPeerDiscoverer()
d2, err := discv5.NewDiscoveryV5(prvKey2, l2, discv5PeerConn2, prometheus.DefaultRegisterer, utils.Logger(), discv5.WithUDPPort(uint(udpPort2)), discv5.WithBootnodes([]*enode.Node{d1.Node()}))
require.NoError(t, err)
d2.SetHost(host2)
Expand All @@ -143,12 +142,12 @@ func TestRetrieveProvidePeerExchangePeers(t *testing.T) {
time.Sleep(3 * time.Second) // Wait some time for peers to be discovered

// mount peer exchange
pxPeerConn1 := peermanager.NewTestPeerDiscoverer()
pxPeerConn1 := discv5.NewTestPeerDiscoverer()
px1, err := NewWakuPeerExchange(d1, pxPeerConn1, nil, prometheus.DefaultRegisterer, utils.Logger())
require.NoError(t, err)
px1.SetHost(host1)

pxPeerConn3 := peermanager.NewTestPeerDiscoverer()
pxPeerConn3 := discv5.NewTestPeerDiscoverer()
px3, err := NewWakuPeerExchange(nil, pxPeerConn3, nil, prometheus.DefaultRegisterer, utils.Logger())
require.NoError(t, err)
px3.SetHost(host3)
Expand Down

0 comments on commit 0eb933b

Please sign in to comment.