Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use a client side rate limit to reduce chance of getting banned #4637

Merged
merged 3 commits into from Jan 24, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions beacon-chain/sync/initial-sync/BUILD.bazel
Expand Up @@ -25,6 +25,7 @@ go_library(
"//shared/mathutil:go_default_library",
"//shared/params:go_default_library",
"//shared/roughtime:go_default_library",
"@com_github_kevinms_leakybucket_go//:go_default_library",
"@com_github_libp2p_go_libp2p_core//peer:go_default_library",
"@com_github_paulbellamy_ratecounter//:go_default_library",
"@com_github_pkg_errors//:go_default_library",
Expand All @@ -51,6 +52,7 @@ go_test(
"//shared/params:go_default_library",
"//shared/roughtime:go_default_library",
"//shared/sliceutil:go_default_library",
"@com_github_kevinms_leakybucket_go//:go_default_library",
"@com_github_libp2p_go_libp2p_core//network:go_default_library",
"@com_github_prysmaticlabs_ethereumapis//eth/v1alpha1:go_default_library",
"@com_github_prysmaticlabs_go_ssz//:go_default_library",
Expand Down
5 changes: 5 additions & 0 deletions beacon-chain/sync/initial-sync/round_robin.go
Expand Up @@ -274,6 +274,11 @@ func (s *Service) roundRobinSync(genesis time.Time) error {

// requestBlocks by range to a specific peer.
func (s *Service) requestBlocks(ctx context.Context, req *p2ppb.BeaconBlocksByRangeRequest, pid peer.ID) ([]*eth.SignedBeaconBlock, error) {
if s.blocksRateLimiter.Remaining(pid.String()) < int64(req.Count) {
log.WithField("peer", pid).Debug("Slowing down for rate limit")
time.Sleep(s.blocksRateLimiter.TillEmpty(pid.String()))
}
s.blocksRateLimiter.Add(pid.String(), int64(req.Count))
log.WithFields(logrus.Fields{
"peer": pid,
"start": req.StartSlot,
Expand Down
12 changes: 7 additions & 5 deletions beacon-chain/sync/initial-sync/round_robin_test.go
Expand Up @@ -7,6 +7,7 @@ import (
"testing"
"time"

"github.com/kevinms/leakybucket-go"
"github.com/libp2p/go-libp2p-core/network"
eth "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/go-ssz"
Expand Down Expand Up @@ -255,11 +256,12 @@ func TestRoundRobinSync(t *testing.T) {
DB: beaconDB,
} // no-op mock
s := &Service{
chain: mc,
p2p: p,
db: beaconDB,
synced: false,
chainStarted: true,
chain: mc,
p2p: p,
db: beaconDB,
synced: false,
chainStarted: true,
blocksRateLimiter: leakybucket.NewCollector(allowedBlocksPerSecond, allowedBlocksPerSecond, false /* deleteEmptyBuckets */),
}
if err := s.roundRobinSync(makeGenesisTime(tt.currentSlot)); err != nil {
t.Error(err)
Expand Down
29 changes: 17 additions & 12 deletions beacon-chain/sync/initial-sync/service.go
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"time"

"github.com/kevinms/leakybucket-go"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/beacon-chain/blockchain"
"github.com/prysmaticlabs/prysm/beacon-chain/core/feed"
Expand All @@ -27,6 +28,8 @@ type blockchainService interface {

const (
handshakePollingInterval = 5 * time.Second // Polling interval for checking the number of received handshakes.

allowedBlocksPerSecond = 32.0
)

// Config to set up the initial sync service.
Expand All @@ -39,24 +42,26 @@ type Config struct {

// Service service.
type Service struct {
ctx context.Context
chain blockchainService
p2p p2p.P2P
db db.ReadOnlyDatabase
synced bool
chainStarted bool
stateNotifier statefeed.Notifier
ctx context.Context
chain blockchainService
p2p p2p.P2P
db db.ReadOnlyDatabase
synced bool
chainStarted bool
stateNotifier statefeed.Notifier
blocksRateLimiter *leakybucket.Collector
}

// NewInitialSync configures the initial sync service responsible for bringing the node up to the
// latest head of the blockchain.
func NewInitialSync(cfg *Config) *Service {
return &Service{
ctx: context.Background(),
chain: cfg.Chain,
p2p: cfg.P2P,
db: cfg.DB,
stateNotifier: cfg.StateNotifier,
ctx: context.Background(),
chain: cfg.Chain,
p2p: cfg.P2P,
db: cfg.DB,
stateNotifier: cfg.StateNotifier,
blocksRateLimiter: leakybucket.NewCollector(allowedBlocksPerSecond, allowedBlocksPerSecond, false /* deleteEmptyBuckets */),
}
}

Expand Down