Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize GetDuties VC action #13789

Merged
merged 6 commits into from Mar 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion validator/client/beacon-api/BUILD.bazel
Expand Up @@ -66,6 +66,7 @@ go_library(
"@com_github_sirupsen_logrus//:go_default_library",
"@org_golang_google_grpc//:go_default_library",
"@org_golang_google_protobuf//types/known/timestamppb:go_default_library",
"@org_golang_x_sync//errgroup:go_default_library",
],
)

Expand Down Expand Up @@ -129,7 +130,6 @@ go_test(
"//proto/prysm/v1alpha1:go_default_library",
"//testing/assert:go_default_library",
"//testing/require:go_default_library",
"//testing/validator-mock:go_default_library",
"//time/slots:go_default_library",
"//validator/client/beacon-api/mock:go_default_library",
"//validator/client/beacon-api/test-helpers:go_default_library",
Expand Down
265 changes: 167 additions & 98 deletions validator/client/beacon-api/duties.go
Expand Up @@ -8,11 +8,14 @@ import (
"net/url"
"strconv"

"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v5/api/server/structs"
"github.com/prysmaticlabs/prysm/v5/config/params"
"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
"github.com/prysmaticlabs/prysm/v5/consensus-types/validator"
ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
"golang.org/x/sync/errgroup"
)

type dutiesProvider interface {
Expand All @@ -31,37 +34,42 @@ type committeeIndexSlotPair struct {
slot primitives.Slot
}

type validatorForDuty struct {
pubkey []byte
index primitives.ValidatorIndex
status ethpb.ValidatorStatus
}

func (c beaconApiValidatorClient) getDuties(ctx context.Context, in *ethpb.DutiesRequest) (*ethpb.DutiesResponse, error) {
all, err := c.multipleValidatorStatus(ctx, &ethpb.MultipleValidatorStatusRequest{PublicKeys: in.PublicKeys})
vals, err := c.getValidatorsForDuties(ctx, in.PublicKeys)
if err != nil {
return nil, errors.Wrap(err, "failed to get validator status")
}
known := &ethpb.MultipleValidatorStatusResponse{
PublicKeys: make([][]byte, 0, len(all.PublicKeys)),
Statuses: make([]*ethpb.ValidatorStatusResponse, 0, len(all.Statuses)),
Indices: make([]primitives.ValidatorIndex, 0, len(all.Indices)),
}
for i, status := range all.Statuses {
if status.Status != ethpb.ValidatorStatus_UNKNOWN_STATUS {
known.PublicKeys = append(known.PublicKeys, all.PublicKeys[i])
known.Statuses = append(known.Statuses, all.Statuses[i])
known.Indices = append(known.Indices, all.Indices[i])
}
return nil, errors.Wrap(err, "failed to get validators for duties")
}

// Sync committees are an Altair feature
fetchSyncDuties := in.Epoch >= params.BeaconConfig().AltairForkEpoch

currentEpochDuties, err := c.getDutiesForEpoch(ctx, in.Epoch, known, fetchSyncDuties)
if err != nil {
return nil, errors.Wrapf(err, "failed to get duties for current epoch `%d`", in.Epoch)
}
errCh := make(chan error, 1)

nextEpochDuties, err := c.getDutiesForEpoch(ctx, in.Epoch+1, known, fetchSyncDuties)
var currentEpochDuties []*ethpb.DutiesResponse_Duty
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Question:

Why using an errgroup here for only one goroutine?
(In general errgroups are used when several goroutines run in parallel, and we want manage the case one of them returns an error.)

A pattern without errgroup could be the following:

errCh := make(chan error)  // Create a channel for errors
 go faultyFunction(errCh) // Start the faulty function in a new goroutine

 // Do other stuffs

 err := <-errCh  // Wait for an error
 fmt.Println("Received an error:", err)

With faultyFunction sending the error at the end in errCh.

However, I admit errgroup totally works here.

go func() {
currentEpochDuties, err = c.getDutiesForEpoch(ctx, in.Epoch, vals, fetchSyncDuties)
if err != nil {
errCh <- errors.Wrapf(err, "failed to get duties for current epoch `%d`", in.Epoch)
return
}
errCh <- nil
}()

nextEpochDuties, err := c.getDutiesForEpoch(ctx, in.Epoch+1, vals, fetchSyncDuties)
if err != nil {
return nil, errors.Wrapf(err, "failed to get duties for next epoch `%d`", in.Epoch+1)
}

if err = <-errCh; err != nil {
return nil, err
}

return &ethpb.DutiesResponse{
CurrentEpochDuties: currentEpochDuties,
NextEpochDuties: nextEpochDuties,
Expand All @@ -71,103 +79,117 @@ func (c beaconApiValidatorClient) getDuties(ctx context.Context, in *ethpb.Dutie
func (c beaconApiValidatorClient) getDutiesForEpoch(
ctx context.Context,
epoch primitives.Epoch,
multipleValidatorStatus *ethpb.MultipleValidatorStatusResponse,
vals []validatorForDuty,
fetchSyncDuties bool,
) ([]*ethpb.DutiesResponse_Duty, error) {
attesterDuties, err := c.dutiesProvider.GetAttesterDuties(ctx, epoch, multipleValidatorStatus.Indices)
if err != nil {
return nil, errors.Wrapf(err, "failed to get attester duties for epoch `%d`", epoch)
indices := make([]primitives.ValidatorIndex, len(vals))
for i, v := range vals {
indices[i] = v.index
}

var syncDuties []*structs.SyncCommitteeDuty
if fetchSyncDuties {
if syncDuties, err = c.dutiesProvider.GetSyncDuties(ctx, epoch, multipleValidatorStatus.Indices); err != nil {
return nil, errors.Wrapf(err, "failed to get sync duties for epoch `%d`", epoch)
}
}

var proposerDuties []*structs.ProposerDuty
if proposerDuties, err = c.dutiesProvider.GetProposerDuties(ctx, epoch); err != nil {
return nil, errors.Wrapf(err, "failed to get proposer duties for epoch `%d`", epoch)
}

committees, err := c.dutiesProvider.GetCommittees(ctx, epoch)
if err != nil {
return nil, errors.Wrapf(err, "failed to get committees for epoch `%d`", epoch)
}
slotCommittees := make(map[string]uint64)
for _, c := range committees {
n, ok := slotCommittees[c.Slot]
if !ok {
n = 0
}
slotCommittees[c.Slot] = n + 1
}
// Below variables MUST NOT be used in the main function before wg.Wait().
// This is because they are populated in goroutines and wg.Wait()
// will return only once all goroutines finish their execution.

// Mapping from a validator index to its attesting committee's index and slot
attesterDutiesMapping := make(map[primitives.ValidatorIndex]committeeIndexSlotPair)
for _, attesterDuty := range attesterDuties {
validatorIndex, err := strconv.ParseUint(attesterDuty.ValidatorIndex, 10, 64)
if err != nil {
return nil, errors.Wrapf(err, "failed to parse attester validator index `%s`", attesterDuty.ValidatorIndex)
}
// Set containing all validator indices that are part of a sync committee for this epoch
syncDutiesMapping := make(map[primitives.ValidatorIndex]bool)
// Mapping from a validator index to its proposal slot
proposerDutySlots := make(map[primitives.ValidatorIndex][]primitives.Slot)
// Mapping from the {committeeIndex, slot} to each of the committee's validator indices
committeeMapping := make(map[committeeIndexSlotPair][]primitives.ValidatorIndex)

slot, err := strconv.ParseUint(attesterDuty.Slot, 10, 64)
if err != nil {
return nil, errors.Wrapf(err, "failed to parse attester slot `%s`", attesterDuty.Slot)
}
var wg errgroup.Group

committeeIndex, err := strconv.ParseUint(attesterDuty.CommitteeIndex, 10, 64)
wg.Go(func() error {
attesterDuties, err := c.dutiesProvider.GetAttesterDuties(ctx, epoch, indices)
if err != nil {
return nil, errors.Wrapf(err, "failed to parse attester committee index `%s`", attesterDuty.CommitteeIndex)
return errors.Wrapf(err, "failed to get attester duties for epoch `%d`", epoch)
}

attesterDutiesMapping[primitives.ValidatorIndex(validatorIndex)] = committeeIndexSlotPair{
slot: primitives.Slot(slot),
committeeIndex: primitives.CommitteeIndex(committeeIndex),
for _, attesterDuty := range attesterDuties {
validatorIndex, err := strconv.ParseUint(attesterDuty.ValidatorIndex, 10, 64)
if err != nil {
return errors.Wrapf(err, "failed to parse attester validator index `%s`", attesterDuty.ValidatorIndex)
}
slot, err := strconv.ParseUint(attesterDuty.Slot, 10, 64)
if err != nil {
return errors.Wrapf(err, "failed to parse attester slot `%s`", attesterDuty.Slot)
}
committeeIndex, err := strconv.ParseUint(attesterDuty.CommitteeIndex, 10, 64)
if err != nil {
return errors.Wrapf(err, "failed to parse attester committee index `%s`", attesterDuty.CommitteeIndex)
}
attesterDutiesMapping[primitives.ValidatorIndex(validatorIndex)] = committeeIndexSlotPair{
slot: primitives.Slot(slot),
committeeIndex: primitives.CommitteeIndex(committeeIndex),
}
}
return nil
})

if fetchSyncDuties {
wg.Go(func() error {
syncDuties, err := c.dutiesProvider.GetSyncDuties(ctx, epoch, indices)
if err != nil {
return errors.Wrapf(err, "failed to get sync duties for epoch `%d`", epoch)
}

for _, syncDuty := range syncDuties {
validatorIndex, err := strconv.ParseUint(syncDuty.ValidatorIndex, 10, 64)
if err != nil {
return errors.Wrapf(err, "failed to parse sync validator index `%s`", syncDuty.ValidatorIndex)
}
syncDutiesMapping[primitives.ValidatorIndex(validatorIndex)] = true
}
return nil
})
}

// Mapping from a validator index to its proposal slot
proposerDutySlots := make(map[primitives.ValidatorIndex][]primitives.Slot)
for _, proposerDuty := range proposerDuties {
validatorIndex, err := strconv.ParseUint(proposerDuty.ValidatorIndex, 10, 64)
wg.Go(func() error {
proposerDuties, err := c.dutiesProvider.GetProposerDuties(ctx, epoch)
if err != nil {
return nil, errors.Wrapf(err, "failed to parse proposer validator index `%s`", proposerDuty.ValidatorIndex)
return errors.Wrapf(err, "failed to get proposer duties for epoch `%d`", epoch)
}

slot, err := strconv.ParseUint(proposerDuty.Slot, 10, 64)
if err != nil {
return nil, errors.Wrapf(err, "failed to parse proposer slot `%s`", proposerDuty.Slot)
for _, proposerDuty := range proposerDuties {
validatorIndex, err := strconv.ParseUint(proposerDuty.ValidatorIndex, 10, 64)
if err != nil {
return errors.Wrapf(err, "failed to parse proposer validator index `%s`", proposerDuty.ValidatorIndex)
}
slot, err := strconv.ParseUint(proposerDuty.Slot, 10, 64)
if err != nil {
return errors.Wrapf(err, "failed to parse proposer slot `%s`", proposerDuty.Slot)
}
proposerDutySlots[primitives.ValidatorIndex(validatorIndex)] =
append(proposerDutySlots[primitives.ValidatorIndex(validatorIndex)], primitives.Slot(slot))
}
return nil
})

proposerDutySlots[primitives.ValidatorIndex(validatorIndex)] = append(proposerDutySlots[primitives.ValidatorIndex(validatorIndex)], primitives.Slot(slot))
committees, err := c.dutiesProvider.GetCommittees(ctx, epoch)
if err != nil {
return nil, errors.Wrapf(err, "failed to get committees for epoch `%d`", epoch)
}

// Set containing all validator indices that are part of a sync committee for this epoch
syncDutiesMapping := make(map[primitives.ValidatorIndex]bool)
for _, syncDuty := range syncDuties {
validatorIndex, err := strconv.ParseUint(syncDuty.ValidatorIndex, 10, 64)
if err != nil {
return nil, errors.Wrapf(err, "failed to parse sync validator index `%s`", syncDuty.ValidatorIndex)
slotCommittees := make(map[string]uint64)
for _, c := range committees {
n, ok := slotCommittees[c.Slot]
if !ok {
n = 0
}

syncDutiesMapping[primitives.ValidatorIndex(validatorIndex)] = true
slotCommittees[c.Slot] = n + 1
}

// Mapping from the {committeeIndex, slot} to each of the committee's validator indices
committeeMapping := make(map[committeeIndexSlotPair][]primitives.ValidatorIndex)
for _, committee := range committees {
committeeIndex, err := strconv.ParseUint(committee.Index, 10, 64)
if err != nil {
return nil, errors.Wrapf(err, "failed to parse committee index `%s`", committee.Index)
}

slot, err := strconv.ParseUint(committee.Slot, 10, 64)
if err != nil {
return nil, errors.Wrapf(err, "failed to parse slot `%s`", committee.Slot)
}

validatorIndices := make([]primitives.ValidatorIndex, len(committee.Validators))
for index, validatorIndexString := range committee.Validators {
validatorIndex, err := strconv.ParseUint(validatorIndexString, 10, 64)
Expand All @@ -176,24 +198,26 @@ func (c beaconApiValidatorClient) getDutiesForEpoch(
}
validatorIndices[index] = primitives.ValidatorIndex(validatorIndex)
}

key := committeeIndexSlotPair{
committeeIndex: primitives.CommitteeIndex(committeeIndex),
slot: primitives.Slot(slot),
}
committeeMapping[key] = validatorIndices
}

duties := make([]*ethpb.DutiesResponse_Duty, len(multipleValidatorStatus.Statuses))
for index, validatorStatus := range multipleValidatorStatus.Statuses {
validatorIndex := multipleValidatorStatus.Indices[index]
pubkey := multipleValidatorStatus.PublicKeys[index]
if err = wg.Wait(); err != nil {
return nil, err
}

var attesterSlot primitives.Slot
var committeeIndex primitives.CommitteeIndex
var committeeValidatorIndices []primitives.ValidatorIndex
duties := make([]*ethpb.DutiesResponse_Duty, len(vals))
for i, v := range vals {
var (
attesterSlot primitives.Slot
committeeIndex primitives.CommitteeIndex
committeeValidatorIndices []primitives.ValidatorIndex
)

if committeeMappingKey, ok := attesterDutiesMapping[validatorIndex]; ok {
if committeeMappingKey, ok := attesterDutiesMapping[v.index]; ok {
committeeIndex = committeeMappingKey.committeeIndex
attesterSlot = committeeMappingKey.slot

Expand All @@ -202,22 +226,67 @@ func (c beaconApiValidatorClient) getDutiesForEpoch(
}
}

duties[index] = &ethpb.DutiesResponse_Duty{
duties[i] = &ethpb.DutiesResponse_Duty{
Committee: committeeValidatorIndices,
CommitteeIndex: committeeIndex,
AttesterSlot: attesterSlot,
ProposerSlots: proposerDutySlots[validatorIndex],
PublicKey: pubkey,
Status: validatorStatus.Status,
ValidatorIndex: validatorIndex,
IsSyncCommittee: syncDutiesMapping[validatorIndex],
ProposerSlots: proposerDutySlots[v.index],
PublicKey: v.pubkey,
Status: v.status,
ValidatorIndex: v.index,
IsSyncCommittee: syncDutiesMapping[v.index],
CommitteesAtSlot: slotCommittees[strconv.FormatUint(uint64(attesterSlot), 10)],
}
}

return duties, nil
}

func (c *beaconApiValidatorClient) getValidatorsForDuties(ctx context.Context, pubkeys [][]byte) ([]validatorForDuty, error) {
vals := make([]validatorForDuty, 0, len(pubkeys))
stringPubkeysToPubkeys := make(map[string][]byte, len(pubkeys))
stringPubkeys := make([]string, len(pubkeys))

for i, pk := range pubkeys {
stringPk := hexutil.Encode(pk)
stringPubkeysToPubkeys[stringPk] = pk
stringPubkeys[i] = stringPk
}

statusesWithDuties := []string{validator.ActiveOngoing.String(), validator.ActiveExiting.String()}
stateValidatorsResponse, err := c.stateValidatorsProvider.GetStateValidators(ctx, stringPubkeys, nil, statusesWithDuties)
if err != nil {
return nil, errors.Wrap(err, "failed to get state validators")
}

for _, validatorContainer := range stateValidatorsResponse.Data {
val := validatorForDuty{}

validatorIndex, err := strconv.ParseUint(validatorContainer.Index, 10, 64)
if err != nil {
return nil, errors.Wrapf(err, "failed to parse validator index %s", validatorContainer.Index)
}
val.index = primitives.ValidatorIndex(validatorIndex)

stringPubkey := validatorContainer.Validator.Pubkey
pubkey, ok := stringPubkeysToPubkeys[stringPubkey]
if !ok {
return nil, errors.Wrapf(err, "returned public key %s not requested", stringPubkey)
}
val.pubkey = pubkey

status, ok := beaconAPITogRPCValidatorStatus[validatorContainer.Status]
if !ok {
return nil, errors.New("invalid validator status " + validatorContainer.Status)
}
val.status = status

vals = append(vals, val)
}

return vals, nil
}

// GetCommittees retrieves the committees for the given epoch
func (c beaconApiDutiesProvider) GetCommittees(ctx context.Context, epoch primitives.Epoch) ([]*structs.Committee, error) {
committeeParams := url.Values{}
Expand Down