Skip to content

Commit

Permalink
Clean Up Discovery Filtering of Peers (#6128)
Browse files Browse the repository at this point in the history
* clean up

* fix test

* rename

* context

* preston's review

* remove dep

Co-authored-by: prylabs-bulldozer[bot] <58059840+prylabs-bulldozer[bot]@users.noreply.github.com>
  • Loading branch information
nisdas and prylabs-bulldozer[bot] committed Jun 6, 2020
1 parent a1e3fc9 commit 0b70c3e
Show file tree
Hide file tree
Showing 3 changed files with 122 additions and 78 deletions.
69 changes: 69 additions & 0 deletions beacon-chain/p2p/discovery.go
Expand Up @@ -11,9 +11,11 @@ import (
"github.com/ethereum/go-ethereum/p2p/enr"
iaddr "github.com/ipfs/go-ipfs-addr"
core "github.com/libp2p/go-libp2p-core"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
ma "github.com/multiformats/go-multiaddr"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)

// Listener defines the discovery V5 network interface that is used
Expand Down Expand Up @@ -122,6 +124,61 @@ func (s *Service) startDiscoveryV5(
return listener, nil
}

// filterPeer validates each node that we retrieve from our dht. We
// try to ascertain that the peer can be a valid protocol peer.
// Validity Conditions:
// 1) The local node is still actively looking for peers to
// connect to.
// 2) Peer has a valid IP and TCP port set in their enr.
// 3) Peer hasn't been marked as 'bad'
// 4) Peer is not currently active or connected.
// 5) Peer's fork digest in their ENR matches that of
// our localnodes.
func (s *Service) filterPeer(node *enode.Node) bool {
if len(s.Peers().Active()) >= int(s.cfg.MaxPeers) {
log.WithFields(logrus.Fields{"peer": node.String(),
"reason": "at peer limit"}).Trace("Not dialing peer")
return false
}
// ignore nodes with no ip address stored.
if node.IP() == nil {
return false
}
// do not dial nodes with their tcp ports not set
if err := node.Record().Load(enr.WithEntry("tcp", new(enr.TCP))); err != nil {
if !enr.IsNotFound(err) {
log.WithError(err).Debug("Could not retrieve tcp port")
}
return false
}
peerData, multiAddr, err := convertToAddrInfo(node)
if err != nil {
log.WithError(err).Debug("Could not convert to peer data")
return false
}
if s.peers.IsBad(peerData.ID) {
return false
}
if s.peers.IsActive(peerData.ID) {
return false
}
if s.host.Network().Connectedness(peerData.ID) == network.Connected {
return false
}
nodeENR := node.Record()
// Decide whether or not to connect to peer that does not
// match the proper fork ENR data with our local node.
if s.genesisValidatorsRoot != nil {
if err := s.compareForkENR(nodeENR); err != nil {
log.WithError(err).Trace("Fork ENR mismatches between peer and local node")
return false
}
}
// Add peer to peer handler.
s.peers.Add(nodeENR, peerData.ID, multiAddr, network.DirUnknown)
return true
}

// startDHTDiscovery supports discovery via DHT.
func startDHTDiscovery(host core.Host, bootstrapAddr string) error {
multiAddr, err := multiAddrFromString(bootstrapAddr)
Expand Down Expand Up @@ -182,6 +239,18 @@ func convertToMultiAddr(nodes []*enode.Node) []ma.Multiaddr {
return multiAddrs
}

func convertToAddrInfo(node *enode.Node) (*peer.AddrInfo, ma.Multiaddr, error) {
multiAddr, err := convertToSingleMultiAddr(node)
if err != nil {
return nil, nil, err
}
info, err := peer.AddrInfoFromP2pAddr(multiAddr)
if err != nil {
return nil, nil, err
}
return info, multiAddr, nil
}

func convertToSingleMultiAddr(node *enode.Node) (ma.Multiaddr, error) {
ip4 := node.IP().To4()
if ip4 == nil {
Expand Down
35 changes: 29 additions & 6 deletions beacon-chain/p2p/fork_test.go
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/ethereum/go-ethereum/p2p/discover"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/enr"
ma "github.com/multiformats/go-multiaddr"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
"github.com/prysmaticlabs/prysm/shared/p2putils"
Expand Down Expand Up @@ -84,18 +85,29 @@ func TestStartDiscv5_DifferentForkDigests(t *testing.T) {
// bootnode given all nodes provided by discv5 will have different fork digests.
cfg.UDPPort = 14000
cfg.TCPPort = 14001
cfg.MaxPeers = 30
s, err := NewService(cfg)
if err != nil {
t.Fatal(err)
}
s.genesisTime = genesisTime
s.genesisValidatorsRoot = make([]byte, 32)
s.dv5Listener = lastListener
multiAddrs := s.processPeers(nodes)
addrs := []ma.Multiaddr{}

for _, n := range nodes {
if s.filterPeer(n) {
addr, err := convertToSingleMultiAddr(n)
if err != nil {
t.Fatal(err)
}
addrs = append(addrs, addr)
}
}

// We should not have valid peers if the fork digest mismatched.
if len(multiAddrs) != 0 {
t.Errorf("Expected 0 valid peers, got %d", len(multiAddrs))
if len(addrs) != 0 {
t.Errorf("Expected 0 valid peers, got %d", len(addrs))
}
if err := s.Stop(); err != nil {
t.Fatal(err)
Expand All @@ -104,7 +116,7 @@ func TestStartDiscv5_DifferentForkDigests(t *testing.T) {

func TestStartDiscv5_SameForkDigests_DifferentNextForkData(t *testing.T) {
hook := logTest.NewGlobal()
logrus.SetLevel(logrus.DebugLevel)
logrus.SetLevel(logrus.TraceLevel)
port := 2000
ipAddr, pkey := createAddrAndPrivKey(t)
genesisTime := time.Now()
Expand Down Expand Up @@ -171,6 +183,7 @@ func TestStartDiscv5_SameForkDigests_DifferentNextForkData(t *testing.T) {
// bootnode given all nodes provided by discv5 will have different fork digests.
cfg.UDPPort = 14000
cfg.TCPPort = 14001
cfg.MaxPeers = 30
s, err := NewService(cfg)
if err != nil {
t.Fatal(err)
Expand All @@ -179,8 +192,18 @@ func TestStartDiscv5_SameForkDigests_DifferentNextForkData(t *testing.T) {
s.genesisTime = genesisTime
s.genesisValidatorsRoot = make([]byte, 32)
s.dv5Listener = lastListener
multiAddrs := s.processPeers(nodes)
if len(multiAddrs) == 0 {
addrs := []ma.Multiaddr{}

for _, n := range nodes {
if s.filterPeer(n) {
addr, err := convertToSingleMultiAddr(n)
if err != nil {
t.Fatal(err)
}
addrs = append(addrs, addr)
}
}
if len(addrs) == 0 {
t.Error("Expected to have valid peers, got 0")
}

Expand Down
96 changes: 24 additions & 72 deletions beacon-chain/p2p/service.go
Expand Up @@ -39,7 +39,6 @@ import (
"github.com/prysmaticlabs/prysm/shared"
"github.com/prysmaticlabs/prysm/shared/runutil"
"github.com/prysmaticlabs/prysm/shared/slotutil"
"github.com/sirupsen/logrus"
)

var _ = shared.Service(&Service{})
Expand Down Expand Up @@ -438,11 +437,7 @@ func (s *Service) FindPeersWithSubnet(index uint64) (bool, error) {
}
for _, comIdx := range subnets {
if comIdx == index {
multiAddr, err := convertToSingleMultiAddr(node)
if err != nil {
return false, err
}
info, err := peer.AddrInfoFromP2pAddr(multiAddr)
info, multiAddr, err := convertToAddrInfo(node)
if err != nil {
return false, err
}
Expand Down Expand Up @@ -510,17 +505,30 @@ func (s *Service) awaitStateInitialized() {

// listen for new nodes watches for new nodes in the network and adds them to the peerstore.
func (s *Service) listenForNewNodes() {
runutil.RunEvery(s.ctx, pollingPeriod, func() {
iterator := s.dv5Listener.RandomNodes()
nodes := enode.ReadNodes(iterator, lookupLimit)
iterator.Close()
multiAddresses := s.processPeers(nodes)
// do not process a large amount than required peers.
if len(multiAddresses) > lookupLimit {
multiAddresses = multiAddresses[:lookupLimit]
iterator := s.dv5Listener.RandomNodes()
iterator = enode.Filter(iterator, s.filterPeer)
defer iterator.Close()
for {
// Exit if service's context is canceled
if s.ctx.Err() != nil {
break
}
s.connectWithAllPeers(multiAddresses)
})
exists := iterator.Next()
if !exists {
break
}
node := iterator.Node()
peerInfo, _, err := convertToAddrInfo(node)
if err != nil {
log.WithError(err).Error("Could not convert to peer info")
continue
}
go func(info *peer.AddrInfo) {
if err := s.connectWithPeer(*info); err != nil {
log.WithError(err).Tracef("Could not connect with peer %s", info.String())
}
}(peerInfo)
}
}

func (s *Service) connectWithAllPeers(multiAddrs []ma.Multiaddr) {
Expand All @@ -540,11 +548,6 @@ func (s *Service) connectWithAllPeers(multiAddrs []ma.Multiaddr) {
}

func (s *Service) connectWithPeer(info peer.AddrInfo) error {
if len(s.Peers().Active()) >= int(s.cfg.MaxPeers) {
log.WithFields(logrus.Fields{"peer": info.ID.String(),
"reason": "at peer limit"}).Trace("Not dialing peer")
return nil
}
if info.ID == s.host.ID() {
return nil
}
Expand All @@ -558,57 +561,6 @@ func (s *Service) connectWithPeer(info peer.AddrInfo) error {
return nil
}

// process new peers that come in from our dht.
func (s *Service) processPeers(nodes []*enode.Node) []ma.Multiaddr {
var multiAddrs []ma.Multiaddr
for _, node := range nodes {
// ignore nodes with no ip address stored.
if node.IP() == nil {
continue
}
// do not dial nodes with their tcp ports not set
if err := node.Record().Load(enr.WithEntry("tcp", new(enr.TCP))); err != nil {
if !enr.IsNotFound(err) {
log.WithError(err).Error("Could not retrieve tcp port")
}
continue
}
multiAddr, err := convertToSingleMultiAddr(node)
if err != nil {
log.WithError(err).Error("Could not convert to multiAddr")
continue
}
peerData, err := peer.AddrInfoFromP2pAddr(multiAddr)
if err != nil {
log.WithError(err).Error("Could not get peer id")
continue
}
if s.peers.IsBad(peerData.ID) {
continue
}
if s.peers.IsActive(peerData.ID) {
continue
}
if s.host.Network().Connectedness(peerData.ID) == network.Connected {
continue
}
nodeENR := node.Record()
// Decide whether or not to connect to peer that does not
// match the proper fork ENR data with our local node.
if s.genesisValidatorsRoot != nil {
if err := s.compareForkENR(nodeENR); err != nil {
log.WithError(err).Debug("Fork ENR mismatches between peer and local node")
continue
}
}

// Add peer to peer handler.
s.peers.Add(nodeENR, peerData.ID, multiAddr, network.DirUnknown)
multiAddrs = append(multiAddrs, multiAddr)
}
return multiAddrs
}

func (s *Service) connectToBootnodes() error {
nodes := make([]*enode.Node, 0, len(s.cfg.Discv5BootStrapAddr))
for _, addr := range s.cfg.Discv5BootStrapAddr {
Expand Down

0 comments on commit 0b70c3e

Please sign in to comment.