forked from wormhole-foundation/wormhole
-
Notifications
You must be signed in to change notification settings - Fork 0
/
node.go
239 lines (199 loc) · 9.16 KB
/
node.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
package node
import (
"context"
"crypto/ecdsa"
"fmt"
"github.com/certusone/wormhole/node/pkg/accountant"
"github.com/certusone/wormhole/node/pkg/common"
"github.com/certusone/wormhole/node/pkg/db"
"github.com/certusone/wormhole/node/pkg/governor"
"github.com/certusone/wormhole/node/pkg/gwrelayer"
gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
"github.com/certusone/wormhole/node/pkg/query"
"github.com/certusone/wormhole/node/pkg/supervisor"
"github.com/wormhole-foundation/wormhole/sdk/vaa"
"go.uber.org/zap"
"google.golang.org/grpc"
)
// Buffer sizes for the node's internal channels. These bound how far each
// pipeline stage can fall behind before backpressure (or message drops at the
// producer) kicks in; the per-constant comments estimate drain times.
const (
	// gossipSendBufferSize configures the size of the gossip network send buffer
	gossipSendBufferSize = 5000

	// inboundObservationBufferSize configures the size of the obsvC channel that contains observations from other Guardians.
	// One observation takes roughly 0.1ms to process on one core, so the whole queue could be processed in 1s
	inboundObservationBufferSize = 10000

	// inboundSignedVaaBufferSize configures the size of the signedInC channel that contains VAAs from other Guardians.
	// One VAA takes roughly 0.01ms to process if we already have one in the database and 2ms if we don't.
	// So in the worst case the entire queue can be processed in 2s.
	inboundSignedVaaBufferSize = 1000

	// observationRequestInboundBufferSize configures the size of obsvReqC.
	// Messages from there are immediately sent to the per-chain observation request channels, which are more important to configure.
	observationRequestInboundBufferSize = 500

	// observationRequestOutboundBufferSize configures the size of obsvReqSendC
	// and thereby somewhat limits the amount of observation requests that can be sent in bursts to the network.
	observationRequestOutboundBufferSize = 100

	// observationRequestPerChainBufferSize is the buffer size of the per-network reobservation channel
	observationRequestPerChainBufferSize = 100
)
// PrometheusCtxKey is an empty struct used as a typed context key
// (presumably for storing Prometheus-related values in a context.Context —
// usage is not visible in this file; confirm at call sites).
type PrometheusCtxKey struct{}
// G holds the full state of a Guardian node: its signing key, the long-lived
// components it runs, and the channels that wire those components together.
// It is populated by initializeBasic and by GuardianOptions, then started via Run.
type G struct {
	// rootCtxCancel is a context.CancelFunc. It MUST be a root context for any context that is passed to any member function of G.
	// It can be used by components to shut down the entire node if they encounter an unrecoverable state.
	rootCtxCancel context.CancelFunc

	// env selects the runtime environment (common.Environment) the node operates in.
	env common.Environment

	// keys
	gk *ecdsa.PrivateKey // guardian key used for signing

	// components
	// NOTE(review): each of these may be nil; Run starts a component only when
	// its pointer is non-nil, i.e. only when the corresponding option configured it.
	db              *db.Database
	gst             *common.GuardianSetState
	acct            *accountant.Accountant
	gov             *governor.ChainGovernor
	gatewayRelayer  *gwrelayer.GatewayRelayer
	queryHandler    *query.QueryHandler
	publicrpcServer *grpc.Server

	// runnables
	// runnablesWithScissors are started wrapped in common.WrapWithScissors; runnables are started as-is.
	runnablesWithScissors map[string]supervisor.Runnable
	runnables             map[string]supervisor.Runnable

	// various channels
	// Outbound gossip message queue (needs to be read/write because p2p needs read/write)
	gossipSendC chan []byte
	// Inbound observations. This is read/write because the processor also writes to it as a fast-path when handling locally made observations.
	obsvC chan *common.MsgWithTimeStamp[gossipv1.SignedObservation]
	// Finalized guardian observations aggregated across all chains
	msgC channelPair[*common.MessagePublication]
	// Ethereum incoming guardian set updates
	setC channelPair[*common.GuardianSet]
	// Inbound signed VAAs
	signedInC channelPair[*gossipv1.SignedVAAWithQuorum]
	// Inbound observation requests from the p2p service (for all chains)
	obsvReqC channelPair[*gossipv1.ObservationRequest]
	// Outbound observation requests
	obsvReqSendC channelPair[*gossipv1.ObservationRequest]
	// acctC is the channel where messages will be put after they reached quorum in the accountant.
	acctC channelPair[*common.MessagePublication]

	// Cross Chain Query Handler channels
	chainQueryReqC            map[vaa.ChainID]chan *query.PerChainQueryInternal
	signedQueryReqC           channelPair[*gossipv1.SignedQueryRequest]
	queryResponseC            channelPair[*query.PerChainQueryResponseInternal]
	queryResponsePublicationC channelPair[*query.QueryResponsePublication]
}
// NewGuardianNode creates a bare G carrying only the environment and the
// guardian signing key. No channels or components are set up here; call Run
// (which invokes initializeBasic and applies the options) to do that.
func NewGuardianNode(
	env common.Environment,
	gk *ecdsa.PrivateKey,
) *G {
	return &G{env: env, gk: gk}
}
// initializeBasic sets up everything that every GuardianNode needs before any options can be applied.
func (g *G) initializeBasic(rootCtxCancel context.CancelFunc) {
	g.rootCtxCancel = rootCtxCancel

	// Registries for runnables; options register into these and Run starts them.
	g.runnablesWithScissors = make(map[string]supervisor.Runnable)
	g.runnables = make(map[string]supervisor.Runnable)

	// Guardian set state managed by processor
	g.gst = common.NewGuardianSetState(nil)

	// Gossip / observation plumbing.
	g.gossipSendC = make(chan []byte, gossipSendBufferSize)
	g.obsvC = make(chan *common.MsgWithTimeStamp[gossipv1.SignedObservation], inboundObservationBufferSize)
	g.obsvReqC = makeChannelPair[*gossipv1.ObservationRequest](observationRequestInboundBufferSize)
	g.obsvReqSendC = makeChannelPair[*gossipv1.ObservationRequest](observationRequestOutboundBufferSize)

	// Message publication / VAA flow.
	g.msgC = makeChannelPair[*common.MessagePublication](0)
	// setC is buffered because of a circular dependency between processor and accountant during startup.
	g.setC = makeChannelPair[*common.GuardianSet](1)
	g.signedInC = makeChannelPair[*gossipv1.SignedVAAWithQuorum](inboundSignedVaaBufferSize)
	g.acctC = makeChannelPair[*common.MessagePublication](accountant.MsgChannelCapacity)

	// Cross Chain Query Handler channels
	g.chainQueryReqC = make(map[vaa.ChainID]chan *query.PerChainQueryInternal)
	g.signedQueryReqC = makeChannelPair[*gossipv1.SignedQueryRequest](query.SignedQueryRequestChannelSize)
	g.queryResponseC = makeChannelPair[*query.PerChainQueryResponseInternal](0)
	g.queryResponsePublicationC = makeChannelPair[*query.QueryResponsePublication](0)
}
// applyOptions applies `options` to the GuardianNode.
// Each option must have a unique option.name.
// If an option has `dependencies`, they must be defined before that option.
// On error, g may be left partially configured; callers are expected to treat
// any error as fatal (see Run).
func (g *G) applyOptions(ctx context.Context, logger *zap.Logger, options []*GuardianOption) error {
	configuredComponents := make(map[string]struct{}) // using `map[string]struct{}` to implement a set here

	for _, option := range options {
		// check that this component has not been configured yet
		if _, ok := configuredComponents[option.name]; ok {
			return fmt.Errorf("component %s is already configured and cannot be configured a second time", option.name)
		}
		// check that all dependencies have been met
		for _, dep := range option.dependencies {
			if _, ok := configuredComponents[dep]; !ok {
				return fmt.Errorf("component %s requires %s to be configured first; check the order of your options", option.name, dep)
			}
		}

		// run the config
		err := option.f(ctx, logger, g)
		if err != nil {
			return fmt.Errorf("applying option for component %s: %w", option.name, err)
		}

		// mark the component as configured
		configuredComponents[option.name] = struct{}{}
	}

	return nil
}
// Run returns a supervisor.Runnable that initializes the node (initializeBasic
// plus applying all `options`), starts every configured component, signals
// healthy, and then blocks until ctx is done. rootCtxCancel must be the cancel
// func of the root context; it is stored on g so components can shut down the
// whole node on unrecoverable errors. Any startup failure is fatal.
func (g *G) Run(rootCtxCancel context.CancelFunc, options ...*GuardianOption) supervisor.Runnable {
	return func(ctx context.Context) error {
		logger := supervisor.Logger(ctx)

		g.initializeBasic(rootCtxCancel)
		if err := g.applyOptions(ctx, logger, options); err != nil {
			logger.Fatal("failed to initialize GuardianNode", zap.Error(err))
		}
		logger.Info("GuardianNode initialization done.") // Do not modify this message, node_test.go relies on it.

		// Start the watchers
		for runnableName, runnable := range g.runnablesWithScissors {
			logger.Info("Starting runnablesWithScissors: " + runnableName)
			if err := supervisor.Run(ctx, runnableName, common.WrapWithScissors(runnable, runnableName)); err != nil {
				logger.Fatal("error starting runnablesWithScissors", zap.Error(err))
			}
		}

		// TODO there is an opportunity to refactor the startup of the accountant and governor:
		// Ideally they should just register a g.runnables["governor"] and g.runnables["accountant"] instead of being treated as special cases.
		if g.acct != nil {
			logger.Info("Starting accountant")
			if err := g.acct.Start(ctx); err != nil {
				logger.Fatal("acct: failed to start accountant", zap.Error(err))
			}
			defer g.acct.Close()
		}

		if g.gov != nil {
			logger.Info("Starting governor")
			if err := g.gov.Run(ctx); err != nil {
				// Message fixed: the governor is being started here, not created.
				logger.Fatal("failed to start chain governor", zap.Error(err))
			}
		}

		if g.gatewayRelayer != nil {
			logger.Info("Starting gateway relayer")
			if err := g.gatewayRelayer.Start(ctx); err != nil {
				logger.Fatal("failed to start gateway relayer", zap.Error(err), zap.String("component", "gwrelayer"))
			}
		}

		if g.queryHandler != nil {
			logger.Info("Starting query handler", zap.String("component", "ccq"))
			if err := g.queryHandler.Start(ctx); err != nil {
				// Message fixed: the query handler is being started here, not created.
				logger.Fatal("failed to start query handler", zap.Error(err), zap.String("component", "ccq"))
			}
		}

		// Start any other runnables
		for name, runnable := range g.runnables {
			if err := supervisor.Run(ctx, name, runnable); err != nil {
				logger.Fatal("failed to start other runnable", zap.Error(err))
			}
		}

		logger.Info("Started internal services")
		supervisor.Signal(ctx, supervisor.SignalHealthy)

		// Block until the root context is cancelled; component goroutines are
		// owned by the supervisor and wind down with ctx.
		<-ctx.Done()

		return nil
	}
}
// channelPair exposes the two ends of a single underlying channel as
// direction-restricted views, so a producer can be handed only the write
// half and a consumer only the read half.
type channelPair[T any] struct {
	readC  <-chan T
	writeC chan<- T
}

// makeChannelPair allocates one channel with the given capacity and returns
// both directional views of it.
func makeChannelPair[T any](capacity int) channelPair[T] {
	ch := make(chan T, capacity)
	return channelPair[T]{readC: ch, writeC: ch}
}