From eddaea869bfbc2ba15f09e1b07b20e58586b8f07 Mon Sep 17 00:00:00 2001 From: Raul Jordan Date: Sun, 8 Mar 2020 12:56:43 -0500 Subject: [PATCH] Prepare Slasher for Production (#5020) * rem slasher proto * Merge branch 'master' of github.com:prysmaticlabs/prysm * Merge branch 'master' of github.com:prysmaticlabs/prysm * Merge branch 'master' of github.com:prysmaticlabs/prysm * Merge branch 'master' of github.com:prysmaticlabs/prysm * Merge branch 'master' of github.com:prysmaticlabs/prysm * add a bit more better logging * Empty db fix * Improve logs * Fix small issues in spanner, improvements * Change costs back to 1 for now * Merge branch 'master' of https://github.com/prysmaticlabs/Prysm into cleanup-slasher * Change the cache back to 0 * Cleanup * Merge branch 'master' into cleanup-slasher * lint * added in better spans * log * rem spanner in super intensive operation * Merge branch 'master' into cleanup-slasher * add todo * Merge branch 'cleanup-slasher' of github.com:prysmaticlabs/prysm into cleanup-slasher * Merge branch 'master' into cleanup-slasher * Apply suggestions from code review * no logrus * Merge branch 'master' into cleanup-slasher * Merge branch 'cleanup-slasher' of https://github.com/prysmaticlabs/Prysm into cleanup-slasher * Remove spammy logs * Merge branch 'master' of https://github.com/prysmaticlabs/Prysm into cleanup-slasher * gaz * Rename func * Add back needed code * Add todo * Add span to cache func --- slasher/db/iface/interface.go | 2 +- slasher/db/kv/attester_slashings.go | 14 +-- slasher/db/kv/block_header.go | 12 +- slasher/db/kv/chain_data.go | 4 +- slasher/db/kv/indexed_attestations.go | 18 +-- slasher/db/kv/proposer_slashings.go | 14 +-- slasher/db/kv/spanner.go | 114 ++++++++++++------ slasher/db/kv/spanner_test.go | 4 +- slasher/detection/attestations/BUILD.bazel | 6 +- slasher/detection/attestations/spanner.go | 44 ++++--- .../detection/attestations/spanner_test.go | 89 ++++---------- slasher/detection/detect.go | 10 +- slasher/detection/service.go | 6 +- 13 files changed, 176 insertions(+), 161 deletions(-) diff --git a/slasher/db/iface/interface.go b/slasher/db/iface/interface.go index ece300a45ec..3e6d6ad34cd 100644 --- a/slasher/db/iface/interface.go +++ b/slasher/db/iface/interface.go @@ -62,7 +62,7 @@ type WriteAccessDatabase interface { // MinMaxSpan related methods. SaveEpochSpansMap(ctx context.Context, epoch uint64, spanMap map[uint64]detectionTypes.Span) error - SaveValidatorEpochSpans(ctx context.Context, validatorIdx uint64, epoch uint64, spans detectionTypes.Span) error + SaveValidatorEpochSpan(ctx context.Context, validatorIdx uint64, epoch uint64, spans detectionTypes.Span) error SaveCachedSpansMaps(ctx context.Context) error DeleteEpochSpans(ctx context.Context, validatorIdx uint64) error DeleteValidatorSpanByEpoch(ctx context.Context, validatorIdx uint64, epoch uint64) error diff --git a/slasher/db/kv/attester_slashings.go b/slasher/db/kv/attester_slashings.go index 5923f2fddd0..318b4b2e779 100644 --- a/slasher/db/kv/attester_slashings.go +++ b/slasher/db/kv/attester_slashings.go @@ -38,7 +38,7 @@ func unmarshalAttSlashings(encoded [][]byte) ([]*ethpb.AttesterSlashing, error) // AttesterSlashings accepts a status and returns all slashings with this status. // returns empty []*ethpb.AttesterSlashing if no slashing has been found with this status. func (db *Store) AttesterSlashings(ctx context.Context, status types.SlashingStatus) ([]*ethpb.AttesterSlashing, error) { - ctx, span := trace.StartSpan(ctx, "SlasherDB.AttesterSlashings") + ctx, span := trace.StartSpan(ctx, "slasherDB.AttesterSlashings") defer span.End() encoded := make([][]byte, 0) err := db.view(func(tx *bolt.Tx) error { @@ -59,7 +59,7 @@ func (db *Store) AttesterSlashings(ctx context.Context, status types.SlashingSta // DeleteAttesterSlashing deletes an attester slashing proof from db. func (db *Store) DeleteAttesterSlashing(ctx context.Context, attesterSlashing *ethpb.AttesterSlashing) error { - ctx, span := trace.StartSpan(ctx, "SlasherDB.DeleteAttesterSlashing") + ctx, span := trace.StartSpan(ctx, "slasherDB.DeleteAttesterSlashing") defer span.End() root, err := hashutil.HashProto(attesterSlashing) if err != nil { @@ -80,7 +80,7 @@ func (db *Store) DeleteAttesterSlashing(ctx context.Context, attesterSlashing *e // HasAttesterSlashing returns true and slashing status if a slashing is found in the db. func (db *Store) HasAttesterSlashing(ctx context.Context, slashing *ethpb.AttesterSlashing) (bool, types.SlashingStatus, error) { - ctx, span := trace.StartSpan(ctx, "SlasherDB.HasAttesterSlashing") + ctx, span := trace.StartSpan(ctx, "slasherDB.HasAttesterSlashing") defer span.End() var status types.SlashingStatus var found bool @@ -103,7 +103,7 @@ func (db *Store) HasAttesterSlashing(ctx context.Context, slashing *ethpb.Attest // SaveAttesterSlashing accepts a slashing proof and its status and writes it to disk. func (db *Store) SaveAttesterSlashing(ctx context.Context, status types.SlashingStatus, slashing *ethpb.AttesterSlashing) error { - ctx, span := trace.StartSpan(ctx, "SlasherDB.SaveAttesterSlashing") + ctx, span := trace.StartSpan(ctx, "slasherDB.SaveAttesterSlashing") defer span.End() enc, err := proto.Marshal(slashing) if err != nil { @@ -120,7 +120,7 @@ func (db *Store) SaveAttesterSlashing(ctx context.Context, status types.Slashing // SaveAttesterSlashings accepts a slice of slashing proof and its status and writes it to disk. func (db *Store) SaveAttesterSlashings(ctx context.Context, status types.SlashingStatus, slashings []*ethpb.AttesterSlashing) error { - ctx, span := trace.StartSpan(ctx, "SlasherDB.SaveAttesterSlashings") + ctx, span := trace.StartSpan(ctx, "slasherDB.SaveAttesterSlashings") defer span.End() enc := make([][]byte, len(slashings)) key := make([][]byte, len(slashings)) @@ -148,7 +148,7 @@ func (db *Store) SaveAttesterSlashings(ctx context.Context, status types.Slashin // GetLatestEpochDetected returns the latest detected epoch from db. func (db *Store) GetLatestEpochDetected(ctx context.Context) (uint64, error) { - ctx, span := trace.StartSpan(ctx, "SlasherDB.GetLatestEpochDetected") + ctx, span := trace.StartSpan(ctx, "slasherDB.GetLatestEpochDetected") defer span.End() var epoch uint64 err := db.view(func(tx *bolt.Tx) error { @@ -166,7 +166,7 @@ func (db *Store) GetLatestEpochDetected(ctx context.Context) (uint64, error) { // SetLatestEpochDetected sets the latest slashing detected epoch in db. func (db *Store) SetLatestEpochDetected(ctx context.Context, epoch uint64) error { - ctx, span := trace.StartSpan(ctx, "SlasherDB.SetLatestEpochDetected") + ctx, span := trace.StartSpan(ctx, "slasherDB.SetLatestEpochDetected") defer span.End() return db.update(func(tx *bolt.Tx) error { b := tx.Bucket(slashingBucket) diff --git a/slasher/db/kv/block_header.go b/slasher/db/kv/block_header.go index 5accebf7e42..bbb9a4f645c 100644 --- a/slasher/db/kv/block_header.go +++ b/slasher/db/kv/block_header.go @@ -14,7 +14,7 @@ import ( ) func unmarshalBlockHeader(ctx context.Context, enc []byte) (*ethpb.SignedBeaconBlockHeader, error) { - ctx, span := trace.StartSpan(ctx, "SlasherDB.unmarshalBlockHeader") + ctx, span := trace.StartSpan(ctx, "slasherDB.unmarshalBlockHeader") defer span.End() protoBlockHeader := ðpb.SignedBeaconBlockHeader{} err := proto.Unmarshal(enc, protoBlockHeader) @@ -27,7 +27,7 @@ func unmarshalBlockHeader(ctx context.Context, enc []byte) (*ethpb.SignedBeaconB // BlockHeaders accepts an epoch and validator id and returns the corresponding block header array. // Returns nil if the block header for those values does not exist. func (db *Store) BlockHeaders(ctx context.Context, epoch uint64, validatorID uint64) ([]*ethpb.SignedBeaconBlockHeader, error) { - ctx, span := trace.StartSpan(ctx, "SlasherDB.BlockHeaders") + ctx, span := trace.StartSpan(ctx, "slasherDB.BlockHeaders") defer span.End() var blockHeaders []*ethpb.SignedBeaconBlockHeader err := db.view(func(tx *bolt.Tx) error { @@ -47,7 +47,7 @@ func (db *Store) BlockHeaders(ctx context.Context, epoch uint64, validatorID uin // HasBlockHeader accepts an epoch and validator id and returns true if the block header exists. func (db *Store) HasBlockHeader(ctx context.Context, epoch uint64, validatorID uint64) bool { - ctx, span := trace.StartSpan(ctx, "SlasherDB.HasBlockHeader") + ctx, span := trace.StartSpan(ctx, "slasherDB.HasBlockHeader") defer span.End() prefix := encodeEpochValidatorID(epoch, validatorID) var hasBlockHeader bool @@ -67,7 +67,7 @@ func (db *Store) HasBlockHeader(ctx context.Context, epoch uint64, validatorID u // SaveBlockHeader accepts a block header and writes it to disk. func (db *Store) SaveBlockHeader(ctx context.Context, epoch uint64, validatorID uint64, blockHeader *ethpb.SignedBeaconBlockHeader) error { - ctx, span := trace.StartSpan(ctx, "SlasherDB.SaveBlockHeader") + ctx, span := trace.StartSpan(ctx, "slasherDB.SaveBlockHeader") defer span.End() key := encodeEpochValidatorIDSig(epoch, validatorID, blockHeader.Signature) enc, err := proto.Marshal(blockHeader) @@ -96,7 +96,7 @@ func (db *Store) SaveBlockHeader(ctx context.Context, epoch uint64, validatorID // DeleteBlockHeader deletes a block header using the epoch and validator id. func (db *Store) DeleteBlockHeader(ctx context.Context, epoch uint64, validatorID uint64, blockHeader *ethpb.SignedBeaconBlockHeader) error { - ctx, span := trace.StartSpan(ctx, "SlasherDB.DeleteBlockHeader") + ctx, span := trace.StartSpan(ctx, "slasherDB.DeleteBlockHeader") defer span.End() key := encodeEpochValidatorIDSig(epoch, validatorID, blockHeader.Signature) return db.update(func(tx *bolt.Tx) error { @@ -110,7 +110,7 @@ func (db *Store) DeleteBlockHeader(ctx context.Context, epoch uint64, validatorI // PruneBlockHistory leaves only records younger then history size. func (db *Store) PruneBlockHistory(ctx context.Context, currentEpoch uint64, pruningEpochAge uint64) error { - ctx, span := trace.StartSpan(ctx, "SlasherDB.pruneBlockHistory") + ctx, span := trace.StartSpan(ctx, "slasherDB.pruneBlockHistory") defer span.End() pruneTill := int64(currentEpoch) - int64(pruningEpochAge) if pruneTill <= 0 { diff --git a/slasher/db/kv/chain_data.go b/slasher/db/kv/chain_data.go index 306c09b1aad..47c03b40f35 100644 --- a/slasher/db/kv/chain_data.go +++ b/slasher/db/kv/chain_data.go @@ -12,7 +12,7 @@ import ( // ChainHead retrieves the persisted chain head from the database accordingly. func (db *Store) ChainHead(ctx context.Context) (*ethpb.ChainHead, error) { - ctx, span := trace.StartSpan(ctx, "SlasherDB.ChainHead") + ctx, span := trace.StartSpan(ctx, "slasherDB.ChainHead") defer span.End() var res *ethpb.ChainHead if err := db.update(func(tx *bolt.Tx) error { @@ -31,7 +31,7 @@ func (db *Store) ChainHead(ctx context.Context) (*ethpb.ChainHead, error) { // SaveChainHead accepts a beacon chain head object and persists it to the DB. func (db *Store) SaveChainHead(ctx context.Context, head *ethpb.ChainHead) error { - ctx, span := trace.StartSpan(ctx, "SlasherDB.SaveChainHead") + ctx, span := trace.StartSpan(ctx, "slasherDB.SaveChainHead") defer span.End() enc, err := proto.Marshal(head) if err != nil { diff --git a/slasher/db/kv/indexed_attestations.go b/slasher/db/kv/indexed_attestations.go index 3cc5034e2a7..88d966acc50 100644 --- a/slasher/db/kv/indexed_attestations.go +++ b/slasher/db/kv/indexed_attestations.go @@ -13,7 +13,7 @@ import ( ) func unmarshalIndexedAttestation(ctx context.Context, enc []byte) (*ethpb.IndexedAttestation, error) { - ctx, span := trace.StartSpan(ctx, "SlasherDB.unmarshalIndexedAttestation") + ctx, span := trace.StartSpan(ctx, "slasherDB.unmarshalIndexedAttestation") defer span.End() protoIdxAtt := ðpb.IndexedAttestation{} err := proto.Unmarshal(enc, protoIdxAtt) @@ -27,7 +27,7 @@ func unmarshalIndexedAttestation(ctx context.Context, enc []byte) (*ethpb.Indexe // indexed attestations. // Returns nil if the indexed attestation does not exist with that target epoch. func (db *Store) IndexedAttestationsForTarget(ctx context.Context, targetEpoch uint64) ([]*ethpb.IndexedAttestation, error) { - ctx, span := trace.StartSpan(ctx, "SlasherDB.IndexedAttestationsForTarget") + ctx, span := trace.StartSpan(ctx, "slasherDB.IndexedAttestationsForTarget") defer span.End() var idxAtts []*ethpb.IndexedAttestation key := bytesutil.Bytes8(targetEpoch) @@ -48,7 +48,7 @@ func (db *Store) IndexedAttestationsForTarget(ctx context.Context, targetEpoch u // IndexedAttestationsWithPrefix accepts a target epoch and signature bytes to find all attestations with the requested prefix. // Returns nil if the indexed attestation does not exist with that target epoch. func (db *Store) IndexedAttestationsWithPrefix(ctx context.Context, targetEpoch uint64, sigBytes []byte) ([]*ethpb.IndexedAttestation, error) { - ctx, span := trace.StartSpan(ctx, "SlasherDB.IndexedAttestationsWithPrefix") + ctx, span := trace.StartSpan(ctx, "slasherDB.IndexedAttestationsWithPrefix") defer span.End() var idxAtts []*ethpb.IndexedAttestation key := encodeEpochSig(targetEpoch, sigBytes[:]) @@ -68,7 +68,7 @@ func (db *Store) IndexedAttestationsWithPrefix(ctx context.Context, targetEpoch // HasIndexedAttestation accepts an attestation and returns true if it exists in the DB. func (db *Store) HasIndexedAttestation(ctx context.Context, att *ethpb.IndexedAttestation) (bool, error) { - ctx, span := trace.StartSpan(ctx, "SlasherDB.HasIndexedAttestation") + ctx, span := trace.StartSpan(ctx, "slasherDB.HasIndexedAttestation") defer span.End() key := encodeEpochSig(att.Data.Target.Epoch, att.Signature) var hasAttestation bool @@ -88,7 +88,7 @@ func (db *Store) HasIndexedAttestation(ctx context.Context, att *ethpb.IndexedAt // SaveIndexedAttestation accepts an indexed attestation and writes it to the DB. func (db *Store) SaveIndexedAttestation(ctx context.Context, idxAttestation *ethpb.IndexedAttestation) error { - ctx, span := trace.StartSpan(ctx, "SlasherDB.SaveIndexedAttestation") + ctx, span := trace.StartSpan(ctx, "slasherDB.SaveIndexedAttestation") defer span.End() key := encodeEpochSig(idxAttestation.Data.Target.Epoch, idxAttestation.Signature) enc, err := proto.Marshal(idxAttestation) @@ -113,7 +113,7 @@ func (db *Store) SaveIndexedAttestation(ctx context.Context, idxAttestation *eth // SaveIndexedAttestations accepts multiple indexed attestations and writes them to the DB. func (db *Store) SaveIndexedAttestations(ctx context.Context, idxAttestations []*ethpb.IndexedAttestation) error { - ctx, span := trace.StartSpan(ctx, "SlasherDB.SaveIndexedAttestations") + ctx, span := trace.StartSpan(ctx, "slasherDB.SaveIndexedAttestations") defer span.End() keys := make([][]byte, len(idxAttestations)) marshaledAtts := make([][]byte, len(idxAttestations)) @@ -145,7 +145,7 @@ func (db *Store) SaveIndexedAttestations(ctx context.Context, idxAttestations [] // DeleteIndexedAttestation deletes a indexed attestation using the slot and its root as keys in their respective buckets. func (db *Store) DeleteIndexedAttestation(ctx context.Context, idxAttestation *ethpb.IndexedAttestation) error { - ctx, span := trace.StartSpan(ctx, "SlasherDB.DeleteIndexedAttestation") + ctx, span := trace.StartSpan(ctx, "slasherDB.DeleteIndexedAttestation") defer span.End() key := encodeEpochSig(idxAttestation.Data.Target.Epoch, idxAttestation.Signature) return db.update(func(tx *bolt.Tx) error { @@ -163,7 +163,7 @@ func (db *Store) DeleteIndexedAttestation(ctx context.Context, idxAttestation *e // PruneAttHistory removes all attestations from the DB older than the pruning epoch age. func (db *Store) PruneAttHistory(ctx context.Context, currentEpoch uint64, pruningEpochAge uint64) error { - ctx, span := trace.StartSpan(ctx, "SlasherDB.pruneAttHistory") + ctx, span := trace.StartSpan(ctx, "slasherDB.pruneAttHistory") defer span.End() pruneFromEpoch := int64(currentEpoch) - int64(pruningEpochAge) if pruneFromEpoch <= 0 { @@ -186,7 +186,7 @@ func (db *Store) PruneAttHistory(ctx context.Context, currentEpoch uint64, pruni // LatestIndexedAttestationsTargetEpoch returns latest target epoch in db // returns 0 if there is no indexed attestations in db. func (db *Store) LatestIndexedAttestationsTargetEpoch(ctx context.Context) (uint64, error) { - ctx, span := trace.StartSpan(ctx, "SlasherDB.LatestIndexedAttestationsTargetEpoch") + ctx, span := trace.StartSpan(ctx, "slasherDB.LatestIndexedAttestationsTargetEpoch") defer span.End() var lt uint64 err := db.view(func(tx *bolt.Tx) error { diff --git a/slasher/db/kv/proposer_slashings.go b/slasher/db/kv/proposer_slashings.go index 929667c0144..efc86af63da 100644 --- a/slasher/db/kv/proposer_slashings.go +++ b/slasher/db/kv/proposer_slashings.go @@ -14,7 +14,7 @@ import ( ) func unmarshalProposerSlashing(ctx context.Context, enc []byte) (*ethpb.ProposerSlashing, error) { - ctx, span := trace.StartSpan(ctx, "SlasherDB.unmarshalProposerSlashing") + ctx, span := trace.StartSpan(ctx, "slasherDB.unmarshalProposerSlashing") defer span.End() protoSlashing := ðpb.ProposerSlashing{} if err := proto.Unmarshal(enc, protoSlashing); err != nil { @@ -24,7 +24,7 @@ func unmarshalProposerSlashing(ctx context.Context, enc []byte) (*ethpb.Proposer } func unmarshalProposerSlashingArray(ctx context.Context, encoded [][]byte) ([]*ethpb.ProposerSlashing, error) { - ctx, span := trace.StartSpan(ctx, "SlasherDB.unmarshalProposerSlashingArray") + ctx, span := trace.StartSpan(ctx, "slasherDB.unmarshalProposerSlashingArray") defer span.End() proposerSlashings := make([]*ethpb.ProposerSlashing, len(encoded)) for i, enc := range encoded { @@ -39,7 +39,7 @@ func unmarshalProposerSlashingArray(ctx context.Context, encoded [][]byte) ([]*e // ProposalSlashingsByStatus returns all the proposal slashing proofs with a certain status. func (db *Store) ProposalSlashingsByStatus(ctx context.Context, status types.SlashingStatus) ([]*ethpb.ProposerSlashing, error) { - ctx, span := trace.StartSpan(ctx, "SlasherDB.ProposalSlashingsByStatus") + ctx, span := trace.StartSpan(ctx, "slasherDB.ProposalSlashingsByStatus") defer span.End() encoded := make([][]byte, 0) err := db.view(func(tx *bolt.Tx) error { @@ -60,7 +60,7 @@ func (db *Store) ProposalSlashingsByStatus(ctx context.Context, status types.Sla // DeleteProposerSlashing deletes a proposer slashing proof. func (db *Store) DeleteProposerSlashing(ctx context.Context, slashing *ethpb.ProposerSlashing) error { - ctx, span := trace.StartSpan(ctx, "SlasherDB.DeleteProposerSlashing") + ctx, span := trace.StartSpan(ctx, "slasherDB.DeleteProposerSlashing") defer span.End() root, err := hashutil.HashProto(slashing) if err != nil { @@ -79,7 +79,7 @@ func (db *Store) DeleteProposerSlashing(ctx context.Context, slashing *ethpb.Pro // HasProposerSlashing returns the slashing key if it is found in db. func (db *Store) HasProposerSlashing(ctx context.Context, slashing *ethpb.ProposerSlashing) (bool, types.SlashingStatus, error) { - ctx, span := trace.StartSpan(ctx, "SlasherDB.HasProposerSlashing") + ctx, span := trace.StartSpan(ctx, "slasherDB.HasProposerSlashing") defer span.End() var status types.SlashingStatus var found bool @@ -103,7 +103,7 @@ func (db *Store) HasProposerSlashing(ctx context.Context, slashing *ethpb.Propos // SaveProposerSlashing accepts a proposer slashing and its status header and writes it to disk. func (db *Store) SaveProposerSlashing(ctx context.Context, status types.SlashingStatus, slashing *ethpb.ProposerSlashing) error { - ctx, span := trace.StartSpan(ctx, "SlasherDB.SaveProposerSlashing") + ctx, span := trace.StartSpan(ctx, "slasherDB.SaveProposerSlashing") defer span.End() enc, err := proto.Marshal(slashing) if err != nil { @@ -120,7 +120,7 @@ func (db *Store) SaveProposerSlashing(ctx context.Context, status types.Slashing // SaveProposerSlashings accepts a slice of slashing proof and its status and writes it to disk. func (db *Store) SaveProposerSlashings(ctx context.Context, status types.SlashingStatus, slashings []*ethpb.ProposerSlashing) error { - ctx, span := trace.StartSpan(ctx, "SlasherDB.SaveProposerSlashings") + ctx, span := trace.StartSpan(ctx, "slasherDB.SaveProposerSlashings") defer span.End() encSlashings := make([][]byte, len(slashings)) keys := make([][]byte, len(slashings)) diff --git a/slasher/db/kv/spanner.go b/slasher/db/kv/spanner.go index 79cd328c9c6..5024db74b44 100644 --- a/slasher/db/kv/spanner.go +++ b/slasher/db/kv/spanner.go @@ -8,15 +8,17 @@ import ( "github.com/boltdb/bolt" "github.com/pkg/errors" "github.com/prysmaticlabs/prysm/shared/bytesutil" + "github.com/prysmaticlabs/prysm/shared/params" "github.com/prysmaticlabs/prysm/slasher/detection/attestations/types" log "github.com/sirupsen/logrus" "go.opencensus.io/trace" ) -// Tracks the highest observed epoch from the validator span maps +// Tracks the highest and lowest observed epochs from the validator span maps // used for attester slashing detection. This value is purely used // as a cache key and only needs to be maintained in memory. var highestObservedEpoch uint64 +var lowestObservedEpoch = params.BeaconConfig().FarFutureEpoch func cacheTypeMismatchError(value interface{}) error { return fmt.Errorf("cache contains a value of type: %v "+ @@ -32,7 +34,7 @@ func persistSpanMapsOnEviction(db *Store) func(uint64, uint64, interface{}, int6 // required by the ristretto cache OnEvict method. // See https://godoc.org/github.com/dgraph-io/ristretto#Config. return func(epoch uint64, _ uint64, value interface{}, cost int64) { - log.Tracef("evicting span map for epoch: %d", epoch) + log.Tracef("Evicting span map for epoch: %d", epoch) err := db.update(func(tx *bolt.Tx) error { bucket := tx.Bucket(validatorsMinMaxSpanBucket) epochBucket, err := bucket.CreateBucketIfNotExists(bytesutil.Bytes8(epoch)) @@ -44,22 +46,21 @@ func persistSpanMapsOnEviction(db *Store) func(uint64, uint64, interface{}, int6 return cacheTypeMismatchError(value) } for k, v := range spanMap { - err = epochBucket.Put(bytesutil.Bytes8(k), marshalSpan(v)) - if err != nil { + if err = epochBucket.Put(bytesutil.Bytes8(k), marshalSpan(v)); err != nil { return err } } return nil }) if err != nil { - log.Errorf("failed to save span map to db on cache eviction: %v", err) + log.Errorf("Failed to save span map to db on cache eviction: %v", err) } } } // Unmarshal a span map from an encoded, flattened array. func unmarshalSpan(ctx context.Context, enc []byte) (types.Span, error) { - ctx, span := trace.StartSpan(ctx, "SlasherDB.unmarshalSpan") + ctx, span := trace.StartSpan(ctx, "slasherDB.unmarshalSpan") defer span.End() r := types.Span{} if len(enc) != spannerEncodedLength { @@ -89,7 +90,7 @@ func marshalSpan(span types.Span) []byte { // enabled and the epoch key exists. Returns nil if the span map // for this validator index does not exist. func (db *Store) EpochSpansMap(ctx context.Context, epoch uint64) (map[uint64]types.Span, error) { - ctx, span := trace.StartSpan(ctx, "SlasherDB.EpochSpansMap") + ctx, span := trace.StartSpan(ctx, "slasherDB.EpochSpansMap") defer span.End() if db.spanCacheEnabled { v, ok := db.spanCache.Get(epoch) @@ -134,25 +135,23 @@ func (db *Store) EpochSpansMap(ctx context.Context, epoch uint64) (map[uint64]ty // when caching is enabled. // Returns error if the spans for this validator index and epoch does not exist. func (db *Store) EpochSpanByValidatorIndex(ctx context.Context, validatorIdx uint64, epoch uint64) (types.Span, error) { - ctx, span := trace.StartSpan(ctx, "SlasherDB.EpochSpanByValidatorIndex") + ctx, span := trace.StartSpan(ctx, "slasherDB.EpochSpanByValidatorIndex") defer span.End() - var err error if db.spanCacheEnabled { - v, ok := db.spanCache.Get(epoch) - spanMap := make(map[uint64]types.Span) + setObservedEpochs(epoch) + spanMap, err := db.findOrLoadEpochInCache(ctx, epoch) + if err != nil { + return types.Span{}, err + } + spans, ok := spanMap[validatorIdx] if ok { - spanMap, ok = v.(map[uint64]types.Span) - if !ok { - return types.Span{}, cacheTypeMismatchError(v) - } - spans, ok := spanMap[validatorIdx] - if ok { - return spans, nil - } + return spans, nil } + return types.Span{}, nil } + var spans types.Span - err = db.view(func(tx *bolt.Tx) error { + err := db.view(func(tx *bolt.Tx) error { b := tx.Bucket(validatorsMinMaxSpanBucket) epochBucket := b.Bucket(bytesutil.Bytes8(epoch)) if epochBucket == nil { @@ -173,23 +172,20 @@ func (db *Store) EpochSpanByValidatorIndex(ctx context.Context, validatorIdx uin return spans, err } -// SaveValidatorEpochSpans accepts validator index epoch and spans returns. +// SaveValidatorEpochSpan accepts validator index epoch and spans returns. // it reads the epoch spans from cache, updates it and save it back to cache // if caching is enabled. // Returns error if the spans for this validator index and epoch does not exist. -func (db *Store) SaveValidatorEpochSpans( +func (db *Store) SaveValidatorEpochSpan( ctx context.Context, validatorIdx uint64, epoch uint64, - spans types.Span, + span types.Span, ) error { - ctx, span := trace.StartSpan(ctx, "SlasherDB.SaveValidatorEpochSpans") - defer span.End() - defer span.End() + ctx, traceSpan := trace.StartSpan(ctx, "slasherDB.SaveValidatorEpochSpan") + defer traceSpan.End() if db.spanCacheEnabled { - if epoch > highestObservedEpoch { - highestObservedEpoch = epoch - } + setObservedEpochs(epoch) v, ok := db.spanCache.Get(epoch) spanMap := make(map[uint64]types.Span) if ok { @@ -198,13 +194,14 @@ func (db *Store) SaveValidatorEpochSpans( return cacheTypeMismatchError(v) } } - spanMap[validatorIdx] = spans + spanMap[validatorIdx] = span saved := db.spanCache.Set(epoch, spanMap, 1) if !saved { return fmt.Errorf("failed to save span map to cache") } return nil } + return db.update(func(tx *bolt.Tx) error { b := tx.Bucket(validatorsMinMaxSpanBucket) epochBucket, err := b.CreateBucketIfNotExists(bytesutil.Bytes8(epoch)) @@ -212,7 +209,7 @@ func (db *Store) SaveValidatorEpochSpans( return err } key := bytesutil.Bytes8(validatorIdx) - value := marshalSpan(spans) + value := marshalSpan(span) return epochBucket.Put(key, value) }) } @@ -221,12 +218,10 @@ func (db *Store) SaveValidatorEpochSpans( // saves the spans to cache if caching is enabled. The key in the cache is the highest // epoch seen by slasher and the value is the span map itself. func (db *Store) SaveEpochSpansMap(ctx context.Context, epoch uint64, spanMap map[uint64]types.Span) error { - ctx, span := trace.StartSpan(ctx, "SlasherDB.SaveEpochSpansMap") + ctx, span := trace.StartSpan(ctx, "slasherDB.SaveEpochSpansMap") defer span.End() if db.spanCacheEnabled { - if epoch > highestObservedEpoch { - highestObservedEpoch = epoch - } + setObservedEpochs(epoch) saved := db.spanCache.Set(epoch, spanMap, 1) if !saved { return fmt.Errorf("failed to save span map to cache") @@ -256,12 +251,12 @@ func (db *Store) enableSpanCache(enable bool) { // SaveCachedSpansMaps saves all span maps that are currently // in memory into the DB. if no span maps are in db or cache is disabled it returns nil. func (db *Store) SaveCachedSpansMaps(ctx context.Context) error { - ctx, span := trace.StartSpan(ctx, "SlasherDB.SaveCachedSpansMaps") + ctx, span := trace.StartSpan(ctx, "slasherDB.SaveCachedSpansMaps") defer span.End() if db.spanCacheEnabled { db.enableSpanCache(false) defer db.enableSpanCache(true) - for epoch := uint64(0); epoch <= highestObservedEpoch; epoch++ { + for epoch := lowestObservedEpoch; epoch <= highestObservedEpoch; epoch++ { v, ok := db.spanCache.Get(epoch) if ok { spanMap, ok := v.(map[uint64]types.Span) @@ -271,16 +266,19 @@ func (db *Store) SaveCachedSpansMaps(ctx context.Context) error { if err := db.SaveEpochSpansMap(ctx, epoch, spanMap); err != nil { return errors.Wrap(err, "failed to save span maps from cache") } - } } + // Reset the observed epochs after saving to the DB. + lowestObservedEpoch = params.BeaconConfig().FarFutureEpoch + highestObservedEpoch = 0 + log.Debugf("Epochs %d to %d have been saved", lowestObservedEpoch, highestObservedEpoch) } return nil } // DeleteEpochSpans deletes a epochs validators span map using a epoch index as bucket key. func (db *Store) DeleteEpochSpans(ctx context.Context, epoch uint64) error { - ctx, span := trace.StartSpan(ctx, "SlasherDB.DeleteEpochSpans") + ctx, span := trace.StartSpan(ctx, "slasherDB.DeleteEpochSpans") defer span.End() if db.spanCacheEnabled { _, ok := db.spanCache.Get(epoch) @@ -300,7 +298,7 @@ func (db *Store) DeleteEpochSpans(ctx context.Context, epoch uint64) error { // deletes spans from cache if caching is enabled. // using a validator index as bucket key. func (db *Store) DeleteValidatorSpanByEpoch(ctx context.Context, validatorIdx uint64, epoch uint64) error { - ctx, span := trace.StartSpan(ctx, "SlasherDB.DeleteValidatorSpanByEpoch") + ctx, span := trace.StartSpan(ctx, "slasherDB.DeleteValidatorSpanByEpoch") defer span.End() if db.spanCacheEnabled { v, ok := db.spanCache.Get(epoch) @@ -318,6 +316,7 @@ func (db *Store) DeleteValidatorSpanByEpoch(ctx context.Context, validatorIdx ui } return nil } + return db.update(func(tx *bolt.Tx) error { bucket := tx.Bucket(validatorsMinMaxSpanBucket) e := bytesutil.Bytes8(epoch) @@ -326,3 +325,38 @@ func (db *Store) DeleteValidatorSpanByEpoch(ctx context.Context, validatorIdx ui return epochBucket.Delete(v) }) } + +// findOrLoadEpochInCache checks if the requested epoch is in the cache, and if not, we load it from the DB. +func (db *Store) findOrLoadEpochInCache(ctx context.Context, epoch uint64) (map[uint64]types.Span, error) { + ctx, span := trace.StartSpan(ctx, "slasherDB.findOrLoadEpochInCache") + defer span.End() + v, epochFound := db.spanCache.Get(epoch) + if epochFound { + spanMap, ok := v.(map[uint64]types.Span) + if !ok { + return make(map[uint64]types.Span), cacheTypeMismatchError(v) + } + return spanMap, nil + } + db.enableSpanCache(false) + defer db.enableSpanCache(true) + // If the epoch we want isn't in the cache, load it in. + spanForEpoch, err := db.EpochSpansMap(ctx, epoch) + if err != nil { + return make(map[uint64]types.Span), errors.Wrap(err, "failed to get span map for epoch") + } + saved := db.spanCache.Set(epoch, spanForEpoch, 1) + if !saved { + return make(map[uint64]types.Span), fmt.Errorf("failed to save span map to cache") + } + return spanForEpoch, nil +} + +func setObservedEpochs(epoch uint64) { + if epoch > highestObservedEpoch { + highestObservedEpoch = epoch + } + if epoch < lowestObservedEpoch { + lowestObservedEpoch = epoch + } +} diff --git a/slasher/db/kv/spanner_test.go b/slasher/db/kv/spanner_test.go index 7569884e9d7..7bd68cfbac7 100644 --- a/slasher/db/kv/spanner_test.go +++ b/slasher/db/kv/spanner_test.go @@ -117,7 +117,6 @@ func TestStore_WrongTypeInCache(t *testing.T) { if err == nil || !strings.Contains(err.Error(), "cache contains a value of type") { t.Fatalf("expected error type in cache : %v", err) } - } } @@ -279,8 +278,7 @@ func TestValidatorSpanMap_SaveCachedSpansMaps(t *testing.T) { } // wait for value to pass through cache buffers time.Sleep(time.Millisecond * 10) - err := db.SaveCachedSpansMaps(ctx) - if err != nil { + if err := db.SaveCachedSpansMaps(ctx); err != nil { t.Errorf("Failed to save cached span maps to db: %v", err) } db.spanCache.Clear() diff --git a/slasher/detection/attestations/BUILD.bazel b/slasher/detection/attestations/BUILD.bazel index 3ef4bd64220..99ffa861e5b 100644 --- a/slasher/detection/attestations/BUILD.bazel +++ b/slasher/detection/attestations/BUILD.bazel @@ -23,13 +23,9 @@ go_test( srcs = ["spanner_test.go"], embed = [":go_default_library"], deps = [ - "//shared/cmd:go_default_library", "//shared/sliceutil:go_default_library", - "//slasher/db:go_default_library", - "//slasher/db/kv:go_default_library", + "//slasher/db/testing:go_default_library", "//slasher/detection/attestations/types:go_default_library", - "//slasher/flags:go_default_library", "@com_github_prysmaticlabs_ethereumapis//eth/v1alpha1:go_default_library", - "@com_github_urfave_cli//:go_default_library", ], ) diff --git a/slasher/detection/attestations/spanner.go b/slasher/detection/attestations/spanner.go index a756c0c0e9d..ba72e28095c 100644 --- a/slasher/detection/attestations/spanner.go +++ b/slasher/detection/attestations/spanner.go @@ -12,6 +12,11 @@ import ( "go.opencensus.io/trace" ) +// We look back 128 epochs when updating min/max spans +// for incoming attestations. +// TODO(#5040): Remove lookback and handle min spans properly. +const epochLookback = 128 + var _ = iface.SpanDetector(&SpanDetector{}) // SpanDetector defines a struct which can detect slashable @@ -38,7 +43,7 @@ func (s *SpanDetector) DetectSlashingForValidator( validatorIdx uint64, attData *ethpb.AttestationData, ) (*types.DetectionResult, error) { - ctx, traceSpan := trace.StartSpan(ctx, "detection.DetectSlashingForValidator") + ctx, traceSpan := trace.StartSpan(ctx, "spanner.DetectSlashingForValidator") defer traceSpan.End() sourceEpoch := attData.Source.Epoch targetEpoch := attData.Target.Epoch @@ -64,7 +69,7 @@ func (s *SpanDetector) DetectSlashingForValidator( } return &types.DetectionResult{ Kind: types.SurroundVote, - SlashableEpoch: sourceEpoch + uint64(minSpan), + SlashableEpoch: slashableEpoch, SigBytes: span.SigBytes, }, nil } @@ -82,6 +87,7 @@ func (s *SpanDetector) DetectSlashingForValidator( SigBytes: span.SigBytes, }, nil } + sp, err = s.slasherDB.EpochSpanByValidatorIndex(ctx, validatorIdx, targetEpoch) if err != nil { return nil, err @@ -101,7 +107,7 @@ func (s *SpanDetector) DetectSlashingForValidator( // UpdateSpans given an indexed attestation for all of its attesting indices. func (s *SpanDetector) UpdateSpans(ctx context.Context, att *ethpb.IndexedAttestation) error { - ctx, span := trace.StartSpan(ctx, "detection.UpdateSpans") + ctx, span := trace.StartSpan(ctx, "spanner.UpdateSpans") defer span.End() source := att.Data.Source.Epoch target := att.Data.Target.Epoch @@ -128,15 +134,17 @@ func (s *SpanDetector) UpdateSpans(ctx context.Context, att *ethpb.IndexedAttest // saveSigBytes saves the first 2 bytes of the signature for the att we're updating the spans to. // Later used to help us find the violating attestation in the DB. func (s *SpanDetector) saveSigBytes(ctx context.Context, att *ethpb.IndexedAttestation, valIdx uint64) error { + ctx, traceSpan := trace.StartSpan(ctx, "spanner.saveSigBytes") + defer traceSpan.End() target := att.Data.Target.Epoch - sp, err := s.slasherDB.EpochSpanByValidatorIndex(ctx, valIdx, target) + span, err := s.slasherDB.EpochSpanByValidatorIndex(ctx, valIdx, target) if err != nil { return err } // If the validator has already attested for this target epoch, // then we do not need to update the values of the span sig bytes. - if sp.HasAttested { + if span.HasAttested { return nil } @@ -145,19 +153,24 @@ func (s *SpanDetector) saveSigBytes(ctx context.Context, att *ethpb.IndexedAttes sigBytes = [2]byte{att.Signature[0], att.Signature[1]} } // Save the signature bytes into the span for this epoch. - sp.HasAttested = true - sp.SigBytes = sigBytes - return s.slasherDB.SaveValidatorEpochSpans(ctx, valIdx, target, sp) + span.HasAttested = true + span.SigBytes = sigBytes + return s.slasherDB.SaveValidatorEpochSpan(ctx, valIdx, target, span) } // Updates a min span for a validator index given a source and target epoch // for an attestation produced by the validator. Used for catching surrounding votes. func (s *SpanDetector) updateMinSpan(ctx context.Context, source uint64, target uint64, valIdx uint64) error { + ctx, traceSpan := trace.StartSpan(ctx, "spanner.updateMinSpan") + defer traceSpan.End() if source < 1 { return nil } - for epochInt := int64(source - 1); epochInt >= 0; epochInt-- { - epoch := uint64(epochInt) + lowestEpoch := source - epochLookback + if int(lowestEpoch) <= 0 { + lowestEpoch = 0 + } + for epoch := source - 1; epoch >= lowestEpoch; epoch-- { span, err := s.slasherDB.EpochSpanByValidatorIndex(ctx, valIdx, epoch) if err != nil { return err @@ -170,13 +183,15 @@ func (s *SpanDetector) updateMinSpan(ctx context.Context, source uint64, target SigBytes: span.SigBytes, HasAttested: span.HasAttested, } - err := s.slasherDB.SaveValidatorEpochSpans(ctx, valIdx, epoch, span) - if err != nil { + if err := s.slasherDB.SaveValidatorEpochSpan(ctx, valIdx, epoch, span); err != nil { return err } } else { break } + if epoch == 0 { + break + } } return nil } @@ -184,6 +199,8 @@ func (s *SpanDetector) updateMinSpan(ctx context.Context, source uint64, target // Updates a max span for a validator index given a source and target epoch // for an attestation produced by the validator. Used for catching surrounded votes. func (s *SpanDetector) updateMaxSpan(ctx context.Context, source uint64, target uint64, valIdx uint64) error { + ctx, traceSpan := trace.StartSpan(ctx, "spanner.updateMaxSpan") + defer traceSpan.End() for epoch := source + 1; epoch < target; epoch++ { span, err := s.slasherDB.EpochSpanByValidatorIndex(ctx, valIdx, epoch) if err != nil { @@ -197,8 +214,7 @@ func (s *SpanDetector) updateMaxSpan(ctx context.Context, source uint64, target SigBytes: span.SigBytes, HasAttested: span.HasAttested, } - err := s.slasherDB.SaveValidatorEpochSpans(ctx, valIdx, epoch, span) - if err != nil { + if err := s.slasherDB.SaveValidatorEpochSpan(ctx, valIdx, epoch, span); err != nil { return err } } else { diff --git a/slasher/detection/attestations/spanner_test.go b/slasher/detection/attestations/spanner_test.go index 58e9ec2d86f..c8fd8c0cefa 100644 --- a/slasher/detection/attestations/spanner_test.go +++ b/slasher/detection/attestations/spanner_test.go @@ -2,23 +2,15 @@ package attestations import ( "context" - "flag" - "path" "reflect" "testing" ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" - "github.com/prysmaticlabs/prysm/shared/cmd" "github.com/prysmaticlabs/prysm/shared/sliceutil" - "github.com/prysmaticlabs/prysm/slasher/db" - "github.com/prysmaticlabs/prysm/slasher/db/kv" + testDB "github.com/prysmaticlabs/prysm/slasher/db/testing" "github.com/prysmaticlabs/prysm/slasher/detection/attestations/types" - "github.com/prysmaticlabs/prysm/slasher/flags" - "github.com/urfave/cli" ) -const slasherDBName = "slasherdata" - func TestSpanDetector_DetectSlashingForValidator_Double(t *testing.T) { type testStruct struct { name string @@ -223,24 +215,14 @@ func TestSpanDetector_DetectSlashingForValidator_Double(t *testing.T) { slashCount: 2, }, } - app := cli.NewApp() - set := flag.NewFlagSet("test", 0) - cliCtx := cli.NewContext(app, set, nil) - baseDir := cliCtx.GlobalString(cmd.DataDirFlag.Name) - dbPath := path.Join(baseDir, slasherDBName) - cfg := &kv.Config{SpanCacheEnabled: cliCtx.GlobalBool(flags.UseSpanCacheFlag.Name)} - d, err := db.NewDB(dbPath, cfg) - ctx := context.Background() - if err != nil { - t.Fatalf("Failed to init slasherDB: %v", err) - } - defer d.ClearDB() - defer d.Close() - for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { + db := testDB.SetupSlasherDB(t, false) + defer testDB.TeardownSlasherDB(t, db) + ctx := context.Background() + sd := &SpanDetector{ - slasherDB: d, + slasherDB: db, } if err := sd.UpdateSpans(ctx, tt.att); err != nil { @@ -267,7 +249,7 @@ func TestSpanDetector_DetectSlashingForValidator_Double(t *testing.T) { } } if slashTotal != tt.slashCount { - t.Fatalf("Unexpected amount of slashings found, received %d, expected %d", slashTotal, tt.slashCount) + t.Fatalf("Unexpected amount of slashings found, received %db, expected %db", slashTotal, tt.slashCount) } }) } @@ -465,12 +447,13 @@ func TestSpanDetector_DetectSlashingForValidator_Surround(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - slasherDB := setupSlasherDB(t) - defer teardownSlasherDB(t, slasherDB) + db := testDB.SetupSlasherDB(t, false) + ctx := context.Background() + defer testDB.TeardownSlasherDB(t, db) + sd := &SpanDetector{ - slasherDB: slasherDB, + slasherDB: db, } - ctx := context.Background() // We only care about validator index 0 for these tests for simplicity. validatorIndex := uint64(0) for k, v := range tt.spansByEpochForValidator { @@ -480,12 +463,11 @@ func TestSpanDetector_DetectSlashingForValidator_Surround(t *testing.T) { MaxSpan: v[1], }, } - err := sd.slasherDB.SaveEpochSpansMap(ctx, k, span) - if err != nil { + if err := sd.slasherDB.SaveEpochSpansMap(ctx, k, span); err != nil { t.Fatalf("Failed to save to slasherDB: %v", err) } - } + attData := ðpb.AttestationData{ Source: ðpb.Checkpoint{ Epoch: tt.sourceEpoch, @@ -583,12 +565,14 @@ func TestSpanDetector_DetectSlashingForValidator_MultipleValidators(t *testing.T } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - slasherDB := setupSlasherDB(t) - defer teardownSlasherDB(t, slasherDB) + db := testDB.SetupSlasherDB(t, false) + ctx := context.Background() + defer db.ClearDB() + defer db.Close() + sd := &SpanDetector{ - slasherDB: slasherDB, + slasherDB: db, } - ctx := context.Background() for i := 0; i < len(tt.spansByEpoch); i++ { epoch := uint64(i) err := sd.slasherDB.SaveEpochSpansMap(ctx, epoch, tt.spansByEpoch[epoch]) @@ -735,12 +719,14 @@ func TestNewSpanDetector_UpdateSpans(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - slasherDB := setupSlasherDB(t) - defer teardownSlasherDB(t, slasherDB) + db := testDB.SetupSlasherDB(t, false) + ctx := context.Background() + defer db.ClearDB() + defer db.Close() + sd := &SpanDetector{ - slasherDB: slasherDB, + slasherDB: db, } - ctx := context.Background() if err := sd.UpdateSpans(ctx, tt.att); err != nil { t.Fatal(err) } @@ -757,26 +743,3 @@ func TestNewSpanDetector_UpdateSpans(t *testing.T) { }) } } - -func setupSlasherDB(t *testing.T) *kv.Store { - app := cli.NewApp() - set := flag.NewFlagSet("test", 0) - cliCtx := cli.NewContext(app, set, nil) - baseDir := cliCtx.GlobalString(cmd.DataDirFlag.Name) - dbPath := path.Join(baseDir, slasherDBName) - cfg := &kv.Config{SpanCacheEnabled: cliCtx.GlobalBool(flags.UseSpanCacheFlag.Name)} - slasherDB, err := db.NewDB(dbPath, cfg) - if err != nil { - t.Fatalf("Failed to init slasher db: %v", err) - } - return slasherDB -} - -func teardownSlasherDB(t *testing.T, slasherDB *kv.Store) { - if err := slasherDB.ClearDB(); err != nil { - t.Fatal(err) - } - if err := slasherDB.Close(); err != nil { - t.Fatal(err) - } -} diff --git a/slasher/detection/detect.go b/slasher/detection/detect.go index 56540710e80..2dbfa01bb6f 100644 --- a/slasher/detection/detect.go +++ b/slasher/detection/detect.go @@ -9,12 +9,15 @@ import ( "github.com/prysmaticlabs/prysm/shared/hashutil" "github.com/prysmaticlabs/prysm/shared/sliceutil" "github.com/prysmaticlabs/prysm/slasher/detection/attestations/types" + "go.opencensus.io/trace" ) func (ds *Service) detectAttesterSlashings( ctx context.Context, att *ethpb.IndexedAttestation, ) ([]*ethpb.AttesterSlashing, error) { + ctx, span := trace.StartSpan(ctx, "detection.detectAttesterSlashings") + defer span.End() slashings := make([]*ethpb.AttesterSlashing, 0) for i := 0; i < len(att.AttestingIndices); i++ { valIdx := att.AttestingIndices[i] @@ -67,6 +70,8 @@ func (ds *Service) detectDoubleVote( incomingAtt *ethpb.IndexedAttestation, detectionResult *types.DetectionResult, ) (*ethpb.AttesterSlashing, error) { + ctx, span := trace.StartSpan(ctx, "detection.detectDoubleVote") + defer span.End() if detectionResult == nil || detectionResult.Kind != types.DoubleVote { return nil, nil } @@ -91,8 +96,7 @@ func (ds *Service) detectDoubleVote( }, nil } } - - return nil, errors.New("unexpected false positive in double vote detection") + return nil, nil } // detectSurroundVotes cross references the passed in attestation with the requested validator's @@ -102,6 +106,8 @@ func (ds *Service) detectSurroundVotes( incomingAtt *ethpb.IndexedAttestation, detectionResult *types.DetectionResult, ) (*ethpb.AttesterSlashing, error) { + ctx, span := trace.StartSpan(ctx, "detection.detectSurroundVotes") + defer span.End() if detectionResult == nil || detectionResult.Kind != types.SurroundVote { return nil, nil } diff --git a/slasher/detection/service.go b/slasher/detection/service.go index 262bccc7e6a..0ca8102861f 100644 --- a/slasher/detection/service.go +++ b/slasher/detection/service.go @@ -83,7 +83,7 @@ func (ds *Service) Start() { // The detection service runs detection on all historical // chain data since genesis. - go ds.detectHistoricalChainData(ds.ctx) + // TODO(#5030): Re-enable after issue is resolved. // We subscribe to incoming blocks from the beacon node via // our gRPC client to keep detecting slashable offenses. @@ -141,6 +141,8 @@ func (ds *Service) detectHistoricalChainData(ctx context.Context) { } func (ds *Service) submitAttesterSlashings(ctx context.Context, slashings []*ethpb.AttesterSlashing, epoch uint64) { + ctx, span := trace.StartSpan(ctx, "detection.submitAttesterSlashings") + defer span.End() var slashedIndices []uint64 for i := 0; i < len(slashings); i++ { slashableIndices := sliceutil.IntersectionUint64(slashings[i].Attestation_1.AttestingIndices, slashings[i].Attestation_2.AttestingIndices) @@ -151,6 +153,6 @@ func (ds *Service) submitAttesterSlashings(ctx context.Context, slashings []*eth log.WithFields(logrus.Fields{ "targetEpoch": epoch, "indices": slashedIndices, - }).Infof("Found %d attester slashings! Submitting to beacon node.", len(slashings)) + }).Infof("Found %d attester slashings! Submitting to beacon node", len(slashings)) } }