-
Notifications
You must be signed in to change notification settings - Fork 105
/
watchtower.go
408 lines (360 loc) · 12.5 KB
/
watchtower.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
package watchtower
import (
"fmt"
"math/big"
"math/rand"
"net/http"
"sync"
"time"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/fatih/color"
"github.com/urfave/cli"
"github.com/rocket-pool/rocketpool-go/dao/trustednode"
"github.com/rocket-pool/rocketpool-go/rocketpool"
"github.com/rocket-pool/smartnode/rocketpool/watchtower/collectors"
"github.com/rocket-pool/smartnode/shared/services"
"github.com/rocket-pool/smartnode/shared/services/beacon"
"github.com/rocket-pool/smartnode/shared/services/state"
"github.com/rocket-pool/smartnode/shared/utils/log"
)
// Config
var minTasksInterval, _ = time.ParseDuration("4m")
var maxTasksInterval, _ = time.ParseDuration("6m")
var taskCooldown, _ = time.ParseDuration("5s")
const (
MaxConcurrentEth1Requests = 200
RespondChallengesColor = color.FgWhite
ClaimRplRewardsColor = color.FgGreen
SubmitRplPriceColor = color.FgYellow
SubmitNetworkBalancesColor = color.FgYellow
DissolveTimedOutMinipoolsColor = color.FgMagenta
SubmitScrubMinipoolsColor = color.FgHiGreen
ErrorColor = color.FgRed
MetricsColor = color.FgHiYellow
SubmitRewardsTreeColor = color.FgHiCyan
WarningColor = color.FgYellow
ProcessPenaltiesColor = color.FgHiMagenta
CancelBondsColor = color.FgGreen
CheckSoloMigrationsColor = color.FgCyan
FinalizeProposalsColor = color.FgMagenta
UpdateColor = color.FgHiWhite
)
// Register watchtower command
func RegisterCommands(app *cli.App, name string, aliases []string) {
app.Commands = append(app.Commands, cli.Command{
Name: name,
Aliases: aliases,
Usage: "Run Rocket Pool watchtower activity daemon",
Action: func(c *cli.Context) error {
return run(c)
},
})
}
// Run daemon
func run(c *cli.Context) error {
// Configure
configureHTTP()
// Wait until node is registered
if err := services.WaitNodeRegistered(c, true); err != nil {
return err
}
// Get services
cfg, err := services.GetConfig(c)
if err != nil {
return err
}
rp, err := services.GetRocketPool(c)
if err != nil {
return err
}
w, err := services.GetWallet(c)
if err != nil {
return err
}
bc, err := services.GetBeaconClient(c)
if err != nil {
return err
}
// Print the current mode
if cfg.IsNativeMode {
fmt.Println("Starting watchtower daemon in Native Mode.")
} else {
fmt.Println("Starting watchtower daemon in Docker Mode.")
}
// Check if rolling records are enabled
useRollingRecords := cfg.Smartnode.UseRollingRecords.Value.(bool)
if useRollingRecords {
fmt.Println("***NOTE: EXPERIMENTAL ROLLING RECORDS ARE ENABLED, BE ADVISED!***")
}
// Initialize the metrics reporters
scrubCollector := collectors.NewScrubCollector()
bondReductionCollector := collectors.NewBondReductionCollector()
soloMigrationCollector := collectors.NewSoloMigrationCollector()
// Initialize error logger
errorLog := log.NewColorLogger(ErrorColor)
updateLog := log.NewColorLogger(UpdateColor)
// Create the state manager
m, err := state.NewNetworkStateManager(rp, cfg, rp.Client, bc, &updateLog)
if err != nil {
return err
}
// Get the node address
nodeAccount, err := w.GetNodeAccount()
if err != nil {
return fmt.Errorf("error getting node account: %w", err)
}
// Initialize tasks
respondChallenges, err := newRespondChallenges(c, log.NewColorLogger(RespondChallengesColor), m)
if err != nil {
return fmt.Errorf("error during respond-to-challenges check: %w", err)
}
submitRplPrice, err := newSubmitRplPrice(c, log.NewColorLogger(SubmitRplPriceColor), errorLog)
if err != nil {
return fmt.Errorf("error during rpl price check: %w", err)
}
submitNetworkBalances, err := newSubmitNetworkBalances(c, log.NewColorLogger(SubmitNetworkBalancesColor), errorLog)
if err != nil {
return fmt.Errorf("error during network balances check: %w", err)
}
dissolveTimedOutMinipools, err := newDissolveTimedOutMinipools(c, log.NewColorLogger(DissolveTimedOutMinipoolsColor))
if err != nil {
return fmt.Errorf("error during timed-out minipools check: %w", err)
}
submitScrubMinipools, err := newSubmitScrubMinipools(c, log.NewColorLogger(SubmitScrubMinipoolsColor), errorLog, scrubCollector)
if err != nil {
return fmt.Errorf("error during scrub check: %w", err)
}
var submitRewardsTree_Stateless *submitRewardsTree_Stateless
var submitRewardsTree_Rolling *submitRewardsTree_Rolling
if !useRollingRecords {
submitRewardsTree_Stateless, err = newSubmitRewardsTree_Stateless(c, log.NewColorLogger(SubmitRewardsTreeColor), errorLog, m)
if err != nil {
return fmt.Errorf("error during stateless rewards tree check: %w", err)
}
} else {
submitRewardsTree_Rolling, err = newSubmitRewardsTree_Rolling(c, log.NewColorLogger(SubmitRewardsTreeColor), errorLog, m)
if err != nil {
return fmt.Errorf("error during rolling rewards tree check: %w", err)
}
}
/*processPenalties, err := newProcessPenalties(c, log.NewColorLogger(ProcessPenaltiesColor), errorLog)
if err != nil {
return fmt.Errorf("error during penalties check: %w", err)
}*/
generateRewardsTree, err := newGenerateRewardsTree(c, log.NewColorLogger(SubmitRewardsTreeColor), errorLog)
if err != nil {
return fmt.Errorf("error during manual tree generation check: %w", err)
}
cancelBondReductions, err := newCancelBondReductions(c, log.NewColorLogger(CancelBondsColor), errorLog, bondReductionCollector)
if err != nil {
return fmt.Errorf("error during bond reduction cancel check: %w", err)
}
checkSoloMigrations, err := newCheckSoloMigrations(c, log.NewColorLogger(CheckSoloMigrationsColor), errorLog, soloMigrationCollector)
if err != nil {
return fmt.Errorf("error during solo migration check: %w", err)
}
finalizePdaoProposals, err := newFinalizePdaoProposals(c, log.NewColorLogger(FinalizeProposalsColor))
if err != nil {
return fmt.Errorf("error creating finalize-pdao-proposals task: %w", err)
}
intervalDelta := maxTasksInterval - minTasksInterval
secondsDelta := intervalDelta.Seconds()
// Wait group to handle the various threads
wg := new(sync.WaitGroup)
wg.Add(2)
// Run task loop
isHoustonDeployedMasterFlag := false
go func() {
for {
// Randomize the next interval
randomSeconds := rand.Intn(int(secondsDelta))
interval := time.Duration(randomSeconds)*time.Second + minTasksInterval
// Check the EC status
err := services.WaitEthClientSynced(c, false) // Force refresh the primary / fallback EC status
if err != nil {
errorLog.Println(err)
time.Sleep(taskCooldown)
continue
}
// Check the BC status
err = services.WaitBeaconClientSynced(c, false) // Force refresh the primary / fallback BC status
if err != nil {
errorLog.Println(err)
time.Sleep(taskCooldown)
continue
}
// Get the Beacon block
//latestBlock, err := m.GetLatestFinalizedBeaconBlock()
latestBlock, err := m.GetLatestBeaconBlock()
if err != nil {
errorLog.Println(fmt.Errorf("error getting latest Beacon block: %w", err))
time.Sleep(taskCooldown)
continue
}
// Check if on the Oracle DAO
isOnOdao, err := isOnOracleDAO(rp, nodeAccount.Address, latestBlock)
if err != nil {
errorLog.Println(err)
time.Sleep(taskCooldown)
continue
}
// Run the manual rewards tree generation
if err := generateRewardsTree.run(); err != nil {
errorLog.Println(err)
}
time.Sleep(taskCooldown)
if isOnOdao {
// Run the challenge check
if err := respondChallenges.run(); err != nil {
errorLog.Println(err)
}
time.Sleep(taskCooldown)
// Update the network state
state, err := updateNetworkState(m, &updateLog, latestBlock)
if err != nil {
errorLog.Println(err)
time.Sleep(taskCooldown)
continue
}
// Check for Houston
if !isHoustonDeployedMasterFlag && state.IsHoustonDeployed {
printHoustonMessage(&updateLog)
isHoustonDeployedMasterFlag = true
}
// Run the network balance submission check
if err := submitNetworkBalances.run(state); err != nil {
errorLog.Println(err)
}
time.Sleep(taskCooldown)
if !useRollingRecords {
// Run the rewards tree submission check
if err := submitRewardsTree_Stateless.Run(isOnOdao, state, latestBlock.Slot); err != nil {
errorLog.Println(err)
}
time.Sleep(taskCooldown)
} else {
// Run the network balance and rewards tree submission check
if err := submitRewardsTree_Rolling.run(state); err != nil {
errorLog.Println(err)
}
time.Sleep(taskCooldown)
}
// Run the price submission check
if err := submitRplPrice.run(state); err != nil {
errorLog.Println(err)
}
time.Sleep(taskCooldown)
// Run the minipool dissolve check
if err := dissolveTimedOutMinipools.run(state); err != nil {
errorLog.Println(err)
}
time.Sleep(taskCooldown)
if state.IsHoustonDeployed {
// Run the finalize proposals check
if err := finalizePdaoProposals.run(state); err != nil {
errorLog.Println(err)
}
time.Sleep(taskCooldown)
}
// Run the minipool scrub check
if err := submitScrubMinipools.run(state); err != nil {
errorLog.Println(err)
}
time.Sleep(taskCooldown)
// Run the bond cancel check
if err := cancelBondReductions.run(state); err != nil {
errorLog.Println(err)
}
time.Sleep(taskCooldown)
// Run the solo migration check
if err := checkSoloMigrations.run(state); err != nil {
errorLog.Println(err)
}
/*time.Sleep(taskCooldown)
// Run the fee recipient penalty check
if err := processPenalties.run(); err != nil {
errorLog.Println(err)
}*/
// DISABLED until MEV-Boost can support it
} else {
/*
*/
if !useRollingRecords {
// Run the rewards tree submission check
if err := submitRewardsTree_Stateless.Run(isOnOdao, nil, latestBlock.Slot); err != nil {
errorLog.Println(err)
}
} else {
// Run the network balance and rewards tree submission check
if err := submitRewardsTree_Rolling.run(nil); err != nil {
errorLog.Println(err)
}
}
}
time.Sleep(interval)
}
wg.Done()
}()
// Run metrics loop
go func() {
err := runMetricsServer(c, log.NewColorLogger(MetricsColor), scrubCollector, bondReductionCollector, soloMigrationCollector)
if err != nil {
errorLog.Println(err)
}
wg.Done()
}()
// Wait for both threads to stop
wg.Wait()
return nil
}
// Configure HTTP transport settings
func configureHTTP() {
// The daemon makes a large number of concurrent RPC requests to the Eth1 client
// The HTTP transport is set to cache connections for future re-use equal to the maximum expected number of concurrent requests
// This prevents issues related to memory consumption and address allowance from repeatedly opening and closing connections
http.DefaultTransport.(*http.Transport).MaxIdleConnsPerHost = MaxConcurrentEth1Requests
}
// Update the latest network state at each cycle
func updateNetworkState(m *state.NetworkStateManager, log *log.ColorLogger, block beacon.BeaconBlock) (*state.NetworkState, error) {
log.Print("Getting latest network state... ")
// Get the state of the network
state, err := m.GetStateForSlot(block.Slot)
if err != nil {
return nil, fmt.Errorf("error getting network state: %w", err)
}
return state, nil
}
// Check if this node is on the Oracle DAO
func isOnOracleDAO(rp *rocketpool.RocketPool, nodeAddress common.Address, block beacon.BeaconBlock) (bool, error) {
opts := &bind.CallOpts{
BlockNumber: big.NewInt(0).SetUint64(block.ExecutionBlockNumber),
}
nodeTrusted, err := trustednode.GetMemberExists(rp, nodeAddress, opts)
if err != nil {
return false, fmt.Errorf("error checking if node is in the Oracle DAO for Beacon block %d, EL block %d: %w", block.Slot, block.ExecutionBlockNumber, err)
}
return nodeTrusted, nil
}
// Check if Houston has been deployed yet
func printHoustonMessage(log *log.ColorLogger) {
log.Println(`
* .
* / \
* |.'.|
* |'.'|
* ,'| |'.
* |,-'-|-'-.|
* __|_| | _ _ _____ _
* | ___ \| | | | | | ___ \ | |
* | |_/ /|__ ___| | _____| |_ | |_/ /__ ___ | |
* | // _ \ / __| |/ / _ \ __| | __/ _ \ / _ \| |
* | |\ \ (_) | (__| < __/ |_ | | | (_) | (_) | |
* \_| \_\___/ \___|_|\_\___|\__| \_| \___/ \___/|_|
* +---------------------------------------------------+
* | DECENTRALISED STAKING PROTOCOL FOR ETHEREUM |
* +---------------------------------------------------+
*
* =============== Houston has launched! ===============
`)
}