-
Notifications
You must be signed in to change notification settings - Fork 175
/
engine.go
151 lines (130 loc) · 5.11 KB
/
engine.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
// Package pusher implements an engine for providing access to resources held
// by the collection node, including collections, collection guarantees, and
// transactions.
package pusher
import (
"fmt"
"github.com/rs/zerolog"
"github.com/onflow/flow-go/engine"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/model/flow/filter"
"github.com/onflow/flow-go/model/messages"
"github.com/onflow/flow-go/module"
"github.com/onflow/flow-go/module/metrics"
"github.com/onflow/flow-go/network"
"github.com/onflow/flow-go/network/channels"
"github.com/onflow/flow-go/state/protocol"
"github.com/onflow/flow-go/storage"
"github.com/onflow/flow-go/utils/logging"
)
// Engine is the collection pusher engine, which provides access to resources
// held by the collection node.
type Engine struct {
unit *engine.Unit
log zerolog.Logger
engMetrics module.EngineMetrics
colMetrics module.CollectionMetrics
conduit network.Conduit
me module.Local
state protocol.State
collections storage.Collections
transactions storage.Transactions
}
func New(log zerolog.Logger, net network.EngineRegistry, state protocol.State, engMetrics module.EngineMetrics, colMetrics module.CollectionMetrics, me module.Local, collections storage.Collections, transactions storage.Transactions) (*Engine, error) {
e := &Engine{
unit: engine.NewUnit(),
log: log.With().Str("engine", "pusher").Logger(),
engMetrics: engMetrics,
colMetrics: colMetrics,
me: me,
state: state,
collections: collections,
transactions: transactions,
}
conduit, err := net.Register(channels.PushGuarantees, e)
if err != nil {
return nil, fmt.Errorf("could not register for push protocol: %w", err)
}
e.conduit = conduit
return e, nil
}
// Ready returns a ready channel that is closed once the engine has fully
// started.
func (e *Engine) Ready() <-chan struct{} {
return e.unit.Ready()
}
// Done returns a done channel that is closed once the engine has fully stopped.
func (e *Engine) Done() <-chan struct{} {
return e.unit.Done()
}
// SubmitLocal submits an event originating on the local node.
func (e *Engine) SubmitLocal(event interface{}) {
e.unit.Launch(func() {
err := e.process(e.me.NodeID(), event)
if err != nil {
engine.LogError(e.log, err)
}
})
}
// Submit submits the given event from the node with the given origin ID
// for processing in a non-blocking manner. It returns instantly and logs
// a potential processing error internally when done.
func (e *Engine) Submit(channel channels.Channel, originID flow.Identifier, event interface{}) {
e.unit.Launch(func() {
err := e.process(originID, event)
if err != nil {
engine.LogError(e.log, err)
}
})
}
// ProcessLocal processes an event originating on the local node.
func (e *Engine) ProcessLocal(event interface{}) error {
return e.unit.Do(func() error {
return e.process(e.me.NodeID(), event)
})
}
// Process processes the given event from the node with the given origin ID in
// a blocking manner. It returns the potential processing error when done.
func (e *Engine) Process(channel channels.Channel, originID flow.Identifier, event interface{}) error {
return e.unit.Do(func() error {
return e.process(originID, event)
})
}
// process processes events for the pusher engine on the collection node.
func (e *Engine) process(originID flow.Identifier, event interface{}) error {
switch ev := event.(type) {
case *messages.SubmitCollectionGuarantee:
e.engMetrics.MessageReceived(metrics.EngineCollectionProvider, metrics.MessageSubmitGuarantee)
defer e.engMetrics.MessageHandled(metrics.EngineCollectionProvider, metrics.MessageSubmitGuarantee)
return e.onSubmitCollectionGuarantee(originID, ev)
default:
return fmt.Errorf("invalid event type (%T)", event)
}
}
// onSubmitCollectionGuarantee handles submitting the given collection guarantee
// to consensus nodes.
func (e *Engine) onSubmitCollectionGuarantee(originID flow.Identifier, req *messages.SubmitCollectionGuarantee) error {
if originID != e.me.NodeID() {
return fmt.Errorf("invalid remote request to submit collection guarantee (from=%x)", originID)
}
return e.SubmitCollectionGuarantee(&req.Guarantee)
}
// SubmitCollectionGuarantee submits the collection guarantee to all consensus nodes.
func (e *Engine) SubmitCollectionGuarantee(guarantee *flow.CollectionGuarantee) error {
consensusNodes, err := e.state.Final().Identities(filter.HasRole[flow.Identity](flow.RoleConsensus))
if err != nil {
return fmt.Errorf("could not get consensus nodes: %w", err)
}
// NOTE: Consensus nodes do not broadcast guarantees among themselves, so it needs that
// at least one collection node make a publish to all of them.
err = e.conduit.Publish(guarantee, consensusNodes.NodeIDs()...)
if err != nil {
return fmt.Errorf("could not submit collection guarantee: %w", err)
}
e.engMetrics.MessageSent(metrics.EngineCollectionProvider, metrics.MessageCollectionGuarantee)
e.log.Debug().
Hex("guarantee_id", logging.ID(guarantee.ID())).
Hex("ref_block_id", logging.ID(guarantee.ReferenceBlockID)).
Msg("submitting collection guarantee")
return nil
}