-
Notifications
You must be signed in to change notification settings - Fork 1.6k
/
orm.go
297 lines (262 loc) · 10.4 KB
/
orm.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
package txmgr
import (
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/lib/pq"
"github.com/pkg/errors"
"github.com/smartcontractkit/sqlx"
"github.com/smartcontractkit/chainlink/core/logger"
"github.com/smartcontractkit/chainlink/core/services/pg"
)
//go:generate mockery --quiet --name ORM --output ./mocks/ --case=underscore
type ORM interface {
EthTransactions(offset, limit int) ([]EthTx, int, error)
EthTransactionsWithAttempts(offset, limit int) ([]EthTx, int, error)
EthTxAttempts(offset, limit int) ([]EthTxAttempt, int, error)
FindEthTxAttempt(hash common.Hash) (*EthTxAttempt, error)
FindEthTxAttemptsByEthTxIDs(ids []int64) ([]EthTxAttempt, error)
FindEthTxByHash(hash common.Hash) (*EthTx, error)
InsertEthTxAttempt(attempt *EthTxAttempt) error
InsertEthTx(etx *EthTx) error
InsertEthReceipt(receipt *EthReceipt) error
FindEthTxWithAttempts(etxID int64) (etx EthTx, err error)
FindEthTxAttemptConfirmedByEthTxIDs(ids []int64) ([]EthTxAttempt, error)
}
type orm struct {
q pg.Q
logger logger.Logger
}
var _ ORM = (*orm)(nil)
func NewORM(db *sqlx.DB, lggr logger.Logger, cfg pg.QConfig) ORM {
namedLogger := lggr.Named("TxmORM")
q := pg.NewQ(db, namedLogger, cfg)
return &orm{q, namedLogger}
}
func (o *orm) preloadTxAttempts(txs []EthTx) error {
// Preload TxAttempts
var ids []int64
for _, tx := range txs {
ids = append(ids, tx.ID)
}
if len(ids) == 0 {
return nil
}
var attempts []EthTxAttempt
sql := `SELECT * FROM eth_tx_attempts WHERE eth_tx_id IN (?) ORDER BY id desc;`
query, args, err := sqlx.In(sql, ids)
if err != nil {
return err
}
query = o.q.Rebind(query)
if err = o.q.Select(&attempts, query, args...); err != nil {
return err
}
// fill in attempts
for _, attempt := range attempts {
for i, tx := range txs {
if tx.ID == attempt.EthTxID {
txs[i].EthTxAttempts = append(txs[i].EthTxAttempts, attempt)
}
}
}
return nil
}
func (o *orm) preloadTxes(attempts []EthTxAttempt) error {
var ids []int64
for _, attempt := range attempts {
ids = append(ids, attempt.EthTxID)
}
if len(ids) == 0 {
return nil
}
var txs []EthTx
sql := `SELECT * FROM eth_txes WHERE id IN (?)`
query, args, err := sqlx.In(sql, ids)
if err != nil {
return err
}
query = o.q.Rebind(query)
if err = o.q.Select(&txs, query, args...); err != nil {
return err
}
// fill in txs
for _, tx := range txs {
for i, attempt := range attempts {
if tx.ID == attempt.EthTxID {
attempts[i].EthTx = tx
}
}
}
return nil
}
// EthTransactions returns all eth transactions without loaded relations
// limited by passed parameters.
func (o *orm) EthTransactions(offset, limit int) (txs []EthTx, count int, err error) {
sql := `SELECT count(*) FROM eth_txes WHERE id IN (SELECT DISTINCT eth_tx_id FROM eth_tx_attempts)`
if err = o.q.Get(&count, sql); err != nil {
return
}
sql = `SELECT * FROM eth_txes WHERE id IN (SELECT DISTINCT eth_tx_id FROM eth_tx_attempts) ORDER BY id desc LIMIT $1 OFFSET $2`
if err = o.q.Select(&txs, sql, limit, offset); err != nil {
return
}
return
}
// EthTransactionsWithAttempts returns all eth transactions with at least one attempt
// limited by passed parameters. Attempts are sorted by id.
func (o *orm) EthTransactionsWithAttempts(offset, limit int) (txs []EthTx, count int, err error) {
sql := `SELECT count(*) FROM eth_txes WHERE id IN (SELECT DISTINCT eth_tx_id FROM eth_tx_attempts)`
if err = o.q.Get(&count, sql); err != nil {
return
}
sql = `SELECT * FROM eth_txes WHERE id IN (SELECT DISTINCT eth_tx_id FROM eth_tx_attempts) ORDER BY id desc LIMIT $1 OFFSET $2`
if err = o.q.Select(&txs, sql, limit, offset); err != nil {
return
}
err = o.preloadTxAttempts(txs)
return
}
// EthTxAttempts returns the last tx attempts sorted by created_at descending.
func (o *orm) EthTxAttempts(offset, limit int) (txs []EthTxAttempt, count int, err error) {
sql := `SELECT count(*) FROM eth_tx_attempts`
if err = o.q.Get(&count, sql); err != nil {
return
}
sql = `SELECT * FROM eth_tx_attempts ORDER BY created_at DESC, id DESC LIMIT $1 OFFSET $2`
if err = o.q.Select(&txs, sql, limit, offset); err != nil {
return
}
err = o.preloadTxes(txs)
return
}
// FindEthTxAttempt returns an individual EthTxAttempt
func (o *orm) FindEthTxAttempt(hash common.Hash) (*EthTxAttempt, error) {
ethTxAttempt := EthTxAttempt{}
sql := `SELECT * FROM eth_tx_attempts WHERE hash = $1`
if err := o.q.Get(ðTxAttempt, sql, hash); err != nil {
return nil, err
}
// reuse the preload
attempts := []EthTxAttempt{ethTxAttempt}
err := o.preloadTxes(attempts)
return &attempts[0], err
}
// FindEthTxAttemptsByEthTxIDs returns a list of attempts by ETH Tx IDs
func (o *orm) FindEthTxAttemptsByEthTxIDs(ids []int64) ([]EthTxAttempt, error) {
var attempts []EthTxAttempt
sql := `SELECT * FROM eth_tx_attempts WHERE eth_tx_id = ANY($1)`
if err := o.q.Select(&attempts, sql, ids); err != nil {
return nil, err
}
return attempts, nil
}
func (o *orm) FindEthTxByHash(hash common.Hash) (*EthTx, error) {
var etx EthTx
err := o.q.Transaction(func(tx pg.Queryer) error {
sql := `SELECT eth_txes.* FROM eth_txes WHERE id IN (SELECT DISTINCT eth_tx_id FROM eth_tx_attempts WHERE hash = $1)`
if err := tx.Get(&etx, sql, hash); err != nil {
return errors.Wrapf(err, "failed to find eth_tx with hash %d", hash)
}
return nil
}, pg.OptReadOnlyTx())
return &etx, errors.Wrap(err, "FindEthTxByHash failed")
}
// InsertEthTxAttempt inserts a new txAttempt into the database
func (o *orm) InsertEthTx(etx *EthTx) error {
if etx.CreatedAt == (time.Time{}) {
etx.CreatedAt = time.Now()
}
const insertEthTxSQL = `INSERT INTO eth_txes (nonce, from_address, to_address, encoded_payload, value, gas_limit, error, broadcast_at, initial_broadcast_at, created_at, state, meta, subject, pipeline_task_run_id, min_confirmations, evm_chain_id, access_list, transmit_checker) VALUES (
:nonce, :from_address, :to_address, :encoded_payload, :value, :gas_limit, :error, :broadcast_at, :initial_broadcast_at, :created_at, :state, :meta, :subject, :pipeline_task_run_id, :min_confirmations, :evm_chain_id, :access_list, :transmit_checker
) RETURNING *`
err := o.q.GetNamed(insertEthTxSQL, etx, etx)
return errors.Wrap(err, "InsertEthTx failed")
}
func (o *orm) InsertEthTxAttempt(attempt *EthTxAttempt) error {
const insertEthTxAttemptSQL = `INSERT INTO eth_tx_attempts (eth_tx_id, gas_price, signed_raw_tx, hash, broadcast_before_block_num, state, created_at, chain_specific_gas_limit, tx_type, gas_tip_cap, gas_fee_cap) VALUES (
:eth_tx_id, :gas_price, :signed_raw_tx, :hash, :broadcast_before_block_num, :state, NOW(), :chain_specific_gas_limit, :tx_type, :gas_tip_cap, :gas_fee_cap
) RETURNING *`
err := o.q.GetNamed(insertEthTxAttemptSQL, attempt, attempt)
return errors.Wrap(err, "InsertEthTxAttempt failed")
}
func (o *orm) InsertEthReceipt(receipt *EthReceipt) error {
const insertEthReceiptSQL = `INSERT INTO eth_receipts (tx_hash, block_hash, block_number, transaction_index, receipt, created_at) VALUES (
:tx_hash, :block_hash, :block_number, :transaction_index, :receipt, NOW()
) RETURNING *`
err := o.q.GetNamed(insertEthReceiptSQL, receipt, receipt)
return errors.Wrap(err, "InsertEthReceipt failed")
}
// FindEthTxWithAttempts finds the EthTx with its attempts and receipts preloaded
func (o *orm) FindEthTxWithAttempts(etxID int64) (etx EthTx, err error) {
err = o.q.Transaction(func(tx pg.Queryer) error {
if err = tx.Get(&etx, `SELECT * FROM eth_txes WHERE id = $1 ORDER BY created_at ASC, id ASC`, etxID); err != nil {
return errors.Wrapf(err, "failed to find eth_tx with id %d", etxID)
}
if err = loadEthTxAttempts(tx, &etx); err != nil {
return errors.Wrapf(err, "failed to load eth_tx_attempts for eth_tx with id %d", etxID)
}
if err = loadEthTxAttemptsReceipts(tx, &etx); err != nil {
return errors.Wrapf(err, "failed to load eth_receipts for eth_tx with id %d", etxID)
}
return nil
}, pg.OptReadOnlyTx())
return etx, errors.Wrap(err, "FindEthTxWithAttempts failed")
}
func (o *orm) FindEthTxAttemptConfirmedByEthTxIDs(ids []int64) ([]EthTxAttempt, error) {
var attempts []EthTxAttempt
err := o.q.Transaction(func(tx pg.Queryer) error {
if err := tx.Select(&attempts, `SELECT eta.*
FROM eth_tx_attempts eta
join eth_receipts er on eta.hash = er.tx_hash where eta.eth_tx_id = ANY($1) ORDER BY eta.gas_price DESC, eta.gas_tip_cap DESC`, ids); err != nil {
return err
}
return loadConfirmedAttemptsReceipts(tx, attempts)
}, pg.OptReadOnlyTx())
return attempts, errors.Wrap(err, "FindEthTxAttemptConfirmedByEthTxIDs failed")
}
func loadEthTxAttempts(q pg.Queryer, etx *EthTx) error {
err := q.Select(&etx.EthTxAttempts, `SELECT * FROM eth_tx_attempts WHERE eth_tx_id = $1 ORDER BY eth_tx_attempts.gas_price DESC, eth_tx_attempts.gas_tip_cap DESC`, etx.ID)
return errors.Wrapf(err, "failed to load ethtxattempts for eth tx %d", etx.ID)
}
func loadEthTxAttemptsReceipts(q pg.Queryer, etx *EthTx) (err error) {
return loadEthTxesAttemptsReceipts(q, []*EthTx{etx})
}
func loadEthTxesAttemptsReceipts(q pg.Queryer, etxs []*EthTx) (err error) {
if len(etxs) == 0 {
return nil
}
attemptHashM := make(map[common.Hash]*EthTxAttempt, len(etxs)) // len here is lower bound
attemptHashes := make([][]byte, len(etxs)) // len here is lower bound
for _, etx := range etxs {
for i, attempt := range etx.EthTxAttempts {
attemptHashM[attempt.Hash] = &etx.EthTxAttempts[i]
attemptHashes = append(attemptHashes, attempt.Hash.Bytes())
}
}
var receipts []EthReceipt
if err = q.Select(&receipts, `SELECT * FROM eth_receipts WHERE tx_hash = ANY($1)`, pq.Array(attemptHashes)); err != nil {
return errors.Wrap(err, "loadEthTxesAttemptsReceipts failed to load eth_receipts")
}
for _, receipt := range receipts {
attempt := attemptHashM[receipt.TxHash]
attempt.EthReceipts = append(attempt.EthReceipts, receipt)
}
return nil
}
func loadConfirmedAttemptsReceipts(q pg.Queryer, attempts []EthTxAttempt) error {
byHash := make(map[common.Hash]*EthTxAttempt, len(attempts))
hashes := make([][]byte, len(attempts))
for i, attempt := range attempts {
byHash[attempt.Hash] = &attempts[i]
hashes = append(hashes, attempt.Hash.Bytes())
}
var receipts []EthReceipt
if err := q.Select(&receipts, `SELECT * FROM eth_receipts WHERE tx_hash = ANY($1)`, pq.Array(hashes)); err != nil {
return errors.Wrap(err, "loadConfirmedAttemptsReceipts failed to load eth_receipts")
}
for _, receipt := range receipts {
attempt := byHash[receipt.TxHash]
attempt.EthReceipts = append(attempt.EthReceipts, receipt)
}
return nil
}