-
Notifications
You must be signed in to change notification settings - Fork 1.7k
/
orm.go
94 lines (82 loc) · 3.44 KB
/
orm.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
package headtracker
import (
"context"
"database/sql"
"math/big"
"github.com/ethereum/go-ethereum/common"
"github.com/pkg/errors"
"github.com/smartcontractkit/sqlx"
evmtypes "github.com/smartcontractkit/chainlink/core/chains/evm/types"
"github.com/smartcontractkit/chainlink/core/logger"
"github.com/smartcontractkit/chainlink/core/services/pg"
"github.com/smartcontractkit/chainlink/core/utils"
)
// ORM is the persistence interface the head tracker uses to store and query
// chain heads. The implementation in this file keeps them in the evm_heads
// table and scopes its read and trim queries to a single EVM chain ID.
type ORM interface {
// IdempotentInsertHead inserts a head only if the hash is new. Will do nothing if hash exists already.
// No advisory lock required because this is thread safe.
IdempotentInsertHead(ctx context.Context, head *evmtypes.Head) error
// TrimOldHeads deletes every head whose block number is lower than the
// lowest number found among the n highest-numbered rows. Rows sharing a
// block number each count toward n, so fewer than n distinct block numbers
// may remain afterwards.
TrimOldHeads(ctx context.Context, n uint) (err error)
// LatestHead returns the highest seen head, or nil (with a nil error) if no
// head is stored.
LatestHead(ctx context.Context) (head *evmtypes.Head, err error)
// LatestHeads returns the latest heads up to given limit, highest block
// number first.
LatestHeads(ctx context.Context, limit uint) (heads []*evmtypes.Head, err error)
// HeadByHash fetches the head with the given hash from the db, returns nil if none exists
HeadByHash(ctx context.Context, hash common.Hash) (head *evmtypes.Head, err error)
}
// orm is the ORM implementation backed by the evm_heads table.
type orm struct {
// q runs the SQL statements; each method derives a context-bound copy via
// q.WithOpts(pg.WithParentCtx(ctx)) before executing.
q pg.Q
// chainID is bound to the evm_chain_id placeholder of the read and trim
// queries so they only touch heads of this chain.
chainID utils.Big
}
// NewORM returns an ORM bound to a single chain.
//
// Statements are executed through a pg.Q built from db and cfg, logging via
// lggr under the name "HeadTrackerORM". chainID is converted to utils.Big and
// kept for the queries that filter on evm_chain_id.
func NewORM(db *sqlx.DB, lggr logger.Logger, cfg pg.QConfig, chainID big.Int) ORM {
	namedLggr := lggr.Named("HeadTrackerORM")
	return &orm{
		q:       pg.NewQ(db, namedLggr, cfg),
		chainID: utils.Big(chainID),
	}
}
// IdempotentInsertHead stores head in evm_heads using a named-parameter
// INSERT. When a row with the same (evm_chain_id, hash) already exists the
// statement does nothing, so inserting the same head repeatedly is harmless.
//
// ctx is used as the parent context of the query. Any error is returned
// wrapped with "IdempotentInsertHead failed to insert head"; nil on success.
func (orm *orm) IdempotentInsertHead(ctx context.Context, head *evmtypes.Head) error {
	// The chain ID written is the one carried by head itself (:evm_chain_id);
	// listener guarantees head.EVMChainID to be equal to orm.chainID.
	const insertHead = `
INSERT INTO evm_heads (hash, number, parent_hash, created_at, timestamp, l1_block_number, evm_chain_id, base_fee_per_gas) VALUES (
:hash, :number, :parent_hash, :created_at, :timestamp, :l1_block_number, :evm_chain_id, :base_fee_per_gas)
ON CONFLICT (evm_chain_id, hash) DO NOTHING`

	scoped := orm.q.WithOpts(pg.WithParentCtx(ctx))
	if execErr := scoped.ExecQNamed(insertHead, head); execErr != nil {
		return errors.Wrap(execErr, "IdempotentInsertHead failed to insert head")
	}
	return nil
}
// TrimOldHeads deletes old heads of this ORM's chain from evm_heads.
//
// The cutoff is the lowest block number among the n highest-numbered rows of
// the chain; every row of the chain whose number is strictly below that cutoff
// is deleted. Rows that share a block number each count toward n, so fewer
// than n distinct block numbers may remain afterwards.
//
// NOTE(review): with n == 0 the inner SELECT yields no rows, so min(number)
// should be NULL and nothing is deleted — confirm against the database.
//
// ctx is used as the parent context of the query. Any error is returned
// wrapped with "TrimOldHeads failed", matching the other methods of this ORM;
// nil on success.
func (orm *orm) TrimOldHeads(ctx context.Context, n uint) (err error) {
	q := orm.q.WithOpts(pg.WithParentCtx(ctx))
	err = q.ExecQ(`
DELETE FROM evm_heads
WHERE evm_chain_id = $1 AND number < (
SELECT min(number) FROM (
SELECT number
FROM evm_heads
WHERE evm_chain_id = $1
ORDER BY number DESC
LIMIT $2
) numbers
)`, orm.chainID, n)
	// errors.Wrap returns nil for a nil error, so success still yields nil.
	return errors.Wrap(err, "TrimOldHeads failed")
}
// LatestHead loads the newest head stored for this ORM's chain: the row with
// the highest number, ties broken by the most recent created_at and then the
// highest id.
//
// It returns (nil, nil) when the query reports sql.ErrNoRows. Any other error
// is returned wrapped with "LatestHead failed". ctx is used as the parent
// context of the query.
func (orm *orm) LatestHead(ctx context.Context) (head *evmtypes.Head, err error) {
	scoped := orm.q.WithOpts(pg.WithParentCtx(ctx))
	latest := new(evmtypes.Head)
	getErr := scoped.Get(latest, `SELECT * FROM evm_heads WHERE evm_chain_id = $1 ORDER BY number DESC, created_at DESC, id DESC LIMIT 1`, orm.chainID)
	if errors.Is(getErr, sql.ErrNoRows) {
		// No head has been stored yet; that is not an error for callers.
		return nil, nil
	}
	// errors.Wrap yields nil when getErr is nil, i.e. on success.
	return latest, errors.Wrap(getErr, "LatestHead failed")
}
// LatestHeads loads up to limit heads of this ORM's chain, ordered by number
// descending, then created_at descending, then id descending.
//
// ctx is used as the parent context of the query. Any error is returned
// wrapped with "LatestHeads failed".
func (orm *orm) LatestHeads(ctx context.Context, limit uint) (heads []*evmtypes.Head, err error) {
	var found []*evmtypes.Head
	scoped := orm.q.WithOpts(pg.WithParentCtx(ctx))
	selectErr := scoped.Select(&found, `SELECT * FROM evm_heads WHERE evm_chain_id = $1 ORDER BY number DESC, created_at DESC, id DESC LIMIT $2`, orm.chainID, limit)
	// errors.Wrap yields nil when selectErr is nil, i.e. on success.
	return found, errors.Wrap(selectErr, "LatestHeads failed")
}
// HeadByHash loads the head of this ORM's chain whose hash equals hash.
//
// It returns (nil, nil) when the query reports sql.ErrNoRows. Any other error
// is returned wrapped with "HeadByHash failed", matching LatestHead and
// LatestHeads. ctx is used as the parent context of the query.
func (orm *orm) HeadByHash(ctx context.Context, hash common.Hash) (head *evmtypes.Head, err error) {
	q := orm.q.WithOpts(pg.WithParentCtx(ctx))
	head = new(evmtypes.Head)
	err = q.Get(head, `SELECT * FROM evm_heads WHERE evm_chain_id = $1 AND hash = $2`, orm.chainID, hash)
	if errors.Is(err, sql.ErrNoRows) {
		// An unknown hash is reported as "no head", not as an error.
		return nil, nil
	}
	// errors.Wrap returns nil for a nil error, so success still yields nil.
	return head, errors.Wrap(err, "HeadByHash failed")
}