Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement SubscribeCommitteeSubnet method #5299

Merged
merged 19 commits into from
Apr 4, 2020
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion WORKSPACE
Original file line number Diff line number Diff line change
Expand Up @@ -1309,7 +1309,7 @@ go_repository(

go_repository(
name = "com_github_prysmaticlabs_ethereumapis",
commit = "90224d88dd65813ba2f35197bb1bb8915b2d0c6b",
commit = "3f6a75ac9460621b140270b90057a5a445d66436",
importpath = "github.com/prysmaticlabs/ethereumapis",
patch_args = ["-p1"],
patches = [
Expand Down
1 change: 1 addition & 0 deletions beacon-chain/cache/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ go_test(
"attestation_data_test.go",
"checkpoint_state_test.go",
"committee_fuzz_test.go",
"committee_ids_test.go",
"committee_test.go",
"eth1_data_test.go",
"feature_flag_test.go",
Expand Down
22 changes: 13 additions & 9 deletions beacon-chain/cache/committee_ids.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"sync"

lru "github.com/hashicorp/golang-lru"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/prysmaticlabs/prysm/shared/sliceutil"
)

Expand All @@ -16,27 +17,30 @@ type committeeIDs struct {
var CommitteeIDs = newCommitteeIDs()

func newCommitteeIDs() *committeeIDs {
cache, err := lru.New(8)
maxCommitteesPerEpoch := int(params.BeaconConfig().MaxCommitteesPerSlot * params.BeaconConfig().SlotsPerEpoch)
cache, err := lru.New(maxCommitteesPerEpoch)
if err != nil {
panic(err)
}
return &committeeIDs{cache: cache}
}

// AddIDs to the cache for attestation committees by epoch.
func (t *committeeIDs) AddIDs(indices []uint64, epoch uint64) {
// AddID adds committee ID for subscribing subnet for the attester and/or aggregator of a given slot.
func (t *committeeIDs) AddID(slot uint64, committeeID uint64) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure this is the correct way for this , we restrict this to only 1 epoch. We need an epoch lookahead for our validators.

t.lock.Lock()
defer t.lock.Unlock()
val, exists := t.cache.Get(epoch)

committeeIDs := []uint64{committeeID}
val, exists := t.cache.Get(slot)
if exists {
indices = sliceutil.UnionUint64(append(indices, val.([]uint64)...))
committeeIDs = sliceutil.UnionUint64(append(val.([]uint64), committeeIDs...))
}
t.cache.Add(epoch, indices)
t.cache.Add(slot, committeeIDs)
}

// GetIDs from the cache for attestation committees by epoch.
func (t *committeeIDs) GetIDs(epoch uint64) []uint64 {
val, exists := t.cache.Get(epoch)
// GetIDs gets the committee ID for subscribing subnet for attester and/or aggregator of the slot.
func (t *committeeIDs) GetIDs(slot uint64) []uint64 {
val, exists := t.cache.Get(slot)
if !exists {
return []uint64{}
}
Expand Down
33 changes: 33 additions & 0 deletions beacon-chain/cache/committee_ids_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package cache

import (
"reflect"
"testing"
)

func TestCommitteeIDCache_RoundTrip(t *testing.T) {
c := newCommitteeIDs()
slot := uint64(100)
committeeIDs := c.GetIDs(slot)
if len(committeeIDs) != 0 {
t.Errorf("Empty cache returned an object: %v", committeeIDs)
}

c.AddID(slot, 1)
res := c.GetIDs(slot)
if !reflect.DeepEqual(res, []uint64{1}) {
t.Error("Expected equal value to return from cache")
}

c.AddID(slot, 2)
res = c.GetIDs(slot)
if !reflect.DeepEqual(res, []uint64{1, 2}) {
t.Error("Expected equal value to return from cache")
}

c.AddID(slot, 3)
res = c.GetIDs(slot)
if !reflect.DeepEqual(res, []uint64{1, 2, 3}) {
t.Error("Expected equal value to return from cache")
}
}
2 changes: 1 addition & 1 deletion beacon-chain/p2p/subnets_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func TestStartDiscV5_DiscoverPeersWithSubnets(t *testing.T) {

// Update ENR of a peer.
testService := &Service{dv5Listener: listeners[0]}
cache.CommitteeIDs.AddIDs([]uint64{10}, 0)
cache.CommitteeIDs.AddID(0, 10)
testService.RefreshENR(0)
time.Sleep(2 * time.Second)

Expand Down
7 changes: 0 additions & 7 deletions beacon-chain/rpc/validator/assignments.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,8 @@ import (
"context"

ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/prysm/beacon-chain/cache"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/beacon-chain/core/state"
"github.com/prysmaticlabs/prysm/shared/featureconfig"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
Expand Down Expand Up @@ -87,11 +85,6 @@ func (vs *Server) GetDuties(ctx context.Context, req *ethpb.DutiesRequest) (*eth

}

if featureconfig.Get().EnableDynamicCommitteeSubnets {
cache.CommitteeIDs.AddIDs(committeeIDs, req.Epoch)
cache.CommitteeIDs.AddIDs(nextCommitteeIDs, req.Epoch+1)
}

return &ethpb.DutiesResponse{
Duties: validatorAssignments,
}, nil
Expand Down
23 changes: 11 additions & 12 deletions beacon-chain/rpc/validator/attester.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"time"

ptypes "github.com/gogo/protobuf/types"
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/go-ssz"
"github.com/prysmaticlabs/prysm/beacon-chain/cache"
Expand Down Expand Up @@ -36,12 +37,6 @@ func (vs *Server) GetAttestationData(ctx context.Context, req *ethpb.Attestation
trace.Int64Attribute("committeeIndex", int64(req.CommitteeIndex)),
)

// If attestation committee subnets are enabled, we track the committee
// index into a cache.
if featureconfig.Get().EnableDynamicCommitteeSubnets {
cache.CommitteeIDs.AddIDs([]uint64{req.CommitteeIndex}, helpers.SlotToEpoch(req.Slot))
}

if vs.SyncChecker.Syncing() {
return nil, status.Errorf(codes.Unavailable, "Syncing to latest head, not ready to respond")
}
Expand Down Expand Up @@ -159,12 +154,6 @@ func (vs *Server) ProposeAttestation(ctx context.Context, att *ethpb.Attestation
return nil, status.Error(codes.InvalidArgument, "Incorrect attestation signature")
}

// If attestation committee subnets are enabled, we track the committee
// index into a cache.
if featureconfig.Get().EnableDynamicCommitteeSubnets {
cache.CommitteeIDs.AddIDs([]uint64{att.Data.CommitteeIndex}, helpers.SlotToEpoch(att.Data.Slot))
}

root, err := ssz.HashTreeRoot(att.Data)
if err != nil {
return nil, status.Errorf(codes.Internal, "Could not tree hash attestation: %v", err)
Expand Down Expand Up @@ -235,3 +224,13 @@ func (vs *Server) waitToOneThird(ctx context.Context, slot uint64) {
}
}
}

// SubscribeCommitteeSubnet subscribes to the committee ID subnet given subscribe request.
func (vs *Server) SubscribeCommitteeSubnet(ctx context.Context, req *ethpb.CommitteeSubnetSubscribeRequest) (*ptypes.Empty, error) {
if req.IsAggregator {
cache.CommitteeIDs.AddID(req.Slot, req.CommitteeId)
} else {
// What should be done for an attester?
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is this?

}
return &ptypes.Empty{}, nil
}
22 changes: 12 additions & 10 deletions third_party/com_github_prysmaticlabs_ethereumapis-tags.patch
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,7 @@ index 8ee263b..60607de 100644

// The epoch for which this set of validator assignments is valid.
diff --git a/eth/v1alpha1/validator.proto b/eth/v1alpha1/validator.proto
index 068a04c..80f73b8 100644
index 3455de7..f4fab59 100644
--- a/eth/v1alpha1/validator.proto
+++ b/eth/v1alpha1/validator.proto
@@ -15,6 +15,7 @@ syntax = "proto3";
Expand All @@ -458,7 +458,7 @@ index 068a04c..80f73b8 100644
import "google/api/annotations.proto";
import "google/protobuf/empty.proto";
import "eth/v1alpha1/beacon_block.proto";
@@ -197,7 +198,7 @@ message DomainResponse {
@@ -208,7 +209,7 @@ message DomainResponse {

message ValidatorActivationRequest {
// A list of 48 byte validator public keys.
Expand All @@ -467,7 +467,7 @@ index 068a04c..80f73b8 100644
}

message ValidatorActivationResponse {
@@ -223,7 +224,7 @@ message ChainStartResponse {
@@ -234,7 +235,7 @@ message ChainStartResponse {

message ValidatorIndexRequest {
// A 48 byte validator public key.
Expand All @@ -476,7 +476,7 @@ index 068a04c..80f73b8 100644
}

message ValidatorIndexResponse {
@@ -233,7 +234,7 @@ message ValidatorIndexResponse {
@@ -244,7 +245,7 @@ message ValidatorIndexResponse {

message ValidatorStatusRequest {
// A 48 byte validator public key.
Expand All @@ -485,7 +485,7 @@ index 068a04c..80f73b8 100644
}

enum ValidatorStatus {
@@ -271,7 +272,7 @@ message DutiesRequest {
@@ -282,7 +283,7 @@ message DutiesRequest {
uint64 epoch = 1;

// Array of byte encoded BLS public keys.
Expand All @@ -494,7 +494,7 @@ index 068a04c..80f73b8 100644
}

message DutiesResponse {
@@ -290,7 +291,7 @@ message DutiesResponse {
@@ -301,7 +302,7 @@ message DutiesResponse {
uint64 proposer_slot = 4;

// 48 byte BLS public key for the validator who's assigned to perform a duty.
Expand All @@ -503,7 +503,7 @@ index 068a04c..80f73b8 100644

// The current status of the validator assigned to perform the duty.
ValidatorStatus status = 6;
@@ -305,15 +306,16 @@ message BlockRequest {
@@ -316,15 +317,16 @@ message BlockRequest {
uint64 slot = 1;

// Validator's 32 byte randao reveal secret of the current epoch.
Expand All @@ -523,7 +523,7 @@ index 068a04c..80f73b8 100644
}

message AttestationDataRequest {
@@ -326,7 +328,7 @@ message AttestationDataRequest {
@@ -337,7 +339,7 @@ message AttestationDataRequest {

message AttestResponse {
// The root of the attestation data successfully submitted to the beacon node.
Expand All @@ -532,7 +532,7 @@ index 068a04c..80f73b8 100644
}

message AggregateSelectionRequest {
@@ -335,10 +337,10 @@ message AggregateSelectionRequest {
@@ -346,10 +348,10 @@ message AggregateSelectionRequest {
// Committee index of the validator at the given slot.
uint64 committee_index = 2;
// 48 byte public key of the validator.
Expand All @@ -545,14 +545,16 @@ index 068a04c..80f73b8 100644
}

message AggregateSelectionResponse {
@@ -353,16 +355,16 @@ message SignedAggregateSubmitRequest {
@@ -364,7 +366,7 @@ message SignedAggregateSubmitRequest {

message SignedAggregateSubmitResponse {
// The 32 byte hash tree root of the aggregated attestation data.
- bytes attestation_data_root = 1;
+ bytes attestation_data_root = 1 [(gogoproto.moretags) = "ssz-size:\"32\""];
}

message CommitteeSubnetSubscribeRequest {
@@ -381,10 +383,10 @@ message CommitteeSubnetSubscribeRequest {
// An Ethereum 2.0 validator.
message Validator {
// 48 byte BLS public key used for the validator's activities.
Expand Down
7 changes: 7 additions & 0 deletions validator/client/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,13 @@ func (v *validator) RolesAt(ctx context.Context, slot uint64) (map[[48]byte][]pb
roles = append(roles, pb.ValidatorRole_AGGREGATOR)
}

if _, err := v.validatorClient.SubscribeCommitteeSubnet(ctx, &ethpb.CommitteeSubnetSubscribeRequest{
Slot: slot,
CommitteeId: duty.CommitteeIndex,
IsAggregator: aggregator,
}); err != nil {
return nil, err
}
}
if len(roles) == 0 {
roles = append(roles, pb.ValidatorRole_UNKNOWN)
Expand Down
16 changes: 16 additions & 0 deletions validator/client/validator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -604,11 +604,22 @@ func TestRolesAt_OK(t *testing.T) {
gomock.Any(), // ctx
gomock.Any(), // epoch
).Return(&ethpb.DomainResponse{}, nil /*err*/)

m.validatorClient.EXPECT().DomainData(
gomock.Any(), // ctx
gomock.Any(), // epoch
).Return(&ethpb.DomainResponse{}, nil /*err*/)

m.validatorClient.EXPECT().SubscribeCommitteeSubnet(
gomock.Any(), // ctx
gomock.Any(), // epoch
).Return(nil, nil /*err*/)

m.validatorClient.EXPECT().SubscribeCommitteeSubnet(
gomock.Any(), // ctx
gomock.Any(), // epoch
).Return(nil, nil /*err*/)

roleMap, err := v.RolesAt(context.Background(), 1)
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -671,6 +682,11 @@ func TestRolesAt_DoesNotAssignProposer_Slot0(t *testing.T) {
gomock.Any(), // epoch
).Return(&ethpb.DomainResponse{}, nil /*err*/)

m.validatorClient.EXPECT().SubscribeCommitteeSubnet(
gomock.Any(), // ctx
gomock.Any(), // epoch
).Return(nil, nil /*err*/)

roleMap, err := v.RolesAt(context.Background(), 0)
if err != nil {
t.Fatal(err)
Expand Down
21 changes: 21 additions & 0 deletions validator/internal/beacon_node_validator_service_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.