-
Notifications
You must be signed in to change notification settings - Fork 172
/
register_store.go
294 lines (250 loc) · 10.5 KB
/
register_store.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
package storehouse
import (
"errors"
"fmt"
"go.uber.org/atomic"
"github.com/rs/zerolog"
"github.com/onflow/flow-go/engine/execution"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/storage"
)
type RegisterStore struct {
memStore *InMemoryRegisterStore
diskStore execution.OnDiskRegisterStore
wal execution.ExecutedFinalizedWAL
finalized execution.FinalizedReader
log zerolog.Logger
finalizing *atomic.Bool // making sure only one goroutine is finalizing at a time
notifier execution.RegisterStoreNotifier
}
var _ execution.RegisterStore = (*RegisterStore)(nil)
type NoopNotifier struct{}
func NewNoopNotifier() *NoopNotifier { return &NoopNotifier{} }
func (n *NoopNotifier) OnFinalizedAndExecutedHeightUpdated(height uint64) {}
var _ execution.RegisterStoreNotifier = (*NoopNotifier)(nil)
func NewRegisterStore(
diskStore execution.OnDiskRegisterStore,
wal execution.ExecutedFinalizedWAL,
finalized execution.FinalizedReader,
log zerolog.Logger,
notifier execution.RegisterStoreNotifier,
) (*RegisterStore, error) {
if notifier == nil {
return nil, fmt.Errorf("notifier is empty, use NoopNotifier if you don't need it")
}
// replay the executed and finalized blocks from the write ahead logs
// to the OnDiskRegisterStore
height, err := syncDiskStore(wal, diskStore, log)
if err != nil {
return nil, fmt.Errorf("cannot sync disk store: %w", err)
}
// fetch the last executed and finalized block ID
finalizedID, err := finalized.FinalizedBlockIDAtHeight(height)
if err != nil {
return nil, fmt.Errorf("cannot get finalized block ID at height %d: %w", height, err)
}
// init the memStore with the last executed and finalized block ID
memStore := NewInMemoryRegisterStore(height, finalizedID)
log.Info().Msgf("initialized in memory register store at block %v, height %v", finalizedID, height)
return &RegisterStore{
memStore: memStore,
diskStore: diskStore,
wal: wal,
finalized: finalized,
finalizing: atomic.NewBool(false),
log: log.With().Str("module", "register-store").Logger(),
notifier: notifier,
}, nil
}
// GetRegister first try to get the register from InMemoryRegisterStore, then OnDiskRegisterStore
// 1. below pruned height, and is conflicting
// 2. below pruned height, and is finalized
// 3. above pruned height, and is not executed
// 4. above pruned height, and is executed, and register is updated
// 5. above pruned height, and is executed, but register is not updated since pruned height
// It returns:
// - (value, nil) if the register value is found at the given block
// - (nil, nil) if the register is not found
// - (nil, storage.ErrHeightNotIndexed) if the height is below the first height that is indexed.
// - (nil, storehouse.ErrNotExecuted) if the block is not executed yet
// - (nil, storehouse.ErrNotExecuted) if the block is conflicting iwth finalized block
// - (nil, err) for any other exceptions
func (r *RegisterStore) GetRegister(height uint64, blockID flow.Identifier, register flow.RegisterID) (flow.RegisterValue, error) {
reg, err := r.memStore.GetRegister(height, blockID, register)
// the height might be lower than the lowest height in memStore,
// or the register might not be found in memStore.
if err == nil {
// this register was updated before its block is finalized
return reg, nil
}
prunedError, ok := IsPrunedError(err)
if !ok {
// this means we ran into an exception. finding a register from in-memory store should either
// getting the register value or getting a ErrPruned error.
return flow.RegisterValue{}, fmt.Errorf("cannot get register from memStore: %w", err)
}
// if in memory store returns PrunedError, and register height is above the pruned height,
// then it means the block is connected to the pruned block of in memory store, which is
// a finalized block and executed block, so we can get its value from on disk store.
if height > prunedError.PrunedHeight {
return r.getAndConvertNotFoundErr(register, prunedError.PrunedHeight)
}
// if the block is below or equal to the pruned height, then there are two cases:
// the block is a finalized block, or a conflicting block.
// In order to distinguish, we need to query the finalized block ID at that height
var finalizedID flow.Identifier
if height == prunedError.PrunedHeight {
// if the block is at the pruned height, then the finalized ID is the pruned ID from in memory store,
// this saves a DB query
finalizedID = prunedError.PrunedID
} else {
// if the block is below the pruned height, we query the finalized ID from the finalized reader
finalizedID, err = r.finalized.FinalizedBlockIDAtHeight(height)
if err != nil {
return nil, fmt.Errorf("cannot get finalized block ID at height %d: %w", height, err)
}
}
isConflictingBlock := blockID != finalizedID
if isConflictingBlock {
// conflicting blocks are considered as un-executed
return flow.RegisterValue{}, fmt.Errorf("getting registers from conflicting block %v at height %v: %w", blockID, height, ErrNotExecuted)
}
return r.getAndConvertNotFoundErr(register, height)
}
// getAndConvertNotFoundErr returns nil if the register is not found from storage
func (r *RegisterStore) getAndConvertNotFoundErr(register flow.RegisterID, height uint64) (flow.RegisterValue, error) {
val, err := r.diskStore.Get(register, height)
if errors.Is(err, storage.ErrNotFound) {
// FVM expects the error to be nil when register is not found
return nil, nil
}
return val, err
}
// SaveRegisters saves to InMemoryRegisterStore first, then trigger the same check as OnBlockFinalized
// Depend on InMemoryRegisterStore.SaveRegisters
// It returns:
// - nil if the registers are saved successfully
// - exception is the block is above the pruned height but does not connect to the pruned height (conflicting block).
// - exception if the block is below the pruned height
// - exception if the save block is saved again
// - exception for any other exception
func (r *RegisterStore) SaveRegisters(header *flow.Header, registers flow.RegisterEntries) error {
err := r.memStore.SaveRegisters(header.Height, header.ID(), header.ParentID, registers)
if err != nil {
return fmt.Errorf("cannot save register to memStore: %w", err)
}
err = r.OnBlockFinalized()
if err != nil {
return fmt.Errorf("cannot trigger OnBlockFinalized: %w", err)
}
return nil
}
// Depend on FinalizedReader's FinalizedBlockIDAtHeight
// Depend on ExecutedFinalizedWAL.Append
// Depend on OnDiskRegisterStore.SaveRegisters
// OnBlockFinalized trigger the check of whether a block at the next height becomes finalized and executed.
// the next height is the existing finalized and executed block's height + 1.
// If a block at next height becomes finalized and executed, then:
// 1. write the registers to write ahead logs
// 2. save the registers of the block to OnDiskRegisterStore
// 3. prune the height in InMemoryRegisterStore
func (r *RegisterStore) OnBlockFinalized() error {
// only one goroutine can execute OnBlockFinalized at a time
if !r.finalizing.CompareAndSwap(false, true) {
return nil
}
defer r.finalizing.Store(false)
return r.onBlockFinalized()
}
func (r *RegisterStore) onBlockFinalized() error {
latest := r.diskStore.LatestHeight()
next := latest + 1
blockID, err := r.finalized.FinalizedBlockIDAtHeight(next)
if errors.Is(err, storage.ErrNotFound) {
// next block is not finalized yet
return nil
}
regs, err := r.memStore.GetUpdatedRegisters(next, blockID)
if errors.Is(err, ErrNotExecuted) {
// next block is not executed yet
return nil
}
// TODO: append WAL
// err = r.wal.Append(next, regs)
// if err != nil {
// return fmt.Errorf("cannot write %v registers to write ahead logs for height %v: %w", len(regs), next, err)
// }
err = r.diskStore.Store(regs, next)
if err != nil {
return fmt.Errorf("cannot save %v registers to disk store for height %v: %w", len(regs), next, err)
}
r.notifier.OnFinalizedAndExecutedHeightUpdated(next)
err = r.memStore.Prune(next, blockID)
if err != nil {
return fmt.Errorf("cannot prune memStore for height %v: %w", next, err)
}
return r.onBlockFinalized() // check again until there is no more finalized block
}
// LastFinalizedAndExecutedHeight returns the height of the last finalized and executed block,
// which has been saved in OnDiskRegisterStore
func (r *RegisterStore) LastFinalizedAndExecutedHeight() uint64 {
// diskStore caches the latest height in memory
return r.diskStore.LatestHeight()
}
// IsBlockExecuted returns true if the block is executed, false if not executed
// Note: it returns (true, nil) even if the block has been pruned from on disk register store,
func (r *RegisterStore) IsBlockExecuted(height uint64, blockID flow.Identifier) (bool, error) {
executed, err := r.memStore.IsBlockExecuted(height, blockID)
if err != nil {
// the only error memStore would return is when the given height is lower than the pruned height in memStore.
// Since the pruned height in memStore is a finalized and executed height, in order to know if the block
// is executed, we just need to check if this block is the finalized blcok at the given height.
executed, err = r.isBlockFinalized(height, blockID)
return executed, err
}
return executed, nil
}
func (r *RegisterStore) isBlockFinalized(height uint64, blockID flow.Identifier) (bool, error) {
finalizedID, err := r.finalized.FinalizedBlockIDAtHeight(height)
if err != nil {
return false, fmt.Errorf("cannot get finalized block ID at height %d: %w", height, err)
}
return finalizedID == blockID, nil
}
// syncDiskStore replay WAL to disk store
func syncDiskStore(
wal execution.ExecutedFinalizedWAL,
diskStore execution.OnDiskRegisterStore,
log zerolog.Logger,
) (uint64, error) {
// TODO: replace diskStore.Latest with wal.Latest
// latest, err := r.wal.Latest()
var err error
latest := diskStore.LatestHeight() // tmp
if err != nil {
return 0, fmt.Errorf("cannot get latest height from write ahead logs: %w", err)
}
stored := diskStore.LatestHeight()
if stored > latest {
return 0, fmt.Errorf("latest height in storehouse %v is larger than latest height %v in write ahead logs", stored, latest)
}
if stored < latest {
// replay
reader := wal.GetReader(stored + 1)
for {
height, registers, err := reader.Next()
// TODO: to rename
if errors.Is(err, storage.ErrNotFound) {
break
}
if err != nil {
return 0, fmt.Errorf("cannot read registers from write ahead logs: %w", err)
}
err = diskStore.Store(registers, height)
if err != nil {
return 0, fmt.Errorf("cannot save registers to disk store at height %v : %w", height, err)
}
}
}
return latest, nil
}