Skip to content

Commit

Permalink
Save state to DB during long non-finality (#7597)
Browse files Browse the repository at this point in the history
* Start saving hot states to the DB during long non-finality

* Add a log

* Add helpers to turn on/off mode

* Add locks

* Add missing return

* Clean up

* Add logic to migration to handle db roots

* Add tests for on and off

* Add more tests

* Add test for migrate

* @prestonvanloon's feedback

Co-authored-by: Raul Jordan <raul@prysmaticlabs.com>
Co-authored-by: prylabs-bulldozer[bot] <58059840+prylabs-bulldozer[bot]@users.noreply.github.com>
  • Loading branch information
3 people committed Oct 23, 2020
1 parent ca081e8 commit 840ffc8
Show file tree
Hide file tree
Showing 7 changed files with 245 additions and 0 deletions.
27 changes: 27 additions & 0 deletions beacon-chain/blockchain/receive_block.go
Expand Up @@ -7,12 +7,16 @@ import (
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/prysm/beacon-chain/core/feed"
statefeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/state"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
stateTrie "github.com/prysmaticlabs/prysm/beacon-chain/state"
"github.com/prysmaticlabs/prysm/shared/traceutil"
"github.com/sirupsen/logrus"
"go.opencensus.io/trace"
)

// epochsSinceFinalitySaveHotStateDB is the number of epochs without finality
// after which the runtime begins saving hot states to the DB.
var epochsSinceFinalitySaveHotStateDB = 100

// BlockReceiver interface defines the methods of chain service receive and processing new blocks.
type BlockReceiver interface {
ReceiveBlock(ctx context.Context, block *ethpb.SignedBeaconBlock, blockRoot [32]byte) error
Expand Down Expand Up @@ -58,6 +62,11 @@ func (s *Service) ReceiveBlock(ctx context.Context, block *ethpb.SignedBeaconBlo
return err
}

// Have we been finalizing? Should we start saving hot states to db?
if err := s.checkSaveHotStateDB(ctx); err != nil {
return err
}

// Reports on block and fork choice metrics.
reportSlotMetrics(blockCopy.Block.Slot, s.HeadSlot(), s.CurrentSlot(), s.finalizedCheckpt)

Expand Down Expand Up @@ -179,3 +188,21 @@ func (s *Service) handlePostBlockOperations(b *ethpb.BeaconBlock) error {
}
return nil
}

// checkSaveHotStateDB decides whether the node should be persisting hot
// states to the DB. The mode turns on once `epochsSinceFinalitySaveHotStateDB`
// epochs have elapsed without finality, and turns back off otherwise.
func (s *Service) checkSaveHotStateDB(ctx context.Context) error {
	currentEpoch := helpers.SlotToEpoch(s.CurrentSlot())
	finalizedEpoch := s.finalizedCheckpt.Epoch

	// Guard against underflow when the finalized epoch is not behind the
	// wall-clock epoch.
	var nonFinalityEpochs uint64
	if currentEpoch > finalizedEpoch {
		nonFinalityEpochs = currentEpoch - finalizedEpoch
	}

	if nonFinalityEpochs < uint64(epochsSinceFinalitySaveHotStateDB) {
		return s.stateGen.DisableSaveHotStateToDB(ctx)
	}
	s.stateGen.EnableSaveHotStateToDB(ctx)
	return nil
}
40 changes: 40 additions & 0 deletions beacon-chain/blockchain/receive_block_test.go
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"sync"
"testing"
"time"

ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
blockchainTesting "github.com/prysmaticlabs/prysm/beacon-chain/blockchain/testing"
Expand All @@ -17,6 +18,7 @@ import (
"github.com/prysmaticlabs/prysm/shared/testutil"
"github.com/prysmaticlabs/prysm/shared/testutil/assert"
"github.com/prysmaticlabs/prysm/shared/testutil/require"
logTest "github.com/sirupsen/logrus/hooks/test"
)

func TestService_ReceiveBlock(t *testing.T) {
Expand Down Expand Up @@ -373,3 +375,41 @@ func TestService_HasInitSyncBlock(t *testing.T) {
t.Error("Should have block")
}
}

// TestCheckSaveHotStateDB_Enabling verifies the service turns on the
// save-hot-state-to-DB mode once enough epochs pass without finality.
func TestCheckSaveHotStateDB_Enabling(t *testing.T) {
	beaconDB, summaryCache := testDB.SetupDB(t)
	hook := logTest.NewGlobal()
	s, err := NewService(context.Background(), &Config{StateGen: stategen.New(beaconDB, summaryCache)})
	require.NoError(t, err)

	// Push genesis far enough into the past that the current epoch is
	// exactly `epochsSinceFinalitySaveHotStateDB` epochs beyond finality.
	slots := params.BeaconConfig().SlotsPerEpoch * uint64(epochsSinceFinalitySaveHotStateDB)
	s.genesisTime = time.Now().Add(time.Duration(-1*int64(slots)*int64(params.BeaconConfig().SecondsPerSlot)) * time.Second)
	s.finalizedCheckpt = &ethpb.Checkpoint{}

	require.NoError(t, s.checkSaveHotStateDB(context.Background()))
	assert.LogsContain(t, hook, "Entering mode to save hot states in DB")
}

// TestCheckSaveHotStateDB_Disabling verifies the mode is turned back off once
// the chain is no longer far behind finality.
func TestCheckSaveHotStateDB_Disabling(t *testing.T) {
	beaconDB, summaryCache := testDB.SetupDB(t)
	hook := logTest.NewGlobal()
	s, err := NewService(context.Background(), &Config{StateGen: stategen.New(beaconDB, summaryCache)})
	require.NoError(t, err)

	// First call runs with the zero-value genesis time, which puts the chain
	// deep into apparent non-finality and enables the mode.
	s.finalizedCheckpt = &ethpb.Checkpoint{}
	require.NoError(t, s.checkSaveHotStateDB(context.Background()))

	// With genesis at "now" the chain is back at epoch 0, so the mode exits.
	s.genesisTime = time.Now()
	require.NoError(t, s.checkSaveHotStateDB(context.Background()))
	assert.LogsContain(t, hook, "Exiting mode to save hot states in DB")
}

// TestCheckSaveHotStateDB_Overflow ensures a finalized epoch far ahead of the
// current epoch does not underflow the epochs-since-finality math or enable
// the mode.
func TestCheckSaveHotStateDB_Overflow(t *testing.T) {
	beaconDB, summaryCache := testDB.SetupDB(t)
	hook := logTest.NewGlobal()
	s, err := NewService(context.Background(), &Config{StateGen: stategen.New(beaconDB, summaryCache)})
	require.NoError(t, err)

	// Finalized epoch is ahead of the wall-clock epoch (genesis is "now").
	s.finalizedCheckpt = &ethpb.Checkpoint{Epoch: 10000000}
	s.genesisTime = time.Now()

	require.NoError(t, s.checkSaveHotStateDB(context.Background()))
	assert.LogsDoNotContain(t, hook, "Entering mode to save hot states in DB")
}
12 changes: 12 additions & 0 deletions beacon-chain/state/stategen/migrate.go
Expand Up @@ -76,6 +76,18 @@ func (s *State) MigrateToCold(ctx context.Context, fRoot [32]byte) error {
aState = missingState
}
if s.beaconDB.HasState(ctx, aRoot) {
// Remove hot state DB root to prevent it gets deleted later when we turn hot state save DB mode off.
s.saveHotStateDB.lock.Lock()
roots := s.saveHotStateDB.savedStateRoots
for i := 0; i < len(roots); i++ {
if aRoot == roots[i] {
s.saveHotStateDB.savedStateRoots = append(roots[:i], roots[i+1:]...)
// There shouldn't be duplicated roots in `savedStateRoots`.
// Break here is ok.
break
}
}
s.saveHotStateDB.lock.Unlock()
continue
}

Expand Down
24 changes: 24 additions & 0 deletions beacon-chain/state/stategen/migrate_test.go
Expand Up @@ -96,3 +96,27 @@ func TestMigrateToCold_RegeneratePath(t *testing.T) {

require.LogsContain(t, hook, "Saved state in DB")
}

// TestMigrateToCold_StateExistsInDB checks that migration skips re-saving a
// state already present in the DB and prunes its root from the hot-state
// bookkeeping list.
func TestMigrateToCold_StateExistsInDB(t *testing.T) {
	hook := logTest.NewGlobal()
	ctx := context.Background()
	beaconDB, _ := testDB.SetupDB(t)

	service := New(beaconDB, cache.NewStateSummaryCache())
	service.slotsPerArchivedPoint = 1

	beaconState, _ := testutil.DeterministicGenesisState(t, 32)
	require.NoError(t, beaconState.SetSlot(uint64(1)))

	blk := testutil.NewBeaconBlock()
	blk.Block.Slot = 2
	fRoot, err := blk.Block.HashTreeRoot()
	require.NoError(t, err)
	require.NoError(t, service.beaconDB.SaveBlock(ctx, blk))
	require.NoError(t, service.epochBoundaryStateCache.put(fRoot, beaconState))
	// The state is pre-saved under the finalized root, so migration should
	// not save it again.
	require.NoError(t, service.beaconDB.SaveState(ctx, beaconState, fRoot))

	// Seed the saved-roots list with filler roots plus the finalized root.
	service.saveHotStateDB.savedStateRoots = [][32]byte{{1}, {2}, {3}, {4}, fRoot}
	require.NoError(t, service.MigrateToCold(ctx, fRoot))
	assert.DeepEqual(t, [][32]byte{{1}, {2}, {3}, {4}}, service.saveHotStateDB.savedStateRoots)
	assert.LogsDoNotContain(t, hook, "Saved state in DB")
}
16 changes: 16 additions & 0 deletions beacon-chain/state/stategen/service.go
Expand Up @@ -14,6 +14,8 @@ import (
"go.opencensus.io/trace"
)

// defaultHotStateDBInterval is how often, in slots, hot states are persisted
// to the DB while the save-hot-state-to-DB mode is enabled.
var defaultHotStateDBInterval uint64 = 128 // slots

// State represents a management object that handles the internal
// logic of maintaining both hot and cold states in DB.
type State struct {
Expand All @@ -23,6 +25,17 @@ type State struct {
finalizedInfo *finalizedInfo
stateSummaryCache *cache.StateSummaryCache
epochBoundaryStateCache *epochBoundaryState
saveHotStateDB *saveHotStateDbConfig
}

// This tracks the config in the event of long non-finality,
// how often does the node save hot states to db? what are
// the saved hot states in db?... etc
// This tracks the config in the event of long non-finality,
// how often does the node save hot states to db? what are
// the saved hot states in db?... etc
type saveHotStateDbConfig struct {
	// enabled reports whether hot states are currently being saved to DB.
	enabled bool
	// lock guards every field in this struct.
	lock sync.Mutex
	// duration is the save interval in slots while the mode is enabled.
	duration uint64
	// savedStateRoots records the block roots of states this mode has saved,
	// so they can be deleted when the mode is turned off.
	savedStateRoots [][32]byte
}

// This tracks the finalized point. It's also the point where slot and the block root of
Expand All @@ -43,6 +56,9 @@ func New(db db.NoHeadAccessDatabase, stateSummaryCache *cache.StateSummaryCache)
slotsPerArchivedPoint: params.BeaconConfig().SlotsPerArchivedPoint,
stateSummaryCache: stateSummaryCache,
epochBoundaryStateCache: newBoundaryStateCache(),
saveHotStateDB: &saveHotStateDbConfig{
duration: defaultHotStateDBInterval,
},
}
}

Expand Down
60 changes: 60 additions & 0 deletions beacon-chain/state/stategen/setter.go
Expand Up @@ -2,13 +2,15 @@ package stategen

import (
"context"
"math"

ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/beacon-chain/state"
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/sirupsen/logrus"
"go.opencensus.io/trace"
)

Expand Down Expand Up @@ -58,6 +60,23 @@ func (s *State) saveStateByRoot(ctx context.Context, blockRoot [32]byte, state *
ctx, span := trace.StartSpan(ctx, "stateGen.saveStateByRoot")
defer span.End()

// Duration can't be 0 to prevent panic for division.
duration := uint64(math.Max(float64(s.saveHotStateDB.duration), 1))

s.saveHotStateDB.lock.Lock()
if s.saveHotStateDB.enabled && state.Slot()%duration == 0 {
if err := s.beaconDB.SaveState(ctx, state, blockRoot); err != nil {
return err
}
s.saveHotStateDB.savedStateRoots = append(s.saveHotStateDB.savedStateRoots, blockRoot)

log.WithFields(logrus.Fields{
"slot": state.Slot(),
"totalHotStateSavedInDB": len(s.saveHotStateDB.savedStateRoots),
}).Info("Saving hot state to DB")
}
s.saveHotStateDB.lock.Unlock()

// If the hot state is already in cache, one can be sure the state was processed and in the DB.
if s.hotStateCache.Has(blockRoot) {
return nil
Expand All @@ -81,3 +100,44 @@ func (s *State) saveStateByRoot(ctx context.Context, blockRoot [32]byte, state *

return nil
}

// EnableSaveHotStateToDB enters the mode that saves hot beacon state to the DB.
// This usually gets triggered when there's long duration since finality.
// Calling it while the mode is already on is a no-op.
func (s *State) EnableSaveHotStateToDB(_ context.Context) {
	s.saveHotStateDB.lock.Lock()
	defer s.saveHotStateDB.lock.Unlock()

	// Nothing to do when the mode is already on.
	if s.saveHotStateDB.enabled {
		return
	}
	s.saveHotStateDB.enabled = true

	fields := logrus.Fields{
		"enabled":       s.saveHotStateDB.enabled,
		"slotsInterval": s.saveHotStateDB.duration,
	}
	log.WithFields(fields).Warn("Entering mode to save hot states in DB")
}

// DisableSaveHotStateToDB exits the mode that saves beacon state to DB for the hot states.
// This usually gets triggered once there's finality after long duration since finality.
// It also deletes the hot states that were saved while the mode was on.
func (s *State) DisableSaveHotStateToDB(ctx context.Context) error {
	s.saveHotStateDB.lock.Lock()
	defer s.saveHotStateDB.lock.Unlock()

	// Nothing to tear down when the mode is already off.
	if !s.saveHotStateDB.enabled {
		return nil
	}

	fields := logrus.Fields{
		"enabled":          s.saveHotStateDB.enabled,
		"deletedHotStates": len(s.saveHotStateDB.savedStateRoots),
	}
	log.WithFields(fields).Warn("Exiting mode to save hot states in DB")

	// Turn the mode off, then delete the states it persisted while on. The
	// roots list is only cleared after a successful delete.
	s.saveHotStateDB.enabled = false
	if err := s.beaconDB.DeleteStates(ctx, s.saveHotStateDB.savedStateRoots); err != nil {
		return err
	}
	s.saveHotStateDB.savedStateRoots = nil

	return nil
}
66 changes: 66 additions & 0 deletions beacon-chain/state/stategen/setter_test.go
Expand Up @@ -109,6 +109,8 @@ func TestSaveState_CanSaveOnEpochBoundary(t *testing.T) {
require.NoError(t, err)
require.Equal(t, true, ok, "Did not save epoch boundary state")
assert.Equal(t, true, service.stateSummaryCache.Has(r), "Should have saved the state summary")
// Should have not been saved in DB.
require.Equal(t, false, db.HasState(ctx, r))
}

func TestSaveState_NoSaveNotEpochBoundary(t *testing.T) {
Expand All @@ -131,4 +133,68 @@ func TestSaveState_NoSaveNotEpochBoundary(t *testing.T) {
assert.Equal(t, false, service.beaconDB.HasState(ctx, r), "Should not have saved the state")
assert.Equal(t, true, service.stateSummaryCache.Has(r), "Should have saved the state summary")
require.LogsDoNotContain(t, hook, "Saved full state on epoch boundary")
// Should have not been saved in DB.
require.Equal(t, false, db.HasState(ctx, r))
}

// TestSaveState_CanSaveHotStateToDB checks that a hot state landing on the
// configured slot interval is written to the DB while the mode is on.
func TestSaveState_CanSaveHotStateToDB(t *testing.T) {
	hook := logTest.NewGlobal()
	ctx := context.Background()
	beaconDB, _ := testDB.SetupDB(t)
	service := New(beaconDB, cache.NewStateSummaryCache())
	service.EnableSaveHotStateToDB(ctx)

	// A slot equal to the interval is a multiple of it, so it must be saved.
	beaconState, _ := testutil.DeterministicGenesisState(t, 32)
	require.NoError(t, beaconState.SetSlot(defaultHotStateDBInterval))

	root := [32]byte{'A'}
	require.NoError(t, service.saveStateByRoot(ctx, root, beaconState))

	require.LogsContain(t, hook, "Saving hot state to DB")
	// Should have saved in DB.
	require.Equal(t, true, beaconDB.HasState(ctx, root))
}

// TestEnableSaveHotStateToDB_Enabled asserts that turning the mode on from
// the off state flips the flag and logs the transition.
func TestEnableSaveHotStateToDB_Enabled(t *testing.T) {
	hook := logTest.NewGlobal()
	ctx := context.Background()
	beaconDB, _ := testDB.SetupDB(t)
	service := New(beaconDB, cache.NewStateSummaryCache())

	service.EnableSaveHotStateToDB(ctx)

	require.LogsContain(t, hook, "Entering mode to save hot states in DB")
	require.Equal(t, true, service.saveHotStateDB.enabled)
}

// TestEnableSaveHotStateToDB_AlreadyEnabled asserts that enabling an
// already-enabled mode is a silent no-op.
func TestEnableSaveHotStateToDB_AlreadyEnabled(t *testing.T) {
	hook := logTest.NewGlobal()
	ctx := context.Background()
	beaconDB, _ := testDB.SetupDB(t)
	service := New(beaconDB, cache.NewStateSummaryCache())
	service.saveHotStateDB.enabled = true

	service.EnableSaveHotStateToDB(ctx)

	require.LogsDoNotContain(t, hook, "Entering mode to save hot states in DB")
	require.Equal(t, true, service.saveHotStateDB.enabled)
}

// TestEnableSaveHotStateToDB_Disabled asserts that disabling the mode clears
// the flag, logs the transition, and empties the saved-roots list.
func TestEnableSaveHotStateToDB_Disabled(t *testing.T) {
	hook := logTest.NewGlobal()
	ctx := context.Background()
	beaconDB, _ := testDB.SetupDB(t)
	service := New(beaconDB, cache.NewStateSummaryCache())
	service.saveHotStateDB.enabled = true
	service.saveHotStateDB.savedStateRoots = [][32]byte{{'a'}}

	require.NoError(t, service.DisableSaveHotStateToDB(ctx))

	require.LogsContain(t, hook, "Exiting mode to save hot states in DB")
	require.Equal(t, false, service.saveHotStateDB.enabled)
	require.Equal(t, 0, len(service.saveHotStateDB.savedStateRoots))
}

// TestEnableSaveHotStateToDB_AlreadyDisabled asserts that disabling an
// already-disabled mode is a silent no-op.
func TestEnableSaveHotStateToDB_AlreadyDisabled(t *testing.T) {
	hook := logTest.NewGlobal()
	ctx := context.Background()
	beaconDB, _ := testDB.SetupDB(t)
	service := New(beaconDB, cache.NewStateSummaryCache())

	require.NoError(t, service.DisableSaveHotStateToDB(ctx))

	require.LogsDoNotContain(t, hook, "Exiting mode to save hot states in DB")
	require.Equal(t, false, service.saveHotStateDB.enabled)
}

0 comments on commit 840ffc8

Please sign in to comment.