Skip to content

Commit

Permalink
Slasher batch db read and a write (#5534)
Browse files Browse the repository at this point in the history
* batch db read

* fix test

* debug

* write function and test

* test rename

* add to interface

* change order

Co-authored-by: prylabs-bulldozer[bot] <58059840+prylabs-bulldozer[bot]@users.noreply.github.com>
  • Loading branch information
shayzluf and prylabs-bulldozer[bot] committed Apr 21, 2020
1 parent 199c50b commit 8cba109
Show file tree
Hide file tree
Showing 3 changed files with 109 additions and 0 deletions.
2 changes: 2 additions & 0 deletions slasher/db/iface/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type ReadOnlyDatabase interface {
// MinMaxSpan related methods.
EpochSpansMap(ctx context.Context, epoch uint64) (map[uint64]detectionTypes.Span, error)
EpochSpanByValidatorIndex(ctx context.Context, validatorIdx uint64, epoch uint64) (detectionTypes.Span, error)
EpochsSpanByValidatorsIndices(ctx context.Context, validatorIndices []uint64, maxEpoch uint64) (map[uint64]map[uint64]detectionTypes.Span, error)

// ProposerSlashing related methods.
ProposalSlashingsByStatus(ctx context.Context, status types.SlashingStatus) ([]*ethpb.ProposerSlashing, error)
Expand Down Expand Up @@ -64,6 +65,7 @@ type WriteAccessDatabase interface {
SaveEpochSpansMap(ctx context.Context, epoch uint64, spanMap map[uint64]detectionTypes.Span) error
SaveValidatorEpochSpan(ctx context.Context, validatorIdx uint64, epoch uint64, spans detectionTypes.Span) error
SaveCachedSpansMaps(ctx context.Context) error
SaveEpochsSpanByValidatorsIndices(ctx context.Context, epochsSpans map[uint64]map[uint64]detectionTypes.Span) error
DeleteEpochSpans(ctx context.Context, validatorIdx uint64) error
DeleteValidatorSpanByEpoch(ctx context.Context, validatorIdx uint64, epoch uint64) error

Expand Down
62 changes: 62 additions & 0 deletions slasher/db/kv/spanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,68 @@ func (db *Store) EpochSpanByValidatorIndex(ctx context.Context, validatorIdx uin
return spans, err
}

// EpochsSpanByValidatorsIndices accepts validator indices and epoch and
// returns all their previous corresponding spans for slashing detection epoch=> validator index => spammap.
// Returns empty map if no values exists and error on db error.
func (db *Store) EpochsSpanByValidatorsIndices(ctx context.Context, validatorIndices []uint64, maxEpoch uint64) (map[uint64]map[uint64]types.Span, error) {
ctx, span := trace.StartSpan(ctx, "slasherDB.EpochsSpanByValidatorsIndices")
defer span.End()

var err error
epochsSpanMap := make(map[uint64]map[uint64]types.Span)
err = db.view(func(tx *bolt.Tx) error {
b := tx.Bucket(validatorsMinMaxSpanBucket)
epoch := maxEpoch
epochBucket := b.Bucket(bytesutil.Bytes8(epoch))

for epochBucket != nil {
valSpans := make(map[uint64]types.Span, len(validatorIndices))
for _, v := range validatorIndices {
enc := epochBucket.Get(bytesutil.Bytes8(v))
value, err := unmarshalSpan(ctx, enc)
if err != nil {
return err
}
valSpans[v] = value
}
epochsSpanMap[epoch] = valSpans
if epoch == 0 {
break
}
epoch--
epochBucket = b.Bucket(bytesutil.Bytes8(epoch))
}
return nil
})
return epochsSpanMap, err
}

// SaveEpochsSpanByValidatorsIndices accepts epochs span maps by validator indices and
// writes them to db.
// Returns error on db write error.
func (db *Store) SaveEpochsSpanByValidatorsIndices(ctx context.Context, epochsSpans map[uint64]map[uint64]types.Span) error {
ctx, span := trace.StartSpan(ctx, "slasherDB.SaveEpochsSpanByValidatorsIndices")
defer span.End()

err := db.update(func(tx *bolt.Tx) error {
b := tx.Bucket(validatorsMinMaxSpanBucket)
for epoch, indicesSpanMaps := range epochsSpans {
epochBucket, err := b.CreateBucketIfNotExists(bytesutil.Bytes8(epoch))
if err != nil {
return err
}
for idx, v := range indicesSpanMaps {
enc := marshalSpan(v)
if err := epochBucket.Put(bytesutil.Bytes8(idx), enc); err != nil {
return err
}
}
}
return nil
})
return err
}

// SaveValidatorEpochSpan accepts validator index epoch and spans returns.
// it reads the epoch spans from cache, updates it and save it back to cache
// if caching is enabled.
Expand Down
45 changes: 45 additions & 0 deletions slasher/db/kv/spanner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,3 +269,48 @@ func TestValidatorSpanMap_SaveCachedSpansMaps(t *testing.T) {
}
}
}

func TestStore_ReadWriteEpochsSpanByValidatorsIndices(t *testing.T) {
app := cli.App{}
set := flag.NewFlagSet("test", 0)
db := setupDB(t, cli.NewContext(&app, set, nil))
defer teardownDB(t, db)
ctx := context.Background()

for _, tt := range spanTests {
err := db.SaveEpochSpansMap(ctx, tt.epoch, tt.spanMap)
if err != nil {
t.Fatalf("Save validator span map failed: %v", err)
}
}
res, err := db.EpochsSpanByValidatorsIndices(ctx, []uint64{1, 2, 3}, 3)
if err != nil {
t.Fatal(err)
}
if len(res) != len(spanTests) {
t.Errorf("Wanted map of %d elemets, received map of %d elements", len(spanTests), len(res))
}
for _, tt := range spanTests {
if !reflect.DeepEqual(res[tt.epoch], tt.spanMap) {
t.Errorf("Wanted span map to be equal to: %v , received span map: %v ", spanTests[0].spanMap, res[1])
}
}
teardownDB(t, db)
db = setupDB(t, cli.NewContext(&app, set, nil))
if err := db.SaveEpochsSpanByValidatorsIndices(ctx, res); err != nil {
t.Fatal(err)
}
res, err = db.EpochsSpanByValidatorsIndices(ctx, []uint64{1, 2, 3}, 3)
if err != nil {
t.Fatal(err)
}
if len(res) != len(spanTests) {
t.Errorf("Wanted map of %d elemets, received map of %d elements", len(spanTests), len(res))
}
for _, tt := range spanTests {
if !reflect.DeepEqual(res[tt.epoch], tt.spanMap) {
t.Errorf("Wanted span map to be equal to: %v , received span map: %v ", spanTests[0].spanMap, res[1])
}
}

}

0 comments on commit 8cba109

Please sign in to comment.