Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge validator databases #5968

Merged
merged 13 commits into from
May 25, 2020
3 changes: 3 additions & 0 deletions validator/accounts/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,14 @@ go_library(
],
deps = [
"//contracts/deposit-contract:go_default_library",
"//proto/slashing:go_default_library",
"//shared/cmd:go_default_library",
"//shared/keystore:go_default_library",
"//shared/params:go_default_library",
"//validator/db:go_default_library",
"//validator/flags:go_default_library",
"@com_github_pkg_errors//:go_default_library",
"@com_github_prysmaticlabs_go_bitfield//:go_default_library",
"@com_github_prysmaticlabs_ethereumapis//eth/v1alpha1:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
"@in_gopkg_urfave_cli_v2//:go_default_library",
Expand Down
34 changes: 34 additions & 0 deletions validator/accounts/account.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package accounts

import (
"bufio"
"context"
"encoding/hex"
"fmt"
"io"
Expand All @@ -17,6 +18,7 @@ import (
"github.com/prysmaticlabs/prysm/shared/cmd"
"github.com/prysmaticlabs/prysm/shared/keystore"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/prysmaticlabs/prysm/validator/db"
"github.com/prysmaticlabs/prysm/validator/flags"
"github.com/sirupsen/logrus"
"gopkg.in/urfave/cli.v2"
Expand Down Expand Up @@ -219,6 +221,38 @@ func HandleEmptyKeystoreFlags(cliCtx *cli.Context, confirmPassword bool) (string
return path, passphrase, nil
}

func Merge(ctx context.Context, sourceDirectories []string, targetDirectory string) error {
var sourceStores []*db.Store

for _, dir := range sourceDirectories {
store, err := db.GetKVStore(dir)
if err != nil {
return errors.Wrapf(err, "Failed to prepare the database in %s for merging", dir)
}
if store == nil {
continue
}
sourceStores = append(sourceStores, store)
}

if len(sourceStores) == 0 {
return errors.New("No validator databases found in source directories")
}

err := db.Merge(ctx, sourceStores, targetDirectory)
if err != nil {
return errors.Wrapf(err, "Failed to merge validator databases into %s", targetDirectory)
}

for _, store := range sourceStores {
if err := store.Close(); err != nil {
return errors.Wrapf(err,"Failed to close the database in %s", store.DatabasePath())
}
}

return nil
}

// ChangePassword changes the password for all keys located in a keystore.
// Password is changed only for keys that can be decrypted using the old password.
func ChangePassword(keystorePath string, oldPassword string, newPassword string) error {
Expand Down
285 changes: 285 additions & 0 deletions validator/accounts/account_test.go
Original file line number Diff line number Diff line change
@@ -1,20 +1,41 @@
package accounts

import (
"bytes"
"context"
"encoding/hex"
"flag"
"fmt"
"io/ioutil"
"os"
"strings"
"testing"

"github.com/pkg/errors"

"github.com/prysmaticlabs/go-bitfield"
slashpb "github.com/prysmaticlabs/prysm/proto/slashing"
"github.com/prysmaticlabs/prysm/validator/db"

"github.com/prysmaticlabs/prysm/shared/keystore"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/prysmaticlabs/prysm/shared/testutil"
"github.com/prysmaticlabs/prysm/validator/flags"
"gopkg.in/urfave/cli.v2"
)

type sourceStoresHistory struct {
ProposalEpoch uint64
FirstStoreFirstPubKeyProposals bitfield.Bitlist
FirstStoreSecondPubKeyProposals bitfield.Bitlist
SecondStoreFirstPubKeyProposals bitfield.Bitlist
SecondStoreSecondPubKeyProposals bitfield.Bitlist
FirstStoreFirstPubKeyAttestations map[uint64]uint64
FirstStoreSecondPubKeyAttestations map[uint64]uint64
SecondStoreFirstPubKeyAttestations map[uint64]uint64
SecondStoreSecondPubKeyAttestations map[uint64]uint64
}

func TestNewValidatorAccount_AccountExists(t *testing.T) {
directory := testutil.TempDir() + "/testkeystore"
defer func() {
Expand Down Expand Up @@ -155,3 +176,267 @@ func TestChangePassword_KeyNotMatchingOldPasswordNotEncryptedWithNewPassword(t *
t.Error("Key incorrectly encrypted using the new password")
}
}

func TestMerge(t *testing.T) {
rkapka marked this conversation as resolved.
Show resolved Hide resolved
firstStorePubKeys := [][48]byte{{1}, {2}}
firstStore := db.SetupDB(t, firstStorePubKeys)
secondStorePubKeys := [][48]byte{{3}, {4}}
secondStore := db.SetupDB(t, secondStorePubKeys)

history, err := prepareSourcesForMerging(firstStorePubKeys, firstStore, secondStorePubKeys, secondStore)
if err != nil {
t.Fatalf(err.Error())
rkapka marked this conversation as resolved.
Show resolved Hide resolved
}

if err := firstStore.Close(); err != nil {
t.Fatalf("Closing source store failed: %v", err)
}
if err := secondStore.Close(); err != nil {
t.Fatalf("Closing source store failed: %v", err)
}

targetDirectory := testutil.TempDir() + "/target"
err = Merge(context.Background(), []string{firstStore.DatabasePath(), secondStore.DatabasePath()}, targetDirectory)
if err != nil {
t.Fatalf("Merging failed: %v", err)
}
mergedStore, err := db.GetKVStore(targetDirectory)
if err != nil {
t.Fatalf("Retrieving the merged store failed: %v", err)
}

assertMergedStore(t, mergedStore, firstStorePubKeys, secondStorePubKeys, history)

cleanupAfterMerge(t, []string{firstStore.DatabasePath(), secondStore.DatabasePath(), targetDirectory})
}

func TestMerge_SucceedsWhenNoDatabaseExistsInSomeSourceDirectory(t *testing.T) {
firstStorePubKeys := [][48]byte{{1}, {2}}
firstStore := db.SetupDB(t, firstStorePubKeys)
secondStorePubKeys := [][48]byte{{3}, {4}}
secondStore := db.SetupDB(t, secondStorePubKeys)

history, err := prepareSourcesForMerging(firstStorePubKeys, firstStore, secondStorePubKeys, secondStore)
if err != nil {
t.Fatalf(err.Error())
}

if err := firstStore.Close(); err != nil {
t.Fatalf("Closing source store failed: %v", err)
}
if err := secondStore.Close(); err != nil {
t.Fatalf("Closing source store failed: %v", err)
}

sourceDirectoryWithoutStore := testutil.TempDir() + "/nodb"
if err := os.MkdirAll(sourceDirectoryWithoutStore, 0700); err != nil {
t.Fatalf("Could not create directory %s", sourceDirectoryWithoutStore)
}
targetDirectory := testutil.TempDir() + "/target"
err = Merge(
context.Background(),
[]string{firstStore.DatabasePath(), secondStore.DatabasePath(), sourceDirectoryWithoutStore}, targetDirectory)
rkapka marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
t.Fatalf("Merging failed: %v", err)
}
mergedStore, err := db.GetKVStore(targetDirectory)
if err != nil {
t.Fatalf("Retrieving the merged store failed: %v", err)
}

assertMergedStore(t, mergedStore, firstStorePubKeys, secondStorePubKeys, history)

cleanupAfterMerge(
t,
[]string{firstStore.DatabasePath(), secondStore.DatabasePath(), sourceDirectoryWithoutStore, targetDirectory})
}

func TestMerge_FailsWhenNoDatabaseExistsInAllSourceDirectories(t *testing.T) {
sourceDirectory1 := testutil.TempDir() + "/source1"
sourceDirectory2 := testutil.TempDir() + "/source2"
targetDirectory := testutil.TempDir() + "/target"
if err := os.MkdirAll(sourceDirectory1, 0700); err != nil {
t.Fatalf("Could not create directory %s", sourceDirectory1)
}
if err := os.MkdirAll(sourceDirectory2, 0700); err != nil {
t.Fatalf("Could not create directory %s", sourceDirectory2)
}
if err := os.MkdirAll(targetDirectory, 0700); err != nil {
t.Fatalf("Could not create directory %s", targetDirectory)
}

err := Merge(context.Background(), []string{sourceDirectory1, sourceDirectory2}, targetDirectory)
expected := "No validator databases found in source directories"
if err == nil || !strings.Contains(err.Error(), expected) {
t.Errorf("Expected: %s vs received %v", expected, err)
}

cleanupAfterMerge(t, []string{sourceDirectory1, sourceDirectory2, targetDirectory})
}

func prepareSourcesForMerging(firstStorePubKeys [][48]byte, firstStore *db.Store, secondStorePubKeys [][48]byte, secondStore *db.Store) (*sourceStoresHistory, error) {
proposalEpoch := uint64(0)
proposalHistory1 := bitfield.Bitlist{0x01, 0x00, 0x00, 0x00, 0x01}
if err := firstStore.SaveProposalHistoryForEpoch(context.Background(), firstStorePubKeys[0][:], proposalEpoch, proposalHistory1); err != nil {
return nil, errors.Wrapf(err, "Saving proposal history failed")
}
proposalHistory2 := bitfield.Bitlist{0x02, 0x00, 0x00, 0x00, 0x01}
if err := firstStore.SaveProposalHistoryForEpoch(context.Background(), firstStorePubKeys[1][:], proposalEpoch, proposalHistory2); err != nil {
return nil, errors.Wrapf(err, "Saving proposal history failed")
}
proposalHistory3 := bitfield.Bitlist{0x03, 0x00, 0x00, 0x00, 0x01}
if err := secondStore.SaveProposalHistoryForEpoch(context.Background(), secondStorePubKeys[0][:], proposalEpoch, proposalHistory3); err != nil {
return nil, errors.Wrapf(err, "Saving proposal history failed")
}
proposalHistory4 := bitfield.Bitlist{0x04, 0x00, 0x00, 0x00, 0x01}
if err := secondStore.SaveProposalHistoryForEpoch(context.Background(), secondStorePubKeys[1][:], proposalEpoch, proposalHistory4); err != nil {
return nil, errors.Wrapf(err, "Saving proposal history failed")
}

attestationHistoryMap1 := make(map[uint64]uint64)
attestationHistoryMap1[0] = 0
pubKeyAttestationHistory1 := &slashpb.AttestationHistory{
TargetToSource: attestationHistoryMap1,
LatestEpochWritten: 0,
}
attestationHistoryMap2 := make(map[uint64]uint64)
attestationHistoryMap2[0] = 1
pubKeyAttestationHistory2 := &slashpb.AttestationHistory{
TargetToSource: attestationHistoryMap2,
LatestEpochWritten: 0,
}
dbAttestationHistory1 := make(map[[48]byte]*slashpb.AttestationHistory)
dbAttestationHistory1[firstStorePubKeys[0]] = pubKeyAttestationHistory1
dbAttestationHistory1[firstStorePubKeys[1]] = pubKeyAttestationHistory2
if err := firstStore.SaveAttestationHistoryForPubKeys(context.Background(), dbAttestationHistory1); err != nil {
return nil, errors.Wrapf(err, "Saving attestation history failed")
}
attestationHistoryMap3 := make(map[uint64]uint64)
attestationHistoryMap3[0] = 2
pubKeyAttestationHistory3 := &slashpb.AttestationHistory{
TargetToSource: attestationHistoryMap3,
LatestEpochWritten: 0,
}
attestationHistoryMap4 := make(map[uint64]uint64)
attestationHistoryMap4[0] = 3
pubKeyAttestationHistory4 := &slashpb.AttestationHistory{
TargetToSource: attestationHistoryMap4,
LatestEpochWritten: 0,
}
dbAttestationHistory2 := make(map[[48]byte]*slashpb.AttestationHistory)
dbAttestationHistory2[secondStorePubKeys[0]] = pubKeyAttestationHistory3
dbAttestationHistory2[secondStorePubKeys[1]] = pubKeyAttestationHistory4
if err := secondStore.SaveAttestationHistoryForPubKeys(context.Background(), dbAttestationHistory2); err != nil {
return nil, errors.Wrapf(err, "Saving attestation history failed")
}

mergeHistory := &sourceStoresHistory{
ProposalEpoch: proposalEpoch,
FirstStoreFirstPubKeyProposals: proposalHistory1,
FirstStoreSecondPubKeyProposals: proposalHistory2,
SecondStoreFirstPubKeyProposals: proposalHistory3,
SecondStoreSecondPubKeyProposals: proposalHistory4,
FirstStoreFirstPubKeyAttestations: attestationHistoryMap1,
FirstStoreSecondPubKeyAttestations: attestationHistoryMap2,
SecondStoreFirstPubKeyAttestations: attestationHistoryMap3,
SecondStoreSecondPubKeyAttestations: attestationHistoryMap4,
}

return mergeHistory, nil
}

func assertMergedStore(
t *testing.T,
mergedStore *db.Store,
firstStorePubKeys [][48]byte,
secondStorePubKeys [][48]byte,
history *sourceStoresHistory) {

mergedProposalHistory1, err := mergedStore.ProposalHistoryForEpoch(
context.Background(), firstStorePubKeys[0][:], history.ProposalEpoch)
if err != nil {
t.Errorf("Retrieving merged proposal history failed for public key %v", firstStorePubKeys[0])
} else {
rkapka marked this conversation as resolved.
Show resolved Hide resolved
if !bytes.Equal(mergedProposalHistory1, history.FirstStoreFirstPubKeyProposals) {
t.Errorf(
"Proposals not merged correctly: expected %v vs received %v",
history.FirstStoreFirstPubKeyProposals,
mergedProposalHistory1)
}
}
mergedProposalHistory2, err := mergedStore.ProposalHistoryForEpoch(
context.Background(), firstStorePubKeys[1][:], history.ProposalEpoch)
if err != nil {
t.Errorf("Retrieving merged proposal history failed for public key %v", firstStorePubKeys[1])
} else {
rkapka marked this conversation as resolved.
Show resolved Hide resolved
if !bytes.Equal(mergedProposalHistory2, history.FirstStoreSecondPubKeyProposals) {
t.Errorf(
"Proposals not merged correctly: expected %v vs received %v",
history.FirstStoreSecondPubKeyProposals,
mergedProposalHistory2)
}
}
mergedProposalHistory3, err := mergedStore.ProposalHistoryForEpoch(
context.Background(), secondStorePubKeys[0][:], history.ProposalEpoch)
if err != nil {
t.Errorf("Retrieving merged proposal history failed for public key %v", secondStorePubKeys[0])
} else {
rkapka marked this conversation as resolved.
Show resolved Hide resolved
if !bytes.Equal(mergedProposalHistory3, history.SecondStoreFirstPubKeyProposals) {
t.Errorf(
"Proposals not merged correctly: expected %v vs received %v",
history.SecondStoreFirstPubKeyProposals,
mergedProposalHistory3)
}
}
mergedProposalHistory4, err := mergedStore.ProposalHistoryForEpoch(
context.Background(), secondStorePubKeys[1][:], history.ProposalEpoch)
if err != nil {
t.Errorf("Retrieving merged proposal history failed for public key %v", secondStorePubKeys[1])
} else {
rkapka marked this conversation as resolved.
Show resolved Hide resolved
if !bytes.Equal(mergedProposalHistory4, history.SecondStoreSecondPubKeyProposals) {
t.Errorf("Proposals not merged correctly: expected %v vs received %v",
history.SecondStoreSecondPubKeyProposals,
mergedProposalHistory4)
}
}

mergedAttestationHistory, err := mergedStore.AttestationHistoryForPubKeys(
context.Background(),
append(firstStorePubKeys, secondStorePubKeys[0], secondStorePubKeys[1]))
if err != nil {
t.Error("Retrieving merged attestation history failed")
} else {
rkapka marked this conversation as resolved.
Show resolved Hide resolved
if mergedAttestationHistory[firstStorePubKeys[0]].TargetToSource[0] != history.FirstStoreFirstPubKeyAttestations[0] {
t.Errorf(
"Attestations not merged correctly: expected %v vs received %v",
history.FirstStoreFirstPubKeyAttestations[0],
mergedAttestationHistory[firstStorePubKeys[0]].TargetToSource[0])
}
if mergedAttestationHistory[firstStorePubKeys[1]].TargetToSource[0] != history.FirstStoreSecondPubKeyAttestations[0] {
t.Errorf(
"Attestations not merged correctly: expected %v vs received %v",
history.FirstStoreSecondPubKeyAttestations,
mergedAttestationHistory[firstStorePubKeys[1]].TargetToSource[0])
}
if mergedAttestationHistory[secondStorePubKeys[0]].TargetToSource[0] != history.SecondStoreFirstPubKeyAttestations[0] {
t.Errorf(
"Attestations not merged correctly: expected %v vs received %v",
history.SecondStoreFirstPubKeyAttestations,
mergedAttestationHistory[secondStorePubKeys[0]].TargetToSource[0])
}
if mergedAttestationHistory[secondStorePubKeys[1]].TargetToSource[0] != history.SecondStoreSecondPubKeyAttestations[0] {
t.Errorf(
"Attestations not merged correctly: expected %v vs received %v",
history.SecondStoreSecondPubKeyAttestations,
mergedAttestationHistory[secondStorePubKeys[1]].TargetToSource[0])
}
}
}

func cleanupAfterMerge(t *testing.T, directories []string) {
for _, dir := range directories {
if err := os.RemoveAll(dir); err != nil {
t.Logf("Could not remove directory %s: %v", dir, err)
rkapka marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
2 changes: 1 addition & 1 deletion validator/client/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func (v *ValidatorService) Start() {
return
}

valDB, err := db.NewKVStore(v.dataDir, pubkeys)
valDB, err := db.NewKVStoreWithPublicKeyBuckets(v.dataDir, pubkeys)
rkapka marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
log.Errorf("Could not initialize db: %v", err)
return
Expand Down