Skip to content

Commit

Permalink
Recover skipped archived point (#5950)
Browse files Browse the repository at this point in the history
  • Loading branch information
terencechain committed May 22, 2020
1 parent 4fb3307 commit 8eb7c7a
Show file tree
Hide file tree
Showing 4 changed files with 132 additions and 1 deletion.
1 change: 1 addition & 0 deletions beacon-chain/state/stategen/BUILD.bazel
Expand Up @@ -30,6 +30,7 @@ go_library(
"//shared/params:go_default_library",
"@com_github_pkg_errors//:go_default_library",
"@com_github_prysmaticlabs_ethereumapis//eth/v1alpha1:go_default_library",
"@com_github_prysmaticlabs_go_ssz//:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
"@io_opencensus_go//trace:go_default_library",
],
Expand Down
55 changes: 55 additions & 0 deletions beacon-chain/state/stategen/migrate.go
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/hex"

"github.com/prysmaticlabs/go-ssz"
"github.com/prysmaticlabs/prysm/beacon-chain/db/filters"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -55,6 +56,15 @@ func (s *State) MigrateToCold(ctx context.Context, finalizedSlot uint64, finaliz
nextArchivedPointSlot := (lastArchivedIndex + 1) * s.slotsPerArchivedPoint
// Only migrate if current slot is equal to or greater than next archived point slot.
if stateSummary.Slot >= nextArchivedPointSlot {
// If was a skipped archival index. The node should recover previous last archived index and state.
if skippedArchivedPoint(archivedPointIndex, lastArchivedIndex) {
recoveredIndex, err := s.recoverArchivedPoint(ctx, archivedPointIndex)
if err != nil {
return err
}
lastArchivedIndex = recoveredIndex
}

if !s.beaconDB.HasState(ctx, r) {
continue
}
Expand Down Expand Up @@ -99,3 +109,48 @@ func (s *State) MigrateToCold(ctx context.Context, finalizedSlot uint64, finaliz

return nil
}

// This recovers the last archived point. By passing in the current archived point, this recomputes
// the state of last skipped archived point and save the missing state, archived point root, archived index to the DB.
func (s *State) recoverArchivedPoint(ctx context.Context, currentArchivedPoint uint64) (uint64, error) {
missingIndex := currentArchivedPoint - 1
missingIndexSlot := missingIndex * s.slotsPerArchivedPoint
blks, err := s.beaconDB.HighestSlotBlocksBelow(ctx, missingIndexSlot)
if err != nil {
return 0, err
}
if len(blks) != 1 {
return 0, errUnknownBlock
}
missingRoot, err := ssz.HashTreeRoot(blks[0].Block)
if err != nil {
return 0, err
}
missingState, err := s.StateByRoot(ctx, missingRoot)
if err != nil {
return 0, err
}

if err := s.beaconDB.SaveState(ctx, missingState, missingRoot); err != nil {
return 0, err
}
if err := s.beaconDB.SaveArchivedPointRoot(ctx, missingRoot, missingIndex); err != nil {
return 0, err
}
if err := s.beaconDB.SaveLastArchivedIndex(ctx, missingIndex); err != nil {
return 0, err
}

log.WithFields(logrus.Fields{
"slot": blks[0].Block.Slot,
"archiveIndex": missingIndex,
"root": hex.EncodeToString(bytesutil.Trunc(missingRoot[:])),
}).Info("Saved recovered archived point during state migration")

return missingIndex, nil
}

// This returns true if the last archived point was skipped.
func skippedArchivedPoint(currentArchivedPoint uint64, lastArchivedPoint uint64) bool {
return currentArchivedPoint-lastArchivedPoint > 1
}
75 changes: 75 additions & 0 deletions beacon-chain/state/stategen/migrate_test.go
Expand Up @@ -5,6 +5,7 @@ import (
"testing"

ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/go-ssz"
"github.com/prysmaticlabs/prysm/beacon-chain/cache"
testDB "github.com/prysmaticlabs/prysm/beacon-chain/db/testing"
"github.com/prysmaticlabs/prysm/beacon-chain/state/stateutil"
Expand Down Expand Up @@ -173,3 +174,77 @@ func TestMigrateToCold_CantDeleteCurrentArchivedIndex(t *testing.T) {
t.Error("State should not be deleted")
}
}

func TestSkippedArchivedPoint_CanRecover(t *testing.T) {
db := testDB.SetupDB(t)
ctx := context.Background()
service := New(db, cache.NewStateSummaryCache())
service.slotsPerArchivedPoint = 32

b := &ethpb.SignedBeaconBlock{Block: &ethpb.BeaconBlock{Slot: 31}}
if err := service.beaconDB.SaveBlock(ctx, b); err != nil {
t.Fatal(err)
}
r, err := ssz.HashTreeRoot(b.Block)
if err != nil {
t.Fatal(err)
}

beaconState, _ := testutil.DeterministicGenesisState(t, 32)
if err := beaconState.SetSlot(31); err != nil {
t.Fatal(err)
}
if err := service.beaconDB.SaveState(ctx, beaconState, r); err != nil {
t.Fatal(err)
}

currentArchivedPoint := uint64(2)
lastPoint, err := service.recoverArchivedPoint(ctx, currentArchivedPoint)
if err != nil {
t.Fatal(err)
}

if lastPoint != currentArchivedPoint-1 {
t.Error("Did not get wanted point")
}
if !service.beaconDB.HasArchivedPoint(ctx, lastPoint) {
t.Error("Did not save archived point index")
}
if service.beaconDB.ArchivedPointRoot(ctx, lastPoint) != r {
t.Error("Did not get wanted archived index root")
}
}

func TestSkippedArchivedPoint(t *testing.T) {
tests := []struct {
a uint64
b uint64
c bool
}{
{
a: 0,
b: 1,
c: false,
},
{
a: 1,
b: 1,
c: false,
},
{
a: 1,
b: 2,
c: false,
},
{
a: 1,
b: 3,
c: true,
},
}
for _, tt := range tests {
if tt.c != skippedArchivedPoint(tt.b, tt.a) {
t.Fatalf("skippedArchivedPoint(%d, %d) = %v, wanted: %v", tt.b, tt.a, skippedArchivedPoint(tt.b, tt.a), tt.c)
}
}
}
2 changes: 1 addition & 1 deletion shared/cmd/flags.go
Expand Up @@ -189,7 +189,7 @@ var (
// GrpcMaxCallRecvMsgSizeFlag defines the max call message size for GRPC
GrpcMaxCallRecvMsgSizeFlag = &cli.IntFlag{
Name: "grpc-max-msg-size",
Usage: "Integer to define max recieve message call size (default: 4194304 (for 40MB))",
Usage: "Integer to define max recieve message call size (default: 4194304 (for 4MB))",
Value: 1 << 22,
}
)

0 comments on commit 8eb7c7a

Please sign in to comment.