Skip to content

Commit

Permalink
Prune dangling states in DB upon start up (#4697)
Browse files Browse the repository at this point in the history
* Add pruneGarbageState and test
* Comments
* Update beacon-chain/blockchain/service.go

Co-Authored-By: Ivan Martinez <ivanthegreatdev@gmail.com>
* Update beacon-chain/blockchain/service.go

Co-Authored-By: Ivan Martinez <ivanthegreatdev@gmail.com>
* Update beacon-chain/blockchain/service.go

Co-Authored-By: Ivan Martinez <ivanthegreatdev@gmail.com>
* Fixed test
* Merge refs/heads/master into prune-start-up
* Fixed test
  • Loading branch information
terencechain committed Jan 31, 2020
1 parent cc741ed commit d8c2659
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 4 deletions.
22 changes: 22 additions & 0 deletions beacon-chain/blockchain/service.go
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/beacon-chain/core/state"
"github.com/prysmaticlabs/prysm/beacon-chain/db"
"github.com/prysmaticlabs/prysm/beacon-chain/db/filters"
f "github.com/prysmaticlabs/prysm/beacon-chain/forkchoice"
"github.com/prysmaticlabs/prysm/beacon-chain/forkchoice/protoarray"
"github.com/prysmaticlabs/prysm/beacon-chain/operations/attestations"
Expand Down Expand Up @@ -167,6 +168,12 @@ func (s *Service) Start() {
}
}

if finalizedCheckpoint.Epoch > 1 {
if err := s.pruneGarbageState(ctx, helpers.StartSlot(finalizedCheckpoint.Epoch)-params.BeaconConfig().SlotsPerEpoch); err != nil {
log.Fatalf("Could not prune garbaged state: %v", err)
}
}

s.stateNotifier.StateFeed().Send(&feed.Event{
Type: statefeed.Initialized,
Data: &statefeed.InitializedData{
Expand Down Expand Up @@ -452,6 +459,21 @@ func (s *Service) initializeChainInfo(ctx context.Context) error {
return nil
}

// This is called when a client starts from a non-genesis slot. It deletes the states in DB
// from slot 1 (avoid genesis state) to `slot`.
func (s *Service) pruneGarbageState(ctx context.Context, slot uint64) error {
filter := filters.NewFilter().SetStartSlot(1).SetEndSlot(slot)
roots, err := s.beaconDB.BlockRoots(ctx, filter)
if err != nil {
return err
}
if err := s.beaconDB.DeleteStates(ctx, roots); err != nil {
return err
}

return nil
}

// This is called when a client starts from non-genesis slot. This passes last justified and finalized
// information to fork choice service to initializes fork choice store.
func (s *Service) resumeForkChoice(
Expand Down
50 changes: 50 additions & 0 deletions beacon-chain/blockchain/service_test.go
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/beacon-chain/core/state"
"github.com/prysmaticlabs/prysm/beacon-chain/db"
"github.com/prysmaticlabs/prysm/beacon-chain/db/filters"
testDB "github.com/prysmaticlabs/prysm/beacon-chain/db/testing"
"github.com/prysmaticlabs/prysm/beacon-chain/operations/attestations"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p"
Expand Down Expand Up @@ -381,3 +382,52 @@ func TestChainService_SaveHeadNoDB(t *testing.T) {
t.Error("head block should not be equal")
}
}

func TestChainService_PruneOldStates(t *testing.T) {
db := testDB.SetupDB(t)
defer testDB.TeardownDB(t, db)
ctx := context.Background()
s := &Service{
beaconDB: db,
}

for i := 0; i < 100; i++ {
block := &ethpb.BeaconBlock{Slot: uint64(i)}
if err := s.beaconDB.SaveBlock(ctx, &ethpb.SignedBeaconBlock{Block: block}); err != nil {
t.Fatal(err)
}
r, err := ssz.HashTreeRoot(block)
if err != nil {
t.Fatal(err)
}
state := &pb.BeaconState{Slot: uint64(i)}
newState, err := beaconstate.InitializeFromProto(state)
if err != nil {
t.Fatal(err)
}
if err := s.beaconDB.SaveState(ctx, newState, r); err != nil {
t.Fatal(err)
}
}

// Delete half of the states.
if err := s.pruneGarbageState(ctx, 50); err != nil {
t.Fatal(err)
}

filter := filters.NewFilter().SetStartSlot(1).SetEndSlot(100)
roots, err := s.beaconDB.BlockRoots(ctx, filter)
if err != nil {
t.Fatal(err)
}

for i := 1; i < 50; i++ {
s, err := s.beaconDB.State(ctx, roots[i])
if err != nil {
t.Fatal(err)
}
if s != nil {
t.Errorf("wanted nil for slot %d", i)
}
}
}
1 change: 0 additions & 1 deletion beacon-chain/sync/initial-sync/round_robin.go
Expand Up @@ -42,7 +42,6 @@ func (s *Service) roundRobinSync(genesis time.Time) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()


if cfg := featureconfig.Get(); cfg.EnableSkipSlotsCache {
cfg.EnableSkipSlotsCache = false
featureconfig.Init(cfg)
Expand Down
4 changes: 2 additions & 2 deletions endtoend/endtoend_test.go
Expand Up @@ -100,10 +100,10 @@ func runEndToEndTest(t *testing.T, config *end2EndConfig) {

syncNodeInfo := startNewBeaconNode(t, config, beaconNodes)
beaconNodes = append(beaconNodes, syncNodeInfo)
index := uint64(len(beaconNodes)-1)
index := uint64(len(beaconNodes) - 1)

// Sleep until the next epoch to give time for the newly started node to sync.
extraTimeToSync := (config.epochsToRun+3)*epochSeconds+60
extraTimeToSync := (config.epochsToRun+3)*epochSeconds + 60
genesisTime.Add(time.Duration(extraTimeToSync) * time.Second)
// Wait until middle of epoch to request to prevent conflicts.
time.Sleep(time.Until(genesisTime))
Expand Down
2 changes: 1 addition & 1 deletion endtoend/validator.go
Expand Up @@ -59,7 +59,7 @@ func initializeValidators(
fmt.Sprintf("--monitoring-port=%d", 9280+n),
fmt.Sprintf("--datadir=%s/eth2-val-%d", tmpPath, n),
fmt.Sprintf("--beacon-rpc-provider=localhost:%d", 4200+n),
fmt.Sprintf("--log-file=%s",file.Name()),
fmt.Sprintf("--log-file=%s", file.Name()),
}
args = append(args, config.validatorFlags...)

Expand Down

0 comments on commit d8c2659

Please sign in to comment.