Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Proxy: Only generate debug messages in debug mode #6228

Merged
merged 3 commits into from Mar 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Expand Up @@ -28,6 +28,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#6201](https://github.com/thanos-io/thanos/pull/6201) Query-Frontend: Disable absent and absent_over_time for vertical sharding.
- [#6212](https://github.com/thanos-io/thanos/pull/6212) Query-Frontend: Disable scalar for vertical sharding.
- [#6107](https://github.com/thanos-io/thanos/pull/6082) Change default user id in container image from 0(root) to 1001
- [#6228](https://github.com/thanos-io/thanos/pull/6228) Conditionally generate debug messages in ProxyStore to avoid memory bloat.

### Removed

Expand Down
11 changes: 9 additions & 2 deletions cmd/thanos/query.go
Expand Up @@ -229,7 +229,7 @@ func registerQuery(app *extkingpin.App) {
var storeRateLimits store.SeriesSelectLimits
storeRateLimits.RegisterFlags(cmd)

cmd.Setup(func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ <-chan struct{}, _ bool) error {
cmd.Setup(func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ <-chan struct{}, debugLogging bool) error {
selectorLset, err := parseFlagLabels(*selectorLabels)
if err != nil {
return errors.Wrap(err, "parse federation labels")
Expand Down Expand Up @@ -278,6 +278,7 @@ func registerQuery(app *extkingpin.App) {
return runQuery(
g,
logger,
debugLogging,
reg,
tracer,
httpLogOpts,
Expand Down Expand Up @@ -353,6 +354,7 @@ func registerQuery(app *extkingpin.App) {
func runQuery(
g *run.Group,
logger log.Logger,
debugLogging bool,
reg *prometheus.Registry,
tracer opentracing.Tracer,
httpLogOpts []logging.Option,
Expand Down Expand Up @@ -490,6 +492,11 @@ func runQuery(
dns.ResolverType(dnsSDResolver),
)

options := []store.ProxyStoreOption{}
if debugLogging {
options = append(options, store.WithProxyStoreDebugLogging())
}

var (
endpoints = query.NewEndpointSet(
time.Now,
Expand Down Expand Up @@ -541,7 +548,7 @@ func runQuery(
endpointInfoTimeout,
queryConnMetricLabels...,
)
proxy = store.NewProxyStore(logger, reg, endpoints.GetStoreClients, component.Query, selectorLset, storeResponseTimeout, store.RetrievalStrategy(grpcProxyStrategy))
proxy = store.NewProxyStore(logger, reg, endpoints.GetStoreClients, component.Query, selectorLset, storeResponseTimeout, store.RetrievalStrategy(grpcProxyStrategy), options...)
rulesProxy = rules.NewProxy(logger, endpoints.GetRulesClients)
targetsProxy = targets.NewProxy(logger, endpoints.GetTargetsClients)
metadataProxy = metadata.NewProxy(logger, endpoints.GetMetricMetadataClients)
Expand Down
10 changes: 9 additions & 1 deletion cmd/thanos/receive.go
Expand Up @@ -58,7 +58,7 @@ func registerReceive(app *extkingpin.App) {
conf := &receiveConfig{}
conf.registerFlag(cmd)

cmd.Setup(func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ <-chan struct{}, _ bool) error {
cmd.Setup(func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ <-chan struct{}, debugLogging bool) error {
lset, err := parseFlagLabels(conf.labelStrs)
if err != nil {
return errors.Wrap(err, "parse labels")
Expand Down Expand Up @@ -97,6 +97,7 @@ func registerReceive(app *extkingpin.App) {
return runReceive(
g,
logger,
debugLogging,
reg,
tracer,
grpcLogOpts, tagOpts,
Expand All @@ -113,6 +114,7 @@ func registerReceive(app *extkingpin.App) {
func runReceive(
g *run.Group,
logger log.Logger,
debugLogging bool,
reg *prometheus.Registry,
tracer opentracing.Tracer,
grpcLogOpts []grpc_logging.Option,
Expand Down Expand Up @@ -305,6 +307,11 @@ func runReceive(
return errors.Wrap(err, "setup gRPC server")
}

options := []store.ProxyStoreOption{}
if debugLogging {
options = append(options, store.WithProxyStoreDebugLogging())
}

proxy := store.NewProxyStore(
logger,
reg,
Expand All @@ -313,6 +320,7 @@ func runReceive(
labels.Labels{},
0,
store.LazyRetrieval,
options...,
)
mts := store.NewLimitedStoreServer(store.NewInstrumentedStoreServer(reg, proxy), reg, conf.storeRateLimits)
rw := store.ReadWriteTSDBStore{
Expand Down
42 changes: 35 additions & 7 deletions pkg/store/proxy.go
Expand Up @@ -76,6 +76,7 @@ type ProxyStore struct {
responseTimeout time.Duration
metrics *proxyStoreMetrics
retrievalStrategy RetrievalStrategy
debugLogging bool
}

type proxyStoreMetrics struct {
Expand All @@ -99,6 +100,16 @@ func RegisterStoreServer(storeSrv storepb.StoreServer, logger log.Logger) func(*
}
}

// BucketStoreOption are functions that configure BucketStore.
type ProxyStoreOption func(s *ProxyStore)

// WithProxyStoreDebugLogging enables debug logging.
func WithProxyStoreDebugLogging() ProxyStoreOption {
return func(s *ProxyStore) {
s.debugLogging = true
}
}

// NewProxyStore returns a new ProxyStore that uses the given clients that implements storeAPI to fan-in all series to the client.
// Note that there is no deduplication support. Deduplication should be done on the highest level (just before PromQL).
func NewProxyStore(
Expand All @@ -109,6 +120,7 @@ func NewProxyStore(
selectorLabels labels.Labels,
responseTimeout time.Duration,
retrievalStrategy RetrievalStrategy,
options ...ProxyStoreOption,
) *ProxyStore {
if logger == nil {
logger = log.NewNopLogger()
Expand All @@ -128,6 +140,11 @@ func NewProxyStore(
metrics: metrics,
retrievalStrategy: retrievalStrategy,
}

for _, option := range options {
option(s)
}

return s
}

Expand Down Expand Up @@ -273,7 +290,9 @@ func (s *ProxyStore) Series(originalRequest *storepb.SeriesRequest, srv storepb.
for _, st := range s.stores() {
// We might be able to skip the store if its meta information indicates it cannot have series matching our query.
if ok, reason := storeMatches(srv.Context(), st, originalRequest.MinTime, originalRequest.MaxTime, matchers...); !ok {
storeDebugMsgs = append(storeDebugMsgs, fmt.Sprintf("store %s filtered out: %v", st, reason))
if s.debugLogging {
storeDebugMsgs = append(storeDebugMsgs, fmt.Sprintf("store %s filtered out: %v", st, reason))
}
continue
}

Expand All @@ -289,8 +308,9 @@ func (s *ProxyStore) Series(originalRequest *storepb.SeriesRequest, srv storepb.

for _, st := range stores {
st := st

storeDebugMsgs = append(storeDebugMsgs, fmt.Sprintf("store %s queried", st))
if s.debugLogging {
storeDebugMsgs = append(storeDebugMsgs, fmt.Sprintf("store %s queried", st))
saswatamcode marked this conversation as resolved.
Show resolved Hide resolved
}

respSet, err := newAsyncRespSet(srv.Context(), st, r, s.responseTimeout, s.retrievalStrategy, &s.buffers, r.ShardInfo, reqLogger, s.metrics.emptyStreamResponses)
if err != nil {
Expand Down Expand Up @@ -412,10 +432,14 @@ func (s *ProxyStore) LabelNames(ctx context.Context, r *storepb.LabelNamesReques

// We might be able to skip the store if its meta information indicates it cannot have series matching our query.
if ok, reason := storeMatches(gctx, st, r.Start, r.End); !ok {
storeDebugMsgs = append(storeDebugMsgs, fmt.Sprintf("Store %s filtered out due to %v", st, reason))
if s.debugLogging {
storeDebugMsgs = append(storeDebugMsgs, fmt.Sprintf("Store %s filtered out due to %v", st, reason))
}
continue
}
storeDebugMsgs = append(storeDebugMsgs, fmt.Sprintf("Store %s queried", st))
if s.debugLogging {
storeDebugMsgs = append(storeDebugMsgs, fmt.Sprintf("Store %s queried", st))
}

g.Go(func() error {
resp, err := st.LabelNames(gctx, &storepb.LabelNamesRequest{
Expand Down Expand Up @@ -486,10 +510,14 @@ func (s *ProxyStore) LabelValues(ctx context.Context, r *storepb.LabelValuesRequ

// We might be able to skip the store if its meta information indicates it cannot have series matching our query.
if ok, reason := storeMatches(gctx, st, r.Start, r.End); !ok {
storeDebugMsgs = append(storeDebugMsgs, fmt.Sprintf("Store %s filtered out due to %v", st, reason))
if s.debugLogging {
storeDebugMsgs = append(storeDebugMsgs, fmt.Sprintf("Store %s filtered out due to %v", st, reason))
}
continue
}
storeDebugMsgs = append(storeDebugMsgs, fmt.Sprintf("Store %s queried", st))
if s.debugLogging {
storeDebugMsgs = append(storeDebugMsgs, fmt.Sprintf("Store %s queried", st))
}

g.Go(func() error {
resp, err := st.LabelValues(gctx, &storepb.LabelValuesRequest{
Expand Down