Skip to content

Commit

Permalink
Stream Indexed Attestations RPC Implementation (#4941)
Browse files Browse the repository at this point in the history
* stream indexed attestations impl
* mock regen
* test for stream indexed
* atts test
* no bls
* gaz
* Merge refs/heads/master into implement-stream-indexed
* use feed for atts instead
* remove unused imports
* Merge refs/heads/master into implement-stream-indexed
* fix tests in beacon
* properly use pointers
* imports
* import
  • Loading branch information
rauljordan committed Feb 26, 2020
1 parent b1231f3 commit 22bbed0
Show file tree
Hide file tree
Showing 18 changed files with 450 additions and 70 deletions.
17 changes: 9 additions & 8 deletions beacon-chain/node/node.go
Expand Up @@ -408,14 +408,15 @@ func (b *BeaconNode) registerSyncService(ctx *cli.Context) error {
}

rs := prysmsync.NewRegularSync(&prysmsync.Config{
DB: b.db,
P2P: b.fetchP2P(ctx),
Chain: chainService,
InitialSync: initSync,
StateNotifier: b,
BlockNotifier: b,
AttPool: b.attestationPool,
ExitPool: b.exitPool,
DB: b.db,
P2P: b.fetchP2P(ctx),
Chain: chainService,
InitialSync: initSync,
StateNotifier: b,
BlockNotifier: b,
AttestationNotifier: b,
AttPool: b.attestationPool,
ExitPool: b.exitPool,
})

return b.services.RegisterService(rs)
Expand Down
1 change: 0 additions & 1 deletion beacon-chain/rpc/BUILD.bazel
Expand Up @@ -28,7 +28,6 @@ go_library(
"//proto/slashing:go_default_library",
"//shared/featureconfig:go_default_library",
"//shared/params:go_default_library",
"//shared/slotutil:go_default_library",
"//shared/traceutil:go_default_library",
"@com_github_grpc_ecosystem_go_grpc_middleware//:go_default_library",
"@com_github_grpc_ecosystem_go_grpc_middleware//recovery:go_default_library",
Expand Down
5 changes: 3 additions & 2 deletions beacon-chain/rpc/beacon/BUILD.bazel
Expand Up @@ -21,6 +21,7 @@ go_library(
"//beacon-chain/core/epoch/precompute:go_default_library",
"//beacon-chain/core/feed:go_default_library",
"//beacon-chain/core/feed/block:go_default_library",
"//beacon-chain/core/feed/operation:go_default_library",
"//beacon-chain/core/feed/state:go_default_library",
"//beacon-chain/core/helpers:go_default_library",
"//beacon-chain/core/state:go_default_library",
Expand All @@ -40,7 +41,6 @@ go_library(
"//shared/pagination:go_default_library",
"//shared/params:go_default_library",
"//shared/sliceutil:go_default_library",
"//shared/slotutil:go_default_library",
"@com_github_gogo_protobuf//types:go_default_library",
"@com_github_patrickmn_go_cache//:go_default_library",
"@com_github_pkg_errors//:go_default_library",
Expand Down Expand Up @@ -73,6 +73,7 @@ go_test(
"//beacon-chain/core/epoch/precompute:go_default_library",
"//beacon-chain/core/feed:go_default_library",
"//beacon-chain/core/feed/block:go_default_library",
"//beacon-chain/core/feed/operation:go_default_library",
"//beacon-chain/core/feed/state:go_default_library",
"//beacon-chain/core/helpers:go_default_library",
"//beacon-chain/db:go_default_library",
Expand All @@ -85,7 +86,7 @@ go_test(
"//proto/beacon/p2p/v1:go_default_library",
"//shared/attestationutil:go_default_library",
"//shared/params:go_default_library",
"//shared/slotutil/testing:go_default_library",
"//shared/testutil:go_default_library",
"@com_github_gogo_protobuf//proto:go_default_library",
"@com_github_gogo_protobuf//types:go_default_library",
"@com_github_golang_mock//gomock:go_default_library",
Expand Down
80 changes: 75 additions & 5 deletions beacon-chain/rpc/beacon/attestations.go
Expand Up @@ -8,6 +8,8 @@ import (
ptypes "github.com/gogo/protobuf/types"
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/go-ssz"
"github.com/prysmaticlabs/prysm/beacon-chain/core/feed"
"github.com/prysmaticlabs/prysm/beacon-chain/core/feed/operation"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/beacon-chain/db/filters"
"github.com/prysmaticlabs/prysm/beacon-chain/flags"
Expand Down Expand Up @@ -210,12 +212,23 @@ func (bs *Server) ListIndexedAttestations(
func (bs *Server) StreamAttestations(
_ *ptypes.Empty, stream ethpb.BeaconChain_StreamAttestationsServer,
) error {
attestationsChannel := make(chan *feed.Event, 1)
attSub := bs.AttestationNotifier.OperationFeed().Subscribe(attestationsChannel)
defer attSub.Unsubscribe()
for {
select {
case <-bs.SlotTicker.C():
atts := bs.AttestationsPool.AggregatedAttestations()
for i := 0; i < len(atts); i++ {
if err := stream.Send(atts[i]); err != nil {
case event := <-attestationsChannel:
if event.Type == operation.UnaggregatedAttReceived {
data, ok := event.Data.(*operation.UnAggregatedAttReceivedData)
if !ok {
// Got bad data over the stream.
continue
}
if data.Attestation == nil {
// One nil attestation shouldn't stop the stream.
continue
}
if err := stream.Send(data.Attestation); err != nil {
return status.Errorf(codes.Unavailable, "Could not send over stream: %v", err)
}
}
Expand All @@ -233,7 +246,64 @@ func (bs *Server) StreamAttestations(
func (bs *Server) StreamIndexedAttestations(
_ *ptypes.Empty, stream ethpb.BeaconChain_StreamIndexedAttestationsServer,
) error {
return status.Error(codes.Unimplemented, "Unimplemented")
attestationsChannel := make(chan *feed.Event, 1)
attSub := bs.AttestationNotifier.OperationFeed().Subscribe(attestationsChannel)
defer attSub.Unsubscribe()
for {
select {
case event := <-attestationsChannel:
if event.Type == operation.UnaggregatedAttReceived {
data, ok := event.Data.(*operation.UnAggregatedAttReceivedData)
if !ok {
// Got bad data over the stream.
continue
}
if data.Attestation == nil {
// One nil attestation shouldn't stop the stream.
continue
}
epoch := helpers.SlotToEpoch(bs.HeadFetcher.HeadSlot())
committeesBySlot, _, err := bs.retrieveCommitteesForEpoch(stream.Context(), epoch)
if err != nil {
return status.Errorf(
codes.Internal,
"Could not retrieve committees for epoch %d: %v",
epoch,
err,
)
}
// We use the retrieved committees for the epoch to convert all attestations
// into indexed form effectively.
startSlot := helpers.StartSlot(epoch)
endSlot := startSlot + params.BeaconConfig().SlotsPerEpoch
att := data.Attestation
// Out of range check, the attestation slot cannot be greater
// the last slot of the requested epoch or smaller than its start slot
// given committees are accessed as a map of slot -> commitees list, where there are
// SLOTS_PER_EPOCH keys in the map.
if att.Data.Slot < startSlot || att.Data.Slot > endSlot {
continue
}
committee := committeesBySlot[att.Data.Slot].Committees[att.Data.CommitteeIndex]
idxAtt, err := attestationutil.ConvertToIndexed(stream.Context(), att, committee.ValidatorIndices)
if err != nil {
return status.Errorf(
codes.Internal,
"Could not convert attestation with slot %d to indexed form: %v",
att.Data.Slot,
err,
)
}
if err := stream.Send(idxAtt); err != nil {
return status.Errorf(codes.Unavailable, "Could not send over stream: %v", err)
}
}
case <-bs.Ctx.Done():
return status.Error(codes.Canceled, "Context canceled")
case <-stream.Context().Done():
return status.Error(codes.Canceled, "Context canceled")
}
}
}

// AttestationPool retrieves pending attestations.
Expand Down

0 comments on commit 22bbed0

Please sign in to comment.