Support for coalescing multiple metrics into a single SQL query
sumerman committed Sep 19, 2022
1 parent 42b3f08 commit 11e3e1f
Showing 2 changed files with 104 additions and 95 deletions.
24 changes: 15 additions & 9 deletions pkg/pgmodel/metrics/database/database.go
@@ -55,7 +55,7 @@ func (e *metricsEngineImpl) unregister() {
 func getMetrics(m []metricQueryWrap) []prometheus.Collector {
 	var metrics []prometheus.Collector
 	for i := range m {
-		metrics = append(metrics, m[i].metric)
+		metrics = append(metrics, m[i].metrics...)
 	}
 	return metrics
 }
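
Aside, not part of this commit: getMetrics now flattens every entry's collectors into one slice, which is what a Prometheus registry consumes. A minimal sketch of the registration side, assuming the default registry; the loop is illustrative, not the engine's actual wiring:

	// Sketch: register every collector produced by getMetrics.
	for _, c := range getMetrics(metrics) {
		if err := prometheus.Register(c); err != nil {
			log.Warn("msg", "failed to register database metric", "err", err.Error())
		}
	}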
@@ -146,7 +146,7 @@ func healthCheck(conn pgxconn.PgxConn, healthMetric metricQueryWrap) {
 	err := conn.QueryRow(context.Background(), healthMetric.query).Scan(&val)
 	networkLatency := time.Since(start).Milliseconds()
 
-	updateMetric(healthMetric.metric, 1)
+	updateMetric(healthMetric.metrics[0], 1)
 	if err != nil {
 		dbHealthErrors.Inc()
 		log.Error("msg", "health check failed", "err", err.Error())
@@ -173,18 +173,24 @@ func (e *metricsEngineImpl) IsRunning() bool {
 
 func handleResults(results pgx.BatchResults, m []metricQueryWrap) {
 	for i := range m {
-		metric := m[i]
-		val := new(int64)
-		err := results.QueryRow().Scan(&val)
+		entry := m[i]
+		valsCount := len(entry.metrics)
+		vals := make([]interface{}, valsCount)
+		for vi := range vals {
+			vals[vi] = new(int64)
+		}
+		err := results.QueryRow().Scan(vals...)
 		if err != nil {
-			log.Warn("msg", fmt.Sprintf("error evaluating database metric with query: %s", metric.query), "err", err.Error())
+			log.Warn("msg", fmt.Sprintf("error evaluating database metric with query: %s", entry.query), "err", err.Error())
 			return
 		}
 		var value int64
-		if val != nil {
-			value = *val
+		for vi := range vals {
+			if vals[vi] != nil {
+				value = *vals[vi].(*int64)
+			}
+			updateMetric(entry.metrics[vi], value)
 		}
-		updateMetric(metric.metric, value)
 	}
 }
 
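For context, not part of this commit: handleResults expects one row per entry, with one int64 column per collector, in collector order. A sketch of the batch round trip that would feed it, assuming a raw pgx v4 connection and Batch API (names illustrative):

	// Sketch: evaluate every metric query in one database round trip.
	batch := &pgx.Batch{}
	for i := range metrics {
		batch.Queue(metrics[i].query) // one queued query per metricQueryWrap
	}
	results := conn.SendBatch(context.Background(), batch)
	defer results.Close()
	// handleResults scans len(entry.metrics) columns from each row,
	// in the same order as the entry's collectors.
	handleResults(results, metrics)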
175 changes: 89 additions & 86 deletions pkg/pgmodel/metrics/database/metrics.go
@@ -40,38 +40,32 @@ func init() {
 }
 
 type metricQueryWrap struct {
-	metric        prometheus.Collector
+	// Multiple metrics can be retrieved via a single query.
+	// In that case they should appear in the same order as
+	// the corresponding columns in the query's result.
+	metrics       []prometheus.Collector
 	query         string
-	isHealthCheck bool
+	isHealthCheck bool // if set, only metrics[0] is used
 }
 
-func compressionChunksQueries(countDelayed bool) string {
-	delayClause := "(m.delay_compression_until IS NULL OR m.delay_compression_until < now())"
-	if countDelayed {
-		delayClause = "(m.delay_compression_until IS NOT NULL AND m.delay_compression_until >= now())"
+func gauges(opts ...prometheus.GaugeOpts) []prometheus.Collector {
+	res := make([]prometheus.Collector, 0, len(opts))
+	for _, opt := range opts {
+		res = append(res, prometheus.NewGauge(opt))
 	}
-
-	return `WITH chunk_candidates AS MATERIALIZED (
-		SELECT chcons.dimension_slice_id, h.table_name, h.schema_name
-		FROM _timescaledb_catalog.chunk_constraint chcons
-		INNER JOIN _timescaledb_catalog.chunk c ON c.id = chcons.chunk_id
-		INNER JOIN _timescaledb_catalog.hypertable h ON h.id = c.hypertable_id
-		WHERE c.dropped IS FALSE
-			AND h.compression_state = 1 -- compression_enabled = TRUE
-			AND (c.status & 1) != 1) -- only check for uncompressed chunks
-		SELECT count(*)::BIGINT
-		FROM chunk_candidates cc
-		INNER JOIN _timescaledb_catalog.dimension_slice ds ON ds.id = cc.dimension_slice_id
-		INNER JOIN _prom_catalog.metric m ON (m.table_name = cc.table_name AND m.table_schema = cc.schema_name)
-		WHERE ` + delayClause +
-		` AND NOT m.is_view
-		AND ds.range_start <= _timescaledb_internal.time_to_internal(now() - interval '1 hour')
-		AND ds.range_end <= _timescaledb_internal.time_to_internal(now() - interval '1 hour')`
+	return res
 }
+func counters(opts ...prometheus.CounterOpts) []prometheus.Collector {
+	res := make([]prometheus.Collector, 0, len(opts))
+	for _, opt := range opts {
+		res = append(res, prometheus.NewCounter(opt))
+	}
+	return res
+}
 
 var metrics = []metricQueryWrap{
 	{
-		metric: prometheus.NewCounter(
+		metrics: counters(
 			prometheus.CounterOpts{
 				Namespace: util.PromNamespace,
 				Subsystem: "sql_database",
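
Aside, not part of this commit: with the new struct and helpers, one entry can declare several collectors backed by one query, as long as the collectors line up with the result columns. A hypothetical entry (names and table are illustrative):

	// Hypothetical entry: two gauges filled from a single two-column query.
	// Collector order must match column order.
	var example = metricQueryWrap{
		metrics: gauges(
			prometheus.GaugeOpts{Namespace: util.PromNamespace, Subsystem: "sql_database", Name: "foo_count"},
			prometheus.GaugeOpts{Namespace: util.PromNamespace, Subsystem: "sql_database", Name: "bar_count"},
		),
		query: `SELECT count(*) FILTER (WHERE foo)::BIGINT,
		               count(*) FILTER (WHERE bar)::BIGINT
		        FROM example_table`,
	}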
@@ -83,41 +77,34 @@ var metrics = []metricQueryWrap{
 		isHealthCheck: true,
 	},
 	{
-		metric: prometheus.NewGauge(
+		metrics: gauges(
 			prometheus.GaugeOpts{
 				Namespace: util.PromNamespace,
 				Subsystem: "sql_database",
 				Name:      "chunks_count",
 				Help:      "Total number of chunks in TimescaleDB currently.",
 			},
-		),
-		// Compressed_chunk_id is null for both yet to be compressed and already compressed chunks.
-		query: `select count(*)::bigint from _timescaledb_catalog.chunk where dropped=false and compressed_chunk_id is null`,
-	},
-	{
-		metric: prometheus.NewGauge(
 			prometheus.GaugeOpts{
 				Namespace: util.PromNamespace,
 				Subsystem: "sql_database",
 				Name:      "chunks_created",
 				Help:      "Total number of chunks created since creation of database.",
 			},
-		),
-		// Compressed_chunk_id is null for both yet to be compressed and already compressed chunks.
-		query: `select count(*)::bigint from _timescaledb_catalog.chunk where compressed_chunk_id is null`,
-	},
-	{
-		metric: prometheus.NewGauge(
 			prometheus.GaugeOpts{
 				Namespace: util.PromNamespace,
 				Subsystem: "sql_database",
 				Name:      "chunks_compressed_count",
 				Help:      "Total number of compressed chunks in TimescaleDB currently.",
 			},
 		),
-		query: `select count(*)::bigint from _timescaledb_catalog.chunk where dropped=false and compressed_chunk_id is not null`,
+		// Compressed_chunk_id is null for both yet to be compressed and already compressed chunks.
+		query: `SELECT
+			count(*) FILTER (WHERE dropped=false AND compressed_chunk_id IS NULL)::BIGINT AS chunks_count,
+			count(*) FILTER (WHERE compressed_chunk_id IS NULL)::BIGINT AS chunks_created,
+			count(*) FILTER (WHERE dropped=false AND compressed_chunk_id IS NOT NULL)::BIGINT AS chunks_compressed_count
+		FROM _timescaledb_catalog.chunk`,
 	}, {
-		metric: prometheus.NewGauge(
+		metrics: gauges(
 			prometheus.GaugeOpts{
 				Namespace: util.PromNamespace,
 				Subsystem: "sql_database",
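
Aside, not part of this commit: `count(*) FILTER (WHERE ...)` is what makes the coalescing work; it computes several conditional counts in a single scan, so one round trip can feed all three chunk gauges above. A sketch of reading such a result directly, assuming a pgx v4 connection (variable names illustrative):

	// Sketch: one query, one scan, three values.
	var chunks, created, compressed int64
	err := conn.QueryRow(context.Background(), `SELECT
			count(*) FILTER (WHERE dropped = false AND compressed_chunk_id IS NULL)::BIGINT,
			count(*) FILTER (WHERE compressed_chunk_id IS NULL)::BIGINT,
			count(*) FILTER (WHERE dropped = false AND compressed_chunk_id IS NOT NULL)::BIGINT
		FROM _timescaledb_catalog.chunk`).Scan(&chunks, &created, &compressed)
	if err != nil {
		log.Warn("msg", "chunk count query failed", "err", err.Error())
	}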
@@ -135,27 +122,40 @@ var metrics = []metricQueryWrap{
 			WHERE ds.range_start < _timescaledb_internal.time_to_internal(now() - coalesce(m.retention_period, conf.def_retention))
 				AND ds.range_end < _timescaledb_internal.time_to_internal(now() - coalesce(m.retention_period, conf.def_retention))`,
 	}, {
-		metric: prometheus.NewGauge(
+		metrics: gauges(
 			prometheus.GaugeOpts{
 				Namespace: util.PromNamespace,
 				Subsystem: "sql_database",
 				Name:      "chunks_metrics_uncompressed_count",
 				Help:      "The number of metrics chunks soon to be compressed by maintenance jobs.",
 			},
-		),
-		query: compressionChunksQueries(false),
-	}, {
-		metric: prometheus.NewGauge(
 			prometheus.GaugeOpts{
 				Namespace: util.PromNamespace,
 				Subsystem: "sql_database",
 				Name:      "chunks_metrics_delayed_compression_count",
 				Help:      "The number of metrics chunks not-compressed due to a set delay.",
 			},
 		),
-		query: compressionChunksQueries(true),
+		query: `WITH chunk_candidates AS MATERIALIZED (
+				SELECT chcons.dimension_slice_id, h.table_name, h.schema_name
+				FROM _timescaledb_catalog.chunk_constraint chcons
+				INNER JOIN _timescaledb_catalog.chunk c ON c.id = chcons.chunk_id
+				INNER JOIN _timescaledb_catalog.hypertable h ON h.id = c.hypertable_id
+				WHERE c.dropped IS FALSE
+					AND h.compression_state = 1 -- compression_enabled = TRUE
+					AND (c.status & 1) != 1 -- only check for uncompressed chunks
+			)
+			SELECT
+				count(*) FILTER(WHERE m.delay_compression_until IS NULL OR m.delay_compression_until < now())::BIGINT AS uncompressed,
+				count(*) FILTER(WHERE m.delay_compression_until IS NOT NULL AND m.delay_compression_until >= now())::BIGINT AS delayed_compression
+			FROM chunk_candidates cc
+			INNER JOIN _timescaledb_catalog.dimension_slice ds ON ds.id = cc.dimension_slice_id
+			INNER JOIN _prom_catalog.metric m ON (m.table_name = cc.table_name AND m.table_schema = cc.schema_name)
+			WHERE NOT m.is_view
+				AND ds.range_start <= _timescaledb_internal.time_to_internal(now() - interval '1 hour')
+				AND ds.range_end <= _timescaledb_internal.time_to_internal(now() - interval '1 hour')`,
 	}, {
-		metric: prometheus.NewGauge(
+		metrics: gauges(
 			prometheus.GaugeOpts{
 				Namespace: util.PromNamespace,
 				Subsystem: "sql_database",
@@ -173,7 +173,7 @@ var metrics = []metricQueryWrap{
 				AND ds.range_end < _timescaledb_internal.time_to_internal(now() - conf.def_retention)
 				AND h.schema_name = '_ps_trace'`,
 	}, {
-		metric: prometheus.NewGauge(
+		metrics: gauges(
 			prometheus.GaugeOpts{
 				Namespace: util.PromNamespace,
 				Subsystem: "sql_database",
@@ -182,21 +182,22 @@ var metrics = []metricQueryWrap{
 			},
 		),
 		query: `WITH chunk_candidates AS MATERIALIZED (
-		SELECT chcons.dimension_slice_id
-		FROM _timescaledb_catalog.chunk_constraint chcons
-		INNER JOIN _timescaledb_catalog.chunk c ON c.id = chcons.chunk_id
-		INNER JOIN _timescaledb_catalog.hypertable h ON h.id = c.hypertable_id
-		WHERE c.dropped IS FALSE
-		AND h.schema_name = '_ps_trace'
-		AND h.compression_state = 1 -- compression_enabled = TRUE
-		AND (c.status & 1) != 1) -- only check for uncompressed chunks
-		SELECT count(*)::BIGINT
-		FROM chunk_candidates cc
-		INNER JOIN _timescaledb_catalog.dimension_slice ds ON ds.id = cc.dimension_slice_id
-		WHERE ds.range_start <= _timescaledb_internal.time_to_internal(now() - interval '1 hour')
-		AND ds.range_end <= _timescaledb_internal.time_to_internal(now() - interval '1 hour')`,
+				SELECT chcons.dimension_slice_id
+				FROM _timescaledb_catalog.chunk_constraint chcons
+				INNER JOIN _timescaledb_catalog.chunk c ON c.id = chcons.chunk_id
+				INNER JOIN _timescaledb_catalog.hypertable h ON h.id = c.hypertable_id
+				WHERE c.dropped IS FALSE
+					AND h.schema_name = '_ps_trace'
+					AND h.compression_state = 1 -- compression_enabled = TRUE
+					AND (c.status & 1) != 1 -- only check for uncompressed chunks
+			)
+			SELECT count(*)::BIGINT
+			FROM chunk_candidates cc
+			INNER JOIN _timescaledb_catalog.dimension_slice ds ON ds.id = cc.dimension_slice_id
+			WHERE ds.range_start <= _timescaledb_internal.time_to_internal(now() - interval '1 hour')
+				AND ds.range_end <= _timescaledb_internal.time_to_internal(now() - interval '1 hour')`,
 	}, {
-		metric: prometheus.NewGauge(
+		metrics: gauges(
 			prometheus.GaugeOpts{
 				Namespace: util.PromNamespace,
 				Subsystem: "sql_database",
@@ -206,7 +207,7 @@ var metrics = []metricQueryWrap{
 		),
 		query: `select (case when (value = 'true') then 1 else 0 end) from _prom_catalog.get_default_value('metric_compression') value`,
 	}, {
-		metric: prometheus.NewGauge(
+		metrics: gauges(
 			prometheus.GaugeOpts{
 				Namespace: util.PromNamespace,
 				Subsystem: "sql_database",
@@ -216,7 +217,7 @@ var metrics = []metricQueryWrap{
 		),
 		query: `select current_setting('timescaledb.max_background_workers')::BIGINT`,
 	}, {
-		metric: prometheus.NewGauge(
+		metrics: gauges(
 			prometheus.GaugeOpts{
 				Namespace: util.PromNamespace,
 				Subsystem: "sql_database",
@@ -226,7 +227,7 @@ var metrics = []metricQueryWrap{
 		),
 		query: `select count(*) from timescaledb_information.jobs where proc_name = 'execute_maintenance_job'`,
 	}, {
-		metric: prometheus.NewGauge(
+		metrics: gauges(
 			prometheus.GaugeOpts{
 				Namespace: util.PromNamespace,
 				Subsystem: "sql_database",
@@ -235,13 +236,13 @@ var metrics = []metricQueryWrap{
 			},
 		),
 		query: `select count(stats.last_run_status)
-from timescaledb_information.job_stats stats
-inner join
-timescaledb_information.jobs jobs
-on jobs.job_id = stats.job_id
-where jobs.proc_name = 'execute_maintenance_job' and stats.last_run_status = 'Failed'`,
+			from timescaledb_information.job_stats stats
+			inner join
+			timescaledb_information.jobs jobs
+			on jobs.job_id = stats.job_id
+			where jobs.proc_name = 'execute_maintenance_job' and stats.last_run_status = 'Failed'`,
 	}, {
-		metric: prometheus.NewGauge(
+		metrics: gauges(
 			prometheus.GaugeOpts{
 				Namespace: util.PromNamespace,
 				Subsystem: "sql_database",
@@ -250,15 +251,15 @@ inner join
 			},
 		),
 		query: `SELECT extract(
-epoch FROM (SELECT COALESCE(
-	(SELECT last_run_started_at AS job_running_since
-	FROM timescaledb_information.job_stats WHERE last_run_started_at > last_successful_finish
-	AND last_run_status = 'Success'
-	),
-	CURRENT_TIMESTAMP
-)))::BIGINT`,
+			epoch FROM (SELECT COALESCE(
+				(SELECT last_run_started_at AS job_running_since
+				FROM timescaledb_information.job_stats WHERE last_run_started_at > last_successful_finish
+				AND last_run_status = 'Success'
+				),
+				CURRENT_TIMESTAMP
+			)))::BIGINT`,
 	}, {
-		metric: prometheus.NewGauge(
+		metrics: gauges(
 			prometheus.GaugeOpts{
 				Namespace: util.PromNamespace,
 				Subsystem: "sql_database",
@@ -272,14 +273,16 @@ inner join
 
 // GetMetric returns the first metric whose Name matches the supplied name.
 func GetMetric(name string) (prometheus.Metric, error) {
-	for _, m := range metrics {
-		metric := getMetric(m.metric)
-		str, err := util.ExtractMetricDesc(metric)
-		if err != nil {
-			return nil, fmt.Errorf("extract metric string")
-		}
-		if strings.Contains(str, name) {
-			return metric, nil
+	for _, ms := range metrics {
+		for _, m := range ms.metrics {
+			metric := getMetric(m)
+			str, err := util.ExtractMetricDesc(metric)
+			if err != nil {
+				return nil, fmt.Errorf("extract metric string")
+			}
+			if strings.Contains(str, name) {
+				return metric, nil
+			}
 		}
 	}
 	return nil, nil
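
Not part of this commit: a hypothetical lookup, showing that GetMetric still resolves a name even though collectors are now nested per entry:

	// Hypothetical usage: find one of the coalesced gauges by name.
	m, err := GetMetric("chunks_compressed_count")
	if err != nil {
		log.Warn("msg", "metric lookup failed", "err", err.Error())
	} else if m == nil {
		log.Warn("msg", "no metric matching name found")
	}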