diff --git a/validator/accounts/account.go b/validator/accounts/account.go index 4954cf16682..e0ecfb8966d 100644 --- a/validator/accounts/account.go +++ b/validator/accounts/account.go @@ -222,12 +222,12 @@ func HandleEmptyKeystoreFlags(cliCtx *cli.Context, confirmPassword bool) (string } // Merge merges data from validator databases in sourceDirectories into a new store, which is created in targetDirectory. -func Merge(ctx context.Context, sourceDirectories []string, targetDirectory string) error { +func Merge(ctx context.Context, sourceDirectories []string, targetDirectory string) (err error) { var sourceStores []*db.Store defer func() { for _, store := range sourceStores { - if err := store.Close(); err != nil { - err = errors.Wrapf(err, "Failed to close the database in %s", store.DatabasePath()) + if deferErr := store.Close(); deferErr != nil { + err = errors.Wrapf(deferErr, "Failed to close the database in %s", store.DatabasePath()) } } }() @@ -247,8 +247,7 @@ func Merge(ctx context.Context, sourceDirectories []string, targetDirectory stri return errors.New("no validator databases found in source directories") } - err := db.Merge(ctx, sourceStores, targetDirectory) - if err != nil { + if err := db.Merge(ctx, sourceStores, targetDirectory); err != nil { return errors.Wrapf(err, "Failed to merge validator databases into %s", targetDirectory) } diff --git a/validator/accounts/account_test.go b/validator/accounts/account_test.go index 3b16e5c7e27..6b7366fd86e 100644 --- a/validator/accounts/account_test.go +++ b/validator/accounts/account_test.go @@ -24,22 +24,18 @@ import ( ) type sourceStoresHistory struct { - ProposalEpoch uint64 - FirstStoreFirstPubKeyProposals bitfield.Bitlist - FirstStoreSecondPubKeyProposals bitfield.Bitlist - SecondStoreFirstPubKeyProposals bitfield.Bitlist - SecondStoreSecondPubKeyProposals bitfield.Bitlist - FirstStoreFirstPubKeyAttestations map[uint64]uint64 - FirstStoreSecondPubKeyAttestations map[uint64]uint64 - SecondStoreFirstPubKeyAttestations map[uint64]uint64 - SecondStoreSecondPubKeyAttestations map[uint64]uint64 + ProposalEpoch uint64 + FirstStorePubKeyProposals bitfield.Bitlist + SecondStorePubKeyProposals bitfield.Bitlist + FirstStorePubKeyAttestations map[uint64]uint64 + SecondStorePubKeyAttestations map[uint64]uint64 } func TestNewValidatorAccount_AccountExists(t *testing.T) { directory := testutil.TempDir() + "/testkeystore" defer func() { if err := os.RemoveAll(directory); err != nil { - t.Logf("Could not remove directory: %v", err) + t.Errorf("Could not remove directory: %v", err) } }() validatorKey, err := keystore.NewKey() @@ -176,46 +172,13 @@ func TestChangePassword_KeyNotMatchingOldPasswordNotEncryptedWithNewPassword(t * } } -func TestMerge(t *testing.T) { - firstStorePubKeys := [][48]byte{{1}, {2}} - firstStore := db.SetupDB(t, firstStorePubKeys) - secondStorePubKeys := [][48]byte{{3}, {4}} - secondStore := db.SetupDB(t, secondStorePubKeys) - - history, err := prepareSourcesForMerging(firstStorePubKeys, firstStore, secondStorePubKeys, secondStore) - if err != nil { - t.Fatalf(err.Error()) - } - - if err := firstStore.Close(); err != nil { - t.Fatalf("Closing source store failed: %v", err) - } - if err := secondStore.Close(); err != nil { - t.Fatalf("Closing source store failed: %v", err) - } - - targetDirectory := testutil.TempDir() + "/target" - err = Merge(context.Background(), []string{firstStore.DatabasePath(), secondStore.DatabasePath()}, targetDirectory) - if err != nil { - t.Fatalf("Merging failed: %v", err) - } - mergedStore, err := db.GetKVStore(targetDirectory) - if err != nil { - t.Fatalf("Retrieving the merged store failed: %v", err) - } - - assertMergedStore(t, mergedStore, firstStorePubKeys, secondStorePubKeys, history) - - cleanupAfterMerge(t, []string{firstStore.DatabasePath(), secondStore.DatabasePath(), targetDirectory}) -} - func TestMerge_SucceedsWhenNoDatabaseExistsInSomeSourceDirectory(t *testing.T) { - firstStorePubKeys := [][48]byte{{1}, {2}} - firstStore := db.SetupDB(t, firstStorePubKeys) - secondStorePubKeys := [][48]byte{{3}, {4}} - secondStore := db.SetupDB(t, secondStorePubKeys) + firstStorePubKey := [48]byte{1} + firstStore := db.SetupDB(t, [][48]byte{firstStorePubKey}) + secondStorePubKey := [48]byte{2} + secondStore := db.SetupDB(t, [][48]byte{secondStorePubKey}) - history, err := prepareSourcesForMerging(firstStorePubKeys, firstStore, secondStorePubKeys, secondStore) + history, err := prepareSourcesForMerging(firstStorePubKey, firstStore, secondStorePubKey, secondStore) if err != nil { t.Fatalf(err.Error()) } @@ -232,6 +195,12 @@ func TestMerge_SucceedsWhenNoDatabaseExistsInSomeSourceDirectory(t *testing.T) { t.Fatalf("Could not create directory %s", sourceDirectoryWithoutStore) } targetDirectory := testutil.TempDir() + "/target" + t.Cleanup(func() { + if err := os.RemoveAll(targetDirectory); err != nil { + t.Errorf("Could not remove target directory : %v", err) + } + }) + err = Merge( context.Background(), []string{firstStore.DatabasePath(), secondStore.DatabasePath(), sourceDirectoryWithoutStore}, targetDirectory) @@ -243,11 +212,7 @@ func TestMerge_SucceedsWhenNoDatabaseExistsInSomeSourceDirectory(t *testing.T) { t.Fatalf("Retrieving the merged store failed: %v", err) } - assertMergedStore(t, mergedStore, firstStorePubKeys, secondStorePubKeys, history) - - cleanupAfterMerge( - t, - []string{firstStore.DatabasePath(), secondStore.DatabasePath(), sourceDirectoryWithoutStore, targetDirectory}) + assertMergedStore(t, mergedStore, firstStorePubKey, secondStorePubKey, history) } func TestMerge_FailsWhenNoDatabaseExistsInAllSourceDirectories(t *testing.T) { @@ -263,32 +228,29 @@ func TestMerge_FailsWhenNoDatabaseExistsInAllSourceDirectories(t *testing.T) { if err := os.MkdirAll(targetDirectory, 0700); err != nil { t.Fatalf("Could not create directory %s", targetDirectory) } + t.Cleanup(func() { + for _, dir := range []string{sourceDirectory1, sourceDirectory2, targetDirectory} { + if err := os.RemoveAll(dir); err != nil { + t.Errorf("Could not remove directory : %v", err) + } + } + }) err := Merge(context.Background(), []string{sourceDirectory1, sourceDirectory2}, targetDirectory) expected := "no validator databases found in source directories" if err == nil || !strings.Contains(err.Error(), expected) { t.Errorf("Expected: %s vs received %v", expected, err) } - - cleanupAfterMerge(t, []string{sourceDirectory1, sourceDirectory2, targetDirectory}) } -func prepareSourcesForMerging(firstStorePubKeys [][48]byte, firstStore *db.Store, secondStorePubKeys [][48]byte, secondStore *db.Store) (*sourceStoresHistory, error) { +func prepareSourcesForMerging(firstStorePubKey [48]byte, firstStore *db.Store, secondStorePubKey [48]byte, secondStore *db.Store) (*sourceStoresHistory, error) { proposalEpoch := uint64(0) proposalHistory1 := bitfield.Bitlist{0x01, 0x00, 0x00, 0x00, 0x01} - if err := firstStore.SaveProposalHistoryForEpoch(context.Background(), firstStorePubKeys[0][:], proposalEpoch, proposalHistory1); err != nil { + if err := firstStore.SaveProposalHistoryForEpoch(context.Background(), firstStorePubKey[:], proposalEpoch, proposalHistory1); err != nil { return nil, errors.Wrapf(err, "Saving proposal history failed") } proposalHistory2 := bitfield.Bitlist{0x02, 0x00, 0x00, 0x00, 0x01} - if err := firstStore.SaveProposalHistoryForEpoch(context.Background(), firstStorePubKeys[1][:], proposalEpoch, proposalHistory2); err != nil { - return nil, errors.Wrapf(err, "Saving proposal history failed") - } - proposalHistory3 := bitfield.Bitlist{0x03, 0x00, 0x00, 0x00, 0x01} - if err := secondStore.SaveProposalHistoryForEpoch(context.Background(), secondStorePubKeys[0][:], proposalEpoch, proposalHistory3); err != nil { - return nil, errors.Wrapf(err, "Saving proposal history failed") - } - proposalHistory4 := bitfield.Bitlist{0x04, 0x00, 0x00, 0x00, 0x01} - if err := secondStore.SaveProposalHistoryForEpoch(context.Background(), secondStorePubKeys[1][:], proposalEpoch, proposalHistory4); err != nil { + if err := secondStore.SaveProposalHistoryForEpoch(context.Background(), secondStorePubKey[:], proposalEpoch, proposalHistory2); err != nil { return nil, errors.Wrapf(err, "Saving proposal history failed") } @@ -298,47 +260,29 @@ func prepareSourcesForMerging(firstStorePubKeys [][48]byte, firstStore *db.Store TargetToSource: attestationHistoryMap1, LatestEpochWritten: 0, } - attestationHistoryMap2 := make(map[uint64]uint64) - attestationHistoryMap2[0] = 1 - pubKeyAttestationHistory2 := &slashpb.AttestationHistory{ - TargetToSource: attestationHistoryMap2, - LatestEpochWritten: 0, - } dbAttestationHistory1 := make(map[[48]byte]*slashpb.AttestationHistory) - dbAttestationHistory1[firstStorePubKeys[0]] = pubKeyAttestationHistory1 - dbAttestationHistory1[firstStorePubKeys[1]] = pubKeyAttestationHistory2 + dbAttestationHistory1[firstStorePubKey] = pubKeyAttestationHistory1 if err := firstStore.SaveAttestationHistoryForPubKeys(context.Background(), dbAttestationHistory1); err != nil { return nil, errors.Wrapf(err, "Saving attestation history failed") } - attestationHistoryMap3 := make(map[uint64]uint64) - attestationHistoryMap3[0] = 2 - pubKeyAttestationHistory3 := &slashpb.AttestationHistory{ - TargetToSource: attestationHistoryMap3, - LatestEpochWritten: 0, - } - attestationHistoryMap4 := make(map[uint64]uint64) - attestationHistoryMap4[0] = 3 - pubKeyAttestationHistory4 := &slashpb.AttestationHistory{ - TargetToSource: attestationHistoryMap4, + attestationHistoryMap2 := make(map[uint64]uint64) + attestationHistoryMap2[0] = 1 + pubKeyAttestationHistory2 := &slashpb.AttestationHistory{ + TargetToSource: attestationHistoryMap2, LatestEpochWritten: 0, } dbAttestationHistory2 := make(map[[48]byte]*slashpb.AttestationHistory) - dbAttestationHistory2[secondStorePubKeys[0]] = pubKeyAttestationHistory3 - dbAttestationHistory2[secondStorePubKeys[1]] = pubKeyAttestationHistory4 + dbAttestationHistory2[secondStorePubKey] = pubKeyAttestationHistory2 if err := secondStore.SaveAttestationHistoryForPubKeys(context.Background(), dbAttestationHistory2); err != nil { return nil, errors.Wrapf(err, "Saving attestation history failed") } mergeHistory := &sourceStoresHistory{ - ProposalEpoch: proposalEpoch, - FirstStoreFirstPubKeyProposals: proposalHistory1, - FirstStoreSecondPubKeyProposals: proposalHistory2, - SecondStoreFirstPubKeyProposals: proposalHistory3, - SecondStoreSecondPubKeyProposals: proposalHistory4, - FirstStoreFirstPubKeyAttestations: attestationHistoryMap1, - FirstStoreSecondPubKeyAttestations: attestationHistoryMap2, - SecondStoreFirstPubKeyAttestations: attestationHistoryMap3, - SecondStoreSecondPubKeyAttestations: attestationHistoryMap4, + ProposalEpoch: proposalEpoch, + FirstStorePubKeyProposals: proposalHistory1, + SecondStorePubKeyProposals: proposalHistory2, + FirstStorePubKeyAttestations: attestationHistoryMap1, + SecondStorePubKeyAttestations: attestationHistoryMap2, } return mergeHistory, nil @@ -347,95 +291,49 @@ func prepareSourcesForMerging(firstStorePubKeys [][48]byte, firstStore *db.Store func assertMergedStore( t *testing.T, mergedStore *db.Store, - firstStorePubKeys [][48]byte, - secondStorePubKeys [][48]byte, + firstStorePubKey [48]byte, + secondStorePubKey [48]byte, history *sourceStoresHistory) { mergedProposalHistory1, err := mergedStore.ProposalHistoryForEpoch( - context.Background(), firstStorePubKeys[0][:], history.ProposalEpoch) + context.Background(), firstStorePubKey[:], history.ProposalEpoch) if err != nil { - t.Errorf("Retrieving merged proposal history failed for public key %v", firstStorePubKeys[0]) - } else { - if !bytes.Equal(mergedProposalHistory1, history.FirstStoreFirstPubKeyProposals) { - t.Errorf( - "Proposals not merged correctly: expected %v vs received %v", - history.FirstStoreFirstPubKeyProposals, - mergedProposalHistory1) - } + t.Fatalf("Retrieving merged proposal history failed for public key %v", firstStorePubKey) } - mergedProposalHistory2, err := mergedStore.ProposalHistoryForEpoch( - context.Background(), firstStorePubKeys[1][:], history.ProposalEpoch) - if err != nil { - t.Errorf("Retrieving merged proposal history failed for public key %v", firstStorePubKeys[1]) - } else { - if !bytes.Equal(mergedProposalHistory2, history.FirstStoreSecondPubKeyProposals) { - t.Errorf( - "Proposals not merged correctly: expected %v vs received %v", - history.FirstStoreSecondPubKeyProposals, - mergedProposalHistory2) - } + if !bytes.Equal(mergedProposalHistory1, history.FirstStorePubKeyProposals) { + t.Fatalf( + "Proposals not merged correctly: expected %v vs received %v", + history.FirstStorePubKeyProposals, + mergedProposalHistory1) } - mergedProposalHistory3, err := mergedStore.ProposalHistoryForEpoch( - context.Background(), secondStorePubKeys[0][:], history.ProposalEpoch) + mergedProposalHistory2, err := mergedStore.ProposalHistoryForEpoch( + context.Background(), secondStorePubKey[:], history.ProposalEpoch) if err != nil { - t.Errorf("Retrieving merged proposal history failed for public key %v", secondStorePubKeys[0]) - } else { - if !bytes.Equal(mergedProposalHistory3, history.SecondStoreFirstPubKeyProposals) { - t.Errorf( - "Proposals not merged correctly: expected %v vs received %v", - history.SecondStoreFirstPubKeyProposals, - mergedProposalHistory3) - } + t.Fatalf("Retrieving merged proposal history failed for public key %v", secondStorePubKey) } - mergedProposalHistory4, err := mergedStore.ProposalHistoryForEpoch( - context.Background(), secondStorePubKeys[1][:], history.ProposalEpoch) - if err != nil { - t.Errorf("Retrieving merged proposal history failed for public key %v", secondStorePubKeys[1]) - } else { - if !bytes.Equal(mergedProposalHistory4, history.SecondStoreSecondPubKeyProposals) { - t.Errorf("Proposals not merged correctly: expected %v vs received %v", - history.SecondStoreSecondPubKeyProposals, - mergedProposalHistory4) - } + if !bytes.Equal(mergedProposalHistory2, history.SecondStorePubKeyProposals) { + t.Fatalf( + "Proposals not merged correctly: expected %v vs received %v", + history.SecondStorePubKeyProposals, + mergedProposalHistory2) } mergedAttestationHistory, err := mergedStore.AttestationHistoryForPubKeys( context.Background(), - append(firstStorePubKeys, secondStorePubKeys[0], secondStorePubKeys[1])) + [][48]byte{firstStorePubKey, secondStorePubKey}) if err != nil { - t.Error("Retrieving merged attestation history failed") - } else { - if mergedAttestationHistory[firstStorePubKeys[0]].TargetToSource[0] != history.FirstStoreFirstPubKeyAttestations[0] { - t.Errorf( - "Attestations not merged correctly: expected %v vs received %v", - history.FirstStoreFirstPubKeyAttestations[0], - mergedAttestationHistory[firstStorePubKeys[0]].TargetToSource[0]) - } - if mergedAttestationHistory[firstStorePubKeys[1]].TargetToSource[0] != history.FirstStoreSecondPubKeyAttestations[0] { - t.Errorf( - "Attestations not merged correctly: expected %v vs received %v", - history.FirstStoreSecondPubKeyAttestations, - mergedAttestationHistory[firstStorePubKeys[1]].TargetToSource[0]) - } - if mergedAttestationHistory[secondStorePubKeys[0]].TargetToSource[0] != history.SecondStoreFirstPubKeyAttestations[0] { - t.Errorf( - "Attestations not merged correctly: expected %v vs received %v", - history.SecondStoreFirstPubKeyAttestations, - mergedAttestationHistory[secondStorePubKeys[0]].TargetToSource[0]) - } - if mergedAttestationHistory[secondStorePubKeys[1]].TargetToSource[0] != history.SecondStoreSecondPubKeyAttestations[0] { - t.Errorf( - "Attestations not merged correctly: expected %v vs received %v", - history.SecondStoreSecondPubKeyAttestations, - mergedAttestationHistory[secondStorePubKeys[1]].TargetToSource[0]) - } - } -} - -func cleanupAfterMerge(t *testing.T, directories []string) { - for _, dir := range directories { - if err := os.RemoveAll(dir); err != nil { - t.Logf("Could not remove directory %s: %v", dir, err) - } + t.Fatalf("Retrieving merged attestation history failed") + } + if mergedAttestationHistory[firstStorePubKey].TargetToSource[0] != history.FirstStorePubKeyAttestations[0] { + t.Fatalf( + "Attestations not merged correctly: expected %v vs received %v", + history.FirstStorePubKeyAttestations[0], + mergedAttestationHistory[firstStorePubKey].TargetToSource[0]) + } + if mergedAttestationHistory[secondStorePubKey].TargetToSource[0] != history.SecondStorePubKeyAttestations[0] { + t.Fatalf( + "Attestations not merged correctly: expected %v vs received %v", + history.SecondStorePubKeyAttestations, + mergedAttestationHistory[secondStorePubKey].TargetToSource[0]) } } diff --git a/validator/client/service.go b/validator/client/service.go index 36519aa79e3..0e0f04870b5 100644 --- a/validator/client/service.go +++ b/validator/client/service.go @@ -114,7 +114,7 @@ func (v *ValidatorService) Start() { return } - valDB, err := db.NewKVStoreWithPublicKeyBuckets(v.dataDir, pubkeys) + valDB, err := db.NewKVStore(v.dataDir, pubkeys) if err != nil { log.Errorf("Could not initialize db: %v", err) return diff --git a/validator/db/BUILD.bazel b/validator/db/BUILD.bazel index d8234511a7b..bd8589c7c42 100644 --- a/validator/db/BUILD.bazel +++ b/validator/db/BUILD.bazel @@ -31,6 +31,7 @@ go_test( name = "go_default_test", srcs = [ "attestation_history_test.go", + "manage_test.go", "proposal_history_test.go", "setup_db_test.go", ], @@ -39,6 +40,8 @@ go_test( "//beacon-chain/core/helpers:go_default_library", "//proto/slashing:go_default_library", "//shared/params:go_default_library", + "//shared/testutil:go_default_library", + "@com_github_pkg_errors//:go_default_library", "@com_github_prysmaticlabs_go_bitfield//:go_default_library", ], ) diff --git a/validator/db/db.go b/validator/db/db.go index 08671396d3d..f7bc19c191d 100644 --- a/validator/db/db.go +++ b/validator/db/db.go @@ -62,24 +62,10 @@ func createBuckets(tx *bolt.Tx, buckets ...[]byte) error { return nil } -// NewKVStoreWithPublicKeyBuckets initializes a new boltDB key-value store at the directory -// path specified, creates the kv-buckets based on the schema and provided public keys, -// and stores an open connection db object as a property of the Store struct. -func NewKVStoreWithPublicKeyBuckets(dirPath string, pubKeys [][48]byte) (*Store, error) { - kv, err := NewKVStore(dirPath) - if err != nil { - return nil, err - } - // Initialize the required public keys into the DB to ensure they're not empty. - if err := kv.initializeSubBuckets(pubKeys); err != nil { - return nil, err - } - return kv, err -} - -// NewKVStore initializes a new boltDB key-value store at the directory path specified -// and stores an open connection db object as a property of the Store struct. -func NewKVStore(dirPath string) (*Store, error) { +// NewKVStore initializes a new boltDB key-value store at the directory +// path specified, creates the kv-buckets based on the schema, and stores +// an open connection db object as a property of the Store struct. +func NewKVStore(dirPath string, pubKeys [][48]byte) (*Store, error) { if err := os.MkdirAll(dirPath, 0700); err != nil { return nil, err } @@ -104,6 +90,11 @@ func NewKVStore(dirPath string) (*Store, error) { return nil, err } + // Initialize the required public keys into the DB to ensure they're not empty. + if err := kv.initializeSubBuckets(pubKeys); err != nil { + return nil, err + } + return kv, err } diff --git a/validator/db/manage.go b/validator/db/manage.go index 5df99b82277..80d52e5f92a 100644 --- a/validator/db/manage.go +++ b/validator/db/manage.go @@ -98,11 +98,11 @@ func getPubKeyProposals(pubKey []byte, proposalsBucket *bolt.Bucket) (*pubKeyPro func createTargetStore( targetDirectory string, allProposals []pubKeyProposals, - allAttestations []pubKeyAttestations) error { + allAttestations []pubKeyAttestations) (err error) { - newStore, err := NewKVStore(targetDirectory) + newStore, err := NewKVStore(targetDirectory, [][48]byte{}) defer func() { - if e := newStore.Close(); e != nil { + if deferErr := newStore.Close(); deferErr != nil { err = errors.Wrap(err, "Could not close the merged database") } }() @@ -110,12 +110,14 @@ func createTargetStore( return errors.Wrapf(err, "Could not initialize a new database in %s", targetDirectory) } - if err := newStore.update(func(tx *bolt.Tx) error { + err = newStore.update(func(tx *bolt.Tx) error { proposalsBucket := tx.Bucket(historicProposalsBucket) for _, pubKeyProposals := range allProposals { pubKeyBucket, err := proposalsBucket.CreateBucket(pubKeyProposals.PubKey) if err != nil { - return errors.Wrapf(err, "Could not create proposals bucket for public key %v", pubKeyProposals.PubKey) + return errors.Wrapf(err, + "Could not create proposals bucket for public key %x", + pubKeyProposals.PubKey[:12]) } for _, epochProposals := range pubKeyProposals.Proposals { if err := pubKeyBucket.Put(epochProposals.Epoch, epochProposals.Proposals); err != nil { @@ -126,12 +128,14 @@ func createTargetStore( attestationsBucket := tx.Bucket(historicAttestationsBucket) for _, attestations := range allAttestations { if err := attestationsBucket.Put(attestations.PubKey, attestations.Attestations); err != nil { - return errors.Wrapf(err, "Could not add public key attestations for public key %v", attestations.PubKey) + return errors.Wrapf( + err, + "Could not add public key attestations for public key %x", + attestations.PubKey[:12]) } } return nil - }); err != nil { - return err - } - return nil + }) + + return err } diff --git a/validator/db/manage_test.go b/validator/db/manage_test.go new file mode 100644 index 00000000000..70ea74f9cad --- /dev/null +++ b/validator/db/manage_test.go @@ -0,0 +1,214 @@ +package db + +import ( + "bytes" + "context" + "os" + "testing" + + "github.com/pkg/errors" + "github.com/prysmaticlabs/go-bitfield" + slashpb "github.com/prysmaticlabs/prysm/proto/slashing" + "github.com/prysmaticlabs/prysm/shared/testutil" +) + +type sourceStoresHistory struct { + ProposalEpoch uint64 + FirstStoreFirstPubKeyProposals bitfield.Bitlist + FirstStoreSecondPubKeyProposals bitfield.Bitlist + SecondStoreFirstPubKeyProposals bitfield.Bitlist + SecondStoreSecondPubKeyProposals bitfield.Bitlist + FirstStoreFirstPubKeyAttestations map[uint64]uint64 + FirstStoreSecondPubKeyAttestations map[uint64]uint64 + SecondStoreFirstPubKeyAttestations map[uint64]uint64 + SecondStoreSecondPubKeyAttestations map[uint64]uint64 +} + +func TestMerge(t *testing.T) { + firstStorePubKeys := [][48]byte{{1}, {2}} + firstStore := SetupDB(t, firstStorePubKeys) + secondStorePubKeys := [][48]byte{{3}, {4}} + secondStore := SetupDB(t, secondStorePubKeys) + + history, err := prepareSourcesForMerging(firstStorePubKeys, firstStore, secondStorePubKeys, secondStore) + if err != nil { + t.Fatal(err) + } + + targetDirectory := testutil.TempDir() + "/target" + t.Cleanup(func() { + if err := os.RemoveAll(targetDirectory); err != nil { + t.Errorf("Could not remove target directory : %v", err) + } + }) + + err = Merge(context.Background(), []*Store{firstStore, secondStore}, targetDirectory) + if err != nil { + t.Fatalf("Merging failed: %v", err) + } + mergedStore, err := GetKVStore(targetDirectory) + if err != nil { + t.Fatalf("Retrieving the merged store failed: %v", err) + } + + assertMergedStore(t, mergedStore, firstStorePubKeys, secondStorePubKeys, history) +} + +func prepareSourcesForMerging( + firstStorePubKeys [][48]byte, + firstStore *Store, + secondStorePubKeys [][48]byte, + secondStore *Store) (*sourceStoresHistory, error) { + + proposalEpoch := uint64(0) + proposalHistory1 := bitfield.Bitlist{0x01, 0x00, 0x00, 0x00, 0x01} + if err := firstStore.SaveProposalHistoryForEpoch(context.Background(), firstStorePubKeys[0][:], proposalEpoch, proposalHistory1); err != nil { + return nil, errors.Wrapf(err, "Saving proposal history failed") + } + proposalHistory2 := bitfield.Bitlist{0x02, 0x00, 0x00, 0x00, 0x01} + if err := firstStore.SaveProposalHistoryForEpoch(context.Background(), firstStorePubKeys[1][:], proposalEpoch, proposalHistory2); err != nil { + return nil, errors.Wrapf(err, "Saving proposal history failed") + } + proposalHistory3 := bitfield.Bitlist{0x03, 0x00, 0x00, 0x00, 0x01} + if err := secondStore.SaveProposalHistoryForEpoch(context.Background(), secondStorePubKeys[0][:], proposalEpoch, proposalHistory3); err != nil { + return nil, errors.Wrapf(err, "Saving proposal history failed") + } + proposalHistory4 := bitfield.Bitlist{0x04, 0x00, 0x00, 0x00, 0x01} + if err := secondStore.SaveProposalHistoryForEpoch(context.Background(), secondStorePubKeys[1][:], proposalEpoch, proposalHistory4); err != nil { + return nil, errors.Wrapf(err, "Saving proposal history failed") + } + + attestationHistoryMap1 := make(map[uint64]uint64) + attestationHistoryMap1[0] = 0 + pubKeyAttestationHistory1 := &slashpb.AttestationHistory{ + TargetToSource: attestationHistoryMap1, + LatestEpochWritten: 0, + } + attestationHistoryMap2 := make(map[uint64]uint64) + attestationHistoryMap2[0] = 1 + pubKeyAttestationHistory2 := &slashpb.AttestationHistory{ + TargetToSource: attestationHistoryMap2, + LatestEpochWritten: 0, + } + dbAttestationHistory1 := make(map[[48]byte]*slashpb.AttestationHistory) + dbAttestationHistory1[firstStorePubKeys[0]] = pubKeyAttestationHistory1 + dbAttestationHistory1[firstStorePubKeys[1]] = pubKeyAttestationHistory2 + if err := firstStore.SaveAttestationHistoryForPubKeys(context.Background(), dbAttestationHistory1); err != nil { + return nil, errors.Wrapf(err, "Saving attestation history failed") + } + attestationHistoryMap3 := make(map[uint64]uint64) + attestationHistoryMap3[0] = 2 + pubKeyAttestationHistory3 := &slashpb.AttestationHistory{ + TargetToSource: attestationHistoryMap3, + LatestEpochWritten: 0, + } + attestationHistoryMap4 := make(map[uint64]uint64) + attestationHistoryMap4[0] = 3 + pubKeyAttestationHistory4 := &slashpb.AttestationHistory{ + TargetToSource: attestationHistoryMap4, + LatestEpochWritten: 0, + } + dbAttestationHistory2 := make(map[[48]byte]*slashpb.AttestationHistory) + dbAttestationHistory2[secondStorePubKeys[0]] = pubKeyAttestationHistory3 + dbAttestationHistory2[secondStorePubKeys[1]] = pubKeyAttestationHistory4 + if err := secondStore.SaveAttestationHistoryForPubKeys(context.Background(), dbAttestationHistory2); err != nil { + return nil, errors.Wrapf(err, "Saving attestation history failed") + } + + mergeHistory := &sourceStoresHistory{ + ProposalEpoch: proposalEpoch, + FirstStoreFirstPubKeyProposals: proposalHistory1, + FirstStoreSecondPubKeyProposals: proposalHistory2, + SecondStoreFirstPubKeyProposals: proposalHistory3, + SecondStoreSecondPubKeyProposals: proposalHistory4, + FirstStoreFirstPubKeyAttestations: attestationHistoryMap1, + FirstStoreSecondPubKeyAttestations: attestationHistoryMap2, + SecondStoreFirstPubKeyAttestations: attestationHistoryMap3, + SecondStoreSecondPubKeyAttestations: attestationHistoryMap4, + } + + return mergeHistory, nil +} + +func assertMergedStore( + t *testing.T, + mergedStore *Store, + firstStorePubKeys [][48]byte, + secondStorePubKeys [][48]byte, + history *sourceStoresHistory) { + + mergedProposalHistory1, err := mergedStore.ProposalHistoryForEpoch( + context.Background(), firstStorePubKeys[0][:], history.ProposalEpoch) + if err != nil { + t.Fatalf("Retrieving merged proposal history failed for public key %v", firstStorePubKeys[0]) + } + if !bytes.Equal(mergedProposalHistory1, history.FirstStoreFirstPubKeyProposals) { + t.Fatalf( + "Proposals not merged correctly: expected %v vs received %v", + history.FirstStoreFirstPubKeyProposals, + mergedProposalHistory1) + } + mergedProposalHistory2, err := mergedStore.ProposalHistoryForEpoch( + context.Background(), firstStorePubKeys[1][:], history.ProposalEpoch) + if err != nil { + t.Fatalf("Retrieving merged proposal history failed for public key %v", firstStorePubKeys[1]) + } + if !bytes.Equal(mergedProposalHistory2, history.FirstStoreSecondPubKeyProposals) { + t.Fatalf( + "Proposals not merged correctly: expected %v vs received %v", + history.FirstStoreSecondPubKeyProposals, + mergedProposalHistory2) + } + mergedProposalHistory3, err := mergedStore.ProposalHistoryForEpoch( + context.Background(), secondStorePubKeys[0][:], history.ProposalEpoch) + if err != nil { + t.Fatalf("Retrieving merged proposal history failed for public key %v", secondStorePubKeys[0]) + } + if !bytes.Equal(mergedProposalHistory3, history.SecondStoreFirstPubKeyProposals) { + t.Fatalf( + "Proposals not merged correctly: expected %v vs received %v", + history.SecondStoreFirstPubKeyProposals, + mergedProposalHistory3) + } + mergedProposalHistory4, err := mergedStore.ProposalHistoryForEpoch( + context.Background(), secondStorePubKeys[1][:], history.ProposalEpoch) + if err != nil { + t.Fatalf("Retrieving merged proposal history failed for public key %v", secondStorePubKeys[1]) + } + if !bytes.Equal(mergedProposalHistory4, history.SecondStoreSecondPubKeyProposals) { + t.Fatalf("Proposals not merged correctly: expected %v vs received %v", + history.SecondStoreSecondPubKeyProposals, + mergedProposalHistory4) + } + + mergedAttestationHistory, err := mergedStore.AttestationHistoryForPubKeys( + context.Background(), + append(firstStorePubKeys, secondStorePubKeys[0], secondStorePubKeys[1])) + if err != nil { + t.Fatalf("Retrieving merged attestation history failed") + } + if mergedAttestationHistory[firstStorePubKeys[0]].TargetToSource[0] != history.FirstStoreFirstPubKeyAttestations[0] { + t.Fatalf( + "Attestations not merged correctly: expected %v vs received %v", + history.FirstStoreFirstPubKeyAttestations[0], + mergedAttestationHistory[firstStorePubKeys[0]].TargetToSource[0]) + } + if mergedAttestationHistory[firstStorePubKeys[1]].TargetToSource[0] != history.FirstStoreSecondPubKeyAttestations[0] { + t.Fatalf( + "Attestations not merged correctly: expected %v vs received %v", + history.FirstStoreSecondPubKeyAttestations, + mergedAttestationHistory[firstStorePubKeys[1]].TargetToSource[0]) + } + if mergedAttestationHistory[secondStorePubKeys[0]].TargetToSource[0] != history.SecondStoreFirstPubKeyAttestations[0] { + t.Fatalf( + "Attestations not merged correctly: expected %v vs received %v", + history.SecondStoreFirstPubKeyAttestations, + mergedAttestationHistory[secondStorePubKeys[0]].TargetToSource[0]) + } + if mergedAttestationHistory[secondStorePubKeys[1]].TargetToSource[0] != history.SecondStoreSecondPubKeyAttestations[0] { + t.Fatalf( + "Attestations not merged correctly: expected %v vs received %v", + history.SecondStoreSecondPubKeyAttestations, + mergedAttestationHistory[secondStorePubKeys[1]].TargetToSource[0]) + } +} diff --git a/validator/db/setup_db.go b/validator/db/setup_db.go index 00b0b015116..8bc075ec5e7 100644 --- a/validator/db/setup_db.go +++ b/validator/db/setup_db.go @@ -19,7 +19,7 @@ func SetupDB(t testing.TB, pubkeys [][48]byte) *Store { if err := os.RemoveAll(p); err != nil { t.Fatalf("Failed to remove directory: %v", err) } - db, err := NewKVStoreWithPublicKeyBuckets(p, pubkeys) + db, err := NewKVStore(p, pubkeys) if err != nil { t.Fatalf("Failed to instantiate DB: %v", err) } diff --git a/validator/db/setup_db_test.go b/validator/db/setup_db_test.go index 77099b623bd..9774b97954d 100644 --- a/validator/db/setup_db_test.go +++ b/validator/db/setup_db_test.go @@ -19,7 +19,7 @@ func TestClearDB(t *testing.T) { if err := os.RemoveAll(p); err != nil { t.Fatalf("Failed to remove directory: %v", err) } - db, err := NewKVStoreWithPublicKeyBuckets(p, [][48]byte{}) + db, err := NewKVStore(p, [][48]byte{}) if err != nil { t.Fatalf("Failed to instantiate DB: %v", err) } diff --git a/validator/node/node.go b/validator/node/node.go index 733a0c7125e..47731c29b6d 100644 --- a/validator/node/node.go +++ b/validator/node/node.go @@ -317,7 +317,7 @@ func clearDB(dataDir string, pubkeys [][48]byte, force bool) error { } if clearDBConfirmed { - valDB, err := db.NewKVStoreWithPublicKeyBuckets(dataDir, pubkeys) + valDB, err := db.NewKVStore(dataDir, pubkeys) if err != nil { return errors.Wrapf(err, "Could not create DB in dir %s", dataDir) }