/
prom_reporter.go
126 lines (107 loc) · 3.45 KB
/
prom_reporter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
package services
import (
"context"
"database/sql"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/smartcontractkit/chainlink/core/logger"
"github.com/smartcontractkit/chainlink/core/store"
"github.com/smartcontractkit/chainlink/core/store/models"
"go.uber.org/multierr"
"gopkg.in/guregu/null.v4"
)
//go:generate mockery --name PrometheusBackend --output ../internal/mocks/ --case=underscore

// PrometheusBackend is the sink that the reporter pushes its computed values
// into. Supplying a custom implementation lets callers observe the values
// without touching the package-level gauges.
type PrometheusBackend interface {
	// SetUnconfirmedTransactions receives the number of unconfirmed transactions.
	SetUnconfirmedTransactions(int64)
	// SetMaxUnconfirmedBlocks receives the largest number of blocks any
	// unconfirmed transaction has been waiting for.
	SetMaxUnconfirmedBlocks(int64)
}

// promReporter pairs the database handle used for the metric queries with the
// backend that receives the results.
//
// The field order is significant: the constructor in this file builds the
// struct with a positional literal.
type promReporter struct {
	db      *sql.DB
	backend PrometheusBackend
}

// defaultBackend is the stateless PrometheusBackend used when the caller does
// not supply one; its methods write to the package-level gauges.
type defaultBackend struct{}
// promUnconfirmedTransactions is the gauge holding the number of transactions
// that are currently unconfirmed.
var promUnconfirmedTransactions = promauto.NewGauge(prometheus.GaugeOpts{
	Name: "unconfirmed_transactions",
	Help: "Number of currently unconfirmed transactions",
})

// promMaxUnconfirmedBlocks is the gauge holding the largest number of blocks
// that any currently unconfirmed transaction has been waiting for.
var promMaxUnconfirmedBlocks = promauto.NewGauge(prometheus.GaugeOpts{
	Name: "max_unconfirmed_blocks",
	Help: "The max number of blocks any currently unconfirmed transaction has been unconfirmed for",
})
// SetUnconfirmedTransactions publishes n to the unconfirmed_transactions gauge.
func (defaultBackend) SetUnconfirmedTransactions(n int64) {
	value := float64(n) // the gauge API takes float64
	promUnconfirmedTransactions.Set(value)
}
// SetMaxUnconfirmedBlocks publishes n to the max_unconfirmed_blocks gauge.
func (defaultBackend) SetMaxUnconfirmedBlocks(n int64) {
	value := float64(n) // the gauge API takes float64
	promMaxUnconfirmedBlocks.Set(value)
}
// NewPromReporter builds a reporter that queries db and forwards the results
// to a PrometheusBackend.
//
// opts acts as a single optional argument: when at least one backend is
// passed, the first one is used and any others are ignored; when none is
// passed, the reporter writes to the package-level gauges via defaultBackend.
func NewPromReporter(db *sql.DB, opts ...PrometheusBackend) store.HeadTrackable {
	var backend PrometheusBackend = defaultBackend{}
	if len(opts) > 0 {
		backend = opts[0]
	}
	return &promReporter{db: db, backend: backend}
}
// Connect is a no-op and always returns nil: nothing is reported at connect
// time, the reporter simply waits for the next head.
func (pr *promReporter) Connect(_ *models.Head) error {
	return nil
}
// Disconnect is intentionally a no-op.
func (pr *promReporter) Disconnect() {}
// OnNewLongestChain refreshes both metrics for the given head.
//
// The two reports run one after the other and independently: a failure in the
// first does not prevent the second from running. Any failures are combined
// and logged rather than returned, since this method has no error result.
func (pr *promReporter) OnNewLongestChain(ctx context.Context, head models.Head) {
	// errors.Wrap yields nil for a nil error, so successful reports drop out
	// of the combined error.
	pendingErr := errors.Wrap(pr.reportPendingEthTxes(ctx), "reportPendingEthTxes failed")
	blocksErr := errors.Wrap(pr.reportMaxUnconfirmedBlocks(ctx, head), "reportMaxUnconfirmedBlocks failed")

	if combined := multierr.Combine(pendingErr, blocksErr); combined != nil {
		logger.Errorw("Error reporting prometheus metrics", "err", combined)
	}
}
// reportPendingEthTxes counts the eth_txes rows whose state is 'unconfirmed'
// and hands the count to the backend.
//
// It returns an error if the query fails, if a row cannot be scanned, if row
// iteration ends with an error, or if closing the result set fails. The
// backend is only updated when the count was read successfully, so a failed
// read never overwrites the metric with a misleading zero.
func (pr *promReporter) reportPendingEthTxes(ctx context.Context) (err error) {
	rows, err := pr.db.QueryContext(ctx, `SELECT count(*) FROM eth_txes WHERE state = 'unconfirmed'`)
	if err != nil {
		return errors.Wrap(err, "failed to query for unconfirmed eth_tx count")
	}
	// err is a named result so that a Close failure is folded into whatever
	// the function is already returning.
	defer func() {
		err = multierr.Combine(err, rows.Close())
	}()

	var unconfirmed int64
	for rows.Next() {
		if err = rows.Scan(&unconfirmed); err != nil {
			return errors.Wrap(err, "unexpected error scanning row")
		}
	}
	// rows.Next returns false both at end of data and on failure (for example
	// a cancelled context); rows.Err tells the two apart.
	if err = rows.Err(); err != nil {
		return errors.Wrap(err, "unexpected error iterating rows")
	}

	pr.backend.SetUnconfirmedTransactions(unconfirmed)
	return nil
}
// reportMaxUnconfirmedBlocks reports how many blocks the oldest unconfirmed
// transaction has been waiting, measured against head.
//
// It selects the minimum broadcast_before_block_num over the attempts that
// belong to unconfirmed eth_txes and reports head.Number minus that value.
// When the scanned value is unset (IsZero reports true), 0 is reported.
//
// It returns an error if the query fails, if a row cannot be scanned, if row
// iteration ends with an error, or if closing the result set fails. The
// backend is only updated when the value was read successfully, so a failed
// read is never reported as "nothing pending".
func (pr *promReporter) reportMaxUnconfirmedBlocks(ctx context.Context, head models.Head) (err error) {
	rows, err := pr.db.QueryContext(ctx, `
SELECT MIN(broadcast_before_block_num) FROM eth_tx_attempts
JOIN eth_txes ON eth_txes.id = eth_tx_attempts.eth_tx_id
AND eth_txes.state = 'unconfirmed'`)
	if err != nil {
		return errors.Wrap(err, "failed to query for min broadcast_before_block_num")
	}
	// err is a named result so that a Close failure is folded into whatever
	// the function is already returning.
	defer func() {
		err = multierr.Combine(err, rows.Close())
	}()

	var earliestUnconfirmedTxBlock null.Int
	for rows.Next() {
		if err = rows.Scan(&earliestUnconfirmedTxBlock); err != nil {
			return errors.Wrap(err, "unexpected error scanning row")
		}
	}
	// rows.Next returns false both at end of data and on failure (for example
	// a cancelled context); rows.Err tells the two apart.
	if err = rows.Err(); err != nil {
		return errors.Wrap(err, "unexpected error iterating rows")
	}

	var blocksUnconfirmed int64
	if !earliestUnconfirmedTxBlock.IsZero() {
		blocksUnconfirmed = head.Number - earliestUnconfirmedTxBlock.ValueOrZero()
	}
	pr.backend.SetMaxUnconfirmedBlocks(blocksUnconfirmed)
	return nil
}