-
Notifications
You must be signed in to change notification settings - Fork 205
/
tx_sender.go
374 lines (341 loc) · 15.8 KB
/
tx_sender.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
package statetracker
import (
"bytes"
"context"
"encoding/hex"
"encoding/json"
"fmt"
"strconv"
"strings"
"time"
coretypes "github.com/cometbft/cometbft/rpc/core/types"
"github.com/cosmos/cosmos-sdk/client"
"github.com/cosmos/cosmos-sdk/client/tx"
sdk "github.com/cosmos/cosmos-sdk/types"
typestx "github.com/cosmos/cosmos-sdk/types/tx"
commontypes "github.com/lavanet/lava/common/types"
"github.com/lavanet/lava/protocol/common"
"github.com/lavanet/lava/protocol/rpcprovider/reliabilitymanager"
"github.com/lavanet/lava/utils"
conflicttypes "github.com/lavanet/lava/x/conflict/types"
pairingtypes "github.com/lavanet/lava/x/pairing/types"
)
const (
// defaultGasPrice is the gas price string applied to the tx factory before a transaction is simulated and sent.
defaultGasPrice = "0.000000001" + commontypes.TokenDenom
// DefaultGasAdjustment is not referenced in the visible part of this file;
// presumably it is consumed by code that configures the tx factory — TODO confirm.
DefaultGasAdjustment = "1000.0"
// RETRY_INCORRECT_SEQUENCE bounds the retry loops used when simulating and when sending a transaction.
// The same account can keep failing with a sequence mismatch the more providers are staked under it:
// for example, a provider staked on 20 chains will ask for 20 payments per epoch.
// Therefore the current best solution is to keep retrying with an updated sequence number until successful.
RETRY_INCORRECT_SEQUENCE = 100
)
// TxSender holds the transaction factory and client context that the methods in
// this file use to simulate, broadcast and track transactions.
type TxSender struct {
// txFactory is the base factory; each send derives a working factory from it.
txFactory tx.Factory
// clientCtx is the client context used for account queries, broadcasting and tx lookups.
clientCtx client.Context
}
// NewTxSender builds a TxSender from the given client context and tx factory.
// SkipConfirm is forced on in the stored client context. ctx is currently unused
// and the returned error is always nil.
func NewTxSender(ctx context.Context, clientCtx client.Context, txFactory tx.Factory) (ret *TxSender, err error) {
	clientCtx.SkipConfirm = true
	ret = &TxSender{
		txFactory: txFactory,
		clientCtx: clientCtx,
	}
	return ret, nil
}
// checkProfitability decides whether claiming a relay payment is worth its gas cost.
//
// It sums the "BasePay" attribute of every relay payment event in the simulation
// result (only the first such attribute of each event is counted) and compares the
// total against the gas fee, computed as the factory's first gas price multiplied
// by gasUsed.
//
// It returns nil when the reward is strictly greater than the fee. It returns an
// error when a reward value cannot be parsed, when the factory carries no gas
// prices, or when the fee is greater than or equal to the reward.
func (ts *TxSender) checkProfitability(simResult *typestx.SimulateResponse, gasUsed uint64, txFactory tx.Factory) error {
	lavaReward := sdk.NewCoin(commontypes.TokenDenom, sdk.NewInt(0))
	for _, txEvent := range simResult.GetResult().Events {
		if txEvent.Type != utils.EventPrefix+pairingtypes.RelayPaymentEventName {
			continue
		}
		for _, attribute := range txEvent.Attributes {
			// only the part of the key before the first dot is compared
			if strings.SplitN(attribute.Key, ".", 2)[0] != "BasePay" {
				continue
			}
			lavaRewardTemp, err := sdk.ParseCoinNormalized(attribute.Value)
			if err != nil {
				// keep the parse error so the caller can see why the value was rejected
				return utils.LavaFormatError("failed parsing simulation result", err, utils.Attribute{Key: "attribute", Value: attribute.Value})
			}
			lavaReward = lavaReward.Add(lavaRewardTemp)
			break
		}
	}

	gasPrices := txFactory.GasPrices()
	if len(gasPrices) == 0 {
		// without a gas price the fee cannot be computed; report it instead of panicking on the index below
		return utils.LavaFormatError("cannot evaluate lava_relay_payment profitability, tx factory has no gas prices", nil)
	}
	gasFee := gasPrices[0]
	gasFee.Amount = gasFee.Amount.MulInt64(int64(gasUsed))
	lavaRewardDec := sdk.NewDecCoinFromCoin(lavaReward)
	if gasFee.IsGTE(lavaRewardDec) {
		return utils.LavaFormatError("lava_relay_payment claim is not profitable", nil, utils.Attribute{Key: "gasFee", Value: gasFee}, utils.Attribute{Key: "lava_reward:", Value: lavaRewardDec})
	}
	return nil
}
// SimulateAndBroadCastTxWithRetryOnSeqMismatch validates msg and then repeatedly
// simulates and sends it until it succeeds or the retry budget is exhausted.
//
// Each attempt first simulates the transaction (which also sets the gas on the
// factory) and then sends it and waits for the commit result. When sending fails:
//   - an "out of gas" failure is retried as is (the next attempt simulates again);
//   - any other failure is handed to parseTxErrorsAndTryGettingANewFactory, which
//     either yields a factory to retry with or an error that aborts the loop.
//
// The loop index starts at -1, so up to RETRY_INCORRECT_SEQUENCE+1 attempts are made.
// checkProfitability is accepted for callers but is not consulted by this function.
// It returns nil on success and an error otherwise.
func (ts *TxSender) SimulateAndBroadCastTxWithRetryOnSeqMismatch(msg sdk.Msg, checkProfitability bool) error {
	txfactory := ts.txFactory.WithGasPrices(defaultGasPrice)
	if err := msg.ValidateBasic(); err != nil {
		return err
	}

	clientCtx := ts.clientCtx
	txfactory, err := ts.prepareFactory(txfactory)
	if err != nil {
		return err
	}

	idx := -1
	latestResult := common.TxResultData{}
	var gasUsed uint64
	for ; idx < RETRY_INCORRECT_SEQUENCE; idx++ {
		utils.LavaFormatDebug("Attempting to send relay payment transaction", utils.LogAttr("index", idx+1))
		txfactory, gasUsed, err = ts.simulateTxWithRetry(clientCtx, txfactory, msg)
		if err != nil {
			return utils.LavaFormatError("Failed Simulating transaction", err)
		}

		// the send error is kept in its own variable so that it is still available for the retry log below
		var sendErr error
		latestResult, sendErr = ts.SendTxAndVerifyCommit(txfactory, msg)
		if sendErr == nil {
			utils.LavaFormatInfo("Succeeded sending transaction", utils.Attribute{Key: "hash", Value: hex.EncodeToString(latestResult.Txhash)})
			return nil
		}

		// in case we got an error the tx result is basically the error: when there
		// is no raw log, fall back to the error text instead of parsing an empty string
		transactionResult := latestResult.RawLog
		if transactionResult == "" {
			transactionResult = sendErr.Error()
		}

		if strings.Contains(transactionResult, "out of gas") {
			utils.LavaFormatInfo("Transaction got out of gas error, retrying next block.")
		} else {
			var factoryErr error
			txfactory, factoryErr = ts.parseTxErrorsAndTryGettingANewFactory(transactionResult, clientCtx, txfactory, gasUsed)
			if factoryErr != nil {
				return utils.LavaFormatError("Failed getting a new tx factory", factoryErr)
			}
			// else continue with the new factory
		}
		utils.LavaFormatDebug("Failed sending transaction, will retry", utils.LogAttr("Index", idx), utils.LogAttr("reason:", sendErr), utils.LogAttr("rawLog", transactionResult))
	}

	// report the sequence the factory ended up with rather than a constant zero
	return utils.LavaFormatError("Failed sending transaction with all retries and giving up", nil, utils.Attribute{Key: "result", Value: latestResult}, utils.Attribute{Key: "Number Of Retries executed", Value: idx}, utils.Attribute{Key: "Parsed Sequence", Value: txfactory.Sequence()})
}
// getSequenceNumberFromErrorOrClient returns the sequence number to retry with.
// It first tries to extract the number from errString; when that fails it logs a
// warning and queries the account retriever for the sender's current sequence.
// An error is returned only when that query fails as well.
func (ts *TxSender) getSequenceNumberFromErrorOrClient(clientCtx client.Context, errString string) (uint64, error) {
	parsed, parseErr := common.FindSequenceNumber(errString)
	if parseErr == nil {
		return uint64(parsed), nil
	}

	utils.LavaFormatWarning("getSequenceNumberFromErrorOrClient: Failed to findSequenceNumber, fetching from client", parseErr)
	_, accountSequence, queryErr := clientCtx.AccountRetriever.GetAccountNumberSequence(clientCtx, clientCtx.GetFromAddress())
	if queryErr != nil {
		return 0, utils.LavaFormatError("failed to get correct sequence number for account, give up", queryErr)
	}
	return accountSequence, nil
}
// getNewFactoryFromASequenceNumberError derives a factory carrying the sequence
// number obtained from errString (or, failing that, from the client).
// On failure the factory is returned unchanged together with the error.
func (ts *TxSender) getNewFactoryFromASequenceNumberError(errString string, txfactory tx.Factory, clientCtx client.Context) (tx.Factory, error) {
	newSequence, err := ts.getSequenceNumberFromErrorOrClient(clientCtx, errString)
	if err == nil {
		utils.LavaFormatDebug("Retrying with new sequence factory", utils.LogAttr("sequence", newSequence))
		txfactory = txfactory.WithSequence(newSequence)
	}
	return txfactory, err
}
// parseTxErrorsAndTryGettingANewFactory inspects a transaction failure text and
// tries to produce a factory the transaction can be retried with.
//
//   - "account sequence": more than one tx landed in the same block; a factory with
//     the corrected sequence number is returned.
//   - "insufficient fees; got:": the node's minimum gas fees look misconfigured; the
//     diagnostic error built by parseInsufficientFeesError is returned.
//   - anything else: the failure text itself is returned as the error.
//
// Whenever an error is returned, the factory passed in is returned unchanged.
func (ts *TxSender) parseTxErrorsAndTryGettingANewFactory(txResultString string, clientCtx client.Context, txfactory tx.Factory, gasUsed uint64) (tx.Factory, error) {
	switch {
	case strings.Contains(txResultString, "account sequence"):
		utils.LavaFormatInfo("Identified account sequence reason, attempting to run a new simulation with the correct sequence number")
		return ts.getNewFactoryFromASequenceNumberError(txResultString, txfactory, clientCtx)
	case strings.Contains(txResultString, "insufficient fees; got:"):
		return txfactory, parseInsufficientFeesError(txResultString, gasUsed)
	default:
		// the text comes from the node and must not be interpreted as a format string
		return txfactory, fmt.Errorf("%s", txResultString)
	}
}
// simulateTxWithRetry runs the gas simulation for msg, retrying up to
// RETRY_INCORRECT_SEQUENCE times. After a failed simulation the error text is
// handed to parseTxErrorsAndTryGettingANewFactory; if that yields a factory the
// simulation is repeated with it, otherwise its error is returned.
// On success it returns the factory with the simulated gas set, and that gas amount.
func (ts *TxSender) simulateTxWithRetry(clientCtx client.Context, txfactory tx.Factory, msg sdk.Msg) (tx.Factory, uint64, error) {
	attempt := 0
	for attempt < RETRY_INCORRECT_SEQUENCE {
		utils.LavaFormatDebug("Running Simulation", utils.LogAttr("idx", attempt))
		attempt++

		_, gasUsed, simErr := tx.CalculateGas(clientCtx, txfactory, msg)
		if simErr == nil {
			return txfactory.WithGas(gasUsed), gasUsed, nil
		}

		utils.LavaFormatInfo("Simulation failed", utils.LogAttr("reason:", simErr))
		nextFactory, recoverErr := ts.parseTxErrorsAndTryGettingANewFactory(simErr.Error(), clientCtx, txfactory, gasUsed)
		txfactory = nextFactory
		if recoverErr != nil {
			return txfactory, 0, recoverErr
		}
		// a new factory was obtained, simulate again with it
	}
	return txfactory, 0, utils.LavaFormatError("Failed Calculating gas for reward transaction", nil)
}
// SendTxAndVerifyCommit sends msg using txfactory and waits for it to show up on chain.
//
// The client output is redirected into a local buffer in JSON format so that the
// send result can be parsed. The function returns an error when sending fails,
// when the output cannot be decoded or parsed, when the parsed result code is not
// 0, or when waitForTxCommit fails. When the parsed result code is not 0 the parsed
// result is returned alongside the error; for the earlier failures an empty result
// is returned.
func (ts *TxSender) SendTxAndVerifyCommit(txfactory tx.Factory, msg sdk.Msg) (parsedResult common.TxResultData, err error) {
	var output bytes.Buffer
	clientCtx := ts.clientCtx
	clientCtx.Output = &output
	clientCtx.OutputFormat = "json"

	if err = tx.GenerateOrBroadcastTxWithFactory(clientCtx, txfactory, msg); err != nil {
		utils.LavaFormatWarning("Sending CheckProfitabilityAndBroadCastTx failed", err, utils.Attribute{Key: "msg", Value: msg})
		return common.TxResultData{}, err
	}

	jsonParsedResult := map[string]any{}
	if err = json.Unmarshal(output.Bytes(), &jsonParsedResult); err != nil {
		// include the decode error itself, not only the raw output
		return common.TxResultData{}, utils.LavaFormatInfo("Failed unmarshaling transaction results", utils.Attribute{Key: "transactionResult", Value: output.String()}, utils.Attribute{Key: "error", Value: err.Error()})
	}

	if debug {
		utils.LavaFormatDebug("transaction results", utils.Attribute{Key: "jsonParsedResult", Value: jsonParsedResult})
	}

	resultData, err := common.ParseTransactionResult(jsonParsedResult)
	if err != nil {
		return common.TxResultData{}, err
	}
	// log the hash only once the result is known to have parsed correctly
	utils.LavaFormatInfo("Sent Transaction", utils.LogAttr("Hash", hex.EncodeToString(resultData.Txhash)))

	if resultData.Code != 0 {
		return resultData, utils.LavaFormatInfo("Failed sending transaction, code is not 0", utils.Attribute{Key: "resultData", Value: resultData})
	}

	// now that our Tx was sent to the mempool successfully, we want to see its result on chain
	return ts.waitForTxCommit(resultData)
}
// waitForTxCommit polls the node for the transaction identified by
// resultData.Txhash until it is found or five minutes have passed.
//
// A background goroutine sleeps five seconds, queries the tx with a five second
// timeout, and repeats until the query succeeds or the overall deadline expires.
// The deadline is carried by a context and the result channel is buffered, so the
// goroutine always terminates and never blocks on delivering a late result.
//
// On success the returned data carries the on-chain log and code together with the
// original hash; a non-zero code is reported as an error alongside that data.
// On timeout an empty result and an error are returned.
func (ts *TxSender) waitForTxCommit(resultData common.TxResultData) (common.TxResultData, error) {
	const (
		pollInterval  = 5 * time.Second
		queryTimeout  = 5 * time.Second
		commitTimeout = 5 * time.Minute
	)

	clientCtx := ts.clientCtx
	guid := utils.GenerateUniqueIdentifier()
	waitCtx, cancelWait := context.WithTimeout(utils.WithUniqueIdentifier(context.Background(), guid), commitTimeout)
	defer cancelWait()

	// the poller works on its own copy so it never observes the reassignment below
	pending := resultData
	// capacity 1: the poller can hand over its result even if nobody is receiving anymore
	txResultChan := make(chan *coretypes.ResultTx, 1)

	go func() {
		for {
			// we will never catch the tx hash in the first attempt as not enough time has passed,
			// so we wait at the beginning of the loop
			select {
			case <-waitCtx.Done():
				utils.LavaFormatWarning("Timeout waiting for transaction", nil, utils.LogAttr("hash", pending.Txhash))
				return
			case <-time.After(pollInterval):
			}

			ctx, cancel := context.WithTimeout(waitCtx, queryTimeout)
			result, err := clientCtx.Client.Tx(ctx, pending.Txhash, false)
			cancel()
			if err == nil {
				utils.LavaFormatDebug("Tx Found successfully on chain!", utils.LogAttr("Hash", hex.EncodeToString(pending.Txhash)))
				txResultChan <- result
				return
			}
			utils.LavaFormatDebug("Keep Waiting tx results...", utils.LogAttr("reason", err))
			if debug {
				utils.LavaFormatWarning("Tx query got error", err, utils.Attribute{Key: "GUID", Value: ctx}, utils.Attribute{Key: "resultData", Value: pending})
			}
		}
	}()

	select {
	case txRes := <-txResultChan:
		resultData = common.TxResultData{
			RawLog: txRes.TxResult.Log,
			Txhash: resultData.Txhash,
			Code:   int(txRes.TxResult.Code),
		}
		utils.LavaFormatDebug("Tx Hash found on blockchain", utils.LogAttr("Txhash", hex.EncodeToString(resultData.Txhash)), utils.LogAttr("Code", resultData.Code))
	case <-waitCtx.Done():
		return common.TxResultData{}, utils.LavaFormatError("failed sending tx, wasn't found after timeout", nil, utils.Attribute{Key: "hash", Value: hex.EncodeToString(resultData.Txhash)})
	}

	// we found the tx on chain and it failed
	if resultData.Code != 0 {
		return resultData, utils.LavaFormatInfo("Failed sending transaction, code is not 0", utils.Attribute{Key: "result", Value: resultData})
	}
	return resultData, nil
}
// prepareFactory is a local copy of the tx package's factory preparation, kept here
// so the tx factory can be set up correctly by this package.
// It verifies that the sender account exists and fills in the account number and
// sequence from the account retriever, each only if it is still zero on the factory.
func (ts *TxSender) prepareFactory(txf tx.Factory) (tx.Factory, error) {
	clientCtx := ts.clientCtx
	from := clientCtx.GetFromAddress()
	if err := clientCtx.AccountRetriever.EnsureExists(clientCtx, from); err != nil {
		return txf, err
	}

	missingNumber := txf.AccountNumber() == 0
	missingSequence := txf.Sequence() == 0
	if !missingNumber && !missingSequence {
		// both values were supplied up front, nothing to look up
		return txf, nil
	}

	num, seq, err := clientCtx.AccountRetriever.GetAccountNumberSequence(clientCtx, from)
	if err != nil {
		return txf, err
	}
	if missingNumber {
		txf = txf.WithAccountNumber(num)
	}
	if missingSequence {
		txf = txf.WithSequence(seq)
	}
	return txf, nil
}
// ConsumerTxSender embeds a TxSender and adds the consumer-side transaction
// (conflict detection) on top of it.
type ConsumerTxSender struct {
*TxSender
}
// NewConsumerTxSender creates a ConsumerTxSender wrapping a freshly built TxSender.
// Any error from NewTxSender is returned as is.
func NewConsumerTxSender(ctx context.Context, clientCtx client.Context, txFactory tx.Factory) (ret *ConsumerTxSender, err error) {
	var base *TxSender
	if base, err = NewTxSender(ctx, clientCtx, txFactory); err != nil {
		return nil, err
	}
	return &ConsumerTxSender{TxSender: base}, nil
}
// TxSenderConflictDetection builds a conflict detection message signed by the
// client's from-address and sends it with the retrying sender.
// Send failures are wrapped and returned.
func (ts *ConsumerTxSender) TxSenderConflictDetection(ctx context.Context, finalizationConflict *conflicttypes.FinalizationConflict, responseConflict *conflicttypes.ResponseConflict, sameProviderConflict *conflicttypes.FinalizationConflict) error {
	from := ts.clientCtx.FromAddress.String()
	detectionMsg := conflicttypes.NewMsgDetection(from, finalizationConflict, responseConflict, sameProviderConflict)
	if err := ts.SimulateAndBroadCastTxWithRetryOnSeqMismatch(detectionMsg, false); err != nil {
		return utils.LavaFormatError("discrepancyChecker - SimulateAndBroadCastTx Failed", err)
	}
	return nil
}
// ProviderTxSender embeds a TxSender and adds the provider-side transactions
// (relay payment, vote commitment and vote reveal) on top of it.
type ProviderTxSender struct {
*TxSender
}
// NewProviderTxSender creates a ProviderTxSender wrapping a freshly built TxSender.
// Any error from NewTxSender is returned as is.
func NewProviderTxSender(ctx context.Context, clientCtx client.Context, txFactory tx.Factory) (ret *ProviderTxSender, err error) {
	var base *TxSender
	if base, err = NewTxSender(ctx, clientCtx, txFactory); err != nil {
		return nil, err
	}
	return &ProviderTxSender{TxSender: base}, nil
}
// TxRelayPayment builds a relay payment message for the given relay sessions and
// latest block reports, signed by the client's from-address, and sends it with the
// retrying sender (requesting the profitability check).
// Send failures are wrapped and returned.
func (pts *ProviderTxSender) TxRelayPayment(ctx context.Context, relayRequests []*pairingtypes.RelaySession, description string, latestBlocks []*pairingtypes.LatestBlockReport) error {
	from := pts.clientCtx.FromAddress.String()
	paymentMsg := pairingtypes.NewMsgRelayPayment(from, relayRequests, description, latestBlocks)
	utils.LavaFormatDebug("Sending reward TX", utils.LogAttr("Number_of_relay_sessions_for_payment", len(relayRequests)))
	if err := pts.SimulateAndBroadCastTxWithRetryOnSeqMismatch(paymentMsg, true); err != nil {
		return utils.LavaFormatError("relay_payment - sending Tx Failed", err)
	}
	return nil
}
// SendVoteReveal builds a conflict vote reveal message for voteID from the vote's
// nonce and relay data hash, and sends it with the retrying sender.
// Send failures are wrapped and returned.
func (pts *ProviderTxSender) SendVoteReveal(voteID string, vote *reliabilitymanager.VoteData) error {
	from := pts.clientCtx.FromAddress.String()
	revealMsg := conflicttypes.NewMsgConflictVoteReveal(from, voteID, vote.Nonce, vote.RelayDataHash)
	if err := pts.SimulateAndBroadCastTxWithRetryOnSeqMismatch(revealMsg, false); err != nil {
		return utils.LavaFormatError("SendVoteReveal - SimulateAndBroadCastTx Failed", err)
	}
	return nil
}
// SendVoteCommitment builds a conflict vote commit message for voteID from the
// vote's commit hash, and sends it with the retrying sender.
// Send failures are wrapped and returned.
func (pts *ProviderTxSender) SendVoteCommitment(voteID string, vote *reliabilitymanager.VoteData) error {
	from := pts.clientCtx.FromAddress.String()
	commitMsg := conflicttypes.NewMsgConflictVoteCommit(from, voteID, vote.CommitHash)
	if err := pts.SimulateAndBroadCastTxWithRetryOnSeqMismatch(commitMsg, false); err != nil {
		return utils.LavaFormatError("SendVoteCommitment - SimulateAndBroadCastTx Failed", err)
	}
	return nil
}
func parseInsufficientFeesError(msg string, gasUsed uint64) error {
feesPart := strings.Split(msg, "insufficient fees; got: ")[1]
prices := strings.Split(feesPart, commontypes.TokenDenom)
var required int
var err error
for _, p := range prices {
if strings.Contains(p, " required: ") {
requiredParsedString := strings.Split(p, " required: ")[1]
required, err = strconv.Atoi(requiredParsedString)
if err != nil {
return utils.LavaFormatError("Failed converting string to number", err, utils.Attribute{Key: "requiredParsedString", Value: requiredParsedString})
}
}
}
if required == 0 {
return utils.LavaFormatError("Failed fetching required gas from error", nil, utils.Attribute{Key: "message", Value: prices})
}
minimumGasPricesGot := (float64(gasUsed) / float64(required))
return utils.LavaFormatError("Bad Lava Node Configuration detected, Gas fees inconsistencies can be related to the app.toml configuration of the lava node you are using under 'minimum-gas-prices', Please remove the field or set it to the required amount or change rpc to a different lava node", nil,
utils.Attribute{Key: "Required Minimum Gas Prices", Value: defaultGasPrice},
utils.Attribute{Key: "Current (estimated) Minimum Gas Prices", Value: strconv.FormatFloat(minimumGasPricesGot, 'f', -1, 64) + commontypes.TokenDenom},
)
}