Skip to content

Commit

Permalink
Collect from pg_stat_subscription_stats in pg15.
Browse files Browse the repository at this point in the history
  • Loading branch information
mdevan committed Nov 2, 2022
1 parent 628c852 commit 1312691
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 23 deletions.
82 changes: 60 additions & 22 deletions collector/collect.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ const (
pgv12 = 12_00_00
pgv13 = 13_00_00
pgv14 = 14_00_00
pgv15 = 15_00_00
)

// See https://www.postgresql.org/docs/current/static/libpq-connect.html#LIBPQ-CONNSTRING
Expand Down Expand Up @@ -1295,6 +1296,7 @@ func (c *collector) getBETypeCountsv10() {
// onlyListed - only collect for the databases listed in 'dbList'
// dbList - list of database names for onlyListed
// also: if onlyListed is true but dbList is empty, assume dbList contains
//
// the name of the currently connected database
func (c *collector) getDatabases(fillSize, onlyListed bool, dbList []string) {
ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
Expand Down Expand Up @@ -2028,6 +2030,7 @@ func (c *collector) isAWSAurora() bool {

// getWALCountsv12 gets the WAL file and archive ready counts using the
// following functions (respectively):
//
// pg_ls_waldir
// pg_ls_archive_statusdir
func (c *collector) getWALCountsv12() {
Expand All @@ -2039,6 +2042,7 @@ func (c *collector) getWALCountsv12() {

// getWALCountsv11 gets the WAL file and archive ready counts using the
// following functions (respectively):
//
// pg_ls_waldir
// pg_ls_dir (if not aws)
func (c *collector) getWALCountsv11() {
Expand All @@ -2057,6 +2061,7 @@ func (c *collector) getWALCountsv11() {

// getWALCounts gets the WAL file and archive ready counts using the
// following functions (respectively):
//
// pg_ls_dir (if not aws)
// pg_ls_dir (if not aws)
func (c *collector) getWALCounts() {
Expand Down Expand Up @@ -2279,24 +2284,51 @@ func (c *collector) getSubscriptions() {
ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
defer cancel()

q := `WITH
sc AS (SELECT srsubid, COUNT(*) AS c FROM pg_subscription_rel GROUP BY 1),
swc AS (SELECT subid, COUNT(*) AS c FROM pg_stat_subscription GROUP BY 1)
SELECT
s.oid, s.subname, current_database(), subenabled,
array_length(subpublications, 1) AS pubcount, sc.c AS tabcount,
swc.c AS workercount,
COALESCE(ss.received_lsn::text, ''),
COALESCE(ss.latest_end_lsn::text, ''),
ss.last_msg_send_time, ss.last_msg_receipt_time,
COALESCE(EXTRACT(EPOCH FROM ss.latest_end_time)::bigint, 0)
FROM
pg_subscription s
JOIN sc ON s.oid = sc.srsubid
JOIN pg_stat_subscription ss ON s.oid = ss.subid
JOIN swc ON s.oid = swc.subid
WHERE
ss.relid IS NULL`
var q string
if c.version >= pgv15 {
q = `WITH
sc AS (SELECT srsubid, COUNT(*) AS c FROM pg_subscription_rel GROUP BY 1),
swc AS (SELECT subid, COUNT(*) AS c FROM pg_stat_subscription GROUP BY 1),
sss AS (SELECT subid, apply_error_count, sync_error_count FROM pg_stat_subscription_stats)
SELECT
s.oid, s.subname, current_database(), subenabled,
array_length(subpublications, 1) AS pubcount, sc.c AS tabcount,
swc.c AS workercount,
COALESCE(ss.received_lsn::text, ''),
COALESCE(ss.latest_end_lsn::text, ''),
ss.last_msg_send_time, ss.last_msg_receipt_time,
COALESCE(EXTRACT(EPOCH FROM ss.latest_end_time)::bigint, 0),
sss.apply_error_count,
sss.sync_error_count
FROM
pg_subscription s
JOIN sc ON s.oid = sc.srsubid
JOIN pg_stat_subscription ss ON s.oid = ss.subid
JOIN swc ON s.oid = swc.subid
JOIN sss ON s.oid = sss.subid
WHERE
ss.relid IS NULL`
} else {
q = `WITH
sc AS (SELECT srsubid, COUNT(*) AS c FROM pg_subscription_rel GROUP BY 1),
swc AS (SELECT subid, COUNT(*) AS c FROM pg_stat_subscription GROUP BY 1)
SELECT
s.oid, s.subname, current_database(), subenabled,
array_length(subpublications, 1) AS pubcount, sc.c AS tabcount,
swc.c AS workercount,
COALESCE(ss.received_lsn::text, ''),
COALESCE(ss.latest_end_lsn::text, ''),
ss.last_msg_send_time, ss.last_msg_receipt_time,
COALESCE(EXTRACT(EPOCH FROM ss.latest_end_time)::bigint, 0),
0, 0
FROM
pg_subscription s
JOIN sc ON s.oid = sc.srsubid
JOIN pg_stat_subscription ss ON s.oid = ss.subid
JOIN swc ON s.oid = swc.subid
WHERE
ss.relid IS NULL`
}
rows, err := c.db.QueryContext(ctx, q)
if err != nil {
return // don't fail on errors
Expand All @@ -2308,12 +2340,18 @@ func (c *collector) getSubscriptions() {
var msgSend, msgRecv pq.NullTime
if err := rows.Scan(&s.OID, &s.Name, &s.DBName, &s.Enabled, &s.PubCount,
&s.TableCount, &s.WorkerCount, &s.ReceivedLSN, &s.LatestEndLSN,
&msgSend, &msgRecv, &s.LatestEndTime); err != nil {
&msgSend, &msgRecv, &s.LatestEndTime, &s.ApplyErrorCount, &s.SyncErrorCount); err != nil {
log.Fatalf("pg_subscription query failed: %v", err)
}
s.LastMsgSendTime = msgSend.Time.Unix()
s.LastMsgReceiptTime = msgRecv.Time.Unix()
s.Latency = int64(msgRecv.Time.Sub(msgSend.Time)) / 1000
if msgSend.Valid {
s.LastMsgSendTime = msgSend.Time.Unix()
}
if msgRecv.Valid {
s.LastMsgReceiptTime = msgRecv.Time.Unix()
}
if msgSend.Valid && msgRecv.Valid {
s.Latency = int64(msgRecv.Time.Sub(msgSend.Time)) / 1000
}
c.result.Subscriptions = append(c.result.Subscriptions, s)
}
if err := rows.Err(); err != nil {
Expand Down
5 changes: 4 additions & 1 deletion model.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package pgmetrics
// ModelSchemaVersion is the schema version of the "Model" data structure
// defined below. It is in the "semver" notation. Version history:
//
// 1.13 - Citus 11 support
// 1.13 - Citus 11 support, Postgres 15
// 1.12 - Azure metrics, queryid in plan, progress views
// 1.11 - Postgres 14, PgBouncer 1.16, other attributes
// 1.10 - New fields in pg_stat_statements for Postgres 13
Expand Down Expand Up @@ -630,6 +630,9 @@ type Subscription struct {
LastMsgReceiptTime int64 `json:"last_msg_receipt_time"`
LatestEndTime int64 `json:"latest_end_time"`
Latency int64 `json:"latency_micros"`
// following fields present only in schema 1.13 and later
ApplyErrorCount int `json:"apply_error_count,omitempty"` // >= pg15
SyncErrorCount int `json:"sync_error_count,omitempty"` // >= pg15
}

// Lock represents a single row from pg_locks. Added in schema 1.3.
Expand Down

0 comments on commit 1312691

Please sign in to comment.