Skip to content

Commit

Permalink
fix(dot/network): move low reputation peer removal from network ConnM…
Browse files Browse the repository at this point in the history
…anager to peer scoring logic (dot/peerstate) (ChainSafe#2068)

- peerset service (doWorks goroutine) sends messages to network service
about peers to connect, drop, reject, disconnect etc. Some of those
message were being send without a peer id and set id. This was the
reason why our excess peers were not getting removed.
- To save ourselves from such a problem in future, processMessage now
checks if peer id is empty or not.
- Also change the resultMsgCh type to peerset.Message from interface{}

Fixes ChainSafe#2039
  • Loading branch information
kishansagathiya committed Nov 30, 2021
1 parent a7594a3 commit ac16285
Show file tree
Hide file tree
Showing 6 changed files with 45 additions and 58 deletions.
34 changes: 1 addition & 33 deletions dot/network/connmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ package network

import (
"context"
"crypto/rand"
"math/big"
"sync"

"github.com/libp2p/go-libp2p-core/connmgr"
Expand Down Expand Up @@ -130,40 +128,10 @@ func (cm *ConnManager) unprotectedPeers(peers []peer.ID) []peer.ID {
func (cm *ConnManager) Connected(n network.Network, c network.Conn) {
logger.Tracef(
"Host %s connected to peer %s", n.LocalPeer(), c.RemotePeer())

if cm.connectHandler != nil {
cm.connectHandler(c.RemotePeer())
}

cm.Lock()
defer cm.Unlock()

over := len(n.Peers()) - cm.max
if over <= 0 {
return
}

// TODO: peer scoring doesn't seem to prevent us from going over the max.
// if over the max peer count, disconnect from (total_peers - maximum) peers
// (#2039)
for i := 0; i < over; i++ {
unprotPeers := cm.unprotectedPeers(n.Peers())
if len(unprotPeers) == 0 {
return
}

i, err := rand.Int(rand.Reader, big.NewInt(int64(len(unprotPeers))))
if err != nil {
logger.Errorf("error generating random number: %s", err)
return
}

up := unprotPeers[i.Int64()]
logger.Tracef("Over max peer count, disconnecting from random unprotected peer %s", up)
err = n.ClosePeer(up)
if err != nil {
logger.Tracef("failed to close connection to peer %s", up)
}
}
}

// Disconnected is called when a connection closed
Expand Down
12 changes: 5 additions & 7 deletions dot/network/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package network
import (
"context"
"errors"
"fmt"
"math/big"
"strings"
"sync"
Expand Down Expand Up @@ -670,6 +669,10 @@ func (s *Service) startPeerSetHandler() {

func (s *Service) processMessage(msg peerset.Message) {
peerID := msg.PeerID
if peerID == "" {
logger.Errorf("found empty peer id in peerset message")
return
}
switch msg.Status {
case peerset.Connect:
addrInfo := s.host.h.Peerstore().PeerInfo(peerID)
Expand Down Expand Up @@ -704,12 +707,7 @@ func (s *Service) startProcessingMsg() {
select {
case <-s.ctx.Done():
return
case m := <-msgCh:
msg, ok := m.(peerset.Message)
if !ok {
logger.Error(fmt.Sprintf("failed to get message from peerSet: type is %T instead of peerset.Message", m))
continue
}
case msg := <-msgCh:
s.processMessage(msg)
}
}
Expand Down
2 changes: 1 addition & 1 deletion dot/network/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,5 +79,5 @@ type PeerRemove interface {
type Peer interface {
PeerReputation(peer.ID) (peerset.Reputation, error)
SortedPeers(idx int) chan peer.IDSlice
Messages() chan interface{}
Messages() chan peerset.Message
}
6 changes: 4 additions & 2 deletions dot/peerset/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@

package peerset

import "github.com/libp2p/go-libp2p-core/peer"
import (
"github.com/libp2p/go-libp2p-core/peer"
)

// Handler manages peerSet.
type Handler struct {
Expand Down Expand Up @@ -88,7 +90,7 @@ func (h *Handler) Incoming(setID int, peers ...peer.ID) {
}

// Messages return result message chan.
func (h *Handler) Messages() chan interface{} {
func (h *Handler) Messages() chan Message {
return h.peerSet.resultMsgCh
}

Expand Down
45 changes: 33 additions & 12 deletions dot/peerset/peerset.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ type PeerSet struct {
// TODO: this will be useful for reserved only mode
// this is for future purpose if reserved-only flag is enabled (#1888).
isReservedOnly bool
resultMsgCh chan interface{}
resultMsgCh chan Message
// time when the PeerSet was created.
created time.Time
// last time when we updated the reputations of connected nodes.
Expand Down Expand Up @@ -183,6 +183,8 @@ func NewConfigSet(in, out uint32, reservedOnly bool, allocTime time.Duration) *C
}

return &ConfigSet{
// Why are we using an array of config in the set, when we are
// using just one config
Set: []*config{set},
}
}
Expand Down Expand Up @@ -228,8 +230,8 @@ func reputationTick(reput Reputation) Reputation {
return reput.sub(diff)
}

// updateTime updates the value of latestTimeUpdate and performs all the updates that happen
// over time, such as Reputation increases for staying connected.
// updateTime updates the value of latestTimeUpdate and performs all the updates that
// happen over time, such as Reputation increases for staying connected.
func (ps *PeerSet) updateTime() error {
currTime := time.Now()
// identify the time difference between current time and last update time for peer reputation in seconds.
Expand Down Expand Up @@ -282,8 +284,8 @@ func (ps *PeerSet) updateTime() error {
}

// reportPeer on report ReputationChange of the peer based on its behaviour,
// if the updated Reputation is below BannedThresholdValue then, this node need to be disconnected
// and a drop message for the peer is sent in order to disconnect.
// if the updated Reputation is below BannedThresholdValue then, this node need to
// be disconnected and a drop message for the peer is sent in order to disconnect.
func (ps *PeerSet) reportPeer(change ReputationChange, peers ...peer.ID) error {
// we want reputations to be up-to-date before adjusting them.
if err := ps.updateTime(); err != nil {
Expand Down Expand Up @@ -516,8 +518,9 @@ func (ps *PeerSet) removePeer(setID int, peers ...peer.ID) error {
return nil
}

// incoming indicates that we have received an incoming connection. Must be answered either with
// a corresponding `Accept` or `Reject`, except if we were already connected to this peer.
// incoming indicates that we have received an incoming connection. Must be answered
// either with a corresponding `Accept` or `Reject`, except if we were already
// connected to this peer.
func (ps *PeerSet) incoming(setID int, peers ...peer.ID) error {
if err := ps.updateTime(); err != nil {
return err
Expand All @@ -527,7 +530,11 @@ func (ps *PeerSet) incoming(setID int, peers ...peer.ID) error {
for _, pid := range peers {
if ps.isReservedOnly {
if _, ok := ps.reservedNode[pid]; !ok {
ps.resultMsgCh <- Message{Status: Reject}
ps.resultMsgCh <- Message{
Status: Reject,
setID: uint64(setID),
PeerID: pid,
}
continue
}
}
Expand All @@ -546,11 +553,24 @@ func (ps *PeerSet) incoming(setID int, peers ...peer.ID) error {
p := state.nodes[pid]
switch {
case p.getReputation() < BannedThresholdValue:
ps.resultMsgCh <- Message{Status: Reject}
ps.resultMsgCh <- Message{
Status: Reject,
setID: uint64(setID),
PeerID: pid,
}
case state.tryAcceptIncoming(setID, pid) != nil:
ps.resultMsgCh <- Message{Status: Reject}
ps.resultMsgCh <- Message{
Status: Reject,
setID: uint64(setID),
PeerID: pid,
}
default:
ps.resultMsgCh <- Message{Status: Accept}
logger.Debugf("incoming connection accepted from peer %s", pid)
ps.resultMsgCh <- Message{
Status: Accept,
setID: uint64(setID),
PeerID: pid,
}
}
}

Expand Down Expand Up @@ -593,6 +613,7 @@ func (ps *PeerSet) disconnect(setIdx int, reason DropReason, peers ...peer.ID) e
}
ps.resultMsgCh <- Message{
Status: Drop,
setID: uint64(setIdx),
PeerID: pid,
}

Expand All @@ -610,7 +631,7 @@ func (ps *PeerSet) disconnect(setIdx int, reason DropReason, peers ...peer.ID) e
// start handles all the action for the peerSet.
func (ps *PeerSet) start(aq chan action) {
ps.actionQueue = aq
ps.resultMsgCh = make(chan interface{}, msgChanSize)
ps.resultMsgCh = make(chan Message, msgChanSize)
go ps.doWork()
}

Expand Down
4 changes: 1 addition & 3 deletions dot/peerset/peerset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,7 @@ func TestAddReservedPeers(t *testing.T) {
if len(ps.resultMsgCh) == 0 {
break
}
m := <-ps.resultMsgCh
msg, ok := m.(Message)
require.True(t, ok)
msg := <-ps.resultMsgCh
require.Equal(t, expectedMsgs[i], msg)
}
}
Expand Down

0 comments on commit ac16285

Please sign in to comment.