Skip to content

Commit

Permalink
Prepare Slasher for Production (#5020)
Browse files Browse the repository at this point in the history
* rem slasher proto
* Merge branch 'master' of github.com:prysmaticlabs/prysm
* Merge branch 'master' of github.com:prysmaticlabs/prysm
* Merge branch 'master' of github.com:prysmaticlabs/prysm
* Merge branch 'master' of github.com:prysmaticlabs/prysm
* Merge branch 'master' of github.com:prysmaticlabs/prysm
* add a bit more better logging
* Empty db fix
* Improve logs
* Fix small issues in spanner, improvements
* Change costs back to 1 for now
* Merge branch 'master' of https://github.com/prysmaticlabs/Prysm into cleanup-slasher
* Change the cache back to 0
* Cleanup
* Merge branch 'master' into cleanup-slasher
* lint
* added in better spans
* log
* rem spanner in super intensive operation
* Merge branch 'master' into cleanup-slasher
* add todo
* Merge branch 'cleanup-slasher' of github.com:prysmaticlabs/prysm into cleanup-slasher
* Merge branch 'master' into cleanup-slasher
* Apply suggestions from code review
* no logrus
* Merge branch 'master' into cleanup-slasher
* Merge branch 'cleanup-slasher' of https://github.com/prysmaticlabs/Prysm into cleanup-slasher
* Remove spammy logs
* Merge branch 'master' of https://github.com/prysmaticlabs/Prysm into cleanup-slasher
* gaz
* Rename func
* Add back needed code
* Add todo
* Add span to cache func
  • Loading branch information
rauljordan committed Mar 8, 2020
1 parent 300d072 commit eddaea8
Show file tree
Hide file tree
Showing 13 changed files with 176 additions and 161 deletions.
2 changes: 1 addition & 1 deletion slasher/db/iface/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ type WriteAccessDatabase interface {

// MinMaxSpan related methods.
SaveEpochSpansMap(ctx context.Context, epoch uint64, spanMap map[uint64]detectionTypes.Span) error
SaveValidatorEpochSpans(ctx context.Context, validatorIdx uint64, epoch uint64, spans detectionTypes.Span) error
SaveValidatorEpochSpan(ctx context.Context, validatorIdx uint64, epoch uint64, spans detectionTypes.Span) error
SaveCachedSpansMaps(ctx context.Context) error
DeleteEpochSpans(ctx context.Context, validatorIdx uint64) error
DeleteValidatorSpanByEpoch(ctx context.Context, validatorIdx uint64, epoch uint64) error
Expand Down
14 changes: 7 additions & 7 deletions slasher/db/kv/attester_slashings.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func unmarshalAttSlashings(encoded [][]byte) ([]*ethpb.AttesterSlashing, error)
// AttesterSlashings accepts a status and returns all slashings with this status.
// returns empty []*ethpb.AttesterSlashing if no slashing has been found with this status.
func (db *Store) AttesterSlashings(ctx context.Context, status types.SlashingStatus) ([]*ethpb.AttesterSlashing, error) {
ctx, span := trace.StartSpan(ctx, "SlasherDB.AttesterSlashings")
ctx, span := trace.StartSpan(ctx, "slasherDB.AttesterSlashings")
defer span.End()
encoded := make([][]byte, 0)
err := db.view(func(tx *bolt.Tx) error {
Expand All @@ -59,7 +59,7 @@ func (db *Store) AttesterSlashings(ctx context.Context, status types.SlashingSta

// DeleteAttesterSlashing deletes an attester slashing proof from db.
func (db *Store) DeleteAttesterSlashing(ctx context.Context, attesterSlashing *ethpb.AttesterSlashing) error {
ctx, span := trace.StartSpan(ctx, "SlasherDB.DeleteAttesterSlashing")
ctx, span := trace.StartSpan(ctx, "slasherDB.DeleteAttesterSlashing")
defer span.End()
root, err := hashutil.HashProto(attesterSlashing)
if err != nil {
Expand All @@ -80,7 +80,7 @@ func (db *Store) DeleteAttesterSlashing(ctx context.Context, attesterSlashing *e

// HasAttesterSlashing returns true and slashing status if a slashing is found in the db.
func (db *Store) HasAttesterSlashing(ctx context.Context, slashing *ethpb.AttesterSlashing) (bool, types.SlashingStatus, error) {
ctx, span := trace.StartSpan(ctx, "SlasherDB.HasAttesterSlashing")
ctx, span := trace.StartSpan(ctx, "slasherDB.HasAttesterSlashing")
defer span.End()
var status types.SlashingStatus
var found bool
Expand All @@ -103,7 +103,7 @@ func (db *Store) HasAttesterSlashing(ctx context.Context, slashing *ethpb.Attest

// SaveAttesterSlashing accepts a slashing proof and its status and writes it to disk.
func (db *Store) SaveAttesterSlashing(ctx context.Context, status types.SlashingStatus, slashing *ethpb.AttesterSlashing) error {
ctx, span := trace.StartSpan(ctx, "SlasherDB.SaveAttesterSlashing")
ctx, span := trace.StartSpan(ctx, "slasherDB.SaveAttesterSlashing")
defer span.End()
enc, err := proto.Marshal(slashing)
if err != nil {
Expand All @@ -120,7 +120,7 @@ func (db *Store) SaveAttesterSlashing(ctx context.Context, status types.Slashing

// SaveAttesterSlashings accepts a slice of slashing proof and its status and writes it to disk.
func (db *Store) SaveAttesterSlashings(ctx context.Context, status types.SlashingStatus, slashings []*ethpb.AttesterSlashing) error {
ctx, span := trace.StartSpan(ctx, "SlasherDB.SaveAttesterSlashings")
ctx, span := trace.StartSpan(ctx, "slasherDB.SaveAttesterSlashings")
defer span.End()
enc := make([][]byte, len(slashings))
key := make([][]byte, len(slashings))
Expand Down Expand Up @@ -148,7 +148,7 @@ func (db *Store) SaveAttesterSlashings(ctx context.Context, status types.Slashin

// GetLatestEpochDetected returns the latest detected epoch from db.
func (db *Store) GetLatestEpochDetected(ctx context.Context) (uint64, error) {
ctx, span := trace.StartSpan(ctx, "SlasherDB.GetLatestEpochDetected")
ctx, span := trace.StartSpan(ctx, "slasherDB.GetLatestEpochDetected")
defer span.End()
var epoch uint64
err := db.view(func(tx *bolt.Tx) error {
Expand All @@ -166,7 +166,7 @@ func (db *Store) GetLatestEpochDetected(ctx context.Context) (uint64, error) {

// SetLatestEpochDetected sets the latest slashing detected epoch in db.
func (db *Store) SetLatestEpochDetected(ctx context.Context, epoch uint64) error {
ctx, span := trace.StartSpan(ctx, "SlasherDB.SetLatestEpochDetected")
ctx, span := trace.StartSpan(ctx, "slasherDB.SetLatestEpochDetected")
defer span.End()
return db.update(func(tx *bolt.Tx) error {
b := tx.Bucket(slashingBucket)
Expand Down
12 changes: 6 additions & 6 deletions slasher/db/kv/block_header.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
)

func unmarshalBlockHeader(ctx context.Context, enc []byte) (*ethpb.SignedBeaconBlockHeader, error) {
ctx, span := trace.StartSpan(ctx, "SlasherDB.unmarshalBlockHeader")
ctx, span := trace.StartSpan(ctx, "slasherDB.unmarshalBlockHeader")
defer span.End()
protoBlockHeader := &ethpb.SignedBeaconBlockHeader{}
err := proto.Unmarshal(enc, protoBlockHeader)
Expand All @@ -27,7 +27,7 @@ func unmarshalBlockHeader(ctx context.Context, enc []byte) (*ethpb.SignedBeaconB
// BlockHeaders accepts an epoch and validator id and returns the corresponding block header array.
// Returns nil if the block header for those values does not exist.
func (db *Store) BlockHeaders(ctx context.Context, epoch uint64, validatorID uint64) ([]*ethpb.SignedBeaconBlockHeader, error) {
ctx, span := trace.StartSpan(ctx, "SlasherDB.BlockHeaders")
ctx, span := trace.StartSpan(ctx, "slasherDB.BlockHeaders")
defer span.End()
var blockHeaders []*ethpb.SignedBeaconBlockHeader
err := db.view(func(tx *bolt.Tx) error {
Expand All @@ -47,7 +47,7 @@ func (db *Store) BlockHeaders(ctx context.Context, epoch uint64, validatorID uin

// HasBlockHeader accepts an epoch and validator id and returns true if the block header exists.
func (db *Store) HasBlockHeader(ctx context.Context, epoch uint64, validatorID uint64) bool {
ctx, span := trace.StartSpan(ctx, "SlasherDB.HasBlockHeader")
ctx, span := trace.StartSpan(ctx, "slasherDB.HasBlockHeader")
defer span.End()
prefix := encodeEpochValidatorID(epoch, validatorID)
var hasBlockHeader bool
Expand All @@ -67,7 +67,7 @@ func (db *Store) HasBlockHeader(ctx context.Context, epoch uint64, validatorID u

// SaveBlockHeader accepts a block header and writes it to disk.
func (db *Store) SaveBlockHeader(ctx context.Context, epoch uint64, validatorID uint64, blockHeader *ethpb.SignedBeaconBlockHeader) error {
ctx, span := trace.StartSpan(ctx, "SlasherDB.SaveBlockHeader")
ctx, span := trace.StartSpan(ctx, "slasherDB.SaveBlockHeader")
defer span.End()
key := encodeEpochValidatorIDSig(epoch, validatorID, blockHeader.Signature)
enc, err := proto.Marshal(blockHeader)
Expand Down Expand Up @@ -96,7 +96,7 @@ func (db *Store) SaveBlockHeader(ctx context.Context, epoch uint64, validatorID

// DeleteBlockHeader deletes a block header using the epoch and validator id.
func (db *Store) DeleteBlockHeader(ctx context.Context, epoch uint64, validatorID uint64, blockHeader *ethpb.SignedBeaconBlockHeader) error {
ctx, span := trace.StartSpan(ctx, "SlasherDB.DeleteBlockHeader")
ctx, span := trace.StartSpan(ctx, "slasherDB.DeleteBlockHeader")
defer span.End()
key := encodeEpochValidatorIDSig(epoch, validatorID, blockHeader.Signature)
return db.update(func(tx *bolt.Tx) error {
Expand All @@ -110,7 +110,7 @@ func (db *Store) DeleteBlockHeader(ctx context.Context, epoch uint64, validatorI

// PruneBlockHistory leaves only records younger then history size.
func (db *Store) PruneBlockHistory(ctx context.Context, currentEpoch uint64, pruningEpochAge uint64) error {
ctx, span := trace.StartSpan(ctx, "SlasherDB.pruneBlockHistory")
ctx, span := trace.StartSpan(ctx, "slasherDB.pruneBlockHistory")
defer span.End()
pruneTill := int64(currentEpoch) - int64(pruningEpochAge)
if pruneTill <= 0 {
Expand Down
4 changes: 2 additions & 2 deletions slasher/db/kv/chain_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (

// ChainHead retrieves the persisted chain head from the database accordingly.
func (db *Store) ChainHead(ctx context.Context) (*ethpb.ChainHead, error) {
ctx, span := trace.StartSpan(ctx, "SlasherDB.ChainHead")
ctx, span := trace.StartSpan(ctx, "slasherDB.ChainHead")
defer span.End()
var res *ethpb.ChainHead
if err := db.update(func(tx *bolt.Tx) error {
Expand All @@ -31,7 +31,7 @@ func (db *Store) ChainHead(ctx context.Context) (*ethpb.ChainHead, error) {

// SaveChainHead accepts a beacon chain head object and persists it to the DB.
func (db *Store) SaveChainHead(ctx context.Context, head *ethpb.ChainHead) error {
ctx, span := trace.StartSpan(ctx, "SlasherDB.SaveChainHead")
ctx, span := trace.StartSpan(ctx, "slasherDB.SaveChainHead")
defer span.End()
enc, err := proto.Marshal(head)
if err != nil {
Expand Down
18 changes: 9 additions & 9 deletions slasher/db/kv/indexed_attestations.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
)

func unmarshalIndexedAttestation(ctx context.Context, enc []byte) (*ethpb.IndexedAttestation, error) {
ctx, span := trace.StartSpan(ctx, "SlasherDB.unmarshalIndexedAttestation")
ctx, span := trace.StartSpan(ctx, "slasherDB.unmarshalIndexedAttestation")
defer span.End()
protoIdxAtt := &ethpb.IndexedAttestation{}
err := proto.Unmarshal(enc, protoIdxAtt)
Expand All @@ -27,7 +27,7 @@ func unmarshalIndexedAttestation(ctx context.Context, enc []byte) (*ethpb.Indexe
// indexed attestations.
// Returns nil if the indexed attestation does not exist with that target epoch.
func (db *Store) IndexedAttestationsForTarget(ctx context.Context, targetEpoch uint64) ([]*ethpb.IndexedAttestation, error) {
ctx, span := trace.StartSpan(ctx, "SlasherDB.IndexedAttestationsForTarget")
ctx, span := trace.StartSpan(ctx, "slasherDB.IndexedAttestationsForTarget")
defer span.End()
var idxAtts []*ethpb.IndexedAttestation
key := bytesutil.Bytes8(targetEpoch)
Expand All @@ -48,7 +48,7 @@ func (db *Store) IndexedAttestationsForTarget(ctx context.Context, targetEpoch u
// IndexedAttestationsWithPrefix accepts a target epoch and signature bytes to find all attestations with the requested prefix.
// Returns nil if the indexed attestation does not exist with that target epoch.
func (db *Store) IndexedAttestationsWithPrefix(ctx context.Context, targetEpoch uint64, sigBytes []byte) ([]*ethpb.IndexedAttestation, error) {
ctx, span := trace.StartSpan(ctx, "SlasherDB.IndexedAttestationsWithPrefix")
ctx, span := trace.StartSpan(ctx, "slasherDB.IndexedAttestationsWithPrefix")
defer span.End()
var idxAtts []*ethpb.IndexedAttestation
key := encodeEpochSig(targetEpoch, sigBytes[:])
Expand All @@ -68,7 +68,7 @@ func (db *Store) IndexedAttestationsWithPrefix(ctx context.Context, targetEpoch

// HasIndexedAttestation accepts an attestation and returns true if it exists in the DB.
func (db *Store) HasIndexedAttestation(ctx context.Context, att *ethpb.IndexedAttestation) (bool, error) {
ctx, span := trace.StartSpan(ctx, "SlasherDB.HasIndexedAttestation")
ctx, span := trace.StartSpan(ctx, "slasherDB.HasIndexedAttestation")
defer span.End()
key := encodeEpochSig(att.Data.Target.Epoch, att.Signature)
var hasAttestation bool
Expand All @@ -88,7 +88,7 @@ func (db *Store) HasIndexedAttestation(ctx context.Context, att *ethpb.IndexedAt

// SaveIndexedAttestation accepts an indexed attestation and writes it to the DB.
func (db *Store) SaveIndexedAttestation(ctx context.Context, idxAttestation *ethpb.IndexedAttestation) error {
ctx, span := trace.StartSpan(ctx, "SlasherDB.SaveIndexedAttestation")
ctx, span := trace.StartSpan(ctx, "slasherDB.SaveIndexedAttestation")
defer span.End()
key := encodeEpochSig(idxAttestation.Data.Target.Epoch, idxAttestation.Signature)
enc, err := proto.Marshal(idxAttestation)
Expand All @@ -113,7 +113,7 @@ func (db *Store) SaveIndexedAttestation(ctx context.Context, idxAttestation *eth

// SaveIndexedAttestations accepts multiple indexed attestations and writes them to the DB.
func (db *Store) SaveIndexedAttestations(ctx context.Context, idxAttestations []*ethpb.IndexedAttestation) error {
ctx, span := trace.StartSpan(ctx, "SlasherDB.SaveIndexedAttestations")
ctx, span := trace.StartSpan(ctx, "slasherDB.SaveIndexedAttestations")
defer span.End()
keys := make([][]byte, len(idxAttestations))
marshaledAtts := make([][]byte, len(idxAttestations))
Expand Down Expand Up @@ -145,7 +145,7 @@ func (db *Store) SaveIndexedAttestations(ctx context.Context, idxAttestations []

// DeleteIndexedAttestation deletes a indexed attestation using the slot and its root as keys in their respective buckets.
func (db *Store) DeleteIndexedAttestation(ctx context.Context, idxAttestation *ethpb.IndexedAttestation) error {
ctx, span := trace.StartSpan(ctx, "SlasherDB.DeleteIndexedAttestation")
ctx, span := trace.StartSpan(ctx, "slasherDB.DeleteIndexedAttestation")
defer span.End()
key := encodeEpochSig(idxAttestation.Data.Target.Epoch, idxAttestation.Signature)
return db.update(func(tx *bolt.Tx) error {
Expand All @@ -163,7 +163,7 @@ func (db *Store) DeleteIndexedAttestation(ctx context.Context, idxAttestation *e

// PruneAttHistory removes all attestations from the DB older than the pruning epoch age.
func (db *Store) PruneAttHistory(ctx context.Context, currentEpoch uint64, pruningEpochAge uint64) error {
ctx, span := trace.StartSpan(ctx, "SlasherDB.pruneAttHistory")
ctx, span := trace.StartSpan(ctx, "slasherDB.pruneAttHistory")
defer span.End()
pruneFromEpoch := int64(currentEpoch) - int64(pruningEpochAge)
if pruneFromEpoch <= 0 {
Expand All @@ -186,7 +186,7 @@ func (db *Store) PruneAttHistory(ctx context.Context, currentEpoch uint64, pruni
// LatestIndexedAttestationsTargetEpoch returns latest target epoch in db
// returns 0 if there is no indexed attestations in db.
func (db *Store) LatestIndexedAttestationsTargetEpoch(ctx context.Context) (uint64, error) {
ctx, span := trace.StartSpan(ctx, "SlasherDB.LatestIndexedAttestationsTargetEpoch")
ctx, span := trace.StartSpan(ctx, "slasherDB.LatestIndexedAttestationsTargetEpoch")
defer span.End()
var lt uint64
err := db.view(func(tx *bolt.Tx) error {
Expand Down
14 changes: 7 additions & 7 deletions slasher/db/kv/proposer_slashings.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
)

func unmarshalProposerSlashing(ctx context.Context, enc []byte) (*ethpb.ProposerSlashing, error) {
ctx, span := trace.StartSpan(ctx, "SlasherDB.unmarshalProposerSlashing")
ctx, span := trace.StartSpan(ctx, "slasherDB.unmarshalProposerSlashing")
defer span.End()
protoSlashing := &ethpb.ProposerSlashing{}
if err := proto.Unmarshal(enc, protoSlashing); err != nil {
Expand All @@ -24,7 +24,7 @@ func unmarshalProposerSlashing(ctx context.Context, enc []byte) (*ethpb.Proposer
}

func unmarshalProposerSlashingArray(ctx context.Context, encoded [][]byte) ([]*ethpb.ProposerSlashing, error) {
ctx, span := trace.StartSpan(ctx, "SlasherDB.unmarshalProposerSlashingArray")
ctx, span := trace.StartSpan(ctx, "slasherDB.unmarshalProposerSlashingArray")
defer span.End()
proposerSlashings := make([]*ethpb.ProposerSlashing, len(encoded))
for i, enc := range encoded {
Expand All @@ -39,7 +39,7 @@ func unmarshalProposerSlashingArray(ctx context.Context, encoded [][]byte) ([]*e

// ProposalSlashingsByStatus returns all the proposal slashing proofs with a certain status.
func (db *Store) ProposalSlashingsByStatus(ctx context.Context, status types.SlashingStatus) ([]*ethpb.ProposerSlashing, error) {
ctx, span := trace.StartSpan(ctx, "SlasherDB.ProposalSlashingsByStatus")
ctx, span := trace.StartSpan(ctx, "slasherDB.ProposalSlashingsByStatus")
defer span.End()
encoded := make([][]byte, 0)
err := db.view(func(tx *bolt.Tx) error {
Expand All @@ -60,7 +60,7 @@ func (db *Store) ProposalSlashingsByStatus(ctx context.Context, status types.Sla

// DeleteProposerSlashing deletes a proposer slashing proof.
func (db *Store) DeleteProposerSlashing(ctx context.Context, slashing *ethpb.ProposerSlashing) error {
ctx, span := trace.StartSpan(ctx, "SlasherDB.DeleteProposerSlashing")
ctx, span := trace.StartSpan(ctx, "slasherDB.DeleteProposerSlashing")
defer span.End()
root, err := hashutil.HashProto(slashing)
if err != nil {
Expand All @@ -79,7 +79,7 @@ func (db *Store) DeleteProposerSlashing(ctx context.Context, slashing *ethpb.Pro

// HasProposerSlashing returns the slashing key if it is found in db.
func (db *Store) HasProposerSlashing(ctx context.Context, slashing *ethpb.ProposerSlashing) (bool, types.SlashingStatus, error) {
ctx, span := trace.StartSpan(ctx, "SlasherDB.HasProposerSlashing")
ctx, span := trace.StartSpan(ctx, "slasherDB.HasProposerSlashing")
defer span.End()
var status types.SlashingStatus
var found bool
Expand All @@ -103,7 +103,7 @@ func (db *Store) HasProposerSlashing(ctx context.Context, slashing *ethpb.Propos

// SaveProposerSlashing accepts a proposer slashing and its status header and writes it to disk.
func (db *Store) SaveProposerSlashing(ctx context.Context, status types.SlashingStatus, slashing *ethpb.ProposerSlashing) error {
ctx, span := trace.StartSpan(ctx, "SlasherDB.SaveProposerSlashing")
ctx, span := trace.StartSpan(ctx, "slasherDB.SaveProposerSlashing")
defer span.End()
enc, err := proto.Marshal(slashing)
if err != nil {
Expand All @@ -120,7 +120,7 @@ func (db *Store) SaveProposerSlashing(ctx context.Context, status types.Slashing

// SaveProposerSlashings accepts a slice of slashing proof and its status and writes it to disk.
func (db *Store) SaveProposerSlashings(ctx context.Context, status types.SlashingStatus, slashings []*ethpb.ProposerSlashing) error {
ctx, span := trace.StartSpan(ctx, "SlasherDB.SaveProposerSlashings")
ctx, span := trace.StartSpan(ctx, "slasherDB.SaveProposerSlashings")
defer span.End()
encSlashings := make([][]byte, len(slashings))
keys := make([][]byte, len(slashings))
Expand Down
Loading

0 comments on commit eddaea8

Please sign in to comment.