Skip to content

Commit

Permalink
Expose optional label matcher for label values handler (grafana#8824)
Browse files Browse the repository at this point in the history
(cherry picked from commit 1bcf683)
  • Loading branch information
periklis committed Mar 30, 2023
1 parent cb16328 commit becf1a8
Show file tree
Hide file tree
Showing 18 changed files with 405 additions and 207 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Expand Up @@ -6,6 +6,12 @@

##### Enhancements

* [8824](https://github.com/grafana/loki/pull/8824) **periklis**: Expose optional label matcher for label values handler
* [8852](https://github.com/grafana/loki/pull/8852) **wtchangdm**: Loki: Add `route_randomly` to Redis options.
* [8848](https://github.com/grafana/loki/pull/8848) **dannykopping**: Ruler: add configurable rule evaluation jitter.
* [8752](https://github.com/grafana/loki/pull/8752) **chaudum**: Add query fairness control across actors within a tenant to scheduler, which can be enabled by passing the `X-Loki-Actor-Path` header to the HTTP request of the query.
* [8786](https://github.com/grafana/loki/pull/8786) **DylanGuedes**: Ingester: add new /ingester/prepare_shutdown endpoint.
* [8744](https://github.com/grafana/loki/pull/8744) **dannykopping**: Ruler: remote rule evaluation.
* [8727](https://github.com/grafana/loki/pull/8727) **cstyan** **jeschkies**: Propagate per-request limit header to querier.
* [8682](https://github.com/grafana/loki/pull/8682) **dannykopping**: Add fetched chunk size distribution metric `loki_chunk_fetcher_fetched_size_bytes`.
* [8532](https://github.com/grafana/loki/pull/8532) **justcompile**: Adds Storage Class option to S3 objects
Expand Down
1 change: 1 addition & 0 deletions docs/sources/api/_index.md
Expand Up @@ -483,6 +483,7 @@ It accepts the following query parameters in the URL:
- `start`: The start time for the query as a nanosecond Unix epoch. Defaults to 6 hours ago.
- `end`: The end time for the query as a nanosecond Unix epoch. Defaults to now.
- `since`: A `duration` used to calculate `start` relative to `end`. If `end` is in the future, `start` is calculated as this duration before now. Any value specified for `start` supersedes this parameter.
- `query`: A set of log stream selector that selects the streams to match and return label values for `<name>`. Example: `{"app": "myapp", "environment": "dev"}`

In microservices mode, `/loki/api/v1/label/<name>/values` is exposed by the querier.

Expand Down
11 changes: 10 additions & 1 deletion pkg/ingester/ingester.go
Expand Up @@ -828,7 +828,16 @@ func (i *Ingester) Label(ctx context.Context, req *logproto.LabelRequest) (*logp
if err != nil {
return nil, err
}
resp, err := instance.Label(ctx, req)

var matchers []*labels.Matcher
if req.Query != "" {
matchers, err = syntax.ParseMatchers(req.Query)
if err != nil {
return nil, err
}
}

resp, err := instance.Label(ctx, req, matchers...)
if err != nil {
return nil, err
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/loghttp/labels.go
Expand Up @@ -82,5 +82,7 @@ func ParseLabelQuery(r *http.Request) (*logproto.LabelRequest, error) {
}
req.Start = &start
req.End = &end

req.Query = query(r)
return req, nil
}
319 changes: 188 additions & 131 deletions pkg/logproto/logproto.pb.go

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions pkg/logproto/logproto.proto
Expand Up @@ -119,6 +119,7 @@ message LabelRequest {
(gogoproto.stdtime) = true,
(gogoproto.nullable) = true
];
string query = 5; // Naming this query instead of match because this should be with queryrangebase.Request interface
}

message LabelResponse {
Expand Down
3 changes: 2 additions & 1 deletion pkg/logql/metrics.go
Expand Up @@ -177,7 +177,7 @@ func RecordLabelQueryMetrics(
ctx context.Context,
log log.Logger,
start, end time.Time,
label, status string,
label, query, status string,
stats logql_stats.Result,
) {
var (
Expand All @@ -199,6 +199,7 @@ func RecordLabelQueryMetrics(
"duration", time.Duration(int64(stats.Summary.ExecTime*float64(time.Second))),
"status", status,
"label", label,
"query", query,
"splits", stats.Summary.Splits,
"throughput", strings.Replace(humanize.Bytes(uint64(stats.Summary.BytesProcessedPerSecond)), " ", "", 1),
"total_bytes", strings.Replace(humanize.Bytes(uint64(stats.Summary.TotalBytesProcessed)), " ", "", 1),
Expand Down
4 changes: 2 additions & 2 deletions pkg/logql/metrics_test.go
Expand Up @@ -101,7 +101,7 @@ func TestLogLabelsQuery(t *testing.T) {
sp := opentracing.StartSpan("")
ctx := opentracing.ContextWithSpan(user.InjectOrgID(context.Background(), "foo"), sp)
now := time.Now()
RecordLabelQueryMetrics(ctx, logger, now.Add(-1*time.Hour), now, "foo", "200", stats.Result{
RecordLabelQueryMetrics(ctx, logger, now.Add(-1*time.Hour), now, "foo", "", "200", stats.Result{
Summary: stats.Summary{
BytesProcessedPerSecond: 100000,
ExecTime: 25.25,
Expand All @@ -111,7 +111,7 @@ func TestLogLabelsQuery(t *testing.T) {
})
require.Equal(t,
fmt.Sprintf(
"level=info org_id=foo traceID=%s latency=slow query_type=labels length=1h0m0s duration=25.25s status=200 label=foo splits=0 throughput=100kB total_bytes=100kB total_entries=12\n",
"level=info org_id=foo traceID=%s latency=slow query_type=labels length=1h0m0s duration=25.25s status=200 label=foo query= splits=0 throughput=100kB total_bytes=100kB total_entries=12\n",
sp.Context().(jaeger.SpanContext).SpanID().String(),
),
buf.String())
Expand Down
2 changes: 1 addition & 1 deletion pkg/querier/http.go
Expand Up @@ -222,7 +222,7 @@ func (q *QuerierAPI) LabelHandler(w http.ResponseWriter, r *http.Request) {
status, _ = server.ClientHTTPStatusAndError(err)
}

logql.RecordLabelQueryMetrics(ctx, log, *req.Start, *req.End, req.Name, strconv.Itoa(status), statResult)
logql.RecordLabelQueryMetrics(ctx, log, *req.Start, *req.End, req.Name, req.Query, strconv.Itoa(status), statResult)

if err != nil {
serverutil.WriteError(err, w)
Expand Down
11 changes: 10 additions & 1 deletion pkg/querier/querier.go
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/weaveworks/common/httpgrpc"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc/health/grpc_health_v1"
Expand Down Expand Up @@ -364,6 +365,14 @@ func (q *SingleTenantQuerier) Label(ctx context.Context, req *logproto.LabelRequ
return nil, err
}

var matchers []*labels.Matcher
if req.Query != "" {
matchers, err = syntax.ParseMatchers(req.Query)
if err != nil {
return nil, err
}
}

// Enforce the query timeout while querying backends
queryTimeout := q.limits.QueryTimeout(ctx, userID)
// TODO: remove this clause once we remove the deprecated query-timeout flag.
Expand Down Expand Up @@ -401,7 +410,7 @@ func (q *SingleTenantQuerier) Label(ctx context.Context, req *logproto.LabelRequ
)

if req.Values {
storeValues, err = q.store.LabelValuesForMetricName(ctx, userID, from, through, "logs", req.Name)
storeValues, err = q.store.LabelValuesForMetricName(ctx, userID, from, through, "logs", req.Name, matchers...)
} else {
storeValues, err = q.store.LabelNamesForMetricName(ctx, userID, from, through, "logs")
}
Expand Down
6 changes: 2 additions & 4 deletions pkg/querier/queryrange/codec.go
Expand Up @@ -186,10 +186,6 @@ func (r *LokiLabelNamesRequest) WithQuery(query string) queryrangebase.Request {
return &new
}

func (r *LokiLabelNamesRequest) GetQuery() string {
return ""
}

func (r *LokiLabelNamesRequest) GetStep() int64 {
return 0
}
Expand Down Expand Up @@ -259,6 +255,7 @@ func (Codec) DecodeRequest(_ context.Context, r *http.Request, forwardHeaders []
StartTs: *req.Start,
EndTs: *req.End,
Path: r.URL.Path,
Query: req.Query,
}, nil
case IndexStatsOp:
req, err := loghttp.ParseIndexStatsQuery(r)
Expand Down Expand Up @@ -340,6 +337,7 @@ func (Codec) EncodeRequest(ctx context.Context, r queryrangebase.Request) (*http
params := url.Values{
"start": []string{fmt.Sprintf("%d", request.StartTs.UnixNano())},
"end": []string{fmt.Sprintf("%d", request.EndTs.UnixNano())},
"query": []string{request.GetQuery()},
}

u := &url.URL{
Expand Down
35 changes: 35 additions & 0 deletions pkg/querier/queryrange/codec_test.go
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"io"
"net/http"
"net/url"
strings "strings"
"testing"
"time"
Expand Down Expand Up @@ -80,6 +81,15 @@ func Test_codec_DecodeRequest(t *testing.T) {
StartTs: start,
EndTs: end,
}, false},
{"label_values", func() (*http.Request, error) {
return http.NewRequest(http.MethodGet,
fmt.Sprintf(`/label/test/values?start=%d&end=%d&query={foo="bar"}`, start.UnixNano(), end.UnixNano()), nil)
}, &LokiLabelNamesRequest{
Path: "/label/test/values",
StartTs: start,
EndTs: end,
Query: `{foo="bar"}`,
}, false},
{"index_stats", func() (*http.Request, error) {
return LokiCodec.EncodeRequest(context.Background(), &logproto.IndexStatsRequest{
From: model.TimeFromUnixNano(start.UnixNano()),
Expand Down Expand Up @@ -334,20 +344,45 @@ func Test_codec_labels_EncodeRequest(t *testing.T) {
Path: "/loki/api/v1/labels/__name__/values",
StartTs: start,
EndTs: end,
Query: `{foo="bar"}`,
}
got, err = LokiCodec.EncodeRequest(ctx, toEncode)
require.NoError(t, err)
require.Equal(t, ctx, got.Context())
require.Equal(t, "/loki/api/v1/labels/__name__/values", got.URL.Path)
require.Equal(t, fmt.Sprintf("%d", start.UnixNano()), got.URL.Query().Get("start"))
require.Equal(t, fmt.Sprintf("%d", end.UnixNano()), got.URL.Query().Get("end"))
require.Equal(t, `{foo="bar"}`, got.URL.Query().Get("query"))

// testing a full roundtrip
req, err = LokiCodec.DecodeRequest(context.TODO(), got, nil)
require.NoError(t, err)
require.Equal(t, toEncode.StartTs, req.(*LokiLabelNamesRequest).StartTs)
require.Equal(t, toEncode.EndTs, req.(*LokiLabelNamesRequest).EndTs)
require.Equal(t, toEncode.Query, req.(*LokiLabelNamesRequest).Query)
require.Equal(t, "/loki/api/v1/labels/__name__/values", req.(*LokiLabelNamesRequest).Path)
}

func Test_codec_labels_DecodeRequest(t *testing.T) {
ctx := context.Background()
u, err := url.Parse(`/loki/api/v1/labels/__name__/values?start=1575285010000000010&end=1575288610000000010&query={foo="bar"}`)
require.NoError(t, err)

r := &http.Request{URL: u}
req, err := LokiCodec.DecodeRequest(context.TODO(), r, nil)
require.NoError(t, err)
require.Equal(t, start, req.(*LokiLabelNamesRequest).StartTs)
require.Equal(t, end, req.(*LokiLabelNamesRequest).EndTs)
require.Equal(t, `{foo="bar"}`, req.(*LokiLabelNamesRequest).Query)
require.Equal(t, "/loki/api/v1/labels/__name__/values", req.(*LokiLabelNamesRequest).Path)

got, err := LokiCodec.EncodeRequest(ctx, req)
require.NoError(t, err)
require.Equal(t, ctx, got.Context())
require.Equal(t, "/loki/api/v1/labels/__name__/values", got.URL.Path)
require.Equal(t, fmt.Sprintf("%d", start.UnixNano()), got.URL.Query().Get("start"))
require.Equal(t, fmt.Sprintf("%d", end.UnixNano()), got.URL.Query().Get("end"))
require.Equal(t, `{foo="bar"}`, got.URL.Query().Get("query"))
}

func Test_codec_index_stats_EncodeRequest(t *testing.T) {
Expand Down

0 comments on commit becf1a8

Please sign in to comment.