-
Notifications
You must be signed in to change notification settings - Fork 1k
/
service.go
167 lines (148 loc) · 5.94 KB
/
service.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
// Package stategen defines functions to regenerate beacon chain states
// by replaying blocks from a stored state checkpoint, useful for
// optimization and reducing a beacon node's resource consumption.
package stategen
import (
"context"
"sync"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v3/beacon-chain/db"
"github.com/prysmaticlabs/prysm/v3/beacon-chain/forkchoice"
"github.com/prysmaticlabs/prysm/v3/beacon-chain/state"
"github.com/prysmaticlabs/prysm/v3/beacon-chain/sync/backfill"
"github.com/prysmaticlabs/prysm/v3/config/params"
"github.com/prysmaticlabs/prysm/v3/consensus-types/primitives"
"github.com/prysmaticlabs/prysm/v3/encoding/bytesutil"
"go.opencensus.io/trace"
)
var defaultHotStateDBInterval primitives.Slot = 128
// StateManager represents a management object that handles the internal
// logic of maintaining both hot and cold states in DB.
type StateManager interface {
Resume(ctx context.Context, fState state.BeaconState) (state.BeaconState, error)
DisableSaveHotStateToDB(ctx context.Context) error
EnableSaveHotStateToDB(_ context.Context)
HasState(ctx context.Context, blockRoot [32]byte) (bool, error)
DeleteStateFromCaches(ctx context.Context, blockRoot [32]byte) error
ForceCheckpoint(ctx context.Context, root []byte) error
SaveState(ctx context.Context, blockRoot [32]byte, st state.BeaconState) error
SaveFinalizedState(fSlot primitives.Slot, fRoot [32]byte, fState state.BeaconState)
MigrateToCold(ctx context.Context, fRoot [32]byte) error
StateByRoot(ctx context.Context, blockRoot [32]byte) (state.BeaconState, error)
ActiveNonSlashedBalancesByRoot(context.Context, [32]byte) ([]uint64, error)
StateByRootIfCachedNoCopy(blockRoot [32]byte) state.BeaconState
StateByRootInitialSync(ctx context.Context, blockRoot [32]byte) (state.BeaconState, error)
}
// State is a concrete implementation of StateManager.
type State struct {
beaconDB db.NoHeadAccessDatabase
slotsPerArchivedPoint primitives.Slot
hotStateCache *hotStateCache
finalizedInfo *finalizedInfo
epochBoundaryStateCache *epochBoundaryState
saveHotStateDB *saveHotStateDbConfig
backfillStatus *backfill.Status
migrationLock *sync.Mutex
fc forkchoice.ForkChoicer
}
// This tracks the config in the event of long non-finality,
// how often does the node save hot states to db? what are
// the saved hot states in db?... etc
type saveHotStateDbConfig struct {
enabled bool
lock sync.Mutex
duration primitives.Slot
blockRootsOfSavedStates [][32]byte
}
// This tracks the finalized point. It's also the point where slot and the block root of
// cold and hot sections of the DB splits.
type finalizedInfo struct {
slot primitives.Slot
root [32]byte
state state.BeaconState
lock sync.RWMutex
}
// StateGenOption is a functional option for controlling the initialization of a *State value
type StateGenOption func(*State)
func WithBackfillStatus(bfs *backfill.Status) StateGenOption {
return func(sg *State) {
sg.backfillStatus = bfs
}
}
// New returns a new state management object.
func New(beaconDB db.NoHeadAccessDatabase, fc forkchoice.ForkChoicer, opts ...StateGenOption) *State {
s := &State{
beaconDB: beaconDB,
hotStateCache: newHotStateCache(),
finalizedInfo: &finalizedInfo{slot: 0, root: params.BeaconConfig().ZeroHash},
slotsPerArchivedPoint: params.BeaconConfig().SlotsPerArchivedPoint,
epochBoundaryStateCache: newBoundaryStateCache(),
saveHotStateDB: &saveHotStateDbConfig{
duration: defaultHotStateDBInterval,
},
migrationLock: new(sync.Mutex),
fc: fc,
}
for _, o := range opts {
o(s)
}
fc.Lock()
defer fc.Unlock()
fc.SetBalancesByRooter(s.ActiveNonSlashedBalancesByRoot)
return s
}
// Resume resumes a new state management object from previously saved finalized checkpoint in DB.
func (s *State) Resume(ctx context.Context, fState state.BeaconState) (state.BeaconState, error) {
ctx, span := trace.StartSpan(ctx, "stateGen.Resume")
defer span.End()
c, err := s.beaconDB.FinalizedCheckpoint(ctx)
if err != nil {
return nil, err
}
fRoot := bytesutil.ToBytes32(c.Root)
// Resume as genesis state if last finalized root is zero hashes.
if fRoot == params.BeaconConfig().ZeroHash {
st, err := s.beaconDB.GenesisState(ctx)
if err != nil {
return nil, errors.Wrap(err, "could not get genesis state")
}
// Save genesis state in the hot state cache.
gbr, err := s.beaconDB.GenesisBlockRoot(ctx)
if err != nil {
return nil, errors.Wrap(err, "could not get genesis block root")
}
return st, s.SaveState(ctx, gbr, st)
}
if fState == nil || fState.IsNil() {
return nil, errors.New("finalized state is nil")
}
go func() {
if err := s.beaconDB.CleanUpDirtyStates(ctx, s.slotsPerArchivedPoint); err != nil {
log.WithError(err).Error("Could not clean up dirty states")
}
}()
s.finalizedInfo = &finalizedInfo{slot: fState.Slot(), root: fRoot, state: fState.Copy()}
return fState, nil
}
// SaveFinalizedState saves the finalized slot, root and state into memory to be used by state gen service.
// This used for migration at the correct start slot and used for hot state play back to ensure
// lower bound to start is always at the last finalized state.
func (s *State) SaveFinalizedState(fSlot primitives.Slot, fRoot [32]byte, fState state.BeaconState) {
s.finalizedInfo.lock.Lock()
defer s.finalizedInfo.lock.Unlock()
s.finalizedInfo.root = fRoot
s.finalizedInfo.state = fState.Copy()
s.finalizedInfo.slot = fSlot
}
// Returns true if input root equals to cached finalized root.
func (s *State) isFinalizedRoot(r [32]byte) bool {
s.finalizedInfo.lock.RLock()
defer s.finalizedInfo.lock.RUnlock()
return r == s.finalizedInfo.root
}
// Returns the cached and copied finalized state.
func (s *State) finalizedState() state.BeaconState {
s.finalizedInfo.lock.RLock()
defer s.finalizedInfo.lock.RUnlock()
return s.finalizedInfo.state.Copy()
}