/
unresponsive_provider.go
265 lines (231 loc) · 10.6 KB
/
unresponsive_provider.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
package keeper
import (
"fmt"
"math"
"strconv"
"strings"
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/lavanet/lava/utils"
epochstoragetypes "github.com/lavanet/lava/x/epochstorage/types"
"github.com/lavanet/lava/x/pairing/types"
)
// THRESHOLD_FACTOR is the jailing threshold: a provider is punished when the
// complainers' CU exceeds THRESHOLD_FACTOR times the CU the provider actually
// serviced over the examined epochs (see countCuForUnresponsiveness).
const THRESHOLD_FACTOR = 4
// PunishUnresponsiveProviders punishes unresponsive providers (current punishment: freeze).
// A provider is considered unresponsive on a chain when the complaints CU filed against it
// greatly exceeds the CU it serviced over the examined epochs. Providers whose stake was
// applied too recently (short history) are skipped, and punishment never reduces a chain's
// active provider count below the minimal MaxProvidersToPair across all plans.
func (k Keeper) PunishUnresponsiveProviders(ctx sdk.Context, epochsNumToCheckCUForUnresponsiveProvider, epochsNumToCheckCUForComplainers uint64) {
	// check the epochsNum consts; both look-back windows must be positive
	if epochsNumToCheckCUForComplainers == 0 || epochsNumToCheckCUForUnresponsiveProvider == 0 {
		utils.LavaFormatError("epoch to check CU for unresponsive provider or for complainer is zero",
			fmt.Errorf("invalid unresponsive provider consts"),
			utils.Attribute{Key: "epochsNumToCheckCUForUnresponsiveProvider", Value: epochsNumToCheckCUForUnresponsiveProvider},
			utils.Attribute{Key: "epochsNumToCheckCUForComplainers", Value: epochsNumToCheckCUForComplainers},
		)
		// misconfigured windows: with a zero window nothing meaningful can be counted, so bail out
		return
	}

	// Get current epoch
	currentEpoch := k.epochStorageKeeper.GetEpochStart(ctx)

	// Get recommendedEpochNumToCollectPayment
	recommendedEpochNumToCollectPayment := k.RecommendedEpochNumToCollectPayment(ctx)

	// check which of the consts is larger
	largerEpochsNumConst := epochsNumToCheckCUForComplainers
	if epochsNumToCheckCUForUnresponsiveProvider > epochsNumToCheckCUForComplainers {
		largerEpochsNumConst = epochsNumToCheckCUForUnresponsiveProvider
	}

	// To check for punishment, we have to go back:
	// recommendedEpochNumToCollectPayment +
	//   max(epochsNumToCheckCUForComplainers, epochsNumToCheckCUForUnresponsiveProvider)
	// epochs from the current epoch.
	minHistoryBlock, err := k.getBlockEpochsAgo(ctx, currentEpoch, largerEpochsNumConst+recommendedEpochNumToCollectPayment)
	if err != nil {
		// not enough history, do nothing
		return
	}

	// Get the current stake storages (from all chains).
	// Stake storages contain a list of stake entries (each for a different chain).
	providerStakeStorageList := k.getCurrentProviderStakeStorageList(ctx)
	if len(providerStakeStorageList) == 0 {
		// no provider is staked -> no one to punish
		return
	}

	// Go back recommendedEpochNumToCollectPayment epochs: payments newer than this
	// block may not have been collected yet, so they must not be judged
	minPaymentBlock, err := k.getBlockEpochsAgo(ctx, currentEpoch, recommendedEpochNumToCollectPayment)
	if err != nil {
		// not enough history, do nothing
		return
	}

	// find the minimum number of providers in all the plans
	minProviders := uint64(math.MaxUint64)
	planIndices := k.planKeeper.GetAllPlanIndices(ctx)
	for _, index := range planIndices {
		plan, found := k.planKeeper.FindPlan(ctx, index, uint64(ctx.BlockHeight()))
		if found && plan.PlanPolicy.MaxProvidersToPair < minProviders {
			minProviders = plan.PlanPolicy.MaxProvidersToPair
		}
	}

	// composite map key: "<provider address> <chain ID>"
	ProviderChainID := func(provider, chainID string) string {
		return provider + " " + chainID
	}

	// check all supported providers from all geolocations prior to making decisions
	existingProviders := map[string]uint64{}
	stakeAppliedBlockProviders := map[string]uint64{}
	for _, providerStakeStorage := range providerStakeStorageList {
		providerStakeEntriesForChain := providerStakeStorage.GetStakeEntries()
		// count (non-frozen) providers per chain and remember each one's stake-applied block
		for _, providerStakeEntry := range providerStakeEntriesForChain {
			if !providerStakeEntry.IsFrozen() {
				existingProviders[providerStakeEntry.GetChain()]++
				stakeAppliedBlockProviders[ProviderChainID(providerStakeEntry.Address, providerStakeEntry.Chain)] = providerStakeEntry.StakeAppliedBlock
			}
		}
	}

	// Go over the staked provider entries (on all chains) that have complaints.
	// Build a map with all the relevant details: provider address, chain, epoch and ProviderEpochCu object
	keys := []string{}
	pecsDetailed := k.GetAllProviderEpochComplainerCuStore(ctx)
	complainedProviders := map[string]map[uint64]types.ProviderEpochComplainerCu{} // map[provider chainID]map[epoch]ProviderEpochComplainerCu
	for _, pec := range pecsDetailed {
		key := ProviderChainID(pec.Provider, pec.ChainId)
		// BUG FIX: stakeAppliedBlockProviders is keyed by "provider chainID", but was
		// previously looked up with the bare provider address, so the miss returned 0
		// and the short-history skip below could never trigger.
		if minHistoryBlock < stakeAppliedBlockProviders[key] {
			// this staked provider has too short history (either since staking
			// or since it was last unfrozen) - do not consider for jailing
			continue
		}
		if _, ok := complainedProviders[key]; !ok {
			complainedProviders[key] = map[uint64]types.ProviderEpochComplainerCu{pec.Epoch: pec.ProviderEpochComplainerCu}
			keys = append(keys, key)
		} else {
			if _, ok := complainedProviders[key][pec.Epoch]; !ok {
				complainedProviders[key][pec.Epoch] = pec.ProviderEpochComplainerCu
			} else {
				utils.LavaFormatError("duplicate ProviderEpochCu key", fmt.Errorf("did not aggregate complainers CU"),
					utils.LogAttr("key", types.ProviderEpochCuKey(pec.Epoch, pec.Provider, pec.ChainId)),
				)
				continue
			}
		}
	}

	// go over all the providers, count the complainers CU and punish providers
	for _, key := range keys {
		components := strings.Split(key, " ")
		provider := components[0]
		chainID := components[1]
		// count the CU serviced by this provider and the CU of its complainers
		epochs, complaintCU, servicedCU, err := k.countCuForUnresponsiveness(ctx, provider, chainID, minPaymentBlock, epochsNumToCheckCUForUnresponsiveProvider, epochsNumToCheckCUForComplainers, complainedProviders[key])
		if err != nil {
			utils.LavaFormatError("unstake unresponsive providers failed to count CU", err,
				utils.Attribute{Key: "provider", Value: provider},
			)
			continue
		}

		// non-empty epochs -> provider crossed the threshold and should be punished,
		// but only while the chain keeps more than the minimal amount of providers
		if len(epochs) != 0 && existingProviders[chainID] > minProviders {
			err = k.punishUnresponsiveProvider(ctx, epochs, provider, chainID, complaintCU, servicedCU, complainedProviders[key])
			if err != nil {
				utils.LavaFormatError("unstake unresponsive providers failed to punish provider", err,
					utils.Attribute{Key: "provider", Value: provider},
				)
				continue
			}
			// decrement only after a successful punishment
			existingProviders[chainID]--
		}
	}
}
// getBlockEpochsAgo returns the block of the epoch start that is numEpochs
// before the given blockHeight. It errors when the chain does not yet have
// that many epochs of history.
func (k Keeper) getBlockEpochsAgo(ctx sdk.Context, blockHeight, numEpochs uint64) (uint64, error) {
	for ; numEpochs > 0; numEpochs-- {
		prev, err := k.epochStorageKeeper.GetPreviousEpochStartForBlock(ctx, blockHeight)
		if err != nil {
			// too early in the chain life: not enough epoch history
			return 0, err
		}
		blockHeight = prev
	}
	return blockHeight, nil
}
// countCuForUnresponsiveness counts the CU serviced by the (possibly unresponsive)
// provider and the CU of its complainers, walking backwards from the given epoch over
// max(epochsNumToCheckCUForUnresponsiveProvider, epochsNumToCheckCUForComplainers) epochs.
// If the complainers' CU exceeds THRESHOLD_FACTOR times the serviced CU, the epochs that
// contributed complainer CU are returned (so their complainer CU can be reset later);
// otherwise a nil epochs slice is returned. complainersCu and servicedCu are always returned.
func (k Keeper) countCuForUnresponsiveness(ctx sdk.Context, provider, chainId string, epoch, epochsNumToCheckCUForUnresponsiveProvider, epochsNumToCheckCUForComplainers uint64, providerEpochCuMap map[uint64]types.ProviderEpochComplainerCu) (epochs []uint64, complainersCu uint64, servicedCu uint64, errRet error) {
	// walk back over the larger of the two look-back windows
	// (renamed from "max": avoid shadowing the Go builtin)
	epochsToCheck := epochsNumToCheckCUForComplainers
	if epochsNumToCheckCUForUnresponsiveProvider > epochsNumToCheckCUForComplainers {
		epochsToCheck = epochsNumToCheckCUForUnresponsiveProvider
	}

	// count the CU serviced by the unresponsive provider and used CU of the complainers
	for counter := uint64(0); counter < epochsToCheck; counter++ {
		complainerEpochCu, ok := providerEpochCuMap[epoch]
		if ok {
			// counter is smaller than epochsNumToCheckCUForComplainers -> count complainer CU
			if counter < epochsNumToCheckCUForComplainers {
				complainersCu += complainerEpochCu.ComplainersCu
				epochs = append(epochs, epoch)
			}
			// counter is smaller than epochsNumToCheckCUForUnresponsiveProvider ->
			// count CU serviced by the provider in the epoch
			// (renamed from "pec": avoid shadowing the outer variable)
			if counter < epochsNumToCheckCUForUnresponsiveProvider {
				providerEpochCu, found := k.GetProviderEpochCu(ctx, epoch, provider, chainId)
				if found {
					servicedCu += providerEpochCu.ServicedCu
				}
			}
		}

		// step one epoch back
		previousEpoch, err := k.epochStorageKeeper.GetPreviousEpochStartForBlock(ctx, epoch)
		if err != nil {
			return nil, 0, 0, utils.LavaFormatWarning("couldn't get previous epoch", err,
				utils.Attribute{Key: "epoch", Value: epoch},
			)
		}
		epoch = previousEpoch
	}

	// the complainers' CU is larger than the provider serviced CU -> should be punished;
	// epochs list is returned so the complainers' CU can be reset
	if complainersCu > THRESHOLD_FACTOR*servicedCu {
		return epochs, complainersCu, servicedCu, nil
	}
	return nil, complainersCu, servicedCu, nil
}
// getCurrentProviderStakeStorageList returns the current stake storage of every
// known chain. Chains that have no current stake storage are skipped.
func (k Keeper) getCurrentProviderStakeStorageList(ctx sdk.Context) []epochstoragetypes.StakeStorage {
	// nil slice by design: callers only check len()
	var storages []epochstoragetypes.StakeStorage
	// collect the stake storage of each chain ID that has one
	for _, chainID := range k.specKeeper.GetAllChainIDs(ctx) {
		if storage, found := k.epochStorageKeeper.GetStakeStorageCurrent(ctx, chainID); found {
			storages = append(storages, storage)
		}
	}
	return storages
}
// punishUnresponsiveProvider punishes an unresponsive provider; the current punishment
// is a freeze on the given chain. On success it emits a ProviderJailed event and resets
// the provider's complainer CU for the given epochs (so the same complaints are not
// punished twice). On freeze failure the error is returned and no event/reset happens.
func (k Keeper) punishUnresponsiveProvider(ctx sdk.Context, epochs []uint64, provider, chainID string, complaintCU uint64, servicedCU uint64, providerEpochCuMap map[uint64]types.ProviderEpochComplainerCu) error {
	// freeze the unresponsive provider
	err := k.FreezeProvider(ctx, provider, []string{chainID}, "unresponsiveness")
	if err != nil {
		// BUG FIX: previously the error was only logged and nil returned, so the jailed
		// event was emitted and the complainers CU reset even though the freeze failed
		return utils.LavaFormatError("unable to freeze provider entry due to unresponsiveness", err,
			utils.Attribute{Key: "provider", Value: provider},
			utils.Attribute{Key: "chainID", Value: chainID},
		)
	}

	utils.LogLavaEvent(ctx, k.Logger(ctx), types.ProviderJailedEventName,
		map[string]string{
			"provider_address": provider,
			"chain_id":         chainID,
			"complaint_cu":     strconv.FormatUint(complaintCU, 10),
			"serviced_cu":      strconv.FormatUint(servicedCU, 10),
		},
		"Unresponsive provider was freezed due to unresponsiveness")

	// reset the provider's complainer CU (so he won't get punished for the same complaints twice)
	k.resetComplainersCU(ctx, epochs, provider, chainID, providerEpochCuMap)

	return nil
}
// resetComplainersCU zeroes the complainer CU recorded against the given provider
// and chain for each of the given epochs. Epochs without a recorded entry are skipped.
func (k Keeper) resetComplainersCU(ctx sdk.Context, epochs []uint64, provider string, chainID string, providerEpochCuMap map[uint64]types.ProviderEpochComplainerCu) {
	for _, epoch := range epochs {
		entry, found := providerEpochCuMap[epoch]
		if !found {
			continue
		}
		// zero out the complainer CU and persist the updated entry
		entry.ComplainersCu = 0
		k.SetProviderEpochComplainerCu(ctx, epoch, provider, chainID, entry)
	}
}