Skip to content

Commit

Permalink
Add Initial Support For Gossip Scoring Service (#8275)
Browse files Browse the repository at this point in the history
* checkpoint progress

* gaz

* fix

* add it in

* Update beacon-chain/p2p/pubsub.go

* fmt

* reformat imports

Co-authored-by: Victor Farazdagi <simple.square@gmail.com>
  • Loading branch information
nisdas and farazdagi committed Jan 19, 2021
1 parent b33a8eb commit 20b836d
Show file tree
Hide file tree
Showing 22 changed files with 1,616 additions and 483 deletions.
1 change: 1 addition & 0 deletions beacon-chain/p2p/BUILD.bazel
Expand Up @@ -50,6 +50,7 @@ go_library(
"//beacon-chain/p2p/peers/scorers:go_default_library",
"//beacon-chain/p2p/types:go_default_library",
"//proto/beacon/p2p/v1:go_default_library",
"//proto/beacon/rpc/v1:go_default_library",
"//shared:go_default_library",
"//shared/featureconfig:go_default_library",
"//shared/fileutil:go_default_library",
Expand Down
1 change: 1 addition & 0 deletions beacon-chain/p2p/peers/peerdata/BUILD.bazel
Expand Up @@ -8,6 +8,7 @@ go_library(
visibility = ["//beacon-chain:__subpackages__"],
deps = [
"//proto/beacon/p2p/v1:go_default_library",
"//proto/beacon/rpc/v1:go_default_library",
"@com_github_ethereum_go_ethereum//p2p/enr:go_default_library",
"@com_github_libp2p_go_libp2p_core//network:go_default_library",
"@com_github_libp2p_go_libp2p_core//peer:go_default_library",
Expand Down
5 changes: 5 additions & 0 deletions beacon-chain/p2p/peers/peerdata/store.go
Expand Up @@ -12,6 +12,7 @@ import (
ma "github.com/multiformats/go-multiaddr"
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1"
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
pbrpc "github.com/prysmaticlabs/prysm/proto/beacon/rpc/v1"
)

var (
Expand Down Expand Up @@ -56,6 +57,10 @@ type PeerData struct {
BadResponses int
ProcessedBlocks uint64
BlockProviderUpdated time.Time
// Gossip Scoring data.
TopicScores map[string]*pbrpc.TopicScoreSnapshot
GossipScore float64
BehaviourPenalty float64
}

// NewStore creates new peer data store.
Expand Down
4 changes: 4 additions & 0 deletions beacon-chain/p2p/peers/scorers/BUILD.bazel
Expand Up @@ -6,6 +6,7 @@ go_library(
srcs = [
"bad_responses.go",
"block_providers.go",
"gossip_scorer.go",
"peer_status.go",
"service.go",
],
Expand All @@ -16,6 +17,7 @@ go_library(
"//beacon-chain/p2p/peers/peerdata:go_default_library",
"//beacon-chain/p2p/types:go_default_library",
"//proto/beacon/p2p/v1:go_default_library",
"//proto/beacon/rpc/v1:go_default_library",
"//shared/featureconfig:go_default_library",
"//shared/rand:go_default_library",
"//shared/timeutils:go_default_library",
Expand All @@ -28,6 +30,7 @@ go_test(
srcs = [
"bad_responses_test.go",
"block_providers_test.go",
"gossip_scorer_test.go",
"peer_status_test.go",
"scorers_test.go",
"service_test.go",
Expand All @@ -39,6 +42,7 @@ go_test(
"//beacon-chain/p2p/peers/peerdata:go_default_library",
"//beacon-chain/p2p/types:go_default_library",
"//proto/beacon/p2p/v1:go_default_library",
"//proto/beacon/rpc/v1:go_default_library",
"//shared/featureconfig:go_default_library",
"//shared/rand:go_default_library",
"//shared/testutil/assert:go_default_library",
Expand Down
105 changes: 105 additions & 0 deletions beacon-chain/p2p/peers/scorers/gossip_scorer.go
@@ -0,0 +1,105 @@
package scorers

import (
"github.com/libp2p/go-libp2p-core/peer"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers/peerdata"
pbrpc "github.com/prysmaticlabs/prysm/proto/beacon/rpc/v1"
)

var _ Scorer = (*GossipScorer)(nil)

// GossipScorer represents scorer that evaluates peers based on their gossip performance.
// Gossip scoring metrics are periodically calculated in libp2p's internal pubsub module.
type GossipScorer struct {
config *GossipScorerConfig
store *peerdata.Store
}

// GossipScorerConfig holds configuration parameters for gossip scoring service.
type GossipScorerConfig struct{}

// newGossipScorer creates new gossip scoring service.
func newGossipScorer(store *peerdata.Store, config *GossipScorerConfig) *GossipScorer {
if config == nil {
config = &GossipScorerConfig{}
}
return &GossipScorer{
config: config,
store: store,
}
}

// Score returns calculated peer score.
func (s *GossipScorer) Score(pid peer.ID) float64 {
s.store.RLock()
defer s.store.RUnlock()
return s.score(pid)
}

// score is a lock-free version of Score.
func (s *GossipScorer) score(pid peer.ID) float64 {
peerData, ok := s.store.PeerData(pid)
if !ok {
return 0
}
return peerData.GossipScore
}

// IsBadPeer states if the peer is to be considered bad.
func (s *GossipScorer) IsBadPeer(pid peer.ID) bool {
s.store.RLock()
defer s.store.RUnlock()
return s.isBadPeer(pid)
}

// isBadPeer is lock-free version of IsBadPeer.
func (s *GossipScorer) isBadPeer(pid peer.ID) bool {
peerData, ok := s.store.PeerData(pid)
if !ok {
return false
}
return peerData.GossipScore < 0
}

// BadPeers returns the peers that are considered bad.
func (s *GossipScorer) BadPeers() []peer.ID {
s.store.RLock()
defer s.store.RUnlock()

badPeers := make([]peer.ID, 0)
for pid := range s.store.Peers() {
if s.isBadPeer(pid) {
badPeers = append(badPeers, pid)
}
}
return badPeers
}

// SetGossipData sets the gossip related data of a peer.
func (s *GossipScorer) SetGossipData(pid peer.ID, gScore float64,
bPenalty float64, topicScores map[string]*pbrpc.TopicScoreSnapshot) {
s.store.Lock()
defer s.store.Unlock()

peerData := s.store.PeerDataGetOrCreate(pid)
peerData.GossipScore = gScore
peerData.BehaviourPenalty = bPenalty
peerData.TopicScores = topicScores
}

// GossipData gets the gossip related information of the given remote peer.
// This can return nil if there is no known gossip record the peer.
// This will error if the peer does not exist.
func (s *GossipScorer) GossipData(pid peer.ID) (float64, float64, map[string]*pbrpc.TopicScoreSnapshot, error) {
s.store.RLock()
defer s.store.RUnlock()
return s.gossipData(pid)
}

// gossipData lock-free version of GossipData.
func (s *GossipScorer) gossipData(pid peer.ID) (float64, float64, map[string]*pbrpc.TopicScoreSnapshot, error) {
if peerData, ok := s.store.PeerData(pid); ok {
return peerData.GossipScore, peerData.BehaviourPenalty, peerData.TopicScores, nil
}
return 0, 0, nil, peerdata.ErrPeerUnknown
}
67 changes: 67 additions & 0 deletions beacon-chain/p2p/peers/scorers/gossip_scorer_test.go
@@ -0,0 +1,67 @@
package scorers_test

import (
"context"
"testing"

"github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers/scorers"
pbrpc "github.com/prysmaticlabs/prysm/proto/beacon/rpc/v1"
"github.com/prysmaticlabs/prysm/shared/testutil/assert"
)

func TestScorers_Gossip_Score(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

tests := []struct {
name string
update func(scorer *scorers.GossipScorer)
check func(scorer *scorers.GossipScorer)
}{
{
name: "nonexistent peer",
update: func(scorer *scorers.GossipScorer) {
},
check: func(scorer *scorers.GossipScorer) {
assert.Equal(t, 0.0, scorer.Score("peer1"), "Unexpected score")
},
},
{
name: "existent bad peer",
update: func(scorer *scorers.GossipScorer) {
scorer.SetGossipData("peer1", -10.0, 1, nil)
},
check: func(scorer *scorers.GossipScorer) {
assert.Equal(t, -10.0, scorer.Score("peer1"), "Unexpected score")
assert.Equal(t, true, scorer.IsBadPeer("peer1"), "Unexpected good peer")
},
},
{
name: "good peer",
update: func(scorer *scorers.GossipScorer) {
scorer.SetGossipData("peer1", 10.0, 0, map[string]*pbrpc.TopicScoreSnapshot{"a": {TimeInMesh: 100}})
},
check: func(scorer *scorers.GossipScorer) {
assert.Equal(t, 10.0, scorer.Score("peer1"), "Unexpected score")
assert.Equal(t, false, scorer.IsBadPeer("peer1"), "Unexpected bad peer")
_, _, topicMap, err := scorer.GossipData("peer1")
assert.NoError(t, err)
assert.Equal(t, uint64(100), topicMap["a"].TimeInMesh, "incorrect time in mesh")
},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
peerStatuses := peers.NewStatus(ctx, &peers.StatusConfig{
ScorerParams: &scorers.Config{},
})
scorer := peerStatuses.Scorers().GossipScorer()
if tt.update != nil {
tt.update(scorer)
}
tt.check(scorer)
})
}
}
12 changes: 12 additions & 0 deletions beacon-chain/p2p/peers/scorers/service.go
Expand Up @@ -32,6 +32,7 @@ type Service struct {
badResponsesScorer *BadResponsesScorer
blockProviderScorer *BlockProviderScorer
peerStatusScorer *PeerStatusScorer
gossipScorer *GossipScorer
}
weights map[Scorer]float64
totalWeight float64
Expand All @@ -42,6 +43,7 @@ type Config struct {
BadResponsesScorerConfig *BadResponsesScorerConfig
BlockProviderScorerConfig *BlockProviderScorerConfig
PeerStatusScorerConfig *PeerStatusScorerConfig
GossipScorerConfig *GossipScorerConfig
}

// NewService provides fully initialized peer scoring service.
Expand All @@ -58,6 +60,8 @@ func NewService(ctx context.Context, store *peerdata.Store, config *Config) *Ser
s.setScorerWeight(s.scorers.blockProviderScorer, 1.0)
s.scorers.peerStatusScorer = newPeerStatusScorer(store, config.PeerStatusScorerConfig)
s.setScorerWeight(s.scorers.peerStatusScorer, 0.0)
s.scorers.gossipScorer = newGossipScorer(store, config.GossipScorerConfig)
s.setScorerWeight(s.scorers.gossipScorer, 0.0)

// Start background tasks.
go s.loop(ctx)
Expand All @@ -80,6 +84,11 @@ func (s *Service) PeerStatusScorer() *PeerStatusScorer {
return s.scorers.peerStatusScorer
}

// GossipScorer exposes the peer's gossip scoring service.
func (s *Service) GossipScorer() *GossipScorer {
return s.scorers.gossipScorer
}

// ActiveScorersCount returns number of scorers that can affect score (have non-zero weight).
func (s *Service) ActiveScorersCount() int {
cnt := 0
Expand All @@ -103,6 +112,7 @@ func (s *Service) Score(pid peer.ID) float64 {
score += s.scorers.badResponsesScorer.score(pid) * s.scorerWeight(s.scorers.badResponsesScorer)
score += s.scorers.blockProviderScorer.score(pid) * s.scorerWeight(s.scorers.blockProviderScorer)
score += s.scorers.peerStatusScorer.score(pid) * s.scorerWeight(s.scorers.peerStatusScorer)
score += s.scorers.gossipScorer.score(pid) * s.scorerWeight(s.scorers.gossipScorer)
return math.Round(score*ScoreRoundingFactor) / ScoreRoundingFactor
}

Expand All @@ -121,6 +131,8 @@ func (s *Service) isBadPeer(pid peer.ID) bool {
if s.scorers.peerStatusScorer.isBadPeer(pid) {
return true
}
// TODO(#6043): Hook in gossip scorer's relevant
// method to check if peer has a bad gossip score.
return false
}

Expand Down
23 changes: 21 additions & 2 deletions beacon-chain/p2p/pubsub.go
Expand Up @@ -9,6 +9,7 @@ import (
pubsub_pb "github.com/libp2p/go-libp2p-pubsub/pb"
"github.com/prysmaticlabs/prysm/beacon-chain/flags"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/encoder"
pbrpc "github.com/prysmaticlabs/prysm/proto/beacon/rpc/v1"
"github.com/prysmaticlabs/prysm/shared/featureconfig"
"github.com/prysmaticlabs/prysm/shared/hashutil"
"github.com/prysmaticlabs/prysm/shared/params"
Expand Down Expand Up @@ -87,9 +88,13 @@ func (s *Service) SubscribeToTopic(topic string, opts ...pubsub.SubOpt) (*pubsub

// peerInspector will scrape all the relevant scoring data and add it to our
// peer handler.
// TODO(#6043): Add hooks to add in peer inspector to our global peer handler.
func (s *Service) peerInspector(peerMap map[peer.ID]*pubsub.PeerScoreSnapshot) {
// no-op
// Iterate through all the connected peers and through any of their
// relevant topics.
for pid, snap := range peerMap {
s.peers.Scorers().GossipScorer().SetGossipData(pid, snap.Score,
snap.BehaviourPenalty, convertTopicScores(snap.Topics))
}
}

// Content addressable ID function.
Expand Down Expand Up @@ -133,3 +138,17 @@ func setPubSubParameters() {
pubsub.GossipSubHistoryLength = 5
}
}

// convert from libp2p's internal schema to a compatible prysm protobuf format.
func convertTopicScores(topicMap map[string]*pubsub.TopicScoreSnapshot) map[string]*pbrpc.TopicScoreSnapshot {
newMap := make(map[string]*pbrpc.TopicScoreSnapshot, len(topicMap))
for t, s := range topicMap {
newMap[t] = &pbrpc.TopicScoreSnapshot{
TimeInMesh: uint64(s.TimeInMesh.Milliseconds()),
FirstMessageDeliveries: float32(s.FirstMessageDeliveries),
MeshMessageDeliveries: float32(s.MeshMessageDeliveries),
InvalidMessageDeliveries: float32(s.InvalidMessageDeliveries),
}
}
return newMap
}
21 changes: 21 additions & 0 deletions beacon-chain/rpc/debug/p2p.go
Expand Up @@ -124,6 +124,19 @@ func (ds *Server) getPeer(pid peer.ID) (*pbrpc.DebugPeerResponse, error) {
if !lastUpdated.IsZero() {
unixTime = uint64(lastUpdated.Unix())
}
gScore, bPenalty, topicMaps, err := peers.Scorers().GossipScorer().GossipData(pid)
if err != nil {
return nil, status.Errorf(codes.NotFound, "Requested peer does not exist: %v", err)
}
scoreInfo := &pbrpc.ScoreInfo{
OverallScore: float32(peers.Scorers().Score(pid)),
ProcessedBlocks: peers.Scorers().BlockProviderScorer().ProcessedBlocks(pid),
BlockProviderScore: float32(peers.Scorers().BlockProviderScorer().Score(pid)),
TopicScores: topicMaps,
GossipScore: float32(gScore),
BehaviourPenalty: float32(bPenalty),
ValidationError: errorToString(peers.Scorers().ValidationError(pid)),
}
return &pbrpc.DebugPeerResponse{
ListeningAddresses: stringAddrs,
Direction: pbDirection,
Expand All @@ -133,5 +146,13 @@ func (ds *Server) getPeer(pid peer.ID) (*pbrpc.DebugPeerResponse, error) {
PeerInfo: peerInfo,
PeerStatus: pStatus,
LastUpdated: unixTime,
ScoreInfo: scoreInfo,
}, nil
}

func errorToString(err error) string {
if err == nil {
return ""
}
return err.Error()
}
5 changes: 1 addition & 4 deletions proto/beacon/db/finalized_block_root_container.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 20b836d

Please sign in to comment.