Skip to content

Commit

Permalink
Refactor Subnet Search (#8048)
Browse files Browse the repository at this point in the history
* checkpoint progress

* checkpoint

* clean up

* do better

* fix test

* fix

* fix

* preston's review

* fix

* fix

* fix

* add iterator

* go doc

* make it a config parameter

* Apply suggestions from code review

Co-authored-by: Preston Van Loon <preston@prysmaticlabs.com>
  • Loading branch information
nisdas and prestonvanloon committed Dec 12, 2020
1 parent 4ec396c commit 29804fa
Show file tree
Hide file tree
Showing 14 changed files with 135 additions and 94 deletions.
1 change: 1 addition & 0 deletions beacon-chain/p2p/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ go_library(
"handshake.go",
"info.go",
"interfaces.go",
"iterator.go",
"log.go",
"monitoring.go",
"options.go",
Expand Down
24 changes: 8 additions & 16 deletions beacon-chain/p2p/broadcaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,6 @@ import (
// GossipTypeMapping.
var ErrMessageNotMapped = errors.New("message type is not mapped to a PubSub topic")

// Max number of attempts to search the network for a specific subnet.
const maxSubnetDiscoveryAttempts = 3

// Broadcast a message to the p2p network.
func (s *Service) Broadcast(ctx context.Context, msg proto.Message) error {
ctx, span := trace.StartSpan(ctx, "p2p.Broadcast")
Expand Down Expand Up @@ -75,7 +72,7 @@ func (s *Service) broadcastAttestation(ctx context.Context, subnet uint64, att *

// Ensure we have peers with this subnet.
s.subnetLocker(subnet).RLock()
hasPeer := s.hasPeerWithSubnet(subnet)
hasPeer := s.hasPeerWithSubnet(attestationToTopic(subnet, forkDigest))
s.subnetLocker(subnet).RUnlock()

span.AddAttributes(
Expand All @@ -89,18 +86,13 @@ func (s *Service) broadcastAttestation(ctx context.Context, subnet uint64, att *
if err := func() error {
s.subnetLocker(subnet).Lock()
defer s.subnetLocker(subnet).Unlock()
for i := 0; i < maxSubnetDiscoveryAttempts; i++ {
if err := ctx.Err(); err != nil {
return err
}
ok, err := s.FindPeersWithSubnet(ctx, subnet)
if err != nil {
return err
}
if ok {
savedAttestationBroadcasts.Inc()
return nil
}
ok, err := s.FindPeersWithSubnet(ctx, attestationToTopic(subnet, forkDigest), subnet, 1)
if err != nil {
return err
}
if ok {
savedAttestationBroadcasts.Inc()
return nil
}
return errors.New("failed to find peers for subnet")
}(); err != nil {
Expand Down
4 changes: 4 additions & 0 deletions beacon-chain/p2p/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,10 @@ func (s *Service) startDiscoveryV5(
// 6) Peer's fork digest in their ENR matches that of
// our localnodes.
func (s *Service) filterPeer(node *enode.Node) bool {
// Ignore nil node entries passed in.
if node == nil {
return false
}
// ignore nodes with no ip address stored.
if node.IP() == nil {
return false
Expand Down
2 changes: 1 addition & 1 deletion beacon-chain/p2p/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ type PeerManager interface {
Host() host.Host
ENR() *enr.Record
RefreshENR()
FindPeersWithSubnet(ctx context.Context, index uint64) (bool, error)
FindPeersWithSubnet(ctx context.Context, topic string, index, threshold uint64) (bool, error)
AddPingMethod(reqFunc func(ctx context.Context, id peer.ID) error)
}

Expand Down
36 changes: 36 additions & 0 deletions beacon-chain/p2p/iterator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package p2p

import (
"context"

"github.com/ethereum/go-ethereum/p2p/enode"
)

// filterNodes wraps an iterator such that Next only returns nodes for which
// the 'check' function returns true. This custom implementation also
// checks for context deadlines so that in the event the parent context has
// expired, we do exit from the search rather than perform more network
// lookups for additional peers.
func filterNodes(ctx context.Context, it enode.Iterator, check func(*enode.Node) bool) enode.Iterator {
return &filterIter{ctx, it, check}
}

type filterIter struct {
context.Context
enode.Iterator
check func(*enode.Node) bool
}

// Next looks up for the next valid node according to our
// filter criteria.
func (f *filterIter) Next() bool {
for f.Iterator.Next() {
if f.Context.Err() != nil {
return false
}
if f.check(f.Node()) {
return true
}
}
return false
}
87 changes: 48 additions & 39 deletions beacon-chain/p2p/subnets.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (

"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/enr"
"github.com/libp2p/go-libp2p-core/network"
"github.com/prysmaticlabs/go-bitfield"
"go.opencensus.io/trace"

Expand All @@ -20,8 +19,10 @@ var attSubnetEnrKey = params.BeaconNetworkConfig().AttSubnetKey

// FindPeersWithSubnet performs a network search for peers
// subscribed to a particular subnet. Then we try to connect
// with those peers.
func (s *Service) FindPeersWithSubnet(ctx context.Context, index uint64) (bool, error) {
// with those peers. This method will block until the required amount of
// peers are found, the method only exits in the event of context timeouts.
func (s *Service) FindPeersWithSubnet(ctx context.Context, topic string,
index, threshold uint64) (bool, error) {
ctx, span := trace.StartSpan(ctx, "p2p.FindPeersWithSubnet")
defer span.End()

Expand All @@ -31,59 +32,67 @@ func (s *Service) FindPeersWithSubnet(ctx context.Context, index uint64) (bool,
// return if discovery isn't set
return false, nil
}

topic += s.Encoding().ProtocolSuffix()
iterator := s.dv5Listener.RandomNodes()
nodes := enode.ReadNodes(iterator, lookupLimit)
exists := false
for _, node := range nodes {
iterator = filterNodes(ctx, iterator, s.filterPeerForSubnet(index))

currNum := uint64(len(s.pubsub.ListPeers(topic)))
wg := new(sync.WaitGroup)
for {
if err := ctx.Err(); err != nil {
return false, err
}
if node.IP() == nil {
continue
if currNum >= threshold {
break
}
// do not look for nodes with no tcp port set
if err := node.Record().Load(enr.WithEntry("tcp", new(enr.TCP))); err != nil {
if !enr.IsNotFound(err) {
log.WithError(err).Debug("Could not retrieve tcp port")
nodes := enode.ReadNodes(iterator, int(params.BeaconNetworkConfig().MinimumPeersInSubnetSearch))
for _, node := range nodes {
info, _, err := convertToAddrInfo(node)
if err != nil {
continue
}
continue
wg.Add(1)
go func() {
if err := s.connectWithPeer(ctx, *info); err != nil {
log.WithError(err).Tracef("Could not connect with peer %s", info.String())
}
wg.Done()
}()
}
// Wait for all dials to be completed.
wg.Wait()
currNum = uint64(len(s.pubsub.ListPeers(topic)))
}
return true, nil
}

// returns a method with filters peers specifically for a particular attestation subnet.
func (s *Service) filterPeerForSubnet(index uint64) func(node *enode.Node) bool {
return func(node *enode.Node) bool {
if !s.filterPeer(node) {
return false
}
subnets, err := retrieveAttSubnets(node.Record())
if err != nil {
log.Debugf("could not retrieve subnets: %v", err)
continue
return false
}
indExists := false
for _, comIdx := range subnets {
if comIdx == index {
info, multiAddr, err := convertToAddrInfo(node)
if err != nil {
return false, err
}
if s.peers.IsActive(info.ID) {
exists = true
continue
}
if s.host.Network().Connectedness(info.ID) == network.Connected {
exists = true
continue
}
if !s.peers.IsReadyToDial(info.ID) {
continue
}
s.peers.Add(node.Record(), info.ID, multiAddr, network.DirUnknown)
if err := s.connectWithPeer(ctx, *info); err != nil {
log.WithError(err).Tracef("Could not connect with peer %s", info.String())
continue
}
exists = true
indExists = true
break
}
}
return indExists
}
return exists, nil
}

func (s *Service) hasPeerWithSubnet(subnet uint64) bool {
return len(s.Peers().SubscribedToSubnet(subnet)) > 0
// lower threshold to broadcast object compared to searching
// for a subnet. So that even in the event of poor peer
// connectivity, we can still broadcast an attestation.
func (s *Service) hasPeerWithSubnet(topic string) bool {
return len(s.pubsub.ListPeers(topic+s.Encoding().ProtocolSuffix())) >= 1
}

// Updates the service's discv5 listener record's attestation subnet
Expand Down
9 changes: 5 additions & 4 deletions beacon-chain/p2p/subnets_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/prysmaticlabs/prysm/beacon-chain/core/feed"
statefeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/state"
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/prysmaticlabs/prysm/shared/testutil/assert"
"github.com/prysmaticlabs/prysm/shared/testutil/require"
)
Expand Down Expand Up @@ -105,11 +106,11 @@ func TestStartDiscV5_DiscoverPeersWithSubnets(t *testing.T) {

// look up 3 different subnets
ctx := context.Background()
exists, err := s.FindPeersWithSubnet(ctx, 1)
exists, err := s.FindPeersWithSubnet(ctx, "", 1, params.BeaconNetworkConfig().MinimumPeersInSubnet)
require.NoError(t, err)
exists2, err := s.FindPeersWithSubnet(ctx, 2)
exists2, err := s.FindPeersWithSubnet(ctx, "", 2, params.BeaconNetworkConfig().MinimumPeersInSubnet)
require.NoError(t, err)
exists3, err := s.FindPeersWithSubnet(ctx, 3)
exists3, err := s.FindPeersWithSubnet(ctx, "", 3, params.BeaconNetworkConfig().MinimumPeersInSubnet)
require.NoError(t, err)
if !exists || !exists2 || !exists3 {
t.Fatal("Peer with subnet doesn't exist")
Expand All @@ -126,7 +127,7 @@ func TestStartDiscV5_DiscoverPeersWithSubnets(t *testing.T) {
testService.RefreshENR()
time.Sleep(2 * time.Second)

exists, err = s.FindPeersWithSubnet(ctx, 2)
exists, err = s.FindPeersWithSubnet(ctx, "", 2, params.BeaconNetworkConfig().MinimumPeersInSubnet)
require.NoError(t, err)

assert.Equal(t, true, exists, "Peer with subnet doesn't exist")
Expand Down
2 changes: 1 addition & 1 deletion beacon-chain/p2p/testing/fuzz_p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func (p *FakeP2P) ENR() *enr.Record {
}

// FindPeersWithSubnet mocks the p2p func.
func (p *FakeP2P) FindPeersWithSubnet(_ context.Context, _ uint64) (bool, error) {
func (p *FakeP2P) FindPeersWithSubnet(_ context.Context, _ string, _, _ uint64) (bool, error) {
return false, nil
}

Expand Down
2 changes: 1 addition & 1 deletion beacon-chain/p2p/testing/mock_peermanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func (m MockPeerManager) ENR() *enr.Record {
func (m MockPeerManager) RefreshENR() {}

// FindPeersWithSubnet .
func (m MockPeerManager) FindPeersWithSubnet(_ context.Context, _ uint64) (bool, error) {
func (m MockPeerManager) FindPeersWithSubnet(_ context.Context, _ string, _, _ uint64) (bool, error) {
return true, nil
}

Expand Down
2 changes: 1 addition & 1 deletion beacon-chain/p2p/testing/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ func (p *TestP2P) Peers() *peers.Status {
}

// FindPeersWithSubnet mocks the p2p func.
func (p *TestP2P) FindPeersWithSubnet(_ context.Context, _ uint64) (bool, error) {
func (p *TestP2P) FindPeersWithSubnet(_ context.Context, _ string, _, _ uint64) (bool, error) {
return false, nil
}

Expand Down
48 changes: 19 additions & 29 deletions beacon-chain/sync/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,17 +219,14 @@ func (s *Service) subscribeStaticWithSubnets(topic string, validator pubsub.Vali
}
// Check every slot that there are enough peers
for i := uint64(0); i < params.BeaconNetworkConfig().AttestationSubnetCount; i++ {
if !s.validPeersExist(topic, i) {
if !s.validPeersExist(s.addDigestAndIndexToTopic(topic, i)) {
log.Debugf("No peers found subscribed to attestation gossip subnet with "+
"committee index %d. Searching network for peers subscribed to the subnet.", i)
go func(idx uint64) {
_, err := s.p2p.FindPeersWithSubnet(s.ctx, idx)
if err != nil {
log.Debugf("Could not search for peers: %v", err)
return
}
}(i)
return
_, err := s.p2p.FindPeersWithSubnet(s.ctx, s.addDigestAndIndexToTopic(topic, i), i, params.BeaconNetworkConfig().MinimumPeersInSubnet)
if err != nil {
log.WithError(err).Debug("Could not search for peers")
return
}
}
}
}
Expand Down Expand Up @@ -330,42 +327,35 @@ func (s *Service) subscribeAggregatorSubnet(
if _, exists := subscriptions[idx]; !exists {
subscriptions[idx] = s.subscribeWithBase(subnetTopic, validate, handle)
}
if !s.validPeersExist(subnetTopic, idx) {
if !s.validPeersExist(subnetTopic) {
log.Debugf("No peers found subscribed to attestation gossip subnet with "+
"committee index %d. Searching network for peers subscribed to the subnet.", idx)
go func(idx uint64) {
_, err := s.p2p.FindPeersWithSubnet(s.ctx, idx)
if err != nil {
log.Debugf("Could not search for peers: %v", err)
return
}
}(idx)
return
_, err := s.p2p.FindPeersWithSubnet(s.ctx, subnetTopic, idx, params.BeaconNetworkConfig().MinimumPeersInSubnet)
if err != nil {
log.WithError(err).Debug("Could not search for peers")
}
}
}

// lookup peers for attester specific subnets.
func (s *Service) lookupAttesterSubnets(digest [4]byte, idx uint64) {
topic := p2p.GossipTypeMapping[reflect.TypeOf(&pb.Attestation{})]
subnetTopic := fmt.Sprintf(topic, digest, idx)
if !s.validPeersExist(subnetTopic, idx) {
if !s.validPeersExist(subnetTopic) {
log.Debugf("No peers found subscribed to attestation gossip subnet with "+
"committee index %d. Searching network for peers subscribed to the subnet.", idx)
go func(idx uint64) {
// perform a search for peers with the desired committee index.
_, err := s.p2p.FindPeersWithSubnet(s.ctx, idx)
if err != nil {
log.Debugf("Could not search for peers: %v", err)
return
}
}(idx)
// perform a search for peers with the desired committee index.
_, err := s.p2p.FindPeersWithSubnet(s.ctx, subnetTopic, idx, params.BeaconNetworkConfig().MinimumPeersInSubnet)
if err != nil {
log.WithError(err).Debug("Could not search for peers")
}
}
}

// find if we have peers who are subscribed to the same subnet
func (s *Service) validPeersExist(subnetTopic string, idx uint64) bool {
func (s *Service) validPeersExist(subnetTopic string) bool {
numOfPeers := s.p2p.PubSub().ListPeers(subnetTopic + s.p2p.Encoding().ProtocolSuffix())
return len(s.p2p.Peers().SubscribedToSubnet(idx)) > 0 || len(numOfPeers) > 0
return uint64(len(numOfPeers)) >= params.BeaconNetworkConfig().MinimumPeersInSubnet
}

// Add fork digest to topic.
Expand Down
2 changes: 2 additions & 0 deletions shared/params/mainnet_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ var mainnetNetworkConfig = &NetworkConfig{
MessageDomainValidSnappy: [4]byte{01, 00, 00, 00},
ETH2Key: "eth2",
AttSubnetKey: "attnets",
MinimumPeersInSubnet: 4,
MinimumPeersInSubnetSearch: 20,
ContractDeploymentBlock: 11184524, // Note: contract was deployed in block 11052984 but no transactions were sent until 11184524.
DepositContractAddress: "0x00000000219ab540356cBB839Cbe05303d7705Fa",
ChainID: 1, // Chain ID of eth1 mainnet.
Expand Down
6 changes: 4 additions & 2 deletions shared/params/network_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@ type NetworkConfig struct {
MessageDomainValidSnappy [4]byte `yaml:"MESSAGE_DOMAIN_VALID_SNAPPY"` // MessageDomainValidSnappy is the 4-byte domain for gossip message-id isolation of valid snappy messages.

// DiscoveryV5 Config
ETH2Key string // ETH2Key is the ENR key of the eth2 object in an enr.
AttSubnetKey string // AttSubnetKey is the ENR key of the subnet bitfield in the enr.
ETH2Key string // ETH2Key is the ENR key of the eth2 object in an enr.
AttSubnetKey string // AttSubnetKey is the ENR key of the subnet bitfield in the enr.
MinimumPeersInSubnet uint64 // MinimumPeersInSubnet is the required amount of peers that a node is to have its in subnet.
MinimumPeersInSubnetSearch uint64 // PeersInSubnetSearch is the required amount of peers that we need to be able to lookup in a subnet search.

// Chain Network Config
ContractDeploymentBlock uint64 // ContractDeploymentBlock is the eth1 block in which the deposit contract is deployed.
Expand Down

0 comments on commit 29804fa

Please sign in to comment.