Skip to content

Commit

Permalink
Implement SubscribeCommitteeSubnet method (#5299)
Browse files Browse the repository at this point in the history
* Add client implementation

* Update workspace

* Update server

* Update service

* Gaz

* Mocks

* Fixed validator tests

* Add round tirp tests

* Fixed subnet test

* Comment

* Update committee cache

* Comment

* Update RPC

* Fixed test

* Nishant's comment

* Gaz

* Refresh ENR is for epoch

* Needs to be append
  • Loading branch information
terencechain committed Apr 4, 2020
1 parent a0c62b1 commit 1b8db88
Show file tree
Hide file tree
Showing 14 changed files with 194 additions and 50 deletions.
2 changes: 1 addition & 1 deletion WORKSPACE
Original file line number Diff line number Diff line change
Expand Up @@ -1309,7 +1309,7 @@ go_repository(

go_repository(
name = "com_github_prysmaticlabs_ethereumapis",
commit = "90224d88dd65813ba2f35197bb1bb8915b2d0c6b",
commit = "3f6a75ac9460621b140270b90057a5a445d66436",
importpath = "github.com/prysmaticlabs/ethereumapis",
patch_args = ["-p1"],
patches = [
Expand Down
1 change: 1 addition & 0 deletions beacon-chain/cache/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ go_test(
"attestation_data_test.go",
"checkpoint_state_test.go",
"committee_fuzz_test.go",
"committee_ids_test.go",
"committee_test.go",
"eth1_data_test.go",
"feature_flag_test.go",
Expand Down
70 changes: 55 additions & 15 deletions beacon-chain/cache/committee_ids.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,39 +4,79 @@ import (
"sync"

lru "github.com/hashicorp/golang-lru"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/prysmaticlabs/prysm/shared/sliceutil"
)

type committeeIDs struct {
cache *lru.Cache
lock sync.RWMutex
attester *lru.Cache
attesterLock sync.RWMutex
aggregator *lru.Cache
aggregatorLock sync.RWMutex
}

// CommitteeIDs for attestations.
// CommitteeIDs for attester and aggregator.
var CommitteeIDs = newCommitteeIDs()

func newCommitteeIDs() *committeeIDs {
cache, err := lru.New(8)
// Given a node can calculate committee assignments of current epoch and next epoch.
// Max size is set to 2 epoch length.
cacheSize := int(params.BeaconConfig().MaxCommitteesPerSlot * params.BeaconConfig().SlotsPerEpoch * 2)
attesterCache, err := lru.New(cacheSize)
if err != nil {
panic(err)
}
return &committeeIDs{cache: cache}
aggregatorCache, err := lru.New(cacheSize)
if err != nil {
panic(err)
}
return &committeeIDs{attester: attesterCache, aggregator: aggregatorCache}
}

// AddIDs to the cache for attestation committees by epoch.
func (t *committeeIDs) AddIDs(indices []uint64, epoch uint64) {
t.lock.Lock()
defer t.lock.Unlock()
val, exists := t.cache.Get(epoch)
// AddAttesterCommiteeID adds committee ID for subscribing subnet for the attester of a given slot.
func (c *committeeIDs) AddAttesterCommiteeID(slot uint64, committeeID uint64) {
c.attesterLock.Lock()
defer c.attesterLock.Unlock()

ids := []uint64{committeeID}
val, exists := c.attester.Get(slot)
if exists {
indices = sliceutil.UnionUint64(append(indices, val.([]uint64)...))
ids = sliceutil.UnionUint64(append(val.([]uint64), ids...))
}
t.cache.Add(epoch, indices)
c.attester.Add(slot, ids)
}

// GetIDs from the cache for attestation committees by epoch.
func (t *committeeIDs) GetIDs(epoch uint64) []uint64 {
val, exists := t.cache.Get(epoch)
// GetAttesterCommitteeIDs gets the committee ID for subscribing subnet for attester of the slot.
func (c *committeeIDs) GetAttesterCommitteeIDs(slot uint64) []uint64 {
c.attesterLock.RLock()
defer c.attesterLock.RUnlock()

val, exists := c.attester.Get(slot)
if !exists {
return []uint64{}
}
return val.([]uint64)
}

// AddAggregatorCommiteeID adds committee ID for subscribing subnet for the aggregator of a given slot.
func (c *committeeIDs) AddAggregatorCommiteeID(slot uint64, committeeID uint64) {
c.aggregatorLock.Lock()
defer c.aggregatorLock.Unlock()

ids := []uint64{committeeID}
val, exists := c.aggregator.Get(slot)
if exists {
ids = sliceutil.UnionUint64(append(val.([]uint64), ids...))
}
c.aggregator.Add(slot, ids)
}

// GetAggregatorCommitteeIDs gets the committee ID for subscribing subnet for aggregator of the slot.
func (c *committeeIDs) GetAggregatorCommitteeIDs(slot uint64) []uint64 {
c.aggregatorLock.RLock()
defer c.aggregatorLock.RUnlock()

val, exists := c.aggregator.Get(slot)
if !exists {
return []uint64{}
}
Expand Down
56 changes: 56 additions & 0 deletions beacon-chain/cache/committee_ids_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package cache

import (
"reflect"
"testing"
)

func TestCommitteeIDCache_RoundTrip(t *testing.T) {
c := newCommitteeIDs()
slot := uint64(100)
committeeIDs := c.GetAggregatorCommitteeIDs(slot)
if len(committeeIDs) != 0 {
t.Errorf("Empty cache returned an object: %v", committeeIDs)
}

c.AddAggregatorCommiteeID(slot, 1)
res := c.GetAggregatorCommitteeIDs(slot)
if !reflect.DeepEqual(res, []uint64{1}) {
t.Error("Expected equal value to return from cache")
}

c.AddAggregatorCommiteeID(slot, 2)
res = c.GetAggregatorCommitteeIDs(slot)
if !reflect.DeepEqual(res, []uint64{1, 2}) {
t.Error("Expected equal value to return from cache")
}

c.AddAggregatorCommiteeID(slot, 3)
res = c.GetAggregatorCommitteeIDs(slot)
if !reflect.DeepEqual(res, []uint64{1, 2, 3}) {
t.Error("Expected equal value to return from cache")
}

committeeIDs = c.GetAttesterCommitteeIDs(slot)
if len(committeeIDs) != 0 {
t.Errorf("Empty cache returned an object: %v", committeeIDs)
}

c.AddAttesterCommiteeID(slot, 11)
res = c.GetAttesterCommitteeIDs(slot)
if !reflect.DeepEqual(res, []uint64{11}) {
t.Error("Expected equal value to return from cache")
}

c.AddAttesterCommiteeID(slot, 22)
res = c.GetAttesterCommitteeIDs(slot)
if !reflect.DeepEqual(res, []uint64{11, 22}) {
t.Error("Expected equal value to return from cache")
}

c.AddAttesterCommiteeID(slot, 33)
res = c.GetAttesterCommitteeIDs(slot)
if !reflect.DeepEqual(res, []uint64{11, 22, 33}) {
t.Error("Expected equal value to return from cache")
}
}
1 change: 1 addition & 0 deletions beacon-chain/p2p/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ go_library(
"//shared/iputils:go_default_library",
"//shared/params:go_default_library",
"//shared/runutil:go_default_library",
"//shared/sliceutil:go_default_library",
"//shared/traceutil:go_default_library",
"@com_github_btcsuite_btcd//btcec:go_default_library",
"@com_github_dgraph_io_ristretto//:go_default_library",
Expand Down
9 changes: 8 additions & 1 deletion beacon-chain/p2p/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/prysmaticlabs/prysm/shared"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/prysmaticlabs/prysm/shared/runutil"
"github.com/prysmaticlabs/prysm/shared/sliceutil"
"github.com/sirupsen/logrus"
)

Expand Down Expand Up @@ -354,7 +355,13 @@ func (s *Service) RefreshENR(epoch uint64) {
return
}
bitV := bitfield.NewBitvector64()
committees := cache.CommitteeIDs.GetIDs(epoch)

var committees []uint64
epochStartSlot := helpers.StartSlot(epoch)
for i := epochStartSlot; i < epochStartSlot+2*params.BeaconConfig().SlotsPerEpoch; i++ {
committees = append(committees, sliceutil.UnionUint64(cache.CommitteeIDs.GetAttesterCommitteeIDs(i),
cache.CommitteeIDs.GetAggregatorCommitteeIDs(i))...)
}
for _, idx := range committees {
bitV.SetBitAt(idx, true)
}
Expand Down
2 changes: 1 addition & 1 deletion beacon-chain/p2p/subnets_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func TestStartDiscV5_DiscoverPeersWithSubnets(t *testing.T) {

// Update ENR of a peer.
testService := &Service{dv5Listener: listeners[0]}
cache.CommitteeIDs.AddIDs([]uint64{10}, 0)
cache.CommitteeIDs.AddAttesterCommiteeID(0, 10)
testService.RefreshENR(0)
time.Sleep(2 * time.Second)

Expand Down
7 changes: 0 additions & 7 deletions beacon-chain/rpc/validator/assignments.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,8 @@ import (
"context"

ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/prysm/beacon-chain/cache"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/beacon-chain/core/state"
"github.com/prysmaticlabs/prysm/shared/featureconfig"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
Expand Down Expand Up @@ -87,11 +85,6 @@ func (vs *Server) GetDuties(ctx context.Context, req *ethpb.DutiesRequest) (*eth

}

if featureconfig.Get().EnableDynamicCommitteeSubnets {
cache.CommitteeIDs.AddIDs(committeeIDs, req.Epoch)
cache.CommitteeIDs.AddIDs(nextCommitteeIDs, req.Epoch+1)
}

return &ethpb.DutiesResponse{
Duties: validatorAssignments,
}, nil
Expand Down
24 changes: 12 additions & 12 deletions beacon-chain/rpc/validator/attester.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"time"

ptypes "github.com/gogo/protobuf/types"
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/go-ssz"
"github.com/prysmaticlabs/prysm/beacon-chain/cache"
Expand Down Expand Up @@ -36,12 +37,6 @@ func (vs *Server) GetAttestationData(ctx context.Context, req *ethpb.Attestation
trace.Int64Attribute("committeeIndex", int64(req.CommitteeIndex)),
)

// If attestation committee subnets are enabled, we track the committee
// index into a cache.
if featureconfig.Get().EnableDynamicCommitteeSubnets {
cache.CommitteeIDs.AddIDs([]uint64{req.CommitteeIndex}, helpers.SlotToEpoch(req.Slot))
}

if vs.SyncChecker.Syncing() {
return nil, status.Errorf(codes.Unavailable, "Syncing to latest head, not ready to respond")
}
Expand Down Expand Up @@ -159,12 +154,6 @@ func (vs *Server) ProposeAttestation(ctx context.Context, att *ethpb.Attestation
return nil, status.Error(codes.InvalidArgument, "Incorrect attestation signature")
}

// If attestation committee subnets are enabled, we track the committee
// index into a cache.
if featureconfig.Get().EnableDynamicCommitteeSubnets {
cache.CommitteeIDs.AddIDs([]uint64{att.Data.CommitteeIndex}, helpers.SlotToEpoch(att.Data.Slot))
}

root, err := ssz.HashTreeRoot(att.Data)
if err != nil {
return nil, status.Errorf(codes.Internal, "Could not tree hash attestation: %v", err)
Expand Down Expand Up @@ -235,3 +224,14 @@ func (vs *Server) waitToOneThird(ctx context.Context, slot uint64) {
}
}
}

// SubscribeCommitteeSubnet subscribes to the committee ID subnet given subscribe request.
func (vs *Server) SubscribeCommitteeSubnet(ctx context.Context, req *ethpb.CommitteeSubnetSubscribeRequest) (*ptypes.Empty, error) {
cache.CommitteeIDs.AddAttesterCommiteeID(req.Slot, req.CommitteeId)

if req.IsAggregator {
cache.CommitteeIDs.AddAggregatorCommiteeID(req.Slot, req.CommitteeId)
}

return &ptypes.Empty{}, nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (r *Service) committeesCount() int {
}

func (r *Service) committeeIndices() []uint64 {
currentEpoch := helpers.SlotToEpoch(r.chain.HeadSlot())
return sliceutil.UnionUint64(cache.CommitteeIDs.GetIDs(currentEpoch),
cache.CommitteeIDs.GetIDs(currentEpoch+1))
currentSlot := r.chain.CurrentSlot()
return sliceutil.UnionUint64(cache.CommitteeIDs.GetAttesterCommitteeIDs(currentSlot),
cache.CommitteeIDs.GetAggregatorCommitteeIDs(currentSlot))
}
22 changes: 12 additions & 10 deletions third_party/com_github_prysmaticlabs_ethereumapis-tags.patch
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,7 @@ index 8ee263b..60607de 100644

// The epoch for which this set of validator assignments is valid.
diff --git a/eth/v1alpha1/validator.proto b/eth/v1alpha1/validator.proto
index 068a04c..80f73b8 100644
index 3455de7..f4fab59 100644
--- a/eth/v1alpha1/validator.proto
+++ b/eth/v1alpha1/validator.proto
@@ -15,6 +15,7 @@ syntax = "proto3";
Expand All @@ -458,7 +458,7 @@ index 068a04c..80f73b8 100644
import "google/api/annotations.proto";
import "google/protobuf/empty.proto";
import "eth/v1alpha1/beacon_block.proto";
@@ -197,7 +198,7 @@ message DomainResponse {
@@ -208,7 +209,7 @@ message DomainResponse {

message ValidatorActivationRequest {
// A list of 48 byte validator public keys.
Expand All @@ -467,7 +467,7 @@ index 068a04c..80f73b8 100644
}

message ValidatorActivationResponse {
@@ -223,7 +224,7 @@ message ChainStartResponse {
@@ -234,7 +235,7 @@ message ChainStartResponse {

message ValidatorIndexRequest {
// A 48 byte validator public key.
Expand All @@ -476,7 +476,7 @@ index 068a04c..80f73b8 100644
}

message ValidatorIndexResponse {
@@ -233,7 +234,7 @@ message ValidatorIndexResponse {
@@ -244,7 +245,7 @@ message ValidatorIndexResponse {

message ValidatorStatusRequest {
// A 48 byte validator public key.
Expand All @@ -485,7 +485,7 @@ index 068a04c..80f73b8 100644
}

enum ValidatorStatus {
@@ -271,7 +272,7 @@ message DutiesRequest {
@@ -282,7 +283,7 @@ message DutiesRequest {
uint64 epoch = 1;

// Array of byte encoded BLS public keys.
Expand All @@ -494,7 +494,7 @@ index 068a04c..80f73b8 100644
}

message DutiesResponse {
@@ -290,7 +291,7 @@ message DutiesResponse {
@@ -301,7 +302,7 @@ message DutiesResponse {
uint64 proposer_slot = 4;

// 48 byte BLS public key for the validator who's assigned to perform a duty.
Expand All @@ -503,7 +503,7 @@ index 068a04c..80f73b8 100644

// The current status of the validator assigned to perform the duty.
ValidatorStatus status = 6;
@@ -305,15 +306,16 @@ message BlockRequest {
@@ -316,15 +317,16 @@ message BlockRequest {
uint64 slot = 1;

// Validator's 32 byte randao reveal secret of the current epoch.
Expand All @@ -523,7 +523,7 @@ index 068a04c..80f73b8 100644
}

message AttestationDataRequest {
@@ -326,7 +328,7 @@ message AttestationDataRequest {
@@ -337,7 +339,7 @@ message AttestationDataRequest {

message AttestResponse {
// The root of the attestation data successfully submitted to the beacon node.
Expand All @@ -532,7 +532,7 @@ index 068a04c..80f73b8 100644
}

message AggregateSelectionRequest {
@@ -335,10 +337,10 @@ message AggregateSelectionRequest {
@@ -346,10 +348,10 @@ message AggregateSelectionRequest {
// Committee index of the validator at the given slot.
uint64 committee_index = 2;
// 48 byte public key of the validator.
Expand All @@ -545,14 +545,16 @@ index 068a04c..80f73b8 100644
}

message AggregateSelectionResponse {
@@ -353,16 +355,16 @@ message SignedAggregateSubmitRequest {
@@ -364,7 +366,7 @@ message SignedAggregateSubmitRequest {

message SignedAggregateSubmitResponse {
// The 32 byte hash tree root of the aggregated attestation data.
- bytes attestation_data_root = 1;
+ bytes attestation_data_root = 1 [(gogoproto.moretags) = "ssz-size:\"32\""];
}

message CommitteeSubnetSubscribeRequest {
@@ -381,10 +383,10 @@ message CommitteeSubnetSubscribeRequest {
// An Ethereum 2.0 validator.
message Validator {
// 48 byte BLS public key used for the validator's activities.
Expand Down
Loading

0 comments on commit 1b8db88

Please sign in to comment.