/
eth2_validator_balance.go
160 lines (140 loc) · 5.11 KB
/
eth2_validator_balance.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
package task_syncer
import (
"fmt"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/stafiprotocol/eth2-balance-service/dao"
"github.com/stafiprotocol/eth2-balance-service/pkg/utils"
"github.com/stafiprotocol/eth2-balance-service/shared/beacon"
"github.com/stafiprotocol/eth2-balance-service/shared/types"
"gorm.io/gorm"
)
// syncValidatorEpochBalances fetches validator statuses from the beacon node
// for each pending reward epoch and stores one balance record per validator.
//
// The highest epoch considered is the beacon head's finalized epoch, lowered
// to the DealedEpoch of the validator-info syncer and of the block syncer so
// that the validator list and withdrawal data read below are already in the
// database. Epochs at or below the balance syncer's own DealedEpoch are
// skipped, as are epochs that are not a multiple of task.rewardEpochInterval.
//
// For every processed epoch the balance syncer's DealedEpoch metadata is
// advanced only after all validator records for that epoch were written, so
// a failure mid-epoch causes that epoch to be retried on the next call.
//
// It returns an error when any dao or beacon call fails, when task.version is
// unsupported, when task.rewardEpochInterval is zero, when a stored pubkey is
// too short to carry a two-character prefix, or when the beacon response does
// not contain exactly one existing status per requested pubkey.
func (task *Task) syncValidatorEpochBalances() error {
	beaconHead, err := task.connection.Eth2BeaconHead()
	if err != nil {
		return err
	}
	finalEpoch := beaconHead.FinalizedEpoch

	eth2ValidatorInfoSyncerMetaData, err := dao.GetMetaData(task.db, utils.MetaTypeEth2ValidatorInfoSyncer)
	if err != nil {
		return err
	}
	eth2BlockSyncerMetaData, err := dao.GetMetaData(task.db, utils.MetaTypeEth2BlockSyncer)
	if err != nil {
		return err
	}

	// ensure validators latest info already synced
	if finalEpoch > eth2ValidatorInfoSyncerMetaData.DealedEpoch {
		finalEpoch = eth2ValidatorInfoSyncerMetaData.DealedEpoch
	}
	// ensure validators block info (withdrawals) already synced
	if finalEpoch > eth2BlockSyncerMetaData.DealedEpoch {
		finalEpoch = eth2BlockSyncerMetaData.DealedEpoch
	}

	eth2ValidatorBalanceMetaData, err := dao.GetMetaData(task.db, utils.MetaTypeEth2ValidatorBalanceSyncer)
	if err != nil {
		return err
	}

	// no need to fetch new balances
	if finalEpoch <= eth2ValidatorBalanceMetaData.DealedEpoch {
		return nil
	}

	// The modulo in the loop below would panic with a division by zero; the
	// loop is guaranteed to run at least once from here, so report it instead.
	if task.rewardEpochInterval == 0 {
		return fmt.Errorf("rewardEpochInterval must not be zero")
	}

	for epoch := eth2ValidatorBalanceMetaData.DealedEpoch + 1; epoch <= finalEpoch; epoch++ {
		// balances are only recorded on epochs aligned to the reward interval
		if epoch%task.rewardEpochInterval != 0 {
			continue
		}

		validatorList, err := dao.GetValidatorListActiveEpochBefore(task.db, epoch)
		if err != nil {
			return err
		}

		logrus.WithFields(logrus.Fields{
			"dealedEpoch":              eth2ValidatorBalanceMetaData.DealedEpoch,
			"willDealEpoch":            epoch,
			"willDealValidatorListLen": len(validatorList),
		}).Debug("syncValidatorEpochBalances")

		// nothing to query for this epoch: just mark it as dealt with
		if len(validatorList) == 0 {
			eth2ValidatorBalanceMetaData.DealedEpoch = epoch
			err = dao.UpOrInMetaData(task.db, eth2ValidatorBalanceMetaData)
			if err != nil {
				return err
			}
			continue
		}

		// Lookup tables keyed by the pubkey string exactly as stored in the
		// validator list; the beacon response is matched back through them.
		pubkeys := make([]types.ValidatorPubkey, 0, len(validatorList))
		pubkeyToNodeAddress := make(map[string]string, len(validatorList))
		pubkeyToIndex := make(map[string]uint64, len(validatorList))
		for _, validator := range validatorList {
			// The first two characters are dropped before parsing
			// (presumably a "0x" prefix - confirm against the dao schema).
			// Guard the slice so a malformed row yields an error, not a panic.
			if len(validator.Pubkey) < 2 {
				return fmt.Errorf("invalid validator pubkey %q: too short", validator.Pubkey)
			}
			pubkey, err := types.HexToValidatorPubkey(validator.Pubkey[2:])
			if err != nil {
				return err
			}
			pubkeys = append(pubkeys, pubkey)
			pubkeyToNodeAddress[validator.Pubkey] = validator.NodeAddress
			pubkeyToIndex[validator.Pubkey] = validator.ValidatorIndex
		}

		var validatorStatusMap map[types.ValidatorPubkey]beacon.ValidatorStatus
		switch task.version {
		case utils.V1, utils.V2, utils.Dev:
			validatorStatusMap, err = task.connection.GetValidatorStatuses(pubkeys, &beacon.ValidatorStatusOptions{
				Epoch: &epoch,
			})
			if err != nil {
				return errors.Wrap(err, "syncValidatorEpochBalances GetValidatorStatuses failed")
			}
		default:
			return fmt.Errorf("unsupported version %s", task.version)
		}

		logrus.WithFields(logrus.Fields{
			"validatorStatuses len": len(validatorStatusMap),
		}).Debug("validator statuses")

		// every requested pubkey must come back with a status
		if len(validatorStatusMap) != len(pubkeys) {
			return fmt.Errorf("validatorStatusMap len: %d not equal pubkeys len: %d", len(validatorStatusMap), len(pubkeys))
		}

		for pubkey, status := range validatorStatusMap {
			// NOTE(review): this lookup only succeeds if the stored pubkey
			// strings have the same form hexutil.Encode produces - confirm.
			pubkeyStr := hexutil.Encode(pubkey.Bytes())
			if !status.Exists {
				return fmt.Errorf("should exist status on beacon chain, pubkey: %s, epoch: %d", pubkeyStr, epoch)
			}

			validatorIndex, exist := pubkeyToIndex[pubkeyStr]
			if !exist {
				return fmt.Errorf("validator index not exist in pubkeyToIndex")
			}
			nodeAddress, exist := pubkeyToNodeAddress[pubkeyStr]
			if !exist {
				return fmt.Errorf("node address not exist in pubkeyToNodeAddress")
			}

			totalWithdrawal, err := dao.GetValidatorTotalWithdrawalBeforeSlot(task.db, validatorIndex, utils.StartSlotOfEpoch(task.eth2Config, epoch))
			if err != nil {
				return errors.Wrap(err, "GetValidatorTotalWithdrawalBeforeSlot failed")
			}

			// A missing record is not an error: the fields are filled in
			// below and the record is stored via dao.UpOrInValidatorBalance.
			// NOTE(review): assumes dao.GetValidatorBalance still returns a
			// usable (non-nil) value alongside gorm.ErrRecordNotFound - confirm.
			validatorBalance, err := dao.GetValidatorBalance(task.db, validatorIndex, epoch)
			if err != nil && err != gorm.ErrRecordNotFound {
				return err
			}

			validatorBalance.NodeAddress = nodeAddress
			validatorBalance.Balance = status.Balance
			validatorBalance.TotalWithdrawal = totalWithdrawal
			validatorBalance.EffectiveBalance = status.EffectiveBalance
			validatorBalance.Epoch = epoch
			validatorBalance.ValidatorIndex = validatorIndex
			validatorBalance.Timestamp = utils.StartTimestampOfEpoch(task.eth2Config, epoch)

			err = dao.UpOrInValidatorBalance(task.db, validatorBalance)
			if err != nil {
				return err
			}
		}

		// all validators of this epoch are stored; persist the progress
		eth2ValidatorBalanceMetaData.DealedEpoch = epoch
		err = dao.UpOrInMetaData(task.db, eth2ValidatorBalanceMetaData)
		if err != nil {
			return err
		}
	}

	return nil
}