Skip to content

Commit

Permalink
Change StreamIndexedAttestations to use target root for state regen (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
0xKiwi committed May 26, 2020
1 parent ad1d807 commit ecea979
Show file tree
Hide file tree
Showing 7 changed files with 212 additions and 69 deletions.
46 changes: 26 additions & 20 deletions beacon-chain/rpc/beacon/attestations.go
Expand Up @@ -38,7 +38,7 @@ func (s sortableAttestations) Less(i, j int) bool {
return s[i].Data.Slot < s[j].Data.Slot
}

func mapAttestationsByTargetRoot(ctx context.Context, atts []*ethpb.Attestation) map[[32]byte][]*ethpb.Attestation {
func mapAttestationsByTargetRoot(atts []*ethpb.Attestation) map[[32]byte][]*ethpb.Attestation {
attsMap := make(map[[32]byte][]*ethpb.Attestation)
if len(atts) == 0 {
return attsMap
Expand Down Expand Up @@ -155,7 +155,7 @@ func (bs *Server) ListIndexedAttestations(
}
// We use the retrieved committees for the block root to convert all attestations
// into indexed form effectively.
mappedAttestations := mapAttestationsByTargetRoot(ctx, attsArray)
mappedAttestations := mapAttestationsByTargetRoot(attsArray)
indexedAtts := make([]*ethpb.IndexedAttestation, 0, numAttestations)
for targetRoot, atts := range mappedAttestations {
attState, err := bs.StateGen.StateByRoot(ctx, targetRoot)
Expand Down Expand Up @@ -246,6 +246,7 @@ func (bs *Server) StreamIndexedAttestations(
case event, ok := <-attestationsChannel:
if !ok {
log.Error("Indexed attestations stream channel closed")
continue
}
if event.Type == operation.UnaggregatedAttReceived {
data, ok := event.Data.(*operation.UnAggregatedAttReceivedData)
Expand Down Expand Up @@ -274,18 +275,10 @@ func (bs *Server) StreamIndexedAttestations(
}
bs.ReceivedAttestationsBuffer <- data.Attestation.Aggregate
}
case atts, ok := <-bs.CollectedAttestationsBuffer:
case aggAtts, ok := <-bs.CollectedAttestationsBuffer:
if !ok {
log.Error("Indexed attestations stream collected attestations channel closed")
}
// We aggregate the received attestations.
aggAtts, err := helpers.AggregateAttestations(atts)
if err != nil {
return status.Errorf(
codes.Internal,
"Could not aggregate attestations: %v",
err,
)
continue
}
if len(aggAtts) == 0 {
continue
Expand All @@ -294,19 +287,20 @@ func (bs *Server) StreamIndexedAttestations(
// have the same data root, so we just use the target epoch from
// the first one to determine committees for converting into indexed
// form.
epoch := aggAtts[0].Data.Target.Epoch
committeesBySlot, _, err := bs.retrieveCommitteesForEpoch(stream.Context(), epoch)
targetRoot := aggAtts[0].Data.Target.Root
targetEpoch := aggAtts[0].Data.Target.Epoch
committeesBySlot, _, err := bs.retrieveCommitteesForRoot(stream.Context(), targetRoot)
if err != nil {
return status.Errorf(
codes.Internal,
"Could not retrieve committees for epoch %d: %v",
epoch,
"Could not retrieve committees for target root %#x: %v",
targetRoot,
err,
)
}
// We use the retrieved committees for the epoch to convert all attestations
// into indexed form effectively.
startSlot := helpers.StartSlot(epoch)
startSlot := helpers.StartSlot(targetEpoch)
endSlot := startSlot + params.BeaconConfig().SlotsPerEpoch
for _, att := range aggAtts {
// Out of range check, the attestation slot cannot be greater
Expand Down Expand Up @@ -343,11 +337,23 @@ func (bs *Server) collectReceivedAttestations(ctx context.Context) {
for {
select {
case <-ticker.C:
aggregatedAttsByTarget := make(map[[32]byte][]*ethpb.Attestation)
for root, atts := range attsByRoot {
if len(atts) > 0 {
bs.CollectedAttestationsBuffer <- atts
attsByRoot[root] = make([]*ethpb.Attestation, 0)
// We aggregate the received attestations, we know they all have the same data root.
aggAtts, err := helpers.AggregateAttestations(atts)
if err != nil {
log.WithError(err).Error("Could not aggregate collected attestations")
continue
}
if len(aggAtts) == 0 {
continue
}
targetRoot := bytesutil.ToBytes32(atts[0].Data.Target.Root)
aggregatedAttsByTarget[targetRoot] = append(aggregatedAttsByTarget[targetRoot], aggAtts...)
attsByRoot[root] = make([]*ethpb.Attestation, 0)
}
for _, atts := range aggregatedAttsByTarget {
bs.CollectedAttestationsBuffer <- atts
}
case att := <-bs.ReceivedAttestationsBuffer:
attDataRoot, err := ssz.HashTreeRoot(att.Data)
Expand Down
106 changes: 62 additions & 44 deletions beacon-chain/rpc/beacon/attestations_test.go
Expand Up @@ -528,8 +528,6 @@ func TestServer_ListAttestations_Pagination_DefaultPageSize(t *testing.T) {
}

func TestServer_mapAttestationToTargetRoot(t *testing.T) {
ctx := context.Background()

count := uint64(100)
atts := make([]*ethpb.Attestation, count, count)
targetRoot1 := bytesutil.ToBytes32([]byte("root1"))
Expand All @@ -552,7 +550,7 @@ func TestServer_mapAttestationToTargetRoot(t *testing.T) {
}

}
mappedAtts := mapAttestationsByTargetRoot(ctx, atts)
mappedAtts := mapAttestationsByTargetRoot(atts)
wantedMapLen := 2
wantedMapNumberOfElements := 50
if len(mappedAtts) != wantedMapLen {
Expand Down Expand Up @@ -1047,27 +1045,10 @@ func TestServer_StreamIndexedAttestations_OK(t *testing.T) {

count := params.BeaconConfig().SlotsPerEpoch
// We generate attestations for each validator per slot per epoch.
atts := make([]*ethpb.Attestation, 0, count)
atts := make(map[[32]byte][]*ethpb.Attestation)
for i := uint64(0); i < count; i++ {
comms := committees[i].Committees
for j := 0; j < numValidators; j++ {
attExample := &ethpb.Attestation{
Data: &ethpb.AttestationData{
BeaconBlockRoot: bytesutil.PadTo([]byte("root"), 32),
Slot: i,
Target: &ethpb.Checkpoint{
Epoch: 0,
Root: make([]byte, 32),
},
},
}
encoded, err := helpers.ComputeSigningRoot(attExample.Data, []byte{})
if err != nil {
t.Fatal(err)
}
sig := privKeys[j].Sign(encoded[:])
attExample.Signature = sig.Marshal()

var indexInCommittee uint64
var committeeIndex uint64
var committeeLength int
Expand All @@ -1086,28 +1067,34 @@ func TestServer_StreamIndexedAttestations_OK(t *testing.T) {
if !found {
continue
}
attExample := &ethpb.Attestation{
Data: &ethpb.AttestationData{
BeaconBlockRoot: bytesutil.PadTo([]byte("root"), 32),
Slot: i,
Target: &ethpb.Checkpoint{
Epoch: 0,
Root: gRoot[:],
},
},
}
domain, err := helpers.Domain(headState.Fork(), 0, params.BeaconConfig().DomainBeaconAttester, headState.GenesisValidatorRoot())
if err != nil {
t.Fatal(err)
}
encoded, err := helpers.ComputeSigningRoot(attExample.Data, domain)
if err != nil {
t.Fatal(err)
}
sig := privKeys[j].Sign(encoded[:])
attExample.Signature = sig.Marshal()
attExample.Data.CommitteeIndex = committeeIndex
aggregationBitfield := bitfield.NewBitlist(uint64(committeeLength))
aggregationBitfield.SetBitAt(indexInCommittee, true)
attExample.AggregationBits = aggregationBitfield
atts = append(atts, attExample)
atts[encoded] = append(atts[encoded], attExample)
}
}

aggAtts, err := helpers.AggregateAttestations(atts)
if err != nil {
t.Fatal(err)
}

// Next up we convert the test attestations to indexed form.
indexedAtts := make([]*ethpb.IndexedAttestation, len(aggAtts), len(aggAtts))
for i := 0; i < len(indexedAtts); i++ {
att := aggAtts[i]
committee := committees[att.Data.Slot].Committees[att.Data.CommitteeIndex]
idxAtt := attestationutil.ConvertToIndexed(ctx, att, committee.ValidatorIndices)
indexedAtts[i] = idxAtt
}

chainService := &chainMock.ChainService{}
server := &Server{
BeaconDB: db,
Expand All @@ -1123,14 +1110,45 @@ func TestServer_StreamIndexedAttestations_OK(t *testing.T) {
StateGen: stategen.New(db, cache.NewStateSummaryCache()),
}

for dataRoot, sameDataAtts := range atts {
aggAtts, err := helpers.AggregateAttestations(sameDataAtts)
if err != nil {
t.Fatal(err)
}
atts[dataRoot] = aggAtts
}

// Next up we convert the test attestations to indexed form.
attsByTarget := make(map[[32]byte][]*ethpb.Attestation)
for _, dataRootAtts := range atts {
targetRoot := bytesutil.ToBytes32(dataRootAtts[0].Data.Target.Root)
attsByTarget[targetRoot] = append(attsByTarget[targetRoot], dataRootAtts...)
}

allAtts := make([]*ethpb.Attestation, 0)
indexedAtts := make(map[[32]byte][]*ethpb.IndexedAttestation)
for dataRoot, aggAtts := range attsByTarget {
allAtts = append(allAtts, aggAtts...)
for _, att := range aggAtts {
committee := committees[att.Data.Slot].Committees[att.Data.CommitteeIndex]
idxAtt := attestationutil.ConvertToIndexed(ctx, att, committee.ValidatorIndices)
indexedAtts[dataRoot] = append(indexedAtts[dataRoot], idxAtt)
}
}

attsSent := 0
mockStream := mock.NewMockBeaconChain_StreamIndexedAttestationsServer(ctrl)
for i := 0; i < len(indexedAtts); i++ {
if i == len(indexedAtts)-1 {
mockStream.EXPECT().Send(indexedAtts[i]).Do(func(arg0 interface{}) {
exitRoutine <- true
})
} else {
mockStream.EXPECT().Send(indexedAtts[i])
for _, atts := range indexedAtts {
for _, att := range atts {
if attsSent == len(allAtts)-1 {
mockStream.EXPECT().Send(att).Do(func(arg0 interface{}) {
exitRoutine <- true
})
t.Log("cancelled")
} else {
mockStream.EXPECT().Send(att)
attsSent++
}
}
}
mockStream.EXPECT().Context().Return(ctx).AnyTimes()
Expand All @@ -1141,7 +1159,7 @@ func TestServer_StreamIndexedAttestations_OK(t *testing.T) {
}
}(t)

server.CollectedAttestationsBuffer <- atts
server.CollectedAttestationsBuffer <- allAtts
<-exitRoutine
}

Expand Down
34 changes: 34 additions & 0 deletions beacon-chain/rpc/beacon/committees.go
Expand Up @@ -105,6 +105,40 @@ func (bs *Server) retrieveCommitteesForEpoch(
return committeesListsBySlot, activeIndices, nil
}

// retrieveCommitteesForRoot uses the provided state root to get the current epoch committees.
// Note: This function is always recommended over retrieveCommitteesForEpoch as states are
// retrieved from the DB for this function, rather than generated.
func (bs *Server) retrieveCommitteesForRoot(
ctx context.Context,
root []byte,
) (map[uint64]*ethpb.BeaconCommittees_CommitteesList, []uint64, error) {
requestedState, err := bs.StateGen.StateByRoot(ctx, bytesutil.ToBytes32(root))
if err != nil {
return nil, nil, status.Error(codes.Internal, "Could not get state")
}
epoch := helpers.CurrentEpoch(requestedState)
seed, err := helpers.Seed(requestedState, epoch, params.BeaconConfig().DomainBeaconAttester)
if err != nil {
return nil, nil, status.Error(codes.Internal, "Could not get seed")
}
activeIndices, err := helpers.ActiveValidatorIndices(requestedState, epoch)
if err != nil {
return nil, nil, status.Error(codes.Internal, "Could not get active indices")
}

startSlot := helpers.StartSlot(epoch)
committeesListsBySlot, err := computeCommittees(startSlot, activeIndices, seed)
if err != nil {
return nil, nil, status.Errorf(
codes.InvalidArgument,
"Could not compute committees for epoch %d: %v",
epoch,
err,
)
}
return committeesListsBySlot, activeIndices, nil
}

func (bs *Server) retrieveCommitteesForEpochUsingOldArchival(
ctx context.Context,
epoch uint64,
Expand Down
80 changes: 80 additions & 0 deletions beacon-chain/rpc/beacon/committees_test.go
Expand Up @@ -257,6 +257,86 @@ func TestServer_ListBeaconCommittees_FromArchive(t *testing.T) {
}
}

func TestRetrieveCommitteesForRoot(t *testing.T) {
resetCfg := featureconfig.InitWithReset(&featureconfig.Flags{NewStateMgmt: true})
defer resetCfg()

db := dbTest.SetupDB(t)
helpers.ClearCache()
ctx := context.Background()

numValidators := 128
headState := setupActiveValidators(t, db, numValidators)

m := &mock.ChainService{
Genesis: roughtime.Now().Add(time.Duration(-1*int64((headState.Slot()*params.BeaconConfig().SecondsPerSlot))) * time.Second),
}
bs := &Server{
HeadFetcher: m,
GenesisTimeFetcher: m,
StateGen: stategen.New(db, cache.NewStateSummaryCache()),
}
b := &ethpb.SignedBeaconBlock{Block: &ethpb.BeaconBlock{}}
if err := db.SaveBlock(ctx, b); err != nil {
t.Fatal(err)
}
gRoot, err := stateutil.BlockRoot(b.Block)
if err != nil {
t.Fatal(err)
}
if err := db.SaveGenesisBlockRoot(ctx, gRoot); err != nil {
t.Fatal(err)
}
if err := db.SaveState(ctx, headState, gRoot); err != nil {
t.Fatal(err)
}
stateSummary := &pbp2p.StateSummary{
Slot: 0,
Root: gRoot[:],
}
if err := db.SaveStateSummary(ctx, stateSummary); err != nil {
t.Fatal(err)
}

// Store the genesis seed.
seed, err := helpers.Seed(headState, 0, params.BeaconConfig().DomainBeaconAttester)
if err != nil {
t.Fatal(err)
}
if err := headState.SetSlot(params.BeaconConfig().SlotsPerEpoch * 10); err != nil {
t.Fatal(err)
}

activeIndices, err := helpers.ActiveValidatorIndices(headState, 0)
if err != nil {
t.Fatal(err)
}

wanted, err := computeCommittees(0, activeIndices, seed)
if err != nil {
t.Fatal(err)
}
committees, activeIndices, err := bs.retrieveCommitteesForRoot(context.Background(), gRoot[:])
if err != nil {
t.Fatal(err)
}

wantedRes := &ethpb.BeaconCommittees{
Epoch: 0,
Committees: wanted,
ActiveValidatorCount: uint64(numValidators),
}
receivedRes := &ethpb.BeaconCommittees{
Epoch: 0,
Committees: committees,
ActiveValidatorCount: uint64(len(activeIndices)),
}
if !reflect.DeepEqual(wantedRes, receivedRes) {
t.Errorf("Wanted %v", wantedRes)
t.Errorf("Received %v", receivedRes)
}
}

func setupActiveValidators(t *testing.T, db db.Database, count int) *stateTrie.BeaconState {
balances := make([]uint64, count)
validators := make([]*ethpb.Validator, 0, count)
Expand Down
1 change: 1 addition & 0 deletions slasher/beaconclient/BUILD.bazel
Expand Up @@ -56,6 +56,7 @@ go_test(
"//shared/event:go_default_library",
"//shared/mock:go_default_library",
"//shared/params:go_default_library",
"//shared/slotutil:go_default_library",
"//shared/testutil:go_default_library",
"//slasher/cache:go_default_library",
"//slasher/db/testing:go_default_library",
Expand Down

0 comments on commit ecea979

Please sign in to comment.