Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge validators enhancements #6027

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
9 changes: 4 additions & 5 deletions validator/accounts/account.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,12 +222,12 @@ func HandleEmptyKeystoreFlags(cliCtx *cli.Context, confirmPassword bool) (string
}

// Merge merges data from validator databases in sourceDirectories into a new store, which is created in targetDirectory.
func Merge(ctx context.Context, sourceDirectories []string, targetDirectory string) error {
func Merge(ctx context.Context, sourceDirectories []string, targetDirectory string) (err error) {
var sourceStores []*db.Store
defer func() {
for _, store := range sourceStores {
if err := store.Close(); err != nil {
err = errors.Wrapf(err, "Failed to close the database in %s", store.DatabasePath())
if deferErr := store.Close(); deferErr != nil {
err = errors.Wrapf(deferErr, "Failed to close the database in %s", store.DatabasePath())
rkapka marked this conversation as resolved.
Show resolved Hide resolved
}
}
}()
Expand All @@ -247,8 +247,7 @@ func Merge(ctx context.Context, sourceDirectories []string, targetDirectory stri
return errors.New("no validator databases found in source directories")
}

err := db.Merge(ctx, sourceStores, targetDirectory)
if err != nil {
if err := db.Merge(ctx, sourceStores, targetDirectory); err != nil {
return errors.Wrapf(err, "Failed to merge validator databases into %s", targetDirectory)
}

Expand Down
242 changes: 70 additions & 172 deletions validator/accounts/account_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,22 +24,18 @@ import (
)

type sourceStoresHistory struct {
ProposalEpoch uint64
FirstStoreFirstPubKeyProposals bitfield.Bitlist
FirstStoreSecondPubKeyProposals bitfield.Bitlist
SecondStoreFirstPubKeyProposals bitfield.Bitlist
SecondStoreSecondPubKeyProposals bitfield.Bitlist
FirstStoreFirstPubKeyAttestations map[uint64]uint64
FirstStoreSecondPubKeyAttestations map[uint64]uint64
SecondStoreFirstPubKeyAttestations map[uint64]uint64
SecondStoreSecondPubKeyAttestations map[uint64]uint64
ProposalEpoch uint64
FirstStorePubKeyProposals bitfield.Bitlist
SecondStorePubKeyProposals bitfield.Bitlist
FirstStorePubKeyAttestations map[uint64]uint64
SecondStorePubKeyAttestations map[uint64]uint64
}

func TestNewValidatorAccount_AccountExists(t *testing.T) {
directory := testutil.TempDir() + "/testkeystore"
defer func() {
if err := os.RemoveAll(directory); err != nil {
t.Logf("Could not remove directory: %v", err)
t.Errorf("Could not remove directory: %v", err)
}
}()
validatorKey, err := keystore.NewKey()
Expand Down Expand Up @@ -176,46 +172,13 @@ func TestChangePassword_KeyNotMatchingOldPasswordNotEncryptedWithNewPassword(t *
}
}

func TestMerge(t *testing.T) {
firstStorePubKeys := [][48]byte{{1}, {2}}
firstStore := db.SetupDB(t, firstStorePubKeys)
secondStorePubKeys := [][48]byte{{3}, {4}}
secondStore := db.SetupDB(t, secondStorePubKeys)

history, err := prepareSourcesForMerging(firstStorePubKeys, firstStore, secondStorePubKeys, secondStore)
if err != nil {
t.Fatalf(err.Error())
}

if err := firstStore.Close(); err != nil {
t.Fatalf("Closing source store failed: %v", err)
}
if err := secondStore.Close(); err != nil {
t.Fatalf("Closing source store failed: %v", err)
}

targetDirectory := testutil.TempDir() + "/target"
err = Merge(context.Background(), []string{firstStore.DatabasePath(), secondStore.DatabasePath()}, targetDirectory)
if err != nil {
t.Fatalf("Merging failed: %v", err)
}
mergedStore, err := db.GetKVStore(targetDirectory)
if err != nil {
t.Fatalf("Retrieving the merged store failed: %v", err)
}

assertMergedStore(t, mergedStore, firstStorePubKeys, secondStorePubKeys, history)

cleanupAfterMerge(t, []string{firstStore.DatabasePath(), secondStore.DatabasePath(), targetDirectory})
}

func TestMerge_SucceedsWhenNoDatabaseExistsInSomeSourceDirectory(t *testing.T) {
firstStorePubKeys := [][48]byte{{1}, {2}}
firstStore := db.SetupDB(t, firstStorePubKeys)
secondStorePubKeys := [][48]byte{{3}, {4}}
secondStore := db.SetupDB(t, secondStorePubKeys)
firstStorePubKey := [48]byte{1}
firstStore := db.SetupDB(t, [][48]byte{firstStorePubKey})
secondStorePubKey := [48]byte{2}
secondStore := db.SetupDB(t, [][48]byte{secondStorePubKey})

history, err := prepareSourcesForMerging(firstStorePubKeys, firstStore, secondStorePubKeys, secondStore)
history, err := prepareSourcesForMerging(firstStorePubKey, firstStore, secondStorePubKey, secondStore)
if err != nil {
t.Fatalf(err.Error())
}
Expand All @@ -232,6 +195,12 @@ func TestMerge_SucceedsWhenNoDatabaseExistsInSomeSourceDirectory(t *testing.T) {
t.Fatalf("Could not create directory %s", sourceDirectoryWithoutStore)
}
targetDirectory := testutil.TempDir() + "/target"
t.Cleanup(func() {
if err := os.RemoveAll(targetDirectory); err != nil {
t.Errorf("Could not remove target directory : %v", err)
}
})

err = Merge(
context.Background(),
[]string{firstStore.DatabasePath(), secondStore.DatabasePath(), sourceDirectoryWithoutStore}, targetDirectory)
Expand All @@ -243,11 +212,7 @@ func TestMerge_SucceedsWhenNoDatabaseExistsInSomeSourceDirectory(t *testing.T) {
t.Fatalf("Retrieving the merged store failed: %v", err)
}

assertMergedStore(t, mergedStore, firstStorePubKeys, secondStorePubKeys, history)

cleanupAfterMerge(
t,
[]string{firstStore.DatabasePath(), secondStore.DatabasePath(), sourceDirectoryWithoutStore, targetDirectory})
assertMergedStore(t, mergedStore, firstStorePubKey, secondStorePubKey, history)
}

func TestMerge_FailsWhenNoDatabaseExistsInAllSourceDirectories(t *testing.T) {
Expand All @@ -263,32 +228,29 @@ func TestMerge_FailsWhenNoDatabaseExistsInAllSourceDirectories(t *testing.T) {
if err := os.MkdirAll(targetDirectory, 0700); err != nil {
t.Fatalf("Could not create directory %s", targetDirectory)
}
t.Cleanup(func() {
for _, dir := range []string{sourceDirectory1, sourceDirectory2, targetDirectory} {
if err := os.RemoveAll(dir); err != nil {
t.Errorf("Could not remove directory : %v", err)
}
}
})

err := Merge(context.Background(), []string{sourceDirectory1, sourceDirectory2}, targetDirectory)
expected := "no validator databases found in source directories"
if err == nil || !strings.Contains(err.Error(), expected) {
t.Errorf("Expected: %s vs received %v", expected, err)
}

cleanupAfterMerge(t, []string{sourceDirectory1, sourceDirectory2, targetDirectory})
}

func prepareSourcesForMerging(firstStorePubKeys [][48]byte, firstStore *db.Store, secondStorePubKeys [][48]byte, secondStore *db.Store) (*sourceStoresHistory, error) {
func prepareSourcesForMerging(firstStorePubKey [48]byte, firstStore *db.Store, secondStorePubKey [48]byte, secondStore *db.Store) (*sourceStoresHistory, error) {
proposalEpoch := uint64(0)
proposalHistory1 := bitfield.Bitlist{0x01, 0x00, 0x00, 0x00, 0x01}
if err := firstStore.SaveProposalHistoryForEpoch(context.Background(), firstStorePubKeys[0][:], proposalEpoch, proposalHistory1); err != nil {
if err := firstStore.SaveProposalHistoryForEpoch(context.Background(), firstStorePubKey[:], proposalEpoch, proposalHistory1); err != nil {
return nil, errors.Wrapf(err, "Saving proposal history failed")
}
proposalHistory2 := bitfield.Bitlist{0x02, 0x00, 0x00, 0x00, 0x01}
if err := firstStore.SaveProposalHistoryForEpoch(context.Background(), firstStorePubKeys[1][:], proposalEpoch, proposalHistory2); err != nil {
return nil, errors.Wrapf(err, "Saving proposal history failed")
}
proposalHistory3 := bitfield.Bitlist{0x03, 0x00, 0x00, 0x00, 0x01}
if err := secondStore.SaveProposalHistoryForEpoch(context.Background(), secondStorePubKeys[0][:], proposalEpoch, proposalHistory3); err != nil {
return nil, errors.Wrapf(err, "Saving proposal history failed")
}
proposalHistory4 := bitfield.Bitlist{0x04, 0x00, 0x00, 0x00, 0x01}
if err := secondStore.SaveProposalHistoryForEpoch(context.Background(), secondStorePubKeys[1][:], proposalEpoch, proposalHistory4); err != nil {
if err := secondStore.SaveProposalHistoryForEpoch(context.Background(), secondStorePubKey[:], proposalEpoch, proposalHistory2); err != nil {
return nil, errors.Wrapf(err, "Saving proposal history failed")
}

Expand All @@ -298,47 +260,29 @@ func prepareSourcesForMerging(firstStorePubKeys [][48]byte, firstStore *db.Store
TargetToSource: attestationHistoryMap1,
LatestEpochWritten: 0,
}
attestationHistoryMap2 := make(map[uint64]uint64)
attestationHistoryMap2[0] = 1
pubKeyAttestationHistory2 := &slashpb.AttestationHistory{
TargetToSource: attestationHistoryMap2,
LatestEpochWritten: 0,
}
dbAttestationHistory1 := make(map[[48]byte]*slashpb.AttestationHistory)
dbAttestationHistory1[firstStorePubKeys[0]] = pubKeyAttestationHistory1
dbAttestationHistory1[firstStorePubKeys[1]] = pubKeyAttestationHistory2
dbAttestationHistory1[firstStorePubKey] = pubKeyAttestationHistory1
if err := firstStore.SaveAttestationHistoryForPubKeys(context.Background(), dbAttestationHistory1); err != nil {
return nil, errors.Wrapf(err, "Saving attestation history failed")
}
attestationHistoryMap3 := make(map[uint64]uint64)
attestationHistoryMap3[0] = 2
pubKeyAttestationHistory3 := &slashpb.AttestationHistory{
TargetToSource: attestationHistoryMap3,
LatestEpochWritten: 0,
}
attestationHistoryMap4 := make(map[uint64]uint64)
attestationHistoryMap4[0] = 3
pubKeyAttestationHistory4 := &slashpb.AttestationHistory{
TargetToSource: attestationHistoryMap4,
attestationHistoryMap2 := make(map[uint64]uint64)
attestationHistoryMap2[0] = 1
pubKeyAttestationHistory2 := &slashpb.AttestationHistory{
TargetToSource: attestationHistoryMap2,
LatestEpochWritten: 0,
}
dbAttestationHistory2 := make(map[[48]byte]*slashpb.AttestationHistory)
dbAttestationHistory2[secondStorePubKeys[0]] = pubKeyAttestationHistory3
dbAttestationHistory2[secondStorePubKeys[1]] = pubKeyAttestationHistory4
dbAttestationHistory2[secondStorePubKey] = pubKeyAttestationHistory2
if err := secondStore.SaveAttestationHistoryForPubKeys(context.Background(), dbAttestationHistory2); err != nil {
return nil, errors.Wrapf(err, "Saving attestation history failed")
}

mergeHistory := &sourceStoresHistory{
ProposalEpoch: proposalEpoch,
FirstStoreFirstPubKeyProposals: proposalHistory1,
FirstStoreSecondPubKeyProposals: proposalHistory2,
SecondStoreFirstPubKeyProposals: proposalHistory3,
SecondStoreSecondPubKeyProposals: proposalHistory4,
FirstStoreFirstPubKeyAttestations: attestationHistoryMap1,
FirstStoreSecondPubKeyAttestations: attestationHistoryMap2,
SecondStoreFirstPubKeyAttestations: attestationHistoryMap3,
SecondStoreSecondPubKeyAttestations: attestationHistoryMap4,
ProposalEpoch: proposalEpoch,
FirstStorePubKeyProposals: proposalHistory1,
SecondStorePubKeyProposals: proposalHistory2,
FirstStorePubKeyAttestations: attestationHistoryMap1,
SecondStorePubKeyAttestations: attestationHistoryMap2,
}

return mergeHistory, nil
Expand All @@ -347,95 +291,49 @@ func prepareSourcesForMerging(firstStorePubKeys [][48]byte, firstStore *db.Store
func assertMergedStore(
t *testing.T,
mergedStore *db.Store,
firstStorePubKeys [][48]byte,
secondStorePubKeys [][48]byte,
firstStorePubKey [48]byte,
secondStorePubKey [48]byte,
history *sourceStoresHistory) {

mergedProposalHistory1, err := mergedStore.ProposalHistoryForEpoch(
context.Background(), firstStorePubKeys[0][:], history.ProposalEpoch)
context.Background(), firstStorePubKey[:], history.ProposalEpoch)
if err != nil {
t.Errorf("Retrieving merged proposal history failed for public key %v", firstStorePubKeys[0])
} else {
if !bytes.Equal(mergedProposalHistory1, history.FirstStoreFirstPubKeyProposals) {
t.Errorf(
"Proposals not merged correctly: expected %v vs received %v",
history.FirstStoreFirstPubKeyProposals,
mergedProposalHistory1)
}
t.Fatalf("Retrieving merged proposal history failed for public key %v", firstStorePubKey)
}
mergedProposalHistory2, err := mergedStore.ProposalHistoryForEpoch(
context.Background(), firstStorePubKeys[1][:], history.ProposalEpoch)
if err != nil {
t.Errorf("Retrieving merged proposal history failed for public key %v", firstStorePubKeys[1])
} else {
if !bytes.Equal(mergedProposalHistory2, history.FirstStoreSecondPubKeyProposals) {
t.Errorf(
"Proposals not merged correctly: expected %v vs received %v",
history.FirstStoreSecondPubKeyProposals,
mergedProposalHistory2)
}
if !bytes.Equal(mergedProposalHistory1, history.FirstStorePubKeyProposals) {
t.Fatalf(
"Proposals not merged correctly: expected %v vs received %v",
history.FirstStorePubKeyProposals,
mergedProposalHistory1)
}
mergedProposalHistory3, err := mergedStore.ProposalHistoryForEpoch(
context.Background(), secondStorePubKeys[0][:], history.ProposalEpoch)
mergedProposalHistory2, err := mergedStore.ProposalHistoryForEpoch(
context.Background(), secondStorePubKey[:], history.ProposalEpoch)
if err != nil {
t.Errorf("Retrieving merged proposal history failed for public key %v", secondStorePubKeys[0])
} else {
if !bytes.Equal(mergedProposalHistory3, history.SecondStoreFirstPubKeyProposals) {
t.Errorf(
"Proposals not merged correctly: expected %v vs received %v",
history.SecondStoreFirstPubKeyProposals,
mergedProposalHistory3)
}
t.Fatalf("Retrieving merged proposal history failed for public key %v", secondStorePubKey)
}
mergedProposalHistory4, err := mergedStore.ProposalHistoryForEpoch(
context.Background(), secondStorePubKeys[1][:], history.ProposalEpoch)
if err != nil {
t.Errorf("Retrieving merged proposal history failed for public key %v", secondStorePubKeys[1])
} else {
if !bytes.Equal(mergedProposalHistory4, history.SecondStoreSecondPubKeyProposals) {
t.Errorf("Proposals not merged correctly: expected %v vs received %v",
history.SecondStoreSecondPubKeyProposals,
mergedProposalHistory4)
}
if !bytes.Equal(mergedProposalHistory2, history.SecondStorePubKeyProposals) {
t.Fatalf(
"Proposals not merged correctly: expected %v vs received %v",
history.SecondStorePubKeyProposals,
mergedProposalHistory2)
}

mergedAttestationHistory, err := mergedStore.AttestationHistoryForPubKeys(
context.Background(),
append(firstStorePubKeys, secondStorePubKeys[0], secondStorePubKeys[1]))
[][48]byte{firstStorePubKey, secondStorePubKey})
if err != nil {
t.Error("Retrieving merged attestation history failed")
} else {
if mergedAttestationHistory[firstStorePubKeys[0]].TargetToSource[0] != history.FirstStoreFirstPubKeyAttestations[0] {
t.Errorf(
"Attestations not merged correctly: expected %v vs received %v",
history.FirstStoreFirstPubKeyAttestations[0],
mergedAttestationHistory[firstStorePubKeys[0]].TargetToSource[0])
}
if mergedAttestationHistory[firstStorePubKeys[1]].TargetToSource[0] != history.FirstStoreSecondPubKeyAttestations[0] {
t.Errorf(
"Attestations not merged correctly: expected %v vs received %v",
history.FirstStoreSecondPubKeyAttestations,
mergedAttestationHistory[firstStorePubKeys[1]].TargetToSource[0])
}
if mergedAttestationHistory[secondStorePubKeys[0]].TargetToSource[0] != history.SecondStoreFirstPubKeyAttestations[0] {
t.Errorf(
"Attestations not merged correctly: expected %v vs received %v",
history.SecondStoreFirstPubKeyAttestations,
mergedAttestationHistory[secondStorePubKeys[0]].TargetToSource[0])
}
if mergedAttestationHistory[secondStorePubKeys[1]].TargetToSource[0] != history.SecondStoreSecondPubKeyAttestations[0] {
t.Errorf(
"Attestations not merged correctly: expected %v vs received %v",
history.SecondStoreSecondPubKeyAttestations,
mergedAttestationHistory[secondStorePubKeys[1]].TargetToSource[0])
}
}
}

func cleanupAfterMerge(t *testing.T, directories []string) {
for _, dir := range directories {
if err := os.RemoveAll(dir); err != nil {
t.Logf("Could not remove directory %s: %v", dir, err)
}
t.Fatalf("Retrieving merged attestation history failed")
}
if mergedAttestationHistory[firstStorePubKey].TargetToSource[0] != history.FirstStorePubKeyAttestations[0] {
t.Fatalf(
"Attestations not merged correctly: expected %v vs received %v",
history.FirstStorePubKeyAttestations[0],
mergedAttestationHistory[firstStorePubKey].TargetToSource[0])
}
if mergedAttestationHistory[secondStorePubKey].TargetToSource[0] != history.SecondStorePubKeyAttestations[0] {
t.Fatalf(
"Attestations not merged correctly: expected %v vs received %v",
history.SecondStorePubKeyAttestations,
mergedAttestationHistory[secondStorePubKey].TargetToSource[0])
}
}
2 changes: 1 addition & 1 deletion validator/client/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func (v *ValidatorService) Start() {
return
}

valDB, err := db.NewKVStoreWithPublicKeyBuckets(v.dataDir, pubkeys)
valDB, err := db.NewKVStore(v.dataDir, pubkeys)
if err != nil {
log.Errorf("Could not initialize db: %v", err)
return
Expand Down
2 changes: 2 additions & 0 deletions validator/db/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ go_test(
name = "go_default_test",
srcs = [
"attestation_history_test.go",
"manage_test.go",
"proposal_history_test.go",
"setup_db_test.go",
],
Expand All @@ -39,6 +40,7 @@ go_test(
"//beacon-chain/core/helpers:go_default_library",
"//proto/slashing:go_default_library",
"//shared/params:go_default_library",
"//shared/testutil:go_default_library",
"@com_github_prysmaticlabs_go_bitfield//:go_default_library",
],
)