/
kv.go
208 lines (185 loc) · 5.97 KB
/
kv.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
// Package kv defines a bolt-db, key-value store implementation
// of the Database interface defined by a Prysm beacon node.
package kv
import (
"context"
"os"
"path"
"time"
"github.com/dgraph-io/ristretto"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
prombolt "github.com/prysmaticlabs/prombbolt"
"github.com/prysmaticlabs/prysm/beacon-chain/db/iface"
"github.com/prysmaticlabs/prysm/shared/fileutil"
"github.com/prysmaticlabs/prysm/shared/params"
bolt "go.etcd.io/bbolt"
)
var _ iface.Database = (*Store)(nil)
const (
// VotesCacheSize with 1M validators will be 8MB.
VotesCacheSize = 1 << 23
// NumOfVotes specifies the vote cache size.
NumOfVotes = 1 << 20
// BeaconNodeDbDirName is the name of the directory containing the beacon node database.
BeaconNodeDbDirName = "beaconchaindata"
// DatabaseFileName is the name of the beacon node database.
DatabaseFileName = "beaconchain.db"
boltAllocSize = 8 * 1024 * 1024
)
// BlockCacheSize specifies 1000 slots worth of blocks cached, which
// would be approximately 2MB
var BlockCacheSize = int64(1 << 21)
// blockedBuckets represents the buckets that we want to restrict
// from our metrics fetching for performance reasons. For a detailed
// summary, it can be read in https://github.com/prysmaticlabs/prysm/issues/8274.
var blockedBuckets = [][]byte{
blocksBucket,
stateSummaryBucket,
blockParentRootIndicesBucket,
blockSlotIndicesBucket,
finalizedBlockRootsIndexBucket,
}
// Config for the bolt db kv store.
type Config struct {
InitialMMapSize int
}
// Store defines an implementation of the Prysm Database interface
// using BoltDB as the underlying persistent kv-store for Ethereum Beacon Nodes.
type Store struct {
db *bolt.DB
databasePath string
blockCache *ristretto.Cache
validatorIndexCache *ristretto.Cache
stateSummaryCache *stateSummaryCache
ctx context.Context
}
// KVStoreDatafilePath is the canonical construction of a full
// database file path from the directory path, so that code outside
// this package can find the full path in a consistent way.
func KVStoreDatafilePath(dirPath string) string {
return path.Join(dirPath, DatabaseFileName)
}
// NewKVStore initializes a new boltDB key-value store at the directory
// path specified, creates the kv-buckets based on the schema, and stores
// an open connection db object as a property of the Store struct.
func NewKVStore(ctx context.Context, dirPath string, config *Config) (*Store, error) {
hasDir, err := fileutil.HasDir(dirPath)
if err != nil {
return nil, err
}
if !hasDir {
if err := fileutil.MkdirAll(dirPath); err != nil {
return nil, err
}
}
datafile := KVStoreDatafilePath(dirPath)
boltDB, err := bolt.Open(
datafile,
params.BeaconIoConfig().ReadWritePermissions,
&bolt.Options{
Timeout: 1 * time.Second,
InitialMmapSize: config.InitialMMapSize,
},
)
if err != nil {
if errors.Is(err, bolt.ErrTimeout) {
return nil, errors.New("cannot obtain database lock, database may be in use by another process")
}
return nil, err
}
boltDB.AllocSize = boltAllocSize
blockCache, err := ristretto.NewCache(&ristretto.Config{
NumCounters: 1000, // number of keys to track frequency of (1000).
MaxCost: BlockCacheSize, // maximum cost of cache (1000 Blocks).
BufferItems: 64, // number of keys per Get buffer.
})
if err != nil {
return nil, err
}
validatorCache, err := ristretto.NewCache(&ristretto.Config{
NumCounters: NumOfVotes, // number of keys to track frequency of (1M).
MaxCost: VotesCacheSize, // maximum cost of cache (8MB).
BufferItems: 64, // number of keys per Get buffer.
})
if err != nil {
return nil, err
}
kv := &Store{
db: boltDB,
databasePath: dirPath,
blockCache: blockCache,
validatorIndexCache: validatorCache,
stateSummaryCache: newStateSummaryCache(),
ctx: ctx,
}
if err := kv.db.Update(func(tx *bolt.Tx) error {
return createBuckets(
tx,
attestationsBucket,
blocksBucket,
stateBucket,
proposerSlashingsBucket,
attesterSlashingsBucket,
voluntaryExitsBucket,
chainMetadataBucket,
checkpointBucket,
powchainBucket,
stateSummaryBucket,
// Indices buckets.
attestationHeadBlockRootBucket,
attestationSourceRootIndicesBucket,
attestationSourceEpochIndicesBucket,
attestationTargetRootIndicesBucket,
attestationTargetEpochIndicesBucket,
blockSlotIndicesBucket,
stateSlotIndicesBucket,
blockParentRootIndicesBucket,
finalizedBlockRootsIndexBucket,
// State management service bucket.
newStateServiceCompatibleBucket,
// Migrations
migrationsBucket,
)
}); err != nil {
return nil, err
}
err = prometheus.Register(createBoltCollector(kv.db))
return kv, err
}
// ClearDB removes the previously stored database in the data directory.
func (s *Store) ClearDB() error {
if _, err := os.Stat(s.databasePath); os.IsNotExist(err) {
return nil
}
prometheus.Unregister(createBoltCollector(s.db))
if err := os.Remove(path.Join(s.databasePath, DatabaseFileName)); err != nil {
return errors.Wrap(err, "could not remove database file")
}
return nil
}
// Close closes the underlying BoltDB database.
func (s *Store) Close() error {
prometheus.Unregister(createBoltCollector(s.db))
// Before DB closes, we should dump the cached state summary objects to DB.
if err := s.saveCachedStateSummariesDB(s.ctx); err != nil {
return err
}
return s.db.Close()
}
// DatabasePath at which this database writes files.
func (s *Store) DatabasePath() string {
return s.databasePath
}
func createBuckets(tx *bolt.Tx, buckets ...[]byte) error {
for _, bucket := range buckets {
if _, err := tx.CreateBucketIfNotExists(bucket); err != nil {
return err
}
}
return nil
}
// createBoltCollector returns a prometheus collector specifically configured for boltdb.
func createBoltCollector(db *bolt.DB) prometheus.Collector {
return prombolt.New("boltDB", db, blockedBuckets...)
}