exec_fork_suppressor.go
package consensus
import (
"encoding/json"
"errors"
"fmt"
"sync"
"github.com/dgraph-io/badger/v2"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"go.uber.org/atomic"
"github.com/onflow/flow-go/engine"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/module/mempool"
"github.com/onflow/flow-go/storage"
"github.com/onflow/flow-go/storage/badger/operation"
)
// ExecForkSuppressor is a wrapper around a conventional mempool.IncorporatedResultSeals
// mempool. It implements the following mitigation strategy for execution forks:
// - In case two conflicting results are considered sealable for the same block,
//   sealing should halt. Specifically, two results are considered conflicting
//   if they differ in their start or end state.
// - Even after a restart, sealing should not resume.
// - We rely on human intervention to resolve the conflict.
//
// The ExecForkSuppressor implements this mitigation strategy as follows:
// - For each candidate seal inserted into the mempool, it indexes the seal
//   by the respective blockID, storing all seals in the internal map `sealsForBlock`.
// - Whenever a client performs a query, we check whether there are conflicting seals.
// - We pick the first seal available for a block and check whether it describes
//   the same state transition as the other seals stored for the same block.
// - If conflicting state transitions for the same block are detected,
//   ExecForkSuppressor sets an internal flag and thereafter
//   reports the mempool as empty, which will lead to the respective
//   consensus node not including any more seals.
// - Evidence for an execution fork is stored in the database (persisted across restarts).
//
// The implementation is concurrency safe.
type ExecForkSuppressor struct {
mutex sync.RWMutex
seals mempool.IncorporatedResultSeals
sealsForBlock map[flow.Identifier]sealSet // map BlockID -> set of IncorporatedResultSeal
byHeight map[uint64]map[flow.Identifier]struct{} // map height -> set of executed block IDs at height
lowestHeight uint64
execForkDetected atomic.Bool
onExecFork ExecForkActor
db *badger.DB
log zerolog.Logger
}
var _ mempool.IncorporatedResultSeals = (*ExecForkSuppressor)(nil)
// sealSet is a set of seals; internally represented as a map from sealID to seal
type sealSet map[flow.Identifier]*flow.IncorporatedResultSeal
// sealsList is a list of seals
type sealsList []*flow.IncorporatedResultSeal
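// NewExecStateForkSuppressor wraps the given `seals` mempool in an ExecForkSuppressor.
// It checks the database for previously persisted evidence of an execution fork. If such
// evidence is found, the suppressor starts with its execForkDetected flag set and
// immediately notifies the ExecForkActor with the stored conflicting seals.
// Error returns: any failure to read execution fork evidence from the database.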
func NewExecStateForkSuppressor(seals mempool.IncorporatedResultSeals, onExecFork ExecForkActor, db *badger.DB, log zerolog.Logger) (*ExecForkSuppressor, error) {
conflictingSeals, err := checkExecutionForkEvidence(db)
if err != nil {
return nil, fmt.Errorf("failed to interface with storage: %w", err)
}
execForkDetectedFlag := len(conflictingSeals) != 0
if execForkDetectedFlag {
onExecFork(conflictingSeals)
}
wrapper := ExecForkSuppressor{
mutex: sync.RWMutex{},
seals: seals,
sealsForBlock: make(map[flow.Identifier]sealSet),
byHeight: make(map[uint64]map[flow.Identifier]struct{}),
execForkDetected: *atomic.NewBool(execForkDetectedFlag),
onExecFork: onExecFork,
db: db,
log: log.With().Str("mempool", "ExecForkSuppressor").Logger(),
}
return &wrapper, nil
}
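// Example usage (illustrative sketch only; `sealsPool`, `badgerDB`, and `logger` are
// hypothetical names for an existing mempool.IncorporatedResultSeals instance, Badger
// handle, and zerolog logger; they are not defined in this file):
//
//	onFork := func(conflicting []*flow.IncorporatedResultSeal) {
//		logger.Fatal().Msgf("execution fork detected: %d conflicting seals", len(conflicting))
//	}
//	wrapped, err := NewExecStateForkSuppressor(sealsPool, onFork, badgerDB, logger)
//	if err != nil {
//		return fmt.Errorf("failed to wrap seals mempool: %w", err)
//	}
//	// `wrapped` can then be used wherever a mempool.IncorporatedResultSeals is expected.
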
// Add adds the given seal to the mempool. The return value indicates whether the seal was added.
// Internally indexes every added seal by blockID. Expects that the underlying mempool never ejects items.
// Error returns:
// - engine.InvalidInputError (sentinel error)
//   in case a seal fails one of the required consistency checks
func (s *ExecForkSuppressor) Add(newSeal *flow.IncorporatedResultSeal) (bool, error) {
s.mutex.Lock()
defer s.mutex.Unlock()
if s.execForkDetected.Load() {
return false, nil
}
if newSeal.Header.Height < s.lowestHeight {
return false, nil
}
// STEP 1: ensure locally that newSeal has a non-zero number of chunks, which means
// that the new seal contains start and end state values.
// This wrapper is a temporary safety layer; we check all conditions that are
// required for its correct functioning locally, so as not to delegate safety-critical
// implementation aspects to external components.
err := s.enforceValidChunks(newSeal)
if err != nil {
return false, fmt.Errorf("invalid candidate seal: %w", err)
}
blockID := newSeal.Seal.BlockID
// This mempool allows adding multiple seals for the same blockID, even if they have different state transitions.
// When the builder logic queries such seals, we check whether we have an execution fork. The main reason for
// detecting forks at query time (rather than at insertion time) is the ability to add extra logic in underlying
// mempools. For instance, we could filter seals coming from the underlying mempool by some criteria.
// STEP 2: add newSeal to the wrapped mempool
added, err := s.seals.Add(newSeal) // internally de-duplicates
if err != nil {
return added, fmt.Errorf("failed to add seal to wrapped mempool: %w", err)
}
if !added { // if underlying mempool did not accept the seal => nothing to do anymore
return false, nil
}
// STEP 3: add newSeal to the secondary index of this wrapper
// CAUTION: We expect that the underlying mempool NEVER ejects seals, because ejection would break liveness.
blockSeals, found := s.sealsForBlock[blockID]
if !found {
// no other seal for this block was in mempool before => create a set for the seals for this block
blockSeals = make(sealSet)
s.sealsForBlock[blockID] = blockSeals
}
blockSeals[newSeal.ID()] = newSeal
// index the block by height, so that the secondary index can be pruned by height
blocksAtHeight, found := s.byHeight[newSeal.Header.Height]
if !found {
blocksAtHeight = make(map[flow.Identifier]struct{})
s.byHeight[newSeal.Header.Height] = blocksAtHeight
}
blocksAtHeight[blockID] = struct{}{}
return true, nil
}
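// Illustrative add/query flow (sketch only; `pool` is a hypothetical *ExecForkSuppressor
// and `irSeal` a candidate *flow.IncorporatedResultSeal):
//
//	added, err := pool.Add(irSeal) // seals with conflicting state transitions may coexist after Add
//	// ...
//	candidates := pool.All() // conflicts are detected at query time; on an execution fork the
//	                         // pool clears itself, persists the evidence, and reports as empty
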
// All returns all the IncorporatedResultSeals in the mempool.
// Note: This call might crash if the mempool contains multiple seals with conflicting
// incorporated results for the same block.
func (s *ExecForkSuppressor) All() []*flow.IncorporatedResultSeal {
s.mutex.RLock()
seals := s.seals.All()
s.mutex.RUnlock()
// index the seals retrieved from the underlying mempool by blockID, to check
// for conflicting seals
sealsByBlockID := make(map[flow.Identifier]sealsList, 0)
for _, seal := range seals {
sealsPerBlock := sealsByBlockID[seal.Seal.BlockID]
sealsByBlockID[seal.Seal.BlockID] = append(sealsPerBlock, seal)
}
// check for conflicting seals
return s.filterConflictingSeals(sealsByBlockID)
}
// ByID returns an IncorporatedResultSeal by its ID.
// The IncorporatedResultSeal's ID is the same as the IncorporatedResult's ID,
// so this call essentially finds the seal for the incorporated result in the mempool.
// Note: This call might crash if the mempool contains multiple seals with conflicting
// incorporated results for the seal's block. Usually the builder will call this method to find
// a seal for an incorporated result, so the builder might crash if multiple conflicting seals exist.
func (s *ExecForkSuppressor) ByID(identifier flow.Identifier) (*flow.IncorporatedResultSeal, bool) {
s.mutex.RLock()
seal, found := s.seals.ByID(identifier)
// if we haven't found the seal in the underlying mempool, exit early
if !found {
s.mutex.RUnlock()
return seal, found
}
sealsForBlock := s.sealsForBlock[seal.Seal.BlockID]
// if this is the only seal seen for this block, then an execution fork is not possible
if len(sealsForBlock) == 1 {
s.mutex.RUnlock()
return seal, true
}
// convert map into list
var sealsPerBlock sealsList
for _, otherSeal := range sealsForBlock {
sealsPerBlock = append(sealsPerBlock, otherSeal)
}
s.mutex.RUnlock()
// check for conflicting seals
seals := s.filterConflictingSeals(map[flow.Identifier]sealsList{seal.Seal.BlockID: sealsPerBlock})
if len(seals) == 0 {
return nil, false
}
return seals[0], true
}
// Remove removes the IncorporatedResultSeal with id from the mempool
func (s *ExecForkSuppressor) Remove(id flow.Identifier) bool {
s.mutex.Lock()
defer s.mutex.Unlock()
seal, found := s.seals.ByID(id)
if found {
s.seals.Remove(id)
set, found := s.sealsForBlock[seal.Seal.BlockID]
if !found {
// In the current implementation, this cannot happen, as every entity in the mempool is also contained in sealsForBlock.
// We nevertheless perform this sanity check here, to catch future inconsistent code modifications.
s.log.Fatal().Msg("inconsistent state detected: seal not in secondary index")
}
if len(set) > 1 {
delete(set, id)
} else {
delete(s.sealsForBlock, seal.Seal.BlockID)
}
}
return found
}
// Size returns the number of items in the mempool
func (s *ExecForkSuppressor) Size() uint {
s.mutex.RLock()
defer s.mutex.RUnlock()
return s.seals.Size()
}
// Limit returns the size limit of the mempool
func (s *ExecForkSuppressor) Limit() uint {
s.mutex.RLock()
defer s.mutex.RUnlock()
return s.seals.Limit()
}
// Clear removes all entities from the pool.
// The wrapper clears the internal state as well as its local (additional) state.
func (s *ExecForkSuppressor) Clear() {
s.mutex.Lock()
defer s.mutex.Unlock()
s.sealsForBlock = make(map[flow.Identifier]sealSet)
s.seals.Clear()
}
// PruneUpToHeight removes all seals for blocks whose height is strictly
// smaller than the given height. Note: seals for blocks at the given height are retained.
func (s *ExecForkSuppressor) PruneUpToHeight(height uint64) error {
err := s.seals.PruneUpToHeight(height)
if err != nil {
return err
}
s.mutex.Lock()
defer s.mutex.Unlock()
if len(s.sealsForBlock) == 0 {
s.lowestHeight = height
return nil
}
// Optimization: if the index contains fewer heights than the height range to prune,
// just iterate over the indexed heights.
// Otherwise, iterate over each height in the range to prune.
if uint64(len(s.byHeight)) < height-s.lowestHeight {
for h := range s.byHeight {
if h < height {
s.removeByHeight(h)
}
}
} else {
for h := s.lowestHeight; h < height; h++ {
s.removeByHeight(h)
}
}
return nil
}
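// removeByHeight removes all seals sealing blocks at the given height from the
// secondary indices `sealsForBlock` and `byHeight`. The caller must hold the lock.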
func (s *ExecForkSuppressor) removeByHeight(height uint64) {
for blockID := range s.byHeight[height] {
delete(s.sealsForBlock, blockID)
}
delete(s.byHeight, height)
}
// enforceValidChunks checks that the seal has a valid, non-zero number of chunks.
// In case a seal fails the check, a detailed error message is logged and an
// engine.InvalidInputError (sentinel error) is returned.
func (s *ExecForkSuppressor) enforceValidChunks(irSeal *flow.IncorporatedResultSeal) error {
result := irSeal.IncorporatedResult.Result
if !result.ValidateChunksLength() {
scjson, errjson := json.Marshal(irSeal)
if errjson != nil {
return errjson
}
s.log.Error().
Str("seal", string(scjson)).
Msg("seal's execution result has no chunks")
return engine.NewInvalidInputErrorf("seal's execution result has no chunks: %x", result.ID())
}
return nil
}
// hasConsistentStateTransitions checks whether the execution results referenced by the two
// given seals describe matching state transitions, i.e. identical initial and final state
// commitments. Seals for the identical result are trivially consistent.
// The function assumes the execution results in the seals have a non-zero number of chunks.
func hasConsistentStateTransitions(irSeal, irSeal2 *flow.IncorporatedResultSeal) bool {
if irSeal.IncorporatedResult.Result.ID() == irSeal2.IncorporatedResult.Result.ID() {
// happy case: candidate seals are for the same result
return true
}
// the results for the seals have different IDs (!)
// => check whether initial and final state match in both seals
// unsafe: we assume validity of chunks has been checked before
irSeal1InitialState, _ := irSeal.IncorporatedResult.Result.InitialStateCommit()
irSeal1FinalState, _ := irSeal.IncorporatedResult.Result.FinalStateCommitment()
irSeal2InitialState, _ := irSeal2.IncorporatedResult.Result.InitialStateCommit()
irSeal2FinalState, _ := irSeal2.IncorporatedResult.Result.FinalStateCommitment()
if irSeal1InitialState != irSeal2InitialState || irSeal1FinalState != irSeal2FinalState {
log.Error().Msg("inconsistent seals for the same block")
return false
}
log.Warn().Msg("seals with different ID but consistent state transition")
return true
}
// checkExecutionForkEvidence checks the database for whether evidence
// about an execution fork is stored. Returns the stored evidence, if any.
func checkExecutionForkEvidence(db *badger.DB) ([]*flow.IncorporatedResultSeal, error) {
var conflictingSeals []*flow.IncorporatedResultSeal
err := db.View(func(tx *badger.Txn) error {
err := operation.RetrieveExecutionForkEvidence(&conflictingSeals)(tx)
if errors.Is(err, storage.ErrNotFound) {
return nil // no evidence in the database; conflictingSeals is still a nil slice
}
if err != nil {
return fmt.Errorf("failed to load evidence whether or not an execution fork occured: %w", err)
}
return nil
})
return conflictingSeals, err
}
// storeExecutionForkEvidence stores the provided seals in the database
// as evidence for an execution fork.
func storeExecutionForkEvidence(conflictingSeals []*flow.IncorporatedResultSeal, db *badger.DB) error {
err := operation.RetryOnConflict(db.Update, func(tx *badger.Txn) error {
err := operation.InsertExecutionForkEvidence(conflictingSeals)(tx)
if errors.Is(err, storage.ErrAlreadyExists) {
// some evidence about an execution fork is already stored;
// we only keep the first evidence => nothing more to do
return nil
}
if err != nil {
return fmt.Errorf("failed to store evidence about execution fork: %w", err)
}
return nil
})
return err
}
// filterConflictingSeals filters the provided seals by checking whether there are conflicting seals for the same block.
// For every block, we check whether the first seal has the same state transition as the others. Multiple seals for the
// same block are allowed, but their state transitions must be identical. Upon detecting a seal with an inconsistent
// state transition, we clear the mempool, stop accepting new seals and serving queries, store the execution fork
// evidence in the database, and notify the creator of the mempool via the onExecFork callback.
func (s *ExecForkSuppressor) filterConflictingSeals(sealsByBlockID map[flow.Identifier]sealsList) sealsList {
var result sealsList
for _, sealsInBlock := range sealsByBlockID {
if len(sealsInBlock) > 1 {
// multiple seals for the same block are in the mempool
// => compare the consistency of their results' state transitions
var conflictingSeals sealsList
candidateSeal := sealsInBlock[0]
for _, otherSeal := range sealsInBlock[1:] {
if !hasConsistentStateTransitions(candidateSeal, otherSeal) {
conflictingSeals = append(conflictingSeals, otherSeal)
}
}
// check whether an inconsistent state transition was detected
if len(conflictingSeals) > 0 {
s.execForkDetected.Store(true)
s.Clear()
conflictingSeals = append(sealsList{candidateSeal}, conflictingSeals...)
err := storeExecutionForkEvidence(conflictingSeals, s.db)
if err != nil {
panic("failed to store execution fork evidence")
}
s.onExecFork(conflictingSeals)
return nil
}
}
result = append(result, sealsInBlock...)
}
return result
}