Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

query: add endpointset flow #4421

Merged
merged 11 commits into from Aug 31, 2021
Merged
63 changes: 24 additions & 39 deletions cmd/thanos/query.go
Expand Up @@ -375,60 +375,45 @@ func runQuery(
)

var (
stores = query.NewStoreSet(
endpoints = query.NewEndpointSet(
logger,
reg,
func() (specs []query.StoreSpec) {

func() (specs []query.EndpointSpec) {
// Add strict & static nodes.
for _, addr := range strictStores {
specs = append(specs, query.NewGRPCStoreSpec(addr, true))
specs = append(specs, query.NewGRPCEndpointSpec(addr, true))
}
// Add DNS resolved addresses from static flags and file SD.

for _, addr := range dnsStoreProvider.Addresses() {
specs = append(specs, query.NewGRPCStoreSpec(addr, false))
specs = append(specs, query.NewGRPCEndpointSpec(addr, false))
}
return removeDuplicateStoreSpecs(logger, duplicatedStores, specs)
},
func() (specs []query.RuleSpec) {

for _, addr := range dnsRuleProvider.Addresses() {
specs = append(specs, query.NewGRPCStoreSpec(addr, false))
specs = append(specs, query.NewGRPCEndpointSpec(addr, false))
}

// NOTE(s-urbaniak): No need to remove duplicates, as rule apis are a subset of store apis.
// hence, any duplicates will be tracked in the store api set.

return specs
},
func() (specs []query.TargetSpec) {
for _, addr := range dnsTargetProvider.Addresses() {
specs = append(specs, query.NewGRPCStoreSpec(addr, false))
specs = append(specs, query.NewGRPCEndpointSpec(addr, false))
}

return specs
},
func() (specs []query.MetadataSpec) {
for _, addr := range dnsMetadataProvider.Addresses() {
specs = append(specs, query.NewGRPCStoreSpec(addr, false))
specs = append(specs, query.NewGRPCEndpointSpec(addr, false))
}

return specs
},
func() (specs []query.ExemplarSpec) {
for _, addr := range dnsExemplarProvider.Addresses() {
specs = append(specs, query.NewGRPCStoreSpec(addr, false))
specs = append(specs, query.NewGRPCEndpointSpec(addr, false))
}

return specs
return removeDuplicateStoreSpecs(logger, duplicatedStores, specs)
},
dialOpts,
unhealthyStoreTimeout,
)
proxy = store.NewProxyStore(logger, reg, stores.Get, component.Query, selectorLset, storeResponseTimeout)
rulesProxy = rules.NewProxy(logger, stores.GetRulesClients)
targetsProxy = targets.NewProxy(logger, stores.GetTargetsClients)
metadataProxy = metadata.NewProxy(logger, stores.GetMetadataClients)
exemplarsProxy = exemplars.NewProxy(logger, stores.GetExemplarsStores, selectorLset)
proxy = store.NewProxyStore(logger, reg, endpoints.GetStoreClients, component.Query, selectorLset, storeResponseTimeout)
rulesProxy = rules.NewProxy(logger, endpoints.GetRulesClients)
targetsProxy = targets.NewProxy(logger, endpoints.GetTargetsClients)
metadataProxy = metadata.NewProxy(logger, endpoints.GetMetricMetadataClients)
exemplarsProxy = exemplars.NewProxy(logger, endpoints.GetExemplarsStores, selectorLset)
queryableCreator = query.NewQueryableCreator(
logger,
extprom.WrapRegistererWithPrefix("thanos_query_", reg),
Expand All @@ -454,12 +439,12 @@ func runQuery(
ctx, cancel := context.WithCancel(context.Background())
g.Add(func() error {
return runutil.Repeat(5*time.Second, ctx.Done(), func() error {
stores.Update(ctx)
endpoints.Update(ctx)
return nil
})
}, func(error) {
cancel()
stores.Close()
endpoints.Close()
})
}
// Run File Service Discovery and update the store set when the files are modified.
Expand All @@ -486,7 +471,7 @@ func runQuery(
continue
}
fileSDCache.Update(update)
stores.Update(ctxUpdate)
endpoints.Update(ctxUpdate)

if err := dnsStoreProvider.Resolve(ctxUpdate, append(fileSDCache.Addresses(), storeAddrs...)); err != nil {
level.Error(logger).Log("msg", "failed to resolve addresses for storeAPIs", "err", err)
Expand Down Expand Up @@ -562,11 +547,11 @@ func runQuery(

ins := extpromhttp.NewInstrumentationMiddleware(reg, nil)
// TODO(bplotka in PR #513 review): pass all flags, not only the flags needed by prefix rewriting.
ui.NewQueryUI(logger, stores, webExternalPrefix, webPrefixHeaderName).Register(router, ins)
ui.NewQueryUI(logger, endpoints, webExternalPrefix, webPrefixHeaderName).Register(router, ins)

api := v1.NewQueryAPI(
logger,
stores,
endpoints,
engineFactory(promql.NewEngine, engineOpts, dynamicLookbackDelta),
queryableCreator,
// NOTE: Will share the same replica label as the query for now.
Expand Down Expand Up @@ -644,8 +629,8 @@ func runQuery(
return nil
}

func removeDuplicateStoreSpecs(logger log.Logger, duplicatedStores prometheus.Counter, specs []query.StoreSpec) []query.StoreSpec {
set := make(map[string]query.StoreSpec)
func removeDuplicateStoreSpecs(logger log.Logger, duplicatedStores prometheus.Counter, specs []query.EndpointSpec) []query.EndpointSpec {
set := make(map[string]query.EndpointSpec)
for _, spec := range specs {
addr := spec.Addr()
if _, ok := set[addr]; ok {
Expand All @@ -654,7 +639,7 @@ func removeDuplicateStoreSpecs(logger log.Logger, duplicatedStores prometheus.Co
}
set[addr] = spec
}
deduplicated := make([]query.StoreSpec, 0, len(set))
deduplicated := make([]query.EndpointSpec, 0, len(set))
for _, value := range set {
deduplicated = append(deduplicated, value)
}
Expand Down
12 changes: 6 additions & 6 deletions pkg/api/query/v1.go
Expand Up @@ -94,7 +94,7 @@ type QueryAPI struct {
disableCORS bool

replicaLabels []string
storeSet *query.StoreSet
endpointSet *query.EndpointSet

defaultRangeQueryStep time.Duration
defaultInstantQueryMaxSourceResolution time.Duration
Expand All @@ -106,7 +106,7 @@ type QueryAPI struct {
// NewQueryAPI returns an initialized QueryAPI type.
func NewQueryAPI(
logger log.Logger,
storeSet *query.StoreSet,
endpointSet *query.EndpointSet,
qe func(int64) *promql.Engine,
c query.QueryableCreator,
ruleGroups rules.UnaryClient,
Expand Down Expand Up @@ -144,7 +144,7 @@ func NewQueryAPI(
enableTargetPartialResponse: enableTargetPartialResponse,
enableMetricMetadataPartialResponse: enableMetricMetadataPartialResponse,
replicaLabels: replicaLabels,
storeSet: storeSet,
endpointSet: endpointSet,
defaultRangeQueryStep: defaultRangeQueryStep,
defaultInstantQueryMaxSourceResolution: defaultInstantQueryMaxSourceResolution,
defaultMetadataTimeRange: defaultMetadataTimeRange,
Expand Down Expand Up @@ -701,9 +701,9 @@ func (qapi *QueryAPI) labelNames(r *http.Request) (interface{}, []error, *api.Ap
}

func (qapi *QueryAPI) stores(_ *http.Request) (interface{}, []error, *api.ApiError) {
statuses := make(map[string][]query.StoreStatus)
for _, status := range qapi.storeSet.GetStoreStatus() {
statuses[status.StoreType.String()] = append(statuses[status.StoreType.String()], status)
statuses := make(map[string][]query.EndpointStatus)
for _, status := range qapi.endpointSet.GetEndpointStatus() {
statuses[status.ComponentType.String()] = append(statuses[status.ComponentType.String()], status)
}
return statuses, nil, nil
}
Expand Down
19 changes: 19 additions & 0 deletions pkg/component/component.go
Expand Up @@ -88,6 +88,25 @@ func FromProto(storeType storepb.StoreType) StoreAPI {
}
}

func FromString(storeType string) StoreAPI {
switch storeType {
case "query":
return Query
case "rule":
return Rule
case "sidecar":
return Sidecar
case "store":
return Store
case "receive":
return Receive
case "debug":
return Debug
default:
return UnknownStoreAPI
}
}

var (
Bucket = source{component: component{name: "bucket"}}
Cleanup = source{component: component{name: "cleanup"}}
Expand Down