Skip to content
This repository has been archived by the owner on Apr 2, 2024. It is now read-only.

Commit

Permalink
Finishing refinements for metric-metadata.
Browse files Browse the repository at this point in the history
Signed-off-by: Harkishen Singh <harkishensingh@hotmail.com>
  • Loading branch information
Harkishen-Singh committed Jun 18, 2021
1 parent 7bcb492 commit 29a04f8
Show file tree
Hide file tree
Showing 25 changed files with 304 additions and 726 deletions.
1 change: 0 additions & 1 deletion go.mod
Expand Up @@ -3,7 +3,6 @@ module github.com/timescale/promscale
go 1.15

require (
github.com/Azure/azure-sdk-for-go v52.5.0+incompatible
github.com/NYTimes/gziphandler v1.1.1
github.com/armon/go-metrics v0.3.3 // indirect
github.com/blang/semver/v4 v4.0.0
Expand Down
58 changes: 4 additions & 54 deletions pkg/api/metadata.go
Expand Up @@ -9,8 +9,6 @@ import (
"strconv"

"github.com/NYTimes/gziphandler"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/promql/parser"
"github.com/timescale/promscale/pkg/pgclient"
"github.com/timescale/promscale/pkg/pgmodel/metadata"
)
Expand All @@ -27,10 +25,11 @@ func metricMetadataHandler(client *pgclient.Client) http.HandlerFunc {
return
}
var (
limit int64
err error

metric = r.FormValue("metric")
limitStr = r.FormValue("limit")
limit int64
err error
)
if limitStr != "" {
limit, err = strconv.ParseInt(limitStr, 10, 32)
Expand All @@ -39,56 +38,7 @@ func metricMetadataHandler(client *pgclient.Client) http.HandlerFunc {
return
}
}
data, err := metadata.MetricMetadata(client.Connection, metric, int(limit))
if err != nil {
respondError(w, http.StatusInternalServerError, err, "fetching metric metadata")
return
}
respond(w, http.StatusOK, data)
}
}

func TargetMetadata(conf *Config, client *pgclient.Client) http.Handler {
hf := corsWrapper(conf, targetMetadataHandler(client))
return gziphandler.GzipHandler(hf)
}

func targetMetadataHandler(client *pgclient.Client) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if err := r.ParseForm(); err != nil {
respondError(w, http.StatusBadRequest, err, "bad_data")
return
}
var (
metric = r.FormValue("metric")
limitStr = r.FormValue("limit")
matchTarget = r.FormValue("match_target")
limit int64
err error
)
if limitStr != "" {
limit, err = strconv.ParseInt(limitStr, 10, 32)
if err != nil {
respondError(w, http.StatusBadRequest, err, "converting string to integer")
return
}
}
var matchers []*labels.Matcher
if matchTarget != "" {
matchers, err = parser.ParseMetricSelector(matchTarget)
if err != nil {
respondError(w, http.StatusBadRequest, err, "bad_data")
return
}
} else {
matcher, err := labels.NewMatcher(labels.MatchRegexp, "job", ".*")
if err != nil {
respondError(w, http.StatusInternalServerError, err, "creating all jobs matcher")
return
}
matchers = append(matchers, matcher)
}
data, err := metadata.TargetMetadata(client.Connection, matchers, metric, int(limit))
data, err := metadata.MetricQuery(client.Connection, metric, int(limit))
if err != nil {
respondError(w, http.StatusInternalServerError, err, "fetching metric metadata")
return
Expand Down
5 changes: 1 addition & 4 deletions pkg/api/parser/parser.go
Expand Up @@ -62,10 +62,7 @@ func (d DefaultParser) ParseRequest(r *http.Request, req *prompb.WriteRequest) e
return fmt.Errorf("parser error: %w", err)
}

if len(req.Timeseries) == 0 && len(req.Metadata) == 0 {
return nil
} else if len(req.Timeseries) == 0 && len(req.Metadata) > 0 {
// Preprocessors not required as no timeseries exists.
if len(req.Timeseries) == 0 {
return nil
}

Expand Down
4 changes: 0 additions & 4 deletions pkg/api/router.go
Expand Up @@ -82,10 +82,6 @@ func GenerateRouter(apiConf *Config, metrics *Metrics, client *pgclient.Client,
router.Get("/api/v1/labels", labelsHandler)
router.Post("/api/v1/labels", labelsHandler)

targetMetadataHandler := timeHandler(metrics.HTTPRequestDuration, "targets/metadata", TargetMetadata(apiConf, client))
router.Get("/api/v1/targets/metadata", targetMetadataHandler)
router.Post("/api/v1/targets/metadata", targetMetadataHandler)

metadataHandler := timeHandler(metrics.HTTPRequestDuration, "metadata", MetricMetadata(apiConf, client))
router.Get("/api/v1/metadata", metadataHandler)
router.Post("/api/v1/metadata", metadataHandler)
Expand Down
27 changes: 25 additions & 2 deletions pkg/migrations/migration_files_generated.go

Large diffs are not rendered by default.

71 changes: 16 additions & 55 deletions pkg/migrations/sql/idempotent/metric-metadata.sql
@@ -1,77 +1,38 @@

-- TODO: mov this to update files script.
CREATE TABLE IF NOT EXISTS SCHEMA_CATALOG.metadata
(
id SERIAL NOT NULL PRIMARY KEY,
last_seen TIMESTAMPTZ NOT NULL,
metric_family TEXT NOT NULL,
type TEXT DEFAULT NULL,
unit TEXT DEFAULT NULL,
help TEXT DEFAULT NULL,
UNIQUE (metric_family, type, unit, help)
);

-- TODO: mov this to update files script.
CREATE UNIQUE INDEX IF NOT EXISTS metadata_index ON SCHEMA_CATALOG.metadata
(
last_seen, metric_family
);

CREATE OR REPLACE FUNCTION prom_api.insert_metric_metadata(t TIMESTAMPTZ, metric_family_name TEXT, metric_type TEXT, metric_unit TEXT, metric_help TEXT)
CREATE OR REPLACE FUNCTION SCHEMA_CATALOG.insert_metric_metadatas(t TIMESTAMPTZ[], metric_family_name TEXT[], metric_type TEXT[], metric_unit TEXT[], metric_help TEXT[])
RETURNS BIGINT
AS
$$
DECLARE
num_rows BIGINT;
BEGIN
INSERT INTO _prom_catalog.metadata (last_seen, metric_family, type, unit, help) VALUES (t, metric_family_name, metric_type, metric_unit, metric_help)
ON CONFLICT (metric_family, type, unit, help) DO
UPDATE SET last_seen = t;
INSERT INTO SCHEMA_CATALOG.metadata (last_seen, metric_family, type, unit, help)
SELECT * FROM UNNEST($1, $2, $3, $4, $5) res(last_seen, metric_family, type, unit, help)
ORDER BY res.metric_family, res.type, res.unit, res.help
ON CONFLICT (metric_family, type, unit, help) DO
UPDATE SET last_seen = EXCLUDED.last_seen;
GET DIAGNOSTICS num_rows = ROW_COUNT;
RETURN num_rows;
END;
$$ LANGUAGE plpgsql;
GRANT EXECUTE ON FUNCTION SCHEMA_PROM.insert_metric_metadata(TIMESTAMPTZ, TEXT, TEXT, TEXT, TEXT) TO prom_writer;
GRANT EXECUTE ON FUNCTION SCHEMA_CATALOG.insert_metric_metadatas(TIMESTAMPTZ[], TEXT[], TEXT[], TEXT[], TEXT[]) TO prom_writer;

CREATE OR REPLACE FUNCTION SCHEMA_PROM.get_metric_metadata(metric_family_name TEXT)
RETURNS TABLE (metric_family TEXT, type TEXT, unit TEXT, help TEXT)
AS
$$
SELECT metric_family, type, unit, help FROM SCHEMA_CATALOG.metadata WHERE metric_family = metric_family_name ORDER BY last_seen LIMIT 1
SELECT metric_family, type, unit, help FROM SCHEMA_CATALOG.metadata WHERE metric_family = metric_family_name ORDER BY last_seen DESC
$$ LANGUAGE SQL;
GRANT EXECUTE ON FUNCTION SCHEMA_PROM.get_metric_metadata(TEXT) TO prom_writer;
GRANT EXECUTE ON FUNCTION SCHEMA_PROM.get_metric_metadata(TEXT) TO prom_reader;

-- metric_families should have unique elements, otherwise there will be duplicate rows in the returned table.
CREATE OR REPLACE FUNCTION SCHEMA_PROM.get_multiple_metric_metadata(metric_families TEXT[])
RETURNS TABLE (metric_family TEXT, type TEXT, unit TEXT, help TEXT)
AS
$$
DECLARE
m TEXT;
BEGIN
create temporary table result (mf TEXT, t TEXT, u TEXT, h TEXT) ON COMMIT DROP;
FOREACH m IN ARRAY metric_families
LOOP
INSERT INTO result
SELECT d.metric_family, d.type, d.unit, d.help FROM SCHEMA_CATALOG.metadata d WHERE d.metric_family = m ORDER BY d.last_seen LIMIT 1;
END LOOP;
RETURN QUERY SELECT mf, t, u, h from result;
END;
$$ LANGUAGE plpgsql;
GRANT EXECUTE ON FUNCTION SCHEMA_PROM.get_multiple_metric_metadata(TEXT[]) TO prom_writer;

CREATE OR REPLACE FUNCTION SCHEMA_PROM.get_targets(metric_family_name TEXT, series_ids BIGINT[]) RETURNS TABLE (series_id BIGINT, target_keys TEXT[], target_values TEXT[])
AS
$$
DECLARE
metric_table_name TEXT;
q TEXT;
BEGIN
SELECT table_name INTO metric_table_name FROM SCHEMA_CATALOG.metric WHERE metric_name = metric_family_name;
q = FORMAT('select s.id series_id, array_agg(l.key) target_keys, array_agg(l.value) target_values from SCHEMA_CATALOG.label l inner join SCHEMA_DATA_SERIES.%I s on (true) where l.id = any(s.labels) and (key = ''job'' or key = ''instance'') and s.id = any(%L::bigint[]) group by series_id;', metric_table_name, series_ids);
RETURN QUERY EXECUTE q;
END;
$$ language plpgsql;



SELECT info.*
FROM unnest(metric_families) AS family(name)
INNER JOIN LATERAL (
SELECT metric_family, type, unit, help FROM SCHEMA_CATALOG.metadata WHERE metric_family = family.name ORDER BY last_seen DESC LIMIT 1
) AS info ON (true)
$$ LANGUAGE SQL;
GRANT EXECUTE ON FUNCTION SCHEMA_PROM.get_multiple_metric_metadata(TEXT[]) TO prom_reader;
16 changes: 16 additions & 0 deletions pkg/migrations/sql/preinstall/007-tables_metadata.sql
@@ -0,0 +1,16 @@
CREATE TABLE IF NOT EXISTS SCHEMA_CATALOG.metadata
(
last_seen TIMESTAMPTZ NOT NULL,
metric_family TEXT NOT NULL,
type TEXT DEFAULT NULL,
unit TEXT DEFAULT NULL,
help TEXT DEFAULT NULL,
PRIMARY KEY (metric_family, type, unit, help)
);
GRANT SELECT, INSERT, UPDATE, DELETE ON TABLE SCHEMA_CATALOG.metadata TO prom_writer;
GRANT SELECT ON TABLE SCHEMA_CATALOG.metadata TO prom_reader;

CREATE INDEX IF NOT EXISTS metadata_index ON SCHEMA_CATALOG.metadata
(
metric_family, last_seen
);
16 changes: 16 additions & 0 deletions pkg/migrations/sql/versions/dev/0.4.2-dev/1-metric_metadata.sql
@@ -0,0 +1,16 @@
CREATE TABLE IF NOT EXISTS SCHEMA_CATALOG.metadata
(
last_seen TIMESTAMPTZ NOT NULL,
metric_family TEXT NOT NULL,
type TEXT DEFAULT NULL,
unit TEXT DEFAULT NULL,
help TEXT DEFAULT NULL,
PRIMARY KEY (metric_family, type, unit, help)
);
GRANT SELECT, INSERT, UPDATE, DELETE ON TABLE SCHEMA_CATALOG.metadata TO prom_writer;
GRANT SELECT ON TABLE SCHEMA_CATALOG.metadata TO prom_reader;

CREATE INDEX IF NOT EXISTS metadata_index ON SCHEMA_CATALOG.metadata
(
metric_family, last_seen
);

0 comments on commit 29a04f8

Please sign in to comment.