-
Notifications
You must be signed in to change notification settings - Fork 170
/
staked_access_node_builder.go
269 lines (219 loc) · 8.67 KB
/
staked_access_node_builder.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
package node_builder
import (
"context"
"fmt"
"github.com/libp2p/go-libp2p-core/host"
dht "github.com/libp2p/go-libp2p-kad-dht"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/onflow/flow-go/cmd"
"github.com/onflow/flow-go/crypto"
"github.com/onflow/flow-go/engine"
pingeng "github.com/onflow/flow-go/engine/access/ping"
"github.com/onflow/flow-go/engine/common/splitter"
synceng "github.com/onflow/flow-go/engine/common/synchronization"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/model/flow/filter"
"github.com/onflow/flow-go/module"
"github.com/onflow/flow-go/module/id"
"github.com/onflow/flow-go/module/metrics/unstaked"
"github.com/onflow/flow-go/network"
"github.com/onflow/flow-go/network/p2p"
"github.com/onflow/flow-go/network/p2p/dns"
"github.com/onflow/flow-go/network/topology"
"github.com/onflow/flow-go/state/protocol/events/gadgets"
)
// StakedAccessNodeBuilder builds a staked access node. The staked access node can optionally participate in the
// unstaked network, publishing data for the unstaked access nodes downstream.
//
// It embeds *FlowAccessNodeBuilder, so the fields and methods of that builder are promoted onto this type.
type StakedAccessNodeBuilder struct {
*FlowAccessNodeBuilder
}
// NewStakedAccessNodeBuilder returns a StakedAccessNodeBuilder that embeds the
// given FlowAccessNodeBuilder. The pointer is stored as-is; nothing is copied.
func NewStakedAccessNodeBuilder(anb *FlowAccessNodeBuilder) *StakedAccessNodeBuilder {
	staked := new(StakedAccessNodeBuilder)
	staked.FlowAccessNodeBuilder = anb
	return staked
}
// InitIDProviders registers the "id providers" module on the builder.
//
// When the module runs, it creates a protocol state ID cache and uses it to set
// the builder's IdentityProvider, SyncEngineParticipantsProviderFactory and
// IDTranslator. NetworkingIdentifierProvider is only set when the node does not
// support unstaked followers. The module fails with the error returned by
// p2p.NewProtocolStateIDCache, if any.
func (anb *StakedAccessNodeBuilder) InitIDProviders() {
	initProviders := func(_ cmd.NodeBuilder, node *cmd.NodeConfig) error {
		cache, err := p2p.NewProtocolStateIDCache(node.Logger, node.State, node.ProtocolEvents)
		if err != nil {
			return err
		}

		anb.IdentityProvider = cache

		// The filter is built on every factory call, so node.Me is only read
		// when the factory is actually invoked.
		anb.SyncEngineParticipantsProviderFactory = func() id.IdentifierProvider {
			// consensus nodes, excluding this node and any ejected node
			participants := filter.And(
				filter.HasRole(flow.RoleConsensus),
				filter.Not(filter.HasNodeID(node.Me.NodeID())),
				p2p.NotEjectedFilter,
			)
			return id.NewFilteredIdentifierProvider(participants, cache)
		}

		anb.IDTranslator = p2p.NewHierarchicalIDTranslator(cache, p2p.NewUnstakedNetworkIDTranslator())

		if anb.supportsUnstakedFollower {
			// NetworkingIdentifierProvider is left untouched in this mode
			return nil
		}

		anb.NetworkingIdentifierProvider = id.NewFilteredIdentifierProvider(p2p.NotEjectedFilter, cache)
		return nil
	}

	anb.Module("id providers", initProviders)
}
// Initialize prepares the builder: it creates a cancellable context (the cancel
// function is stored in Cancel), sets up the ID providers, and enqueues the
// network, metrics server, admin server and tracer initialization.
//
// It returns the error from RegisterBadgerMetrics, if any; in that case the
// admin server and tracer are not enqueued.
func (anb *StakedAccessNodeBuilder) Initialize() error {
	ctx, cancel := context.WithCancel(context.Background())
	anb.Cancel = cancel

	anb.InitIDProviders()

	switch {
	case anb.supportsUnstakedFollower:
		// this access node supports unstaked followers: enqueue the unstaked network
		anb.enqueueUnstakedNetworkInit(ctx)
	default:
		// otherwise, enqueue the regular network
		anb.EnqueueNetworkInit(ctx)
	}

	anb.EnqueueMetricsServerInit()

	err := anb.RegisterBadgerMetrics()
	if err != nil {
		return err
	}

	anb.EnqueueAdminServerInit(ctx)
	anb.EnqueueTracer()

	return nil
}
// Build runs the embedded FlowAccessNodeBuilder's Build and then registers the
// components specific to the staked access node:
//   - when unstaked followers are supported, an "unstaked sync request proxy"
//     (a splitter engine registered on engine.PublicSyncCommittee) and an
//     "unstaked sync request handler" that is attached to that proxy;
//   - always, the "ping engine".
//
// It returns the receiver.
func (anb *StakedAccessNodeBuilder) Build() AccessNodeBuilder {
	anb.FlowAccessNodeBuilder.Build()

	if anb.supportsUnstakedFollower {
		// Shared between the two components below: the proxy component fills
		// them in, the handler component reads them.
		var (
			conduit network.Conduit
			proxy   *splitter.Engine
		)

		newProxy := func(_ cmd.NodeBuilder, node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
			proxy = splitter.New(node.Logger, engine.PublicSyncCommittee)

			// register the proxy engine with the unstaked network
			registered, err := node.Network.Register(engine.PublicSyncCommittee, proxy)
			conduit = registered
			if err != nil {
				return nil, fmt.Errorf("could not register unstaked sync request proxy: %w", err)
			}

			return proxy, nil
		}

		newHandler := func(_ cmd.NodeBuilder, node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
			logger := node.Logger.With().Bool("unstaked", true).Logger()
			collector := unstaked.NewUnstakedEngineCollector(node.Metrics.Engine)

			// don't queue missing heights from unstaked nodes
			// since we are always more up-to-date than them
			const queueMissingHeights = false

			handler := synceng.NewRequestHandlerEngine(
				logger,
				collector,
				conduit,
				node.Me,
				node.Storage.Blocks,
				anb.SyncCore,
				anb.FinalizedHeader,
				queueMissingHeights,
			)

			// register the sync request handler with the proxy engine
			proxy.RegisterEngine(handler)

			return handler, nil
		}

		anb.
			Component("unstaked sync request proxy", newProxy).
			Component("unstaked sync request handler", newHandler)
	}

	newPing := func(_ cmd.NodeBuilder, node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
		pingEngine, err := pingeng.New(
			node.Logger,
			node.State,
			node.Me,
			anb.PingMetrics,
			anb.pingEnabled,
			node.Middleware,
			anb.nodeInfoFile,
		)
		if err != nil {
			return nil, fmt.Errorf("could not create ping engine: %w", err)
		}
		return pingEngine, nil
	}
	anb.Component("ping engine", newPing)

	return anb
}
// enqueueUnstakedNetworkInit registers the "unstaked network" component for the
// staked node. When the component runs, it creates the libp2p factory, the
// middleware and the network, stores the network and middleware on the builder,
// and subscribes the middleware's UpdateNodeAddresses to protocol events through
// an identity-deltas consumer. ctx is passed on to the libp2p factory.
func (anb *StakedAccessNodeBuilder) enqueueUnstakedNetworkInit(ctx context.Context) {
	initUnstakedNetwork := func(_ cmd.NodeBuilder, node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
		factory, err := anb.initLibP2PFactory(ctx, anb.NodeID, anb.NodeConfig.NetworkKey)
		if err != nil {
			return nil, err
		}

		validators := unstakedNetworkMsgValidators(node.Logger, node.IdentityProvider, anb.NodeID)
		mw := anb.initMiddleware(anb.NodeID, node.Metrics.Network, factory, validators...)

		// topology returns empty list since peers are not known upfront
		unstakedNet, err := anb.initNetwork(anb.Me, node.Metrics.Network, mw, topology.EmptyListTopology{})
		if err != nil {
			return nil, err
		}

		anb.Network = unstakedNet
		anb.Middleware = mw

		addressUpdates := gadgets.NewIdentityDeltas(anb.Middleware.UpdateNodeAddresses)
		anb.ProtocolEvents.AddConsumer(addressUpdates)

		node.Logger.Info().Msgf("network will run on address: %s", anb.BindAddr)

		return anb.Network, nil
	}

	anb.Component("unstaked network", initUnstakedNetwork)
}
// initLibP2PFactory returns the factory function that creates the LibP2P node
// for the given node ID and network key. The factory is later handed to
// initMiddleware, which eventually uses it to instantiate the p2p.Node.
//
// The node is built with:
//   - DHT running as server
//   - the address from the node config as listen address, unless a bind
//     address is configured, in which case the bind address is used
//   - the passed in private key as the libp2p key
//   - no connection gater
//   - the default Flow libp2p pubsub options plus a role-based subscription filter
//
// As a side effect, a successful factory call stores the created node in
// LibP2PNode. The error result of initLibP2PFactory itself is always nil.
func (anb *StakedAccessNodeBuilder) initLibP2PFactory(ctx context.Context,
	nodeID flow.Identifier,
	networkKey crypto.PrivateKey) (p2p.LibP2PFactoryFunc, error) {

	// The staked nodes act as the DHT servers
	dhtOptions := []dht.Option{p2p.AsServer(true)}

	listenAddr := anb.NodeConfig.Me.Address()
	if anb.BaseConfig.BindAddr != cmd.NotSet {
		listenAddr = anb.BaseConfig.BindAddr
	}

	connManager := p2p.NewConnManager(anb.Logger, anb.Metrics.Network, p2p.TrackUnstakedConnections(anb.IdentityProvider))
	resolver := dns.NewResolver(anb.Metrics.Network, dns.WithTTL(anb.BaseConfig.DNSCacheTTL))

	// pubsub option that installs the role-based subscription filter for the host
	roleFilterOption := func(_ context.Context, h host.Host) (pubsub.Option, error) {
		subscriptionFilter := p2p.NewRoleBasedFilter(h.ID(), anb.RootBlock.ID(), anb.IdentityProvider)
		return pubsub.WithSubscriptionFilter(subscriptionFilter), nil
	}

	factory := func() (*p2p.Node, error) {
		pubsubOptions := append(p2p.DefaultPubsubOptions(p2p.DefaultMaxPubSubMsgSize), roleFilterOption)

		created, err := p2p.NewDefaultLibP2PNodeBuilder(nodeID, listenAddr, networkKey).
			SetRootBlockID(anb.RootBlock.ID()).
			// no connection gater
			SetConnectionManager(connManager).
			// act as a DHT server
			SetDHTOptions(dhtOptions...).
			SetPubsubOptions(pubsubOptions...).
			SetLogger(anb.Logger).
			SetResolver(resolver).
			Build(ctx)
		if err != nil {
			return nil, err
		}

		anb.LibP2PNode = created
		return anb.LibP2PNode, nil
	}

	return factory, nil
}
// initMiddleware builds the network.Middleware from the libp2p factory function,
// the network metrics, the configured peer update interval and the given message
// validators. The middleware is stored in the builder's Middleware field and
// returned; it is subsequently passed to initNetwork.
func (anb *StakedAccessNodeBuilder) initMiddleware(nodeID flow.Identifier,
	networkMetrics module.NetworkMetrics,
	factoryFunc p2p.LibP2PFactoryFunc,
	validators ...network.MessageValidator) network.Middleware {

	// disable connection pruning for the staked AN which supports the unstaked AN
	peerManagerOptions := []p2p.Option{p2p.WithInterval(anb.PeerUpdateInterval)}
	peerManagerFactory := p2p.PeerManagerFactory(peerManagerOptions, p2p.WithConnectionPruning(false))

	// no connection gating to allow unstaked nodes to connect
	const connectionGating = false

	// the default identifier provider is used, so none is passed as an option
	anb.Middleware = p2p.NewMiddleware(
		anb.Logger,
		factoryFunc,
		nodeID,
		networkMetrics,
		anb.RootBlock.ID(),
		p2p.DefaultUnicastTimeout,
		connectionGating,
		anb.IDTranslator,
		p2p.WithMessageValidators(validators...),
		p2p.WithPeerManager(peerManagerFactory),
	)

	return anb.Middleware
}