
Slasher proto and function renames #4797

Merged: 10 commits, Feb 10, 2020
284 changes: 129 additions & 155 deletions proto/slashing/slashing.pb.go

Large diffs are not rendered by default.

12 changes: 8 additions & 4 deletions proto/slashing/slashing.proto
@@ -27,15 +27,19 @@ service Slasher {
rpc AttesterSlashings(SlashingStatusRequest) returns (AttesterSlashingResponse);
}

message ValidatorIDToIdxAtt {
// CompressedIdxAtt is an indexed attestation with the []byte data root
// in place of its AttestationData.
message CompressedIdxAtt {
repeated uint64 indices = 1 ;
bytes data_root = 2;
// 96 bytes aggregate signature.
bytes signature = 3;
}

message ValidatorIDToIdxAttList {
repeated ValidatorIDToIdxAtt indicesList = 1 ;
// CompressedIdxAttList is a list of CompressedIdxAtts used for
// accessing an array from the DB.
message CompressedIdxAttList {
repeated CompressedIdxAtt list = 1;
}

message ProposerSlashingRequest {
@@ -97,7 +101,7 @@ message SlashingStatusRequest {
// Included slashing proof that has been included in a block.
Included = 2;
// Reverted slashing proof that has been reverted and therefore is relevant again.
Reverted = 3;
Reverted = 3;
}
SlashingStatus status = 1;
}
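For context on the new message shape: as the author explains in the review thread further down, a CompressedIdxAtt is an indexed attestation whose full AttestationData is replaced by its hash root. The illustrative helper below is not part of this PR; it is written as if it lived in slasher/db next to saveCompressedIdxAttToEpochList, reusing that file's ethpb, slashpb, hashutil and errors imports, and the helper name is an assumption.

// compressIdxAtt sketches how an ethpb.IndexedAttestation maps onto the new
// CompressedIdxAtt message: the AttestationData is hashed and only its root is
// stored next to the attesting indices and the 96-byte aggregate signature.
func compressIdxAtt(att *ethpb.IndexedAttestation) (*slashpb.CompressedIdxAtt, error) {
	dataRoot, err := hashutil.HashProto(att.Data)
	if err != nil {
		return nil, errors.Wrap(err, "failed to hash attestation data")
	}
	return &slashpb.CompressedIdxAtt{
		Indices:   att.AttestingIndices,
		DataRoot:  dataRoot[:],
		Signature: att.Signature,
	}, nil
}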
4 changes: 2 additions & 2 deletions slasher/db/block_header.go
@@ -20,9 +20,9 @@ func unmarshalBlockHeader(enc []byte) (*ethpb.SignedBeaconBlockHeader, error) {
return protoBlockHeader, nil
}

// BlockHeader accepts an epoch and validator id and returns the corresponding block header array.
// BlockHeaders accepts an epoch and validator id and returns the corresponding block header array.
// Returns nil if the block header for those values does not exist.
func (db *Store) BlockHeader(epoch uint64, validatorID uint64) ([]*ethpb.SignedBeaconBlockHeader, error) {
func (db *Store) BlockHeaders(epoch uint64, validatorID uint64) ([]*ethpb.SignedBeaconBlockHeader, error) {
var blockHeaders []*ethpb.SignedBeaconBlockHeader
err := db.view(func(tx *bolt.Tx) error {
c := tx.Bucket(historicBlockHeadersBucket).Cursor()
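A usage sketch of the renamed accessor (illustrative only, not part of this PR; the BlockHeaders signature is taken from the diff above and the wrapper would sit in the same package):

// readHeaders shows the intent behind the plural name: BlockHeaders returns
// every header stored for the epoch/validator pair, or nil if none exist.
func readHeaders(db *Store, epoch uint64, validatorID uint64) ([]*ethpb.SignedBeaconBlockHeader, error) {
	headers, err := db.BlockHeaders(epoch, validatorID)
	if err != nil {
		return nil, errors.Wrap(err, "failed to get block headers")
	}
	return headers, nil
}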
12 changes: 6 additions & 6 deletions slasher/db/block_header_test.go
@@ -24,7 +24,7 @@ func TestNilDBHistoryBlkHdr(t *testing.T) {
t.Fatal("HasBlockHeader should return false")
}

bPrime, err := db.BlockHeader(epoch, validatorID)
bPrime, err := db.BlockHeaders(epoch, validatorID)
if err != nil {
t.Fatalf("failed to get block: %v", err)
}
@@ -67,7 +67,7 @@ func TestSaveHistoryBlkHdr(t *testing.T) {
t.Fatalf("save block failed: %v", err)
}

bha, err := db.BlockHeader(tt.epoch, tt.vID)
bha, err := db.BlockHeaders(tt.epoch, tt.vID)
if err != nil {
t.Fatalf("failed to get block: %v", err)
}
@@ -115,7 +115,7 @@ func TestDeleteHistoryBlkHdr(t *testing.T) {
}

for _, tt := range tests {
bha, err := db.BlockHeader(tt.epoch, tt.vID)
bha, err := db.BlockHeaders(tt.epoch, tt.vID)
if err != nil {
t.Fatalf("failed to get block: %v", err)
}
@@ -127,7 +127,7 @@ func TestDeleteHistoryBlkHdr(t *testing.T) {
if err != nil {
t.Fatalf("save block failed: %v", err)
}
bh, err := db.BlockHeader(tt.epoch, tt.vID)
bh, err := db.BlockHeaders(tt.epoch, tt.vID)

if err != nil {
t.Fatal(err)
@@ -236,7 +236,7 @@ func TestPruneHistoryBlkHdr(t *testing.T) {
t.Fatalf("save block header failed: %v", err)
}

bha, err := db.BlockHeader(tt.epoch, tt.vID)
bha, err := db.BlockHeaders(tt.epoch, tt.vID)
if err != nil {
t.Fatalf("failed to get block header: %v", err)
}
@@ -253,7 +253,7 @@ func TestPruneHistoryBlkHdr(t *testing.T) {
}

for _, tt := range tests {
bha, err := db.BlockHeader(tt.epoch, tt.vID)
bha, err := db.BlockHeaders(tt.epoch, tt.vID)
if err != nil {
t.Fatalf("failed to get block header: %v", err)
}
16 changes: 9 additions & 7 deletions slasher/db/db.go
@@ -12,6 +12,9 @@ import (
)

var log = logrus.WithField("prefix", "slasherDB")

var databaseFileName = "slasher.db"

var d *Store

// Store defines an implementation of the Prysm Database interface
@@ -53,12 +56,12 @@ func NewDB(dirPath string, cfg *Config) (*Store, error) {
return d, err
}

// ClearDB removes the previously stored directory at the data directory.
// ClearDB removes any previously stored data at the configured data directory.
func (db *Store) ClearDB() error {
if _, err := os.Stat(db.databasePath); os.IsNotExist(err) {
return nil
}
return os.RemoveAll(db.databasePath)
return os.Remove(db.databasePath)
}

// DatabasePath at which this database writes files.
@@ -82,7 +85,7 @@ func NewKVStore(dirPath string, cfg *Config) (*Store, error) {
if err := os.MkdirAll(dirPath, 0700); err != nil {
return nil, err
}
datafile := path.Join(dirPath, "slasher.db")
datafile := path.Join(dirPath, databaseFileName)
boltDB, err := bolt.Open(datafile, 0600, &bolt.Options{Timeout: 1 * time.Second})
if err != nil {
if err == bolt.ErrTimeout {
@@ -103,17 +106,16 @@ func NewKVStore(dirPath string, cfg *Config) (*Store, error) {
OnEvict: saveToDB,
})
if err != nil {
errors.Wrap(err, "failed to start span cache")
return nil, err
return nil, errors.Wrap(err, "failed to start span cache")
}
kv := &Store{db: boltDB, databasePath: dirPath, spanCache: spanCache, spanCacheEnabled: cfg.SpanCacheEnabled}
kv := &Store{db: boltDB, databasePath: datafile, spanCache: spanCache, spanCacheEnabled: cfg.SpanCacheEnabled}

if err := kv.db.Update(func(tx *bolt.Tx) error {
return createBuckets(
tx,
historicIndexedAttestationsBucket,
historicBlockHeadersBucket,
indexedAttestationsIndicesBucket,
compressedIdxAttsBucket,
validatorsPublicKeysBucket,
validatorsMinMaxSpanBucket,
slashingBucket,
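The databasePath field now stores the bolt file path (path.Join(dirPath, databaseFileName)) instead of the directory, which is why ClearDB can switch from os.RemoveAll to os.Remove: only slasher.db is deleted and the data directory survives. A test-style sketch of that behavior follows (illustrative, not part of this PR; it assumes it lives in the slasher/db package and reuses its imports plus ioutil, os, path and testing):

func TestClearDBRemovesOnlyTheDatabaseFile(t *testing.T) {
	dirPath, err := ioutil.TempDir("", "slasherdb")
	if err != nil {
		t.Fatal(err)
	}
	defer os.RemoveAll(dirPath)

	db, err := NewKVStore(dirPath, &Config{})
	if err != nil {
		t.Fatal(err)
	}
	// After this PR, databasePath points at dirPath/slasher.db, not at dirPath.
	if err := db.ClearDB(); err != nil {
		t.Fatal(err)
	}
	if _, err := os.Stat(path.Join(dirPath, databaseFileName)); !os.IsNotExist(err) {
		t.Fatal("expected slasher.db to be removed")
	}
	if _, err := os.Stat(dirPath); err != nil {
		t.Fatal("expected the data directory itself to survive ClearDB")
	}
}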
52 changes: 26 additions & 26 deletions slasher/db/indexed_attestations.go
@@ -25,8 +25,8 @@ func unmarshalIdxAtt(enc []byte) (*ethpb.IndexedAttestation, error) {
return protoIdxAtt, nil
}

func unmarshalValIDsToIdxAttList(enc []byte) (*slashpb.ValidatorIDToIdxAttList, error) {
protoIdxAtt := &slashpb.ValidatorIDToIdxAttList{}
func unmarshalCompressedIdxAttList(enc []byte) (*slashpb.CompressedIdxAttList, error) {
Inline review thread on unmarshalCompressedIdxAttList:

Member: CompressedIdx sounds too vague imo, is it validator index? attestation index? block index?... etc

Contributor Author: It is an indexed attestation, but the AttestationData is changed to its []byte attestation data root. That's why I refer to it as "compressed".

Member: Makes sense 👍

protoIdxAtt := &slashpb.CompressedIdxAttList{}
err := proto.Unmarshal(enc, protoIdxAtt)
if err != nil {
return nil, errors.Wrap(err, "failed to unmarshal encoding")
@@ -41,17 +41,17 @@ func (db *Store) IdxAttsForTargetFromID(targetEpoch uint64, validatorID uint64)
var idxAtts []*ethpb.IndexedAttestation

err := db.view(func(tx *bolt.Tx) error {
bucket := tx.Bucket(indexedAttestationsIndicesBucket)
bucket := tx.Bucket(compressedIdxAttsBucket)
enc := bucket.Get(bytesutil.Bytes8(targetEpoch))
if enc == nil {
return nil
}
idToIdxAttsList, err := unmarshalValIDsToIdxAttList(enc)
idToIdxAttsList, err := unmarshalCompressedIdxAttList(enc)
if err != nil {
return err
}

for _, idxAtt := range idToIdxAttsList.IndicesList {
for _, idxAtt := range idToIdxAttsList.List {
i := sort.Search(len(idxAtt.Indices), func(i int) bool {
return idxAtt.Indices[i] >= validatorID
})
@@ -116,7 +116,7 @@ func (db *Store) LatestIndexedAttestationsTargetEpoch() (uint64, error) {
func (db *Store) LatestValidatorIdx() (uint64, error) {
var lt uint64
err := db.view(func(tx *bolt.Tx) error {
c := tx.Bucket(indexedAttestationsIndicesBucket).Cursor()
c := tx.Bucket(compressedIdxAttsBucket).Cursor()
k, _ := c.Last()
if k == nil {
return nil
@@ -167,16 +167,16 @@ func (db *Store) HasIndexedAttestation(targetEpoch uint64, validatorID uint64) (
var hasAttestation bool
// #nosec G104
err := db.view(func(tx *bolt.Tx) error {
bucket := tx.Bucket(indexedAttestationsIndicesBucket)
bucket := tx.Bucket(compressedIdxAttsBucket)
enc := bucket.Get(key)
if enc == nil {
return nil
}
iList, err := unmarshalValIDsToIdxAttList(enc)
iList, err := unmarshalCompressedIdxAttList(enc)
if err != nil {
return err
}
for _, idxAtt := range iList.IndicesList {
for _, idxAtt := range iList.List {
i := sort.Search(len(idxAtt.Indices), func(i int) bool {
return idxAtt.Indices[i] >= validatorID
})
@@ -205,7 +205,7 @@ func (db *Store) SaveIndexedAttestation(idxAttestation *ethpb.IndexedAttestation
if val != nil {
return nil
}
if err := saveIdxAttIndicesByEpochToDB(idxAttestation, tx); err != nil {
if err := saveCompressedIdxAttToEpochList(idxAttestation, tx); err != nil {
return errors.Wrap(err, "failed to save indices from indexed attestation")
}
if err := bucket.Put(key, enc); err != nil {
@@ -225,26 +225,26 @@ func (db *Store) SaveIndexedAttestation(idxAttestation *ethpb.IndexedAttestation
return err
}

func saveIdxAttIndicesByEpochToDB(idxAttestation *ethpb.IndexedAttestation, tx *bolt.Tx) error {
func saveCompressedIdxAttToEpochList(idxAttestation *ethpb.IndexedAttestation, tx *bolt.Tx) error {
dataRoot, err := hashutil.HashProto(idxAttestation.Data)
if err != nil {
return errors.Wrap(err, "failed to hash indexed attestation data.")
}
protoIdxAtt := &slashpb.ValidatorIDToIdxAtt{
protoIdxAtt := &slashpb.CompressedIdxAtt{
Signature: idxAttestation.Signature,
Indices: idxAttestation.AttestingIndices,
DataRoot: dataRoot[:],
}

key := bytesutil.Bytes8(idxAttestation.Data.Target.Epoch)
bucket := tx.Bucket(indexedAttestationsIndicesBucket)
bucket := tx.Bucket(compressedIdxAttsBucket)
enc := bucket.Get(key)
vIdxList, err := unmarshalValIDsToIdxAttList(enc)
compressedIdxAttList, err := unmarshalCompressedIdxAttList(enc)
if err != nil {
return errors.Wrap(err, "failed to decode value into ValidatorIDToIndexedAttestationList")
return errors.Wrap(err, "failed to decode value into CompressedIdxAtt")
}
vIdxList.IndicesList = append(vIdxList.IndicesList, protoIdxAtt)
enc, err = proto.Marshal(vIdxList)
compressedIdxAttList.List = append(compressedIdxAttList.List, protoIdxAtt)
enc, err = proto.Marshal(compressedIdxAttList)
if err != nil {
return errors.Wrap(err, "failed to marshal")
}
@@ -281,26 +281,26 @@ func removeIdxAttIndicesByEpochFromDB(idxAttestation *ethpb.IndexedAttestation,
if err != nil {
return err
}
protoIdxAtt := &slashpb.ValidatorIDToIdxAtt{
protoIdxAtt := &slashpb.CompressedIdxAtt{
Signature: idxAttestation.Signature,
Indices: idxAttestation.AttestingIndices,
DataRoot: dataRoot[:],
}
key := bytesutil.Bytes8(idxAttestation.Data.Target.Epoch)
bucket := tx.Bucket(indexedAttestationsIndicesBucket)
bucket := tx.Bucket(compressedIdxAttsBucket)
enc := bucket.Get(key)
if enc == nil {
return errors.New("requested to delete data that is not present")
}
vIdxList, err := unmarshalValIDsToIdxAttList(enc)
vIdxList, err := unmarshalCompressedIdxAttList(enc)
if err != nil {
return errors.Wrap(err, "failed to decode value into ValidatorIDToIndexedAttestationList")
}
for i, attIdx := range vIdxList.IndicesList {
for i, attIdx := range vIdxList.List {
if reflect.DeepEqual(attIdx, protoIdxAtt) {
copy(vIdxList.IndicesList[i:], vIdxList.IndicesList[i+1:])
vIdxList.IndicesList[len(vIdxList.IndicesList)-1] = nil // or the zero value of T
vIdxList.IndicesList = vIdxList.IndicesList[:len(vIdxList.IndicesList)-1]
copy(vIdxList.List[i:], vIdxList.List[i+1:])
vIdxList.List[len(vIdxList.List)-1] = nil // or the zero value of T
vIdxList.List = vIdxList.List[:len(vIdxList.List)-1]
break
}
}
@@ -330,8 +330,8 @@ func (db *Store) pruneAttHistory(currentEpoch uint64, historySize uint64) error
}
}

idxBucket := tx.Bucket(indexedAttestationsIndicesBucket)
c = tx.Bucket(indexedAttestationsIndicesBucket).Cursor()
idxBucket := tx.Bucket(compressedIdxAttsBucket)
c = tx.Bucket(compressedIdxAttsBucket).Cursor()
for k, _ := c.First(); k != nil && bytes.Compare(k[:8], max) <= 0; k, _ = c.Next() {
if err := idxBucket.Delete(k); err != nil {
return errors.Wrap(err, "failed to delete indexed attestation from historical bucket")
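Both IdxAttsForTargetFromID and HasIndexedAttestation locate a validator inside a CompressedIdxAtt via sort.Search, which only works because the attesting indices are stored in ascending order. A self-contained sketch of that membership check (standard library only; the helper name and example values are assumptions):

package main

import (
	"fmt"
	"sort"
)

// containsValidator reports whether validatorID appears in indices, assuming
// indices are sorted ascending — the same binary-search lookup used in
// IdxAttsForTargetFromID and HasIndexedAttestation above.
func containsValidator(indices []uint64, validatorID uint64) bool {
	i := sort.Search(len(indices), func(i int) bool {
		return indices[i] >= validatorID
	})
	return i < len(indices) && indices[i] == validatorID
}

func main() {
	fmt.Println(containsValidator([]uint64{3, 17, 42}, 17)) // true
	fmt.Println(containsValidator([]uint64{3, 17, 42}, 5))  // false
}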
2 changes: 1 addition & 1 deletion slasher/db/schema.go
@@ -11,7 +11,7 @@ var (
historicIndexedAttestationsBucket = []byte("historic-indexed-attestations-bucket")
historicBlockHeadersBucket = []byte("historic-block-headers-bucket")
slashingBucket = []byte("slashing-bucket")
indexedAttestationsIndicesBucket = []byte("indexed-attestations-indices-bucket")
compressedIdxAttsBucket = []byte("compressed-idx-atts-bucket")
validatorsPublicKeysBucket = []byte("validators-public-keys-bucket")
// In order to quickly detect surround and surrounded attestations we need to store
// the min and max span for each validator for each epoch.
2 changes: 1 addition & 1 deletion slasher/rpc/server.go
@@ -141,7 +141,7 @@ func (ss *Server) UpdateSpanMaps(ctx context.Context, req *ethpb.IndexedAttestat
func (ss *Server) IsSlashableBlock(ctx context.Context, psr *slashpb.ProposerSlashingRequest) (*slashpb.ProposerSlashingResponse, error) {
//TODO(#3133): add signature validation
epoch := helpers.SlotToEpoch(psr.BlockHeader.Header.Slot)
blockHeaders, err := ss.SlasherDB.BlockHeader(epoch, psr.ValidatorIndex)
blockHeaders, err := ss.SlasherDB.BlockHeaders(epoch, psr.ValidatorIndex)
if err != nil {
return nil, errors.Wrap(err, "slasher service error while trying to retrieve blocks")
}
1 change: 1 addition & 0 deletions slasher/service/BUILD.bazel
@@ -14,6 +14,7 @@ go_library(
"//shared/cmd:go_default_library",
"//shared/debug:go_default_library",
"//shared/params:go_default_library",
"//shared/sliceutil:go_default_library",
"//shared/version:go_default_library",
"//slasher/db:go_default_library",
"//slasher/flags:go_default_library",