Skip to content

Commit

Permalink
Add a step filter for beacon DB to retrieve blocks (#4488)
Browse files Browse the repository at this point in the history
* Add a step filter for beacon DB to retrieve blocks
* Add a step filter for beacon DB to retrieve blocks
* gofmt
* Merge branch 'master' into db-step-filter
* Merge refs/heads/master into db-step-filter
* Merge refs/heads/master into db-step-filter
* Merge refs/heads/master into db-step-filter
* Merge refs/heads/master into db-step-filter
* fix tests
* Merge branch 'db-step-filter' of github.com:prysmaticlabs/prysm into db-step-filter
* Merge refs/heads/master into db-step-filter
  • Loading branch information
prestonvanloon authored and prylabs-bulldozer[bot] committed Jan 11, 2020
1 parent f6eea8e commit 7919074
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 2 deletions.
9 changes: 9 additions & 0 deletions beacon-chain/db/filters/filter.go
Expand Up @@ -39,6 +39,8 @@ const (
TargetEpoch FilterType = 8
// TargetRoot defines a filter for the target root attribute of objects.
TargetRoot FilterType = 9
// SlotStep is used for range filters of objects by their slot in step increments.
SlotStep FilterType = 10
)

// QueryFilter defines a generic interface for type-asserting
Expand Down Expand Up @@ -121,3 +123,10 @@ func (q *QueryFilter) SetEndEpoch(val uint64) *QueryFilter {
q.queries[EndEpoch] = val
return q
}

// SetSlotStep enables filtering by slot for every step interval. For example, a slot range query
// for blocks from 0 to 9 with a step of 2 would return objects at slot 0, 2, 4, 6, 8.
func (q *QueryFilter) SetSlotStep(val uint64) *QueryFilter {
q.queries[SlotStep] = val
return q
}
21 changes: 20 additions & 1 deletion beacon-chain/db/kv/blocks.go
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"context"
"fmt"
"strconv"

"github.com/boltdb/bolt"
"github.com/pkg/errors"
Expand All @@ -14,6 +15,7 @@ import (
"github.com/prysmaticlabs/prysm/shared/bytesutil"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/prysmaticlabs/prysm/shared/sliceutil"
log "github.com/sirupsen/logrus"
"go.opencensus.io/trace"
)

Expand Down Expand Up @@ -88,6 +90,7 @@ func (k *Store) Blocks(ctx context.Context, f *filters.QueryFilter) ([]*ethpb.Si
filtersMap[filters.EndSlot],
filtersMap[filters.StartEpoch],
filtersMap[filters.EndEpoch],
filtersMap[filters.SlotStep],
)

// Once we have a list of block roots that correspond to each
Expand Down Expand Up @@ -150,6 +153,7 @@ func (k *Store) BlockRoots(ctx context.Context, f *filters.QueryFilter) ([][32]b
filtersMap[filters.EndSlot],
filtersMap[filters.StartEpoch],
filtersMap[filters.EndEpoch],
filtersMap[filters.SlotStep],
)

// Once we have a list of block roots that correspond to each
Expand Down Expand Up @@ -361,15 +365,19 @@ func fetchBlockRootsBySlotRange(
endSlotEncoded interface{},
startEpochEncoded interface{},
endEpochEncoded interface{},
slotStepEncoded interface{},
) [][]byte {
var startSlot, endSlot uint64
var startSlot, endSlot, step uint64
var ok bool
if startSlot, ok = startSlotEncoded.(uint64); !ok {
startSlot = 0
}
if endSlot, ok = endSlotEncoded.(uint64); !ok {
endSlot = 0
}
if step, ok = slotStepEncoded.(uint64); !ok || step == 0 {
step = 1
}
startEpoch, startEpochOk := startEpochEncoded.(uint64)
endEpoch, endEpochOk := endEpochEncoded.(uint64)
if startEpochOk && endEpochOk {
Expand All @@ -391,6 +399,16 @@ func fetchBlockRootsBySlotRange(
roots := make([][]byte, 0)
c := bkt.Cursor()
for k, v := c.Seek(min); conditional(k, max); k, v = c.Next() {
if step > 1 {
slot, err := strconv.ParseUint(string(k), 10, 64)
if err != nil {
log.WithError(err).Error("Cannot parse key to uint")
continue
}
if (slot-startSlot)%step != 0 {
continue
}
}
splitRoots := make([][]byte, 0)
for i := 0; i < len(v); i += 32 {
splitRoots = append(splitRoots, v[i:i+32])
Expand Down Expand Up @@ -441,6 +459,7 @@ func createBlockIndicesFromFilters(f *filters.QueryFilter) (map[string][]byte, e
case filters.EndSlot:
case filters.StartEpoch:
case filters.EndEpoch:
case filters.SlotStep:
default:
return nil, fmt.Errorf("filter criterion %v not supported for blocks", k)
}
Expand Down
32 changes: 32 additions & 0 deletions beacon-chain/db/kv/blocks_test.go
Expand Up @@ -387,3 +387,35 @@ func TestStore_Blocks_Retrieve_Epoch(t *testing.T) {
t.Errorf("Wanted %d, received %d", want, len(retrieved))
}
}

func TestStore_Blocks_Retrieve_SlotRangeWithStep(t *testing.T) {
db := setupDB(t)
defer teardownDB(t, db)
b := make([]*ethpb.SignedBeaconBlock, 500)
for i := 0; i < 500; i++ {
b[i] = &ethpb.SignedBeaconBlock{
Block: &ethpb.BeaconBlock{
ParentRoot: []byte("parent"),
Slot: uint64(i),
},
}
}
const step = 2
ctx := context.Background()
if err := db.SaveBlocks(ctx, b); err != nil {
t.Fatal(err)
}
retrieved, err := db.Blocks(ctx, filters.NewFilter().SetStartSlot(100).SetEndSlot(399).SetSlotStep(step))
if err != nil {
t.Fatal(err)
}
want := 150
if len(retrieved) != want {
t.Errorf("Wanted %d, received %d", want, len(retrieved))
}
for _, b := range retrieved {
if (b.Block.Slot-100)%step != 0 {
t.Errorf("Unexpect block slot %d", b.Block.Slot)
}
}
}
2 changes: 1 addition & 1 deletion beacon-chain/sync/rpc_beacon_blocks_by_range.go
Expand Up @@ -62,7 +62,7 @@ func (r *Service) beaconBlocksByRangeRPCHandler(ctx context.Context, msg interfa
}
}

filter := filters.NewFilter().SetStartSlot(startSlot).SetEndSlot(endSlot)
filter := filters.NewFilter().SetStartSlot(startSlot).SetEndSlot(endSlot).SetSlotStep(m.Step)
blks, err := r.db.Blocks(ctx, filter)
if err != nil {
log.WithError(err).Error("Failed to retrieve blocks")
Expand Down

0 comments on commit 7919074

Please sign in to comment.