Skip to content

Commit

Permalink
Support label matchers in labels API (#3566)
Browse files Browse the repository at this point in the history
* support label matchers in labels API

Signed-off-by: Ben Ye <yb532204897@gmail.com>

* add more test cases

Signed-off-by: Ben Ye <yb532204897@gmail.com>

* remove comment

Signed-off-by: Ben Ye <yb532204897@gmail.com>

* fix e2e

Signed-off-by: Ben Ye <yb532204897@gmail.com>
  • Loading branch information
Ben Ye committed Dec 16, 2020
1 parent 7c3c43c commit 2508a70
Show file tree
Hide file tree
Showing 6 changed files with 218 additions and 33 deletions.
108 changes: 103 additions & 5 deletions pkg/api/query/v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"context"
"math"
"net/http"
"sort"
"strconv"
"strings"
"time"
Expand Down Expand Up @@ -439,16 +440,39 @@ func (qapi *QueryAPI) labelValues(r *http.Request) (interface{}, []error, *api.A
return nil, nil, apiErr
}

q, err := qapi.queryableCreate(true, nil, storeDebugMatchers, 0, enablePartialResponse, false).
var matcherSets [][]*labels.Matcher
for _, s := range r.Form[MatcherParam] {
matchers, err := parser.ParseMetricSelector(s)
if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorBadData, Err: err}
}
matcherSets = append(matcherSets, matchers)
}

q, err := qapi.queryableCreate(true, nil, storeDebugMatchers, 0, enablePartialResponse, true).
Querier(ctx, timestamp.FromTime(start), timestamp.FromTime(end))
if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorExec, Err: err}
}
defer runutil.CloseWithLogOnErr(qapi.logger, q, "queryable labelValues")

// TODO(fabxc): add back request context.
var (
vals []string
warnings storage.Warnings
)
// TODO(yeya24): push down matchers to Store level.
if len(matcherSets) > 0 {
// Get all series which match matchers.
var sets []storage.SeriesSet
for _, mset := range matcherSets {
s := q.Select(false, nil, mset...)
sets = append(sets, s)
}
vals, warnings, err = labelValuesByMatchers(sets, name)
} else {
vals, warnings, err = q.LabelValues(name)
}

vals, warnings, err := q.LabelValues(name)
if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorExec, Err: err}
}
Expand Down Expand Up @@ -544,14 +568,39 @@ func (qapi *QueryAPI) labelNames(r *http.Request) (interface{}, []error, *api.Ap
return nil, nil, apiErr
}

q, err := qapi.queryableCreate(true, nil, storeDebugMatchers, 0, enablePartialResponse, false).
var matcherSets [][]*labels.Matcher
for _, s := range r.Form[MatcherParam] {
matchers, err := parser.ParseMetricSelector(s)
if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorBadData, Err: err}
}
matcherSets = append(matcherSets, matchers)
}

q, err := qapi.queryableCreate(true, nil, storeDebugMatchers, 0, enablePartialResponse, true).
Querier(r.Context(), timestamp.FromTime(start), timestamp.FromTime(end))
if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorExec, Err: err}
}
defer runutil.CloseWithLogOnErr(qapi.logger, q, "queryable labelNames")

names, warnings, err := q.LabelNames()
var (
names []string
warnings storage.Warnings
)
// TODO(yeya24): push down matchers to Store level.
if len(matcherSets) > 0 {
// Get all series which match matchers.
var sets []storage.SeriesSet
for _, mset := range matcherSets {
s := q.Select(false, nil, mset...)
sets = append(sets, s)
}
names, warnings, err = labelNamesByMatchers(sets)
} else {
names, warnings, err = q.LabelNames()
}

if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorExec, Err: err}
}
Expand Down Expand Up @@ -673,3 +722,52 @@ func parseDuration(s string) (time.Duration, error) {
}
return 0, errors.Errorf("cannot parse %q to a valid duration", s)
}

// Modified from https://github.com/eklockare/prometheus/blob/6178-matchers-with-label-values/web/api/v1/api.go#L571-L591.
// labelNamesByMatchers uses matchers to filter out matching series, then label names are extracted.
func labelNamesByMatchers(sets []storage.SeriesSet) ([]string, storage.Warnings, error) {
set := storage.NewMergeSeriesSet(sets, storage.ChainedSeriesMerge)
labelNamesSet := make(map[string]struct{})
for set.Next() {
series := set.At()
for _, lb := range series.Labels() {
labelNamesSet[lb.Name] = struct{}{}
}
}

warnings := set.Warnings()
if set.Err() != nil {
return nil, warnings, set.Err()
}
// Convert the map to an array.
labelNames := make([]string, 0, len(labelNamesSet))
for key := range labelNamesSet {
labelNames = append(labelNames, key)
}
sort.Strings(labelNames)
return labelNames, warnings, nil
}

// Modified from https://github.com/eklockare/prometheus/blob/6178-matchers-with-label-values/web/api/v1/api.go#L571-L591.
// LabelValuesByMatchers uses matchers to filter out matching series, then label values are extracted.
func labelValuesByMatchers(sets []storage.SeriesSet, name string) ([]string, storage.Warnings, error) {
set := storage.NewMergeSeriesSet(sets, storage.ChainedSeriesMerge)
labelValuesSet := make(map[string]struct{})
for set.Next() {
series := set.At()
labelValue := series.Labels().Get(name)
labelValuesSet[labelValue] = struct{}{}
}

warnings := set.Warnings()
if set.Err() != nil {
return nil, warnings, set.Err()
}
// Convert the map to an array.
labelValues := make([]string, 0, len(labelValuesSet))
for key := range labelValuesSet {
labelValues = append(labelValues, key)
}
sort.Strings(labelValues)
return labelValues, warnings, nil
}
79 changes: 67 additions & 12 deletions pkg/api/query/v1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -745,9 +745,6 @@ func TestMetadataEndpoints(t *testing.T) {
},
{
endpoint: api.labelNames,
params: map[string]string{
"name": "__name__",
},
response: []string{
"__name__",
"foo",
Expand All @@ -757,9 +754,6 @@ func TestMetadataEndpoints(t *testing.T) {
},
{
endpoint: apiWithLabelLookback.labelNames,
params: map[string]string{
"name": "foo",
},
response: []string{
"__name__",
"foo",
Expand All @@ -773,9 +767,6 @@ func TestMetadataEndpoints(t *testing.T) {
"start": []string{"1970-01-01T00:00:00Z"},
"end": []string{"1970-01-01T00:09:00Z"},
},
params: map[string]string{
"name": "foo",
},
response: []string{
"__name__",
"foo",
Expand All @@ -787,14 +778,78 @@ func TestMetadataEndpoints(t *testing.T) {
"start": []string{"1970-01-01T00:00:00Z"},
"end": []string{"1970-01-01T00:09:00Z"},
},
params: map[string]string{
"name": "foo",
},
response: []string{
"__name__",
"foo",
},
},
// Failed, to parse matchers.
{
endpoint: api.labelNames,
query: url.Values{
"match[]": []string{`{xxxx`},
},
errType: baseAPI.ErrorBadData,
},
// Failed to parse matchers.
{
endpoint: api.labelValues,
query: url.Values{
"match[]": []string{`{xxxx`},
},
params: map[string]string{
"name": "__name__",
},
errType: baseAPI.ErrorBadData,
},
{
endpoint: api.labelNames,
query: url.Values{
"match[]": []string{`test_metric_replica2`},
},
response: []string{"__name__", "foo", "replica1"},
},
{
endpoint: api.labelValues,
query: url.Values{
"match[]": []string{`test_metric_replica2`},
},
params: map[string]string{
"name": "__name__",
},
response: []string{"test_metric_replica2"},
},
{
endpoint: api.labelValues,
query: url.Values{
"match[]": []string{`{foo="bar"}`, `{foo="boo"}`},
},
params: map[string]string{
"name": "__name__",
},
response: []string{"test_metric1", "test_metric2", "test_metric_replica1", "test_metric_replica2"},
},
// No matched series.
{
endpoint: api.labelValues,
query: url.Values{
"match[]": []string{`{foo="yolo"}`},
},
params: map[string]string{
"name": "__name__",
},
response: []string{},
},
{
endpoint: api.labelValues,
query: url.Values{
"match[]": []string{`test_metric_replica2`},
},
params: map[string]string{
"name": "replica1",
},
response: []string{"a"},
},
// Bad name parameter.
{
endpoint: api.labelValues,
Expand Down
10 changes: 8 additions & 2 deletions pkg/promclient/promclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -672,11 +672,14 @@ func (c *Client) SeriesInGRPC(ctx context.Context, base *url.URL, matchers []sto

// LabelNames returns all known label names. It uses gRPC errors.
// NOTE: This method is tested in pkg/store/prometheus_test.go against Prometheus.
func (c *Client) LabelNamesInGRPC(ctx context.Context, base *url.URL, startTime, endTime int64) ([]string, error) {
func (c *Client) LabelNamesInGRPC(ctx context.Context, base *url.URL, matchers []storepb.LabelMatcher, startTime, endTime int64) ([]string, error) {
u := *base
u.Path = path.Join(u.Path, "/api/v1/labels")
q := u.Query()

if len(matchers) > 0 {
q.Add("match[]", storepb.MatchersToString(matchers...))
}
q.Add("start", formatTime(timestamp.Time(startTime)))
q.Add("end", formatTime(timestamp.Time(endTime)))
u.RawQuery = q.Encode()
Expand All @@ -689,11 +692,14 @@ func (c *Client) LabelNamesInGRPC(ctx context.Context, base *url.URL, startTime,

// LabelValuesInGRPC returns all known label values for a given label name. It uses gRPC errors.
// NOTE: This method is tested in pkg/store/prometheus_test.go against Prometheus.
func (c *Client) LabelValuesInGRPC(ctx context.Context, base *url.URL, label string, startTime, endTime int64) ([]string, error) {
func (c *Client) LabelValuesInGRPC(ctx context.Context, base *url.URL, label string, matchers []storepb.LabelMatcher, startTime, endTime int64) ([]string, error) {
u := *base
u.Path = path.Join(u.Path, "/api/v1/label/", label, "/values")
q := u.Query()

if len(matchers) > 0 {
q.Add("match[]", storepb.MatchersToString(matchers...))
}
q.Add("start", formatTime(timestamp.Time(startTime)))
q.Add("end", formatTime(timestamp.Time(endTime)))
u.RawQuery = q.Encode()
Expand Down
4 changes: 2 additions & 2 deletions pkg/store/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -483,7 +483,7 @@ func (p *PrometheusStore) encodeChunk(ss []prompb.Sample) (storepb.Chunk_Encodin

// LabelNames returns all known label names.
func (p *PrometheusStore) LabelNames(ctx context.Context, r *storepb.LabelNamesRequest) (*storepb.LabelNamesResponse, error) {
lbls, err := p.client.LabelNamesInGRPC(ctx, p.base, r.Start, r.End)
lbls, err := p.client.LabelNamesInGRPC(ctx, p.base, nil, r.Start, r.End)
if err != nil {
return nil, err
}
Expand All @@ -499,7 +499,7 @@ func (p *PrometheusStore) LabelValues(ctx context.Context, r *storepb.LabelValue
return &storepb.LabelValuesResponse{Values: []string{l}}, nil
}

vals, err := p.client.LabelValuesInGRPC(ctx, p.base, r.Label, r.Start, r.End)
vals, err := p.client.LabelValuesInGRPC(ctx, p.base, r.Label, nil, r.Start, r.End)
if err != nil {
return nil, err
}
Expand Down
8 changes: 4 additions & 4 deletions test/e2e/query_frontend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ func TestQueryFrontend(t *testing.T) {

t.Run("query frontend splitting works for labels names API", func(t *testing.T) {
// LabelNames and LabelValues API should still work via query frontend.
labelNames(t, ctx, queryFrontend.HTTPEndpoint(), timestamp.FromTime(now.Add(-time.Hour)), timestamp.FromTime(now.Add(time.Hour)), func(res []string) bool {
labelNames(t, ctx, queryFrontend.HTTPEndpoint(), nil, timestamp.FromTime(now.Add(-time.Hour)), timestamp.FromTime(now.Add(time.Hour)), func(res []string) bool {
return len(res) > 0
})
testutil.Ok(t, q.WaitSumMetricsWithOptions(
Expand All @@ -241,7 +241,7 @@ func TestQueryFrontend(t *testing.T) {
e2e.WithLabelMatchers(labels.MustNewMatcher(labels.MatchEqual, "tripperware", "labels"))),
)

labelNames(t, ctx, queryFrontend.HTTPEndpoint(), timestamp.FromTime(now.Add(-24*time.Hour)), timestamp.FromTime(now.Add(time.Hour)), func(res []string) bool {
labelNames(t, ctx, queryFrontend.HTTPEndpoint(), nil, timestamp.FromTime(now.Add(-24*time.Hour)), timestamp.FromTime(now.Add(time.Hour)), func(res []string) bool {
return len(res) > 0
})
testutil.Ok(t, q.WaitSumMetricsWithOptions(
Expand All @@ -262,7 +262,7 @@ func TestQueryFrontend(t *testing.T) {
})

t.Run("query frontend splitting works for labels values API", func(t *testing.T) {
labelValues(t, ctx, queryFrontend.HTTPEndpoint(), "instance", timestamp.FromTime(now.Add(-time.Hour)), timestamp.FromTime(now.Add(time.Hour)), func(res []string) bool {
labelValues(t, ctx, queryFrontend.HTTPEndpoint(), "instance", nil, timestamp.FromTime(now.Add(-time.Hour)), timestamp.FromTime(now.Add(time.Hour)), func(res []string) bool {
return len(res) == 1 && res[0] == "localhost:9090"
})
testutil.Ok(t, q.WaitSumMetricsWithOptions(
Expand All @@ -281,7 +281,7 @@ func TestQueryFrontend(t *testing.T) {
e2e.WithLabelMatchers(labels.MustNewMatcher(labels.MatchEqual, "tripperware", "labels"))),
)

labelValues(t, ctx, queryFrontend.HTTPEndpoint(), "instance", timestamp.FromTime(now.Add(-24*time.Hour)), timestamp.FromTime(now.Add(time.Hour)), func(res []string) bool {
labelValues(t, ctx, queryFrontend.HTTPEndpoint(), "instance", nil, timestamp.FromTime(now.Add(-24*time.Hour)), timestamp.FromTime(now.Add(time.Hour)), func(res []string) bool {
return len(res) == 1 && res[0] == "localhost:9090"
})
testutil.Ok(t, q.WaitSumMetricsWithOptions(
Expand Down

0 comments on commit 2508a70

Please sign in to comment.