Skip to content

Commit

Permalink
Merge validators enhancements (#6027)
Browse files Browse the repository at this point in the history
* merge validator enhancements
* added test dependency to db's build file
* Merge branch 'master' into merge-validators-enhancements
* changed formatting of public key
* Merge branch 'master' into merge-validators-enhancements
* Merge branch 'master' into merge-validators-enhancements
* removed unused import
  • Loading branch information
rkapka committed May 29, 2020
1 parent 0243bdc commit d1e2901
Show file tree
Hide file tree
Showing 10 changed files with 318 additions and 209 deletions.
9 changes: 4 additions & 5 deletions validator/accounts/account.go
Expand Up @@ -222,12 +222,12 @@ func HandleEmptyKeystoreFlags(cliCtx *cli.Context, confirmPassword bool) (string
}

// Merge merges data from validator databases in sourceDirectories into a new store, which is created in targetDirectory.
func Merge(ctx context.Context, sourceDirectories []string, targetDirectory string) error {
func Merge(ctx context.Context, sourceDirectories []string, targetDirectory string) (err error) {
var sourceStores []*db.Store
defer func() {
for _, store := range sourceStores {
if err := store.Close(); err != nil {
err = errors.Wrapf(err, "Failed to close the database in %s", store.DatabasePath())
if deferErr := store.Close(); deferErr != nil {
err = errors.Wrapf(deferErr, "Failed to close the database in %s", store.DatabasePath())
}
}
}()
Expand All @@ -247,8 +247,7 @@ func Merge(ctx context.Context, sourceDirectories []string, targetDirectory stri
return errors.New("no validator databases found in source directories")
}

err := db.Merge(ctx, sourceStores, targetDirectory)
if err != nil {
if err := db.Merge(ctx, sourceStores, targetDirectory); err != nil {
return errors.Wrapf(err, "Failed to merge validator databases into %s", targetDirectory)
}

Expand Down
242 changes: 70 additions & 172 deletions validator/accounts/account_test.go
Expand Up @@ -24,22 +24,18 @@ import (
)

type sourceStoresHistory struct {
ProposalEpoch uint64
FirstStoreFirstPubKeyProposals bitfield.Bitlist
FirstStoreSecondPubKeyProposals bitfield.Bitlist
SecondStoreFirstPubKeyProposals bitfield.Bitlist
SecondStoreSecondPubKeyProposals bitfield.Bitlist
FirstStoreFirstPubKeyAttestations map[uint64]uint64
FirstStoreSecondPubKeyAttestations map[uint64]uint64
SecondStoreFirstPubKeyAttestations map[uint64]uint64
SecondStoreSecondPubKeyAttestations map[uint64]uint64
ProposalEpoch uint64
FirstStorePubKeyProposals bitfield.Bitlist
SecondStorePubKeyProposals bitfield.Bitlist
FirstStorePubKeyAttestations map[uint64]uint64
SecondStorePubKeyAttestations map[uint64]uint64
}

func TestNewValidatorAccount_AccountExists(t *testing.T) {
directory := testutil.TempDir() + "/testkeystore"
defer func() {
if err := os.RemoveAll(directory); err != nil {
t.Logf("Could not remove directory: %v", err)
t.Errorf("Could not remove directory: %v", err)
}
}()
validatorKey, err := keystore.NewKey()
Expand Down Expand Up @@ -176,46 +172,13 @@ func TestChangePassword_KeyNotMatchingOldPasswordNotEncryptedWithNewPassword(t *
}
}

func TestMerge(t *testing.T) {
firstStorePubKeys := [][48]byte{{1}, {2}}
firstStore := db.SetupDB(t, firstStorePubKeys)
secondStorePubKeys := [][48]byte{{3}, {4}}
secondStore := db.SetupDB(t, secondStorePubKeys)

history, err := prepareSourcesForMerging(firstStorePubKeys, firstStore, secondStorePubKeys, secondStore)
if err != nil {
t.Fatalf(err.Error())
}

if err := firstStore.Close(); err != nil {
t.Fatalf("Closing source store failed: %v", err)
}
if err := secondStore.Close(); err != nil {
t.Fatalf("Closing source store failed: %v", err)
}

targetDirectory := testutil.TempDir() + "/target"
err = Merge(context.Background(), []string{firstStore.DatabasePath(), secondStore.DatabasePath()}, targetDirectory)
if err != nil {
t.Fatalf("Merging failed: %v", err)
}
mergedStore, err := db.GetKVStore(targetDirectory)
if err != nil {
t.Fatalf("Retrieving the merged store failed: %v", err)
}

assertMergedStore(t, mergedStore, firstStorePubKeys, secondStorePubKeys, history)

cleanupAfterMerge(t, []string{firstStore.DatabasePath(), secondStore.DatabasePath(), targetDirectory})
}

func TestMerge_SucceedsWhenNoDatabaseExistsInSomeSourceDirectory(t *testing.T) {
firstStorePubKeys := [][48]byte{{1}, {2}}
firstStore := db.SetupDB(t, firstStorePubKeys)
secondStorePubKeys := [][48]byte{{3}, {4}}
secondStore := db.SetupDB(t, secondStorePubKeys)
firstStorePubKey := [48]byte{1}
firstStore := db.SetupDB(t, [][48]byte{firstStorePubKey})
secondStorePubKey := [48]byte{2}
secondStore := db.SetupDB(t, [][48]byte{secondStorePubKey})

history, err := prepareSourcesForMerging(firstStorePubKeys, firstStore, secondStorePubKeys, secondStore)
history, err := prepareSourcesForMerging(firstStorePubKey, firstStore, secondStorePubKey, secondStore)
if err != nil {
t.Fatalf(err.Error())
}
Expand All @@ -232,6 +195,12 @@ func TestMerge_SucceedsWhenNoDatabaseExistsInSomeSourceDirectory(t *testing.T) {
t.Fatalf("Could not create directory %s", sourceDirectoryWithoutStore)
}
targetDirectory := testutil.TempDir() + "/target"
t.Cleanup(func() {
if err := os.RemoveAll(targetDirectory); err != nil {
t.Errorf("Could not remove target directory : %v", err)
}
})

err = Merge(
context.Background(),
[]string{firstStore.DatabasePath(), secondStore.DatabasePath(), sourceDirectoryWithoutStore}, targetDirectory)
Expand All @@ -243,11 +212,7 @@ func TestMerge_SucceedsWhenNoDatabaseExistsInSomeSourceDirectory(t *testing.T) {
t.Fatalf("Retrieving the merged store failed: %v", err)
}

assertMergedStore(t, mergedStore, firstStorePubKeys, secondStorePubKeys, history)

cleanupAfterMerge(
t,
[]string{firstStore.DatabasePath(), secondStore.DatabasePath(), sourceDirectoryWithoutStore, targetDirectory})
assertMergedStore(t, mergedStore, firstStorePubKey, secondStorePubKey, history)
}

func TestMerge_FailsWhenNoDatabaseExistsInAllSourceDirectories(t *testing.T) {
Expand All @@ -263,32 +228,29 @@ func TestMerge_FailsWhenNoDatabaseExistsInAllSourceDirectories(t *testing.T) {
if err := os.MkdirAll(targetDirectory, 0700); err != nil {
t.Fatalf("Could not create directory %s", targetDirectory)
}
t.Cleanup(func() {
for _, dir := range []string{sourceDirectory1, sourceDirectory2, targetDirectory} {
if err := os.RemoveAll(dir); err != nil {
t.Errorf("Could not remove directory : %v", err)
}
}
})

err := Merge(context.Background(), []string{sourceDirectory1, sourceDirectory2}, targetDirectory)
expected := "no validator databases found in source directories"
if err == nil || !strings.Contains(err.Error(), expected) {
t.Errorf("Expected: %s vs received %v", expected, err)
}

cleanupAfterMerge(t, []string{sourceDirectory1, sourceDirectory2, targetDirectory})
}

func prepareSourcesForMerging(firstStorePubKeys [][48]byte, firstStore *db.Store, secondStorePubKeys [][48]byte, secondStore *db.Store) (*sourceStoresHistory, error) {
func prepareSourcesForMerging(firstStorePubKey [48]byte, firstStore *db.Store, secondStorePubKey [48]byte, secondStore *db.Store) (*sourceStoresHistory, error) {
proposalEpoch := uint64(0)
proposalHistory1 := bitfield.Bitlist{0x01, 0x00, 0x00, 0x00, 0x01}
if err := firstStore.SaveProposalHistoryForEpoch(context.Background(), firstStorePubKeys[0][:], proposalEpoch, proposalHistory1); err != nil {
if err := firstStore.SaveProposalHistoryForEpoch(context.Background(), firstStorePubKey[:], proposalEpoch, proposalHistory1); err != nil {
return nil, errors.Wrapf(err, "Saving proposal history failed")
}
proposalHistory2 := bitfield.Bitlist{0x02, 0x00, 0x00, 0x00, 0x01}
if err := firstStore.SaveProposalHistoryForEpoch(context.Background(), firstStorePubKeys[1][:], proposalEpoch, proposalHistory2); err != nil {
return nil, errors.Wrapf(err, "Saving proposal history failed")
}
proposalHistory3 := bitfield.Bitlist{0x03, 0x00, 0x00, 0x00, 0x01}
if err := secondStore.SaveProposalHistoryForEpoch(context.Background(), secondStorePubKeys[0][:], proposalEpoch, proposalHistory3); err != nil {
return nil, errors.Wrapf(err, "Saving proposal history failed")
}
proposalHistory4 := bitfield.Bitlist{0x04, 0x00, 0x00, 0x00, 0x01}
if err := secondStore.SaveProposalHistoryForEpoch(context.Background(), secondStorePubKeys[1][:], proposalEpoch, proposalHistory4); err != nil {
if err := secondStore.SaveProposalHistoryForEpoch(context.Background(), secondStorePubKey[:], proposalEpoch, proposalHistory2); err != nil {
return nil, errors.Wrapf(err, "Saving proposal history failed")
}

Expand All @@ -298,47 +260,29 @@ func prepareSourcesForMerging(firstStorePubKeys [][48]byte, firstStore *db.Store
TargetToSource: attestationHistoryMap1,
LatestEpochWritten: 0,
}
attestationHistoryMap2 := make(map[uint64]uint64)
attestationHistoryMap2[0] = 1
pubKeyAttestationHistory2 := &slashpb.AttestationHistory{
TargetToSource: attestationHistoryMap2,
LatestEpochWritten: 0,
}
dbAttestationHistory1 := make(map[[48]byte]*slashpb.AttestationHistory)
dbAttestationHistory1[firstStorePubKeys[0]] = pubKeyAttestationHistory1
dbAttestationHistory1[firstStorePubKeys[1]] = pubKeyAttestationHistory2
dbAttestationHistory1[firstStorePubKey] = pubKeyAttestationHistory1
if err := firstStore.SaveAttestationHistoryForPubKeys(context.Background(), dbAttestationHistory1); err != nil {
return nil, errors.Wrapf(err, "Saving attestation history failed")
}
attestationHistoryMap3 := make(map[uint64]uint64)
attestationHistoryMap3[0] = 2
pubKeyAttestationHistory3 := &slashpb.AttestationHistory{
TargetToSource: attestationHistoryMap3,
LatestEpochWritten: 0,
}
attestationHistoryMap4 := make(map[uint64]uint64)
attestationHistoryMap4[0] = 3
pubKeyAttestationHistory4 := &slashpb.AttestationHistory{
TargetToSource: attestationHistoryMap4,
attestationHistoryMap2 := make(map[uint64]uint64)
attestationHistoryMap2[0] = 1
pubKeyAttestationHistory2 := &slashpb.AttestationHistory{
TargetToSource: attestationHistoryMap2,
LatestEpochWritten: 0,
}
dbAttestationHistory2 := make(map[[48]byte]*slashpb.AttestationHistory)
dbAttestationHistory2[secondStorePubKeys[0]] = pubKeyAttestationHistory3
dbAttestationHistory2[secondStorePubKeys[1]] = pubKeyAttestationHistory4
dbAttestationHistory2[secondStorePubKey] = pubKeyAttestationHistory2
if err := secondStore.SaveAttestationHistoryForPubKeys(context.Background(), dbAttestationHistory2); err != nil {
return nil, errors.Wrapf(err, "Saving attestation history failed")
}

mergeHistory := &sourceStoresHistory{
ProposalEpoch: proposalEpoch,
FirstStoreFirstPubKeyProposals: proposalHistory1,
FirstStoreSecondPubKeyProposals: proposalHistory2,
SecondStoreFirstPubKeyProposals: proposalHistory3,
SecondStoreSecondPubKeyProposals: proposalHistory4,
FirstStoreFirstPubKeyAttestations: attestationHistoryMap1,
FirstStoreSecondPubKeyAttestations: attestationHistoryMap2,
SecondStoreFirstPubKeyAttestations: attestationHistoryMap3,
SecondStoreSecondPubKeyAttestations: attestationHistoryMap4,
ProposalEpoch: proposalEpoch,
FirstStorePubKeyProposals: proposalHistory1,
SecondStorePubKeyProposals: proposalHistory2,
FirstStorePubKeyAttestations: attestationHistoryMap1,
SecondStorePubKeyAttestations: attestationHistoryMap2,
}

return mergeHistory, nil
Expand All @@ -347,95 +291,49 @@ func prepareSourcesForMerging(firstStorePubKeys [][48]byte, firstStore *db.Store
func assertMergedStore(
t *testing.T,
mergedStore *db.Store,
firstStorePubKeys [][48]byte,
secondStorePubKeys [][48]byte,
firstStorePubKey [48]byte,
secondStorePubKey [48]byte,
history *sourceStoresHistory) {

mergedProposalHistory1, err := mergedStore.ProposalHistoryForEpoch(
context.Background(), firstStorePubKeys[0][:], history.ProposalEpoch)
context.Background(), firstStorePubKey[:], history.ProposalEpoch)
if err != nil {
t.Errorf("Retrieving merged proposal history failed for public key %v", firstStorePubKeys[0])
} else {
if !bytes.Equal(mergedProposalHistory1, history.FirstStoreFirstPubKeyProposals) {
t.Errorf(
"Proposals not merged correctly: expected %v vs received %v",
history.FirstStoreFirstPubKeyProposals,
mergedProposalHistory1)
}
t.Fatalf("Retrieving merged proposal history failed for public key %v", firstStorePubKey)
}
mergedProposalHistory2, err := mergedStore.ProposalHistoryForEpoch(
context.Background(), firstStorePubKeys[1][:], history.ProposalEpoch)
if err != nil {
t.Errorf("Retrieving merged proposal history failed for public key %v", firstStorePubKeys[1])
} else {
if !bytes.Equal(mergedProposalHistory2, history.FirstStoreSecondPubKeyProposals) {
t.Errorf(
"Proposals not merged correctly: expected %v vs received %v",
history.FirstStoreSecondPubKeyProposals,
mergedProposalHistory2)
}
if !bytes.Equal(mergedProposalHistory1, history.FirstStorePubKeyProposals) {
t.Fatalf(
"Proposals not merged correctly: expected %v vs received %v",
history.FirstStorePubKeyProposals,
mergedProposalHistory1)
}
mergedProposalHistory3, err := mergedStore.ProposalHistoryForEpoch(
context.Background(), secondStorePubKeys[0][:], history.ProposalEpoch)
mergedProposalHistory2, err := mergedStore.ProposalHistoryForEpoch(
context.Background(), secondStorePubKey[:], history.ProposalEpoch)
if err != nil {
t.Errorf("Retrieving merged proposal history failed for public key %v", secondStorePubKeys[0])
} else {
if !bytes.Equal(mergedProposalHistory3, history.SecondStoreFirstPubKeyProposals) {
t.Errorf(
"Proposals not merged correctly: expected %v vs received %v",
history.SecondStoreFirstPubKeyProposals,
mergedProposalHistory3)
}
t.Fatalf("Retrieving merged proposal history failed for public key %v", secondStorePubKey)
}
mergedProposalHistory4, err := mergedStore.ProposalHistoryForEpoch(
context.Background(), secondStorePubKeys[1][:], history.ProposalEpoch)
if err != nil {
t.Errorf("Retrieving merged proposal history failed for public key %v", secondStorePubKeys[1])
} else {
if !bytes.Equal(mergedProposalHistory4, history.SecondStoreSecondPubKeyProposals) {
t.Errorf("Proposals not merged correctly: expected %v vs received %v",
history.SecondStoreSecondPubKeyProposals,
mergedProposalHistory4)
}
if !bytes.Equal(mergedProposalHistory2, history.SecondStorePubKeyProposals) {
t.Fatalf(
"Proposals not merged correctly: expected %v vs received %v",
history.SecondStorePubKeyProposals,
mergedProposalHistory2)
}

mergedAttestationHistory, err := mergedStore.AttestationHistoryForPubKeys(
context.Background(),
append(firstStorePubKeys, secondStorePubKeys[0], secondStorePubKeys[1]))
[][48]byte{firstStorePubKey, secondStorePubKey})
if err != nil {
t.Error("Retrieving merged attestation history failed")
} else {
if mergedAttestationHistory[firstStorePubKeys[0]].TargetToSource[0] != history.FirstStoreFirstPubKeyAttestations[0] {
t.Errorf(
"Attestations not merged correctly: expected %v vs received %v",
history.FirstStoreFirstPubKeyAttestations[0],
mergedAttestationHistory[firstStorePubKeys[0]].TargetToSource[0])
}
if mergedAttestationHistory[firstStorePubKeys[1]].TargetToSource[0] != history.FirstStoreSecondPubKeyAttestations[0] {
t.Errorf(
"Attestations not merged correctly: expected %v vs received %v",
history.FirstStoreSecondPubKeyAttestations,
mergedAttestationHistory[firstStorePubKeys[1]].TargetToSource[0])
}
if mergedAttestationHistory[secondStorePubKeys[0]].TargetToSource[0] != history.SecondStoreFirstPubKeyAttestations[0] {
t.Errorf(
"Attestations not merged correctly: expected %v vs received %v",
history.SecondStoreFirstPubKeyAttestations,
mergedAttestationHistory[secondStorePubKeys[0]].TargetToSource[0])
}
if mergedAttestationHistory[secondStorePubKeys[1]].TargetToSource[0] != history.SecondStoreSecondPubKeyAttestations[0] {
t.Errorf(
"Attestations not merged correctly: expected %v vs received %v",
history.SecondStoreSecondPubKeyAttestations,
mergedAttestationHistory[secondStorePubKeys[1]].TargetToSource[0])
}
}
}

func cleanupAfterMerge(t *testing.T, directories []string) {
for _, dir := range directories {
if err := os.RemoveAll(dir); err != nil {
t.Logf("Could not remove directory %s: %v", dir, err)
}
t.Fatalf("Retrieving merged attestation history failed")
}
if mergedAttestationHistory[firstStorePubKey].TargetToSource[0] != history.FirstStorePubKeyAttestations[0] {
t.Fatalf(
"Attestations not merged correctly: expected %v vs received %v",
history.FirstStorePubKeyAttestations[0],
mergedAttestationHistory[firstStorePubKey].TargetToSource[0])
}
if mergedAttestationHistory[secondStorePubKey].TargetToSource[0] != history.SecondStorePubKeyAttestations[0] {
t.Fatalf(
"Attestations not merged correctly: expected %v vs received %v",
history.SecondStorePubKeyAttestations,
mergedAttestationHistory[secondStorePubKey].TargetToSource[0])
}
}
2 changes: 1 addition & 1 deletion validator/client/service.go
Expand Up @@ -114,7 +114,7 @@ func (v *ValidatorService) Start() {
return
}

valDB, err := db.NewKVStoreWithPublicKeyBuckets(v.dataDir, pubkeys)
valDB, err := db.NewKVStore(v.dataDir, pubkeys)
if err != nil {
log.Errorf("Could not initialize db: %v", err)
return
Expand Down
3 changes: 3 additions & 0 deletions validator/db/BUILD.bazel
Expand Up @@ -31,6 +31,7 @@ go_test(
name = "go_default_test",
srcs = [
"attestation_history_test.go",
"manage_test.go",
"proposal_history_test.go",
"setup_db_test.go",
],
Expand All @@ -39,6 +40,8 @@ go_test(
"//beacon-chain/core/helpers:go_default_library",
"//proto/slashing:go_default_library",
"//shared/params:go_default_library",
"//shared/testutil:go_default_library",
"@com_github_pkg_errors//:go_default_library",
"@com_github_prysmaticlabs_go_bitfield//:go_default_library",
],
)

0 comments on commit d1e2901

Please sign in to comment.