-
Notifications
You must be signed in to change notification settings - Fork 175
/
engine.go
315 lines (284 loc) · 11.6 KB
/
engine.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
package sealing
import (
"fmt"
"sync"
"time"
"github.com/rs/zerolog"
"github.com/onflow/flow-go/engine"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/model/messages"
"github.com/onflow/flow-go/module"
"github.com/onflow/flow-go/module/mempool"
"github.com/onflow/flow-go/module/metrics"
"github.com/onflow/flow-go/state/protocol"
"github.com/onflow/flow-go/storage"
"github.com/onflow/flow-go/utils/fifoqueue"
)
// Event is an envelope pairing a network message with the identifier of the
// node that originated it, so origin information survives queuing.
type Event struct {
	OriginID flow.Identifier // ID of the node the message came from
	Msg      interface{}     // the raw message payload (receipt, approval, or approval response)
}
// Capacities of the inbound FIFO queues. They bound memory usage; once a
// queue is full, further messages of that kind are dropped by the queue.
const (
	// defaultReceiptQueueCapacity maximum capacity of receipts queue
	defaultReceiptQueueCapacity = 10000
	// defaultApprovalQueueCapacity maximum capacity of approvals queue
	defaultApprovalQueueCapacity = 10000
	// defaultApprovalResponseQueueCapacity maximum capacity of approval requests queue
	defaultApprovalResponseQueueCapacity = 10000
)
// EventSink is a channel over which pending events are handed from the
// network-facing side of the engine to its processing loops.
type EventSink chan *Event
// Engine is a wrapper for sealing `Core` which implements logic for
// queuing and filtering network messages which later will be processed by sealing engine.
// Purpose of this struct is to provide an efficient way how to consume messages from network layer and pass
// them to `Core`. Engine runs 2 separate goroutines that perform pre-processing and consuming messages by Core.
type Engine struct {
	unit          *engine.Unit         // lifecycle manager for the engine's goroutines
	log           zerolog.Logger       // engine logger
	me            module.Local         // identity of the local node
	core          *Core                // business logic; consumes the sinks below
	cacheMetrics  module.MempoolMetrics // metrics for the pending queues
	engineMetrics module.EngineMetrics  // message received/handled counters

	// Sinks connecting the pre-processing goroutine to the consuming goroutine.
	receiptSink           EventSink
	approvalSink          EventSink
	requestedApprovalSink EventSink

	// FIFO buffers holding events until the consumer is ready for them.
	pendingReceipts           *fifoqueue.FifoQueue
	pendingApprovals          *fifoqueue.FifoQueue
	pendingRequestedApprovals *fifoqueue.FifoQueue

	// pendingEventSink receives every raw inbound event from Process().
	pendingEventSink EventSink

	// requiredApprovalsForSealConstruction: when < 1, approvals are not even queued.
	requiredApprovalsForSealConstruction uint
}
// NewEngine constructs a new sealing `Engine` which runs on its own unit.
// It wires up the FIFO queues, registers the engine on the relevant network
// channels, and constructs the inner `Core` that performs the actual sealing
// logic. Returns an error if any queue creation, network registration, or
// Core construction fails.
func NewEngine(log zerolog.Logger,
	engineMetrics module.EngineMetrics,
	tracer module.Tracer,
	mempool module.MempoolMetrics,
	conMetrics module.ConsensusMetrics,
	net module.Network,
	state protocol.State,
	me module.Local,
	receiptRequester module.Requester,
	receiptsDB storage.ExecutionReceipts,
	headersDB storage.Headers,
	indexDB storage.Index,
	incorporatedResults mempool.IncorporatedResults,
	receipts mempool.ExecutionTree,
	approvals mempool.Approvals,
	seals mempool.IncorporatedResultSeals,
	pendingReceipts mempool.PendingReceipts,
	assigner module.ChunkAssigner,
	receiptValidator module.ReceiptValidator,
	approvalValidator module.ApprovalValidator,
	requiredApprovalsForSealConstruction uint,
	emergencySealingActive bool) (*Engine, error) {

	// Sinks are unbuffered: handoff between goroutines is synchronous; the
	// FIFO queues below provide the actual buffering.
	e := &Engine{
		unit:                                 engine.NewUnit(),
		log:                                  log,
		me:                                   me,
		core:                                 nil, // set below, after the approval conduit is available
		engineMetrics:                        engineMetrics,
		cacheMetrics:                         mempool,
		receiptSink:                          make(EventSink),
		approvalSink:                         make(EventSink),
		requestedApprovalSink:                make(EventSink),
		pendingEventSink:                     make(EventSink),
		requiredApprovalsForSealConstruction: requiredApprovalsForSealConstruction,
	}

	// FIFO queue for inbound receipts; queue length is reported to mempool metrics.
	var err error
	e.pendingReceipts, err = fifoqueue.NewFifoQueue(
		fifoqueue.WithCapacity(defaultReceiptQueueCapacity),
		fifoqueue.WithLengthObserver(func(len int) { mempool.MempoolEntries(metrics.ResourceReceiptQueue, uint(len)) }),
	)
	if err != nil {
		return nil, fmt.Errorf("failed to create queue for inbound receipts: %w", err)
	}

	// FIFO queue for broadcasted approvals
	e.pendingApprovals, err = fifoqueue.NewFifoQueue(
		fifoqueue.WithCapacity(defaultApprovalQueueCapacity),
		fifoqueue.WithLengthObserver(func(len int) { mempool.MempoolEntries(metrics.ResourceApprovalQueue, uint(len)) }),
	)
	if err != nil {
		return nil, fmt.Errorf("failed to create queue for inbound approvals: %w", err)
	}

	// FIFO queue for requested approvals
	e.pendingRequestedApprovals, err = fifoqueue.NewFifoQueue(
		fifoqueue.WithCapacity(defaultApprovalResponseQueueCapacity),
		fifoqueue.WithLengthObserver(func(len int) { mempool.MempoolEntries(metrics.ResourceApprovalResponseQueue, uint(len)) }),
	)
	if err != nil {
		return nil, fmt.Errorf("failed to create queue for requested approvals: %w", err)
	}

	// register engine with the receipt provider
	_, err = net.Register(engine.ReceiveReceipts, e)
	if err != nil {
		return nil, fmt.Errorf("could not register for results: %w", err)
	}

	// register engine with the approval provider
	_, err = net.Register(engine.ReceiveApprovals, e)
	if err != nil {
		return nil, fmt.Errorf("could not register for approvals: %w", err)
	}

	// register engine to the channel for requesting missing approvals; the
	// returned conduit is handed to Core so it can request approvals itself.
	approvalConduit, err := net.Register(engine.RequestApprovalsByChunk, e)
	if err != nil {
		return nil, fmt.Errorf("could not register for requesting approvals: %w", err)
	}

	e.core, err = NewCore(log, engineMetrics, tracer, mempool, conMetrics, state, me, receiptRequester, receiptsDB, headersDB,
		indexDB, incorporatedResults, receipts, approvals, seals, pendingReceipts, assigner, receiptValidator, approvalValidator,
		requiredApprovalsForSealConstruction, emergencySealingActive, approvalConduit)
	if err != nil {
		return nil, fmt.Errorf("failed to init sealing engine: %w", err)
	}

	return e, nil
}
// Process sends event into channel with pending events. Generally speaking shouldn't lock for too long.
func (e *Engine) Process(originID flow.Identifier, event interface{}) error {
	// wrap the raw message with its origin and hand it to the pre-processing loop
	pending := &Event{OriginID: originID, Msg: event}
	e.pendingEventSink <- pending
	return nil
}
// processEvents is processor of pending events which drives events from networking layer to business logic in `Core`.
// Effectively consumes messages from networking layer and dispatches them into corresponding sinks which are connected with `Core`.
// Should be run as a separate goroutine.
func (e *Engine) processEvents() {
	// takes pending event from one of the queues
	// nil sink means nothing to send, this prevents blocking on select
	//
	// Priority order: receipts first, then requested approvals, then
	// broadcast approvals. Only the head of the highest-priority non-empty
	// queue is offered to its sink per iteration.
	fetchEvent := func() (*Event, EventSink, *fifoqueue.FifoQueue) {
		if val, ok := e.pendingReceipts.Front(); ok {
			return val.(*Event), e.receiptSink, e.pendingReceipts
		}
		if val, ok := e.pendingRequestedApprovals.Front(); ok {
			return val.(*Event), e.requestedApprovalSink, e.pendingRequestedApprovals
		}
		if val, ok := e.pendingApprovals.Front(); ok {
			return val.(*Event), e.approvalSink, e.pendingApprovals
		}
		return nil, nil, nil
	}

	for {
		pendingEvent, sink, fifo := fetchEvent()
		select {
		case event := <-e.pendingEventSink:
			// a new inbound message: classify and enqueue it
			e.processPendingEvent(event)
		case sink <- pendingEvent:
			// a send on a nil channel never proceeds, so when all queues are
			// empty (sink == nil) this case is simply never selected.
			// The element is only popped after the consumer accepted it.
			fifo.Pop()
			continue
		case <-e.unit.Quit():
			return
		}
	}
}
// processPendingEvent saves pending event in corresponding queue for further processing by `Core`.
// While this function runs in separate goroutine it shouldn't do heavy processing to maintain efficient data polling/pushing.
// Messages of unexpected types are dropped, but now with a warning log so
// misrouted traffic is visible to operators (previously they were silently discarded).
func (e *Engine) processPendingEvent(event *Event) {
	switch event.Msg.(type) {
	case *flow.ExecutionReceipt:
		e.engineMetrics.MessageReceived(metrics.EngineSealing, metrics.MessageExecutionReceipt)
		e.pendingReceipts.Push(event)
	case *flow.ResultApproval:
		e.engineMetrics.MessageReceived(metrics.EngineSealing, metrics.MessageResultApproval)
		if e.requiredApprovalsForSealConstruction < 1 {
			// if we don't require approvals to construct a seal, don't even process approvals.
			return
		}
		e.pendingApprovals.Push(event)
	case *messages.ApprovalResponse:
		e.engineMetrics.MessageReceived(metrics.EngineSealing, metrics.MessageResultApproval)
		if e.requiredApprovalsForSealConstruction < 1 {
			// if we don't require approvals to construct a seal, don't even process approvals.
			return
		}
		e.pendingRequestedApprovals.Push(event)
	default:
		// unknown message type: drop it, but leave a trace for debugging
		e.log.Warn().
			Hex("origin", event.OriginID[:]).
			Msgf("dropping message of unexpected type %T", event.Msg)
	}
}
// consumeEvents consumes events that are ready to be processed.
// It is the single goroutine that invokes `Core`, so Core methods are never
// called concurrently from here.
func (e *Engine) consumeEvents() {
	// Context:
	// We expect a lot more Approvals compared to blocks or receipts. However, the level of
	// information only changes significantly with new blocks or new receipts.
	// We used to kick off the sealing check after every approval and receipt. In cases where
	// the sealing check takes a lot more time than processing the actual messages (which we
	// assume for the current implementation), we incur a large overhead as we check a lot
	// of conditions, which only change with new blocks or new receipts.
	// TEMPORARY FIX: to avoid sealing checks to monopolize the engine and delay processing
	// of receipts and approvals. Specifically, we schedule sealing checks every 2 seconds.
	checkSealingTicker := make(chan struct{})
	// NOTE(review): this close races with the periodic goroutine below — if
	// that goroutine attempts a send after consumeEvents returns and the
	// channel is closed, it would panic. Presumably unit.Quit() stops the
	// periodic launch first; confirm the shutdown ordering in engine.Unit.
	defer close(checkSealingTicker)
	e.unit.LaunchPeriodically(func() {
		checkSealingTicker <- struct{}{}
	}, 2*time.Second, 10*time.Second)

	for {
		var err error
		select {
		case event := <-e.receiptSink:
			err = e.core.OnReceipt(event.OriginID, event.Msg.(*flow.ExecutionReceipt))
			e.engineMetrics.MessageHandled(metrics.EngineSealing, metrics.MessageExecutionReceipt)
		case event := <-e.approvalSink:
			err = e.core.OnApproval(event.OriginID, event.Msg.(*flow.ResultApproval))
			e.engineMetrics.MessageHandled(metrics.EngineSealing, metrics.MessageResultApproval)
		case event := <-e.requestedApprovalSink:
			// an ApprovalResponse wraps an approval; unwrap and process it like any other approval
			err = e.core.OnApproval(event.OriginID, &event.Msg.(*messages.ApprovalResponse).Approval)
			e.engineMetrics.MessageHandled(metrics.EngineSealing, metrics.MessageResultApproval)
		case <-checkSealingTicker:
			err = e.core.CheckSealing()
		case <-e.unit.Quit():
			return
		}
		if err != nil {
			// Public methods of `Core` are supposed to handle all errors internally.
			// Here if error happens it means that internal state is corrupted or we have caught
			// exception while processing. In such case best just to abort the node.
			e.log.Fatal().Err(err).Msgf("fatal internal error in sealing core logic")
		}
	}
}
// SubmitLocal submits an event originating on the local node.
// It delegates to Submit with the local node's own ID as origin.
func (e *Engine) SubmitLocal(event interface{}) {
	e.Submit(e.me.NodeID(), event)
}
// Submit submits the given event from the node with the given origin ID
// for processing in a non-blocking manner. It returns instantly and logs
// a potential processing error internally when done.
func (e *Engine) Submit(originID flow.Identifier, event interface{}) {
	if err := e.Process(originID, event); err != nil {
		engine.LogError(e.log, err)
	}
}
// ProcessLocal processes an event originating on the local node.
// It delegates to Process with the local node's own ID as origin.
func (e *Engine) ProcessLocal(event interface{}) error {
	return e.Process(e.me.NodeID(), event)
}
// HandleReceipt pipes explicitly requested receipts to the process function.
// Receipts can come from this function or the receipt provider setup in the
// engine constructor.
func (e *Engine) HandleReceipt(originID flow.Identifier, receipt flow.Entity) {
	e.log.Debug().Msg("received receipt from requester engine")

	if err := e.Process(originID, receipt); err != nil {
		e.log.Error().Err(err).Hex("origin", originID[:]).Msg("could not process receipt")
	}
}
// Ready returns a ready channel that is closed once the engine has fully
// started. For the propagation engine, we consider the engine up and running
// upon initialization.
func (e *Engine) Ready() <-chan struct{} {
	// The WaitGroup ensures both worker goroutines have actually started
	// before the unit reports ready; wg.Done() is called at goroutine entry,
	// not at exit.
	var wg sync.WaitGroup
	wg.Add(2)
	e.unit.Launch(func() {
		wg.Done()
		e.processEvents()
	})
	e.unit.Launch(func() {
		wg.Done()
		e.consumeEvents()
	})
	return e.unit.Ready(func() {
		wg.Wait()
	})
}
// Done returns a done channel that is closed once the engine has fully
// stopped; it delegates shutdown to the engine's unit.
func (e *Engine) Done() <-chan struct{} {
	return e.unit.Done()
}