/
node.go
409 lines (353 loc) · 11.8 KB
/
node.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
package node
import (
"fmt"
"math/big"
"net/http"
"os"
"path/filepath"
"sync"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/fatih/color"
"github.com/urfave/cli"
"github.com/rocket-pool/smartnode/rocketpool/node/collectors"
"github.com/rocket-pool/smartnode/shared/services"
"github.com/rocket-pool/smartnode/shared/services/alerting"
"github.com/rocket-pool/smartnode/shared/services/state"
"github.com/rocket-pool/smartnode/shared/services/wallet/keystore/lighthouse"
"github.com/rocket-pool/smartnode/shared/services/wallet/keystore/nimbus"
"github.com/rocket-pool/smartnode/shared/services/wallet/keystore/prysm"
"github.com/rocket-pool/smartnode/shared/services/wallet/keystore/teku"
"github.com/rocket-pool/smartnode/shared/utils/log"
)
// Config: timing for the daemon task loop.
//
// These are fixed durations, so they are written as time constant expressions
// instead of being parsed from strings at package init with the parse error
// discarded.
var (
	// Pause after a full pass over all tasks before the next pass starts.
	tasksInterval = 5 * time.Minute
	// Pause between individual tasks, and before retrying after a failed
	// sync check or state update.
	taskCooldown = 10 * time.Second
	// Minimum time between requests to recalculate the total effective stake.
	totalEffectiveStakeCooldown = 1 * time.Hour
)
// MaxConcurrentEth1Requests is the maximum expected number of concurrent RPC
// requests to the Eth1 client. configureHTTP uses it as the per-host idle
// connection limit of the default HTTP transport.
const MaxConcurrentEth1Requests = 200

// Terminal colors handed to log.NewColorLogger so that each task's output can
// be told apart in the daemon log.
const (
	// Per-task logger colors.
	ManageFeeRecipientColor      = color.FgHiCyan
	DistributeMinipoolsColor     = color.FgHiGreen
	StakePrelaunchMinipoolsColor = color.FgBlue
	PromoteMinipoolsColor        = color.FgMagenta
	DownloadRewardsTreesColor    = color.FgGreen
	ReduceBondAmountColor        = color.FgHiBlue
	DefendPdaoPropsColor         = color.FgYellow
	VerifyPdaoPropsColor         = color.FgYellow
	MetricsColor                 = color.FgHiYellow

	// General-purpose logger colors.
	ErrorColor   = color.FgRed
	WarningColor = color.FgYellow
	UpdateColor  = color.FgHiWhite
)
// RegisterCommands adds the node daemon command to app under the given name
// and aliases. Invoking the command hands control to run.
func RegisterCommands(app *cli.App, name string, aliases []string) {
	daemonCommand := cli.Command{
		Name:    name,
		Aliases: aliases,
		Usage:   "Run Rocket Pool node activity daemon",
		Action: func(ctx *cli.Context) error {
			return run(ctx)
		},
	}
	app.Commands = append(app.Commands, daemonCommand)
}
// run is the body of the node daemon command.
//
// It performs one-time setup (fee recipient files, HTTP transport settings,
// waiting for node registration, service and task construction) and then
// starts two goroutines: the task loop and the metrics server. Any error
// during setup is returned to the caller. Once the goroutines are started,
// errors are only logged through errorLog.
//
// The task loop has no exit path, so the final wg.Wait() is never released
// and the trailing "return nil" is not reached in practice; the function
// blocks for the lifetime of the process.
func run(c *cli.Context) error {
// Handle the initial fee recipient file deployment
err := deployDefaultFeeRecipientFile(c)
if err != nil {
return err
}
// Clean up old fee recipient files
err = removeLegacyFeeRecipientFiles(c)
if err != nil {
return err
}
// Configure the shared HTTP transport before any service starts issuing requests
configureHTTP()
// Wait until node is registered
if err := services.WaitNodeRegistered(c, true); err != nil {
return err
}
// Get services
cfg, err := services.GetConfig(c)
if err != nil {
return err
}
rp, err := services.GetRocketPool(c)
if err != nil {
return err
}
w, err := services.GetWallet(c)
if err != nil {
return err
}
bc, err := services.GetBeaconClient(c)
if err != nil {
return err
}
// Print the current mode
if cfg.IsNativeMode {
fmt.Println("Starting node daemon in Native Mode.")
} else {
fmt.Println("Starting node daemon in Docker Mode.")
}
// The node account's address is what the network state is queried for in the task loop
nodeAccount, err := w.GetNodeAccount()
if err != nil {
return fmt.Errorf("error getting node account: %w", err)
}
// Initialize loggers
errorLog := log.NewColorLogger(ErrorColor)
updateLog := log.NewColorLogger(UpdateColor)
// Create the state manager
m, err := state.NewNetworkStateManager(rp, cfg, rp.Client, bc, &updateLog)
if err != nil {
return err
}
// Shared holder for the latest state; written by the task loop and handed to the metrics server
stateLocker := collectors.NewStateLocker()
// Initialize tasks; each gets its own colored logger
manageFeeRecipient, err := newManageFeeRecipient(c, log.NewColorLogger(ManageFeeRecipientColor))
if err != nil {
return err
}
distributeMinipools, err := newDistributeMinipools(c, log.NewColorLogger(DistributeMinipoolsColor))
if err != nil {
return err
}
stakePrelaunchMinipools, err := newStakePrelaunchMinipools(c, log.NewColorLogger(StakePrelaunchMinipoolsColor))
if err != nil {
return err
}
promoteMinipools, err := newPromoteMinipools(c, log.NewColorLogger(PromoteMinipoolsColor))
if err != nil {
return err
}
downloadRewardsTrees, err := newDownloadRewardsTrees(c, log.NewColorLogger(DownloadRewardsTreesColor))
if err != nil {
return err
}
reduceBonds, err := newReduceBonds(c, log.NewColorLogger(ReduceBondAmountColor))
if err != nil {
return err
}
defendPdaoProps, err := newDefendPdaoProps(c, log.NewColorLogger(DefendPdaoPropsColor))
if err != nil {
return err
}
// NOTE: this variable has the same name as its type and shadows it for the rest of the function.
// It stays nil unless proposal verification is enabled below.
var verifyPdaoProps *verifyPdaoProps
// Make sure the user opted into this duty
// NOTE(review): unchecked type assertion - this panics if the setting's Value is not a bool
verifyEnabled := cfg.Smartnode.VerifyProposals.Value.(bool)
if verifyEnabled {
verifyPdaoProps, err = newVerifyPdaoProps(c, log.NewColorLogger(VerifyPdaoPropsColor))
if err != nil {
return err
}
}
// Wait group to handle the various threads (one count for the task loop, one for the metrics server)
wg := new(sync.WaitGroup)
wg.Add(2)
// Timestamp for caching total effective RPL stake
// Starts at the Unix epoch so the first loop iteration always requests a recalculation
lastTotalEffectiveStakeTime := time.Unix(0, 0)
// Run task loop
// Latches to true once the Houston message has been printed so it is only shown once
isHoustonDeployedMasterFlag := false
go func() {
// we assume clients are synced on startup so that we don't send unnecessary alerts
wasExecutionClientSynced := true
wasBeaconClientSynced := true
for {
// Check the EC status
err := services.WaitEthClientSynced(c, false) // Force refresh the primary / fallback EC status
if err != nil {
wasExecutionClientSynced = false
errorLog.Printlnf("Execution client not synced: %s. Waiting for sync...", err.Error())
time.Sleep(taskCooldown)
continue
}
// Only alert on the transition from "not synced" back to "synced"
if !wasExecutionClientSynced {
updateLog.Println("Execution client is now synced.")
wasExecutionClientSynced = true
alerting.AlertExecutionClientSyncComplete(cfg)
}
// Check the BC status
err = services.WaitBeaconClientSynced(c, false) // Force refresh the primary / fallback BC status
if err != nil {
// NOTE: if not synced, it returns an error - so there isn't necessarily an underlying issue
wasBeaconClientSynced = false
errorLog.Printlnf("Beacon client not synced: %s. Waiting for sync...", err.Error())
time.Sleep(taskCooldown)
continue
}
// Only alert on the transition from "not synced" back to "synced"
if !wasBeaconClientSynced {
updateLog.Println("Beacon client is now synced.")
wasBeaconClientSynced = true
alerting.AlertBeaconClientSyncComplete(cfg)
}
// Update the network state
// The total effective stake is only requested once per totalEffectiveStakeCooldown
updateTotalEffectiveStake := false
if time.Since(lastTotalEffectiveStakeTime) > totalEffectiveStakeCooldown {
updateTotalEffectiveStake = true
lastTotalEffectiveStakeTime = time.Now() // Even if the call below errors out, this will prevent constant errors related to this flag
}
// NOTE: the local variable "state" shadows the imported state package for the rest of this iteration
state, totalEffectiveStake, err := updateNetworkState(m, &updateLog, nodeAccount.Address, updateTotalEffectiveStake)
if err != nil {
// Without a fresh state none of the tasks below can run; retry after the cooldown
errorLog.Println(err)
time.Sleep(taskCooldown)
continue
}
// Publish the new state to the state locker shared with the metrics server
stateLocker.UpdateState(state, totalEffectiveStake)
// Check for Houston
if !isHoustonDeployedMasterFlag && state.IsHoustonDeployed {
printHoustonMessage(&updateLog)
isHoustonDeployedMasterFlag = true
}
// From here on, a failing task is logged and the loop moves on to the next task
// Manage the fee recipient for the node
if err := manageFeeRecipient.run(state); err != nil {
errorLog.Println(err)
}
time.Sleep(taskCooldown)
// Run the rewards download check
if err := downloadRewardsTrees.run(state); err != nil {
errorLog.Println(err)
}
time.Sleep(taskCooldown)
// The pDAO proposal tasks only run once the state reports Houston as deployed
if state.IsHoustonDeployed {
// Run the pDAO proposal defender
if err := defendPdaoProps.run(state); err != nil {
errorLog.Println(err)
}
time.Sleep(taskCooldown)
// Run the pDAO proposal verifier
// (nil when the user has not enabled proposal verification)
if verifyPdaoProps != nil {
if err := verifyPdaoProps.run(state); err != nil {
errorLog.Println(err)
}
time.Sleep(taskCooldown)
}
}
// Run the minipool stake check
if err := stakePrelaunchMinipools.run(state); err != nil {
errorLog.Println(err)
}
time.Sleep(taskCooldown)
// Run the balance distribution check
if err := distributeMinipools.run(state); err != nil {
errorLog.Println(err)
}
time.Sleep(taskCooldown)
// Run the reduce bond check
if err := reduceBonds.run(state); err != nil {
errorLog.Println(err)
}
time.Sleep(taskCooldown)
// Run the minipool promotion check
if err := promoteMinipools.run(state); err != nil {
errorLog.Println(err)
}
// Full pass complete; wait for the longer interval before starting the next pass
time.Sleep(tasksInterval)
}
// NOTE(review): unreachable - the loop above has no break or return, so this Done() never runs
wg.Done()
}()
// Run metrics loop
go func() {
err := runMetricsServer(c, log.NewColorLogger(MetricsColor), stateLocker)
if err != nil {
errorLog.Println(err)
}
wg.Done()
}()
// Wait for both threads to stop
// NOTE(review): since the task loop never calls Done(), this blocks even after the metrics server returns
wg.Wait()
return nil
}
// configureHTTP tunes the process-wide default HTTP transport for the daemon.
//
// The daemon makes a large number of concurrent RPC requests to the Eth1
// client. The transport is set to keep as many idle connections per host as
// the maximum expected number of concurrent requests, which prevents issues
// related to memory consumption and address allowance from repeatedly
// opening and closing connections.
//
// http.DefaultTransport is an http.RoundTripper that any code in the process
// may have replaced, so the concrete type is checked instead of asserted
// unconditionally. If it is not a *http.Transport, a note is printed and the
// transport is left untouched rather than panicking.
func configureHTTP() {
	transport, ok := http.DefaultTransport.(*http.Transport)
	if !ok {
		fmt.Printf("NOTE: default HTTP transport is %T, not *http.Transport; leaving its idle connection settings unchanged.\n", http.DefaultTransport)
		return
	}
	transport.MaxIdleConnsPerHost = MaxConcurrentEth1Requests
}
// deployDefaultFeeRecipientFile writes the default fee recipient file if it
// does not exist yet.
//
// When the file is already present nothing is touched. When it is missing,
// its parent directory is created and the file is written with the rETH
// address from the config: as a FEE_RECIPIENT=<address> definition in native
// mode, or as the bare address otherwise. Any error from loading the config,
// checking the file, creating the directory or writing the file is returned.
func deployDefaultFeeRecipientFile(c *cli.Context) error {
	cfg, err := services.GetConfig(c)
	if err != nil {
		return err
	}

	targetPath := cfg.Smartnode.GetFeeRecipientFilePath()
	_, statErr := os.Stat(targetPath)
	if statErr == nil {
		// The file is already in place
		return nil
	}
	if !os.IsNotExist(statErr) {
		return fmt.Errorf("Error checking fee recipient file status: %w", statErr)
	}

	// Make sure the validators dir is created
	if mkdirErr := os.MkdirAll(filepath.Dir(targetPath), 0755); mkdirErr != nil {
		return fmt.Errorf("could not create validators directory: %w", mkdirErr)
	}

	// Docker and Hybrid just need the address itself;
	// Native mode needs an environment variable definition
	contents := cfg.Smartnode.GetRethAddress().Hex()
	if cfg.IsNativeMode {
		contents = fmt.Sprintf("FEE_RECIPIENT=%s", contents)
	}

	if writeErr := os.WriteFile(targetPath, []byte(contents), 0664); writeErr != nil {
		return fmt.Errorf("could not write default fee recipient file to %s: %w", targetPath, writeErr)
	}
	return nil
}
// removeLegacyFeeRecipientFiles deletes the per-client fee recipient files
// that were created in v1.5.0.
//
// It looks for rp-fee-recipient.txt in each validator client's keystore
// directory under the validator keychain path. Removal is best-effort: a
// file that cannot be deleted only produces a printed note. The only error
// returned is a failure to load the config.
func removeLegacyFeeRecipientFiles(c *cli.Context) error {
	const legacyFileName = "rp-fee-recipient.txt"

	cfg, err := services.GetConfig(c)
	if err != nil {
		return err
	}
	keychainPath := cfg.Smartnode.GetValidatorKeychainPath()

	// Check every client's keystore directory for a leftover file
	for _, clientDir := range []string{lighthouse.KeystoreDir, nimbus.KeystoreDir, prysm.KeystoreDir, teku.KeystoreDir} {
		legacyPath := filepath.Join(keychainPath, clientDir, legacyFileName)
		if _, statErr := os.Stat(legacyPath); os.IsNotExist(statErr) {
			// Nothing to clean up for this client
			continue
		}
		if removeErr := os.Remove(legacyPath); removeErr != nil {
			fmt.Printf("NOTE: Couldn't remove old fee recipient file (%s): %s\nThis file is no longer used, you may remove it manually if you wish.\n", legacyPath, removeErr.Error())
		}
	}
	return nil
}
// updateNetworkState fetches the head network state for nodeAddress from the
// state manager, called once per task loop cycle.
//
// calculateTotalEffectiveStake is passed through to the manager unchanged.
// The log parameter is accepted but not used here. On failure both results
// are nil and the manager's error is returned wrapped.
func updateNetworkState(m *state.NetworkStateManager, log *log.ColorLogger, nodeAddress common.Address, calculateTotalEffectiveStake bool) (*state.NetworkState, *big.Int, error) {
	headState, effectiveStake, err := m.GetHeadStateForNode(nodeAddress, calculateTotalEffectiveStake)
	if err == nil {
		return headState, effectiveStake, nil
	}
	return nil, nil, fmt.Errorf("error updating network state: %w", err)
}
// printHoustonMessage writes the Rocket Pool ASCII-art banner announcing the
// Houston launch to the given logger. It performs no deployment check itself;
// run calls it the first time the network state reports Houston as deployed.
func printHoustonMessage(log *log.ColorLogger) {
log.Println(`
* .
* / \
* |.'.|
* |'.'|
* ,'| |'.
* |,-'-|-'-.|
* __|_| | _ _ _____ _
* | ___ \| | | | | | ___ \ | |
* | |_/ /|__ ___| | _____| |_ | |_/ /__ ___ | |
* | // _ \ / __| |/ / _ \ __| | __/ _ \ / _ \| |
* | |\ \ (_) | (__| < __/ |_ | | | (_) | (_) | |
* \_| \_\___/ \___|_|\_\___|\__| \_| \___/ \___/|_|
* +---------------------------------------------------+
* | DECENTRALISED STAKING PROTOCOL FOR ETHEREUM |
* +---------------------------------------------------+
*
* =============== Houston has launched! ===============
`)
}