-
Notifications
You must be signed in to change notification settings - Fork 178
/
participant.go
197 lines (171 loc) · 7.57 KB
/
participant.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
package consensus
import (
"fmt"
"github.com/rs/zerolog"
"github.com/onflow/flow-go/consensus/hotstuff"
"github.com/onflow/flow-go/consensus/hotstuff/blockproducer"
"github.com/onflow/flow-go/consensus/hotstuff/eventhandler"
"github.com/onflow/flow-go/consensus/hotstuff/eventloop"
"github.com/onflow/flow-go/consensus/hotstuff/forks"
"github.com/onflow/flow-go/consensus/hotstuff/model"
"github.com/onflow/flow-go/consensus/hotstuff/pacemaker"
"github.com/onflow/flow-go/consensus/hotstuff/pacemaker/timeout"
"github.com/onflow/flow-go/consensus/hotstuff/safetyrules"
"github.com/onflow/flow-go/consensus/hotstuff/signature"
validatorImpl "github.com/onflow/flow-go/consensus/hotstuff/validator"
"github.com/onflow/flow-go/consensus/hotstuff/verification"
"github.com/onflow/flow-go/consensus/recovery"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/module"
"github.com/onflow/flow-go/storage"
)
// NewParticipant initialize the EventLoop instance with needed dependencies
func NewParticipant(
log zerolog.Logger,
metrics module.HotstuffMetrics,
mempoolMetrics module.MempoolMetrics,
builder module.Builder,
finalized *flow.Header,
pending []*flow.Header,
modules *HotstuffModules,
options ...Option,
) (*eventloop.EventLoop, error) {
// initialize the default configuration and apply the configuration options
cfg := DefaultParticipantConfig()
for _, option := range options {
option(&cfg)
}
// prune vote aggregator to initial view
modules.VoteAggregator.PruneUpToView(finalized.View)
modules.TimeoutAggregator.PruneUpToView(finalized.View)
// recover HotStuff state from all pending blocks
qcCollector := recovery.NewCollector[*flow.QuorumCertificate]()
tcCollector := recovery.NewCollector[*flow.TimeoutCertificate]()
err := recovery.Recover(log, pending,
recovery.ForksState(modules.Forks), // add pending blocks to Forks
recovery.VoteAggregatorState(modules.VoteAggregator), // accept votes for all pending blocks
recovery.CollectParentQCs(qcCollector), // collect QCs from all pending block to initialize PaceMaker (below)
recovery.CollectTCs(tcCollector), // collect TCs from all pending block to initialize PaceMaker (below)
)
if err != nil {
return nil, fmt.Errorf("failed to scan tree of pending blocks: %w", err)
}
// initialize dynamically updatable timeout config
timeoutConfig, err := timeout.NewConfig(cfg.TimeoutMinimum, cfg.TimeoutMaximum, cfg.TimeoutAdjustmentFactor, cfg.HappyPathMaxRoundFailures, cfg.MaxTimeoutObjectRebroadcastInterval)
if err != nil {
return nil, fmt.Errorf("could not initialize timeout config: %w", err)
}
// initialize the pacemaker
controller := timeout.NewController(timeoutConfig)
pacemaker, err := pacemaker.New(controller, cfg.ProposalDurationProvider, modules.Notifier, modules.Persist,
pacemaker.WithQCs(qcCollector.Retrieve()...),
pacemaker.WithTCs(tcCollector.Retrieve()...),
)
if err != nil {
return nil, fmt.Errorf("could not initialize flow pacemaker: %w", err)
}
// initialize block producer
producer, err := blockproducer.New(modules.Signer, modules.Committee, builder)
if err != nil {
return nil, fmt.Errorf("could not initialize block producer: %w", err)
}
// initialize the safetyRules
safetyRules, err := safetyrules.New(modules.Signer, modules.Persist, modules.Committee)
if err != nil {
return nil, fmt.Errorf("could not initialize safety rules: %w", err)
}
// initialize the event handler
eventHandler, err := eventhandler.NewEventHandler(
log,
pacemaker,
producer,
modules.Forks,
modules.Persist,
modules.Committee,
safetyRules,
modules.Notifier,
)
if err != nil {
return nil, fmt.Errorf("could not initialize event handler: %w", err)
}
// initialize and return the event loop
loop, err := eventloop.NewEventLoop(log, metrics, mempoolMetrics, eventHandler, cfg.StartupTime)
if err != nil {
return nil, fmt.Errorf("could not initialize event loop: %w", err)
}
// add observer, event loop needs to receive events from distributor
modules.VoteCollectorDistributor.AddVoteCollectorConsumer(loop)
modules.TimeoutCollectorDistributor.AddTimeoutCollectorConsumer(loop)
return loop, nil
}
// NewValidator creates new instance of hotstuff validator needed for votes & proposal validation
func NewValidator(metrics module.HotstuffMetrics, committee hotstuff.DynamicCommittee) hotstuff.Validator {
packer := signature.NewConsensusSigDataPacker(committee)
verifier := verification.NewCombinedVerifier(committee, packer)
// initialize the Validator
validator := validatorImpl.New(committee, verifier)
return validatorImpl.NewMetricsWrapper(validator, metrics) // wrapper for measuring time spent in Validator component
}
// NewForks recovers trusted root and creates new forks manager
func NewForks(final *flow.Header, headers storage.Headers, updater module.Finalizer, notifier hotstuff.FollowerConsumer, rootHeader *flow.Header, rootQC *flow.QuorumCertificate) (*forks.Forks, error) {
// recover the trusted root
trustedRoot, err := recoverTrustedRoot(final, headers, rootHeader, rootQC)
if err != nil {
return nil, fmt.Errorf("could not recover trusted root: %w", err)
}
// initialize the forks
forks, err := forks.New(trustedRoot, updater, notifier)
if err != nil {
return nil, fmt.Errorf("could not initialize forks: %w", err)
}
return forks, nil
}
// recoverTrustedRoot based on our local state returns root block and QC that can be used to initialize base state
func recoverTrustedRoot(final *flow.Header, headers storage.Headers, rootHeader *flow.Header, rootQC *flow.QuorumCertificate) (*model.CertifiedBlock, error) {
if final.View < rootHeader.View {
return nil, fmt.Errorf("finalized Block has older view than trusted root")
}
// if finalized view is genesis block, then use genesis block as the trustedRoot
if final.View == rootHeader.View {
if final.ID() != rootHeader.ID() {
return nil, fmt.Errorf("finalized Block conflicts with trusted root")
}
certifiedRoot, err := makeCertifiedRootBlock(rootHeader, rootQC)
if err != nil {
return nil, fmt.Errorf("constructing certified root block failed: %w", err)
}
return &certifiedRoot, nil
}
// find a valid child of the finalized block in order to get its QC
children, err := headers.ByParentID(final.ID())
if err != nil {
// a finalized block must have a valid child, if err happens, we exit
return nil, fmt.Errorf("could not get children for finalized block (ID: %v, view: %v): %w", final.ID(), final.View, err)
}
if len(children) == 0 {
return nil, fmt.Errorf("finalized block has no children")
}
child := model.BlockFromFlow(children[0])
// create the root block to use
trustedRoot, err := model.NewCertifiedBlock(model.BlockFromFlow(final), child.QC)
if err != nil {
return nil, fmt.Errorf("constructing certified root block failed: %w", err)
}
return &trustedRoot, nil
}
func makeCertifiedRootBlock(header *flow.Header, qc *flow.QuorumCertificate) (model.CertifiedBlock, error) {
// By convention of Forks, the trusted root block does not need to have a qc
// (as is the case for the genesis block). For simplify of the implementation, we always omit
// the QC of the root block. Thereby, we have one algorithm which handles all cases,
// instead of having to distinguish between a genesis block without a qc
// and a later-finalized root block where we can retrieve the qc.
rootBlock := &model.Block{
View: header.View,
BlockID: header.ID(),
ProposerID: header.ProposerID,
QC: nil, // QC is omitted
PayloadHash: header.PayloadHash,
Timestamp: header.Timestamp,
}
return model.NewCertifiedBlock(rootBlock, qc)
}