Skip to content

Commit

Permalink
Slasher committees cache (#4812)
Browse files Browse the repository at this point in the history
* add committees cache
* committees cache usage
* fix test
* fix log
* goimports
* Merge branch 'master' of github.com:prysmaticlabs/prysm into slasher_committees_cache

# Conflicts:
#	slasher/service/data_update.go
* fix imports
* fix comment
* fix comment
* Merge refs/heads/master into slasher_committees_cache
* Merge refs/heads/master into slasher_committees_cache
* Update slasher/cache/BUILD.bazel

Co-Authored-By: Ivan Martinez <ivanthegreatdev@gmail.com>
* Merge refs/heads/master into slasher_committees_cache
* Merge refs/heads/master into slasher_committees_cache
* Merge refs/heads/master into slasher_committees_cache
* added in the service context
* baz
* Merge refs/heads/master into slasher_committees_cache
* Merge refs/heads/master into slasher_committees_cache
  • Loading branch information
shayzluf committed Feb 10, 2020
1 parent 0ed8246 commit 02b6d77
Show file tree
Hide file tree
Showing 8 changed files with 210 additions and 31 deletions.
3 changes: 2 additions & 1 deletion beacon-chain/rpc/testing/beacon_chain_service_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion shared/mock/beacon_chain_service_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 21 additions & 0 deletions slasher/cache/BUILD.bazel
@@ -0,0 +1,21 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "go_default_library",
srcs = ["committees_cache.go"],
importpath = "github.com/prysmaticlabs/prysm/slasher/cache",
visibility = ["//slasher:__subpackages__"],
deps = [
"@com_github_hashicorp_golang_lru//:go_default_library",
"@com_github_prometheus_client_golang//prometheus:go_default_library",
"@com_github_prometheus_client_golang//prometheus/promauto:go_default_library",
"@com_github_prysmaticlabs_ethereumapis//eth/v1alpha1:go_default_library",
],
)

go_test(
name = "go_default_test",
srcs = ["committees_cache_test.go"],
embed = [":go_default_library"],
deps = ["@com_github_prysmaticlabs_ethereumapis//eth/v1alpha1:go_default_library"],
)
63 changes: 63 additions & 0 deletions slasher/cache/committees_cache.go
@@ -0,0 +1,63 @@
package cache

import (
"context"
"sync"

lru "github.com/hashicorp/golang-lru"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
)

var (
// Metrics
committeesCacheHit = promauto.NewCounter(prometheus.CounterOpts{
Name: "committees_cache_hit",
Help: "The total number of cache hits on the committees cache.",
})
committeesCacheMiss = promauto.NewCounter(prometheus.CounterOpts{
Name: "committees_cache_miss",
Help: "The total number of cache misses on the committees cache.",
})
)

// CommitteesCache is used to store the cached results of committees for epoch.
type CommitteesCache struct {
cache *lru.Cache
lock sync.RWMutex
}

// NewCommitteesCache initializes the map and underlying cache.
func NewCommitteesCache() *CommitteesCache {
cache, err := lru.New(50)
if err != nil {
panic(err)
}
return &CommitteesCache{
cache: cache,
}
}

// Get returns the cached response.
func (c *CommitteesCache) Get(ctx context.Context, epoch uint64) (*ethpb.BeaconCommittees, error) {
if ctx.Err() != nil {
return nil, ctx.Err()
}
c.lock.RLock()

item, exists := c.cache.Get(epoch)

if exists && item != nil {
committeesCacheHit.Inc()
return item.(*ethpb.BeaconCommittees), nil
}
committeesCacheMiss.Inc()
return nil, nil
}

// Put the response in the cache.
func (c *CommitteesCache) Put(ctx context.Context, epoch uint64, committees *ethpb.BeaconCommittees) error {
c.cache.Add(epoch, committees)
return nil
}
47 changes: 47 additions & 0 deletions slasher/cache/committees_cache_test.go
@@ -0,0 +1,47 @@
package cache_test

import (
"context"
"reflect"
"testing"

ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/prysm/slasher/cache"
)

func TestCommitteesCache_RoundTrip(t *testing.T) {
ctx := context.Background()
c := cache.NewCommitteesCache()
numValidators := 64
wanted := make(map[uint64]*ethpb.BeaconCommittees_CommitteesList)
committeeItems := make([]*ethpb.BeaconCommittees_CommitteeItem, 1)
committeeItems[0] = &ethpb.BeaconCommittees_CommitteeItem{ValidatorIndices: []uint64{1, 2, 3}}
wanted[0] = &ethpb.BeaconCommittees_CommitteesList{Committees: committeeItems}
wantedRes := &ethpb.BeaconCommittees{
Epoch: 5,
Committees: wanted,
ActiveValidatorCount: uint64(numValidators),
}

committees, err := c.Get(ctx, 5)
if err != nil {
t.Error(err)
}

if committees != nil {
t.Errorf("Empty cache returned an object: %v", committees)
}

if err = c.Put(ctx, 5, wantedRes); err != nil {
t.Error(err)
}

res, err := c.Get(ctx, 5)
if err != nil {
t.Error(err)
}

if !reflect.DeepEqual(wantedRes, res) {
t.Error("Expected equal protos to return from cache")
}
}
2 changes: 2 additions & 0 deletions slasher/service/BUILD.bazel
Expand Up @@ -16,6 +16,7 @@ go_library(
"//shared/params:go_default_library",
"//shared/sliceutil:go_default_library",
"//shared/version:go_default_library",
"//slasher/cache:go_default_library",
"//slasher/db:go_default_library",
"//slasher/flags:go_default_library",
"//slasher/rpc:go_default_library",
Expand All @@ -29,6 +30,7 @@ go_library(
"@com_github_sirupsen_logrus//:go_default_library",
"@com_github_urfave_cli//:go_default_library",
"@io_opencensus_go//plugin/ocgrpc:go_default_library",
"@io_opencensus_go//trace:go_default_library",
"@org_golang_google_grpc//:go_default_library",
"@org_golang_google_grpc//codes:go_default_library",
"@org_golang_google_grpc//credentials:go_default_library",
Expand Down
93 changes: 67 additions & 26 deletions slasher/service/data_update.go
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/prysmaticlabs/prysm/shared/sliceutil"
"github.com/prysmaticlabs/prysm/slasher/db"
"github.com/sirupsen/logrus"
"go.opencensus.io/trace"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
Expand All @@ -21,26 +22,28 @@ import (
// on all historical attestations made until the current head.
// The latest epoch is updated after each iteration in case the long
// process is interrupted.
func (s *Service) historicalAttestationFeeder() error {
func (s *Service) historicalAttestationFeeder(ctx context.Context) error {
ctx, span := trace.StartSpan(ctx, "Slasher.Service.historicalAttestationFeeder")
defer span.End()
startFromEpoch, err := s.getLatestDetectedEpoch()
if err != nil {
return errors.Wrap(err, "failed to latest detected epoch")
}
ch, err := s.getChainHead()
ch, err := s.getChainHead(ctx)
if err != nil {
return errors.Wrap(err, "failed to get chain head")
}

for epoch := startFromEpoch; epoch < ch.FinalizedEpoch; epoch++ {
atts, bCommittees, err := s.attsAndCommitteesForEpoch(epoch)
atts, bCommittees, err := s.attsAndCommitteesForEpoch(ctx, epoch)
if err != nil || bCommittees == nil {
log.Error(err)
continue
}
log.Infof("Checking %v attestations from epoch %v for slashable events", len(atts), epoch)
for _, attestation := range atts {
idxAtt, err := convertToIndexed(s.context, attestation, bCommittees)
if err = s.detectSlashings(idxAtt); err != nil {
idxAtt, err := convertToIndexed(ctx, attestation, bCommittees)
if err = s.detectSlashings(ctx, idxAtt); err != nil {
log.Error(err)
continue
}
Expand All @@ -54,8 +57,10 @@ func (s *Service) historicalAttestationFeeder() error {
}

// attestationFeeder feeds attestations that were received by archive endpoint.
func (s *Service) attestationFeeder() error {
as, err := s.beaconClient.StreamAttestations(s.context, &ptypes.Empty{})
func (s *Service) attestationFeeder(ctx context.Context) error {
ctx, span := trace.StartSpan(ctx, "Slasher.Service.attestationFeeder")
defer span.End()
as, err := s.beaconClient.StreamAttestations(ctx, &ptypes.Empty{})
if err != nil {
return errors.Wrap(err, "failed to retrieve attestation stream")
}
Expand All @@ -69,37 +74,36 @@ func (s *Service) attestationFeeder() error {
if err != nil {
return err
}
committeeReq := &ethpb.ListCommitteesRequest{
QueryFilter: &ethpb.ListCommitteesRequest_Epoch{
Epoch: att.Data.Target.Epoch,
},
}
bCommittees, err := s.beaconClient.ListBeaconCommittees(s.context, committeeReq)
bCommittees, err := s.getCommittees(ctx, att)
if err != nil {
return errors.Wrapf(err, "could not list beacon committees for epoch %d", att.Data.Target.Epoch)
err = errors.Wrapf(err, "could not list beacon committees for epoch %d", att.Data.Target.Epoch)
log.WithError(err)
continue
}
idxAtt, err := convertToIndexed(s.context, att, bCommittees)
if err = s.detectSlashings(idxAtt); err != nil {
idxAtt, err := convertToIndexed(ctx, att, bCommittees)
if err = s.detectSlashings(ctx, idxAtt); err != nil {
log.Error(err)
continue
}
log.Info("detected attestation for target: %d", att.Data.Target)
log.Infof("detected attestation for target: %d", att.Data.Target.Epoch)
case <-s.context.Done():
return status.Error(codes.Canceled, "Stream context canceled")
}
}
}

// finalizedChangeUpdater this is a stub for the coming PRs #3133.
func (s *Service) finalizedChangeUpdater() error {
func (s *Service) finalizedChangeUpdater(ctx context.Context) error {
ctx, span := trace.StartSpan(ctx, "Slasher.Service.finalizedChangeUpdater")
defer span.End()
secondsPerSlot := params.BeaconConfig().SecondsPerSlot
d := time.Duration(secondsPerSlot) * time.Second
tick := time.Tick(d)
var finalizedEpoch uint64
for {
select {
case <-tick:
ch, err := s.beaconClient.GetChainHead(s.context, &ptypes.Empty{})
ch, err := s.beaconClient.GetChainHead(ctx, &ptypes.Empty{})
if err != nil {
log.Error(err)
continue
Expand All @@ -119,8 +123,10 @@ func (s *Service) finalizedChangeUpdater() error {
}
}

func (s *Service) detectSlashings(idxAtt *ethpb.IndexedAttestation) error {
attSlashingResp, err := s.slasher.IsSlashableAttestation(s.context, idxAtt)
func (s *Service) detectSlashings(ctx context.Context, idxAtt *ethpb.IndexedAttestation) error {
ctx, span := trace.StartSpan(ctx, "Slasher.Service.detectSlashings")
defer span.End()
attSlashingResp, err := s.slasher.IsSlashableAttestation(ctx, idxAtt)
if err != nil {
return errors.Wrap(err, "failed to check attestation")
}
Expand All @@ -143,7 +149,36 @@ func (s *Service) detectSlashings(idxAtt *ethpb.IndexedAttestation) error {
return nil
}

func (s *Service) getCommittees(ctx context.Context, at *ethpb.Attestation) (*ethpb.BeaconCommittees, error) {
ctx, span := trace.StartSpan(ctx, "Slasher.Service.getCommittees")
defer span.End()
epoch := at.Data.Target.Epoch
committees, err := committeesCache.Get(ctx, epoch)
if err != nil {
return nil, err
}
if committees != nil {
return committees, nil
}
committeeReq := &ethpb.ListCommitteesRequest{
QueryFilter: &ethpb.ListCommitteesRequest_Epoch{
Epoch: epoch,
},
}
bCommittees, err := s.beaconClient.ListBeaconCommittees(ctx, committeeReq)
if err != nil {
log.WithError(err).Errorf("Could not list beacon committees for epoch %d", at.Data.Target.Epoch)
return nil, err
}
if err := committeesCache.Put(ctx, epoch, bCommittees); err != nil {
return nil, err
}
return bCommittees, nil
}

func convertToIndexed(ctx context.Context, att *ethpb.Attestation, bCommittee *ethpb.BeaconCommittees) (*ethpb.IndexedAttestation, error) {
ctx, span := trace.StartSpan(ctx, "Slasher.Service.convertToIndexed")
defer span.End()
slotCommittees, ok := bCommittee.Committees[att.Data.Slot]
if !ok || slotCommittees == nil {
return nil, fmt.Errorf(
Expand All @@ -168,14 +203,18 @@ func convertToIndexed(ctx context.Context, att *ethpb.Attestation, bCommittee *e
return idxAtt, nil
}

func (s *Service) attsAndCommitteesForEpoch(epoch uint64) ([]*ethpb.Attestation, *ethpb.BeaconCommittees, error) {
attResp, err := s.beaconClient.ListAttestations(s.context, &ethpb.ListAttestationsRequest{
func (s *Service) attsAndCommitteesForEpoch(
ctx context.Context, epoch uint64,
) ([]*ethpb.Attestation, *ethpb.BeaconCommittees, error) {
ctx, span := trace.StartSpan(ctx, "Slasher.Service.attsAndCommitteesForEpoch")
defer span.End()
attResp, err := s.beaconClient.ListAttestations(ctx, &ethpb.ListAttestationsRequest{
QueryFilter: &ethpb.ListAttestationsRequest_TargetEpoch{TargetEpoch: epoch},
})
if err != nil {
log.WithError(err).Errorf("Could not list attestations for epoch: %d", epoch)
}
bCommittees, err := s.beaconClient.ListBeaconCommittees(s.context, &ethpb.ListCommitteesRequest{
bCommittees, err := s.beaconClient.ListBeaconCommittees(ctx, &ethpb.ListCommitteesRequest{
QueryFilter: &ethpb.ListCommitteesRequest_Epoch{
Epoch: epoch,
},
Expand All @@ -194,11 +233,13 @@ func (s *Service) getLatestDetectedEpoch() (uint64, error) {
return e, nil
}

func (s *Service) getChainHead() (*ethpb.ChainHead, error) {
func (s *Service) getChainHead(ctx context.Context) (*ethpb.ChainHead, error) {
ctx, span := trace.StartSpan(ctx, "Slasher.Service.getChainHead")
defer span.End()
if s.beaconClient == nil {
return nil, errors.New("cannot feed old attestations to slasher, beacon client has not been started")
}
ch, err := s.beaconClient.GetChainHead(s.context, &ptypes.Empty{})
ch, err := s.beaconClient.GetChainHead(ctx, &ptypes.Empty{})
if err != nil {
return nil, err
}
Expand Down

0 comments on commit 02b6d77

Please sign in to comment.