Skip to content

Commit

Permalink
Implement ListIndexedAttestations Endpoint in Prysm (#4892)
Browse files Browse the repository at this point in the history
* update patch and workspace

* stub methods

* implementation of indexed attestations list

* include latest ethereumapis

* update request type

* compute committee pure function

* use compute committee helper

* add test into list indexed attestations

* regenerate mock

* imports and out of range check

* test passing for archived epoch

* add comment

* comment

* better comment on func

* throw in continue instead
  • Loading branch information
rauljordan committed Feb 17, 2020
1 parent d7db8b1 commit 5db8c5a
Show file tree
Hide file tree
Showing 8 changed files with 546 additions and 122 deletions.
2 changes: 1 addition & 1 deletion WORKSPACE
Expand Up @@ -1272,7 +1272,7 @@ go_repository(

go_repository(
name = "com_github_prysmaticlabs_ethereumapis",
commit = "b7452dde4ca361809def4ed5924ab3cb7ad1299a",
commit = "53ccc146f7f488c5c7634530057f4aedf510a9ac",
importpath = "github.com/prysmaticlabs/ethereumapis",
patch_args = ["-p1"],
patches = [
Expand Down
2 changes: 2 additions & 0 deletions beacon-chain/rpc/beacon/BUILD.bazel
Expand Up @@ -30,6 +30,7 @@ go_library(
"//beacon-chain/operations/slashings:go_default_library",
"//beacon-chain/powchain:go_default_library",
"//proto/beacon/p2p/v1:go_default_library",
"//shared/attestationutil:go_default_library",
"//shared/bytesutil:go_default_library",
"//shared/hashutil:go_default_library",
"//shared/pagination:go_default_library",
Expand Down Expand Up @@ -73,6 +74,7 @@ go_test(
"//beacon-chain/rpc/testing:go_default_library",
"//beacon-chain/state:go_default_library",
"//proto/beacon/p2p/v1:go_default_library",
"//shared/attestationutil:go_default_library",
"//shared/params:go_default_library",
"//shared/slotutil/testing:go_default_library",
"@com_github_gogo_protobuf//proto:go_default_library",
Expand Down
103 changes: 103 additions & 0 deletions beacon-chain/rpc/beacon/attestations.go
Expand Up @@ -8,9 +8,12 @@ import (
ptypes "github.com/gogo/protobuf/types"
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/go-ssz"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/beacon-chain/db/filters"
"github.com/prysmaticlabs/prysm/beacon-chain/flags"
"github.com/prysmaticlabs/prysm/shared/attestationutil"
"github.com/prysmaticlabs/prysm/shared/pagination"
"github.com/prysmaticlabs/prysm/shared/params"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
Expand Down Expand Up @@ -110,6 +113,97 @@ func (bs *Server) ListAttestations(
}, nil
}

// ListIndexedAttestations retrieves indexed attestations by target epoch.
// IndexedAttestations are sorted by data slot by default. Either a target epoch filter
// or a boolean filter specifying a request for genesis epoch attestations may be used.
//
// The server may return an empty list when no attestations match the given
// filter criteria. This RPC should not return NOT_FOUND. Only one filter
// criteria should be used.
func (bs *Server) ListIndexedAttestations(
ctx context.Context, req *ethpb.ListIndexedAttestationsRequest,
) (*ethpb.ListIndexedAttestationsResponse, error) {
atts := make([]*ethpb.Attestation, 0)
var err error
epoch := helpers.SlotToEpoch(bs.GenesisTimeFetcher.CurrentSlot())
switch q := req.QueryFilter.(type) {
case *ethpb.ListIndexedAttestationsRequest_TargetEpoch:
atts, err = bs.BeaconDB.Attestations(ctx, filters.NewFilter().SetTargetEpoch(q.TargetEpoch))
if err != nil {
return nil, status.Errorf(codes.Internal, "Could not fetch attestations: %v", err)
}
epoch = q.TargetEpoch
case *ethpb.ListIndexedAttestationsRequest_GenesisEpoch:
atts, err = bs.BeaconDB.Attestations(ctx, filters.NewFilter().SetTargetEpoch(0))
if err != nil {
return nil, status.Errorf(codes.Internal, "Could not fetch attestations: %v", err)
}
epoch = 0
default:
return nil, status.Error(codes.InvalidArgument, "Must specify a filter criteria for fetching attestations")
}
// We sort attestations according to the Sortable interface.
sort.Sort(sortableAttestations(atts))
numAttestations := len(atts)

// If there are no attestations, we simply return a response specifying this.
// Otherwise, attempting to paginate 0 attestations below would result in an error.
if numAttestations == 0 {
return &ethpb.ListIndexedAttestationsResponse{
IndexedAttestations: make([]*ethpb.IndexedAttestation, 0),
TotalSize: int32(0),
NextPageToken: strconv.Itoa(0),
}, nil
}

committeesBySlot, _, err := bs.retrieveCommitteesForEpoch(ctx, epoch)
if err != nil {
return nil, status.Errorf(
codes.Internal,
"Could not retrieve committees for epoch %d: %v",
epoch,
err,
)
}

// We use the retrieved committees for the epoch to convert all attestations
// into indexed form effectively.
indexedAtts := make([]*ethpb.IndexedAttestation, numAttestations, numAttestations)
startSlot := helpers.StartSlot(epoch)
endSlot := startSlot + params.BeaconConfig().SlotsPerEpoch
for i := 0; i < len(indexedAtts); i++ {
att := atts[i]
// Out of range check, the attestation slot cannot be greater
// the last slot of the requested epoch or smaller than its start slot
// given committees are accessed as a map of slot -> commitees list, where there are
// SLOTS_PER_EPOCH keys in the map.
if att.Data.Slot < startSlot || att.Data.Slot > endSlot {
continue
}
committee := committeesBySlot[att.Data.Slot].Committees[att.Data.CommitteeIndex]
idxAtt, err := attestationutil.ConvertToIndexed(ctx, atts[i], committee.ValidatorIndices)
if err != nil {
return nil, status.Errorf(
codes.Internal,
"Could not convert attestation with slot %d to indexed form: %v",
att.Data.Slot,
err,
)
}
indexedAtts[i] = idxAtt
}

start, end, nextPageToken, err := pagination.StartAndEndPage(req.PageToken, int(req.PageSize), len(indexedAtts))
if err != nil {
return nil, status.Errorf(codes.Internal, "Could not paginate attestations: %v", err)
}
return &ethpb.ListIndexedAttestationsResponse{
IndexedAttestations: indexedAtts[start:end],
TotalSize: int32(len(indexedAtts)),
NextPageToken: nextPageToken,
}, nil
}

// StreamAttestations to clients at the end of every slot. This method retrieves the
// aggregated attestations currently in the pool at the start of a slot and sends
// them over a gRPC stream.
Expand All @@ -133,6 +227,15 @@ func (bs *Server) StreamAttestations(
}
}

// StreamIndexedAttestations to clients at the end of every slot. This method retrieves the
// aggregated attestations currently in the pool, converts them into indexed form, and
// sends them over a gRPC stream.
func (bs *Server) StreamIndexedAttestations(
_ *ptypes.Empty, stream ethpb.BeaconChain_StreamIndexedAttestationsServer,
) error {
return status.Error(codes.Unimplemented, "Unimplemented")
}

// AttestationPool retrieves pending attestations.
//
// The server returns a list of attestations that have been seen but not
Expand Down
193 changes: 193 additions & 0 deletions beacon-chain/rpc/beacon/attestations_test.go
Expand Up @@ -7,6 +7,7 @@ import (
"strconv"
"strings"
"testing"
"time"

"github.com/gogo/protobuf/proto"
ptypes "github.com/gogo/protobuf/types"
Expand All @@ -15,12 +16,14 @@ import (
"github.com/prysmaticlabs/go-bitfield"
"github.com/prysmaticlabs/go-ssz"
mock "github.com/prysmaticlabs/prysm/beacon-chain/blockchain/testing"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
dbTest "github.com/prysmaticlabs/prysm/beacon-chain/db/testing"
"github.com/prysmaticlabs/prysm/beacon-chain/flags"
"github.com/prysmaticlabs/prysm/beacon-chain/operations/attestations"
mockRPC "github.com/prysmaticlabs/prysm/beacon-chain/rpc/testing"
stateTrie "github.com/prysmaticlabs/prysm/beacon-chain/state"
pbp2p "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
"github.com/prysmaticlabs/prysm/shared/attestationutil"
"github.com/prysmaticlabs/prysm/shared/params"
mocktick "github.com/prysmaticlabs/prysm/shared/slotutil/testing"
)
Expand Down Expand Up @@ -536,6 +539,196 @@ func TestServer_ListAttestations_Pagination_DefaultPageSize(t *testing.T) {
}
}

func TestServer_ListIndexedAttestations_GenesisEpoch(t *testing.T) {
db := dbTest.SetupDB(t)
defer dbTest.TeardownDB(t, db)
helpers.ClearCache()
ctx := context.Background()

count := params.BeaconConfig().SlotsPerEpoch
atts := make([]*ethpb.Attestation, 0, count)
for i := uint64(0); i < count; i++ {
attExample := &ethpb.Attestation{
Data: &ethpb.AttestationData{
BeaconBlockRoot: []byte("root"),
Slot: i,
CommitteeIndex: 0,
Target: &ethpb.Checkpoint{
Epoch: 0,
Root: make([]byte, 32),
},
},
AggregationBits: bitfield.Bitlist{0b11},
}
atts = append(atts, attExample)
}
if err := db.SaveAttestations(ctx, atts); err != nil {
t.Fatal(err)
}

// We setup 128 validators.
numValidators := 128
headState := setupActiveValidators(t, db, numValidators)

randaoMixes := make([][]byte, params.BeaconConfig().EpochsPerHistoricalVector)
for i := 0; i < len(randaoMixes); i++ {
randaoMixes[i] = make([]byte, 32)
}
if err := headState.SetRandaoMixes(randaoMixes); err != nil {
t.Fatal(err)
}

activeIndices, err := helpers.ActiveValidatorIndices(headState, 0)
if err != nil {
t.Fatal(err)
}
epoch := uint64(0)
attesterSeed, err := helpers.Seed(headState, epoch, params.BeaconConfig().DomainBeaconAttester)
if err != nil {
t.Fatal(err)
}
committees, err := computeCommittees(helpers.StartSlot(epoch), activeIndices, attesterSeed)
if err != nil {
t.Fatal(err)
}

// Next up we convert the test attestations to indexed form:
indexedAtts := make([]*ethpb.IndexedAttestation, len(atts), len(atts))
for i := 0; i < len(indexedAtts); i++ {
att := atts[i]
committee := committees[att.Data.Slot].Committees[att.Data.CommitteeIndex]
idxAtt, err := attestationutil.ConvertToIndexed(ctx, atts[i], committee.ValidatorIndices)
if err != nil {
t.Fatalf("Could not convert attestation to indexed: %v", err)
}
indexedAtts[i] = idxAtt
}

bs := &Server{
BeaconDB: db,
HeadFetcher: &mock.ChainService{
State: headState,
},
GenesisTimeFetcher: &mock.ChainService{
Genesis: time.Now(),
},
}

res, err := bs.ListIndexedAttestations(ctx, &ethpb.ListIndexedAttestationsRequest{
QueryFilter: &ethpb.ListIndexedAttestationsRequest_GenesisEpoch{
GenesisEpoch: true,
},
})
if err != nil {
t.Fatal(err)
}

if !reflect.DeepEqual(indexedAtts, res.IndexedAttestations) {
t.Fatalf(
"Incorrect list indexed attestations response: wanted %v, received %v",
indexedAtts,
res.IndexedAttestations,
)
}
}

func TestServer_ListIndexedAttestations_ArchivedEpoch(t *testing.T) {
db := dbTest.SetupDB(t)
defer dbTest.TeardownDB(t, db)
helpers.ClearCache()
ctx := context.Background()

count := params.BeaconConfig().SlotsPerEpoch
atts := make([]*ethpb.Attestation, 0, count)
startSlot := helpers.StartSlot(50)
epoch := uint64(50)
for i := startSlot; i < count; i++ {
attExample := &ethpb.Attestation{
Data: &ethpb.AttestationData{
BeaconBlockRoot: []byte("root"),
Slot: i,
CommitteeIndex: 0,
Target: &ethpb.Checkpoint{
Epoch: epoch,
Root: make([]byte, 32),
},
},
AggregationBits: bitfield.Bitlist{0b11},
}
atts = append(atts, attExample)
}
if err := db.SaveAttestations(ctx, atts); err != nil {
t.Fatal(err)
}

// We setup 128 validators.
numValidators := 128
headState := setupActiveValidators(t, db, numValidators)

randaoMixes := make([][]byte, params.BeaconConfig().EpochsPerHistoricalVector)
for i := 0; i < len(randaoMixes); i++ {
randaoMixes[i] = make([]byte, 32)
}
if err := headState.SetRandaoMixes(randaoMixes); err != nil {
t.Fatal(err)
}
if err := headState.SetSlot(startSlot); err != nil {
t.Fatal(err)
}

activeIndices, err := helpers.ActiveValidatorIndices(headState, epoch)
if err != nil {
t.Fatal(err)
}
attesterSeed, err := helpers.Seed(headState, epoch, params.BeaconConfig().DomainBeaconAttester)
if err != nil {
t.Fatal(err)
}
committees, err := computeCommittees(epoch, activeIndices, attesterSeed)
if err != nil {
t.Fatal(err)
}

// Next up we convert the test attestations to indexed form:
indexedAtts := make([]*ethpb.IndexedAttestation, len(atts), len(atts))
for i := 0; i < len(indexedAtts); i++ {
att := atts[i]
committee := committees[att.Data.Slot].Committees[att.Data.CommitteeIndex]
idxAtt, err := attestationutil.ConvertToIndexed(ctx, atts[i], committee.ValidatorIndices)
if err != nil {
t.Fatalf("Could not convert attestation to indexed: %v", err)
}
indexedAtts[i] = idxAtt
}

bs := &Server{
BeaconDB: db,
HeadFetcher: &mock.ChainService{
State: headState,
},
GenesisTimeFetcher: &mock.ChainService{
Genesis: time.Now(),
},
}

res, err := bs.ListIndexedAttestations(ctx, &ethpb.ListIndexedAttestationsRequest{
QueryFilter: &ethpb.ListIndexedAttestationsRequest_TargetEpoch{
TargetEpoch: epoch,
},
})
if err != nil {
t.Fatal(err)
}

if !reflect.DeepEqual(indexedAtts, res.IndexedAttestations) {
t.Fatalf(
"Incorrect list indexed attestations response: wanted %v, received %v",
indexedAtts,
res.IndexedAttestations,
)
}
}

func TestServer_AttestationPool_Pagination_ExceedsMaxPageSize(t *testing.T) {
ctx := context.Background()
bs := &Server{}
Expand Down

0 comments on commit 5db8c5a

Please sign in to comment.