-
Notifications
You must be signed in to change notification settings - Fork 1.7k
/
reaper.go
180 lines (163 loc) · 5.39 KB
/
reaper.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
package bulletprooftxmanager
import (
"fmt"
"sync/atomic"
"time"
"github.com/pkg/errors"
"github.com/smartcontractkit/chainlink/core/logger"
"github.com/smartcontractkit/chainlink/core/services/postgres"
"github.com/smartcontractkit/chainlink/core/store/models"
"gorm.io/gorm"
)
//go:generate mockery --name ReaperConfig --output ./mocks/ --case=underscore
// ReaperConfig is the config subset used by the reaper.
type ReaperConfig interface {
	// EthTxReaperInterval is the period of the reaper's ticker, i.e. how
	// often a scheduled cleanup pass is attempted.
	EthTxReaperInterval() time.Duration
	// EthTxReaperThreshold is the minimum age a record must have before it
	// becomes eligible for deletion. A value of 0 makes the reaper skip its
	// cleanup passes.
	EthTxReaperThreshold() time.Duration
	// EthFinalityDepth is subtracted from the latest block number to obtain
	// the lowest block number whose receipts are always kept.
	EthFinalityDepth() uint
}
// Reaper handles periodic database cleanup for BPTXM.
//
// NOTE: NewReaper fills this struct with an unkeyed composite literal, so the
// order of the fields below must not change without updating it.
type Reaper struct {
	db     *gorm.DB
	config ReaperConfig
	log    *logger.Logger
	// latestBlockNum is the most recent block number handed to
	// SetLatestBlockNum. It is accessed atomically and stays negative until
	// the first block number arrives.
	latestBlockNum int64
	// trigger requests a cleanup pass outside of the ticker schedule.
	trigger chan struct{}
	// chStop is closed by Stop to make the run loop exit.
	chStop chan struct{}
	// chDone is closed by the run loop once it has exited.
	chDone chan struct{}
}
// NewReaper builds a Reaper bound to the given database handle and config.
// The returned reaper does nothing until Start is called.
func NewReaper(db *gorm.DB, config ReaperConfig) *Reaper {
	reaperLogger := logger.CreateLogger(logger.Default.With("id", "bptxm_reaper"))
	return &Reaper{
		db:     db,
		config: config,
		log:    reaperLogger,
		// A negative block number means "no head seen yet"; work() is a
		// no-op until SetLatestBlockNum replaces it.
		latestBlockNum: -1,
		// Buffered so the one-off startup trigger never blocks the sender.
		trigger: make(chan struct{}, 1),
		chStop:  make(chan struct{}),
		chDone:  make(chan struct{}),
	}
}
// Start launches the reaper's run loop in a background goroutine and returns
// immediately. Should only be called once.
func (r *Reaper) Start() {
	ageThreshold := r.config.EthTxReaperThreshold()
	interval := r.config.EthTxReaperInterval()
	r.log.Debugf("BPTXMReaper: started with age threshold %v and interval %v", ageThreshold, interval)

	go r.runLoop()
}
// Stop signals the run loop to exit and blocks until it has done so.
// Should only be called once: a second call would close chStop again, and
// closing an already-closed channel panics.
func (r *Reaper) Stop() {
	r.log.Debug("BPTXMReaper: stopping")
	// Closing chStop wakes the select in runLoop.
	close(r.chStop)
	// runLoop closes chDone on exit, so this receive waits for shutdown to finish.
	<-r.chDone
}
// runLoop performs a cleanup pass on every ticker tick and on every trigger
// signal, until chStop is closed. It closes chDone when it returns so that
// Stop can observe the shutdown.
func (r *Reaper) runLoop() {
	defer close(r.chDone)

	tick := time.NewTicker(r.config.EthTxReaperInterval())
	defer tick.Stop()

	for {
		select {
		case <-r.chStop:
			return
		case <-tick.C:
		case <-r.trigger:
		}
		// Reached for both the tick and the trigger case.
		r.work()
	}
}
// work runs one cleanup pass: old eth_txes first, then old job runs. It does
// nothing while no block number has been recorded yet. Failures are logged
// and do not prevent the second step from running.
func (r *Reaper) work() {
	headNum := atomic.LoadInt64(&r.latestBlockNum)
	if headNum < 0 {
		// No head received yet.
		return
	}

	if err := r.ReapEthTxes(headNum); err != nil {
		r.log.Error("BPTXMReaper: unable to reap old eth_txes: ", err)
	}
	if err := r.ReapJobRuns(); err != nil {
		r.log.Error("BPTXMReaper: unable to reap old runs: ", err)
	}
}
// SetLatestBlockNum should be called on every new highest block number.
// It panics if latestBlockNum is negative. The first call (the one that
// replaces the initial negative value) also triggers an immediate cleanup pass.
func (r *Reaper) SetLatestBlockNum(latestBlockNum int64) {
	if latestBlockNum < 0 {
		panic(fmt.Sprintf("latestBlockNum must be 0 or greater, got: %d", latestBlockNum))
	}

	if previous := atomic.SwapInt64(&r.latestBlockNum, latestBlockNum); previous >= 0 {
		return
	}
	// Previous value was the negative placeholder: run reaper once on startup.
	r.trigger <- struct{}{}
}
// ReapEthTxes deletes old eth_txes in two steps:
//
//  1. 'confirmed' eth_txes that were created before the age cutoff and have a
//     receipt in a block below headNum minus the finality depth;
//  2. 'fatal_error' eth_txes that were created before the age cutoff.
//
// It is a no-op when the configured threshold is 0. The first failing step
// aborts the run and its error is returned wrapped.
func (r *Reaper) ReapEthTxes(headNum int64) error {
	ageThreshold := r.config.EthTxReaperThreshold()
	if ageThreshold == 0 {
		r.log.Debug("BPTXMReaper: ETH_TX_REAPER_THRESHOLD set to 0; skipping ReapEthTxes")
		return nil
	}

	// NOTE that this relies on foreign key triggers automatically removing
	// the eth_tx_attempts and eth_receipts linked to every eth_tx
	const deleteConfirmedSQL = `
WITH old_enough_receipts AS (
SELECT tx_hash FROM eth_receipts
WHERE block_number < ?
ORDER BY block_number ASC, id ASC
LIMIT ?
)
DELETE FROM eth_txes
USING old_enough_receipts, eth_tx_attempts
WHERE eth_tx_attempts.eth_tx_id = eth_txes.id
AND eth_tx_attempts.hash = old_enough_receipts.tx_hash
AND eth_txes.created_at < ?
AND eth_txes.state = 'confirmed'`

	// NOTE(review): this statement has no LIMIT, so the batch size passed to
	// the callback is not applied here.
	const deleteFatalSQL = `
DELETE FROM eth_txes
WHERE created_at < ?
AND state = 'fatal_error'`

	minBlockNumberToKeep := headNum - int64(r.config.EthFinalityDepth())
	started := time.Now()
	cutoff := started.Add(-ageThreshold)

	r.log.Debugw(
		fmt.Sprintf("BPTXMReaper: reaping old eth_txes created before %s", cutoff.Format(time.RFC3339)),
		"ageThreshold", ageThreshold,
		"timeThreshold", cutoff,
		"minBlockNumberToKeep", minBlockNumberToKeep,
	)

	// Delete old confirmed eth_txes
	confirmedErr := postgres.Batch(func(_, limit uint) (uint, error) {
		result := r.db.Exec(deleteConfirmedSQL, minBlockNumberToKeep, limit, cutoff)
		if result.Error != nil {
			return 0, result.Error
		}
		return uint(result.RowsAffected), nil
	})
	if confirmedErr != nil {
		return errors.Wrap(confirmedErr, "BPTXMReaper#reapEthTxes batch delete of confirmed eth_txes failed")
	}

	// Delete old 'fatal_error' eth_txes
	fatalErr := postgres.Batch(func(_, _ uint) (uint, error) {
		result := r.db.Exec(deleteFatalSQL, cutoff)
		if result.Error != nil {
			return 0, result.Error
		}
		return uint(result.RowsAffected), nil
	})
	if fatalErr != nil {
		return errors.Wrap(fatalErr, "BPTXMReaper#reapEthTxes batch delete of fatally errored eth_txes failed")
	}

	r.log.Debugf("BPTXMReaper: ReapEthTxes completed in %v", time.Since(started))
	return nil
}
// ReapJobRuns removes completed and errored job runs that were last updated
// before now minus the configured reaper threshold. It is a no-op when the
// threshold is 0. On failure the underlying error is returned wrapped; the
// completion message (with elapsed time) is logged only after the delete has
// actually succeeded.
//
// HACK: This isn't quite the right place for it, but since we are killing the
// old pipeline this code is temporary anyway
func (r *Reaper) ReapJobRuns() error {
	threshold := r.config.EthTxReaperThreshold() // Just re-use the EthTxReaperThreshold, it's probably close enough
	if threshold == 0 {
		r.log.Debug("BPTXMReaper: ETH_TX_REAPER_THRESHOLD set to 0; skipping ReapJobRuns")
		return nil
	}

	mark := time.Now()
	timeThreshold := mark.Add(-threshold)
	r.log.Debugw(fmt.Sprintf("BPTXMReaper: reaping old job_runs last updated before %s", timeThreshold.Format(time.RFC3339)), "ageThreshold", threshold, "timeThreshold", timeThreshold)

	request := &models.BulkDeleteRunRequest{
		Status:        []models.RunStatus{models.RunStatusCompleted, models.RunStatusErrored},
		UpdatedBefore: timeThreshold,
	}

	// Run the delete before logging: the elapsed time must cover the delete,
	// and "completed" must not be reported when it failed.
	if err := postgres.BulkDeleteRuns(r.db, request); err != nil {
		return errors.Wrap(err, "BPTXMReaper#ReapJobRuns failed")
	}

	r.log.Debugf("BPTXMReaper: ReapJobRuns completed in %v", time.Since(mark))
	return nil
}