Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve vc logs #13573

Merged
merged 11 commits into from Feb 8, 2024
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion validator/accounts/accounts_delete.go
Expand Up @@ -55,7 +55,7 @@ func (acm *CLIManager) Delete(ctx context.Context) error {
}); err != nil {
return err
}
log.WithField("publicKeys", allAccountStr).Warn(
log.WithField("pubkeys", allAccountStr).Warn(
"Attempted to delete accounts. IMPORTANT: please run `validator accounts list` to ensure " +
"the public keys are indeed deleted. If they are still there, please file an issue at " +
"https://github.com/prysmaticlabs/prysm/issues/new")
Expand Down
2 changes: 1 addition & 1 deletion validator/accounts/accounts_exit.go
Expand Up @@ -171,7 +171,7 @@ func displayExitInfo(rawExitedKeys [][]byte, trimmedExitedKeys []string) {
info := fmt.Sprintf("Voluntary exit was successful for the accounts listed. "+
"URLs where you can track each validator's exit:\n"+strings.Repeat("%s\n", len(ifaceKeys)), ifaceKeys...)

log.WithField("publicKeys", strings.Join(trimmedExitedKeys, ", ")).Info(info)
log.WithField("pubkeys", strings.Join(trimmedExitedKeys, ", ")).Info(info)
} else {
log.Info("No successful voluntary exits")
}
Expand Down
4 changes: 2 additions & 2 deletions validator/accounts/testing/mock.go
Expand Up @@ -92,7 +92,7 @@ type Validator struct {
proposerSettings *validatorserviceconfig.ProposerSettings
}

func (_ *Validator) LogSyncCommitteeMessagesSubmitted() {}
func (_ *Validator) LogSubmittedSyncCommitteeMessages() {}

func (_ *Validator) Done() {
panic("implement me")
Expand Down Expand Up @@ -154,7 +154,7 @@ func (_ *Validator) SubmitSignedContributionAndProof(_ context.Context, _ primit
panic("implement me")
}

func (_ *Validator) LogAttestationsSubmitted() {
func (_ *Validator) LogSubmittedAtts(_ primitives.Slot) {
panic("implement me")
}

Expand Down
15 changes: 1 addition & 14 deletions validator/client/aggregate.go
Expand Up @@ -112,7 +112,7 @@ func (v *validator) SubmitAggregateAndProof(ctx context.Context, slot primitives
return
}

if err := v.addIndicesToLog(duty); err != nil {
if err := v.saveSubmittedAtt(res.AggregateAndProof.Aggregate.Data, pubKey[:], true); err != nil {
log.WithError(err).Error("Could not add aggregator indices to logs")
if v.emitAccountMetrics {
ValidatorAggFailVec.WithLabelValues(fmtKey).Inc()
Expand Down Expand Up @@ -204,16 +204,3 @@ func (v *validator) aggregateAndProofSig(ctx context.Context, pubKey [fieldparam

return sig.Marshal(), nil
}

func (v *validator) addIndicesToLog(duty *ethpb.DutiesResponse_Duty) error {
v.attLogsLock.Lock()
defer v.attLogsLock.Unlock()

for _, log := range v.attLogs {
if duty.CommitteeIndex == log.data.CommitteeIndex {
log.aggregatorIndices = append(log.aggregatorIndices, duty.ValidatorIndex)
}
}

return nil
}
44 changes: 12 additions & 32 deletions validator/client/attest.go
Expand Up @@ -3,19 +3,18 @@ package client
import (
"bytes"
"context"
"errors"
"fmt"
"strings"
"time"

"github.com/pkg/errors"
"github.com/prysmaticlabs/go-bitfield"
"github.com/prysmaticlabs/prysm/v4/async"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/signing"
"github.com/prysmaticlabs/prysm/v4/config/features"
fieldparams "github.com/prysmaticlabs/prysm/v4/config/fieldparams"
"github.com/prysmaticlabs/prysm/v4/config/params"
"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
"github.com/prysmaticlabs/prysm/v4/crypto/hash"
"github.com/prysmaticlabs/prysm/v4/encoding/bytesutil"
"github.com/prysmaticlabs/prysm/v4/monitoring/tracing"
ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
Expand Down Expand Up @@ -55,7 +54,7 @@ func (v *validator) SubmitAttestation(ctx context.Context, slot primitives.Slot,
defer lock.Unlock()

fmtKey := fmt.Sprintf("%#x", pubKey[:])
log := log.WithField("pubKey", fmt.Sprintf("%#x", bytesutil.Trunc(pubKey[:]))).WithField("slot", slot)
log := log.WithField("pubkey", fmt.Sprintf("%#x", bytesutil.Trunc(pubKey[:]))).WithField("slot", slot)
duty, err := v.duty(pubKey)
if err != nil {
log.WithError(err).Error("Could not fetch validator assignment")
Expand Down Expand Up @@ -154,7 +153,7 @@ func (v *validator) SubmitAttestation(ctx context.Context, slot primitives.Slot,
return
}

if err := v.saveAttesterIndexToData(data, duty.ValidatorIndex); err != nil {
if err := v.saveSubmittedAtt(data, pubKey[:], false); err != nil {
log.WithError(err).Error("Could not save validator index for logging")
if v.emitAccountMetrics {
ValidatorAttestFailVec.WithLabelValues(fmtKey).Inc()
Expand Down Expand Up @@ -228,25 +227,6 @@ func (v *validator) getDomainAndSigningRoot(ctx context.Context, data *ethpb.Att
return domain, root, nil
}

// For logging, this saves the last submitted attester index to its attestation data. The purpose of this
// is to enhance attesting logs to be readable when multiple validator keys ran in a single client.
func (v *validator) saveAttesterIndexToData(data *ethpb.AttestationData, index primitives.ValidatorIndex) error {
v.attLogsLock.Lock()
defer v.attLogsLock.Unlock()

h, err := hash.Proto(data)
if err != nil {
return err
}

if v.attLogs[h] == nil {
v.attLogs[h] = &attSubmitted{data, []primitives.ValidatorIndex{}, []primitives.ValidatorIndex{}}
}
v.attLogs[h] = &attSubmitted{data, append(v.attLogs[h].attesterIndices, index), []primitives.ValidatorIndex{}}

return nil
}

// highestSlot returns the highest slot with a valid block seen by the validator
func (v *validator) highestSlot() primitives.Slot {
v.highestValidSlotLock.Lock()
Expand Down Expand Up @@ -313,14 +293,14 @@ func (v *validator) waitOneThirdOrValidBlock(ctx context.Context, slot primitive

func attestationLogFields(pubKey [fieldparams.BLSPubkeyLength]byte, indexedAtt *ethpb.IndexedAttestation) logrus.Fields {
return logrus.Fields{
"attesterPublicKey": fmt.Sprintf("%#x", pubKey),
"attestationSlot": indexedAtt.Data.Slot,
"committeeIndex": indexedAtt.Data.CommitteeIndex,
"beaconBlockRoot": fmt.Sprintf("%#x", indexedAtt.Data.BeaconBlockRoot),
"sourceEpoch": indexedAtt.Data.Source.Epoch,
"sourceRoot": fmt.Sprintf("%#x", indexedAtt.Data.Source.Root),
"targetEpoch": indexedAtt.Data.Target.Epoch,
"targetRoot": fmt.Sprintf("%#x", indexedAtt.Data.Target.Root),
"signature": fmt.Sprintf("%#x", indexedAtt.Signature),
"pubkey": fmt.Sprintf("%#x", pubKey),
"slot": indexedAtt.Data.Slot,
"committeeIndex": indexedAtt.Data.CommitteeIndex,
"blockRoot": fmt.Sprintf("%#x", indexedAtt.Data.BeaconBlockRoot),
"sourceEpoch": indexedAtt.Data.Source.Epoch,
"sourceRoot": fmt.Sprintf("%#x", indexedAtt.Data.Source.Root),
"targetEpoch": indexedAtt.Data.Target.Epoch,
"targetRoot": fmt.Sprintf("%#x", indexedAtt.Data.Target.Root),
"signature": fmt.Sprintf("%#x", indexedAtt.Signature),
}
}
4 changes: 2 additions & 2 deletions validator/client/iface/validator.go
Expand Up @@ -52,8 +52,8 @@ type Validator interface {
SubmitAggregateAndProof(ctx context.Context, slot primitives.Slot, pubKey [fieldparams.BLSPubkeyLength]byte)
SubmitSyncCommitteeMessage(ctx context.Context, slot primitives.Slot, pubKey [fieldparams.BLSPubkeyLength]byte)
SubmitSignedContributionAndProof(ctx context.Context, slot primitives.Slot, pubKey [fieldparams.BLSPubkeyLength]byte)
LogAttestationsSubmitted()
LogSyncCommitteeMessagesSubmitted()
LogSubmittedAtts(slot primitives.Slot)
LogSubmittedSyncCommitteeMessages()
UpdateDomainDataCaches(ctx context.Context, slot primitives.Slot)
WaitForKeymanagerInitialization(ctx context.Context) error
Keymanager() (keymanager.IKeymanager, error)
Expand Down
136 changes: 116 additions & 20 deletions validator/client/log.go
Expand Up @@ -2,46 +2,142 @@ package client

import (
"fmt"
"strconv"
"sync/atomic"

"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
"github.com/prysmaticlabs/prysm/v4/encoding/bytesutil"
ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
"github.com/sirupsen/logrus"
)

var log = logrus.WithField("prefix", "validator")
var log = logrus.WithField("prefix", "client")

type attSubmitted struct {
data *ethpb.AttestationData
attesterIndices []primitives.ValidatorIndex
aggregatorIndices []primitives.ValidatorIndex
type submittedAttData struct {
beaconBlockRoot []byte
source *ethpb.Checkpoint
target *ethpb.Checkpoint
}

// LogAttestationsSubmitted logs info about submitted attestations.
func (v *validator) LogAttestationsSubmitted() {
type submittedAtt struct {
data submittedAttData
pubkeys [][]byte
committees []primitives.CommitteeIndex
}

// submittedAttKey is defined as a concatenation of:
// - AttestationData.BeaconBlockRoot
// - AttestationData.Source.HashTreeRoot()
// - AttestationData.Target.HashTreeRoot()
type submittedAttKey [96]byte

func (k submittedAttKey) FromAttData(data *ethpb.AttestationData) error {
sourceRoot, err := data.Source.HashTreeRoot()
if err != nil {
return err
}
targetRoot, err := data.Target.HashTreeRoot()
if err != nil {
return err
}
copy(k[0:], data.BeaconBlockRoot)
copy(k[32:], sourceRoot[:])
copy(k[64:], targetRoot[:])
return nil
}

// saveSubmittedAtt saves the submitted attestation data along with the attester's pubkey.
// The purpose of this is to display combined attesting logs for all keys managed by the validator client.
func (v *validator) saveSubmittedAtt(data *ethpb.AttestationData, pubkey []byte, isAggregate bool) error {
v.attLogsLock.Lock()
defer v.attLogsLock.Unlock()

key := submittedAttKey{}
if err := key.FromAttData(data); err != nil {
return errors.Wrapf(err, "could not create submitted attestation key")
}
d := submittedAttData{
beaconBlockRoot: data.BeaconBlockRoot,
source: data.Source,
target: data.Target,
}

var submittedAtts map[submittedAttKey]*submittedAtt
if isAggregate {
submittedAtts = v.submittedAggregates
} else {
submittedAtts = v.submittedAtts
}

if submittedAtts[key] == nil {
submittedAtts[key] = &submittedAtt{
d,
[][]byte{},
[]primitives.CommitteeIndex{},
}
}
submittedAtts[key] = &submittedAtt{
d,
append(submittedAtts[key].pubkeys, pubkey),
append(submittedAtts[key].committees, data.CommitteeIndex),
}

return nil
}

// LogSubmittedAtts logs info about submitted attestations.
func (v *validator) LogSubmittedAtts(slot primitives.Slot) {
v.attLogsLock.Lock()
defer v.attLogsLock.Unlock()

for _, attLog := range v.attLogs {
for _, attLog := range v.submittedAtts {
pubkeys := make([]string, len(attLog.pubkeys))
for i, p := range attLog.pubkeys {
pubkeys[i] = fmt.Sprintf("%#x", bytesutil.Trunc(p))
}
committees := make([]string, len(attLog.committees))
for i, c := range attLog.committees {
committees[i] = strconv.FormatUint(uint64(c), 10)
}
log.WithFields(logrus.Fields{
"Slot": attLog.data.Slot,
"CommitteeIndex": attLog.data.CommitteeIndex,
"BeaconBlockRoot": fmt.Sprintf("%#x", bytesutil.Trunc(attLog.data.BeaconBlockRoot)),
"SourceEpoch": attLog.data.Source.Epoch,
"SourceRoot": fmt.Sprintf("%#x", bytesutil.Trunc(attLog.data.Source.Root)),
"TargetEpoch": attLog.data.Target.Epoch,
"TargetRoot": fmt.Sprintf("%#x", bytesutil.Trunc(attLog.data.Target.Root)),
"AttesterIndices": attLog.attesterIndices,
"AggregatorIndices": attLog.aggregatorIndices,
"slot": slot,
"committeeIndices": committees,
"pubkeys": pubkeys,
"blockRoot": fmt.Sprintf("%#x", bytesutil.Trunc(attLog.data.beaconBlockRoot)),
"sourceEpoch": attLog.data.source.Epoch,
"sourceRoot": fmt.Sprintf("%#x", bytesutil.Trunc(attLog.data.source.Root)),
"targetEpoch": attLog.data.target.Epoch,
"targetRoot": fmt.Sprintf("%#x", bytesutil.Trunc(attLog.data.target.Root)),
}).Info("Submitted new attestations")
}
for _, attLog := range v.submittedAggregates {
pubkeys := make([]string, len(attLog.pubkeys))
for i, p := range attLog.pubkeys {
pubkeys[i] = fmt.Sprintf("%#x", bytesutil.Trunc(p))
}
committees := make([]string, len(attLog.committees))
for i, c := range attLog.committees {
committees[i] = strconv.FormatUint(uint64(c), 10)
}
log.WithFields(logrus.Fields{
"slot": slot,
"committeeIndices": committees,
"pubkeys": pubkeys,
"blockRoot": fmt.Sprintf("%#x", bytesutil.Trunc(attLog.data.beaconBlockRoot)),
"sourceEpoch": attLog.data.source.Epoch,
"sourceRoot": fmt.Sprintf("%#x", bytesutil.Trunc(attLog.data.source.Root)),
"targetEpoch": attLog.data.target.Epoch,
"targetRoot": fmt.Sprintf("%#x", bytesutil.Trunc(attLog.data.target.Root)),
}).Info("Submitted new aggregate attestations")
}

v.attLogs = make(map[[32]byte]*attSubmitted)
v.submittedAtts = make(map[submittedAttKey]*submittedAtt)
v.submittedAggregates = make(map[submittedAttKey]*submittedAtt)
}

// LogSyncCommitteeMessagesSubmitted logs info about submitted sync committee messages.
func (v *validator) LogSyncCommitteeMessagesSubmitted() {
// LogSubmittedSyncCommitteeMessages logs info about submitted sync committee messages.
func (v *validator) LogSubmittedSyncCommitteeMessages() {
log.WithField("messages", v.syncCommitteeStats.totalMessagesSubmitted).Debug("Submitted sync committee messages successfully to beacon node")
// Reset the amount.
atomic.StoreUint64(&v.syncCommitteeStats.totalMessagesSubmitted, 0)
Expand Down
12 changes: 6 additions & 6 deletions validator/client/metrics.go
Expand Up @@ -298,25 +298,25 @@ func (v *validator) logForEachValidator(index int, pubKey []byte, resp *ethpb.Va
if index < len(resp.BalancesBeforeEpochTransition) {
balBeforeEpoch = resp.BalancesBeforeEpochTransition[index]
} else {
log.WithField("pubKey", truncatedKey).Warn("Missing balance before epoch transition")
log.WithField("pubkey", truncatedKey).Warn("Missing balance before epoch transition")
}
if index < len(resp.BalancesAfterEpochTransition) {
balAfterEpoch = resp.BalancesAfterEpochTransition[index]
}
if index < len(resp.CorrectlyVotedSource) {
correctlyVotedSource = resp.CorrectlyVotedSource[index]
} else {
log.WithField("pubKey", truncatedKey).Warn("Missing correctly voted source")
log.WithField("pubkey", truncatedKey).Warn("Missing correctly voted source")
}
if index < len(resp.CorrectlyVotedTarget) {
correctlyVotedTarget = resp.CorrectlyVotedTarget[index]
} else {
log.WithField("pubKey", truncatedKey).Warn("Missing correctly voted target")
log.WithField("pubkey", truncatedKey).Warn("Missing correctly voted target")
}
if index < len(resp.CorrectlyVotedHead) {
correctlyVotedHead = resp.CorrectlyVotedHead[index]
} else {
log.WithField("pubKey", truncatedKey).Warn("Missing correctly voted head")
log.WithField("pubkey", truncatedKey).Warn("Missing correctly voted head")
}

if _, ok := v.startBalances[pubKeyBytes]; !ok {
Expand All @@ -333,7 +333,7 @@ func (v *validator) logForEachValidator(index int, pubKey []byte, resp *ethpb.Va
percentSinceStart := (newBalance - startBalance) / startBalance

previousEpochSummaryFields := logrus.Fields{
"pubKey": truncatedKey,
"pubkey": truncatedKey,
"epoch": prevEpoch,
"correctlyVotedSource": correctlyVotedSource,
"correctlyVotedTarget": correctlyVotedTarget,
Expand All @@ -349,7 +349,7 @@ func (v *validator) logForEachValidator(index int, pubKey []byte, resp *ethpb.Va
if index < len(resp.InactivityScores) {
previousEpochSummaryFields["inactivityScore"] = resp.InactivityScores[index]
} else {
log.WithField("pubKey", truncatedKey).Warn("Missing inactivity score")
log.WithField("pubkey", truncatedKey).Warn("Missing inactivity score")
}
}

Expand Down