Skip to content

Commit

Permalink
Collect information about locks
Browse files Browse the repository at this point in the history
  • Loading branch information
mdevan committed Nov 10, 2018
1 parent da6dc11 commit 2dc1da1
Show file tree
Hide file tree
Showing 2 changed files with 133 additions and 4 deletions.
112 changes: 112 additions & 0 deletions collector/collect.go
Expand Up @@ -341,6 +341,7 @@ func (c *collector) collectCluster(o CollectConfig) {
c.getNotification()
}

c.getLocks()
}

// info and stats for the current database
Expand Down Expand Up @@ -1564,6 +1565,117 @@ func (c *collector) getNotification() {
}
}

func (c *collector) getLocks() {
c.getLockRows()
if c.version >= 90600 {
c.getBlockers96()
} else {
c.getBlockers()
}
}

func (c *collector) getLockRows() {
ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
defer cancel()

q := `
SELECT COALESCE(D.datname, ''), L.locktype, L.mode, L.granted, L.pid,
COALESCE(L.relation, 0)
FROM pg_locks L LEFT OUTER JOIN pg_database D ON L.database = D.oid`
rows, err := c.db.QueryContext(ctx, q)
if err != nil {
log.Fatalf("pg_locks query failed: %v", err)
}
defer rows.Close()

for rows.Next() {
var l pgmetrics.Lock
if err := rows.Scan(&l.DBName, &l.LockType, &l.Mode, &l.Granted,
&l.PID, &l.RelationOID); err != nil {
log.Fatalf("pg_locks query failed: %v", err)
}
c.result.Locks = append(c.result.Locks, l)
}
if err := rows.Err(); err != nil {
log.Fatalf("pg_locks query failed: %v", err)
}
}

func (c *collector) getBlockers96() {
ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
defer cancel()

q := `
WITH P AS (SELECT DISTINCT pid FROM pg_locks WHERE NOT granted)
SELECT pid, pg_blocking_pids(pid) FROM P`
rows, err := c.db.QueryContext(ctx, q)
if err != nil {
log.Fatalf("pg_locks query failed: %v", err)
}
defer rows.Close()

c.result.BlockingPIDs = make(map[int][]int)
for rows.Next() {
var pid int
var blockers []int64 // lib/pq doesn't support []int :-(
if err := rows.Scan(&pid, pq.Array(&blockers)); err != nil {
log.Fatalf("pg_locks query failed: %v", err)
}
blockersInt := make([]int, len(blockers))
for i := range blockers {
blockersInt[i] = int(blockers[i])
}
c.result.BlockingPIDs[pid] = blockersInt
}
if err := rows.Err(); err != nil {
log.Fatalf("pg_locks query failed: %v", err)
}
}

func (c *collector) getBlockers() {
ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
defer cancel()

// Based on a query from https://wiki.postgresql.org/wiki/Lock_Monitoring
q := `
SELECT DISTINCT blocked_locks.pid AS blocked_pid, blocking_locks.pid AS blocking_pid
FROM pg_catalog.pg_locks blocked_locks
JOIN pg_catalog.pg_locks blocking_locks
ON blocking_locks.locktype = blocked_locks.locktype
AND blocking_locks.database IS NOT DISTINCT FROM blocked_locks.database
AND blocking_locks.relation IS NOT DISTINCT FROM blocked_locks.relation
AND blocking_locks.page IS NOT DISTINCT FROM blocked_locks.page
AND blocking_locks.tuple IS NOT DISTINCT FROM blocked_locks.tuple
AND blocking_locks.virtualxid IS NOT DISTINCT FROM blocked_locks.virtualxid
AND blocking_locks.transactionid IS NOT DISTINCT FROM blocked_locks.transactionid
AND blocking_locks.classid IS NOT DISTINCT FROM blocked_locks.classid
AND blocking_locks.objid IS NOT DISTINCT FROM blocked_locks.objid
AND blocking_locks.objsubid IS NOT DISTINCT FROM blocked_locks.objsubid
AND blocking_locks.pid != blocked_locks.pid
WHERE NOT blocked_locks.GRANTED`
rows, err := c.db.QueryContext(ctx, q)
if err != nil {
log.Fatalf("pg_locks query failed: %v", err)
}
defer rows.Close()

c.result.BlockingPIDs = make(map[int][]int)
for rows.Next() {
var pid, blocker int
if err := rows.Scan(&pid, &blocker); err != nil {
log.Fatalf("pg_locks query failed: %v", err)
}
if _, ok := c.result.BlockingPIDs[pid]; ok {
c.result.BlockingPIDs[pid] = append(c.result.BlockingPIDs[pid], blocker)
} else {
c.result.BlockingPIDs[pid] = []int{blocker}
}
}
if err := rows.Err(); err != nil {
log.Fatalf("pg_locks query failed: %v", err)
}
}

func (c *collector) getPublications() {
ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
defer cancel()
Expand Down
25 changes: 21 additions & 4 deletions model.go
Expand Up @@ -18,10 +18,11 @@ package pgmetrics

// ModelSchemaVersion is the schema version of the "Model" data structure
// defined below. It is in the "semver" notation. Version history:
// 1.3 - locks information
// 1.2 - more table and index attributes
// 1.1 - added NotificationQueueUsage and Statements
// 1.0 - initial release
const ModelSchemaVersion = "1.2"
const ModelSchemaVersion = "1.3"

// Model contains the entire information collected by a single run of
// pgmetrics. It can be converted to and from json without loss of
Expand Down Expand Up @@ -98,11 +99,17 @@ type Model struct {
// settings
Settings map[string]Setting `json:"settings"` // all settings and their values

// rest of the fields added in schema version 1.2
// following fields present only in schema 1.2 and later

// Logical replication (database-specific)
Publications []Publication `json:"publications,omitempty"`
Subscriptions []Subscription `json:"subscriptions,omitempty"`

// following fields present only in schema 1.3 and later

// Lock information
Locks []Lock `json:"locks,omitempty"`
BlockingPIDs map[int][]int `json:"blocking_pids,omitempty"`
}

// DatabaseByOID iterates over the databases in the model and returns the reference
Expand Down Expand Up @@ -303,7 +310,7 @@ type Table struct {
TidxBlksHit int64 `json:"tidx_blks_hit"`
Size int64 `json:"size"`
Bloat int64 `json:"bloat"`
// rest of the fields added in schema version 1.2
// following fields present only in schema 1.2 and later
RelKind string `json:"relkind"`
RelPersistence string `json:"relpersistence"`
RelNAtts int `json:"relnatts"`
Expand All @@ -328,7 +335,7 @@ type Index struct {
IdxBlksHit int64 `json:"idx_blks_hit"`
Size int64 `json:"size"`
Bloat int64 `json:"bloat"`
// rest of the fields added in schema version 1.2
// following fields present only in schema 1.2 and later
RelNAtts int `json:"relnatts"`
AMName string `json:"amname"`
TablespaceName string `json:"tablespace_name"`
Expand Down Expand Up @@ -503,3 +510,13 @@ type Subscription struct {
LatestEndTime int64 `json:"latest_end_time"`
Latency int64 `json:"latency_micros"`
}

// Lock represents a single row from pg_locks. Added in schema 1.3.
type Lock struct {
LockType string `json:"locktype"`
DBName string `json:"db_name,omitempty"`
PID int `json:"pid"`
Mode string `json:"mode"`
Granted bool `json:"granted"`
RelationOID int `json:"relation_oid,omitempty"`
}

0 comments on commit 2dc1da1

Please sign in to comment.