Skip to content

Commit

Permalink
Validator: cache domain data calls (#4914)
Browse files Browse the repository at this point in the history
* Use a domain data cache to reduce the number of calls per epoch

* fix fakevalidator

* Refactor to use a feature flag, use proto.clone, move interceptor to its own file

* gofmt

* fix comment

* tune cache slightly

* log if error on domain data

Co-authored-by: prylabs-bulldozer[bot] <58059840+prylabs-bulldozer[bot]@users.noreply.github.com>
  • Loading branch information
prestonvanloon and prylabs-bulldozer[bot] committed Feb 24, 2020
1 parent 855f5d2 commit c0f1a1d
Show file tree
Hide file tree
Showing 11 changed files with 131 additions and 16 deletions.
5 changes: 5 additions & 0 deletions shared/featureconfig/config.go
Expand Up @@ -43,6 +43,7 @@ type Flags struct {
DisableStrictAttestationPubsubVerification bool // DisableStrictAttestationPubsubVerification will disabling strict signature verification in pubsub.
DisableUpdateHeadPerAttestation bool // DisableUpdateHeadPerAttestation will disabling update head on per attestation basis.
EnableByteMempool bool // EnaableByteMempool memory management.
EnableDomainDataCache bool // EnableDomainDataCache caches validator calls to DomainData per epoch.

// DisableForkChoice disables using LMD-GHOST fork choice to update
// the head of the chain based on attestations and instead accepts any valid received block
Expand Down Expand Up @@ -169,6 +170,10 @@ func ConfigureValidator(ctx *cli.Context) {
log.Warn("Enabled validator attestation slashing protection.")
cfg.ProtectAttester = true
}
if ctx.GlobalBool(enableDomainDataCacheFlag.Name) {
log.Warn("Enabled domain data cache.")
cfg.EnableDomainDataCache = true
}
Init(cfg)
}

Expand Down
7 changes: 7 additions & 0 deletions shared/featureconfig/flags.go
Expand Up @@ -98,6 +98,11 @@ var (
Name: "enable-byte-mempool",
Usage: "Enable use of sync.Pool for certain byte arrays in the beacon state",
}
enableDomainDataCacheFlag = cli.BoolFlag{
Name: "enable-domain-data-cache",
Usage: "Enable caching of domain data requests per epoch. This feature reduces the total " +
"calls to the beacon node for each assignment.",
}
)

// Deprecated flags list.
Expand Down Expand Up @@ -235,12 +240,14 @@ var ValidatorFlags = append(deprecatedFlags, []cli.Flag{
minimalConfigFlag,
protectAttesterFlag,
protectProposerFlag,
enableDomainDataCacheFlag,
}...)

// E2EValidatorFlags contains a list of the validator feature flags to be tested in E2E.
var E2EValidatorFlags = []string{
"--protect-attester",
"--protect-proposer",
"--enable-domain-data-cache",
}

// BeaconChainFlags contains a list of all the feature flags that apply to the beacon-chain client.
Expand Down
4 changes: 4 additions & 0 deletions validator/client/BUILD.bazel
Expand Up @@ -3,6 +3,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = [
"grpc_interceptor.go",
"runner.go",
"service.go",
"validator.go",
Expand All @@ -26,6 +27,8 @@ go_library(
"//shared/slotutil:go_default_library",
"//validator/db:go_default_library",
"//validator/keymanager:go_default_library",
"@com_github_dgraph_io_ristretto//:go_default_library",
"@com_github_gogo_protobuf//proto:go_default_library",
"@com_github_gogo_protobuf//types:go_default_library",
"@com_github_grpc_ecosystem_go_grpc_middleware//:go_default_library",
"@com_github_grpc_ecosystem_go_grpc_middleware//retry:go_default_library",
Expand All @@ -43,6 +46,7 @@ go_library(
"@org_golang_google_grpc//:go_default_library",
"@org_golang_google_grpc//codes:go_default_library",
"@org_golang_google_grpc//credentials:go_default_library",
"@org_golang_google_grpc//metadata:go_default_library",
"@org_golang_google_grpc//status:go_default_library",
],
)
Expand Down
2 changes: 2 additions & 0 deletions validator/client/fake_validator_test.go
Expand Up @@ -98,3 +98,5 @@ func (fv *fakeValidator) ProposeBlock(_ context.Context, slot uint64, pubKey [48
func (fv *fakeValidator) SubmitAggregateAndProof(_ context.Context, slot uint64, pubKey [48]byte) {}

func (fv *fakeValidator) LogAttestationsSubmitted() {}

func (fv *fakeValidator) UpdateDomainDataCaches(context.Context, uint64) {}
31 changes: 31 additions & 0 deletions validator/client/grpc_interceptor.go
@@ -0,0 +1,31 @@
package client

import (
"context"
"time"

"github.com/sirupsen/logrus"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
)

// This method logs the gRPC backend as well as request duration when the log level is set to debug
// or higher.
func logDebugRequestInfoUnaryInterceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
// Shortcut when debug logging is not enabled.
if logrus.GetLevel() < logrus.DebugLevel {
return invoker(ctx, method, req, reply, cc, opts...)
}

var header metadata.MD
opts = append(
opts,
grpc.Header(&header),
)
start := time.Now()
err := invoker(ctx, method, req, reply, cc, opts...)
log.WithField("backend", header["x-backend"]).
WithField("method", method).WithField("duration", time.Now().Sub(start)).
Debug("gRPC request finished.")
return err
}
7 changes: 7 additions & 0 deletions validator/client/runner.go
Expand Up @@ -5,6 +5,7 @@ import (
"sync"
"time"

"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
pb "github.com/prysmaticlabs/prysm/proto/beacon/rpc/v1"
"github.com/prysmaticlabs/prysm/shared/params"
"go.opencensus.io/trace"
Expand All @@ -28,6 +29,7 @@ type Validator interface {
ProposeBlock(ctx context.Context, slot uint64, pubKey [48]byte)
SubmitAggregateAndProof(ctx context.Context, slot uint64, pubKey [48]byte)
LogAttestationsSubmitted()
UpdateDomainDataCaches(ctx context.Context, slot uint64)
}

// Run the main validator routine. This routine exits if the context is
Expand Down Expand Up @@ -83,6 +85,11 @@ func run(ctx context.Context, v Validator) {
continue
}

// Start fetching domain data for the next epoch.
if helpers.IsEpochEnd(slot) {
go v.UpdateDomainDataCaches(ctx, slot+1)
}

var wg sync.WaitGroup

allRoles, err := v.RolesAt(ctx, slot)
Expand Down
12 changes: 12 additions & 0 deletions validator/client/service.go
Expand Up @@ -3,6 +3,7 @@ package client
import (
"context"

"github.com/dgraph-io/ristretto"
middleware "github.com/grpc-ecosystem/go-grpc-middleware"
grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing"
Expand Down Expand Up @@ -110,6 +111,7 @@ func (v *ValidatorService) Start() {
grpc_opentracing.UnaryClientInterceptor(),
grpc_prometheus.UnaryClientInterceptor,
grpc_retry.UnaryClientInterceptor(),
logDebugRequestInfoUnaryInterceptor,
)),
}
conn, err := grpc.DialContext(v.ctx, v.endpoint, opts...)
Expand All @@ -132,6 +134,15 @@ func (v *ValidatorService) Start() {
}

v.conn = conn
cache, err := ristretto.NewCache(&ristretto.Config{
NumCounters: 1280, // number of keys to track.
MaxCost: 128, // maximum cost of cache, 1 item = 1 cost.
BufferItems: 64, // number of keys per Get buffer.
})
if err != nil {
panic(err)
}

v.validator = &validator{
db: valDB,
validatorClient: ethpb.NewBeaconNodeValidatorClient(v.conn),
Expand All @@ -144,6 +155,7 @@ func (v *ValidatorService) Start() {
emitAccountMetrics: v.emitAccountMetrics,
prevBalance: make(map[[48]byte]uint64),
attLogs: make(map[[32]byte]*attSubmitted),
domainDataCache: cache,
}
go run(v.ctx, v.validator)
}
Expand Down
58 changes: 58 additions & 0 deletions validator/client/validator.go
Expand Up @@ -4,17 +4,23 @@ package client
import (
"context"
"encoding/binary"
"encoding/hex"
"fmt"
"io"
"strconv"
"strings"
"sync"
"time"

"github.com/dgraph-io/ristretto"
"github.com/gogo/protobuf/proto"
ptypes "github.com/gogo/protobuf/types"
"github.com/pkg/errors"
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
pb "github.com/prysmaticlabs/prysm/proto/beacon/rpc/v1"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
"github.com/prysmaticlabs/prysm/shared/featureconfig"
"github.com/prysmaticlabs/prysm/shared/hashutil"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/prysmaticlabs/prysm/shared/slotutil"
Expand All @@ -40,6 +46,8 @@ type validator struct {
emitAccountMetrics bool
attLogs map[[32]byte]*attSubmitted
attLogsLock sync.Mutex
domainDataLock sync.Mutex
domainDataCache *ristretto.Cache
}

// Done cleans up the validator.
Expand Down Expand Up @@ -330,3 +338,53 @@ func (v *validator) isAggregator(ctx context.Context, committee []uint64, slot u

return binary.LittleEndian.Uint64(b[:8])%modulo == 0, nil
}

// UpdateDomainDataCaches by making calls for all of the possible domain data. These can change when
// the fork version changes which can happen once per epoch. Although changing for the fork version
// is very rare, a validator should check these data every epoch to be sure the validator is
// participating on the correct fork version.
func (v *validator) UpdateDomainDataCaches(ctx context.Context, slot uint64) {
if !featureconfig.Get().EnableDomainDataCache {
return
}

for _, d := range [][]byte{
params.BeaconConfig().DomainRandao,
params.BeaconConfig().DomainBeaconAttester,
params.BeaconConfig().DomainBeaconProposer,
} {
_, err := v.domainData(ctx, helpers.SlotToEpoch(slot), d)
if err != nil {
log.WithError(err).Errorf("Failed to update domain data for domain %v", d)
}
}
}

func (v *validator) domainData(ctx context.Context, epoch uint64, domain []byte) (*ethpb.DomainResponse, error) {
v.domainDataLock.Lock()
defer v.domainDataLock.Unlock()

req := &ethpb.DomainRequest{
Epoch: epoch,
Domain: domain,
}

key := strings.Join([]string{strconv.FormatUint(req.Epoch, 10), hex.EncodeToString(req.Domain)}, ",")

if featureconfig.Get().EnableDomainDataCache {
if val, ok := v.domainDataCache.Get(key); ok {
return proto.Clone(val.(proto.Message)).(*ethpb.DomainResponse), nil
}
}

res, err := v.validatorClient.DomainData(ctx, req)
if err != nil {
return nil, err
}

if featureconfig.Get().EnableDomainDataCache {
v.domainDataCache.Set(key, proto.Clone(res), 1)
}

return res, nil
}
5 changes: 1 addition & 4 deletions validator/client/validator_aggregate.go
Expand Up @@ -104,10 +104,7 @@ func (v *validator) SubmitAggregateAndProof(ctx context.Context, slot uint64, pu
// This implements selection logic outlined in:
// https://github.com/ethereum/eth2.0-specs/blob/v0.9.3/specs/validator/0_beacon-chain-validator.md#aggregation-selection
func (v *validator) signSlot(ctx context.Context, pubKey [48]byte, slot uint64) ([]byte, error) {
domain, err := v.validatorClient.DomainData(ctx, &ethpb.DomainRequest{
Epoch: helpers.SlotToEpoch(slot),
Domain: params.BeaconConfig().DomainBeaconAttester,
})
domain, err := v.domainData(ctx, helpers.SlotToEpoch(slot), params.BeaconConfig().DomainBeaconAttester)
if err != nil {
return nil, err
}
Expand Down
5 changes: 1 addition & 4 deletions validator/client/validator_attest.go
Expand Up @@ -199,10 +199,7 @@ func (v *validator) duty(pubKey [48]byte) (*ethpb.DutiesResponse_Duty, error) {

// Given validator's public key, this returns the signature of an attestation data.
func (v *validator) signAtt(ctx context.Context, pubKey [48]byte, data *ethpb.AttestationData) ([]byte, error) {
domain, err := v.validatorClient.DomainData(ctx, &ethpb.DomainRequest{
Epoch: data.Target.Epoch,
Domain: params.BeaconConfig().DomainBeaconAttester,
})
domain, err := v.domainData(ctx, data.Target.Epoch, params.BeaconConfig().DomainBeaconAttester)
if err != nil {
return nil, err
}
Expand Down
11 changes: 3 additions & 8 deletions validator/client/validator_propose.go
Expand Up @@ -172,10 +172,8 @@ func (v *validator) ProposeExit(ctx context.Context, exit *ethpb.VoluntaryExit)

// Sign randao reveal with randao domain and private key.
func (v *validator) signRandaoReveal(ctx context.Context, pubKey [48]byte, epoch uint64) ([]byte, error) {
domain, err := v.validatorClient.DomainData(ctx, &ethpb.DomainRequest{
Epoch: epoch,
Domain: params.BeaconConfig().DomainRandao,
})
domain, err := v.domainData(ctx, epoch, params.BeaconConfig().DomainRandao)

if err != nil {
return nil, errors.Wrap(err, "could not get domain data")
}
Expand All @@ -190,10 +188,7 @@ func (v *validator) signRandaoReveal(ctx context.Context, pubKey [48]byte, epoch

// Sign block with proposer domain and private key.
func (v *validator) signBlock(ctx context.Context, pubKey [48]byte, epoch uint64, b *ethpb.BeaconBlock) ([]byte, error) {
domain, err := v.validatorClient.DomainData(ctx, &ethpb.DomainRequest{
Epoch: epoch,
Domain: params.BeaconConfig().DomainBeaconProposer,
})
domain, err := v.domainData(ctx, epoch, params.BeaconConfig().DomainBeaconProposer)
if err != nil {
return nil, errors.Wrap(err, "could not get domain data")
}
Expand Down

0 comments on commit c0f1a1d

Please sign in to comment.