Skip to content

Commit

Permalink
Implement query sharding
Browse files Browse the repository at this point in the history
  • Loading branch information
fpetkovski committed May 6, 2022
1 parent 5fc3a84 commit d44b9e3
Show file tree
Hide file tree
Showing 39 changed files with 1,843 additions and 252 deletions.
2 changes: 2 additions & 0 deletions cmd/thanos/query_frontend.go
Expand Up @@ -136,6 +136,8 @@ func registerQueryFrontend(app *extkingpin.App) {

cmd.Flag("query-frontend.forward-header", "List of headers forwarded by the query-frontend to downstream queriers, default is empty").PlaceHolder("<http-header-name>").StringsVar(&cfg.ForwardHeaders)

cmd.Flag("query-frontend.num-shards", "Number of queriers to use when sharding PromQL queries").IntVar(&cfg.NumShards)

cmd.Flag("log.request.decision", "Deprecation Warning - This flag would be soon deprecated, and replaced with `request.logging-config`. Request Logging for logging the start and end of requests. By default this flag is disabled. LogFinishCall : Logs the finish call of the requests. LogStartAndFinishCall : Logs the start and finish call of the requests. NoLogCall : Disable request logging.").Default("").EnumVar(&cfg.RequestLoggingDecision, "NoLogCall", "LogFinishCall", "LogStartAndFinishCall", "")
reqLogConfig := extkingpin.RegisterRequestLoggingFlags(cmd)

Expand Down
5 changes: 3 additions & 2 deletions cmd/thanos/receive.go
Expand Up @@ -333,8 +333,9 @@ func setupAndRunGRPCServer(g *run.Group,
if isReady() {
minTime, maxTime := mts.TimeRange()
return &infopb.StoreInfo{
MinTime: minTime,
MaxTime: maxTime,
MinTime: minTime,
MaxTime: maxTime,
SupportsSharding: true,
}
}
return nil
Expand Down
5 changes: 3 additions & 2 deletions cmd/thanos/sidecar.go
Expand Up @@ -268,8 +268,9 @@ func runSidecar(
if httpProbe.IsReady() {
mint, maxt := promStore.Timestamps()
return &infopb.StoreInfo{
MinTime: mint,
MaxTime: maxt,
MinTime: mint,
MaxTime: maxt,
SupportsSharding: true,
}
}
return nil
Expand Down
5 changes: 3 additions & 2 deletions cmd/thanos/store.go
Expand Up @@ -393,8 +393,9 @@ func runStore(
if httpProbe.IsReady() {
mint, maxt := bs.TimeRange()
return &infopb.StoreInfo{
MinTime: mint,
MaxTime: maxt,
MinTime: mint,
MaxTime: maxt,
SupportsSharding: true,
}
}
return nil
Expand Down
3 changes: 2 additions & 1 deletion go.mod
Expand Up @@ -98,6 +98,8 @@ require (
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b
)

require github.com/stretchr/testify v1.7.0

require (
cloud.google.com/go v0.99.0 // indirect
github.com/Azure/go-autorest v14.2.0+incompatible // indirect
Expand Down Expand Up @@ -191,7 +193,6 @@ require (
github.com/sirupsen/logrus v1.8.1 // indirect
github.com/sony/gobreaker v0.4.1 // indirect
github.com/stretchr/objx v0.2.0 // indirect
github.com/stretchr/testify v1.7.0 // indirect
github.com/weaveworks/promrus v1.2.0 // indirect
github.com/yuin/gopher-lua v0.0.0-20200816102855-ee81675732da // indirect
go.elastic.co/apm/module/apmhttp v1.11.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions pkg/api/query/grpc.go
Expand Up @@ -78,6 +78,7 @@ func (g *GRPCAPI) Query(request *querypb.QueryRequest, server querypb.Query_Quer
request.EnablePartialResponse,
request.EnableQueryPushdown,
false,
request.ShardInfo,
)
qry, err := qe.NewInstantQuery(queryable, request.Query, ts)
if err != nil {
Expand Down Expand Up @@ -145,6 +146,7 @@ func (g *GRPCAPI) QueryRange(request *querypb.QueryRangeRequest, srv querypb.Que
request.EnablePartialResponse,
request.EnableQueryPushdown,
false,
request.ShardInfo,
)

startTime := time.Unix(request.StartTimeSeconds, 0)
Expand Down
234 changes: 171 additions & 63 deletions pkg/api/query/querypb/query.pb.go

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions pkg/api/query/querypb/query.proto
Expand Up @@ -8,6 +8,7 @@ option go_package = "querypb";

import "gogoproto/gogo.proto";
import "store/storepb/types.proto";
import "store/storepb/rpc.proto";
import "store/storepb/prompb/types.proto";

option (gogoproto.sizer_all) = true;
Expand Down Expand Up @@ -36,6 +37,8 @@ message QueryRequest {
bool enablePartialResponse = 8;
bool enableQueryPushdown = 9;
bool skipChunks = 10;

ShardInfo shard_info = 11;
}

message StoreMatchers {
Expand Down Expand Up @@ -70,6 +73,8 @@ message QueryRangeRequest {
bool enablePartialResponse = 10;
bool enableQueryPushdown = 11;
bool skipChunks = 12;

ShardInfo shard_info = 13;
}

message QueryRangeResponse {
Expand Down
41 changes: 36 additions & 5 deletions pkg/api/query/v1.go
Expand Up @@ -21,6 +21,7 @@ package v1

import (
"context"
"encoding/json"
"math"
"net/http"
"sort"
Expand Down Expand Up @@ -70,6 +71,7 @@ const (
StoreMatcherParam = "storeMatch[]"
Step = "step"
Stats = "stats"
ShardInfoParam = "shard_info"
)

// QueryAPI is an API used by Thanos Querier.
Expand Down Expand Up @@ -295,6 +297,25 @@ func (qapi *QueryAPI) parseStep(r *http.Request, defaultRangeQueryStep time.Dura
return d, nil
}

func (qapi *QueryAPI) parseShardInfo(r *http.Request) (*storepb.ShardInfo, *api.ApiError) {
data := r.FormValue(ShardInfoParam)
if data == "" {
return nil, nil
}

if len(data) == 0 {
return nil, nil
}

var info storepb.ShardInfo
err := json.Unmarshal([]byte(data), &info)
if err != nil {
return nil, &api.ApiError{Typ: api.ErrorBadData, Err: errors.Wrapf(err, "'%s' parameter", ShardInfoParam)}
}

return &info, nil
}

func (qapi *QueryAPI) query(r *http.Request) (interface{}, []error, *api.ApiError) {
ts, err := parseTimeParam(r, "time", qapi.baseAPI.Now())
if err != nil {
Expand Down Expand Up @@ -338,13 +359,18 @@ func (qapi *QueryAPI) query(r *http.Request) (interface{}, []error, *api.ApiErro
return nil, nil, apiErr
}

shardInfo, apiErr := qapi.parseShardInfo(r)
if apiErr != nil {
return nil, nil, apiErr
}

qe := qapi.queryEngine(maxSourceResolution)

// We are starting promQL tracing span here, because we have no control over promQL code.
span, ctx := tracing.StartSpan(ctx, "promql_instant_query")
defer span.Finish()

qry, err := qe.NewInstantQuery(qapi.queryableCreate(enableDedup, replicaLabels, storeDebugMatchers, maxSourceResolution, enablePartialResponse, qapi.enableQueryPushdown, false), r.FormValue("query"), ts)
qry, err := qe.NewInstantQuery(qapi.queryableCreate(enableDedup, replicaLabels, storeDebugMatchers, maxSourceResolution, enablePartialResponse, qapi.enableQueryPushdown, false, shardInfo), r.FormValue("query"), ts)
if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorBadData, Err: err}
}
Expand Down Expand Up @@ -451,6 +477,11 @@ func (qapi *QueryAPI) queryRange(r *http.Request) (interface{}, []error, *api.Ap
return nil, nil, apiErr
}

shardInfo, apiErr := qapi.parseShardInfo(r)
if apiErr != nil {
return nil, nil, apiErr
}

qe := qapi.queryEngine(maxSourceResolution)

// Record the query range requested.
Expand All @@ -461,7 +492,7 @@ func (qapi *QueryAPI) queryRange(r *http.Request) (interface{}, []error, *api.Ap
defer span.Finish()

qry, err := qe.NewRangeQuery(
qapi.queryableCreate(enableDedup, replicaLabels, storeDebugMatchers, maxSourceResolution, enablePartialResponse, qapi.enableQueryPushdown, false),
qapi.queryableCreate(enableDedup, replicaLabels, storeDebugMatchers, maxSourceResolution, enablePartialResponse, qapi.enableQueryPushdown, false, shardInfo),
r.FormValue("query"),
start,
end,
Expand Down Expand Up @@ -534,7 +565,7 @@ func (qapi *QueryAPI) labelValues(r *http.Request) (interface{}, []error, *api.A
matcherSets = append(matcherSets, matchers)
}

q, err := qapi.queryableCreate(true, nil, storeDebugMatchers, 0, enablePartialResponse, qapi.enableQueryPushdown, true).
q, err := qapi.queryableCreate(true, nil, storeDebugMatchers, 0, enablePartialResponse, qapi.enableQueryPushdown, true, nil).
Querier(ctx, timestamp.FromTime(start), timestamp.FromTime(end))
if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorExec, Err: err}
Expand Down Expand Up @@ -621,7 +652,7 @@ func (qapi *QueryAPI) series(r *http.Request) (interface{}, []error, *api.ApiErr
return nil, nil, apiErr
}

q, err := qapi.queryableCreate(enableDedup, replicaLabels, storeDebugMatchers, math.MaxInt64, enablePartialResponse, qapi.enableQueryPushdown, true).
q, err := qapi.queryableCreate(enableDedup, replicaLabels, storeDebugMatchers, math.MaxInt64, enablePartialResponse, qapi.enableQueryPushdown, true, nil).
Querier(r.Context(), timestamp.FromTime(start), timestamp.FromTime(end))
if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorExec, Err: err}
Expand Down Expand Up @@ -671,7 +702,7 @@ func (qapi *QueryAPI) labelNames(r *http.Request) (interface{}, []error, *api.Ap
matcherSets = append(matcherSets, matchers)
}

q, err := qapi.queryableCreate(true, nil, storeDebugMatchers, 0, enablePartialResponse, qapi.enableQueryPushdown, true).
q, err := qapi.queryableCreate(true, nil, storeDebugMatchers, 0, enablePartialResponse, qapi.enableQueryPushdown, true, nil).
Querier(r.Context(), timestamp.FromTime(start), timestamp.FromTime(end))
if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorExec, Err: err}
Expand Down
101 changes: 68 additions & 33 deletions pkg/info/infopb/rpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pkg/info/infopb/rpc.proto
Expand Up @@ -55,6 +55,7 @@ message InfoResponse {
message StoreInfo {
int64 min_time = 1;
int64 max_time = 2;
bool supports_sharding = 3;
}

// RulesInfo holds the metadata related to Rules API exposed by the component.
Expand Down
11 changes: 11 additions & 0 deletions pkg/query/endpointset.go
Expand Up @@ -760,6 +760,17 @@ func (er *endpointRef) TimeRange() (mint, maxt int64) {
return er.metadata.Store.MinTime, er.metadata.Store.MaxTime
}

func (er *endpointRef) SupportsSharding() bool {
er.mtx.RLock()
defer er.mtx.RUnlock()

if er.metadata == nil || er.metadata.Store == nil {
return false
}

return er.metadata.Store.SupportsSharding
}

func (er *endpointRef) String() string {
mint, maxt := er.TimeRange()
return fmt.Sprintf("Addr: %s LabelSets: %v Mint: %d Maxt: %d", er.addr, labelpb.PromLabelSetsToString(er.LabelSets()), mint, maxt)
Expand Down
4 changes: 4 additions & 0 deletions pkg/query/internal/test-storeset-pre-v0.8.0/storeset.go
Expand Up @@ -204,6 +204,10 @@ func (s *storeRef) TimeRange() (int64, int64) {
return s.minTime, s.maxTime
}

func (s *storeRef) SupportsSharding() bool {
return false
}

func (s *storeRef) String() string {
mint, maxt := s.TimeRange()
return fmt.Sprintf("Addr: %s LabelSets: %v Mint: %d Maxt: %d", s.addr, labelpb.PromLabelSetsToString(s.LabelSets()), mint, maxt)
Expand Down

0 comments on commit d44b9e3

Please sign in to comment.