Skip to content

Commit

Permalink
Citus 11 support, part 4.
Browse files Browse the repository at this point in the history
  • Loading branch information
mdevan committed Oct 26, 2022
1 parent 676a5f3 commit 628c852
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 29 deletions.
13 changes: 7 additions & 6 deletions collector/citus.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func (c *collector) getCitus(currdb string, fillSize bool) {

if majorVer >= 11 {
c.getCitusTables(currdb)
c.getCitusCoordinatorNodeID(currdb)
c.getCitusNodeIDs(currdb)
}
}

Expand Down Expand Up @@ -334,7 +334,7 @@ SELECT p.logicalrelid::oid::int AS table_oid,
ELSE 'local'::text
END
END AS citus_table_type,
COALESCE(column_to_column_name(p.logicalrelid, p.partkey), '<none>'::text) AS distribution_column,
COALESCE(column_to_column_name(p.logicalrelid, p.partkey), ''::text) AS distribution_column,
p.colocationid AS colocation_id,
citus_total_relation_size(p.logicalrelid, fail_on_error => false) AS table_size,
( SELECT count(*) AS count
Expand Down Expand Up @@ -377,12 +377,13 @@ func (c *collector) getCitusTables(currdb string) {
}
}

func (c *collector) getCitusCoordinatorNodeID(currdb string) {
func (c *collector) getCitusNodeIDs(currdb string) {
ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
defer cancel()

q := `SELECT COALESCE(citus_coordinator_nodeid(), 0)`
if err := c.db.QueryRowContext(ctx, q).Scan(&c.result.Citus[currdb].CoordinatorNodeID); err != nil {
log.Printf("warning: citus_coordinator_nodeid() query failed: %v", err)
q := `SELECT COALESCE(citus_coordinator_nodeid(), 0), COALESCE(citus_backend_gpid(), 0)/10000000000`
if err := c.db.QueryRowContext(ctx, q).Scan(&c.result.Citus[currdb].CoordinatorNodeID,
&c.result.Citus[currdb].ConnectedNodeID); err != nil {
log.Printf("warning: citus_coordinator_nodeid()/citus_backend_gpid() query failed: %v", err)
}
}
47 changes: 24 additions & 23 deletions model.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,21 @@ package pgmetrics
// ModelSchemaVersion is the schema version of the "Model" data structure
// defined below. It is in the "semver" notation. Version history:
//
// 1.13 - Citus 11 support
// 1.12 - Azure metrics, queryid in plan, progress views
// 1.11 - Postgres 14, PgBouncer 1.16, other attributes
// 1.10 - New fields in pg_stat_statements for Postgres 13
// 1.9 - Postgres 13, Citus support
// 1.8 - AWS RDS/EnhancedMonitoring metrics, index defn,
// 1.13 - Citus 11 support
// 1.12 - Azure metrics, queryid in plan, progress views
// 1.11 - Postgres 14, PgBouncer 1.16, other attributes
// 1.10 - New fields in pg_stat_statements for Postgres 13
// 1.9 - Postgres 13, Citus support
// 1.8 - AWS RDS/EnhancedMonitoring metrics, index defn,
// backend type counts, slab memory (linux), user agent
// 1.7 - query execution plans, autovacuum, deadlocks, table acl
// 1.6 - added highest WAL segment number
// 1.5 - add PID to replication_outgoing entries
// 1.4 - pgbouncer information
// 1.3 - locks information
// 1.2 - more table and index attributes
// 1.1 - added NotificationQueueUsage and Statements
// 1.0 - initial release
// 1.7 - query execution plans, autovacuum, deadlocks, table acl
// 1.6 - added highest WAL segment number
// 1.5 - add PID to replication_outgoing entries
// 1.4 - pgbouncer information
// 1.3 - locks information
// 1.2 - more table and index attributes
// 1.1 - added NotificationQueueUsage and Statements
// 1.0 - initial release
const ModelSchemaVersion = "1.13"

// Model contains the entire information collected by a single run of
Expand Down Expand Up @@ -762,9 +762,10 @@ type Citus struct {
WorkerBackends []CitusBackend `json:"worker_activity,omitempty"` // citus <=10.x
Locks []CitusLock `json:"locks"`
// following fields present only in schema 1.13 and later
AllBackends []CitusBackendV11 `json:"activity,omitempty"` // citus >=11.x
Tables []CitusTable `json:"tables,omitempty"` // citus >=11.x
CoordinatorNodeID int `json:"coordinator_nodeid"` // citus >=11.x
AllBackends []CitusBackendV11 `json:"activity,omitempty"` // citus >=11.x
Tables []CitusTable `json:"tables,omitempty"` // citus >=11.x
CoordinatorNodeID int `json:"coordinator_nodeid,omitempty"` // citus >=11.x
ConnectedNodeID int `json:"connected_nodeid,omitempty"` // citus >=11.x, the node pgmetrics connected to
}

// CitusNode represents a row from the pg_dist_node table. Added in schema 1.9.
Expand Down Expand Up @@ -815,16 +816,16 @@ type CitusBackendV11 struct {

// CitusLock represents a single row from citus_lock_waits. Added in schema 1.9.
type CitusLock struct {
WaitingPID int `json:"waiting_pid"` // citus <=10.x, 0 otherwise
BlockingPID int `json:"blocking_pid"` // citus <=10.x, 0 otherwise
WaitingPID int `json:"waiting_pid,omitempty"` // citus <=10.x, 0 otherwise
BlockingPID int `json:"blocking_pid,omitempty"` // citus <=10.x, 0 otherwise
BlockedStmt string `json:"blocked_statement"`
CurrStmt string `json:"current_statement_in_blocking_process"`
WaitingNodeID int `json:"waiting_node_id"`
BlockingNodeID int `json:"blocking_node_id"`
WaitingNodeName string `json:"waiting_node_name"` // citus <=10.x, '' otherwise
BlockingNodeName string `json:"blocking_node_name"` // citus <=10.x, '' otherwise
WaitingNodePort int `json:"waiting_node_port"` // citus <=10.x, 0 otherwise
BlockingNodePort int `json:"blocking_node_port"` // citus <=10.x, 0 otherwise
WaitingNodeName string `json:"waiting_node_name,omitempty"` // citus <=10.x, '' otherwise
BlockingNodeName string `json:"blocking_node_name,omitempty"` // citus <=10.x, '' otherwise
WaitingNodePort int `json:"waiting_node_port,omitempty"` // citus <=10.x, 0 otherwise
BlockingNodePort int `json:"blocking_node_port,omitempty"` // citus <=10.x, 0 otherwise
// following fields present only in schema 1.13 and later
WaitingGPID int64 `json:"waiting_gpid,omitempty"` // citus >=11.x
BlockingGPID int64 `json:"blocking_gpid,omitempty"` // citus >=11.x
Expand Down

0 comments on commit 628c852

Please sign in to comment.