/
reverted_txns.go
722 lines (666 loc) · 25 KB
/
reverted_txns.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
package v2
import (
"bytes"
"context"
"database/sql"
"fmt"
"strconv"
"strings"
"time"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/accounts/abi"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/crypto"
"github.com/pkg/errors"
txmgrcommon "github.com/smartcontractkit/chainlink/v2/common/txmgr"
evmclient "github.com/smartcontractkit/chainlink/v2/core/chains/evm/client"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/txmgr"
evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types"
evmutils "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils"
"github.com/smartcontractkit/chainlink/v2/core/gethwrappers/generated/vrf_coordinator_v2"
"github.com/smartcontractkit/chainlink/v2/core/services/pg"
"github.com/smartcontractkit/chainlink/v2/core/utils"
)
// TxnReceiptDB is the row shape that the reverted-transaction queries in this
// file scan into. Each field is filled from the result column named in its
// `db` tag; queries that do not select a column leave that field at its zero
// value.
type TxnReceiptDB struct {
	TxHash                  common.Hash      `db:"tx_hash"`
	EVMReceipt              evmtypes.Receipt `db:"receipt"`
	FromAddress             common.Address   `db:"from_address"`
	ToAddress               common.Address   `db:"to_address"`
	EncodedPayload          hexutil.Bytes    `db:"encoded_payload"`
	GasLimit                uint64           `db:"gas_limit"`
	SubID                   uint64           `db:"sub_id"`
	RequestID               string           `db:"request_id"`
	RequestTxHash           string           `db:"request_tx_hash"`
	ForceFulfillmentAttempt uint64           `db:"force_fulfillment_attempt"`
}

// RevertedVRFTxn pairs a reverted fulfillment receipt with the proof and
// request commitment decoded from that fulfillment's calldata, which is what
// is needed to re-submit the fulfillment through VRFOwner.
type RevertedVRFTxn struct {
	// DBReceipt is the database row (or, for batch fulfillments, a per-request
	// copy of it) that identified the reverted fulfillment.
	DBReceipt TxnReceiptDB
	// IsBatchReq is true when the request was part of a batch fulfillment.
	IsBatchReq bool
	// Proof and Commitment are the fulfillRandomWords arguments for the request.
	Proof      vrf_coordinator_v2.VRFProof
	Commitment vrf_coordinator_v2.VRFCoordinatorV2RequestCommitment
}
// ReqScanTimeRangeInDB is the look-back window used by the queries in this
// file. It is interpolated verbatim into `created_at >= NOW() - interval '...'`
// filters on evm.txes, so it must be a valid PostgreSQL interval literal.
// NOTE(review): declared as a var rather than a const, presumably so it can be
// overridden (e.g. by tests) — confirm.
var ReqScanTimeRangeInDB = "1 hour"
// runRevertedTxnsHandler drives the reverted-transaction pipeline: it calls
// handleRevertedTxns once per tick until lsn.chStop fires. The tick interval
// is pollPeriod plus three seconds, and that padded value is also what is
// handed to handleRevertedTxns.
// NOTE(review): the reason for the extra three seconds is not visible here —
// presumably to offset this loop from other pollers; confirm.
func (lsn *listenerV2) runRevertedTxnsHandler(pollPeriod time.Duration) {
	pollPeriod += 3 * time.Second

	ticker := time.NewTicker(pollPeriod)
	defer ticker.Stop()

	// The context is derived from the stop channel and released on return.
	ctx, cancel := lsn.chStop.NewCtx()
	defer cancel()

	for {
		select {
		case <-ticker.C:
			lsn.handleRevertedTxns(ctx, pollPeriod)
		case <-lsn.chStop:
			return
		}
	}
}
// handleRevertedTxns runs one pass of the reverted-transaction pipeline:
//  1. load candidate fulfillment receipts from the database (single
//     fulfillments, batch fulfillments, and reverted force-fulfillments),
//  2. narrow them down via filterRevertedTxns,
//  3. enqueue a force-fulfillment for every transaction that survives.
//
// Enqueue failures are logged and do not stop the remaining transactions from
// being processed.
// NOTE(review): database fetch failures are reported with Fatalw; whether that
// terminates the process depends on the logger implementation — confirm this
// is intended for what may be a transient DB error.
func (lsn *listenerV2) handleRevertedTxns(ctx context.Context, pollPeriod time.Duration) {
	lsn.l.Infow("Handling reverted txns")

	chainID := lsn.chainID.Uint64()

	// Fetch recent single and batch txns that have not been force-fulfilled,
	// plus force-fulfillments that themselves reverted.
	singles, err := lsn.fetchRecentSingleTxns(ctx, lsn.q, chainID, pollPeriod)
	if err != nil {
		lsn.l.Fatalw("Fetch recent txns", "err", err)
	}
	batches, err := lsn.fetchRecentBatchTxns(ctx, lsn.q, chainID, pollPeriod)
	if err != nil {
		lsn.l.Fatalw("Fetch recent batch txns", "err", err)
	}
	forced, err := lsn.fetchRevertedForceFulfilmentTxns(ctx, lsn.q, chainID, pollPeriod)
	if err != nil {
		lsn.l.Fatalw("Fetch recent reverted force-fulfillment txns", "err", err)
	}

	// Appending an empty slice is a no-op, so no length guards are needed.
	candidates := make([]TxnReceiptDB, 0, len(singles)+len(batches)+len(forced))
	candidates = append(candidates, singles...)
	candidates = append(candidates, batches...)
	candidates = append(candidates, forced...)

	// Filter the candidates, then hand each surviving transaction to the
	// transaction manager as a new force-fulfillment.
	for _, reverted := range lsn.filterRevertedTxns(ctx, candidates) {
		if _, enqueueErr := lsn.enqueueForceFulfillmentForRevertedTxn(ctx, reverted); enqueueErr != nil {
			lsn.l.Errorw("Enqueue force fulfilment", "err", enqueueErr)
		}
	}
}
// fetchRecentSingleTxns returns receipts with status 0x0 for single-request
// VRF fulfillment transactions on chainID that were created within
// ReqScanTimeRangeInDB. A transaction qualifies when its meta has both SubId
// and RequestID, is not itself marked ForceFulfilled, and no ForceFulfilled
// transaction in the same window carries the same RequestID.
//
// Rows are de-duplicated by transaction hash before being returned. Query
// latency is reported through postSqlLog regardless of the query's outcome.
func (lsn *listenerV2) fetchRecentSingleTxns(ctx context.Context,
	q pg.Q,
	chainID uint64,
	pollPeriod time.Duration) ([]TxnReceiptDB, error) {
	// The window is a package-level setting interpolated as an interval
	// literal; the chain ID is bound as query parameter $1.
	window := ReqScanTimeRangeInDB
	query := fmt.Sprintf(`
WITH already_ff as (
SELECT meta->>'RequestID' as request_id
FROM evm.txes
WHERE created_at >= NOW() - interval '%s'
AND evm_chain_id = $1
AND meta->>'ForceFulfilled' is NOT NULL
), txes AS (
SELECT *
FROM evm.txes
WHERE created_at >= NOW() - interval '%s'
AND evm_chain_id = $1
AND meta->>'SubId' IS NOT NULL
AND meta->>'RequestID' IS NOT NULL
AND meta->>'ForceFulfilled' is NULL
AND meta->>'RequestID' NOT IN (SELECT request_id FROM already_ff)
), attempts AS (
SELECT *
FROM evm.tx_attempts
WHERE eth_tx_id IN (SELECT id FROM txes)
), receipts AS (
SELECT *
FROM evm.receipts
WHERE tx_hash IN (SELECT hash FROM attempts)
AND receipt->>'status' = '0x0'
)
SELECT r.tx_hash,
r.receipt,
t.from_address,
t.to_address,
t.encoded_payload,
t.gas_limit,
t.meta->>'SubId' as sub_id,
t.meta->>'RequestID' as request_id,
t.meta->>'RequestTxHash' as request_tx_hash
FROM receipts r
INNER JOIN attempts a ON r.tx_hash = a.hash
INNER JOIN txes t ON a.eth_tx_id = t.id
`, window, window)

	var rows []TxnReceiptDB
	started := time.Now()
	selectErr := q.Select(&rows, query, chainID)
	// Latency is logged before the error is inspected so that failed queries
	// are timed as well.
	lsn.postSqlLog(ctx, started, pollPeriod, "FetchRecentSingleTxns")
	if selectErr != nil && !errors.Is(selectErr, sql.ErrNoRows) {
		return nil, errors.Wrap(selectErr, "Error fetching recent non-force-fulfilled txns")
	}

	rows = unique(rows)
	lsn.l.Infow("finished querying for recently reverting single fulfillments",
		"count", len(rows),
	)
	for i := range rows {
		lsn.l.Infow("found reverted fulfillment", "requestID", rows[i].RequestID, "fulfillmentTxHash", rows[i].TxHash.String())
	}
	return rows, nil
}
// fetchRecentBatchTxns returns receipts for batch VRF fulfillment transactions
// on chainID that were created within ReqScanTimeRangeInDB. A transaction
// qualifies when its meta has SubId and RequestIDs, is not marked
// ForceFulfilled, and none of its RequestIDs matches the RequestID of a
// ForceFulfilled transaction in the same window.
//
// Unlike the single-fulfillment query, receipts are not filtered by status
// here; per-request failures are detected later from the receipt logs (see
// filterBatchRevertedTxn). Rows are de-duplicated by transaction hash.
func (lsn *listenerV2) fetchRecentBatchTxns(ctx context.Context,
	q pg.Q,
	chainID uint64,
	pollPeriod time.Duration) ([]TxnReceiptDB, error) {
	window := ReqScanTimeRangeInDB
	query := fmt.Sprintf(`
WITH already_ff as (
SELECT meta->>'RequestID' as request_id
FROM evm.txes
WHERE created_at >= NOW() - interval '%s'
AND evm_chain_id = $1
AND meta->>'ForceFulfilled' is NOT NULL
), txes AS (
SELECT *
FROM (
SELECT *
FROM evm.txes
WHERE created_at >= NOW() - interval '%s'
AND evm_chain_id = $1
AND meta->>'SubId' IS NOT NULL
AND meta->>'RequestIDs' IS NOT NULL
AND meta->>'ForceFulfilled' IS NULL
) AS eth_txes1
WHERE (meta->'RequestIDs' ?| (SELECT ARRAY_AGG(request_id) FROM already_ff)) IS NOT TRUE
), attempts AS (
SELECT *
FROM evm.tx_attempts
WHERE eth_tx_id IN (SELECT id FROM txes)
), receipts AS (
SELECT *
FROM evm.receipts
WHERE tx_hash IN (SELECT hash FROM attempts)
)
SELECT r.tx_hash,
r.receipt,
t.from_address,
t.to_address,
t.encoded_payload,
t.gas_limit,
t.meta->>'SubId' as sub_id
FROM receipts r
INNER JOIN attempts a ON r.tx_hash = a.hash
INNER JOIN txes t ON a.eth_tx_id = t.id
`, window, window)

	var rows []TxnReceiptDB
	started := time.Now()
	selectErr := q.Select(&rows, query, chainID)
	// Record latency first so that failed queries are timed too.
	lsn.postSqlLog(ctx, started, pollPeriod, "FetchRecentBatchTxns")
	if selectErr != nil && !errors.Is(selectErr, sql.ErrNoRows) {
		return nil, errors.Wrap(selectErr, "Error fetching recent non-force-fulfilled txns")
	}

	rows = unique(rows)
	lsn.l.Infow("finished querying for recent batch fulfillments",
		"count", len(rows),
	)
	return rows, nil
}
// fetchRevertedForceFulfilmentTxns returns force-fulfillment transactions on
// chainID, created within ReqScanTimeRangeInDB, whose receipt has status 0x0.
//
// Two queries are issued. The first loads the reverted force-fulfillments
// together with their receipts. The second loads every force-fulfillment
// attempt in the window, with or without a receipt. UniqueByReqID then keeps,
// per RequestID, only the latest reverted attempt, and drops a request
// entirely when a newer attempt exists. The result is finally de-duplicated
// by transaction hash.
func (lsn *listenerV2) fetchRevertedForceFulfilmentTxns(ctx context.Context,
	q pg.Q,
	chainID uint64,
	pollPeriod time.Duration) ([]TxnReceiptDB, error) {
	revertedQuery := fmt.Sprintf(`
WITH txes AS (
SELECT *
FROM evm.txes
WHERE created_at >= NOW() - interval '%s'
AND evm_chain_id = $1
AND meta->>'SubId' IS NOT NULL
AND meta->>'RequestID' IS NOT NULL
AND meta->>'ForceFulfilled' is NOT NULL
), attempts AS (
SELECT *
FROM evm.tx_attempts
WHERE eth_tx_id IN (SELECT id FROM txes)
), receipts AS (
SELECT *
FROM evm.receipts
WHERE tx_hash IN (SELECT hash FROM attempts)
AND receipt->>'status' = '0x0'
)
SELECT r.tx_hash,
r.receipt,
t.from_address,
t.to_address,
t.encoded_payload,
t.gas_limit,
t.meta->>'SubId' as sub_id,
t.meta->>'RequestID' as request_id,
t.meta->>'RequestTxHash' as request_tx_hash,
CAST(COALESCE(t.meta->>'ForceFulfillmentAttempt', '0') AS INT) as force_fulfillment_attempt
FROM receipts r
INNER JOIN attempts a ON r.tx_hash = a.hash
INNER JOIN txes t ON a.eth_tx_id = t.id
`, ReqScanTimeRangeInDB)

	var reverted []TxnReceiptDB
	started := time.Now()
	selectErr := q.Select(&reverted, revertedQuery, chainID)
	lsn.postSqlLog(ctx, started, pollPeriod, "FetchRevertedForceFulfilmentTxns")
	if selectErr != nil && !errors.Is(selectErr, sql.ErrNoRows) {
		return nil, errors.Wrap(selectErr, "Error fetching recent reverted force-fulfilled txns")
	}

	// Same transaction filter as above, but joined to attempts only, so
	// attempts that have no receipt yet are included as well.
	allAttemptsQuery := fmt.Sprintf(`
WITH txes AS (
SELECT *
FROM evm.txes
WHERE created_at >= NOW() - interval '%s'
AND evm_chain_id = $1
AND meta->>'SubId' IS NOT NULL
AND meta->>'RequestID' IS NOT NULL
AND meta->>'ForceFulfilled' is NOT NULL
), attempts AS (
SELECT *
FROM evm.tx_attempts
WHERE eth_tx_id IN (SELECT id FROM txes)
)
SELECT a.hash as tx_hash,
t.meta->>'SubId' as sub_id,
t.meta->>'RequestID' as request_id,
CAST(COALESCE(t.meta->>'ForceFulfillmentAttempt', '0') AS INT) as force_fulfillment_attempt
FROM attempts a
INNER JOIN txes t ON a.eth_tx_id = t.id
`, ReqScanTimeRangeInDB)

	var allAttempts []TxnReceiptDB
	started = time.Now()
	selectErr = q.Select(&allAttempts, allAttemptsQuery, chainID)
	lsn.postSqlLog(ctx, started, pollPeriod, "Fetch all ForceFulfilment Txns")
	if selectErr != nil && !errors.Is(selectErr, sql.ErrNoRows) {
		return nil, errors.Wrap(selectErr, "Error fetching all recent force-fulfilled txns")
	}

	reverted = UniqueByReqID(reverted, allAttempts)

	lsn.l.Infow("finished querying for recently reverting reverted force-fulfillment txns",
		"count", len(reverted),
	)
	for i := range reverted {
		lsn.l.Infow("found reverted force-fulfillment txn", "requestID", reverted[i].RequestID,
			"fulfillmentTxHash", reverted[i].TxHash.String(),
			"ForceFulfillmentAttempt", reverted[i].ForceFulfillmentAttempt)
	}
	return unique(reverted), nil
}
// unique returns rs without duplicate transaction hashes. The first row seen
// for each hash is kept and input order is preserved. An empty or nil input
// yields nil.
func unique(rs []TxnReceiptDB) (res []TxnReceiptDB) {
	if len(rs) == 0 {
		return nil
	}

	seen := make(map[string]struct{})
	res = make([]TxnReceiptDB, 0)
	for _, receipt := range rs {
		key := receipt.TxHash.Hex()
		if _, dup := seen[key]; dup {
			continue
		}
		seen[key] = struct{}{}
		res = append(res, receipt)
	}
	return res
}
// UniqueByReqID reduces revertedForceTxns to at most one entry per RequestID.
//
// allForceTxns is expected to hold every force-fulfillment attempt (successful,
// reverted, or still pending). A reverted entry is dropped when allForceTxns
// contains a higher ForceFulfillmentAttempt for the same RequestID: a newer
// attempt already exists, so enqueueing another one would race with the
// transaction manager. Among the remaining entries of a RequestID the one with
// the highest ForceFulfillmentAttempt is kept (the first one on ties).
//
// The result is ordered by the first appearance of each kept RequestID in
// revertedForceTxns, so the output is deterministic for a given input. It is
// nil when revertedForceTxns is empty, and an empty non-nil slice when every
// entry is dropped.
func UniqueByReqID(revertedForceTxns []TxnReceiptDB, allForceTxns []TxnReceiptDB) (res []TxnReceiptDB) {
	if len(revertedForceTxns) == 0 {
		return nil
	}

	// Highest attempt number known for each RequestID across all attempts.
	latestAttempt := make(map[string]uint64, len(allForceTxns))
	for _, r := range allForceTxns {
		if cur, ok := latestAttempt[r.RequestID]; !ok || cur < r.ForceFulfillmentAttempt {
			latestAttempt[r.RequestID] = r.ForceFulfillmentAttempt
		}
	}

	// position maps a RequestID to its slot in res, so that a later, newer
	// reverted attempt can replace the earlier one in place. Building res
	// directly (rather than ranging over a map at the end) keeps the output
	// order stable.
	position := make(map[string]int, len(revertedForceTxns))
	res = make([]TxnReceiptDB, 0, len(revertedForceTxns))
	for _, forceTxn := range revertedForceTxns {
		// A newer attempt exists for this request: skip force-fulfilling it
		// again until that attempt has a receipt of its own.
		if latest, ok := latestAttempt[forceTxn.RequestID]; ok && latest > forceTxn.ForceFulfillmentAttempt {
			continue
		}

		idx, seen := position[forceTxn.RequestID]
		if !seen {
			position[forceTxn.RequestID] = len(res)
			res = append(res, forceTxn)
			continue
		}
		if res[idx].ForceFulfillmentAttempt < forceTxn.ForceFulfillmentAttempt {
			res[idx] = forceTxn
		}
	}
	return res
}
// postSqlLog reports how long a query took relative to its timeout, and notes
// (at debug level) when ctx has already been cancelled.
//
// The timeout is lsn.q.QueryTimeout, or pollPeriod when that is not positive.
// The log level depends on the share of the timeout that was used:
//   - the full timeout or more: critical
//   - more than 20% (timeout/5): error
//   - more than 10% (timeout/10): warn
//   - anything faster: info
func (lsn *listenerV2) postSqlLog(ctx context.Context, begin time.Time, pollPeriod time.Duration, queryName string) {
	took := time.Since(begin)
	if ctxErr := ctx.Err(); ctxErr != nil {
		lsn.l.Debugw("SQL context canceled", "ms", took.Milliseconds(), "err", ctxErr, "sql", queryName)
	}

	limit := lsn.q.QueryTimeout
	if limit <= 0 {
		limit = pollPeriod
	}

	percent := float64(took) / float64(limit) * 100
	fields := []any{
		"ms", took.Milliseconds(),
		"timeout", limit.Milliseconds(),
		"percent", strconv.FormatFloat(percent, 'f', 1, 64),
		"sql", queryName,
	}

	// The thresholds are integer durations; a threshold that rounds down to
	// zero is ignored so that tiny timeouts do not flag every query.
	errAfter, warnAfter := limit/5, limit/10
	switch {
	case took >= limit:
		lsn.l.Criticalw("ExtremelySlowSQLQuery", fields...)
	case errAfter > 0 && took > errAfter:
		lsn.l.Errorw("VerySlowSQLQuery", fields...)
	case warnAfter > 0 && took > warnAfter:
		lsn.l.Warnw("SlowSQLQuery", fields...)
	default:
		lsn.l.Infow("SQLQueryLatency", fields...)
	}
}
// filterRevertedTxns turns candidate receipts into the list of VRF
// fulfillments that should be force-fulfilled. Each receipt is dispatched on
// its destination address:
//   - VRFOwner or the coordinator: handled as a single fulfillment,
//   - the batch coordinator: handled as a batch, possibly yielding several
//     entries,
//   - anything else: logged as unrecognised and skipped.
//
// Receipts whose filter returns an error are logged and skipped; they do not
// abort the pass.
func (lsn *listenerV2) filterRevertedTxns(ctx context.Context,
	recentReceipts []TxnReceiptDB) []RevertedVRFTxn {
	selected := make([]RevertedVRFTxn, 0)

	for _, receipt := range recentReceipts {
		switch receipt.ToAddress.Hex() {
		case lsn.vrfOwner.Address().Hex(), lsn.coordinator.Address().Hex():
			// Single VRF fulfillment.
			single, err := lsn.filterSingleRevertedTxn(ctx, receipt)
			if err != nil {
				lsn.l.Errorw("Filter reverted single fulfillment txn", "Err", err)
				continue
			}
			// A nil result means the transaction was filtered out.
			if single != nil {
				selected = append(selected, *single)
			}
		case lsn.batchCoordinator.Address().Hex():
			// Batch VRF fulfillment; an empty result appends nothing.
			batch, err := lsn.filterBatchRevertedTxn(ctx, receipt)
			if err != nil {
				lsn.l.Errorw("Filter batchfulfilment with reverted txns", "Err", err)
				continue
			}
			selected = append(selected, batch...)
		default:
			lsn.l.Warnw("Unrecognised txn in VRF-Reverted-Pipeline",
				"ToAddress", receipt.ToAddress.Hex(),
			)
		}
	}

	lsn.l.Infow("Reverted VRF fulfilment txns due to InsufficientBalance",
		"count", len(selected),
		"reverted_txns", selected,
	)
	for i := range selected {
		dbReceipt := selected[i].DBReceipt
		lsn.l.Infow("Reverted VRF fulfilment txns due to InsufficientBalance",
			"RequestID", dbReceipt.RequestID,
			"TxnStoreEVMReceipt.BlockHash", dbReceipt.EVMReceipt.BlockHash.String(),
			"TxnStoreEVMReceipt.BlockNumber", dbReceipt.EVMReceipt.BlockNumber.String(),
			"VRFFulfillmentTxHash", dbReceipt.TxHash.String())
	}
	return selected
}
// filterSingleRevertedTxn decides whether a reverted single-request
// fulfillment should be force-fulfilled and, if so, returns it together with
// the proof and request commitment decoded from its stored calldata.
//
// It returns (nil, nil) when the transaction should be ignored:
//   - the coordinator no longer holds a commitment for the request,
//   - the simulated revert reason is known and is not InsufficientBalance,
//   - the maximum number of force-fulfillment attempts has been reached.
//
// It returns an error when the transaction cannot be loaded from the RPC
// node, the simulated call does not revert, the RPC error cannot be parsed,
// or the stored calldata cannot be decoded.
//
// When the revert reason cannot be determined (no usable revert data) the
// transaction is treated as a candidate, because not every RPC node or chain
// reports revert data reliably.
func (lsn *listenerV2) filterSingleRevertedTxn(ctx context.Context,
	txnReceiptDB TxnReceiptDB) (
	*RevertedVRFTxn, error) {
	// Upper bound on force-fulfillment attempts for one request.
	const maxForceFulfillmentAttempts = 15

	requestID := common.HexToHash(txnReceiptDB.RequestID).Big()
	commitment, err := lsn.coordinator.GetCommitment(&bind.CallOpts{Context: ctx}, requestID)
	if err != nil {
		// Best effort: if the commitment cannot be read, carry on and let the
		// revert-reason check decide.
		lsn.l.Errorw("Force-fulfilment of single reverted txns: Not able to get commitment from chain RPC node", "err", err)
	} else if utils.IsEmpty(commitment[:]) {
		// VRF request already fulfilled.
		return nil, nil
	}
	lsn.l.Infow("Single reverted txn: Unfulfilled req", "req", requestID.String())

	// Get the transaction object from the RPC node.
	ethClient := lsn.chain.Client()
	tx, err := ethClient.TransactionByHash(ctx, txnReceiptDB.TxHash)
	if err != nil {
		return nil, errors.Wrap(err, "get_txn_by_hash")
	}

	// Re-run the transaction as a call at the receipt's block to obtain the
	// revert error.
	call := ethereum.CallMsg{
		From:     txnReceiptDB.FromAddress,
		To:       &txnReceiptDB.ToAddress,
		Data:     tx.Data(),
		Gas:      txnReceiptDB.GasLimit,
		GasPrice: tx.GasPrice(),
	}
	_, rpcError := ethClient.CallContract(ctx, call, txnReceiptDB.EVMReceipt.BlockNumber)
	if rpcError == nil {
		return nil, fmt.Errorf("error fetching revert reason %v: simulated call did not revert", txnReceiptDB.TxHash)
	}

	// Check the parse error before touching the parsed value, which is not
	// usable when parsing failed.
	revertErr, err := evmclient.ExtractRPCError(rpcError)
	if err != nil {
		lsn.l.Infow("InsufficientBalRevertedTxn",
			"RawRevertData", rpcError,
			"ParsedRevertData", nil,
			"ParsingErr", err,
		)
		return nil, fmt.Errorf("reverted_txn_reason_parse_err: %w", err)
	}
	lsn.l.Infow("InsufficientBalRevertedTxn",
		"RawRevertData", rpcError,
		"ParsedRevertData", revertErr.Data,
		"ParsingErr", err,
	)

	revertErrDataStr := ""
	var revertErrDataBytes []byte
	if revertErr.Data != nil {
		if dataStr, isString := revertErr.Data.(string); isString {
			// Some nodes prefix the hex payload with "Reverted ".
			revertErrDataStr = strings.Replace(dataStr, "Reverted ", "", 1)
			revertErrDataBytes = common.FromHex(revertErrDataStr)
		} else {
			// Revert data in an unexpected shape: treat the reason as unknown
			// instead of panicking on the type assertion.
			lsn.l.Warnw("Revert data is not a string, revert reason unknown",
				"ParsedRevertData", revertErr.Data,
			)
		}
	}

	// Revert reason may not be accurately determined from all RPC nodes and
	// may not work on some chains, so only a known, different reason filters
	// the transaction out. HasPrefix also copes with data shorter than the
	// 4-byte selector.
	insufficientErr := coordinatorV2ABI.Errors["InsufficientBalance"].ID.Bytes()[0:4]
	if len(revertErrDataStr) > 0 && !bytes.HasPrefix(revertErrDataBytes, insufficientErr) {
		return nil, nil
	}

	// Stop once the maximum number of force-fulfillment retries is reached.
	if txnReceiptDB.ForceFulfillmentAttempt >= maxForceFulfillmentAttempts {
		return nil, nil
	}

	// Decode the VRF proof and commitment from the stored calldata.
	txData := txnReceiptDB.EncodedPayload
	if len(txData) <= 4 {
		return nil, fmt.Errorf("invalid_txn_data_for_tx: %s", tx.Hash().String())
	}
	callData := txData[4:] // drop the 4-byte function selector
	unpacked, err := coordinatorV2ABI.Methods["fulfillRandomWords"].Inputs.Unpack(callData)
	if err != nil {
		return nil, fmt.Errorf("invalid_txn_data_for_tx_pack: %s, err %w", tx.Hash().String(), err)
	}
	if len(unpacked) < 2 {
		return nil, fmt.Errorf("invalid_txn_data_for_tx_pack: %s, err unexpected argument count %d", tx.Hash().String(), len(unpacked))
	}
	proof := abi.ConvertType(unpacked[0], new(vrf_coordinator_v2.VRFProof)).(*vrf_coordinator_v2.VRFProof)
	reqCommitment := abi.ConvertType(unpacked[1], new(vrf_coordinator_v2.VRFCoordinatorV2RequestCommitment)).(*vrf_coordinator_v2.VRFCoordinatorV2RequestCommitment)

	return &RevertedVRFTxn{
		DBReceipt:  txnReceiptDB,
		IsBatchReq: false,
		Proof:      *proof,
		Commitment: *reqCommitment,
	}, nil
}
// filterBatchRevertedTxn extracts, from one batch fulfillment receipt, the
// requests whose individual fulfillment failed with InsufficientBalance and
// that are still unfulfilled on the coordinator.
//
// The proofs and request commitments are decoded from the stored calldata.
// Each RawErrorReturned log in the receipt is then matched by request ID to
// its (proof, commitment) pair. Logs for other events, logs carrying a
// different error, malformed logs, and requests whose commitment is already
// cleared are skipped.
//
// An error is returned only when the calldata cannot be decoded or a request
// ID cannot be derived from a proof.
func (lsn *listenerV2) filterBatchRevertedTxn(ctx context.Context,
	txnReceiptDB TxnReceiptDB) (
	[]RevertedVRFTxn, error) {
	if len(txnReceiptDB.EncodedPayload) <= 4 {
		return nil, fmt.Errorf("invalid encodedPayload: %v", hexutil.Encode(txnReceiptDB.EncodedPayload))
	}
	// Skip the 4-byte function selector.
	unpackedInputs, err := batchCoordinatorV2ABI.Methods["fulfillRandomWords"].Inputs.Unpack(txnReceiptDB.EncodedPayload[4:])
	if err != nil {
		return nil, errors.Wrap(err, "cannot_unpack_batch_txn")
	}
	if len(unpackedInputs) < 2 {
		return nil, fmt.Errorf("cannot_unpack_batch_txn: unexpected argument count %d", len(unpackedInputs))
	}
	proofs := abi.ConvertType(unpackedInputs[0], new([]vrf_coordinator_v2.VRFProof)).(*[]vrf_coordinator_v2.VRFProof)
	reqCommitments := abi.ConvertType(unpackedInputs[1], new([]vrf_coordinator_v2.VRFCoordinatorV2RequestCommitment)).(*[]vrf_coordinator_v2.VRFCoordinatorV2RequestCommitment)

	// Request ID of each proof: keccak256(abi.encode(keyHash, seed)). The
	// first proof wins if two proofs map to the same request ID.
	keyHash := lsn.job.VRFSpec.PublicKey.MustHash()
	proofIndexByReqID := make(map[common.Hash]int, len(*proofs))
	for i, proof := range *proofs {
		payload, err := evmutils.ABIEncode(`[{"type":"bytes32"},{"type":"uint256"}]`, keyHash, proof.Seed)
		if err != nil {
			return nil, fmt.Errorf("ABI Encode Error: (err %w), (keyHash %v), (proof seed: %v)", err, keyHash, proof.Seed)
		}
		reqID := common.BytesToHash(crypto.Keccak256(payload))
		if _, dup := proofIndexByReqID[reqID]; !dup {
			proofIndexByReqID[reqID] = i
		}
	}

	rawErrorReturned := batchCoordinatorV2ABI.Events["RawErrorReturned"]
	insufficientBalance := coordinatorV2ABI.Errors["InsufficientBalance"].ID.Bytes()[0:4]

	revertedTxns := make([]RevertedVRFTxn, 0)
	for _, evLog := range txnReceiptDB.EVMReceipt.Logs {
		// Topic 0 is the event signature and topic 1 the request ID; a log
		// with fewer topics cannot be a usable RawErrorReturned event.
		if len(evLog.Topics) < 2 || evLog.Topics[0] != rawErrorReturned.ID {
			continue
		}

		// Extract the revert reason of the individual request.
		unpacked, err := rawErrorReturned.Inputs.Unpack(evLog.Data)
		if err != nil {
			lsn.l.Errorw("cannot_unpack_batch_coordinator_log", "err", err)
			continue
		}
		if len(unpacked) == 0 {
			lsn.l.Errorw("cannot_unpack_batch_coordinator_log", "err", "no values unpacked from log data")
			continue
		}
		lowLevelData, isBytes := unpacked[0].([]byte)
		if !isBytes {
			lsn.l.Errorw("cannot_unpack_batch_coordinator_log", "err", fmt.Sprintf("unexpected type %T for low-level data", unpacked[0]))
			continue
		}
		if !bytes.Equal(lowLevelData, insufficientBalance) {
			continue
		}

		// Match the log to its (proof, commitment) pair by request ID.
		requestID := evLog.Topics[1]
		idx, found := proofIndexByReqID[requestID]
		if !found {
			lsn.l.Criticalw("Reverted Batch fulfilment requestID from log does not have proof in req EncodedPayload",
				"requestIDFromLog", requestID.String(),
			)
			continue
		}
		if idx >= len(*reqCommitments) {
			lsn.l.Errorw("Reverted Batch fulfilment proof has no matching request commitment in req EncodedPayload",
				"requestIDFromLog", requestID.String(),
				"proofIndex", idx,
				"commitments", len(*reqCommitments),
			)
			continue
		}

		commitment, err := lsn.coordinator.GetCommitment(&bind.CallOpts{Context: ctx}, requestID.Big())
		if err != nil {
			// Best effort: if the commitment cannot be read, still treat the
			// request as unfulfilled.
			lsn.l.Errorw("Force-fulfilment of batch reverted txns: Not able to get commitment from chain RPC node",
				"err", err,
				"requestID", requestID.Big())
		} else if utils.IsEmpty(commitment[:]) {
			lsn.l.Infow("Batch fulfillment with initial reverted fulfillment txn and later successful fulfillment, Skipping", "req", requestID.String())
			continue
		}
		lsn.l.Infow("Batch fulfillment with reverted fulfillment txn", "req", requestID.String())

		revertedTxns = append(revertedTxns, RevertedVRFTxn{
			DBReceipt: TxnReceiptDB{
				TxHash:      txnReceiptDB.TxHash,
				EVMReceipt:  txnReceiptDB.EVMReceipt,
				FromAddress: txnReceiptDB.FromAddress,
				SubID:       txnReceiptDB.SubID,
				RequestID:   requestID.Hex(),
			},
			IsBatchReq: true,
			Proof:      (*proofs)[idx],
			Commitment: (*reqCommitments)[idx],
		})
	}
	return revertedTxns, nil
}
// enqueueForceFulfillmentForRevertedTxn enqueues a forced fulfillment of
// revertedTxn through the VRFOwner contract. Gas is estimated again for the
// VRFOwner.fulfillRandomWords call and padded by 40% before being used as the
// fee limit.
//
// The created transaction's meta is marked ForceFulfilled and carries the
// request ID, subscription ID, request transaction hash, and an attempt
// counter one higher than the reverted transaction's.
//
// It returns an error when the job spec has no VRFOwner address, the request
// ID or request transaction hash stored on the receipt is not valid 0x-hex,
// no sending address is available, the calldata cannot be packed, gas
// estimation fails, or the transaction manager rejects the transaction.
func (lsn *listenerV2) enqueueForceFulfillmentForRevertedTxn(
	ctx context.Context,
	revertedTxn RevertedVRFTxn,
) (etx txmgr.Tx, err error) {
	if lsn.job.VRFSpec.VRFOwnerAddress == nil {
		return txmgr.Tx{}, errors.New("vrf_owner_not_set_in_job_spec")
	}

	// Validate the stored identifiers up front, so that malformed metadata is
	// reported as an error before any keystore or RPC call is made.
	reqIDBytes, decodeErr := hexutil.Decode(revertedTxn.DBReceipt.RequestID)
	if decodeErr != nil {
		return txmgr.Tx{}, errors.Wrapf(decodeErr, "invalid request ID %q", revertedTxn.DBReceipt.RequestID)
	}
	reqID := common.BytesToHash(reqIDBytes)

	// The request tx hash is optional; an empty value leaves the zero hash.
	var reqTxHash common.Hash
	if revertedTxn.DBReceipt.RequestTxHash != "" {
		reqTxHashBytes, decodeErr := hexutil.Decode(revertedTxn.DBReceipt.RequestTxHash)
		if decodeErr != nil {
			return txmgr.Tx{}, errors.Wrapf(decodeErr, "invalid request tx hash %q", revertedTxn.DBReceipt.RequestTxHash)
		}
		reqTxHash = common.BytesToHash(reqTxHashBytes)
	}

	proof := revertedTxn.Proof
	reqCommitment := revertedTxn.Commitment

	fromAddresses := lsn.fromAddresses()
	fromAddress, err := lsn.gethks.GetRoundRobinAddress(ctx, lsn.chainID, fromAddresses...)
	if err != nil {
		return txmgr.Tx{}, errors.Wrap(err, "failed_to_get_vrf_listener_from_address")
	}

	// Fulfill the request through the VRF owner.
	lsn.l.Infow("VRFOwner.fulfillRandomWords vs. VRFCoordinatorV2.fulfillRandomWords",
		"vrf_owner.fulfillRandomWords", hexutil.Encode(vrfOwnerABI.Methods["fulfillRandomWords"].ID),
		"vrf_coordinator_v2.fulfillRandomWords", hexutil.Encode(coordinatorV2ABI.Methods["fulfillRandomWords"].ID),
	)

	vrfOwnerAddress1 := lsn.vrfOwner.Address()
	vrfOwnerAddressSpec := lsn.job.VRFSpec.VRFOwnerAddress.Address()
	lsn.l.Infow("addresses diff", "wrapper_address", vrfOwnerAddress1, "spec_address", vrfOwnerAddressSpec)

	txData, err := vrfOwnerABI.Pack("fulfillRandomWords", proof, reqCommitment)
	if err != nil {
		return txmgr.Tx{}, errors.Wrap(err, "abi pack VRFOwner.fulfillRandomWords")
	}

	// The coordinator address is read for diagnostics only, so a failure here
	// is logged and does not abort the force-fulfillment.
	vrfOwnerCoordinator, coordErr := lsn.vrfOwner.GetVRFCoordinator(&bind.CallOpts{Context: ctx})
	if coordErr != nil {
		lsn.l.Warnw("RevertedTxnForceFulfilment: unable to read coordinator address from VRFOwner", "err", coordErr)
	}
	lsn.l.Infow("RevertedTxnForceFulfilment EstimatingGas",
		"EncodedPayload", hexutil.Encode(txData),
		"VRFOwnerCoordinator", vrfOwnerCoordinator.String(),
	)

	ethClient := lsn.chain.Client()
	estimateGasLimit, err := ethClient.EstimateGas(ctx, ethereum.CallMsg{
		From: fromAddress,
		To:   &vrfOwnerAddressSpec,
		Data: txData,
	})
	if err != nil {
		return txmgr.Tx{}, errors.Wrap(err, "failed to estimate gas on VRFOwner.fulfillRandomWords")
	}
	// Pad the estimate by 40%.
	estimateGasLimit = uint64(1.4 * float64(estimateGasLimit))
	lsn.l.Infow("Estimated gas limit on force fulfillment", "estimateGasLimit", estimateGasLimit)

	lsn.l.Infow("RevertedTxnForceFulfilment CreateTransaction",
		"RequestID", revertedTxn.DBReceipt.RequestID,
		"RequestTxHash", revertedTxn.DBReceipt.RequestTxHash,
	)

	forceFulfiled := true
	forceFulfillmentAttempt := revertedTxn.DBReceipt.ForceFulfillmentAttempt + 1
	etx, err = lsn.chain.TxManager().CreateTransaction(ctx, txmgr.TxRequest{
		FromAddress:    fromAddress,
		ToAddress:      lsn.vrfOwner.Address(),
		EncodedPayload: txData,
		FeeLimit:       estimateGasLimit,
		Strategy:       txmgrcommon.NewSendEveryStrategy(),
		Meta: &txmgr.TxMeta{
			RequestID:               &reqID,
			SubID:                   &revertedTxn.DBReceipt.SubID,
			RequestTxHash:           &reqTxHash,
			ForceFulfilled:          &forceFulfiled,
			ForceFulfillmentAttempt: &forceFulfillmentAttempt,
			// No max link since simulation failed
		},
	})
	return etx, err
}