Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Micro optimizations on new-state-mgmt service for initial syncing #5241

Merged
merged 57 commits into from Mar 30, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
57 commits
Select commit Hold shift + click to select a range
512ef28
Starting a quick PoC
terencechain Mar 25, 2020
7a370b0
Rate limit to one epoch worth of blocks in memory
terencechain Mar 26, 2020
5ef3f0b
Proof of concept working
terencechain Mar 26, 2020
96d78ea
Quick comment out
terencechain Mar 26, 2020
4197c1d
Save previous finalized checkpoint
terencechain Mar 26, 2020
15d629e
Merge branch 'master' of github.com:prysmaticlabs/prysm into batch-save
terencechain Mar 26, 2020
cf92a43
Test
terencechain Mar 26, 2020
b6013af
Merge branch 'prev-finalized-getter' into batch-save
terencechain Mar 26, 2020
7e8c0b9
Minor fixes
terencechain Mar 26, 2020
14e6379
Use a map
terencechain Mar 28, 2020
7e7273a
More run time fixes
terencechain Mar 28, 2020
9ba5b11
Remove panic
terencechain Mar 28, 2020
0281a5f
Feature flag
terencechain Mar 28, 2020
8df754c
Removed unused methods
terencechain Mar 28, 2020
1b6f64c
Fixed tests
terencechain Mar 28, 2020
09729aa
E2e test
terencechain Mar 28, 2020
fc2589e
Merge branch 'master' into batch-save
terencechain Mar 28, 2020
1b7f614
comment
terencechain Mar 28, 2020
3b52bd1
Merge branch 'master' into batch-save
terencechain Mar 28, 2020
9e88161
Compatible with current initial sync
terencechain Mar 28, 2020
280ea97
Merge branch 'batch-save' of github.com:prysmaticlabs/prysm into batc…
terencechain Mar 28, 2020
e7ee4d5
Merge refs/heads/master into batch-save
prylabs-bulldozer[bot] Mar 29, 2020
a0fad8a
Merge refs/heads/master into batch-save
prylabs-bulldozer[bot] Mar 29, 2020
271b489
Merge refs/heads/master into batch-save
prylabs-bulldozer[bot] Mar 29, 2020
2a617b5
Starting
terencechain Mar 29, 2020
f8cdf33
New cache
terencechain Mar 29, 2020
2d810ac
Cache getters and setters
terencechain Mar 29, 2020
70cbaf8
It should be part of state gen
terencechain Mar 29, 2020
785a605
Need to use cache for DB
terencechain Mar 29, 2020
1ea753c
Don't have to use finalized state
terencechain Mar 29, 2020
e4a5a64
Rm unused file
terencechain Mar 29, 2020
b297cb1
Merge branch 'master' of github.com:prysmaticlabs/prysm into new-stat…
terencechain Mar 29, 2020
f5b3e7f
some changes to memory mgmt when using mempool
prestonvanloon Mar 29, 2020
9b9ef91
More run time fixes
terencechain Mar 30, 2020
1ad8566
Can sync to head
terencechain Mar 30, 2020
add721e
Merge branch 'master' of github.com:prysmaticlabs/prysm into batch-save
terencechain Mar 30, 2020
acfba22
Feedback
terencechain Mar 30, 2020
ef3263f
Merge branch 'batch-save' of github.com:prysmaticlabs/prysm into batc…
terencechain Mar 30, 2020
f3b415f
Merge refs/heads/master into batch-save
prylabs-bulldozer[bot] Mar 30, 2020
918fb4e
Merge refs/heads/master into new-state-init-sync
prylabs-bulldozer[bot] Mar 30, 2020
20cd8d1
Merge branch 'batch-save' of github.com:prysmaticlabs/prysm into new-…
terencechain Mar 30, 2020
b8cb68b
Revert "some changes to memory mgmt when using mempool"
terencechain Mar 30, 2020
85e8317
Fixed sync tests
terencechain Mar 30, 2020
6707fb0
Fixed existing tests
terencechain Mar 30, 2020
056a647
Merge branch 'new-state-init-sync' of github.com:prysmaticlabs/prysm …
terencechain Mar 30, 2020
6bfbb19
Merge branch 'master' into new-state-init-sync
terencechain Mar 30, 2020
c46ec81
Test for state summary getter
terencechain Mar 30, 2020
a8b6ef9
Merge branch 'new-state-init-sync' of github.com:prysmaticlabs/prysm …
terencechain Mar 30, 2020
6231667
Gaz
terencechain Mar 30, 2020
6a5bb43
Fix kafka passthrough
terencechain Mar 30, 2020
cf10be8
Fixed inputs
terencechain Mar 30, 2020
26b72a9
Gaz
terencechain Mar 30, 2020
736f71e
Fixed build
terencechain Mar 30, 2020
a195f1c
Fixed visibility
terencechain Mar 30, 2020
d5a2744
Trying without the ignore
terencechain Mar 30, 2020
2281a80
Didn't work..
terencechain Mar 30, 2020
7e5ebcc
Fix kafka
terencechain Mar 30, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 2 additions & 1 deletion beacon-chain/blockchain/init_sync_process_block_test.go
Expand Up @@ -6,6 +6,7 @@ import (

ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/go-ssz"
"github.com/prysmaticlabs/prysm/beacon-chain/cache"
"github.com/prysmaticlabs/prysm/beacon-chain/core/blocks"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/beacon-chain/core/state"
Expand Down Expand Up @@ -210,7 +211,7 @@ func TestPruneNonBoundary_CanPrune(t *testing.T) {
func TestGenerateState_CorrectlyGenerated(t *testing.T) {
db := testDB.SetupDB(t)
defer testDB.TeardownDB(t, db)
cfg := &Config{BeaconDB: db, StateGen: stategen.New(db)}
cfg := &Config{BeaconDB: db, StateGen: stategen.New(db, cache.NewStateSummaryCache())}
service, err := NewService(context.Background(), cfg)
if err != nil {
t.Fatal(err)
Expand Down
18 changes: 9 additions & 9 deletions beacon-chain/blockchain/process_block.go
Expand Up @@ -140,12 +140,13 @@ func (s *Service) onBlock(ctx context.Context, signed *ethpb.SignedBeaconBlock)
}

if featureconfig.Get().NewStateMgmt {
finalizedState, err := s.stateGen.StateByRoot(ctx, fRoot)
fRoot := bytesutil.ToBytes32(postState.FinalizedCheckpoint().Root)
fBlock, err := s.beaconDB.Block(ctx, fRoot)
if err != nil {
return nil, err
return nil, errors.Wrap(err, "could not get finalized block to migrate")
}
if err := s.stateGen.MigrateToCold(ctx, finalizedState, fRoot); err != nil {
return nil, err
if err := s.stateGen.MigrateToCold(ctx, fBlock.Block.Slot, fRoot); err != nil {
return nil, errors.Wrap(err, "could not migrate to cold")
}
}
}
Expand Down Expand Up @@ -300,13 +301,12 @@ func (s *Service) onBlockInitialSyncStateTransition(ctx context.Context, signed

if featureconfig.Get().NewStateMgmt {
fRoot := bytesutil.ToBytes32(postState.FinalizedCheckpoint().Root)
finalizedState, err := s.stateGen.StateByRoot(ctx, fRoot)
fBlock, err := s.beaconDB.Block(ctx, fRoot)
if err != nil {
return errors.Wrap(err, "could not get state by root for migration")
return errors.Wrap(err, "could not get finalized block to migrate")
}
if err := s.stateGen.MigrateToCold(ctx, finalizedState, fRoot); err != nil {
return errors.Wrap(err, "could not migrate with new finalized root")

if err := s.stateGen.MigrateToCold(ctx, fBlock.Block.Slot, fRoot); err != nil {
return errors.Wrap(err, "could not migrate to cold")
}
}
}
Expand Down
7 changes: 6 additions & 1 deletion beacon-chain/cache/BUILD.bazel
Expand Up @@ -11,11 +11,16 @@ go_library(
"eth1_data.go",
"hot_state_cache.go",
"skip_slot_cache.go",
"state_summary.go",
],
importpath = "github.com/prysmaticlabs/prysm/beacon-chain/cache",
visibility = ["//beacon-chain:__subpackages__"],
visibility = [
"//beacon-chain:__subpackages__",
"//tools:__subpackages__",
],
deps = [
"//beacon-chain/state:go_default_library",
"//proto/beacon/p2p/v1:go_default_library",
"//shared/featureconfig:go_default_library",
"//shared/hashutil:go_default_library",
"//shared/params:go_default_library",
Expand Down
65 changes: 65 additions & 0 deletions beacon-chain/cache/state_summary.go
@@ -0,0 +1,65 @@
package cache

import (
"sync"

pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
)

// StateSummaryCache caches state summary object.
type StateSummaryCache struct {
initSyncStateSummaries map[[32]byte]*pb.StateSummary
initSyncStateSummariesLock sync.RWMutex
}

// NewStateSummaryCache creates a new state summary cache.
func NewStateSummaryCache() *StateSummaryCache {
return &StateSummaryCache{
initSyncStateSummaries: make(map[[32]byte]*pb.StateSummary),
}
}

// Put saves a state summary to the initial sync state summaries cache.
func (s *StateSummaryCache) Put(r [32]byte, b *pb.StateSummary) {
s.initSyncStateSummariesLock.Lock()
defer s.initSyncStateSummariesLock.Unlock()
s.initSyncStateSummaries[r] = b
}

// Has checks if a state summary exists in the initial sync state summaries cache using the root
// of the block.
func (s *StateSummaryCache) Has(r [32]byte) bool {
s.initSyncStateSummariesLock.RLock()
defer s.initSyncStateSummariesLock.RUnlock()
_, ok := s.initSyncStateSummaries[r]
return ok
}

// Get retrieves a state summary from the initial sync state summaries cache using the root of
// the block.
func (s *StateSummaryCache) Get(r [32]byte) *pb.StateSummary {
s.initSyncStateSummariesLock.RLock()
defer s.initSyncStateSummariesLock.RUnlock()
b := s.initSyncStateSummaries[r]
return b
}

// GetAll retrieves all the beacon state summaries from the initial sync state summaries cache, the returned
// state summaries are unordered.
func (s *StateSummaryCache) GetAll() []*pb.StateSummary {
s.initSyncStateSummariesLock.RLock()
defer s.initSyncStateSummariesLock.RUnlock()

blks := make([]*pb.StateSummary, 0, len(s.initSyncStateSummaries))
for _, b := range s.initSyncStateSummaries {
blks = append(blks, b)
}
return blks
}

// Clear clears out the initial sync state summaries cache.
func (s *StateSummaryCache) Clear() {
s.initSyncStateSummariesLock.Lock()
defer s.initSyncStateSummariesLock.Unlock()
s.initSyncStateSummaries = make(map[[32]byte]*pb.StateSummary)
}
1 change: 1 addition & 0 deletions beacon-chain/db/BUILD.bazel
Expand Up @@ -26,6 +26,7 @@ go_library(
"//tools:__subpackages__",
],
deps = [
"//beacon-chain/cache:go_default_library",
"//beacon-chain/db/iface:go_default_library",
"//beacon-chain/db/kv:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
Expand Down
9 changes: 6 additions & 3 deletions beacon-chain/db/db.go
@@ -1,8 +1,11 @@
package db

import "github.com/prysmaticlabs/prysm/beacon-chain/db/kv"
import (
"github.com/prysmaticlabs/prysm/beacon-chain/cache"
"github.com/prysmaticlabs/prysm/beacon-chain/db/kv"
)

// NewDB initializes a new DB.
func NewDB(dirPath string) (Database, error) {
return kv.NewKVStore(dirPath)
func NewDB(dirPath string, stateSummaryCache *cache.StateSummaryCache) (Database, error) {
return kv.NewKVStore(dirPath, stateSummaryCache)
}
5 changes: 3 additions & 2 deletions beacon-chain/db/db_kafka_wrapped.go
@@ -1,13 +1,14 @@
package db

import (
"github.com/prysmaticlabs/prysm/beacon-chain/cache"
"github.com/prysmaticlabs/prysm/beacon-chain/db/kafka"
"github.com/prysmaticlabs/prysm/beacon-chain/db/kv"
)

// NewDB initializes a new DB with kafka wrapper.
func NewDB(dirPath string) (Database, error) {
db, err := kv.NewKVStore(dirPath)
func NewDB(dirPath string, stateSummaryCache *cache.StateSummaryCache) (Database, error) {
db, err := kv.NewKVStore(dirPath, stateSummaryCache)
if err != nil {
return nil, err
}
Expand Down
1 change: 1 addition & 0 deletions beacon-chain/db/iface/interface.go
Expand Up @@ -90,6 +90,7 @@ type NoHeadAccessDatabase interface {
DeleteState(ctx context.Context, blockRoot [32]byte) error
DeleteStates(ctx context.Context, blockRoots [][32]byte) error
SaveStateSummary(ctx context.Context, summary *ethereum_beacon_p2p_v1.StateSummary) error
SaveStateSummaries(ctx context.Context, summaries []*ethereum_beacon_p2p_v1.StateSummary) error
// Slashing operations.
SaveProposerSlashing(ctx context.Context, slashing *eth.ProposerSlashing) error
SaveAttesterSlashing(ctx context.Context, slashing *eth.AttesterSlashing) error
Expand Down
5 changes: 5 additions & 0 deletions beacon-chain/db/kafka/passthrough.go
Expand Up @@ -238,6 +238,11 @@ func (e Exporter) SaveStateSummary(ctx context.Context, summary *pb.StateSummary
return e.db.SaveStateSummary(ctx, summary)
}

// SaveStateSummaries -- passthrough.
func (e Exporter) SaveStateSummaries(ctx context.Context, summaries []*pb.StateSummary) error {
return e.db.SaveStateSummaries(ctx, summaries)
}

// SaveStates -- passthrough.
func (e Exporter) SaveStates(ctx context.Context, states []*state.BeaconState, blockRoots [][32]byte) error {
return e.db.SaveStates(ctx, states, blockRoots)
Expand Down
2 changes: 2 additions & 0 deletions beacon-chain/db/kv/BUILD.bazel
Expand Up @@ -26,6 +26,7 @@ go_library(
importpath = "github.com/prysmaticlabs/prysm/beacon-chain/db/kv",
visibility = ["//beacon-chain:__subpackages__"],
deps = [
"//beacon-chain/cache:go_default_library",
"//beacon-chain/core/helpers:go_default_library",
"//beacon-chain/db/filters:go_default_library",
"//beacon-chain/db/iface:go_default_library",
Expand Down Expand Up @@ -74,6 +75,7 @@ go_test(
],
embed = [":go_default_library"],
deps = [
"//beacon-chain/cache:go_default_library",
"//beacon-chain/db/filters:go_default_library",
"//beacon-chain/state:go_default_library",
"//proto/beacon/p2p/v1:go_default_library",
Expand Down
2 changes: 1 addition & 1 deletion beacon-chain/db/kv/blocks.go
Expand Up @@ -263,7 +263,7 @@ func (k *Store) SaveHeadBlockRoot(ctx context.Context, blockRoot [32]byte) error
defer span.End()
return k.db.Update(func(tx *bolt.Tx) error {
if featureconfig.Get().NewStateMgmt {
if tx.Bucket(stateSummaryBucket).Get(blockRoot[:]) == nil {
if tx.Bucket(stateSummaryBucket).Get(blockRoot[:]) == nil && !k.stateSummaryCache.Has(blockRoot) {
return errors.New("no state summary found with head block root")
}
} else {
Expand Down
5 changes: 3 additions & 2 deletions beacon-chain/db/kv/checkpoint.go
Expand Up @@ -5,6 +5,7 @@ import (
"errors"

ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
"github.com/prysmaticlabs/prysm/shared/featureconfig"
"github.com/prysmaticlabs/prysm/shared/traceutil"
bolt "go.etcd.io/bbolt"
Expand Down Expand Up @@ -65,7 +66,7 @@ func (k *Store) SaveJustifiedCheckpoint(ctx context.Context, checkpoint *ethpb.C
return k.db.Update(func(tx *bolt.Tx) error {
bucket := tx.Bucket(checkpointBucket)
if featureconfig.Get().NewStateMgmt {
if tx.Bucket(stateSummaryBucket).Get(checkpoint.Root) == nil {
if tx.Bucket(stateSummaryBucket).Get(checkpoint.Root) == nil && !k.stateSummaryCache.Has(bytesutil.ToBytes32(checkpoint.Root)) {
return errors.New("missing state summary for finalized root")
}
} else {
Expand Down Expand Up @@ -93,7 +94,7 @@ func (k *Store) SaveFinalizedCheckpoint(ctx context.Context, checkpoint *ethpb.C
return k.db.Update(func(tx *bolt.Tx) error {
bucket := tx.Bucket(checkpointBucket)
if featureconfig.Get().NewStateMgmt {
if tx.Bucket(stateSummaryBucket).Get(checkpoint.Root) == nil {
if tx.Bucket(stateSummaryBucket).Get(checkpoint.Root) == nil && !k.stateSummaryCache.Has(bytesutil.ToBytes32(checkpoint.Root)) {
return errors.New("missing state summary for finalized root")
}
} else {
Expand Down
5 changes: 4 additions & 1 deletion beacon-chain/db/kv/kv.go
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
prombolt "github.com/prysmaticlabs/prombbolt"
"github.com/prysmaticlabs/prysm/beacon-chain/cache"
"github.com/prysmaticlabs/prysm/beacon-chain/db/iface"
bolt "go.etcd.io/bbolt"
)
Expand Down Expand Up @@ -38,12 +39,13 @@ type Store struct {
validatorIndexCache *ristretto.Cache
stateSlotBitLock sync.Mutex
blockSlotBitLock sync.Mutex
stateSummaryCache *cache.StateSummaryCache
}

// NewKVStore initializes a new boltDB key-value store at the directory
// path specified, creates the kv-buckets based on the schema, and stores
// an open connection db object as a property of the Store struct.
func NewKVStore(dirPath string) (*Store, error) {
func NewKVStore(dirPath string, stateSummaryCache *cache.StateSummaryCache) (*Store, error) {
if err := os.MkdirAll(dirPath, 0700); err != nil {
return nil, err
}
Expand Down Expand Up @@ -79,6 +81,7 @@ func NewKVStore(dirPath string) (*Store, error) {
databasePath: dirPath,
blockCache: blockCache,
validatorIndexCache: validatorCache,
stateSummaryCache: stateSummaryCache,
}

if err := kv.db.Update(func(tx *bolt.Tx) error {
Expand Down
3 changes: 2 additions & 1 deletion beacon-chain/db/kv/kv_test.go
Expand Up @@ -8,6 +8,7 @@ import (
"path"
"testing"

"github.com/prysmaticlabs/prysm/beacon-chain/cache"
"github.com/prysmaticlabs/prysm/shared/testutil"
)

Expand All @@ -21,7 +22,7 @@ func setupDB(t testing.TB) *Store {
if err := os.RemoveAll(path); err != nil {
t.Fatalf("Failed to remove directory: %v", err)
}
db, err := NewKVStore(path)
db, err := NewKVStore(path, cache.NewStateSummaryCache())
if err != nil {
t.Fatalf("Failed to instantiate DB: %v", err)
}
Expand Down
20 changes: 20 additions & 0 deletions beacon-chain/db/kv/state_summary.go
Expand Up @@ -23,6 +23,26 @@ func (k *Store) SaveStateSummary(ctx context.Context, summary *pb.StateSummary)
})
}

// SaveStateSummaries saves state summary objects to the DB.
func (k *Store) SaveStateSummaries(ctx context.Context, summaries []*pb.StateSummary) error {
ctx, span := trace.StartSpan(ctx, "BeaconDB.SaveStateSummaries")
defer span.End()

return k.db.Update(func(tx *bolt.Tx) error {
bucket := tx.Bucket(stateSummaryBucket)
for _, summary := range summaries {
enc, err := encode(summary)
if err != nil {
return err
}
if err := bucket.Put(summary.Root, enc); err != nil {
return err
}
}
return nil
})
}

// StateSummary returns the state summary object from the db using input block root.
func (k *Store) StateSummary(ctx context.Context, blockRoot [32]byte) (*pb.StateSummary, error) {
ctx, span := trace.StartSpan(ctx, "BeaconDB.StateSummary")
Expand Down
1 change: 1 addition & 0 deletions beacon-chain/db/testing/BUILD.bazel
Expand Up @@ -7,6 +7,7 @@ go_library(
importpath = "github.com/prysmaticlabs/prysm/beacon-chain/db/testing",
visibility = ["//beacon-chain:__subpackages__"],
deps = [
"//beacon-chain/cache:go_default_library",
"//beacon-chain/db:go_default_library",
"//beacon-chain/db/kv:go_default_library",
"//shared/testutil:go_default_library",
Expand Down
3 changes: 2 additions & 1 deletion beacon-chain/db/testing/setup_db.go
Expand Up @@ -8,6 +8,7 @@ import (
"path"
"testing"

"github.com/prysmaticlabs/prysm/beacon-chain/cache"
"github.com/prysmaticlabs/prysm/beacon-chain/db"
"github.com/prysmaticlabs/prysm/beacon-chain/db/kv"
"github.com/prysmaticlabs/prysm/shared/testutil"
Expand All @@ -23,7 +24,7 @@ func SetupDB(t testing.TB) db.Database {
if err := os.RemoveAll(p); err != nil {
t.Fatalf("failed to remove directory: %v", err)
}
s, err := kv.NewKVStore(p)
s, err := kv.NewKVStore(p, cache.NewStateSummaryCache())
if err != nil {
t.Fatal(err)
}
Expand Down
1 change: 1 addition & 0 deletions beacon-chain/node/BUILD.bazel
Expand Up @@ -8,6 +8,7 @@ go_library(
deps = [
"//beacon-chain/archiver:go_default_library",
"//beacon-chain/blockchain:go_default_library",
"//beacon-chain/cache:go_default_library",
"//beacon-chain/cache/depositcache:go_default_library",
"//beacon-chain/db:go_default_library",
"//beacon-chain/flags:go_default_library",
Expand Down