-
Notifications
You must be signed in to change notification settings - Fork 174
/
engine.go
188 lines (167 loc) · 6.35 KB
/
engine.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
package compliance
import (
"fmt"
"github.com/rs/zerolog"
"github.com/onflow/flow-go/consensus/hotstuff/model"
"github.com/onflow/flow-go/consensus/hotstuff/tracker"
"github.com/onflow/flow-go/engine"
"github.com/onflow/flow-go/engine/collection"
"github.com/onflow/flow-go/engine/common/fifoqueue"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/model/messages"
"github.com/onflow/flow-go/module"
"github.com/onflow/flow-go/module/component"
"github.com/onflow/flow-go/module/irrecoverable"
"github.com/onflow/flow-go/module/metrics"
"github.com/onflow/flow-go/state/protocol"
"github.com/onflow/flow-go/storage"
)
// defaultBlockQueueCapacity maximum capacity of inbound queue for `messages.ClusterBlockProposal`s
const defaultBlockQueueCapacity = 10_000
// Engine is a wrapper struct for `Core` which implements cluster consensus algorithm.
// Engine is responsible for handling incoming messages, queueing for processing, broadcasting proposals.
// Implements collection.Compliance interface.
type Engine struct {
*component.ComponentManager
log zerolog.Logger
metrics module.EngineMetrics
me module.Local
headers storage.Headers
payloads storage.ClusterPayloads
state protocol.State
core *Core
pendingBlocks *fifoqueue.FifoQueue // queue for processing inbound blocks
pendingBlocksNotifier engine.Notifier
finalizedBlockTracker *tracker.NewestBlockTracker
finalizedBlockNotifier engine.Notifier
}
var _ collection.Compliance = (*Engine)(nil)
func NewEngine(
log zerolog.Logger,
me module.Local,
state protocol.State,
payloads storage.ClusterPayloads,
core *Core,
) (*Engine, error) {
engineLog := log.With().Str("cluster_compliance", "engine").Logger()
// FIFO queue for block proposals
blocksQueue, err := fifoqueue.NewFifoQueue(
defaultBlockQueueCapacity,
fifoqueue.WithLengthObserver(func(len int) {
core.mempoolMetrics.MempoolEntries(metrics.ResourceClusterBlockProposalQueue, uint(len))
}),
)
if err != nil {
return nil, fmt.Errorf("failed to create queue for inbound block proposals: %w", err)
}
eng := &Engine{
log: engineLog,
metrics: core.engineMetrics,
me: me,
headers: core.headers,
payloads: payloads,
state: state,
core: core,
pendingBlocks: blocksQueue,
pendingBlocksNotifier: engine.NewNotifier(),
finalizedBlockTracker: tracker.NewNewestBlockTracker(),
finalizedBlockNotifier: engine.NewNotifier(),
}
// create the component manager and worker threads
eng.ComponentManager = component.NewComponentManagerBuilder().
AddWorker(eng.processBlocksLoop).
AddWorker(eng.finalizationProcessingLoop).
Build()
return eng, nil
}
// processBlocksLoop processes available blocks as they are queued.
func (e *Engine) processBlocksLoop(ctx irrecoverable.SignalerContext, ready component.ReadyFunc) {
ready()
doneSignal := ctx.Done()
newMessageSignal := e.pendingBlocksNotifier.Channel()
for {
select {
case <-doneSignal:
return
case <-newMessageSignal:
err := e.processQueuedBlocks(doneSignal)
if err != nil {
ctx.Throw(err)
}
}
}
}
// processQueuedBlocks processes any available messages from the inbound queues.
// Only returns when all inbound queues are empty (or the engine is terminated).
// No errors expected during normal operations.
func (e *Engine) processQueuedBlocks(doneSignal <-chan struct{}) error {
for {
select {
case <-doneSignal:
return nil
default:
}
msg, ok := e.pendingBlocks.Pop()
if ok {
inBlock := msg.(flow.Slashable[messages.ClusterBlockProposal])
err := e.core.OnBlockProposal(inBlock.OriginID, inBlock.Message)
e.core.engineMetrics.MessageHandled(metrics.EngineClusterCompliance, metrics.MessageBlockProposal)
if err != nil {
return fmt.Errorf("could not handle block proposal: %w", err)
}
continue
}
// when there is no more messages in the queue, back to the loop to wait
// for the next incoming message to arrive.
return nil
}
}
// OnFinalizedBlock implements the `OnFinalizedBlock` callback from the `hotstuff.FinalizationConsumer`
// It informs compliance.Core about finalization of the respective block.
//
// CAUTION: the input to this callback is treated as trusted; precautions should be taken that messages
// from external nodes cannot be considered as inputs to this function
func (e *Engine) OnFinalizedBlock(block *model.Block) {
if e.finalizedBlockTracker.Track(block) {
e.finalizedBlockNotifier.Notify()
}
}
// OnClusterBlockProposal feeds a new block proposal into the processing pipeline.
// Incoming proposals are queued and eventually dispatched by worker.
func (e *Engine) OnClusterBlockProposal(proposal flow.Slashable[messages.ClusterBlockProposal]) {
e.core.engineMetrics.MessageReceived(metrics.EngineClusterCompliance, metrics.MessageBlockProposal)
if e.pendingBlocks.Push(proposal) {
e.pendingBlocksNotifier.Notify()
} else {
e.core.engineMetrics.InboundMessageDropped(metrics.EngineClusterCompliance, metrics.MessageBlockProposal)
}
}
// OnSyncedClusterBlock feeds a block obtained from sync proposal into the processing pipeline.
// Incoming proposals are queued and eventually dispatched by worker.
func (e *Engine) OnSyncedClusterBlock(syncedBlock flow.Slashable[messages.ClusterBlockProposal]) {
e.core.engineMetrics.MessageReceived(metrics.EngineClusterCompliance, metrics.MessageSyncedClusterBlock)
if e.pendingBlocks.Push(syncedBlock) {
e.pendingBlocksNotifier.Notify()
} else {
e.core.engineMetrics.InboundMessageDropped(metrics.EngineClusterCompliance, metrics.MessageSyncedClusterBlock)
}
}
// finalizationProcessingLoop is a separate goroutine that performs processing of finalization events
func (e *Engine) finalizationProcessingLoop(ctx irrecoverable.SignalerContext, ready component.ReadyFunc) {
ready()
doneSignal := ctx.Done()
blockFinalizedSignal := e.finalizedBlockNotifier.Channel()
for {
select {
case <-doneSignal:
return
case <-blockFinalizedSignal:
// retrieve the latest finalized header, so we know the height
finalHeader, err := e.headers.ByBlockID(e.finalizedBlockTracker.NewestBlock().BlockID)
if err != nil { // no expected errors
ctx.Throw(err)
}
e.core.ProcessFinalizedBlock(finalHeader)
}
}
}