Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor Dynamic Subscriptions #5318

Merged
merged 8 commits into from
Apr 6, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion beacon-chain/sync/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func (r *Service) updateMetrics() {
if err != nil {
log.WithError(err).Errorf("Could not compute fork digest")
}
indices := r.committeeIndices()
indices := r.aggregatorCommitteeIndices(r.chain.CurrentSlot())
attTopic := p2p.GossipTypeMapping[reflect.TypeOf(&pb.Attestation{})]
attTopic += r.p2p.Encoding().ProtocolSuffix()
for _, committeeIdx := range indices {
Expand Down
130 changes: 85 additions & 45 deletions beacon-chain/sync/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/prysmaticlabs/prysm/shared/messagehandler"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/prysmaticlabs/prysm/shared/roughtime"
"github.com/prysmaticlabs/prysm/shared/slotutil"
"github.com/prysmaticlabs/prysm/shared/traceutil"
"go.opencensus.io/trace"
)
Expand Down Expand Up @@ -94,7 +95,6 @@ func (r *Service) registerSubscribers() {
if featureconfig.Get().EnableDynamicCommitteeSubnets {
r.subscribeDynamicWithSubnets(
"/eth2/%x/committee_index%d_beacon_attestation",
r.committeeIndices, /* determineSubsLen */
r.validateCommitteeIndexBeaconAttestation, /* validator */
r.committeeIndexBeaconAttestationSubscriber, /* message handler */
)
Expand Down Expand Up @@ -207,7 +207,6 @@ func wrapAndReportValidation(topic string, v pubsub.Validator) (string, pubsub.V
// maintained.
func (r *Service) subscribeDynamicWithSubnets(
topicFormat string,
determineSubIndices func() []uint64,
validate pubsub.Validator,
handle subHandler,
) {
Expand All @@ -220,63 +219,34 @@ func (r *Service) subscribeDynamicWithSubnets(
log.WithError(err).Fatal("Could not compute fork digest")
}
subscriptions := make(map[uint64]*pubsub.Subscription, params.BeaconConfig().MaxCommitteesPerSlot)
genesis := r.chain.GenesisTime()
ticker := slotutil.GetSlotTicker(genesis, params.BeaconConfig().SecondsPerSlot)

stateChannel := make(chan *feed.Event, 1)
stateSub := r.stateNotifier.StateFeed().Subscribe(stateChannel)
go func() {
for {
select {
case <-r.ctx.Done():
stateSub.Unsubscribe()
ticker.Done()
return
case <-stateChannel:
case currentSlot := <-ticker.C():
if r.chainStarted && r.initialSync.Syncing() {
continue
}
// Update desired topic indices.
wantedSubs := determineSubIndices()
// Update desired topic indices for aggregator
wantedSubs := r.aggregatorCommitteeIndices(currentSlot)
// Resize as appropriate.
for k, v := range subscriptions {
var wanted bool
for _, idx := range wantedSubs {
if k == idx {
wanted = true
break
}
}
if !wanted && v != nil {
v.Cancel()
r.p2p.PubSub().UnregisterTopicValidator(fmt.Sprintf(topicFormat, k))
delete(subscriptions, k)
}
}
r.reValidateSubscriptions(subscriptions, wantedSubs, topicFormat)

for _, idx := range wantedSubs {
if _, exists := subscriptions[idx]; !exists {
// do not subscribe if we have no peers in the same
// subnet
topic := p2p.GossipTypeMapping[reflect.TypeOf(&pb.Attestation{})]
subnetTopic := fmt.Sprintf(topic, digest, idx)
numOfPeers := r.p2p.PubSub().ListPeers(subnetTopic + r.p2p.Encoding().ProtocolSuffix())
if len(r.p2p.Peers().SubscribedToSubnet(idx)) == 0 && len(numOfPeers) == 0 {
log.Debugf("No peers found subscribed to attestation gossip subnet with "+
"committee index %d. Searching network for peers subscribed to the subnet.", idx)
go func(idx uint64) {
peerExists, err := r.p2p.FindPeersWithSubnet(idx)
if err != nil {
log.Errorf("Could not search for peers: %v", err)
return
}
// do not subscribe if we couldn't find a connected peer.
if !peerExists {
return
}
subscriptions[idx] = r.subscribeWithBase(base, subnetTopic, validate, handle)
}(idx)
continue
}
subscriptions[idx] = r.subscribeWithBase(base, subnetTopic, validate, handle)
r.subscribeMissingSubnet(subscriptions, idx, base, digest, validate, handle)
}
}
// find desired subs for attesters
attesterSubs := r.attesterCommitteeIndices(currentSlot)
for _, idx := range attesterSubs {
r.lookupAttesterSubnets(digest, idx)
}
}
}
}()
Expand Down Expand Up @@ -331,6 +301,76 @@ func (r *Service) subscribeDynamic(topicFormat string, determineSubsLen func() i
}()
}

// revalidate that our currently connected subnets are valid.
func (r *Service) reValidateSubscriptions(subscriptions map[uint64]*pubsub.Subscription,
wantedSubs []uint64, topicFormat string) {
for k, v := range subscriptions {
var wanted bool
for _, idx := range wantedSubs {
if k == idx {
wanted = true
break
}
}
if !wanted && v != nil {
v.Cancel()
r.p2p.PubSub().UnregisterTopicValidator(fmt.Sprintf(topicFormat, k))
delete(subscriptions, k)
}
}
}

// subscribe missing subnets for our aggregators.
func (r *Service) subscribeMissingSubnet(subscriptions map[uint64]*pubsub.Subscription, idx uint64,
base proto.Message, digest [4]byte, validate pubsub.Validator, handle subHandler) {
// do not subscribe if we have no peers in the same
// subnet
topic := p2p.GossipTypeMapping[reflect.TypeOf(&pb.Attestation{})]
subnetTopic := fmt.Sprintf(topic, digest, idx)
if !r.validPeersExist(subnetTopic, idx) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe we should FindPeersWithSubnet even if we have valid peers already that way we can add more peers if we find ones

Copy link
Contributor

@shayzluf shayzluf Apr 6, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i now see that we do it on lookupAttesterSubnets

log.Debugf("No peers found subscribed to attestation gossip subnet with "+
"committee index %d. Searching network for peers subscribed to the subnet.", idx)
go func(idx uint64) {
peerExists, err := r.p2p.FindPeersWithSubnet(idx)
if err != nil {
log.Errorf("Could not search for peers: %v", err)
return
}
// do not subscribe if we couldn't find a connected peer.
if !peerExists {
return
}
subscriptions[idx] = r.subscribeWithBase(base, subnetTopic, validate, handle)
}(idx)
return
}
subscriptions[idx] = r.subscribeWithBase(base, subnetTopic, validate, handle)
}

// lookup peers for attester specific subnets.
func (r *Service) lookupAttesterSubnets(digest [4]byte, idx uint64) {
topic := p2p.GossipTypeMapping[reflect.TypeOf(&pb.Attestation{})]
subnetTopic := fmt.Sprintf(topic, digest, idx)
if !r.validPeersExist(subnetTopic, idx) {
log.Debugf("No peers found subscribed to attestation gossip subnet with "+
"committee index %d. Searching network for peers subscribed to the subnet.", idx)
go func(idx uint64) {
// perform a search for peers with the desired committee index.
_, err := r.p2p.FindPeersWithSubnet(idx)
if err != nil {
log.Errorf("Could not search for peers: %v", err)
return
}
}(idx)
}
}

// find if we have peers who are subscribed to the same subnet
func (r *Service) validPeersExist(subnetTopic string, idx uint64) bool {
numOfPeers := r.p2p.PubSub().ListPeers(subnetTopic + r.p2p.Encoding().ProtocolSuffix())
return len(r.p2p.Peers().SubscribedToSubnet(idx)) > 0 || len(numOfPeers) > 0
}

// Add fork digest to topic.
func (r *Service) addDigestToTopic(topic string) string {
if !strings.Contains(topic, "%x") {
Expand Down
24 changes: 19 additions & 5 deletions beacon-chain/sync/subscriber_committee_index_beacon_attestation.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
"github.com/prysmaticlabs/prysm/beacon-chain/core/feed"
"github.com/prysmaticlabs/prysm/beacon-chain/core/feed/operation"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/shared/sliceutil"
"github.com/prysmaticlabs/prysm/shared/params"
)

func (r *Service) committeeIndexBeaconAttestationSubscriber(ctx context.Context, msg proto.Message) error {
Expand Down Expand Up @@ -49,8 +49,22 @@ func (r *Service) committeesCount() int {
return int(helpers.SlotCommitteeCount(uint64(len(activeValidatorIndices))))
}

func (r *Service) committeeIndices() []uint64 {
currentSlot := r.chain.CurrentSlot()
return sliceutil.UnionUint64(cache.CommitteeIDs.GetAttesterCommitteeIDs(currentSlot),
cache.CommitteeIDs.GetAggregatorCommitteeIDs(currentSlot))
func (r *Service) aggregatorCommitteeIndices(currentSlot uint64) []uint64 {
endEpoch := helpers.SlotToEpoch(currentSlot) + 1
endSlot := endEpoch * params.BeaconConfig().SlotsPerEpoch
commIds := []uint64{}
for i := currentSlot; i <= endSlot; i++ {
commIds = append(commIds, cache.CommitteeIDs.GetAggregatorCommitteeIDs(i)...)
}
return commIds
}

func (r *Service) attesterCommitteeIndices(currentSlot uint64) []uint64 {
endEpoch := helpers.SlotToEpoch(currentSlot) + 1
endSlot := endEpoch * params.BeaconConfig().SlotsPerEpoch
commIds := []uint64{}
for i := currentSlot; i <= endSlot; i++ {
commIds = append(commIds, cache.CommitteeIDs.GetAttesterCommitteeIDs(i)...)
}
return commIds
}