From 1b8db882a45de89159bc1fe1761e49ade413a488 Mon Sep 17 00:00:00 2001 From: terence tsao Date: Sat, 4 Apr 2020 08:00:34 -0700 Subject: [PATCH] Implement `SubscribeCommitteeSubnet` method (#5299) * Add client implementation * Update workspace * Update server * Update service * Gaz * Mocks * Fixed validator tests * Add round tirp tests * Fixed subnet test * Comment * Update committee cache * Comment * Update RPC * Fixed test * Nishant's comment * Gaz * Refresh ENR is for epoch * Needs to be append --- WORKSPACE | 2 +- beacon-chain/cache/BUILD.bazel | 1 + beacon-chain/cache/committee_ids.go | 70 +++++++++++++++---- beacon-chain/cache/committee_ids_test.go | 56 +++++++++++++++ beacon-chain/p2p/BUILD.bazel | 1 + beacon-chain/p2p/service.go | 9 ++- beacon-chain/p2p/subnets_test.go | 2 +- beacon-chain/rpc/validator/assignments.go | 7 -- beacon-chain/rpc/validator/attester.go | 24 +++---- ...iber_committee_index_beacon_attestation.go | 6 +- ...thub_prysmaticlabs_ethereumapis-tags.patch | 22 +++--- validator/client/validator.go | 7 ++ validator/client/validator_test.go | 16 +++++ .../beacon_node_validator_service_mock.go | 21 ++++++ 14 files changed, 194 insertions(+), 50 deletions(-) create mode 100644 beacon-chain/cache/committee_ids_test.go diff --git a/WORKSPACE b/WORKSPACE index afa53914d82a..3d086b3ee006 100644 --- a/WORKSPACE +++ b/WORKSPACE @@ -1309,7 +1309,7 @@ go_repository( go_repository( name = "com_github_prysmaticlabs_ethereumapis", - commit = "90224d88dd65813ba2f35197bb1bb8915b2d0c6b", + commit = "3f6a75ac9460621b140270b90057a5a445d66436", importpath = "github.com/prysmaticlabs/ethereumapis", patch_args = ["-p1"], patches = [ diff --git a/beacon-chain/cache/BUILD.bazel b/beacon-chain/cache/BUILD.bazel index 9b508498b301..87454c83b8c5 100644 --- a/beacon-chain/cache/BUILD.bazel +++ b/beacon-chain/cache/BUILD.bazel @@ -41,6 +41,7 @@ go_test( "attestation_data_test.go", "checkpoint_state_test.go", "committee_fuzz_test.go", + "committee_ids_test.go", "committee_test.go", "eth1_data_test.go", "feature_flag_test.go", diff --git a/beacon-chain/cache/committee_ids.go b/beacon-chain/cache/committee_ids.go index 85672884f166..4a6a00eb6897 100644 --- a/beacon-chain/cache/committee_ids.go +++ b/beacon-chain/cache/committee_ids.go @@ -4,39 +4,79 @@ import ( "sync" lru "github.com/hashicorp/golang-lru" + "github.com/prysmaticlabs/prysm/shared/params" "github.com/prysmaticlabs/prysm/shared/sliceutil" ) type committeeIDs struct { - cache *lru.Cache - lock sync.RWMutex + attester *lru.Cache + attesterLock sync.RWMutex + aggregator *lru.Cache + aggregatorLock sync.RWMutex } -// CommitteeIDs for attestations. +// CommitteeIDs for attester and aggregator. var CommitteeIDs = newCommitteeIDs() func newCommitteeIDs() *committeeIDs { - cache, err := lru.New(8) + // Given a node can calculate committee assignments of current epoch and next epoch. + // Max size is set to 2 epoch length. + cacheSize := int(params.BeaconConfig().MaxCommitteesPerSlot * params.BeaconConfig().SlotsPerEpoch * 2) + attesterCache, err := lru.New(cacheSize) if err != nil { panic(err) } - return &committeeIDs{cache: cache} + aggregatorCache, err := lru.New(cacheSize) + if err != nil { + panic(err) + } + return &committeeIDs{attester: attesterCache, aggregator: aggregatorCache} } -// AddIDs to the cache for attestation committees by epoch. -func (t *committeeIDs) AddIDs(indices []uint64, epoch uint64) { - t.lock.Lock() - defer t.lock.Unlock() - val, exists := t.cache.Get(epoch) +// AddAttesterCommiteeID adds committee ID for subscribing subnet for the attester of a given slot. +func (c *committeeIDs) AddAttesterCommiteeID(slot uint64, committeeID uint64) { + c.attesterLock.Lock() + defer c.attesterLock.Unlock() + + ids := []uint64{committeeID} + val, exists := c.attester.Get(slot) if exists { - indices = sliceutil.UnionUint64(append(indices, val.([]uint64)...)) + ids = sliceutil.UnionUint64(append(val.([]uint64), ids...)) } - t.cache.Add(epoch, indices) + c.attester.Add(slot, ids) } -// GetIDs from the cache for attestation committees by epoch. -func (t *committeeIDs) GetIDs(epoch uint64) []uint64 { - val, exists := t.cache.Get(epoch) +// GetAttesterCommitteeIDs gets the committee ID for subscribing subnet for attester of the slot. +func (c *committeeIDs) GetAttesterCommitteeIDs(slot uint64) []uint64 { + c.attesterLock.RLock() + defer c.attesterLock.RUnlock() + + val, exists := c.attester.Get(slot) + if !exists { + return []uint64{} + } + return val.([]uint64) +} + +// AddAggregatorCommiteeID adds committee ID for subscribing subnet for the aggregator of a given slot. +func (c *committeeIDs) AddAggregatorCommiteeID(slot uint64, committeeID uint64) { + c.aggregatorLock.Lock() + defer c.aggregatorLock.Unlock() + + ids := []uint64{committeeID} + val, exists := c.aggregator.Get(slot) + if exists { + ids = sliceutil.UnionUint64(append(val.([]uint64), ids...)) + } + c.aggregator.Add(slot, ids) +} + +// GetAggregatorCommitteeIDs gets the committee ID for subscribing subnet for aggregator of the slot. +func (c *committeeIDs) GetAggregatorCommitteeIDs(slot uint64) []uint64 { + c.aggregatorLock.RLock() + defer c.aggregatorLock.RUnlock() + + val, exists := c.aggregator.Get(slot) if !exists { return []uint64{} } diff --git a/beacon-chain/cache/committee_ids_test.go b/beacon-chain/cache/committee_ids_test.go new file mode 100644 index 000000000000..3ae97533a825 --- /dev/null +++ b/beacon-chain/cache/committee_ids_test.go @@ -0,0 +1,56 @@ +package cache + +import ( + "reflect" + "testing" +) + +func TestCommitteeIDCache_RoundTrip(t *testing.T) { + c := newCommitteeIDs() + slot := uint64(100) + committeeIDs := c.GetAggregatorCommitteeIDs(slot) + if len(committeeIDs) != 0 { + t.Errorf("Empty cache returned an object: %v", committeeIDs) + } + + c.AddAggregatorCommiteeID(slot, 1) + res := c.GetAggregatorCommitteeIDs(slot) + if !reflect.DeepEqual(res, []uint64{1}) { + t.Error("Expected equal value to return from cache") + } + + c.AddAggregatorCommiteeID(slot, 2) + res = c.GetAggregatorCommitteeIDs(slot) + if !reflect.DeepEqual(res, []uint64{1, 2}) { + t.Error("Expected equal value to return from cache") + } + + c.AddAggregatorCommiteeID(slot, 3) + res = c.GetAggregatorCommitteeIDs(slot) + if !reflect.DeepEqual(res, []uint64{1, 2, 3}) { + t.Error("Expected equal value to return from cache") + } + + committeeIDs = c.GetAttesterCommitteeIDs(slot) + if len(committeeIDs) != 0 { + t.Errorf("Empty cache returned an object: %v", committeeIDs) + } + + c.AddAttesterCommiteeID(slot, 11) + res = c.GetAttesterCommitteeIDs(slot) + if !reflect.DeepEqual(res, []uint64{11}) { + t.Error("Expected equal value to return from cache") + } + + c.AddAttesterCommiteeID(slot, 22) + res = c.GetAttesterCommitteeIDs(slot) + if !reflect.DeepEqual(res, []uint64{11, 22}) { + t.Error("Expected equal value to return from cache") + } + + c.AddAttesterCommiteeID(slot, 33) + res = c.GetAttesterCommitteeIDs(slot) + if !reflect.DeepEqual(res, []uint64{11, 22, 33}) { + t.Error("Expected equal value to return from cache") + } +} diff --git a/beacon-chain/p2p/BUILD.bazel b/beacon-chain/p2p/BUILD.bazel index 166513e9b002..23f9ace13559 100644 --- a/beacon-chain/p2p/BUILD.bazel +++ b/beacon-chain/p2p/BUILD.bazel @@ -43,6 +43,7 @@ go_library( "//shared/iputils:go_default_library", "//shared/params:go_default_library", "//shared/runutil:go_default_library", + "//shared/sliceutil:go_default_library", "//shared/traceutil:go_default_library", "@com_github_btcsuite_btcd//btcec:go_default_library", "@com_github_dgraph_io_ristretto//:go_default_library", diff --git a/beacon-chain/p2p/service.go b/beacon-chain/p2p/service.go index 189cf196f5a9..3e457d5c315c 100644 --- a/beacon-chain/p2p/service.go +++ b/beacon-chain/p2p/service.go @@ -34,6 +34,7 @@ import ( "github.com/prysmaticlabs/prysm/shared" "github.com/prysmaticlabs/prysm/shared/params" "github.com/prysmaticlabs/prysm/shared/runutil" + "github.com/prysmaticlabs/prysm/shared/sliceutil" "github.com/sirupsen/logrus" ) @@ -354,7 +355,13 @@ func (s *Service) RefreshENR(epoch uint64) { return } bitV := bitfield.NewBitvector64() - committees := cache.CommitteeIDs.GetIDs(epoch) + + var committees []uint64 + epochStartSlot := helpers.StartSlot(epoch) + for i := epochStartSlot; i < epochStartSlot+2*params.BeaconConfig().SlotsPerEpoch; i++ { + committees = append(committees, sliceutil.UnionUint64(cache.CommitteeIDs.GetAttesterCommitteeIDs(i), + cache.CommitteeIDs.GetAggregatorCommitteeIDs(i))...) + } for _, idx := range committees { bitV.SetBitAt(idx, true) } diff --git a/beacon-chain/p2p/subnets_test.go b/beacon-chain/p2p/subnets_test.go index a8a34270c0dd..a28259dad605 100644 --- a/beacon-chain/p2p/subnets_test.go +++ b/beacon-chain/p2p/subnets_test.go @@ -109,7 +109,7 @@ func TestStartDiscV5_DiscoverPeersWithSubnets(t *testing.T) { // Update ENR of a peer. testService := &Service{dv5Listener: listeners[0]} - cache.CommitteeIDs.AddIDs([]uint64{10}, 0) + cache.CommitteeIDs.AddAttesterCommiteeID(0, 10) testService.RefreshENR(0) time.Sleep(2 * time.Second) diff --git a/beacon-chain/rpc/validator/assignments.go b/beacon-chain/rpc/validator/assignments.go index c282e1e3cb0c..ceca4f6b28f7 100644 --- a/beacon-chain/rpc/validator/assignments.go +++ b/beacon-chain/rpc/validator/assignments.go @@ -4,10 +4,8 @@ import ( "context" ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" - "github.com/prysmaticlabs/prysm/beacon-chain/cache" "github.com/prysmaticlabs/prysm/beacon-chain/core/helpers" "github.com/prysmaticlabs/prysm/beacon-chain/core/state" - "github.com/prysmaticlabs/prysm/shared/featureconfig" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) @@ -87,11 +85,6 @@ func (vs *Server) GetDuties(ctx context.Context, req *ethpb.DutiesRequest) (*eth } - if featureconfig.Get().EnableDynamicCommitteeSubnets { - cache.CommitteeIDs.AddIDs(committeeIDs, req.Epoch) - cache.CommitteeIDs.AddIDs(nextCommitteeIDs, req.Epoch+1) - } - return ðpb.DutiesResponse{ Duties: validatorAssignments, }, nil diff --git a/beacon-chain/rpc/validator/attester.go b/beacon-chain/rpc/validator/attester.go index 1f3b2fe8c61b..184143fa757f 100644 --- a/beacon-chain/rpc/validator/attester.go +++ b/beacon-chain/rpc/validator/attester.go @@ -4,6 +4,7 @@ import ( "context" "time" + ptypes "github.com/gogo/protobuf/types" ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" "github.com/prysmaticlabs/go-ssz" "github.com/prysmaticlabs/prysm/beacon-chain/cache" @@ -36,12 +37,6 @@ func (vs *Server) GetAttestationData(ctx context.Context, req *ethpb.Attestation trace.Int64Attribute("committeeIndex", int64(req.CommitteeIndex)), ) - // If attestation committee subnets are enabled, we track the committee - // index into a cache. - if featureconfig.Get().EnableDynamicCommitteeSubnets { - cache.CommitteeIDs.AddIDs([]uint64{req.CommitteeIndex}, helpers.SlotToEpoch(req.Slot)) - } - if vs.SyncChecker.Syncing() { return nil, status.Errorf(codes.Unavailable, "Syncing to latest head, not ready to respond") } @@ -159,12 +154,6 @@ func (vs *Server) ProposeAttestation(ctx context.Context, att *ethpb.Attestation return nil, status.Error(codes.InvalidArgument, "Incorrect attestation signature") } - // If attestation committee subnets are enabled, we track the committee - // index into a cache. - if featureconfig.Get().EnableDynamicCommitteeSubnets { - cache.CommitteeIDs.AddIDs([]uint64{att.Data.CommitteeIndex}, helpers.SlotToEpoch(att.Data.Slot)) - } - root, err := ssz.HashTreeRoot(att.Data) if err != nil { return nil, status.Errorf(codes.Internal, "Could not tree hash attestation: %v", err) @@ -235,3 +224,14 @@ func (vs *Server) waitToOneThird(ctx context.Context, slot uint64) { } } } + +// SubscribeCommitteeSubnet subscribes to the committee ID subnet given subscribe request. +func (vs *Server) SubscribeCommitteeSubnet(ctx context.Context, req *ethpb.CommitteeSubnetSubscribeRequest) (*ptypes.Empty, error) { + cache.CommitteeIDs.AddAttesterCommiteeID(req.Slot, req.CommitteeId) + + if req.IsAggregator { + cache.CommitteeIDs.AddAggregatorCommiteeID(req.Slot, req.CommitteeId) + } + + return &ptypes.Empty{}, nil +} diff --git a/beacon-chain/sync/subscriber_committee_index_beacon_attestation.go b/beacon-chain/sync/subscriber_committee_index_beacon_attestation.go index a95c91711fd9..eb5b3e4ae1ad 100644 --- a/beacon-chain/sync/subscriber_committee_index_beacon_attestation.go +++ b/beacon-chain/sync/subscriber_committee_index_beacon_attestation.go @@ -50,7 +50,7 @@ func (r *Service) committeesCount() int { } func (r *Service) committeeIndices() []uint64 { - currentEpoch := helpers.SlotToEpoch(r.chain.HeadSlot()) - return sliceutil.UnionUint64(cache.CommitteeIDs.GetIDs(currentEpoch), - cache.CommitteeIDs.GetIDs(currentEpoch+1)) + currentSlot := r.chain.CurrentSlot() + return sliceutil.UnionUint64(cache.CommitteeIDs.GetAttesterCommitteeIDs(currentSlot), + cache.CommitteeIDs.GetAggregatorCommitteeIDs(currentSlot)) } diff --git a/third_party/com_github_prysmaticlabs_ethereumapis-tags.patch b/third_party/com_github_prysmaticlabs_ethereumapis-tags.patch index 854a3fd74082..47c06b1940ed 100644 --- a/third_party/com_github_prysmaticlabs_ethereumapis-tags.patch +++ b/third_party/com_github_prysmaticlabs_ethereumapis-tags.patch @@ -447,7 +447,7 @@ index 8ee263b..60607de 100644 // The epoch for which this set of validator assignments is valid. diff --git a/eth/v1alpha1/validator.proto b/eth/v1alpha1/validator.proto -index 068a04c..80f73b8 100644 +index 3455de7..f4fab59 100644 --- a/eth/v1alpha1/validator.proto +++ b/eth/v1alpha1/validator.proto @@ -15,6 +15,7 @@ syntax = "proto3"; @@ -458,7 +458,7 @@ index 068a04c..80f73b8 100644 import "google/api/annotations.proto"; import "google/protobuf/empty.proto"; import "eth/v1alpha1/beacon_block.proto"; -@@ -197,7 +198,7 @@ message DomainResponse { +@@ -208,7 +209,7 @@ message DomainResponse { message ValidatorActivationRequest { // A list of 48 byte validator public keys. @@ -467,7 +467,7 @@ index 068a04c..80f73b8 100644 } message ValidatorActivationResponse { -@@ -223,7 +224,7 @@ message ChainStartResponse { +@@ -234,7 +235,7 @@ message ChainStartResponse { message ValidatorIndexRequest { // A 48 byte validator public key. @@ -476,7 +476,7 @@ index 068a04c..80f73b8 100644 } message ValidatorIndexResponse { -@@ -233,7 +234,7 @@ message ValidatorIndexResponse { +@@ -244,7 +245,7 @@ message ValidatorIndexResponse { message ValidatorStatusRequest { // A 48 byte validator public key. @@ -485,7 +485,7 @@ index 068a04c..80f73b8 100644 } enum ValidatorStatus { -@@ -271,7 +272,7 @@ message DutiesRequest { +@@ -282,7 +283,7 @@ message DutiesRequest { uint64 epoch = 1; // Array of byte encoded BLS public keys. @@ -494,7 +494,7 @@ index 068a04c..80f73b8 100644 } message DutiesResponse { -@@ -290,7 +291,7 @@ message DutiesResponse { +@@ -301,7 +302,7 @@ message DutiesResponse { uint64 proposer_slot = 4; // 48 byte BLS public key for the validator who's assigned to perform a duty. @@ -503,7 +503,7 @@ index 068a04c..80f73b8 100644 // The current status of the validator assigned to perform the duty. ValidatorStatus status = 6; -@@ -305,15 +306,16 @@ message BlockRequest { +@@ -316,15 +317,16 @@ message BlockRequest { uint64 slot = 1; // Validator's 32 byte randao reveal secret of the current epoch. @@ -523,7 +523,7 @@ index 068a04c..80f73b8 100644 } message AttestationDataRequest { -@@ -326,7 +328,7 @@ message AttestationDataRequest { +@@ -337,7 +339,7 @@ message AttestationDataRequest { message AttestResponse { // The root of the attestation data successfully submitted to the beacon node. @@ -532,7 +532,7 @@ index 068a04c..80f73b8 100644 } message AggregateSelectionRequest { -@@ -335,10 +337,10 @@ message AggregateSelectionRequest { +@@ -346,10 +348,10 @@ message AggregateSelectionRequest { // Committee index of the validator at the given slot. uint64 committee_index = 2; // 48 byte public key of the validator. @@ -545,7 +545,7 @@ index 068a04c..80f73b8 100644 } message AggregateSelectionResponse { -@@ -353,16 +355,16 @@ message SignedAggregateSubmitRequest { +@@ -364,7 +366,7 @@ message SignedAggregateSubmitRequest { message SignedAggregateSubmitResponse { // The 32 byte hash tree root of the aggregated attestation data. @@ -553,6 +553,8 @@ index 068a04c..80f73b8 100644 + bytes attestation_data_root = 1 [(gogoproto.moretags) = "ssz-size:\"32\""]; } + message CommitteeSubnetSubscribeRequest { +@@ -381,10 +383,10 @@ message CommitteeSubnetSubscribeRequest { // An Ethereum 2.0 validator. message Validator { // 48 byte BLS public key used for the validator's activities. diff --git a/validator/client/validator.go b/validator/client/validator.go index 0e8b5e75b720..0d1174490f97 100644 --- a/validator/client/validator.go +++ b/validator/client/validator.go @@ -337,6 +337,13 @@ func (v *validator) RolesAt(ctx context.Context, slot uint64) (map[[48]byte][]pb roles = append(roles, pb.ValidatorRole_AGGREGATOR) } + if _, err := v.validatorClient.SubscribeCommitteeSubnet(ctx, ðpb.CommitteeSubnetSubscribeRequest{ + Slot: slot, + CommitteeId: duty.CommitteeIndex, + IsAggregator: aggregator, + }); err != nil { + return nil, err + } } if len(roles) == 0 { roles = append(roles, pb.ValidatorRole_UNKNOWN) diff --git a/validator/client/validator_test.go b/validator/client/validator_test.go index 81f4a290416b..dae939574cd2 100644 --- a/validator/client/validator_test.go +++ b/validator/client/validator_test.go @@ -604,11 +604,22 @@ func TestRolesAt_OK(t *testing.T) { gomock.Any(), // ctx gomock.Any(), // epoch ).Return(ðpb.DomainResponse{}, nil /*err*/) + m.validatorClient.EXPECT().DomainData( gomock.Any(), // ctx gomock.Any(), // epoch ).Return(ðpb.DomainResponse{}, nil /*err*/) + m.validatorClient.EXPECT().SubscribeCommitteeSubnet( + gomock.Any(), // ctx + gomock.Any(), // epoch + ).Return(nil, nil /*err*/) + + m.validatorClient.EXPECT().SubscribeCommitteeSubnet( + gomock.Any(), // ctx + gomock.Any(), // epoch + ).Return(nil, nil /*err*/) + roleMap, err := v.RolesAt(context.Background(), 1) if err != nil { t.Fatal(err) @@ -671,6 +682,11 @@ func TestRolesAt_DoesNotAssignProposer_Slot0(t *testing.T) { gomock.Any(), // epoch ).Return(ðpb.DomainResponse{}, nil /*err*/) + m.validatorClient.EXPECT().SubscribeCommitteeSubnet( + gomock.Any(), // ctx + gomock.Any(), // epoch + ).Return(nil, nil /*err*/) + roleMap, err := v.RolesAt(context.Background(), 0) if err != nil { t.Fatal(err) diff --git a/validator/internal/beacon_node_validator_service_mock.go b/validator/internal/beacon_node_validator_service_mock.go index 554384c440dd..1be712ad60a4 100644 --- a/validator/internal/beacon_node_validator_service_mock.go +++ b/validator/internal/beacon_node_validator_service_mock.go @@ -10,6 +10,7 @@ import ( ptypes "github.com/gogo/protobuf/types" gomock "github.com/golang/mock/gomock" + eth "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" grpc "google.golang.org/grpc" metadata "google.golang.org/grpc/metadata" @@ -967,3 +968,23 @@ func (mr *MockBeaconNodeValidator_WaitForChainStartServerMockRecorder) RecvMsg(m mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RecvMsg", reflect.TypeOf((*MockBeaconNodeValidator_WaitForChainStartServer)(nil).RecvMsg), m) } + +// SubscribeCommitteeSubnet mocks base method +func (m *MockBeaconNodeValidatorClient) SubscribeCommitteeSubnet(arg0 context.Context, arg1 *eth.CommitteeSubnetSubscribeRequest, arg2 ...grpc.CallOption) (*ptypes.Empty, error) { + m.ctrl.T.Helper() + varargs := []interface{}{arg0, arg1} + for _, a := range arg2 { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "SubscribeCommitteeSubnet", varargs...) + ret0, _ := ret[0].(*ptypes.Empty) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// SubscribeCommitteeSubnet indicates an expected call of SubscribeCommitteeSubnet +func (mr *MockBeaconNodeValidatorClientMockRecorder) SubscribeCommitteeSubnet(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{arg0, arg1}, arg2...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SubscribeCommitteeSubnet", reflect.TypeOf((*MockBeaconNodeValidatorClient)(nil).SubscribeCommitteeSubnet), varargs...) +}