Skip to content

Commit

Permalink
Refactor scoring service (#7841)
Browse files Browse the repository at this point in the history
* refactor scoring service

* fix anti-pattern issue

* add block providers bad peers detection tests

* check status when peer scoring is disabled

* more tests
  • Loading branch information
farazdagi committed Nov 17, 2020
1 parent ad5151f commit 2034c66
Show file tree
Hide file tree
Showing 8 changed files with 198 additions and 64 deletions.
22 changes: 7 additions & 15 deletions beacon-chain/p2p/peers/scorers/bad_responses.go
Original file line number Diff line number Diff line change
@@ -1,26 +1,24 @@
package scorers

import (
"context"
"time"

"github.com/libp2p/go-libp2p-core/peer"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers/peerdata"
)

var _ Scorer = (*BadResponsesScorer)(nil)

const (
// DefaultBadResponsesThreshold defines how many bad responses to tolerate before peer is deemed bad.
DefaultBadResponsesThreshold = 6
// DefaultBadResponsesWeight is a default weight. Since score represents penalty, it has negative weight.
DefaultBadResponsesWeight = -1.0
// DefaultBadResponsesDecayInterval defines how often to decay previous statistics.
// Every interval bad responses counter will be decremented by 1.
DefaultBadResponsesDecayInterval = time.Hour
)

// BadResponsesScorer represents bad responses scoring service.
type BadResponsesScorer struct {
ctx context.Context
config *BadResponsesScorerConfig
store *peerdata.Store
}
Expand All @@ -29,29 +27,22 @@ type BadResponsesScorer struct {
type BadResponsesScorerConfig struct {
// Threshold specifies number of bad responses tolerated, before peer is banned.
Threshold int
// Weight defines weight of bad response/threshold ratio on overall score.
Weight float64
// DecayInterval specifies how often bad response stats should be decayed.
DecayInterval time.Duration
}

// newBadResponsesScorer creates new bad responses scoring service.
func newBadResponsesScorer(
ctx context.Context, store *peerdata.Store, config *BadResponsesScorerConfig) *BadResponsesScorer {
func newBadResponsesScorer(store *peerdata.Store, config *BadResponsesScorerConfig) *BadResponsesScorer {
if config == nil {
config = &BadResponsesScorerConfig{}
}
scorer := &BadResponsesScorer{
ctx: ctx,
config: config,
store: store,
}
if scorer.config.Threshold == 0 {
scorer.config.Threshold = DefaultBadResponsesThreshold
}
if scorer.config.Weight == 0.0 {
scorer.config.Weight = DefaultBadResponsesWeight
}
if scorer.config.DecayInterval == 0 {
scorer.config.DecayInterval = DefaultBadResponsesDecayInterval
}
Expand All @@ -65,7 +56,7 @@ func (s *BadResponsesScorer) Score(pid peer.ID) float64 {
return s.score(pid)
}

// score is a lock-free version of ScoreBadResponses.
// score is a lock-free version of Score.
func (s *BadResponsesScorer) score(pid peer.ID) float64 {
score := float64(0)
peerData, ok := s.store.PeerData(pid)
Expand All @@ -74,7 +65,8 @@ func (s *BadResponsesScorer) score(pid peer.ID) float64 {
}
if peerData.BadResponses > 0 {
score = float64(peerData.BadResponses) / float64(s.config.Threshold)
score = score * s.config.Weight
// Since score represents a penalty, negate it.
score *= -1
}
return score
}
Expand Down Expand Up @@ -131,7 +123,7 @@ func (s *BadResponsesScorer) isBadPeer(pid peer.ID) bool {
return false
}

// BadPeers returns the peers that are bad.
// BadPeers returns the peers that are considered bad.
func (s *BadResponsesScorer) BadPeers() []peer.ID {
s.store.RLock()
defer s.store.RUnlock()
Expand Down
1 change: 0 additions & 1 deletion beacon-chain/p2p/peers/scorers/bad_responses_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,6 @@ func TestScorers_BadResponses_Decay(t *testing.T) {
ScorerParams: &scorers.Config{
BadResponsesScorerConfig: &scorers.BadResponsesScorerConfig{
Threshold: maxBadResponses,
Weight: 1,
},
},
})
Expand Down
22 changes: 17 additions & 5 deletions beacon-chain/p2p/peers/scorers/block_providers.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package scorers

import (
"context"
"fmt"
"math"
"sort"
Expand All @@ -15,6 +14,8 @@ import (
"github.com/prysmaticlabs/prysm/shared/timeutils"
)

var _ Scorer = (*BlockProviderScorer)(nil)

const (
// DefaultBlockProviderProcessedBatchWeight is a default reward weight of a processed batch of blocks.
DefaultBlockProviderProcessedBatchWeight = float64(0.1)
Expand All @@ -35,7 +36,6 @@ const (

// BlockProviderScorer represents block provider scoring service.
type BlockProviderScorer struct {
ctx context.Context
config *BlockProviderScorerConfig
store *peerdata.Store
// maxScore is a cached value for maximum attainable block provider score.
Expand All @@ -62,13 +62,11 @@ type BlockProviderScorerConfig struct {
}

// newBlockProviderScorer creates block provider scoring service.
func newBlockProviderScorer(
ctx context.Context, store *peerdata.Store, config *BlockProviderScorerConfig) *BlockProviderScorer {
func newBlockProviderScorer(store *peerdata.Store, config *BlockProviderScorerConfig) *BlockProviderScorer {
if config == nil {
config = &BlockProviderScorerConfig{}
}
scorer := &BlockProviderScorer{
ctx: ctx,
config: config,
store: store,
}
Expand Down Expand Up @@ -176,6 +174,20 @@ func (s *BlockProviderScorer) processedBlocks(pid peer.ID) uint64 {
return 0
}

// IsBadPeer states if the peer is to be considered bad.
// Block provider scorer cannot guarantee that lower score of a peer is indeed a sign of a bad peer.
// Therefore this scorer never marks peers as bad, and relies on scores to probabilistically sort
// out low-scorers (see WeightSorted method).
func (s *BlockProviderScorer) IsBadPeer(_ peer.ID) bool {
return false
}

// BadPeers returns the peers that are considered bad.
// No peers are considered bad by block providers scorer.
func (s *BlockProviderScorer) BadPeers() []peer.ID {
return []peer.ID{}
}

// Decay updates block provider counters by decaying them.
// This urges peers to keep up the performance to continue getting a high score (and allows
// new peers to contest previously high scoring ones).
Expand Down
47 changes: 38 additions & 9 deletions beacon-chain/p2p/peers/scorers/block_providers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/prysmaticlabs/prysm/beacon-chain/flags"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers/scorers"
"github.com/prysmaticlabs/prysm/shared/featureconfig"
"github.com/prysmaticlabs/prysm/shared/rand"
"github.com/prysmaticlabs/prysm/shared/testutil/assert"
"github.com/prysmaticlabs/prysm/shared/timeutils"
Expand Down Expand Up @@ -438,22 +439,50 @@ func TestScorers_BlockProvider_FormatScorePretty(t *testing.T) {
},
}

peerStatusGen := func() *peers.Status {
return peers.NewStatus(ctx, &peers.StatusConfig{
ScorerParams: &scorers.Config{
BlockProviderScorerConfig: &scorers.BlockProviderScorerConfig{
ProcessedBatchWeight: 0.05,
ProcessedBlocksCap: 20 * batchSize,
Decay: 10 * batchSize,
},
},
})
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
peerStatuses := peers.NewStatus(ctx, &peers.StatusConfig{
ScorerParams: &scorers.Config{
BlockProviderScorerConfig: &scorers.BlockProviderScorerConfig{
ProcessedBatchWeight: 0.05,
ProcessedBlocksCap: 20 * batchSize,
Decay: 10 * batchSize,
},
},
})
peerStatuses := peerStatusGen()
scorer := peerStatuses.Scorers().BlockProviderScorer()
if tt.update != nil {
tt.update(scorer)
}
tt.check(scorer)
})
}

t.Run("peer scorer disabled", func(t *testing.T) {
resetCfg := featureconfig.InitWithReset(&featureconfig.Flags{
EnablePeerScorer: false,
})
defer resetCfg()
peerStatuses := peerStatusGen()
scorer := peerStatuses.Scorers().BlockProviderScorer()
assert.Equal(t, "disabled", scorer.FormatScorePretty("peer1"))
})
}

func TestScorers_BlockProvider_BadPeerMarking(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

peerStatuses := peers.NewStatus(ctx, &peers.StatusConfig{
ScorerParams: &scorers.Config{},
})
scorer := peerStatuses.Scorers().BlockProviderScorer()

assert.Equal(t, false, scorer.IsBadPeer("peer1"), "Unexpected status for unregistered peer")
scorer.IncrementProcessedBlocks("peer1", 64)
assert.Equal(t, false, scorer.IsBadPeer("peer1"))
assert.Equal(t, 0, len(scorer.BadPeers()))
}
80 changes: 72 additions & 8 deletions beacon-chain/p2p/peers/scorers/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,28 @@ import (
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers/peerdata"
)

var _ Scorer = (*Service)(nil)

// ScoreRoundingFactor defines how many digits to keep in decimal part.
// This parameter is used in math.Round(score*ScoreRoundingFactor) / ScoreRoundingFactor.
const ScoreRoundingFactor = 10000

// Scorer defines minimum set of methods every peer scorer must expose.
type Scorer interface {
Score(pid peer.ID) float64
IsBadPeer(pid peer.ID) bool
BadPeers() []peer.ID
}

// Service manages peer scorers that are used to calculate overall peer score.
type Service struct {
ctx context.Context
store *peerdata.Store
scorers struct {
badResponsesScorer *BadResponsesScorer
blockProviderScorer *BlockProviderScorer
}
weights map[Scorer]float64
totalWeight float64
}

// Config holds configuration parameters for scoring service.
Expand All @@ -32,12 +42,18 @@ type Config struct {
// NewService provides fully initialized peer scoring service.
func NewService(ctx context.Context, store *peerdata.Store, config *Config) *Service {
s := &Service{
ctx: ctx,
store: store,
store: store,
weights: make(map[Scorer]float64),
}
s.scorers.badResponsesScorer = newBadResponsesScorer(ctx, store, config.BadResponsesScorerConfig)
s.scorers.blockProviderScorer = newBlockProviderScorer(ctx, store, config.BlockProviderScorerConfig)
go s.loop(s.ctx)

// Register scorers.
s.scorers.badResponsesScorer = newBadResponsesScorer(store, config.BadResponsesScorerConfig)
s.setScorerWeight(s.scorers.badResponsesScorer, 1.0)
s.scorers.blockProviderScorer = newBlockProviderScorer(store, config.BlockProviderScorerConfig)
s.setScorerWeight(s.scorers.blockProviderScorer, 1.0)

// Start background tasks.
go s.loop(ctx)

return s
}
Expand All @@ -52,6 +68,17 @@ func (s *Service) BlockProviderScorer() *BlockProviderScorer {
return s.scorers.blockProviderScorer
}

// ActiveScorersCount returns number of scorers that can affect score (have non-zero weight).
func (s *Service) ActiveScorersCount() int {
cnt := 0
for _, w := range s.weights {
if w > 0 {
cnt++
}
}
return cnt
}

// Score returns calculated peer score across all tracked metrics.
func (s *Service) Score(pid peer.ID) float64 {
s.store.RLock()
Expand All @@ -61,11 +88,37 @@ func (s *Service) Score(pid peer.ID) float64 {
if _, ok := s.store.PeerData(pid); !ok {
return 0
}
score += s.scorers.badResponsesScorer.score(pid)
score += s.scorers.blockProviderScorer.score(pid)
score += s.scorers.badResponsesScorer.score(pid) * s.scorerWeight(s.scorers.badResponsesScorer)
score += s.scorers.blockProviderScorer.score(pid) * s.scorerWeight(s.scorers.blockProviderScorer)
return math.Round(score*ScoreRoundingFactor) / ScoreRoundingFactor
}

// IsBadPeer traverses all the scorers to see if any of them classifies peer as bad.
func (s *Service) IsBadPeer(pid peer.ID) bool {
s.store.RLock()
defer s.store.RUnlock()
return s.isBadPeer(pid)
}

// isBadPeer is a lock-free version of isBadPeer.
func (s *Service) isBadPeer(pid peer.ID) bool {
return s.scorers.badResponsesScorer.isBadPeer(pid)
}

// BadPeers returns the peers that are considered bad by any of registered scorers.
func (s *Service) BadPeers() []peer.ID {
s.store.RLock()
defer s.store.RUnlock()

badPeers := make([]peer.ID, 0)
for pid := range s.store.Peers() {
if s.isBadPeer(pid) {
badPeers = append(badPeers, pid)
}
}
return badPeers
}

// loop handles background tasks.
func (s *Service) loop(ctx context.Context) {
decayBadResponsesStats := time.NewTicker(s.scorers.badResponsesScorer.Params().DecayInterval)
Expand All @@ -84,3 +137,14 @@ func (s *Service) loop(ctx context.Context) {
}
}
}

// setScorerWeight adds scorer to map of known scorers.
func (s *Service) setScorerWeight(scorer Scorer, weight float64) {
s.weights[scorer] = weight
s.totalWeight += s.weights[scorer]
}

// scorerWeight calculates contribution percentage of a given scorer in total score.
func (s *Service) scorerWeight(scorer Scorer) float64 {
return s.weights[scorer] / s.totalWeight
}

0 comments on commit 2034c66

Please sign in to comment.