-
Notifications
You must be signed in to change notification settings - Fork 1.6k
/
reaper.go
171 lines (155 loc) · 4.92 KB
/
reaper.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
package txmgr
import (
"fmt"
"math/big"
"time"
"github.com/pkg/errors"
"github.com/smartcontractkit/chainlink/core/logger"
"github.com/smartcontractkit/chainlink/core/services/pg"
"github.com/smartcontractkit/chainlink/core/utils"
"github.com/smartcontractkit/sqlx"
"go.uber.org/atomic"
)
//go:generate mockery --name ReaperConfig --output ./mocks/ --case=underscore
// ReaperConfig is the config subset used by the reaper
type ReaperConfig interface {
// EthTxReaperInterval is the base period between reaper runs. The run loop
// passes it through utils.WithJitter each time it (re)arms its ticker.
EthTxReaperInterval() time.Duration
// EthTxReaperThreshold is the age an eth_tx must exceed (measured on
// created_at) before it may be deleted. A value of 0 makes ReapEthTxes
// return without deleting anything.
EthTxReaperThreshold() time.Duration
// EvmFinalityDepth is subtracted from the head number to obtain the lowest
// block number to keep; confirmed eth_txes are only deleted when their
// receipt's block number is below that value.
EvmFinalityDepth() uint32
}
// Reaper handles periodic database cleanup for Txm
//
// NOTE: NewReaper fills this struct with a positional (unkeyed) literal, so
// the order of the fields below must not change without updating it.
type Reaper struct {
// db is the handle the DELETE statements are executed on.
db *sqlx.DB
// config supplies the reaper interval, age threshold and finality depth.
config ReaperConfig
// chainID is bound to the evm_chain_id filter of every DELETE.
chainID utils.Big
// log is the logger passed to NewReaper, named "txm_reaper".
log logger.Logger
// latestBlockNum holds the most recent head number handed to
// SetLatestBlockNum. It starts at -1, and work() does nothing while it is
// negative.
latestBlockNum *atomic.Int64
// trigger (buffer size 1) requests an immediate run outside the ticker
// schedule; SetLatestBlockNum sends on it when the first head arrives.
trigger chan struct{}
// chStop is closed by Stop to ask the run loop to exit.
chStop chan struct{}
// chDone is closed by the run loop when it returns; Stop waits on it.
chDone chan struct{}
}
// NewReaper builds a Reaper bound to the given database, config and chain.
//
// The logger is namespaced as "txm_reaper". The latest block number starts
// at -1 (no head reported yet) and the trigger channel is created with a
// buffer of one. Nothing runs until Start is called.
func NewReaper(lggr logger.Logger, db *sqlx.DB, config ReaperConfig, chainID big.Int) *Reaper {
	return &Reaper{
		db:             db,
		config:         config,
		chainID:        *utils.NewBig(&chainID),
		log:            lggr.Named("txm_reaper"),
		latestBlockNum: atomic.NewInt64(-1),
		trigger:        make(chan struct{}, 1),
		chStop:         make(chan struct{}),
		chDone:         make(chan struct{}),
	}
}
// Start logs the configured age threshold and interval, then launches the
// reaper's run loop in its own goroutine. Should only be called once.
func (r *Reaper) Start() {
	ageThreshold := r.config.EthTxReaperThreshold()
	interval := r.config.EthTxReaperInterval()
	r.log.Debugf("TxmReaper: started with age threshold %v and interval %v", ageThreshold, interval)

	go r.runLoop()
}
// Stop the reaper. Should only be called once.
//
// Stop closes chStop to signal the run loop and then blocks until the loop
// closes chDone. Calling it a second time would close chStop again, which
// panics. In the code visible here only runLoop closes chDone, so calling
// Stop without a prior Start would block indefinitely.
func (r *Reaper) Stop() {
r.log.Debug("TxmReaper: stopping")
// Signal the run loop to exit...
close(r.chStop)
// ...and wait for it to acknowledge by closing chDone.
<-r.chDone
}
// runLoop is the reaper's background loop. It performs a reaping pass each
// time the ticker fires or a manual trigger arrives, re-arming the ticker
// with a freshly jittered interval after every pass, and exits once chStop
// is closed. chDone is closed on the way out so Stop can return.
func (r *Reaper) runLoop() {
	defer close(r.chDone)

	// The interval is re-read from config on every use.
	jitteredInterval := func() time.Duration {
		return utils.WithJitter(r.config.EthTxReaperInterval())
	}

	ticker := time.NewTicker(jitteredInterval())
	defer ticker.Stop()

	// Both wake-up sources do the same thing: one pass, then restart the
	// countdown from now.
	reapThenRearm := func() {
		r.work()
		ticker.Reset(jitteredInterval())
	}

	for {
		select {
		case <-r.chStop:
			return
		case <-ticker.C:
			reapThenRearm()
		case <-r.trigger:
			reapThenRearm()
		}
	}
}
// work performs a single reaping pass against the most recently recorded
// head. It is a no-op until a head number has been set, and it logs (rather
// than returns) any error from ReapEthTxes.
func (r *Reaper) work() {
	head := r.latestBlockNum.Load()
	if head < 0 {
		// Still at the initial sentinel: no head has been reported yet.
		return
	}

	if err := r.ReapEthTxes(head); err != nil {
		r.log.Error("TxmReaper: unable to reap old eth_txes: ", err)
	}
}
// SetLatestBlockNum should be called on every new highest block number.
//
// It panics if latestBlockNum is negative. The first call (the one that
// replaces the initial negative value) also queues a trigger so the reaper
// runs once on startup instead of waiting for the first tick.
func (r *Reaper) SetLatestBlockNum(latestBlockNum int64) {
	if latestBlockNum < 0 {
		panic(fmt.Sprintf("latestBlockNum must be 0 or greater, got: %d", latestBlockNum))
	}

	if previous := r.latestBlockNum.Swap(latestBlockNum); previous >= 0 {
		// A head was already known; nothing more to do.
		return
	}

	// Run reaper once on startup
	r.trigger <- struct{}{}
}
// ReapEthTxes deletes old eth_txes for this reaper's chain.
//
// headNum is the current head block number. Two classes of rows are removed,
// both restricted to eth_txes created before now minus the configured
// threshold:
//   - 'confirmed' eth_txes that have an attempt whose receipt is in a block
//     below headNum minus the finality depth;
//   - 'fatal_error' eth_txes.
//
// If the threshold is 0 the function logs and returns nil without touching
// the database. Any database error is returned wrapped with context.
func (r *Reaper) ReapEthTxes(headNum int64) error {
	threshold := r.config.EthTxReaperThreshold()
	if threshold == 0 {
		r.log.Debug("TxmReaper: ETH_TX_REAPER_THRESHOLD set to 0; skipping ReapEthTxes")
		return nil
	}

	oldestKeptBlock := headNum - int64(r.config.EvmFinalityDepth())
	startedAt := time.Now()
	cutoff := startedAt.Add(-threshold)
	r.log.Debugw(
		fmt.Sprintf("TxmReaper: reaping old eth_txes created before %s", cutoff.Format(time.RFC3339)),
		"ageThreshold", threshold,
		"timeThreshold", cutoff,
		"minBlockNumberToKeep", oldestKeptBlock,
	)

	// NOTE that this relies on foreign key triggers automatically removing
	// the eth_tx_attempts and eth_receipts linked to every eth_tx
	const deleteConfirmedSQL = `
WITH old_enough_receipts AS (
SELECT tx_hash FROM eth_receipts
WHERE block_number < $1
ORDER BY block_number ASC, id ASC
LIMIT $2
)
DELETE FROM eth_txes
USING old_enough_receipts, eth_tx_attempts
WHERE eth_tx_attempts.eth_tx_id = eth_txes.id
AND eth_tx_attempts.hash = old_enough_receipts.tx_hash
AND eth_txes.created_at < $3
AND eth_txes.state = 'confirmed'
AND evm_chain_id = $4`

	// This statement takes no LIMIT; the batch size handed in by pg.Batch
	// is not used for it.
	const deleteFatalSQL = `
DELETE FROM eth_txes
WHERE created_at < $1
AND state = 'fatal_error'
AND evm_chain_id = $2`

	// execDelete runs one DELETE and reports how many rows it removed.
	// execFailMsg is the context attached when the statement itself fails.
	execDelete := func(execFailMsg, query string, args ...interface{}) (uint, error) {
		result, err := r.db.Exec(query, args...)
		if err != nil {
			return 0, errors.Wrap(err, execFailMsg)
		}
		deleted, err := result.RowsAffected()
		if err != nil {
			return 0, errors.Wrap(err, "ReapEthTxes failed to get rows affected")
		}
		return uint(deleted), nil
	}

	// Delete old confirmed eth_txes
	confirmedErr := pg.Batch(func(_, limit uint) (uint, error) {
		return execDelete(
			"ReapEthTxes failed to delete old confirmed eth_txes",
			deleteConfirmedSQL,
			oldestKeptBlock, limit, cutoff, r.chainID,
		)
	})
	if confirmedErr != nil {
		return errors.Wrap(confirmedErr, "TxmReaper#reapEthTxes batch delete of confirmed eth_txes failed")
	}

	// Delete old 'fatal_error' eth_txes
	fatalErr := pg.Batch(func(_, _ uint) (uint, error) {
		return execDelete(
			"ReapEthTxes failed to delete old fatally errored eth_txes",
			deleteFatalSQL,
			cutoff, r.chainID,
		)
	})
	if fatalErr != nil {
		return errors.Wrap(fatalErr, "TxmReaper#reapEthTxes batch delete of fatally errored eth_txes failed")
	}

	r.log.Debugf("TxmReaper: ReapEthTxes completed in %v", time.Since(startedAt))
	return nil
}