Skip to content
This repository has been archived by the owner on Apr 2, 2024. It is now read-only.

Commit

Permalink
Implement APIs for metadata support.
Browse files Browse the repository at this point in the history
Signed-off-by: Harkishen Singh <harkishensingh@hotmail.com>
  • Loading branch information
Harkishen-Singh committed Jun 18, 2021
1 parent ccdc0b1 commit c3d10f5
Show file tree
Hide file tree
Showing 20 changed files with 685 additions and 481 deletions.
2 changes: 1 addition & 1 deletion pkg/api/common.go
Expand Up @@ -204,7 +204,7 @@ func respondQuery(w http.ResponseWriter, res *promql.Result, warnings storage.Wa
}
}

func respond(w http.ResponseWriter, status int, message string) {
func respond(w http.ResponseWriter, status int, message interface{}) {
w.Header().Set("Content-Type", "application/json")
w.Header().Set("Cache-Control", "no-store")
w.WriteHeader(status)
Expand Down
98 changes: 98 additions & 0 deletions pkg/api/metadata.go
@@ -0,0 +1,98 @@
// This file and its contents are licensed under the Apache License 2.0.
// Please see the included NOTICE for copyright information and
// LICENSE for a copy of the license.

package api

import (
"net/http"
"strconv"

"github.com/NYTimes/gziphandler"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/promql/parser"
"github.com/timescale/promscale/pkg/pgclient"
"github.com/timescale/promscale/pkg/pgmodel/metadata"
)

func MetricMetadata(conf *Config, client *pgclient.Client) http.Handler {
hf := corsWrapper(conf, metricMetadataHandler(client))
return gziphandler.GzipHandler(hf)
}

func metricMetadataHandler(client *pgclient.Client) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if err := r.ParseForm(); err != nil {
respondError(w, http.StatusBadRequest, err, "bad_data")
return
}
var (
metric = r.FormValue("metric")
limitStr = r.FormValue("limit")
limit int64
err error
)
if limitStr != "" {
limit, err = strconv.ParseInt(limitStr, 10, 32)
if err != nil {
respondError(w, http.StatusBadRequest, err, "converting string to integer")
return
}
}
data, err := metadata.MetricMetadata(client.Connection, metric, int(limit))
if err != nil {
respondError(w, http.StatusInternalServerError, err, "fetching metric metadata")
return
}
respond(w, http.StatusOK, data)
}
}

func TargetMetadata(conf *Config, client *pgclient.Client) http.Handler {
hf := corsWrapper(conf, targetMetadataHandler(client))
return gziphandler.GzipHandler(hf)
}

func targetMetadataHandler(client *pgclient.Client) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if err := r.ParseForm(); err != nil {
respondError(w, http.StatusBadRequest, err, "bad_data")
return
}
var (
metric = r.FormValue("metric")
limitStr = r.FormValue("limit")
matchTarget = r.FormValue("match_target")
limit int64
err error
)
if limitStr != "" {
limit, err = strconv.ParseInt(limitStr, 10, 32)
if err != nil {
respondError(w, http.StatusBadRequest, err, "converting string to integer")
return
}
}
var matchers []*labels.Matcher
if matchTarget != "" {
matchers, err = parser.ParseMetricSelector(matchTarget)
if err != nil {
respondError(w, http.StatusBadRequest, err, "bad_data")
return
}
} else {
matcher, err := labels.NewMatcher(labels.MatchRegexp, "job", ".*")
if err != nil {
respondError(w, http.StatusInternalServerError, err, "creating all jobs matcher")
return
}
matchers = append(matchers, matcher)
}
data, err := metadata.TargetMetadata(client.Connection, matchers, metric, int(limit))
if err != nil {
respondError(w, http.StatusInternalServerError, err, "fetching metric metadata")
return
}
respond(w, http.StatusOK, data)
}
}
8 changes: 8 additions & 0 deletions pkg/api/router.go
Expand Up @@ -82,6 +82,14 @@ func GenerateRouter(apiConf *Config, metrics *Metrics, client *pgclient.Client,
router.Get("/api/v1/labels", labelsHandler)
router.Post("/api/v1/labels", labelsHandler)

targetMetadataHandler := timeHandler(metrics.HTTPRequestDuration, "targets/metadata", TargetMetadata(apiConf, client))
router.Get("/api/v1/targets/metadata", targetMetadataHandler)
router.Post("/api/v1/targets/metadata", targetMetadataHandler)

metadataHandler := timeHandler(metrics.HTTPRequestDuration, "metadata", MetricMetadata(apiConf, client))
router.Get("/api/v1/metadata", metadataHandler)
router.Post("/api/v1/metadata", metadataHandler)

labelValuesHandler := timeHandler(metrics.HTTPRequestDuration, "label/:name/values", LabelValues(apiConf, queryable))
router.Get("/api/v1/label/:name/values", labelValuesHandler)

Expand Down
6 changes: 4 additions & 2 deletions pkg/api/write_test.go
Expand Up @@ -247,6 +247,7 @@ func TestWrite(t *testing.T) {
receivedSamplesGauge := &mockMetric{}
failedSamplesGauge := &mockMetric{}
sentSamplesGauge := &mockMetric{}
sentMetadataGauge := &mockMetric{}
sendBatchHistogram := &mockMetric{}
invalidWriteReqs := &mockMetric{}
mock := &mockInserter{
Expand All @@ -260,6 +261,7 @@ func TestWrite(t *testing.T) {
ReceivedSamples: receivedSamplesGauge,
FailedSamples: failedSamplesGauge,
SentSamples: sentSamplesGauge,
SentMetadata: sentMetadataGauge,
SentBatchDuration: sendBatchHistogram,
InvalidWriteReqs: invalidWriteReqs,
WriteThroughput: util.NewThroughputCalc(time.Second),
Expand Down Expand Up @@ -351,9 +353,9 @@ type mockInserter struct {
err error
}

func (m *mockInserter) Ingest(r *prompb.WriteRequest) (uint64, error) {
func (m *mockInserter) Ingest(r *prompb.WriteRequest) (uint64, int, error) {
m.ts = r.Timeseries
return m.result, m.err
return m.result, 0, m.err
}

func getReader(s string) io.Reader {
Expand Down
4 changes: 2 additions & 2 deletions pkg/migrations/migration_files_generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 16 additions & 0 deletions pkg/migrations/sql/idempotent/metric-metadata.sql
Expand Up @@ -59,3 +59,19 @@ BEGIN
END;
$$ LANGUAGE plpgsql;
GRANT EXECUTE ON FUNCTION SCHEMA_PROM.get_multiple_metric_metadata(TEXT[]) TO prom_writer;

CREATE OR REPLACE FUNCTION SCHEMA_PROM.get_targets(metric_family_name TEXT, series_ids BIGINT[]) RETURNS TABLE (series_id BIGINT, target_keys TEXT[], target_values TEXT[])
AS
$$
DECLARE
metric_table_name TEXT;
q TEXT;
BEGIN
SELECT table_name INTO metric_table_name FROM SCHEMA_CATALOG.metric WHERE metric_name = metric_family_name;
q = FORMAT('select s.id series_id, array_agg(l.key) target_keys, array_agg(l.value) target_values from SCHEMA_CATALOG.label l inner join SCHEMA_DATA_SERIES.%I s on (true) where l.id = any(s.labels) and (key = ''job'' or key = ''instance'') and s.id = any(%L::bigint[]) group by series_id;', metric_table_name, series_ids);
RETURN QUERY EXECUTE q;
END;
$$ language plpgsql;



25 changes: 1 addition & 24 deletions pkg/pgmodel/delete/delete.go
Expand Up @@ -30,7 +30,7 @@ func (pgDel *PgDelete) DeleteSeries(matchers []*labels.Matcher, _, _ time.Time)
err error
metricsTouched = make(map[string]struct{})
)
metricNames, seriesIDMatrix, err := pgDel.getMetricNameSeriesIDFromMatchers(matchers)
metricNames, seriesIDMatrix, err := querier.GetMetricNameSeriesIDFromMatchers(pgDel.Conn, matchers)
if err != nil {
return nil, nil, -1, fmt.Errorf("delete-series: %w", err)
}
Expand All @@ -54,29 +54,6 @@ func (pgDel *PgDelete) DeleteSeries(matchers []*labels.Matcher, _, _ time.Time)
return getKeys(metricsTouched), deletedSeriesIDs, totalRowsDeleted, nil
}

// getMetricNameSeriesIDFromMatchers returns the metric name list and the corresponding series ID array
// as a matrix.
func (pgDel *PgDelete) getMetricNameSeriesIDFromMatchers(matchers []*labels.Matcher) ([]string, [][]model.SeriesID, error) {
cb, err := querier.BuildSubQueries(matchers)
if err != nil {
return nil, nil, fmt.Errorf("delete series build subqueries: %w", err)
}
clauses, values, err := cb.Build(true)
if err != nil {
return nil, nil, fmt.Errorf("delete series build clauses: %w", err)
}
query := querier.BuildMetricNameSeriesIDQuery(clauses)
rows, err := pgDel.Conn.Query(context.Background(), query, values...)
if err != nil {
return nil, nil, fmt.Errorf("build metric name series: %w", err)
}
metricNames, correspondingSeriesIDs, err := querier.GetSeriesPerMetric(rows)
if err != nil {
return nil, nil, fmt.Errorf("series per metric: %w", err)
}
return metricNames, correspondingSeriesIDs, nil
}

func convertSeriesIDsToInt64s(s []model.SeriesID) []int64 {
temp := make([]int64, len(s))
for i := range s {
Expand Down

0 comments on commit c3d10f5

Please sign in to comment.