diff --git a/pkg/query/api/v1.go b/pkg/query/api/v1.go index 61c1603b577..e422bb7045e 100644 --- a/pkg/query/api/v1.go +++ b/pkg/query/api/v1.go @@ -173,6 +173,8 @@ func (api *API) Register(r *route.Router, tracer opentracing.Tracer, logger log. r.Get("/label/:name/values", instr("label_values", api.labelValues)) r.Get("/series", instr("series", api.series)) + + r.Get("/labels", instr("label_names", api.labelNames)) } type queryData struct { @@ -611,3 +613,35 @@ func parseDuration(s string) (time.Duration, error) { } return 0, fmt.Errorf("cannot parse %q to a valid duration", s) } + +func (api *API) labelNames(r *http.Request) (interface{}, []error, *ApiError) { + ctx := r.Context() + + enablePartialResponse, apiErr := api.parsePartialResponseParam(r) + if apiErr != nil { + return nil, nil, apiErr + } + + var ( + warnmtx sync.Mutex + warnings []error + ) + warningReporter := func(err error) { + warnmtx.Lock() + warnings = append(warnings, err) + warnmtx.Unlock() + } + + q, err := api.queryableCreate(true, 0, enablePartialResponse, warningReporter).Querier(ctx, math.MinInt64, math.MaxInt64) + if err != nil { + return nil, nil, &ApiError{errorExec, err} + } + defer runutil.CloseWithLogOnErr(api.logger, q, "queryable labelNames") + + names, err := q.LabelNames() + if err != nil { + return nil, nil, &ApiError{errorExec, err} + } + + return names, warnings, nil +} diff --git a/pkg/query/querier.go b/pkg/query/querier.go index 819ff3ac2a1..07dc3ec71b1 100644 --- a/pkg/query/querier.go +++ b/pkg/query/querier.go @@ -265,9 +265,20 @@ func (q *querier) LabelValues(name string) ([]string, error) { } // LabelNames returns all the unique label names present in the block in sorted order. -// TODO(bwplotka): Consider adding labelNames to thanos Query API https://github.com/improbable-eng/thanos/issues/702. func (q *querier) LabelNames() ([]string, error) { - return nil, errors.New("not implemented") + span, ctx := tracing.StartSpan(q.ctx, "querier_label_names") + defer span.Finish() + + resp, err := q.proxy.LabelNames(ctx, &storepb.LabelNamesRequest{PartialResponseDisabled: !q.partialResponse}) + if err != nil { + return nil, errors.Wrap(err, "proxy LabelNames()") + } + + for _, w := range resp.Warnings { + q.warningReporter(errors.New(w)) + } + + return resp.Names, nil } func (q *querier) Close() error { diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index 775210e3a17..7ca264e0804 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -804,8 +804,37 @@ func chunksSize(chks []storepb.AggrChunk) (size int) { } // LabelNames implements the storepb.StoreServer interface. -func (s *BucketStore) LabelNames(context.Context, *storepb.LabelNamesRequest) (*storepb.LabelNamesResponse, error) { - return nil, status.Error(codes.Unimplemented, "not implemented") +func (s *BucketStore) LabelNames(ctx context.Context, _ *storepb.LabelNamesRequest) (*storepb.LabelNamesResponse, error) { + g, gctx := errgroup.WithContext(ctx) + + s.mtx.RLock() + + var mtx sync.Mutex + var sets [][]string + + for _, b := range s.blocks { + indexr := b.indexReader(gctx) + g.Go(func() error { + defer runutil.CloseWithLogOnErr(s.logger, indexr, "label names") + + res := indexr.LabelNames() + + mtx.Lock() + sets = append(sets, res) + mtx.Unlock() + + return nil + }) + } + + s.mtx.RUnlock() + + if err := g.Wait(); err != nil { + return nil, status.Error(codes.Aborted, err.Error()) + } + return &storepb.LabelNamesResponse{ + Names: strutil.MergeSlices(sets...), + }, nil } // LabelValues implements the storepb.StoreServer interface. @@ -1537,6 +1566,15 @@ func (r *bucketIndexReader) LabelValues(name string) []string { return res } +// LabelNames returns a list of label names. +func (r *bucketIndexReader) LabelNames() []string { + res := make([]string, 0, len(r.block.lvals)) + for ln, _ := range r.block.lvals { + res = append(res, ln) + } + return res +} + // Close released the underlying resources of the reader. func (r *bucketIndexReader) Close() error { r.block.pendingReaders.Done() diff --git a/pkg/store/prometheus.go b/pkg/store/prometheus.go index e8fd677f531..edb791e4149 100644 --- a/pkg/store/prometheus.go +++ b/pkg/store/prometheus.go @@ -339,7 +339,32 @@ func extendLset(lset []storepb.Label, extend labels.Labels) []storepb.Label { func (p *PrometheusStore) LabelNames(ctx context.Context, r *storepb.LabelNamesRequest) ( *storepb.LabelNamesResponse, error, ) { - return nil, status.Error(codes.Unimplemented, "not implemented") + u := *p.base + u.Path = path.Join(u.Path, "/api/v1/labels") + + req, err := http.NewRequest("GET", u.String(), nil) + if err != nil { + return nil, status.Error(codes.Unknown, err.Error()) + } + + span, ctx := tracing.StartSpan(ctx, "/prom_label_names HTTP[client]") + defer span.Finish() + + resp, err := p.client.Do(req.WithContext(ctx)) + if err != nil { + return nil, status.Error(codes.Unknown, err.Error()) + } + defer runutil.CloseWithLogOnErr(p.logger, resp.Body, "label names request body") + + var m struct { + Data []string `json:"data"` + } + if err := json.NewDecoder(resp.Body).Decode(&m); err != nil { + return nil, status.Error(codes.Unknown, err.Error()) + } + sort.Strings(m.Data) + + return &storepb.LabelNamesResponse{Names: m.Data}, nil } // LabelValues returns all known label values for a given label name. diff --git a/pkg/store/proxy.go b/pkg/store/proxy.go index e9f3cfdc743..0a64813ecfb 100644 --- a/pkg/store/proxy.go +++ b/pkg/store/proxy.go @@ -323,7 +323,52 @@ func storeMatches(s Client, mint, maxt int64, matchers ...storepb.LabelMatcher) func (s *ProxyStore) LabelNames(ctx context.Context, r *storepb.LabelNamesRequest) ( *storepb.LabelNamesResponse, error, ) { - return nil, status.Error(codes.Unimplemented, "not implemented") + var ( + warnings []string + names [][]string + mtx sync.Mutex + g, gctx = errgroup.WithContext(ctx) + ) + + stores, err := s.stores(ctx) + if err != nil { + return nil, status.Errorf(codes.Unknown, err.Error()) + } + for _, st := range stores { + st := st + g.Go(func() error { + resp, err := st.LabelNames(gctx, &storepb.LabelNamesRequest{ + PartialResponseDisabled: r.PartialResponseDisabled, + }) + if err != nil { + err = errors.Wrapf(err, "fetch label names from store %s", st) + if r.PartialResponseDisabled { + return err + } + + mtx.Lock() + warnings = append(warnings, errors.Wrap(err, "fetch label names").Error()) + mtx.Unlock() + return nil + } + + mtx.Lock() + warnings = append(warnings, resp.Warnings...) + names = append(names, resp.Names) + mtx.Unlock() + + return nil + }) + } + + if err := g.Wait(); err != nil { + return nil, err + } + + return &storepb.LabelNamesResponse{ + Names: strutil.MergeUnsortedSlices(names...), + Warnings: warnings, + }, nil } // LabelValues returns all known label values for a given label name. diff --git a/pkg/store/proxy_test.go b/pkg/store/proxy_test.go index db1819cebbd..b71a06a0d19 100644 --- a/pkg/store/proxy_test.go +++ b/pkg/store/proxy_test.go @@ -573,6 +573,45 @@ func TestProxyStore_LabelValues(t *testing.T) { testutil.Equals(t, 1, len(resp.Warnings)) } +func TestProxyStore_LabelNames(t *testing.T) { + defer leaktest.CheckTimeout(t, 10*time.Second)() + + m1 := &mockedStoreAPI{ + RespLabelNames: &storepb.LabelNamesResponse{ + Names: []string{"a", "b"}, + Warnings: []string{"warning"}, + }, + } + + m2 := &mockedStoreAPI{ + RespLabelNames: &storepb.LabelNamesResponse{ + Names: []string{"a", "c", "d"}, + }, + } + + cls := []Client{ + &testClient{StoreClient: m1}, + &testClient{StoreClient: m2}, + } + + q := NewProxyStore(nil, + func(context.Context) ([]Client, error) { return cls, nil }, + component.Query, + nil, + ) + + ctx := context.Background() + req := &storepb.LabelNamesRequest{ + PartialResponseDisabled: true, + } + resp, err := q.LabelNames(ctx, req) + testutil.Ok(t, err) + testutil.Assert(t, proto.Equal(req, m1.LastLabelNamesReq), "request was not proxied properly to underlying storeAPI: %s vs %s", req, m1.LastLabelNamesReq) + + testutil.Equals(t, []string{"a", "b", "c", "d"}, resp.Names) + testutil.Equals(t, 1, len(resp.Warnings)) +} + type rawSeries struct { lset []storepb.Label samples []sample @@ -716,10 +755,12 @@ func (s *storeSeriesServer) Context() context.Context { type mockedStoreAPI struct { RespSeries []*storepb.SeriesResponse RespLabelValues *storepb.LabelValuesResponse + RespLabelNames *storepb.LabelNamesResponse RespError error LastSeriesReq *storepb.SeriesRequest LastLabelValuesReq *storepb.LabelValuesRequest + LastLabelNamesReq *storepb.LabelNamesRequest } func (s *mockedStoreAPI) Info(ctx context.Context, req *storepb.InfoRequest, _ ...grpc.CallOption) (*storepb.InfoResponse, error) { @@ -733,7 +774,9 @@ func (s *mockedStoreAPI) Series(ctx context.Context, req *storepb.SeriesRequest, } func (s *mockedStoreAPI) LabelNames(ctx context.Context, req *storepb.LabelNamesRequest, _ ...grpc.CallOption) (*storepb.LabelNamesResponse, error) { - return nil, status.Error(codes.Unimplemented, "not implemented") + s.LastLabelNamesReq = req + + return s.RespLabelNames, s.RespError } func (s *mockedStoreAPI) LabelValues(ctx context.Context, req *storepb.LabelValuesRequest, _ ...grpc.CallOption) (*storepb.LabelValuesResponse, error) { diff --git a/pkg/store/tsdb.go b/pkg/store/tsdb.go index 1a5b5f820bb..bf43d1e9756 100644 --- a/pkg/store/tsdb.go +++ b/pkg/store/tsdb.go @@ -165,10 +165,20 @@ func (s *TSDBStore) translateAndExtendLabels(m, extend labels.Labels) []storepb. } // LabelNames returns all known label names. -func (s *TSDBStore) LabelNames(ctx context.Context, r *storepb.LabelNamesRequest) ( +func (s *TSDBStore) LabelNames(ctx context.Context, _ *storepb.LabelNamesRequest) ( *storepb.LabelNamesResponse, error, ) { - return nil, status.Error(codes.Unimplemented, "not implemented") + q, err := s.db.Querier(math.MinInt64, math.MaxInt64) + if err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } + defer runutil.CloseWithLogOnErr(s.logger, q, "close tsdb querier label names") + + res, err := q.LabelNames() + if err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } + return &storepb.LabelNamesResponse{Names: res}, nil } // LabelValues returns all known label values for a given label name.