-
Notifications
You must be signed in to change notification settings - Fork 177
/
reactor_engine.go
449 lines (398 loc) · 16.5 KB
/
reactor_engine.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
package dkg
import (
"crypto/rand"
"errors"
"fmt"
"github.com/rs/zerolog"
"github.com/onflow/flow-go/crypto"
"github.com/onflow/flow-go/engine"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/model/flow/filter"
"github.com/onflow/flow-go/module"
dkgmodule "github.com/onflow/flow-go/module/dkg"
"github.com/onflow/flow-go/state/protocol"
"github.com/onflow/flow-go/state/protocol/events"
"github.com/onflow/flow-go/storage"
)
// DefaultPollStep specifies the default number of views that separate two calls
// to the DKG smart-contract to read broadcast messages.
// Used as the initial value of ReactorEngine.pollStep.
const DefaultPollStep = 10
// dkgInfo consolidates information about the current DKG protocol instance.
type dkgInfo struct {
	// identities is the initial identity list for the next epoch; the DKG
	// committee is derived from it by filtering for voting consensus members.
	identities flow.IdentityList
	// phase1FinalView, phase2FinalView, phase3FinalView are the last views of
	// the respective DKG phases; phase transitions are registered at these views.
	phase1FinalView uint64
	phase2FinalView uint64
	phase3FinalView uint64
	// seed must be generated for each DKG instance, using a randomness source that is independent from all other nodes.
	seed []byte
}
// ReactorEngine is an engine that reacts to chain events to start new DKG runs,
// and manage subsequent phase transitions. Any unexpected error triggers a
// panic as it would undermine the security of the protocol.
// TODO replace engine.Unit with component.Component
type ReactorEngine struct {
	events.Noop                      // default no-op implementation of protocol events we don't handle
	unit        *engine.Unit         // unit for launching worker routines and serializing access (Lock/Unlock)
	log         zerolog.Logger       // logger contextualized with engine name
	me          module.Local         // this node's local identity (NodeID)
	State       protocol.State       // protocol state, used to query epoch/DKG information
	dkgState    storage.DKGState     // persistent DKG state (started flag, end state, beacon private key)
	controller  module.DKGController // controller for the in-progress DKG instance; set by startDKGForEpoch
	controllerFactory module.DKGControllerFactory // factory creating a controller per DKG instance
	viewEvents  events.Views         // used to register callbacks (polls, phase transitions) at specific views
	pollStep    uint64               // number of views between consecutive smart-contract polls
}
// NewReactorEngine return a new ReactorEngine.
// The engine polls the DKG smart-contract every DefaultPollStep views.
func NewReactorEngine(
	log zerolog.Logger,
	me module.Local,
	state protocol.State,
	dkgState storage.DKGState,
	controllerFactory module.DKGControllerFactory,
	viewEvents events.Views,
) *ReactorEngine {
	// attach the engine name to all log output produced by this engine
	engineLog := log.With().Str("engine", "dkg_reactor").Logger()
	e := &ReactorEngine{
		unit:              engine.NewUnit(),
		log:               engineLog,
		me:                me,
		State:             state,
		dkgState:          dkgState,
		controllerFactory: controllerFactory,
		viewEvents:        viewEvents,
		pollStep:          DefaultPollStep,
	}
	return e
}
// Ready implements the module ReadyDoneAware interface. It returns a channel
// that will close when the engine has successfully started.
//
// On startup we inspect the latest finalized snapshot: if the node boots
// mid-way through an EpochSetup or EpochCommitted phase, we perform the
// corresponding DKG action that would otherwise have been triggered by the
// protocol event at the phase transition.
func (e *ReactorEngine) Ready() <-chan struct{} {
	return e.unit.Ready(func() {
		// If we are starting up in the EpochSetup phase, try to start the DKG.
		// If the DKG for this epoch has been started previously, we will exit
		// and fail this epoch's DKG.
		final := e.State.Final()

		phase, err := final.Phase()
		if err != nil {
			// unexpected storage-level error
			// TODO use irrecoverable context
			e.log.Fatal().Err(err).Msg("failed to check epoch phase when starting DKG reactor engine")
			return
		}
		currentCounter, err := final.Epochs().Current().Counter()
		if err != nil {
			// unexpected storage-level error
			// TODO use irrecoverable context
			e.log.Fatal().Err(err).Msg("failed to retrieve current epoch counter when starting DKG reactor engine")
			return
		}
		first, err := final.Head()
		if err != nil {
			// unexpected storage-level error
			// TODO use irrecoverable context
			e.log.Fatal().Err(err).Msg("failed to retrieve finalized header when starting DKG reactor engine")
			return
		}

		switch phase {
		case flow.EpochPhaseSetup:
			// attempt to start the DKG in case it wasn't started previously
			e.startDKGForEpoch(currentCounter, first)
		case flow.EpochPhaseCommitted:
			// ensure the DKG end state is set correctly
			e.handleEpochCommittedPhaseStarted(currentCounter, first)
		}
	})
}
// Done implements the module ReadyDoneAware interface. It returns a channel
// that will close when the engine has successfully stopped.
func (e *ReactorEngine) Done() <-chan struct{} {
	return e.unit.Done()
}
// EpochSetupPhaseStarted handles the EpochSetupPhaseStarted protocol event by
// starting the DKG process.
// NOTE: ReactorEngine will not recover from mid-DKG crashes, therefore we do not need to handle dropped protocol events here.
func (e *ReactorEngine) EpochSetupPhaseStarted(currentEpochCounter uint64, first *flow.Header) {
	// delegate to the shared implementation, also used on startup (Ready)
	e.startDKGForEpoch(currentEpochCounter, first)
}
// EpochCommittedPhaseStarted handles the EpochCommittedPhaseStarted protocol
// event by checking the consistency of our locally computed key share.
// NOTE: ReactorEngine will not recover from mid-DKG crashes, therefore we do not need to handle dropped protocol events here.
func (e *ReactorEngine) EpochCommittedPhaseStarted(currentEpochCounter uint64, first *flow.Header) {
	// delegate to the shared implementation, also used on startup (Ready)
	e.handleEpochCommittedPhaseStarted(currentEpochCounter, first)
}
// startDKGForEpoch attempts to start the DKG instance for the given epoch,
// only if we have never started the DKG during setup phase for the given epoch.
// This allows consensus nodes which boot from a state snapshot within the
// EpochSetup phase to run the DKG.
//
// It starts a new controller for the epoch and registers the triggers to regularly
// query the DKG smart-contract and transition between phases at the specified views.
//
// Params:
//   - currentEpochCounter: the epoch we are currently in; the DKG prepares keys for epoch currentEpochCounter+1
//   - first: first finalized block of the EpochSetup phase
//
// Any unexpected storage or controller-creation error is fatal and crashes the node.
func (e *ReactorEngine) startDKGForEpoch(currentEpochCounter uint64, first *flow.Header) {
	firstID := first.ID()
	nextEpochCounter := currentEpochCounter + 1
	log := e.log.With().
		Uint64("cur_epoch", currentEpochCounter). // the epoch we are in the middle of
		Uint64("next_epoch", nextEpochCounter).   // the epoch we are running the DKG for
		Uint64("first_block_view", first.View).   // view of first block in EpochSetup phase
		Hex("first_block_id", firstID[:]).        // id of first block in EpochSetup phase
		Logger()

	// if we have started the dkg for this epoch already, exit
	started, err := e.dkgState.GetDKGStarted(nextEpochCounter)
	if err != nil {
		// unexpected storage-level error
		// TODO use irrecoverable context
		log.Fatal().Err(err).Msg("could not check whether DKG is started")
	}
	if started {
		log.Warn().Msg("DKG started before, skipping starting the DKG for this epoch")
		return
	}

	// flag that we are starting the dkg for this epoch
	// (persisted before running, so a crash mid-DKG is detected on restart)
	err = e.dkgState.SetDKGStarted(nextEpochCounter)
	if err != nil {
		// unexpected storage-level error
		// TODO use irrecoverable context
		log.Fatal().Err(err).Msg("could not set dkg started")
	}

	// gather next-epoch identities, phase final views, and a fresh random seed
	curDKGInfo, err := e.getDKGInfo(firstID)
	if err != nil {
		// unexpected storage-level error
		// TODO use irrecoverable context
		log.Fatal().Err(err).Msg("could not retrieve epoch info")
	}

	committee := curDKGInfo.identities.Filter(filter.IsVotingConsensusCommitteeMember)

	log.Info().
		Uint64("phase1", curDKGInfo.phase1FinalView).
		Uint64("phase2", curDKGInfo.phase2FinalView).
		Uint64("phase3", curDKGInfo.phase3FinalView).
		Interface("members", committee.NodeIDs()).
		Msg("epoch info")

	if _, ok := committee.GetIndex(e.me.NodeID()); !ok {
		// node not found in DKG committee bypass starting the DKG
		log.Warn().Str("node_id", e.me.NodeID().String()).Msg("failed to find our node ID in the DKG committee skip starting DKG engine, this node will not participate in consensus after the next epoch starts")
		return
	}

	controller, err := e.controllerFactory.Create(
		dkgmodule.CanonicalInstanceID(first.ChainID, nextEpochCounter),
		committee,
		curDKGInfo.seed,
	)
	if err != nil {
		// no expected errors in controller factory
		// TODO use irrecoverable context
		log.Fatal().Err(err).Msg("could not create DKG controller")
	}
	e.controller = controller

	// run the controller asynchronously; it blocks until the DKG completes or fails
	e.unit.Launch(func() {
		log.Info().Msg("DKG Run")
		err := e.controller.Run()
		if err != nil {
			// TODO handle crypto sentinels and do not crash here
			log.Fatal().Err(err).Msg("DKG Run error")
		}
	})

	// NOTE:
	// We register two callbacks for views that mark a state transition: one for
	// polling broadcast messages, and one for triggering the phase transition.
	// It is essential that all polled broadcast messages are processed before
	// starting the phase transition. Here we register the polling callback
	// before the phase transition, which guarantees that it will be called
	// before because callbacks for the same views are executed on a FIFO basis.
	// Moreover, the poll callback does not return until all received messages
	// are processed by the underlying DKG controller (as guaranteed by the
	// specifications and implementations of the DKGBroker and DKGController
	// interfaces).
	// Polls are registered every pollStep views, walking backwards from each
	// phase's final view towards the start of that phase.
	for view := curDKGInfo.phase1FinalView; view > first.View; view -= e.pollStep {
		e.registerPoll(view)
	}
	e.registerPhaseTransition(curDKGInfo.phase1FinalView, dkgmodule.Phase1, e.controller.EndPhase1)
	for view := curDKGInfo.phase2FinalView; view > curDKGInfo.phase1FinalView; view -= e.pollStep {
		e.registerPoll(view)
	}
	e.registerPhaseTransition(curDKGInfo.phase2FinalView, dkgmodule.Phase2, e.controller.EndPhase2)
	for view := curDKGInfo.phase3FinalView; view > curDKGInfo.phase2FinalView; view -= e.pollStep {
		e.registerPoll(view)
	}
	e.registerPhaseTransition(curDKGInfo.phase3FinalView, dkgmodule.Phase3, e.end(nextEpochCounter))
}
// handleEpochCommittedPhaseStarted is invoked upon the transition to the EpochCommitted
// phase, when the canonical beacon key vector is incorporated into the protocol state.
//
// This function checks that the local DKG completed and that our locally computed
// key share is consistent with the canonical key vector. When this function returns,
// an end state for the just-completed DKG is guaranteed to be stored (if not, the
// program will crash). Since this function is invoked synchronously before the end
// of the current epoch, this guarantees that when we reach the end of the current epoch
// we will either have a usable beacon key (successful DKG) or a DKG failure end state
// stored, so we can safely fall back to using our staking key.
//
// Params:
//   - currentEpochCounter: the epoch we are currently in; the just-completed DKG
//     produced keys for epoch currentEpochCounter+1
//   - firstBlock: first finalized block of the EpochCommitted phase
//
// CAUTION: This function is not safe for concurrent use. This is not enforced within
// the ReactorEngine - instead we rely on the protocol event emission being single-threaded
func (e *ReactorEngine) handleEpochCommittedPhaseStarted(currentEpochCounter uint64, firstBlock *flow.Header) {

	// the DKG we have just completed produces keys that we will use in the next epoch
	nextEpochCounter := currentEpochCounter + 1

	log := e.log.With().
		Uint64("cur_epoch", currentEpochCounter). // the epoch we are in the middle of
		Uint64("next_epoch", nextEpochCounter).   // the epoch the just-finished DKG was preparing for
		Logger()

	// Check whether we have already set the end state for this DKG.
	// This can happen if the DKG failed locally, if we failed to generate
	// a local private beacon key, or if we crashed while performing this
	// check previously.
	// NOTE(review): any error here (not only "not found") is treated as
	// "end state not yet set" — confirm GetDKGEndState only errors when unset.
	endState, err := e.dkgState.GetDKGEndState(nextEpochCounter)
	if err == nil {
		log.Warn().Msgf("checking beacon key consistency: exiting because dkg end state was already set: %s", endState.String())
		return
	}

	// Since epoch phase transitions are emitted when the first block of the new
	// phase is finalized, the block's snapshot is guaranteed to already be
	// accessible in the protocol state at this point (even though the Badger
	// transaction finalizing the block has not been committed yet).
	nextDKG, err := e.State.AtBlockID(firstBlock.ID()).Epochs().Next().DKG()
	if err != nil {
		// CAUTION: this should never happen, indicates a storage failure or corruption
		// TODO use irrecoverable context
		log.Fatal().Err(err).Msg("checking beacon key consistency: could not retrieve next DKG info")
		return
	}

	myBeaconPrivKey, err := e.dkgState.RetrieveMyBeaconPrivateKey(nextEpochCounter)
	if errors.Is(err, storage.ErrNotFound) {
		// we did not compute a local key share - record a NoKey end state
		log.Warn().Msg("checking beacon key consistency: no key found")
		err := e.dkgState.SetDKGEndState(nextEpochCounter, flow.DKGEndStateNoKey)
		if err != nil {
			// TODO use irrecoverable context
			log.Fatal().Err(err).Msg("failed to set dkg end state")
		}
		return
	} else if err != nil {
		// TODO use irrecoverable context
		log.Fatal().Err(err).Msg("checking beacon key consistency: could not retrieve beacon private key for next epoch")
		return
	}

	nextDKGPubKey, err := nextDKG.KeyShare(e.me.NodeID())
	if err != nil {
		// TODO use irrecoverable context
		log.Fatal().Err(err).Msg("checking beacon key consistency: could not retrieve my beacon public key for next epoch")
		return
	}
	localPubKey := myBeaconPrivKey.PublicKey()

	// we computed a local beacon key but it is inconsistent with our canonical
	// public key - therefore it is unsafe for use
	if !nextDKGPubKey.Equals(localPubKey) {
		log.Warn().
			Str("computed_beacon_pub_key", localPubKey.String()).
			Str("canonical_beacon_pub_key", nextDKGPubKey.String()).
			Msg("checking beacon key consistency: locally computed beacon public key does not match beacon public key for next epoch")
		err := e.dkgState.SetDKGEndState(nextEpochCounter, flow.DKGEndStateInconsistentKey)
		if err != nil {
			// TODO use irrecoverable context
			log.Fatal().Err(err).Msg("failed to set dkg end state")
		}
		return
	}

	err = e.dkgState.SetDKGEndState(nextEpochCounter, flow.DKGEndStateSuccess)
	if err != nil {
		// TODO use irrecoverable context
		// use the contextualized logger (with cur_epoch/next_epoch fields) for
		// consistency with every other fatal log in this function
		log.Fatal().Err(err).Msg("failed to set dkg end state")
	}
	log.Info().Msgf("successfully ended DKG, my beacon pub key for epoch %d is %s", nextEpochCounter, localPubKey)
}
// getDKGInfo assembles the information needed to run the DKG instance that is
// prepared during the EpochSetup phase beginning at the given block: the
// next epoch's initial identities, the final views of the three DKG phases of
// the current epoch, and a locally generated random seed.
// TODO document error returns
func (e *ReactorEngine) getDKGInfo(firstBlockID flow.Identifier) (*dkgInfo, error) {
	epochs := e.State.AtBlockID(firstBlockID).Epochs()

	// the DKG produces keys for the NEXT epoch's committee
	identities, err := epochs.Next().InitialIdentities()
	if err != nil {
		return nil, fmt.Errorf("could not retrieve epoch identities: %w", err)
	}

	// phase transitions happen at views defined by the CURRENT epoch
	phase1Final, phase2Final, phase3Final, err := protocol.DKGPhaseViews(epochs.Current())
	if err != nil {
		return nil, fmt.Errorf("could not retrieve epoch dkg final views: %w", err)
	}

	// seed must be independent from all other nodes, hence crypto/rand
	seed := make([]byte, crypto.KeyGenSeedMinLen)
	if _, err = rand.Read(seed); err != nil {
		return nil, fmt.Errorf("could not generate random seed: %w", err)
	}

	return &dkgInfo{
		identities:      identities,
		phase1FinalView: phase1Final,
		phase2FinalView: phase2Final,
		phase3FinalView: phase3Final,
		seed:            seed,
	}, nil
}
// registerPoll instructs the engine to query the DKG smart-contract for new
// broadcast messages at the specified view.
// The poll runs asynchronously under the engine unit's lock, so it is
// serialized with phase transitions registered for the same views.
func (e *ReactorEngine) registerPoll(view uint64) {
	e.viewEvents.OnView(view, func(header *flow.Header) {
		e.unit.Launch(func() {
			e.unit.Lock()
			defer e.unit.Unlock()

			blockID := header.ID()
			log := e.log.With().
				Uint64("view", view).
				Uint64("height", header.Height).
				Hex("block_id", blockID[:]).
				Logger()

			log.Info().Msg("polling DKG smart-contract...")
			// a failed poll is logged but not fatal; later polls may succeed
			if err := e.controller.Poll(blockID); err != nil {
				log.Err(err).Msg("failed to poll DKG smart-contract")
			}
		})
	})
}
// registerPhaseTransition instructs the engine to change phases at the
// specified view by invoking the given phaseTransition callback.
// The transition runs asynchronously under the engine unit's lock, after any
// poll registered for the same view (FIFO callback ordering).
func (e *ReactorEngine) registerPhaseTransition(view uint64, fromState dkgmodule.State, phaseTransition func() error) {
	e.viewEvents.OnView(view, func(header *flow.Header) {
		e.unit.Launch(func() {
			e.unit.Lock()
			defer e.unit.Unlock()

			blockID := header.ID()
			log := e.log.With().
				Uint64("view", view).
				Hex("block_id", blockID[:]).
				Logger()

			log.Info().Msgf("ending %s...", fromState)
			// a failed phase transition is fatal: the DKG cannot proceed
			if err := phaseTransition(); err != nil {
				// TODO use irrecoverable context
				log.Fatal().Err(err).Msgf("node failed to end %s", fromState)
			}
			log.Info().Msgf("ended %s successfully", fromState)
		})
	})
}
// end returns a callback that is used to end the DKG protocol, save the
// resulting private key to storage, and publish the other results to the DKG
// smart-contract.
// A local DKG failure is recorded as an end state but does not abort the
// callback: the node still stores any computed key share and submits results.
func (e *ReactorEngine) end(nextEpochCounter uint64) func() error {
	return func() error {
		endErr := e.controller.End()
		switch {
		case crypto.IsDKGFailureError(endErr):
			// local failure: persist the failure end state, then continue
			e.log.Warn().Err(endErr).Msgf("node %s with index %d failed DKG locally", e.me.NodeID(), e.controller.GetIndex())
			if err := e.dkgState.SetDKGEndState(nextEpochCounter, flow.DKGEndStateDKGFailure); err != nil {
				return fmt.Errorf("failed to set dkg end state following dkg end error: %w", err)
			}
		case endErr != nil:
			return fmt.Errorf("unknown error ending the dkg: %w", endErr)
		}

		privateShare, _, _ := e.controller.GetArtifacts()
		if privateShare != nil {
			// we only store our key if one was computed
			if err := e.dkgState.InsertMyBeaconPrivateKey(nextEpochCounter, privateShare); err != nil {
				return fmt.Errorf("could not save beacon private key in db: %w", err)
			}
		}

		if err := e.controller.SubmitResult(); err != nil {
			return fmt.Errorf("couldn't publish DKG results: %w", err)
		}

		return nil
	}
}