-
Notifications
You must be signed in to change notification settings - Fork 179
/
event_loop.go
205 lines (155 loc) · 6.32 KB
/
event_loop.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
package hotstuff
import (
"time"
"github.com/rs/zerolog"
"github.com/onflow/flow-go/consensus/hotstuff/model"
"github.com/onflow/flow-go/engine"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/module"
"github.com/onflow/flow-go/module/metrics"
)
// EventLoop buffers all incoming events to the hotstuff EventHandler, and feeds EventHandler one event at a time.
type EventLoop struct {
log zerolog.Logger
eventHandler EventHandler
metrics module.HotstuffMetrics
proposals chan *model.Proposal
votes chan *model.Vote
unit *engine.Unit // lock for preventing concurrent state transitions
}
// NewEventLoop creates an instance of EventLoop.
func NewEventLoop(log zerolog.Logger, metrics module.HotstuffMetrics, eventHandler EventHandler) (*EventLoop, error) {
proposals := make(chan *model.Proposal)
votes := make(chan *model.Vote)
el := &EventLoop{
log: log,
eventHandler: eventHandler,
metrics: metrics,
proposals: proposals,
votes: votes,
unit: engine.NewUnit(),
}
return el, nil
}
func (el *EventLoop) loop() {
err := el.eventHandler.Start()
if err != nil {
el.log.Fatal().Err(err).Msg("could not start event handler")
}
// hotstuff will run in an event loop to process all events synchronously. And this is what will happen when hitting errors:
// if hotstuff hits a known critical error, it will exit the loop (for instance, there is a conflicting block with a QC against finalized blocks
// if hotstuff hits a known error indicating some assumption between components is broken, it will exit the loop (for instance, hotstuff receives a block whose parent is missing)
// if hotstuff hits a known error that is safe to be ignored, it will not exit the loop (for instance, double voting/invalid vote)
// if hotstuff hits any unknown error, it will exit the loop
for {
quitted := el.unit.Quit()
// Giving timeout events the priority to be processed first
// This is to prevent attacks from malicious nodes that attempt
// to block honest nodes' pacemaker from progressing by sending
// other events.
timeoutChannel := el.eventHandler.TimeoutChannel()
// the first select makes sure we process timeouts with priority
select {
// if we receive the shutdown signal, exit the loop
case <-quitted:
return
// if we receive a time out, process it and log errors
case <-timeoutChannel:
processStart := time.Now()
err := el.eventHandler.OnLocalTimeout()
// measure how long it takes for a timeout event to be processed
el.metrics.HotStuffBusyDuration(time.Since(processStart), metrics.HotstuffEventTypeTimeout)
if err != nil {
el.log.Fatal().Err(err).Msg("could not process timeout")
}
// At this point, we have received and processed an event from the timeout channel.
// A timeout also means, we have made progress. A new timeout will have
// been started and el.eventHandler.TimeoutChannel() will be a NEW channel (for the just-started timeout)
// Very important to start the for loop from the beginning, to continue the with the new timeout channel!
continue
default:
// fall through to non-priority events
}
idleStart := time.Now()
// select for block headers/votes here
select {
// same as before
case <-quitted:
return
// same as before
case <-timeoutChannel:
// measure how long the event loop was idle waiting for an
// incoming event
el.metrics.HotStuffIdleDuration(time.Since(idleStart))
processStart := time.Now()
err := el.eventHandler.OnLocalTimeout()
// measure how long it takes for a timeout event to be processed
el.metrics.HotStuffBusyDuration(time.Since(processStart), metrics.HotstuffEventTypeTimeout)
if err != nil {
el.log.Fatal().Err(err).Msg("could not process timeout")
}
// if we have a new proposal, process it
case p := <-el.proposals:
// measure how long the event loop was idle waiting for an
// incoming event
el.metrics.HotStuffIdleDuration(time.Since(idleStart))
processStart := time.Now()
err := el.eventHandler.OnReceiveProposal(p)
// measure how long it takes for a proposal to be processed
el.metrics.HotStuffBusyDuration(time.Since(processStart), metrics.HotstuffEventTypeOnProposal)
if err != nil {
el.log.Fatal().Err(err).Msg("could not process proposal")
}
// if we have a new vote, process it
case v := <-el.votes:
// measure how long the event loop was idle waiting for an
// incoming event
el.metrics.HotStuffIdleDuration(time.Since(idleStart))
processStart := time.Now()
err := el.eventHandler.OnReceiveVote(v)
// measure how long it takes for a vote to be processed
el.metrics.HotStuffBusyDuration(time.Since(processStart), metrics.HotstuffEventTypeOnVote)
if err != nil {
el.log.Fatal().Err(err).Msg("could not process vote")
}
}
}
}
// SubmitProposal pushes the received block to the blockheader channel
func (el *EventLoop) SubmitProposal(proposalHeader *flow.Header, parentView uint64) {
received := time.Now()
proposal := model.ProposalFromFlow(proposalHeader, parentView)
select {
case el.proposals <- proposal:
case <-el.unit.Quit():
return
}
// the wait duration is measured as how long it takes from a block being
// received to event handler commencing the processing of the block
el.metrics.HotStuffWaitDuration(time.Since(received), metrics.HotstuffEventTypeOnProposal)
}
// SubmitVote pushes the received vote to the votes channel
func (el *EventLoop) SubmitVote(originID flow.Identifier, blockID flow.Identifier, view uint64, sigData []byte) {
received := time.Now()
vote := model.VoteFromFlow(originID, blockID, view, sigData)
select {
case el.votes <- vote:
case <-el.unit.Quit():
return
}
// the wait duration is measured as how long it takes from a vote being
// received to event handler commencing the processing of the vote
el.metrics.HotStuffWaitDuration(time.Since(received), metrics.HotstuffEventTypeOnVote)
}
// Ready implements interface module.ReadyDoneAware
// Method call will starts the EventLoop's internal processing loop.
// Multiple calls are handled gracefully and the event loop will only start
// once.
func (el *EventLoop) Ready() <-chan struct{} {
el.unit.Launch(el.loop)
return el.unit.Ready()
}
// Done implements interface module.ReadyDoneAware
func (el *EventLoop) Done() <-chan struct{} {
return el.unit.Done()
}