Skip to content

Commit

Permalink
Validator Slashing Protection DB (#4389)
Browse files Browse the repository at this point in the history
* Begin adding DB to validator client

Begin adding ValidatorProposalHistory

Implement most of proposal history

Finish tests

Fix marking a proposal for the first time

Change proposalhistory to not using bit shifting

Add pb.go

Change after proto/slashing added

Finally fix protos

Fix most tests

Fix all tests for double proposal protection

Start initialiing DB in validator client

Add db to validator struct

Add DB to ProposeBlock

Fix test errors and begin mocking

Fix test formatting and pass test for validator protection!

Fix merge issues

Fix renames

Fix tests

* Fix tests

* Fix first startup on DB

* Fix nil check tests

* Fix E2E

* Fix e2e flag

* Fix comments

* Fix for comments

* Move proposal hepers to validator/client to keep DB clean

* Add clear-db flag to validator client

* Fix formatting

* Clear out unintended changes

* Fix build issues

* Fix build issues

* Gazelle

* Fix mock test

* Remove proposal history

* Add terminal confirmation to DB clearing

* Add interface for validatorDB, add context to DB functions

* Add force-clear-db flag

* Cleanup

* Update validator/node/node.go

Co-Authored-By: Raul Jordan <raul@prysmaticlabs.com>

* Change db to clear file, not whole folder

* Fix db test

* Fix teardown test

Co-authored-by: Raul Jordan <raul@prysmaticlabs.com>
  • Loading branch information
0xKiwi and rauljordan committed Jan 8, 2020
1 parent a69cb5c commit 4ab0a91
Show file tree
Hide file tree
Showing 18 changed files with 589 additions and 0 deletions.
1 change: 1 addition & 0 deletions endtoend/validator.go
Expand Up @@ -58,6 +58,7 @@ func initializeValidators(
fmt.Sprintf("--interop-num-validators=%d", validatorsPerNode),
fmt.Sprintf("--interop-start-index=%d", validatorsPerNode*n),
fmt.Sprintf("--monitoring-port=%d", 9080+n),
fmt.Sprintf("--datadir=%s/eth2-val-%d", tmpPath, n),
fmt.Sprintf("--beacon-rpc-provider=localhost:%d", 4000+n),
}
if config.minimalConfig {
Expand Down
2 changes: 2 additions & 0 deletions validator/client/BUILD.bazel
Expand Up @@ -22,6 +22,7 @@ go_library(
"//shared/params:go_default_library",
"//shared/roughtime:go_default_library",
"//shared/slotutil:go_default_library",
"//validator/db:go_default_library",
"//validator/keymanager:go_default_library",
"@com_github_gogo_protobuf//types:go_default_library",
"@com_github_grpc_ecosystem_go_grpc_middleware//:go_default_library",
Expand Down Expand Up @@ -64,6 +65,7 @@ go_test(
"//shared/roughtime:go_default_library",
"//shared/testutil:go_default_library",
"//validator/accounts:go_default_library",
"//validator/db:go_default_library",
"//validator/internal:go_default_library",
"//validator/keymanager:go_default_library",
"@com_github_gogo_protobuf//types:go_default_library",
Expand Down
18 changes: 18 additions & 0 deletions validator/client/service.go
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/pkg/errors"
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
pb "github.com/prysmaticlabs/prysm/proto/beacon/rpc/v1"
"github.com/prysmaticlabs/prysm/validator/db"
"github.com/prysmaticlabs/prysm/validator/keymanager"
"github.com/sirupsen/logrus"
"go.opencensus.io/plugin/ocgrpc"
Expand All @@ -28,13 +29,15 @@ type ValidatorService struct {
conn *grpc.ClientConn
endpoint string
withCert string
dataDir string
keyManager keymanager.KeyManager
logValidatorBalances bool
}

// Config for the validator service.
type Config struct {
Endpoint string
DataDir string
CertFlag string
GraffitiFlag string
KeyManager keymanager.KeyManager
Expand All @@ -50,6 +53,7 @@ func NewValidatorService(ctx context.Context, cfg *Config) (*ValidatorService, e
cancel: cancel,
endpoint: cfg.Endpoint,
withCert: cfg.CertFlag,
dataDir: cfg.DataDir,
graffiti: []byte(cfg.GraffitiFlag),
keyManager: cfg.KeyManager,
logValidatorBalances: cfg.LogValidatorBalances,
Expand Down Expand Up @@ -92,8 +96,22 @@ func (v *ValidatorService) Start() {
return
}
log.Info("Successfully started gRPC connection")

pubkeys, err := v.keyManager.FetchValidatingKeys()
if err != nil {
log.Errorf("Could not get validating keys: %v", err)
return
}

valDB, err := db.NewKVStore(v.dataDir, pubkeys)
if err != nil {
log.Errorf("Could not initialize db: %v", err)
return
}

v.conn = conn
v.validator = &validator{
db: valDB,
validatorClient: ethpb.NewBeaconNodeValidatorClient(v.conn),
beaconClient: ethpb.NewBeaconChainClient(v.conn),
aggregatorClient: pb.NewAggregatorServiceClient(v.conn),
Expand Down
2 changes: 2 additions & 0 deletions validator/client/validator.go
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/prysmaticlabs/prysm/shared/hashutil"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/prysmaticlabs/prysm/shared/slotutil"
"github.com/prysmaticlabs/prysm/validator/db"
"github.com/prysmaticlabs/prysm/validator/keymanager"
"github.com/sirupsen/logrus"
"go.opencensus.io/trace"
Expand All @@ -26,6 +27,7 @@ import (
type validator struct {
genesisTime uint64
ticker *slotutil.SlotTicker
db *db.Store
duties *ethpb.DutiesResponse
validatorClient ethpb.BeaconNodeValidatorClient
beaconClient ethpb.BeaconChainClient
Expand Down
3 changes: 3 additions & 0 deletions validator/client/validator_propose_test.go
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/golang/mock/gomock"
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/prysm/shared/testutil"
"github.com/prysmaticlabs/prysm/validator/db"
"github.com/prysmaticlabs/prysm/validator/internal"
logTest "github.com/sirupsen/logrus/hooks/test"
)
Expand All @@ -18,12 +19,14 @@ type mocks struct {
}

func setup(t *testing.T) (*validator, *mocks, func()) {
valDB := db.SetupDB(t, [][48]byte{validatorPubKey})
ctrl := gomock.NewController(t)
m := &mocks{
validatorClient: internal.NewMockBeaconNodeValidatorClient(ctrl),
aggregatorClient: internal.NewMockAggregatorServiceClient(ctrl),
}
validator := &validator{
db: valDB,
validatorClient: m.validatorClient,
aggregatorClient: m.aggregatorClient,
keyManager: testKeyManager,
Expand Down
38 changes: 38 additions & 0 deletions validator/db/BUILD.bazel
@@ -0,0 +1,38 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "go_default_library",
srcs = [
"db.go",
"proposal_history.go",
"schema.go",
"setup_db.go",
],
importpath = "github.com/prysmaticlabs/prysm/validator/db",
visibility = ["//validator:__subpackages__"],
deps = [
"//proto/slashing:go_default_library",
"//shared/params:go_default_library",
"//validator/db/iface:go_default_library",
"@com_github_boltdb_bolt//:go_default_library",
"@com_github_gogo_protobuf//proto:go_default_library",
"@com_github_pkg_errors//:go_default_library",
"@com_github_prysmaticlabs_go_bitfield//:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
"@io_opencensus_go//trace:go_default_library",
],
)

go_test(
name = "go_default_test",
srcs = [
"proposal_history_test.go",
"setup_db_test.go",
],
embed = [":go_default_library"],
deps = [
"//proto/slashing:go_default_library",
"//shared/params:go_default_library",
"@com_github_prysmaticlabs_go_bitfield//:go_default_library",
],
)
123 changes: 123 additions & 0 deletions validator/db/db.go
@@ -0,0 +1,123 @@
package db

import (
"context"
"os"
"path/filepath"
"time"

"github.com/boltdb/bolt"
"github.com/pkg/errors"
"github.com/prysmaticlabs/go-bitfield"
slashpb "github.com/prysmaticlabs/prysm/proto/slashing"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/prysmaticlabs/prysm/validator/db/iface"
"github.com/sirupsen/logrus"
)

var log = logrus.WithField("prefix", "db")

var _ = iface.ValidatorDB(&Store{})

var databaseFileName = "validator.db"

// Store defines an implementation of the Prysm Database interface
// using BoltDB as the underlying persistent kv-store for eth2.
type Store struct {
db *bolt.DB
databasePath string
}

// Close closes the underlying boltdb database.
func (db *Store) Close() error {
return db.db.Close()
}

func (db *Store) update(fn func(*bolt.Tx) error) error {
return db.db.Update(fn)
}
func (db *Store) batch(fn func(*bolt.Tx) error) error {
return db.db.Batch(fn)
}
func (db *Store) view(fn func(*bolt.Tx) error) error {
return db.db.View(fn)
}

// ClearDB removes the previously stored directory at the data directory.
func (db *Store) ClearDB() error {
if _, err := os.Stat(db.databasePath); os.IsNotExist(err) {
return nil
}
return os.Remove(filepath.Join(db.databasePath, databaseFileName))
}

// DatabasePath at which this database writes files.
func (db *Store) DatabasePath() string {
return db.databasePath
}

func createBuckets(tx *bolt.Tx, buckets ...[]byte) error {
for _, bucket := range buckets {
if _, err := tx.CreateBucketIfNotExists(bucket); err != nil {
return err
}
}
return nil
}

// NewKVStore initializes a new boltDB key-value store at the directory
// path specified, creates the kv-buckets based on the schema, and stores
// an open connection db object as a property of the Store struct.
func NewKVStore(dirPath string, pubkeys [][48]byte) (*Store, error) {
if err := os.MkdirAll(dirPath, 0700); err != nil {
return nil, err
}
datafile := filepath.Join(dirPath, databaseFileName)
boltDB, err := bolt.Open(datafile, 0600, &bolt.Options{Timeout: 1 * time.Second})
if err != nil {
if err == bolt.ErrTimeout {
return nil, errors.New("cannot obtain database lock, database may be in use by another process")
}
return nil, err
}

kv := &Store{db: boltDB, databasePath: dirPath}

if err := kv.db.Update(func(tx *bolt.Tx) error {
return createBuckets(
tx,
historicProposalsBucket,
validatorsMinMaxSpanBucket,
)
}); err != nil {
return nil, err
}

// Initialize the required pubkeys into the DB to ensure they're not empty.
for _, pubkey := range pubkeys {
history, err := kv.ProposalHistory(context.Background(), pubkey[:])
if err != nil {
return nil, err
}
if history == nil {
cleanHistory := &slashpb.ProposalHistory{
EpochBits: bitfield.NewBitlist(params.BeaconConfig().WeakSubjectivityPeriod),
}
if err := kv.SaveProposalHistory(context.Background(), pubkey[:], cleanHistory); err != nil {
return nil, err
}
}
}

return kv, err
}

// Size returns the db size in bytes.
func (db *Store) Size() (int64, error) {
var size int64
err := db.db.View(func(tx *bolt.Tx) error {
size = tx.Size()
return nil
})
return size, err
}
10 changes: 10 additions & 0 deletions validator/db/iface/BUILD.bazel
@@ -0,0 +1,10 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")

go_library(
name = "go_default_library",
srcs = ["interface.go"],
importpath = "github.com/prysmaticlabs/prysm/validator/db/iface",
# Other packages must use github.com/prysmaticlabs/prysm/validator/db.Database alias.
visibility = ["//validator/db:__subpackages__"],
deps = ["//proto/slashing:go_default_library"],
)
20 changes: 20 additions & 0 deletions validator/db/iface/interface.go
@@ -0,0 +1,20 @@
// Package iface exists to prevent circular dependencies when implementing the database interface.
package iface

import (
"context"
"io"

slashpb "github.com/prysmaticlabs/prysm/proto/slashing"
)

// ValidatorDB defines the necessary methods for a Prysm validator DB.
type ValidatorDB interface {
io.Closer
DatabasePath() string
ClearDB() error
// Proposer protection related methods.
ProposalHistory(ctx context.Context, publicKey []byte) (*slashpb.ProposalHistory, error)
SaveProposalHistory(ctx context.Context, publicKey []byte, history *slashpb.ProposalHistory) error
DeleteProposalHistory(ctx context.Context, publicKey []byte) error
}
71 changes: 71 additions & 0 deletions validator/db/proposal_history.go
@@ -0,0 +1,71 @@
package db

import (
"context"

"github.com/boltdb/bolt"
"github.com/gogo/protobuf/proto"
"github.com/pkg/errors"
slashpb "github.com/prysmaticlabs/prysm/proto/slashing"
"go.opencensus.io/trace"
)

func unmarshalProposalHistory(enc []byte) (*slashpb.ProposalHistory, error) {
history := &slashpb.ProposalHistory{}
err := proto.Unmarshal(enc, history)
if err != nil {
return nil, errors.Wrap(err, "failed to unmarshal encoding")
}
return history, nil
}

// ProposalHistory accepts a validator public key and returns the corresponding proposal history.
// Returns nil if there is no proposal history for the validator.
func (db *Store) ProposalHistory(ctx context.Context, publicKey []byte) (*slashpb.ProposalHistory, error) {
ctx, span := trace.StartSpan(ctx, "Validator.ProposalHistory")
defer span.End()

var err error
var proposalHistory *slashpb.ProposalHistory
err = db.view(func(tx *bolt.Tx) error {
bucket := tx.Bucket(historicProposalsBucket)
enc := bucket.Get(publicKey)
if enc == nil {
return nil
}
proposalHistory, err = unmarshalProposalHistory(enc)
return err
})
return proposalHistory, err
}

// SaveProposalHistory returns the proposal history for the requested validator public key.
func (db *Store) SaveProposalHistory(ctx context.Context, pubKey []byte, proposalHistory *slashpb.ProposalHistory) error {
ctx, span := trace.StartSpan(ctx, "Validator.SaveProposalHistory")
defer span.End()

enc, err := proto.Marshal(proposalHistory)
if err != nil {
return errors.Wrap(err, "failed to encode proposal history")
}

err = db.update(func(tx *bolt.Tx) error {
bucket := tx.Bucket(historicProposalsBucket)
return bucket.Put(pubKey, enc)
})
return err
}

// DeleteProposalHistory deletes the proposal history for the corresponding validator public key.
func (db *Store) DeleteProposalHistory(ctx context.Context, pubkey []byte) error {
ctx, span := trace.StartSpan(ctx, "Validator.DeleteProposalHistory")
defer span.End()

return db.update(func(tx *bolt.Tx) error {
bucket := tx.Bucket(historicProposalsBucket)
if err := bucket.Delete(pubkey); err != nil {
return errors.Wrap(err, "failed to delete the proposal history")
}
return nil
})
}

0 comments on commit 4ab0a91

Please sign in to comment.