Skip to content

Commit

Permalink
Add Buffer For Inbound Peers (#8018)
Browse files Browse the repository at this point in the history
* allow inbound peers

* comment

* gaz

* gaz

* Update deps.bzl

Co-authored-by: Shay Zluf <thezluf@gmail.com>

* add test

* re-arrange imports

* fix up

* fix tests

* gaz

* Update beacon-chain/p2p/service_test.go

* fmt

Co-authored-by: Victor Farazdagi <simple.square@gmail.com>
Co-authored-by: Shay Zluf <thezluf@gmail.com>
Co-authored-by: Raul Jordan <raul@prysmaticlabs.com>
  • Loading branch information
4 people committed Dec 2, 2020
1 parent cf3181e commit 387f7b2
Show file tree
Hide file tree
Showing 9 changed files with 83 additions and 19 deletions.
3 changes: 2 additions & 1 deletion beacon-chain/p2p/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ go_library(
"@com_github_libp2p_go_libp2p_noise//:go_default_library",
"@com_github_libp2p_go_libp2p_pubsub//:go_default_library",
"@com_github_libp2p_go_libp2p_pubsub//pb:go_default_library",
"@com_github_libp2p_go_libp2p_secio//:go_default_library",
"@com_github_multiformats_go_multiaddr//:go_default_library",
"@com_github_multiformats_go_multiaddr_net//:go_default_library",
"@com_github_pkg_errors//:go_default_library",
Expand Down Expand Up @@ -123,6 +122,7 @@ go_test(
"//beacon-chain/core/helpers:go_default_library",
"//beacon-chain/p2p/encoder:go_default_library",
"//beacon-chain/p2p/peers:go_default_library",
"//beacon-chain/p2p/peers/peerdata:go_default_library",
"//beacon-chain/p2p/peers/scorers:go_default_library",
"//beacon-chain/p2p/testing:go_default_library",
"//beacon-chain/p2p/types:go_default_library",
Expand Down Expand Up @@ -151,6 +151,7 @@ go_test(
"@com_github_libp2p_go_libp2p_core//host:go_default_library",
"@com_github_libp2p_go_libp2p_core//network:go_default_library",
"@com_github_libp2p_go_libp2p_core//peer:go_default_library",
"@com_github_libp2p_go_libp2p_noise//:go_default_library",
"@com_github_libp2p_go_libp2p_pubsub//:go_default_library",
"@com_github_libp2p_go_libp2p_pubsub//pb:go_default_library",
"@com_github_libp2p_go_libp2p_swarm//testing:go_default_library",
Expand Down
16 changes: 11 additions & 5 deletions beacon-chain/p2p/connection_gater.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,17 @@ import (
"github.com/sirupsen/logrus"
)

// limit for rate limiter when processing new inbound dials.
const ipLimit = 4
const (
// Limit for rate limiter when processing new inbound dials.
ipLimit = 4

// burst limit for inbound dials.
const ipBurst = 8
// Burst limit for inbound dials.
ipBurst = 8

// High watermark buffer signifies the buffer till which
// we will handle inbound requests.
highWatermarkBuffer = 10
)

// InterceptPeerDial tests whether we're permitted to Dial the specified peer.
func (s *Service) InterceptPeerDial(_ peer.ID) (allow bool) {
Expand All @@ -44,7 +50,7 @@ func (s *Service) InterceptAccept(n network.ConnMultiaddrs) (allow bool) {
"reason": "exceeded dial limit"}).Trace("Not accepting inbound dial from ip address")
return false
}
if s.isPeerAtLimit() {
if s.isPeerAtLimit(true /* inbound */) {
log.WithFields(logrus.Fields{"peer": n.RemoteMultiaddr(),
"reason": "at peer limit"}).Trace("Not accepting inbound dial")
return false
Expand Down
4 changes: 4 additions & 0 deletions beacon-chain/p2p/connection_gater_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ func TestPeer_AtMaxLimit(t *testing.T) {
require.NoError(t, err)
}()

for i := 0; i < highWatermarkBuffer; i++ {
addPeer(t, s.peers, peers.PeerConnected)
}

// create alternate host
listen, err = multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%d", ipAddr2, 3000))
require.NoError(t, err, "Failed to p2p listen")
Expand Down
9 changes: 7 additions & 2 deletions beacon-chain/p2p/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func (s *Service) listenForNewNodes() {
if s.ctx.Err() != nil {
break
}
if s.isPeerAtLimit() {
if s.isPeerAtLimit(false /* inbound */) {
// Pause the main loop for a period to stop looking
// for new peers.
log.Trace("Not looking for peers, at peer limit")
Expand Down Expand Up @@ -276,9 +276,14 @@ func (s *Service) filterPeer(node *enode.Node) bool {
// This checks our set max peers in our config, and
// determines whether our currently connected and
// active peers are above our set max peer limit.
func (s *Service) isPeerAtLimit() bool {
func (s *Service) isPeerAtLimit(inbound bool) bool {
numOfConns := len(s.host.Network().Peers())
maxPeers := int(s.cfg.MaxPeers)
// If we are measuring the limit for inbound peers
// we apply the high watermark buffer.
if inbound {
maxPeers += highWatermarkBuffer
}
activePeers := len(s.Peers().Active())

return activePeers >= maxPeers || numOfConns >= maxPeers
Expand Down
56 changes: 56 additions & 0 deletions beacon-chain/p2p/discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,21 @@ import (

"github.com/ethereum/go-ethereum/p2p/discover"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/enr"
"github.com/kevinms/leakybucket-go"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/go-bitfield"
mock "github.com/prysmaticlabs/prysm/beacon-chain/blockchain/testing"
"github.com/prysmaticlabs/prysm/beacon-chain/core/feed"
statefeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/state"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers/peerdata"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers/scorers"
testp2p "github.com/prysmaticlabs/prysm/beacon-chain/p2p/testing"
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
"github.com/prysmaticlabs/prysm/shared/iputils"
"github.com/prysmaticlabs/prysm/shared/testutil/assert"
Expand Down Expand Up @@ -226,3 +237,48 @@ func TestHostIsResolved(t *testing.T) {
newIP := list.Self().IP()
assert.Equal(t, exampleIP, newIP.String(), "Did not resolve to expected IP")
}

func TestInboundPeerLimit(t *testing.T) {
fakePeer := testp2p.NewTestP2P(t)
s := &Service{
cfg: &Config{MaxPeers: 30},
ipLimiter: leakybucket.NewCollector(ipLimit, ipBurst, false),
peers: peers.NewStatus(context.Background(), &peers.StatusConfig{
PeerLimit: 30,
ScorerParams: &scorers.Config{},
}),
host: fakePeer.BHost,
}

for i := 0; i < 30; i++ {
_ = addPeer(t, s.peers, peerdata.PeerConnectionState(ethpb.ConnectionState_CONNECTED))
}

require.Equal(t, true, s.isPeerAtLimit(false), "not at limit for outbound peers")
require.Equal(t, false, s.isPeerAtLimit(true), "at limit for inbound peers")

for i := 0; i < highWatermarkBuffer; i++ {
_ = addPeer(t, s.peers, peerdata.PeerConnectionState(ethpb.ConnectionState_CONNECTED))
}

require.Equal(t, true, s.isPeerAtLimit(true), "not at limit for inbound peers")
}

// addPeer is a helper to add a peer with a given connection state)
func addPeer(t *testing.T, p *peers.Status, state peerdata.PeerConnectionState) peer.ID {
// Set up some peers with different states
mhBytes := []byte{0x11, 0x04}
idBytes := make([]byte, 4)
_, err := rand.Read(idBytes)
require.NoError(t, err)
mhBytes = append(mhBytes, idBytes...)
id, err := peer.IDFromBytes(mhBytes)
require.NoError(t, err)
p.Add(new(enr.Record), id, nil, network.DirUnknown)
p.SetConnectionState(id, state)
p.SetMetadata(id, &pb.MetaData{
SeqNumber: 0,
Attnets: bitfield.NewBitvector64(),
})
return id
}
3 changes: 1 addition & 2 deletions beacon-chain/p2p/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p-core/peer"
noise "github.com/libp2p/go-libp2p-noise"
secio "github.com/libp2p/go-libp2p-secio"
ma "github.com/multiformats/go-multiaddr"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/shared/version"
Expand Down Expand Up @@ -37,7 +36,7 @@ func (s *Service) buildOptions(ip net.IP, priKey *ecdsa.PrivateKey) []libp2p.Opt
libp2p.ConnectionGater(s),
}

options = append(options, libp2p.Security(noise.ID, noise.New), libp2p.Security(secio.ID, secio.New))
options = append(options, libp2p.Security(noise.ID, noise.New))

if cfg.EnableUPnP {
options = append(options, libp2p.NATPortMap()) //Allow to use UPnP
Expand Down
3 changes: 2 additions & 1 deletion beacon-chain/p2p/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peer"
noise "github.com/libp2p/go-libp2p-noise"
"github.com/multiformats/go-multiaddr"
mock "github.com/prysmaticlabs/prysm/beacon-chain/blockchain/testing"
"github.com/prysmaticlabs/prysm/beacon-chain/core/feed"
Expand Down Expand Up @@ -72,7 +73,7 @@ func createHost(t *testing.T, port int) (host.Host, *ecdsa.PrivateKey, net.IP) {
ipAddr := net.ParseIP("127.0.0.1")
listen, err := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%d", ipAddr, port))
require.NoError(t, err, "Failed to p2p listen")
h, err := libp2p.New(context.Background(), []libp2p.Option{privKeyOption(pkey), libp2p.ListenAddrs(listen)}...)
h, err := libp2p.New(context.Background(), []libp2p.Option{privKeyOption(pkey), libp2p.ListenAddrs(listen), libp2p.Security(noise.ID, noise.New)}...)
require.NoError(t, err)
return h, pkey, ipAddr
}
Expand Down
7 changes: 0 additions & 7 deletions deps.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -799,13 +799,6 @@ def prysm_deps():
sum = "h1:RYi2hDdss1u4YE7GwixGzWwVo47T8UQwnTLB6vQiq+o=",
version = "v2.1.0+incompatible",
)

go_repository(
name = "com_github_gofrs_flock",
importpath = "github.com/gofrs/flock",
sum = "h1:DP+LD/t0njgoPBvT5MJLeliUIVQR03hiKR6vezdwHlc=",
version = "v0.7.1",
)
go_repository(
name = "com_github_golang_freetype",
importpath = "github.com/golang/freetype",
Expand Down
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ require (
github.com/libp2p/go-libp2p-core v0.6.1
github.com/libp2p/go-libp2p-noise v0.1.1
github.com/libp2p/go-libp2p-pubsub v0.3.6
github.com/libp2p/go-libp2p-secio v0.2.2
github.com/libp2p/go-libp2p-swarm v0.2.8
github.com/libp2p/go-libp2p-tls v0.1.4-0.20200421131144-8a8ad624a291 // indirect
github.com/libp2p/go-mplex v0.1.3 // indirect
Expand Down

0 comments on commit 387f7b2

Please sign in to comment.