-
Notifications
You must be signed in to change notification settings - Fork 179
/
event_loop.go
221 lines (172 loc) · 7.05 KB
/
event_loop.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
package eventloop
import (
"context"
"fmt"
"time"
"github.com/rs/zerolog"
"github.com/onflow/flow-go/consensus/hotstuff"
"github.com/onflow/flow-go/consensus/hotstuff/model"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/module"
"github.com/onflow/flow-go/module/component"
"github.com/onflow/flow-go/module/irrecoverable"
"github.com/onflow/flow-go/module/metrics"
)
// EventLoop buffers incoming HotStuff events (local timeouts, block proposals
// and quorum certificates) and feeds them to the hotstuff EventHandler one
// event at a time.
type EventLoop struct {
	// ComponentManager provides the component lifecycle; the worker it runs
	// waits for startTime and then executes the event loop.
	*component.ComponentManager

	log zerolog.Logger
	// eventHandler receives every event, strictly one at a time.
	eventHandler hotstuff.EventHandler
	// metrics records idle, busy and wait durations of the event loop.
	metrics module.HotstuffMetrics

	// proposals carries proposals submitted through SubmitProposal.
	proposals chan *model.Proposal
	// quorumCertificates carries QCs submitted through SubmitTrustedQC.
	quorumCertificates chan *flow.QuorumCertificate

	// startTime is the time at which the event loop is scheduled to begin.
	startTime time.Time
}

// Compile-time checks that EventLoop implements the interfaces it is used as.
var (
	_ hotstuff.EventLoop  = (*EventLoop)(nil)
	_ component.Component = (*EventLoop)(nil)
)
// NewEventLoop creates an instance of EventLoop.
//
// The returned EventLoop runs a single worker. The worker:
//   - reports ready immediately;
//   - waits until startTime;
//   - runs the event loop.
//
// If shutdown is signaled before startTime, the worker exits without ever
// starting the loop. Any error returned by the loop is logged and thrown on the
// component's signaler context as irrecoverable.
//
// The returned error is currently always nil.
func NewEventLoop(log zerolog.Logger, metrics module.HotstuffMetrics, eventHandler hotstuff.EventHandler, startTime time.Time) (*EventLoop, error) {
	// Unbuffered: a submitted proposal is handed directly to the loop.
	proposals := make(chan *model.Proposal)
	// Capacity one: a single QC can be enqueued without waiting for the loop.
	quorumCertificates := make(chan *flow.QuorumCertificate, 1)

	el := &EventLoop{
		log:                log,
		eventHandler:       eventHandler,
		metrics:            metrics,
		proposals:          proposals,
		quorumCertificates: quorumCertificates,
		startTime:          startTime,
	}

	componentBuilder := component.NewComponentManagerBuilder()
	componentBuilder.AddWorker(func(ctx irrecoverable.SignalerContext, ready component.ReadyFunc) {
		ready()

		// launch when scheduled by el.startTime
		el.log.Info().Msgf("event loop will start at: %v", startTime)

		// Use an explicit timer instead of time.After so it is released promptly
		// when shutdown is signaled before startTime is reached. A startTime in
		// the past yields a non-positive duration, which fires immediately.
		startTimer := time.NewTimer(time.Until(startTime))
		defer startTimer.Stop()

		select {
		case <-ctx.Done():
			return
		case <-startTimer.C:
			el.log.Info().Msg("starting event loop")
			err := el.loop(ctx)
			if err != nil {
				el.log.Error().Err(err).Msg("irrecoverable event loop error")
				ctx.Throw(err)
			}
		}
	})
	el.ComponentManager = componentBuilder.Build()

	return el, nil
}
// loop runs the HotStuff event loop until shutdown is signaled via ctx or an
// EventHandler call returns an error.
//
// It first starts the event handler, then dispatches events to it one at a
// time. A pending local timeout is always handled before any proposal or QC.
// Only when no timeout is pending does the loop block on all event sources.
//
// The loop exits on ANY error returned by the EventHandler, so deciding which
// errors are fatal is the EventHandler's responsibility. The original design
// notes describe the intended behavior:
//   - a known critical error exits the loop (for instance, a conflicting block
//     with a QC against finalized blocks);
//   - a known error indicating that an assumption between components is broken
//     exits the loop (for instance, a block whose parent is missing);
//   - a known error that is safe to ignore does not exit the loop (for instance,
//     an invalid proposal);
//   - any unknown error exits the loop.
//
// Returns nil on shutdown, otherwise the wrapped EventHandler error.
func (el *EventLoop) loop(ctx context.Context) error {
	err := el.eventHandler.Start()
	if err != nil {
		return fmt.Errorf("could not start event handler: %w", err)
	}

	shutdownSignaled := ctx.Done()
	for {
		// Giving timeout events the priority to be processed first.
		// This is to prevent attacks from malicious nodes that attempt
		// to block honest nodes' pacemaker from progressing by sending
		// other events.
		timeoutChannel := el.eventHandler.TimeoutChannel()

		// The first select is non-blocking and makes sure timeouts are processed
		// with priority.
		select {
		case <-shutdownSignaled:
			return nil
		case <-timeoutChannel:
			if err := el.processTimeout(); err != nil {
				return err
			}
			// A processed timeout means we have made progress. A new timeout will
			// have been started, and el.eventHandler.TimeoutChannel() will return a
			// NEW channel for it. It is very important to restart the loop from the
			// beginning so that we continue with the new timeout channel.
			continue
		default:
			// fall through to non-priority events
		}

		// The second select blocks until any event (or shutdown) arrives.
		idleStart := time.Now()
		select {
		case <-shutdownSignaled:
			return nil
		case <-timeoutChannel:
			// measure how long the event loop was idle waiting for an incoming event
			el.metrics.HotStuffIdleDuration(time.Since(idleStart))
			if err := el.processTimeout(); err != nil {
				return err
			}
		case p := <-el.proposals:
			el.metrics.HotStuffIdleDuration(time.Since(idleStart))
			if err := el.processProposal(p); err != nil {
				return err
			}
		case qc := <-el.quorumCertificates:
			el.metrics.HotStuffIdleDuration(time.Since(idleStart))
			if err := el.processQC(qc); err != nil {
				return err
			}
		}
	}
}

// processTimeout forwards a local timeout to the event handler and records how
// long its processing took.
func (el *EventLoop) processTimeout() error {
	processStart := time.Now()
	err := el.eventHandler.OnLocalTimeout()
	// measure how long it takes for a timeout event to be processed
	el.metrics.HotStuffBusyDuration(time.Since(processStart), metrics.HotstuffEventTypeTimeout)
	if err != nil {
		return fmt.Errorf("could not process timeout: %w", err)
	}
	return nil
}

// processProposal forwards a block proposal to the event handler, records how
// long its processing took, and logs the proposal on success.
func (el *EventLoop) processProposal(p *model.Proposal) error {
	processStart := time.Now()
	err := el.eventHandler.OnReceiveProposal(p)
	// measure how long it takes for a proposal to be processed
	el.metrics.HotStuffBusyDuration(time.Since(processStart), metrics.HotstuffEventTypeOnProposal)
	if err != nil {
		return fmt.Errorf("could not process proposal %v: %w", p.Block.BlockID, err)
	}
	el.log.Info().
		Dur("dur_ms", time.Since(processStart)).
		Uint64("view", p.Block.View).
		Hex("block_id", p.Block.BlockID[:]).
		Msg("block proposal has been processed successfully")
	return nil
}

// processQC forwards a constructed quorum certificate to the event handler and
// records how long its processing took.
func (el *EventLoop) processQC(qc *flow.QuorumCertificate) error {
	processStart := time.Now()
	err := el.eventHandler.OnQCConstructed(qc)
	// measure how long it takes for a QC to be processed
	el.metrics.HotStuffBusyDuration(time.Since(processStart), metrics.HotstuffEventTypeOnQC)
	if err != nil {
		return fmt.Errorf("could not process QC: %w", err)
	}
	return nil
}
// SubmitProposal converts the received block header into a proposal and hands
// it to the event loop over the proposals channel.
//
// It blocks until the proposal is taken by the event loop or the component is
// shut down. The wait-duration metric is recorded only when the proposal was
// actually handed over. The measured span runs from entry into this method,
// before the conversion, until the handoff.
func (el *EventLoop) SubmitProposal(proposalHeader *flow.Header, parentView uint64) {
	submittedAt := time.Now()
	prop := model.ProposalFromFlow(proposalHeader, parentView)

	select {
	case el.proposals <- prop:
		// the wait duration is measured as how long it takes from a block being
		// received to event handler commencing the processing of the block
		el.metrics.HotStuffWaitDuration(time.Since(submittedAt), metrics.HotstuffEventTypeOnProposal)
	case <-el.ComponentManager.ShutdownSignal():
		// shutting down: the proposal is not delivered and no metric is recorded
	}
}
// SubmitTrustedQC hands the received quorum certificate to the event loop over
// the quorumCertificates channel.
//
// It blocks until the channel accepts the QC or the component is shut down. The
// wait-duration metric is recorded only when the QC was accepted.
func (el *EventLoop) SubmitTrustedQC(qc *flow.QuorumCertificate) {
	submittedAt := time.Now()

	select {
	case el.quorumCertificates <- qc:
		// The wait duration runs from receipt of the QC until the channel accepts
		// it. The channel is buffered, so acceptance may precede the event
		// handler actually starting to process the QC.
		el.metrics.HotStuffWaitDuration(time.Since(submittedAt), metrics.HotstuffEventTypeOnQC)
	case <-el.ComponentManager.ShutdownSignal():
		// shutting down: the QC is not delivered and no metric is recorded
	}
}