Skip to content

Commit

Permalink
QSP 23: Prune Peers From Peer Handler (#6614)
Browse files Browse the repository at this point in the history
* checkpoint
* Merge refs/heads/master into prunePeers
  • Loading branch information
nisdas committed Jul 16, 2020
1 parent df73851 commit f6756bb
Show file tree
Hide file tree
Showing 6 changed files with 137 additions and 23 deletions.
4 changes: 2 additions & 2 deletions beacon-chain/p2p/connection_gater_test.go
Expand Up @@ -21,7 +21,7 @@ func TestPeer_AtMaxLimit(t *testing.T) {
listen, err := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%d", ipAddr, 2000))
require.NoError(t, err, "Failed to p2p listen")
s := &Service{}
s.peers = peers.NewStatus(3)
s.peers = peers.NewStatus(3, 0)
s.cfg = &Config{MaxPeers: 0}
s.addrFilter, err = configureFilter(&Config{})
require.NoError(t, err)
Expand Down Expand Up @@ -57,7 +57,7 @@ func TestPeer_BelowMaxLimit(t *testing.T) {
listen, err := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%d", ipAddr, 2000))
require.NoError(t, err, "Failed to p2p listen")
s := &Service{}
s.peers = peers.NewStatus(3)
s.peers = peers.NewStatus(3, 1)
s.cfg = &Config{MaxPeers: 1}
s.addrFilter, err = configureFilter(&Config{})
require.NoError(t, err)
Expand Down
72 changes: 71 additions & 1 deletion beacon-chain/p2p/peers/status.go
Expand Up @@ -51,6 +51,10 @@ const (
PeerConnecting
)

// Additional buffer beyond current peer limit, from which we can store
// the relevant peer statuses.
const maxLimitBuffer = 150

var (
// ErrPeerUnknown is returned when there is an attempt to obtain data from a peer that is not known.
ErrPeerUnknown = errors.New("peer unknown")
Expand All @@ -61,6 +65,7 @@ type Status struct {
lock sync.RWMutex
maxBadResponses int
status map[peer.ID]*peerStatus
maxLimit int
}

// peerStatus is the status of an individual peer at the protocol level.
Expand All @@ -76,10 +81,11 @@ type peerStatus struct {
}

// NewStatus creates a new status entity.
func NewStatus(maxBadResponses int) *Status {
func NewStatus(maxBadResponses int, peerLimit int) *Status {
return &Status{
maxBadResponses: maxBadResponses,
status: make(map[peer.ID]*peerStatus),
maxLimit: maxLimitBuffer + peerLimit,
}
}

Expand All @@ -88,6 +94,12 @@ func (p *Status) MaxBadResponses() int {
return p.maxBadResponses
}

// MaxPeerLimit returns the max peer limit stored in
// the current peer store.
func (p *Status) MaxPeerLimit() int {
return p.maxLimit
}

// Add adds a peer.
// If a peer already exists with this ID its address and direction are updated with the supplied data.
func (p *Status) Add(record *enr.Record, pid peer.ID, address ma.Multiaddr, direction network.Direction) {
Expand Down Expand Up @@ -421,6 +433,58 @@ func (p *Status) Decay() {
}
}

// Prune clears out and removes outdated and disconnected peers.
func (p *Status) Prune() {
currSize := p.totalSize()
// Exit early if there is nothing
// to prune.
if currSize <= p.maxLimit {
return
}
disconnected := p.Disconnected()

type peerResp struct {
pid peer.ID
badResp int
}
peersToPrune := make([]*peerResp, 0, len(disconnected))
p.lock.RLock()
// Select disconnected peers with a smaller
// bad response count.
for _, pid := range disconnected {
if p.status[pid].badResponses < p.maxBadResponses {
peersToPrune = append(peersToPrune, &peerResp{
pid: pid,
badResp: p.status[pid].badResponses,
})
}
}
p.lock.RUnlock()

// Sort peers in ascending order, so the peers with the
// least amount of bad responses are pruned first. This
// is to protect the node from malicious/lousy peers so
// that their memory is still kept.
sort.Slice(peersToPrune, func(i, j int) bool {
return peersToPrune[i].badResp < peersToPrune[j].badResp
})

limitDiff := currSize - p.maxLimit

if limitDiff > len(peersToPrune) {
limitDiff = len(peersToPrune)
}

peersToPrune = peersToPrune[:limitDiff]

p.lock.Lock()
defer p.lock.Unlock()
// Delete peers from map.
for _, peerRes := range peersToPrune {
delete(p.status, peerRes.pid)
}
}

// BestFinalized returns the highest finalized epoch equal to or higher than ours that is agreed upon by the majority of peers.
// This method may not return the absolute highest finalized, but the finalized epoch in which most peers can serve blocks.
// Ideally, all peers would be reporting the same finalized epoch but some may be behind due to their own latency, or because of
Expand Down Expand Up @@ -492,6 +556,12 @@ func (p *Status) HighestEpoch() uint64 {
return helpers.SlotToEpoch(highestSlot)
}

func (p *Status) totalSize() int {
p.lock.RLock()
defer p.lock.RUnlock()
return len(p.status)
}

func retrieveIndicesFromBitfield(bitV bitfield.Bitvector64) []uint64 {
committeeIdxs := make([]uint64, 0, bitV.Count())
for i := uint64(0); i < 64; i++ {
Expand Down
77 changes: 60 additions & 17 deletions beacon-chain/p2p/peers/status_test.go
Expand Up @@ -18,14 +18,14 @@ import (

func TestStatus(t *testing.T) {
maxBadResponses := 2
p := peers.NewStatus(maxBadResponses)
p := peers.NewStatus(maxBadResponses, 30)
require.NotNil(t, p, "p not created")
assert.Equal(t, maxBadResponses, p.MaxBadResponses(), "maxBadResponses incorrect value")
}

func TestPeerExplicitAdd(t *testing.T) {
maxBadResponses := 2
p := peers.NewStatus(maxBadResponses)
p := peers.NewStatus(maxBadResponses, 30)

id, err := peer.Decode("16Uiu2HAkyWZ4Ni1TpvDS8dPxsozmHY85KaiFjodQuV6Tz5tkHVeR")
require.NoError(t, err, "Failed to create ID")
Expand Down Expand Up @@ -59,7 +59,7 @@ func TestPeerExplicitAdd(t *testing.T) {

func TestPeerNoENR(t *testing.T) {
maxBadResponses := 2
p := peers.NewStatus(maxBadResponses)
p := peers.NewStatus(maxBadResponses, 30)

id, err := peer.Decode("16Uiu2HAkyWZ4Ni1TpvDS8dPxsozmHY85KaiFjodQuV6Tz5tkHVeR")
require.NoError(t, err, "Failed to create ID")
Expand All @@ -76,7 +76,7 @@ func TestPeerNoENR(t *testing.T) {

func TestPeerNoOverwriteENR(t *testing.T) {
maxBadResponses := 2
p := peers.NewStatus(maxBadResponses)
p := peers.NewStatus(maxBadResponses, 30)

id, err := peer.Decode("16Uiu2HAkyWZ4Ni1TpvDS8dPxsozmHY85KaiFjodQuV6Tz5tkHVeR")
require.NoError(t, err, "Failed to create ID")
Expand All @@ -96,7 +96,7 @@ func TestPeerNoOverwriteENR(t *testing.T) {

func TestErrUnknownPeer(t *testing.T) {
maxBadResponses := 2
p := peers.NewStatus(maxBadResponses)
p := peers.NewStatus(maxBadResponses, 30)

id, err := peer.Decode("16Uiu2HAkyWZ4Ni1TpvDS8dPxsozmHY85KaiFjodQuV6Tz5tkHVeR")
require.NoError(t, err)
Expand All @@ -122,7 +122,7 @@ func TestErrUnknownPeer(t *testing.T) {

func TestPeerCommitteeIndices(t *testing.T) {
maxBadResponses := 2
p := peers.NewStatus(maxBadResponses)
p := peers.NewStatus(maxBadResponses, 30)

id, err := peer.Decode("16Uiu2HAkyWZ4Ni1TpvDS8dPxsozmHY85KaiFjodQuV6Tz5tkHVeR")
require.NoError(t, err, "Failed to create ID")
Expand Down Expand Up @@ -152,7 +152,7 @@ func TestPeerCommitteeIndices(t *testing.T) {

func TestPeerSubscribedToSubnet(t *testing.T) {
maxBadResponses := 2
p := peers.NewStatus(maxBadResponses)
p := peers.NewStatus(maxBadResponses, 30)

// Add some peers with different states
numPeers := 2
Expand Down Expand Up @@ -189,7 +189,7 @@ func TestPeerSubscribedToSubnet(t *testing.T) {

func TestPeerImplicitAdd(t *testing.T) {
maxBadResponses := 2
p := peers.NewStatus(maxBadResponses)
p := peers.NewStatus(maxBadResponses, 30)

id, err := peer.Decode("16Uiu2HAkyWZ4Ni1TpvDS8dPxsozmHY85KaiFjodQuV6Tz5tkHVeR")
require.NoError(t, err)
Expand All @@ -205,7 +205,7 @@ func TestPeerImplicitAdd(t *testing.T) {

func TestPeerChainState(t *testing.T) {
maxBadResponses := 2
p := peers.NewStatus(maxBadResponses)
p := peers.NewStatus(maxBadResponses, 30)

id, err := peer.Decode("16Uiu2HAkyWZ4Ni1TpvDS8dPxsozmHY85KaiFjodQuV6Tz5tkHVeR")
require.NoError(t, err)
Expand Down Expand Up @@ -233,7 +233,7 @@ func TestPeerChainState(t *testing.T) {

func TestPeerBadResponses(t *testing.T) {
maxBadResponses := 2
p := peers.NewStatus(maxBadResponses)
p := peers.NewStatus(maxBadResponses, 30)

id, err := peer.Decode("16Uiu2HAkyWZ4Ni1TpvDS8dPxsozmHY85KaiFjodQuV6Tz5tkHVeR")
require.NoError(t, err)
Expand Down Expand Up @@ -275,7 +275,7 @@ func TestPeerBadResponses(t *testing.T) {

func TestAddMetaData(t *testing.T) {
maxBadResponses := 2
p := peers.NewStatus(maxBadResponses)
p := peers.NewStatus(maxBadResponses, 30)

// Add some peers with different states
numPeers := 5
Expand All @@ -297,7 +297,7 @@ func TestAddMetaData(t *testing.T) {

func TestPeerConnectionStatuses(t *testing.T) {
maxBadResponses := 2
p := peers.NewStatus(maxBadResponses)
p := peers.NewStatus(maxBadResponses, 30)

// Add some peers with different states
numPeersDisconnected := 11
Expand Down Expand Up @@ -332,7 +332,7 @@ func TestPeerConnectionStatuses(t *testing.T) {

func TestDecay(t *testing.T) {
maxBadResponses := 2
p := peers.NewStatus(maxBadResponses)
p := peers.NewStatus(maxBadResponses, 30)

// Peer 1 has 0 bad responses.
pid1 := addPeer(t, p, peers.PeerConnected)
Expand All @@ -359,8 +359,51 @@ func TestDecay(t *testing.T) {
assert.Equal(t, 1, badResponses3, "Unexpected bad responses for peer 3")
}

func TestPrune(t *testing.T) {
maxBadResponses := 2
p := peers.NewStatus(maxBadResponses, 30)

for i := 0; i < p.MaxPeerLimit()+100; i++ {
if i%7 == 0 {
// Peer added as disconnected.
_ = addPeer(t, p, peers.PeerDisconnected)
}
// Peer added to peer handler.
_ = addPeer(t, p, peers.PeerConnected)
}

disPeers := p.Disconnected()
firstPID := disPeers[0]
secondPID := disPeers[1]
thirdPID := disPeers[2]

// Make first peer a bad peer
p.IncrementBadResponses(firstPID)
p.IncrementBadResponses(firstPID)

// Add bad response for p2.
p.IncrementBadResponses(secondPID)

// Prune peers
p.Prune()

// Bad peer is expected to still be kept in handler.
badRes, err := p.BadResponses(firstPID)
assert.NoError(t, err, "error is supposed to be nil")
assert.Equal(t, 2, badRes, "Did not get expected amount")

// Not so good peer is pruned away so that we can reduce the
// total size of the handler.
badRes, err = p.BadResponses(secondPID)
assert.NotNil(t, err, "error is supposed to be not nil")

// Last peer has been removed.
badRes, err = p.BadResponses(thirdPID)
assert.NotNil(t, err, "error is supposed to be not nil")
}

func TestTrimmedOrderedPeers(t *testing.T) {
p := peers.NewStatus(1)
p := peers.NewStatus(1, 30)

expectedTarget := uint64(2)
maxPeers := 3
Expand Down Expand Up @@ -418,7 +461,7 @@ func TestBestPeer(t *testing.T) {
expectedFinEpoch := uint64(4)
expectedRoot := [32]byte{'t', 'e', 's', 't'}
junkRoot := [32]byte{'j', 'u', 'n', 'k'}
p := peers.NewStatus(maxBadResponses)
p := peers.NewStatus(maxBadResponses, 30)

// Peer 1
pid1 := addPeer(t, p, peers.PeerConnected)
Expand Down Expand Up @@ -463,7 +506,7 @@ func TestBestPeer(t *testing.T) {
func TestBestFinalized_returnsMaxValue(t *testing.T) {
maxBadResponses := 2
maxPeers := 10
p := peers.NewStatus(maxBadResponses)
p := peers.NewStatus(maxBadResponses, 30)

for i := 0; i <= maxPeers+100; i++ {
p.Add(new(enr.Record), peer.ID(i), nil, network.DirOutbound)
Expand All @@ -479,7 +522,7 @@ func TestBestFinalized_returnsMaxValue(t *testing.T) {

func TestStatus_CurrentEpoch(t *testing.T) {
maxBadResponses := 2
p := peers.NewStatus(maxBadResponses)
p := peers.NewStatus(maxBadResponses, 30)
// Peer 1
pid1 := addPeer(t, p, peers.PeerConnected)
p.SetChainState(pid1, &pb.Status{
Expand Down
3 changes: 2 additions & 1 deletion beacon-chain/p2p/service.go
Expand Up @@ -149,7 +149,7 @@ func NewService(cfg *Config) (*Service, error) {
}
s.pubsub = gs

s.peers = peers.NewStatus(maxBadResponses)
s.peers = peers.NewStatus(maxBadResponses, int(s.cfg.MaxPeers))

return s, nil
}
Expand Down Expand Up @@ -210,6 +210,7 @@ func (s *Service) Start() {
ensurePeerConnections(s.ctx, s.host, peersToWatch...)
})
runutil.RunEvery(s.ctx, time.Hour, s.Peers().Decay)
runutil.RunEvery(s.ctx, 30*time.Minute, s.Peers().Prune)
runutil.RunEvery(s.ctx, params.BeaconNetworkConfig().RespTimeout, s.updateMetrics)
runutil.RunEvery(s.ctx, refreshRate, func() {
s.RefreshENR()
Expand Down
2 changes: 1 addition & 1 deletion beacon-chain/p2p/testing/mock_peersprovider.go
Expand Up @@ -25,7 +25,7 @@ func (m *MockPeersProvider) Peers() *peers.Status {
m.lock.Lock()
defer m.lock.Unlock()
if m.peers == nil {
m.peers = peers.NewStatus(5 /* maxBadResponses */)
m.peers = peers.NewStatus(5, 30)
// Pretend we are connected to two peers
id0, err := peer.Decode("16Uiu2HAkyWZ4Ni1TpvDS8dPxsozmHY85KaiFjodQuV6Tz5tkHVeR")
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion beacon-chain/p2p/testing/p2p.go
Expand Up @@ -58,7 +58,7 @@ func NewTestP2P(t *testing.T) *TestP2P {
BHost: h,
pubsub: ps,
joinedTopics: map[string]*pubsub.Topic{},
peers: peers.NewStatus(5 /* maxBadResponses */),
peers: peers.NewStatus(5, 30),
}
}

Expand Down

0 comments on commit f6756bb

Please sign in to comment.