Skip to content

Commit

Permalink
Optimize GetDuties VC action (#13789)
Browse files Browse the repository at this point in the history
* wait groups

* errgroup

* tests

* bzl

* review
  • Loading branch information
rkapka committed Mar 22, 2024
1 parent a6e86c6 commit 63c2b35
Show file tree
Hide file tree
Showing 3 changed files with 247 additions and 235 deletions.
2 changes: 1 addition & 1 deletion validator/client/beacon-api/BUILD.bazel
Expand Up @@ -66,6 +66,7 @@ go_library(
"@com_github_sirupsen_logrus//:go_default_library",
"@org_golang_google_grpc//:go_default_library",
"@org_golang_google_protobuf//types/known/timestamppb:go_default_library",
"@org_golang_x_sync//errgroup:go_default_library",
],
)

Expand Down Expand Up @@ -129,7 +130,6 @@ go_test(
"//proto/prysm/v1alpha1:go_default_library",
"//testing/assert:go_default_library",
"//testing/require:go_default_library",
"//testing/validator-mock:go_default_library",
"//time/slots:go_default_library",
"//validator/client/beacon-api/mock:go_default_library",
"//validator/client/beacon-api/test-helpers:go_default_library",
Expand Down
265 changes: 167 additions & 98 deletions validator/client/beacon-api/duties.go
Expand Up @@ -8,11 +8,14 @@ import (
"net/url"
"strconv"

"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v5/api/server/structs"
"github.com/prysmaticlabs/prysm/v5/config/params"
"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
"github.com/prysmaticlabs/prysm/v5/consensus-types/validator"
ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
"golang.org/x/sync/errgroup"
)

type dutiesProvider interface {
Expand All @@ -31,37 +34,42 @@ type committeeIndexSlotPair struct {
slot primitives.Slot
}

type validatorForDuty struct {
pubkey []byte
index primitives.ValidatorIndex
status ethpb.ValidatorStatus
}

func (c beaconApiValidatorClient) getDuties(ctx context.Context, in *ethpb.DutiesRequest) (*ethpb.DutiesResponse, error) {
all, err := c.multipleValidatorStatus(ctx, &ethpb.MultipleValidatorStatusRequest{PublicKeys: in.PublicKeys})
vals, err := c.getValidatorsForDuties(ctx, in.PublicKeys)
if err != nil {
return nil, errors.Wrap(err, "failed to get validator status")
}
known := &ethpb.MultipleValidatorStatusResponse{
PublicKeys: make([][]byte, 0, len(all.PublicKeys)),
Statuses: make([]*ethpb.ValidatorStatusResponse, 0, len(all.Statuses)),
Indices: make([]primitives.ValidatorIndex, 0, len(all.Indices)),
}
for i, status := range all.Statuses {
if status.Status != ethpb.ValidatorStatus_UNKNOWN_STATUS {
known.PublicKeys = append(known.PublicKeys, all.PublicKeys[i])
known.Statuses = append(known.Statuses, all.Statuses[i])
known.Indices = append(known.Indices, all.Indices[i])
}
return nil, errors.Wrap(err, "failed to get validators for duties")
}

// Sync committees are an Altair feature
fetchSyncDuties := in.Epoch >= params.BeaconConfig().AltairForkEpoch

currentEpochDuties, err := c.getDutiesForEpoch(ctx, in.Epoch, known, fetchSyncDuties)
if err != nil {
return nil, errors.Wrapf(err, "failed to get duties for current epoch `%d`", in.Epoch)
}
errCh := make(chan error, 1)

nextEpochDuties, err := c.getDutiesForEpoch(ctx, in.Epoch+1, known, fetchSyncDuties)
var currentEpochDuties []*ethpb.DutiesResponse_Duty
go func() {
currentEpochDuties, err = c.getDutiesForEpoch(ctx, in.Epoch, vals, fetchSyncDuties)
if err != nil {
errCh <- errors.Wrapf(err, "failed to get duties for current epoch `%d`", in.Epoch)
return
}
errCh <- nil
}()

nextEpochDuties, err := c.getDutiesForEpoch(ctx, in.Epoch+1, vals, fetchSyncDuties)
if err != nil {
return nil, errors.Wrapf(err, "failed to get duties for next epoch `%d`", in.Epoch+1)
}

if err = <-errCh; err != nil {
return nil, err
}

return &ethpb.DutiesResponse{
CurrentEpochDuties: currentEpochDuties,
NextEpochDuties: nextEpochDuties,
Expand All @@ -71,103 +79,117 @@ func (c beaconApiValidatorClient) getDuties(ctx context.Context, in *ethpb.Dutie
func (c beaconApiValidatorClient) getDutiesForEpoch(
ctx context.Context,
epoch primitives.Epoch,
multipleValidatorStatus *ethpb.MultipleValidatorStatusResponse,
vals []validatorForDuty,
fetchSyncDuties bool,
) ([]*ethpb.DutiesResponse_Duty, error) {
attesterDuties, err := c.dutiesProvider.GetAttesterDuties(ctx, epoch, multipleValidatorStatus.Indices)
if err != nil {
return nil, errors.Wrapf(err, "failed to get attester duties for epoch `%d`", epoch)
indices := make([]primitives.ValidatorIndex, len(vals))
for i, v := range vals {
indices[i] = v.index
}

var syncDuties []*structs.SyncCommitteeDuty
if fetchSyncDuties {
if syncDuties, err = c.dutiesProvider.GetSyncDuties(ctx, epoch, multipleValidatorStatus.Indices); err != nil {
return nil, errors.Wrapf(err, "failed to get sync duties for epoch `%d`", epoch)
}
}

var proposerDuties []*structs.ProposerDuty
if proposerDuties, err = c.dutiesProvider.GetProposerDuties(ctx, epoch); err != nil {
return nil, errors.Wrapf(err, "failed to get proposer duties for epoch `%d`", epoch)
}

committees, err := c.dutiesProvider.GetCommittees(ctx, epoch)
if err != nil {
return nil, errors.Wrapf(err, "failed to get committees for epoch `%d`", epoch)
}
slotCommittees := make(map[string]uint64)
for _, c := range committees {
n, ok := slotCommittees[c.Slot]
if !ok {
n = 0
}
slotCommittees[c.Slot] = n + 1
}
// Below variables MUST NOT be used in the main function before wg.Wait().
// This is because they are populated in goroutines and wg.Wait()
// will return only once all goroutines finish their execution.

// Mapping from a validator index to its attesting committee's index and slot
attesterDutiesMapping := make(map[primitives.ValidatorIndex]committeeIndexSlotPair)
for _, attesterDuty := range attesterDuties {
validatorIndex, err := strconv.ParseUint(attesterDuty.ValidatorIndex, 10, 64)
if err != nil {
return nil, errors.Wrapf(err, "failed to parse attester validator index `%s`", attesterDuty.ValidatorIndex)
}
// Set containing all validator indices that are part of a sync committee for this epoch
syncDutiesMapping := make(map[primitives.ValidatorIndex]bool)
// Mapping from a validator index to its proposal slot
proposerDutySlots := make(map[primitives.ValidatorIndex][]primitives.Slot)
// Mapping from the {committeeIndex, slot} to each of the committee's validator indices
committeeMapping := make(map[committeeIndexSlotPair][]primitives.ValidatorIndex)

slot, err := strconv.ParseUint(attesterDuty.Slot, 10, 64)
if err != nil {
return nil, errors.Wrapf(err, "failed to parse attester slot `%s`", attesterDuty.Slot)
}
var wg errgroup.Group

committeeIndex, err := strconv.ParseUint(attesterDuty.CommitteeIndex, 10, 64)
wg.Go(func() error {
attesterDuties, err := c.dutiesProvider.GetAttesterDuties(ctx, epoch, indices)
if err != nil {
return nil, errors.Wrapf(err, "failed to parse attester committee index `%s`", attesterDuty.CommitteeIndex)
return errors.Wrapf(err, "failed to get attester duties for epoch `%d`", epoch)
}

attesterDutiesMapping[primitives.ValidatorIndex(validatorIndex)] = committeeIndexSlotPair{
slot: primitives.Slot(slot),
committeeIndex: primitives.CommitteeIndex(committeeIndex),
for _, attesterDuty := range attesterDuties {
validatorIndex, err := strconv.ParseUint(attesterDuty.ValidatorIndex, 10, 64)
if err != nil {
return errors.Wrapf(err, "failed to parse attester validator index `%s`", attesterDuty.ValidatorIndex)
}
slot, err := strconv.ParseUint(attesterDuty.Slot, 10, 64)
if err != nil {
return errors.Wrapf(err, "failed to parse attester slot `%s`", attesterDuty.Slot)
}
committeeIndex, err := strconv.ParseUint(attesterDuty.CommitteeIndex, 10, 64)
if err != nil {
return errors.Wrapf(err, "failed to parse attester committee index `%s`", attesterDuty.CommitteeIndex)
}
attesterDutiesMapping[primitives.ValidatorIndex(validatorIndex)] = committeeIndexSlotPair{
slot: primitives.Slot(slot),
committeeIndex: primitives.CommitteeIndex(committeeIndex),
}
}
return nil
})

if fetchSyncDuties {
wg.Go(func() error {
syncDuties, err := c.dutiesProvider.GetSyncDuties(ctx, epoch, indices)
if err != nil {
return errors.Wrapf(err, "failed to get sync duties for epoch `%d`", epoch)
}

for _, syncDuty := range syncDuties {
validatorIndex, err := strconv.ParseUint(syncDuty.ValidatorIndex, 10, 64)
if err != nil {
return errors.Wrapf(err, "failed to parse sync validator index `%s`", syncDuty.ValidatorIndex)
}
syncDutiesMapping[primitives.ValidatorIndex(validatorIndex)] = true
}
return nil
})
}

// Mapping from a validator index to its proposal slot
proposerDutySlots := make(map[primitives.ValidatorIndex][]primitives.Slot)
for _, proposerDuty := range proposerDuties {
validatorIndex, err := strconv.ParseUint(proposerDuty.ValidatorIndex, 10, 64)
wg.Go(func() error {
proposerDuties, err := c.dutiesProvider.GetProposerDuties(ctx, epoch)
if err != nil {
return nil, errors.Wrapf(err, "failed to parse proposer validator index `%s`", proposerDuty.ValidatorIndex)
return errors.Wrapf(err, "failed to get proposer duties for epoch `%d`", epoch)
}

slot, err := strconv.ParseUint(proposerDuty.Slot, 10, 64)
if err != nil {
return nil, errors.Wrapf(err, "failed to parse proposer slot `%s`", proposerDuty.Slot)
for _, proposerDuty := range proposerDuties {
validatorIndex, err := strconv.ParseUint(proposerDuty.ValidatorIndex, 10, 64)
if err != nil {
return errors.Wrapf(err, "failed to parse proposer validator index `%s`", proposerDuty.ValidatorIndex)
}
slot, err := strconv.ParseUint(proposerDuty.Slot, 10, 64)
if err != nil {
return errors.Wrapf(err, "failed to parse proposer slot `%s`", proposerDuty.Slot)
}
proposerDutySlots[primitives.ValidatorIndex(validatorIndex)] =
append(proposerDutySlots[primitives.ValidatorIndex(validatorIndex)], primitives.Slot(slot))
}
return nil
})

proposerDutySlots[primitives.ValidatorIndex(validatorIndex)] = append(proposerDutySlots[primitives.ValidatorIndex(validatorIndex)], primitives.Slot(slot))
committees, err := c.dutiesProvider.GetCommittees(ctx, epoch)
if err != nil {
return nil, errors.Wrapf(err, "failed to get committees for epoch `%d`", epoch)
}

// Set containing all validator indices that are part of a sync committee for this epoch
syncDutiesMapping := make(map[primitives.ValidatorIndex]bool)
for _, syncDuty := range syncDuties {
validatorIndex, err := strconv.ParseUint(syncDuty.ValidatorIndex, 10, 64)
if err != nil {
return nil, errors.Wrapf(err, "failed to parse sync validator index `%s`", syncDuty.ValidatorIndex)
slotCommittees := make(map[string]uint64)
for _, c := range committees {
n, ok := slotCommittees[c.Slot]
if !ok {
n = 0
}

syncDutiesMapping[primitives.ValidatorIndex(validatorIndex)] = true
slotCommittees[c.Slot] = n + 1
}

// Mapping from the {committeeIndex, slot} to each of the committee's validator indices
committeeMapping := make(map[committeeIndexSlotPair][]primitives.ValidatorIndex)
for _, committee := range committees {
committeeIndex, err := strconv.ParseUint(committee.Index, 10, 64)
if err != nil {
return nil, errors.Wrapf(err, "failed to parse committee index `%s`", committee.Index)
}

slot, err := strconv.ParseUint(committee.Slot, 10, 64)
if err != nil {
return nil, errors.Wrapf(err, "failed to parse slot `%s`", committee.Slot)
}

validatorIndices := make([]primitives.ValidatorIndex, len(committee.Validators))
for index, validatorIndexString := range committee.Validators {
validatorIndex, err := strconv.ParseUint(validatorIndexString, 10, 64)
Expand All @@ -176,24 +198,26 @@ func (c beaconApiValidatorClient) getDutiesForEpoch(
}
validatorIndices[index] = primitives.ValidatorIndex(validatorIndex)
}

key := committeeIndexSlotPair{
committeeIndex: primitives.CommitteeIndex(committeeIndex),
slot: primitives.Slot(slot),
}
committeeMapping[key] = validatorIndices
}

duties := make([]*ethpb.DutiesResponse_Duty, len(multipleValidatorStatus.Statuses))
for index, validatorStatus := range multipleValidatorStatus.Statuses {
validatorIndex := multipleValidatorStatus.Indices[index]
pubkey := multipleValidatorStatus.PublicKeys[index]
if err = wg.Wait(); err != nil {
return nil, err
}

var attesterSlot primitives.Slot
var committeeIndex primitives.CommitteeIndex
var committeeValidatorIndices []primitives.ValidatorIndex
duties := make([]*ethpb.DutiesResponse_Duty, len(vals))
for i, v := range vals {
var (
attesterSlot primitives.Slot
committeeIndex primitives.CommitteeIndex
committeeValidatorIndices []primitives.ValidatorIndex
)

if committeeMappingKey, ok := attesterDutiesMapping[validatorIndex]; ok {
if committeeMappingKey, ok := attesterDutiesMapping[v.index]; ok {
committeeIndex = committeeMappingKey.committeeIndex
attesterSlot = committeeMappingKey.slot

Expand All @@ -202,22 +226,67 @@ func (c beaconApiValidatorClient) getDutiesForEpoch(
}
}

duties[index] = &ethpb.DutiesResponse_Duty{
duties[i] = &ethpb.DutiesResponse_Duty{
Committee: committeeValidatorIndices,
CommitteeIndex: committeeIndex,
AttesterSlot: attesterSlot,
ProposerSlots: proposerDutySlots[validatorIndex],
PublicKey: pubkey,
Status: validatorStatus.Status,
ValidatorIndex: validatorIndex,
IsSyncCommittee: syncDutiesMapping[validatorIndex],
ProposerSlots: proposerDutySlots[v.index],
PublicKey: v.pubkey,
Status: v.status,
ValidatorIndex: v.index,
IsSyncCommittee: syncDutiesMapping[v.index],
CommitteesAtSlot: slotCommittees[strconv.FormatUint(uint64(attesterSlot), 10)],
}
}

return duties, nil
}

func (c *beaconApiValidatorClient) getValidatorsForDuties(ctx context.Context, pubkeys [][]byte) ([]validatorForDuty, error) {
vals := make([]validatorForDuty, 0, len(pubkeys))
stringPubkeysToPubkeys := make(map[string][]byte, len(pubkeys))
stringPubkeys := make([]string, len(pubkeys))

for i, pk := range pubkeys {
stringPk := hexutil.Encode(pk)
stringPubkeysToPubkeys[stringPk] = pk
stringPubkeys[i] = stringPk
}

statusesWithDuties := []string{validator.ActiveOngoing.String(), validator.ActiveExiting.String()}
stateValidatorsResponse, err := c.stateValidatorsProvider.GetStateValidators(ctx, stringPubkeys, nil, statusesWithDuties)
if err != nil {
return nil, errors.Wrap(err, "failed to get state validators")
}

for _, validatorContainer := range stateValidatorsResponse.Data {
val := validatorForDuty{}

validatorIndex, err := strconv.ParseUint(validatorContainer.Index, 10, 64)
if err != nil {
return nil, errors.Wrapf(err, "failed to parse validator index %s", validatorContainer.Index)
}
val.index = primitives.ValidatorIndex(validatorIndex)

stringPubkey := validatorContainer.Validator.Pubkey
pubkey, ok := stringPubkeysToPubkeys[stringPubkey]
if !ok {
return nil, errors.Wrapf(err, "returned public key %s not requested", stringPubkey)
}
val.pubkey = pubkey

status, ok := beaconAPITogRPCValidatorStatus[validatorContainer.Status]
if !ok {
return nil, errors.New("invalid validator status " + validatorContainer.Status)
}
val.status = status

vals = append(vals, val)
}

return vals, nil
}

// GetCommittees retrieves the committees for the given epoch
func (c beaconApiDutiesProvider) GetCommittees(ctx context.Context, epoch primitives.Epoch) ([]*structs.Committee, error) {
committeeParams := url.Values{}
Expand Down

0 comments on commit 63c2b35

Please sign in to comment.