Add rate limits to Store servers
This commit implements a RateLimited store server which can be used
to apply various limits to Series calls in components that implement
the Store API.

Rate limits are disabled by default but can be enabled selectively
for each individual Thanos component.

Signed-off-by: Filip Petkovski <filip.petkovsky@gmail.com>
fpetkovski committed Jan 26, 2023
1 parent 13827c1 commit a4b2d9e
Showing 13 changed files with 289 additions and 19 deletions.
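
The mechanism at the heart of this commit can be sketched in a few self-contained lines of Go (a hedged illustration; the names and the cap value are made up and nothing below appears in the diff): a store server is wrapped by a decorator whose Send path runs closure-based limiters, each comparing a running count of sent items against a configured cap and failing the request once the cap is reached.

package main

import "fmt"

// limiter mirrors the limiterFunc idea from pkg/store/ratelimit.go: it is consulted
// before every send and returns an error once a configured cap has been reached.
type limiter func() error

// newCountLimiter builds a limiter over a running counter. A limit of 0 disables the
// check, matching the commit's default behaviour.
func newCountLimiter(name string, sent *uint64, limit uint64) limiter {
	if limit == 0 {
		return func() error { return nil }
	}
	return func() error {
		if *sent >= limit {
			return fmt.Errorf("store %s limit of %d exceeded", name, limit)
		}
		return nil
	}
}

// rateLimitedSender stands in for rateLimitedServer: limits are checked first, and
// only then does the send proceed and the counter advance.
type rateLimitedSender struct {
	seriesSent uint64
	limiters   []limiter
}

func (s *rateLimitedSender) Send(series string) error {
	for _, l := range s.limiters {
		if err := l(); err != nil {
			return err
		}
	}
	s.seriesSent++ // the real implementation delegates counting to instrumentedServer
	fmt.Println("sent", series)
	return nil
}

func main() {
	s := &rateLimitedSender{}
	s.limiters = []limiter{newCountLimiter("series", &s.seriesSent, 2)} // illustrative cap of 2
	for _, name := range []string{`up{job="a"}`, `up{job="b"}`, `up{job="c"}`} {
		if err := s.Send(name); err != nil {
			fmt.Println("error:", err)
			return
		}
	}
}
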
8 changes: 7 additions & 1 deletion cmd/thanos/query.go
@@ -205,6 +205,9 @@ func registerQuery(app *extkingpin.App) {
queryTelemetrySamplesQuantiles := cmd.Flag("query.telemetry.request-samples-quantiles", "The quantiles for exporting metrics about the samples count quantiles.").Default("100", "1000", "10000", "100000", "1000000").Int64List()
queryTelemetrySeriesQuantiles := cmd.Flag("query.telemetry.request-series-seconds-quantiles", "The quantiles for exporting metrics about the series count quantiles.").Default("10", "100", "1000", "10000", "100000").Int64List()

var storeRateLimits store.RateLimits
storeRateLimits.RegisterFlags(cmd)

cmd.Setup(func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ <-chan struct{}, _ bool) error {
selectorLset, err := parseFlagLabels(*selectorLabels)
if err != nil {
@@ -321,6 +324,7 @@ func registerQuery(app *extkingpin.App) {
*queryTelemetrySamplesQuantiles,
*queryTelemetrySeriesQuantiles,
promqlEngineType(*promqlEngine),
storeRateLimits,
)
})
}
@@ -397,6 +401,7 @@ func runQuery(
queryTelemetrySamplesQuantiles []int64,
queryTelemetrySeriesQuantiles []int64,
promqlEngine promqlEngineType,
storeRateLimits store.RateLimits,
) error {
if alertQueryURL == "" {
lastColon := strings.LastIndex(httpBindAddr, ":")
@@ -759,9 +764,10 @@ func runQuery(
)

grpcAPI := apiv1.NewGRPCAPI(time.Now, queryReplicaLabels, queryableCreator, queryEngine, lookbackDeltaCreator, instantDefaultMaxSourceResolution)
storeServer := store.NewRateLimitedStoreServer(store.NewInstrumentedStoreServer(reg, proxy), storeRateLimits)
s := grpcserver.New(logger, reg, tracer, grpcLogOpts, tagOpts, comp, grpcProbe,
grpcserver.WithServer(apiv1.RegisterQueryServer(grpcAPI)),
grpcserver.WithServer(store.RegisterStoreServer(proxy)),
grpcserver.WithServer(store.RegisterStoreServer(storeServer)),
grpcserver.WithServer(rules.RegisterRulesServer(rulesProxy)),
grpcserver.WithServer(targets.RegisterTargetsServer(targetsProxy)),
grpcserver.WithServer(metadata.RegisterMetadataServer(metadataProxy)),
14 changes: 8 additions & 6 deletions cmd/thanos/receive.go
@@ -210,8 +210,8 @@ func runReceive(
writer := receive.NewWriter(log.With(logger, "component", "receive-writer"), dbs)

var limitsConfig *receive.RootLimitsConfig
if conf.limitsConfig != nil {
limitsContentYaml, err := conf.limitsConfig.Content()
if conf.writeLimitsConfig != nil {
limitsContentYaml, err := conf.writeLimitsConfig.Content()
if err != nil {
return errors.Wrap(err, "get content of limit configuration")
}
@@ -220,7 +220,7 @@
return errors.Wrap(err, "parse limit configuration")
}
}
limiter, err := receive.NewLimiter(conf.limitsConfig, reg, receiveMode, log.With(logger, "component", "receive-limiter"))
limiter, err := receive.NewLimiter(conf.writeLimitsConfig, reg, receiveMode, log.With(logger, "component", "receive-limiter"))
if err != nil {
return errors.Wrap(err, "creating limiter")
}
@@ -314,7 +314,7 @@ func runReceive(
0,
store.LazyRetrieval,
)
mts := store.NewInstrumentedStoreServer(reg, proxy)
mts := store.NewRateLimitedStoreServer(store.NewInstrumentedStoreServer(reg, proxy), conf.storeRateLimits)
rw := store.ReadWriteTSDBStore{
StoreServer: mts,
WriteableStoreServer: webHandler,
@@ -798,12 +798,14 @@ type receiveConfig struct {
reqLogConfig *extflag.PathOrContent
relabelConfigPath *extflag.PathOrContent

limitsConfig *extflag.PathOrContent
writeLimitsConfig *extflag.PathOrContent
storeRateLimits store.RateLimits
}

func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) {
rc.httpBindAddr, rc.httpGracePeriod, rc.httpTLSConfig = extkingpin.RegisterHTTPFlags(cmd)
rc.grpcBindAddr, rc.grpcGracePeriod, rc.grpcCert, rc.grpcKey, rc.grpcClientCA, rc.grpcMaxConnAge = extkingpin.RegisterGRPCFlags(cmd)
rc.storeRateLimits.RegisterFlags(cmd)

cmd.Flag("remote-write.address", "Address to listen on for remote write requests.").
Default("0.0.0.0:19291").StringVar(&rc.rwAddress)
@@ -915,7 +917,7 @@ func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) {

rc.reqLogConfig = extkingpin.RegisterRequestLoggingFlags(cmd)

rc.limitsConfig = extflag.RegisterPathOrContent(cmd, "receive.limits-config", "YAML file that contains limit configuration.", extflag.WithEnvSubstitution(), extflag.WithHidden())
rc.writeLimitsConfig = extflag.RegisterPathOrContent(cmd, "receive.limits-config", "YAML file that contains limit configuration.", extflag.WithEnvSubstitution(), extflag.WithHidden())
}

// determineMode returns the ReceiverMode that this receiver is configured to run in.
5 changes: 4 additions & 1 deletion cmd/thanos/rule.go
@@ -94,6 +94,7 @@ type ruleConfig struct {
dataDir string
lset labels.Labels
ignoredLabelNames []string
storeRateLimits store.RateLimits
}

func (rc *ruleConfig) registerFlag(cmd extkingpin.FlagClause) {
@@ -103,6 +104,7 @@ func (rc *ruleConfig) registerFlag(cmd extkingpin.FlagClause) {
rc.shipper.registerFlag(cmd)
rc.query.registerFlag(cmd)
rc.alertmgr.registerFlag(cmd)
rc.storeRateLimits.RegisterFlags(cmd)
}

// registerRule registers a rule command.
@@ -634,7 +636,8 @@ func runRule(
return nil
}),
)
options = append(options, grpcserver.WithServer(store.RegisterStoreServer(store.NewInstrumentedStoreServer(reg, tsdbStore))))
storeServer := store.NewRateLimitedStoreServer(store.NewInstrumentedStoreServer(reg, tsdbStore), conf.storeRateLimits)
options = append(options, grpcserver.WithServer(store.RegisterStoreServer(storeServer)))
}

options = append(options, grpcserver.WithServer(
23 changes: 13 additions & 10 deletions cmd/thanos/sidecar.go
@@ -282,8 +282,9 @@ func runSidecar(
info.WithMetricMetadataInfoFunc(),
)

storeServer := store.NewRateLimitedStoreServer(store.NewInstrumentedStoreServer(reg, promStore), conf.storeRateLimits)
s := grpcserver.New(logger, reg, tracer, grpcLogOpts, tagOpts, comp, grpcProbe,
grpcserver.WithServer(store.RegisterStoreServer(store.NewInstrumentedStoreServer(reg, promStore))),
grpcserver.WithServer(store.RegisterStoreServer(storeServer)),
grpcserver.WithServer(rules.RegisterRulesServer(rules.NewPrometheus(conf.prometheus.url, c, m.Labels))),
grpcserver.WithServer(targets.RegisterTargetsServer(targets.NewPrometheus(conf.prometheus.url, c, m.Labels))),
grpcserver.WithServer(meta.RegisterMetadataServer(meta.NewPrometheus(conf.prometheus.url, c))),
@@ -474,15 +475,16 @@ func (s *promMetadata) Version() string {
}

type sidecarConfig struct {
http httpConfig
grpc grpcConfig
prometheus prometheusConfig
tsdb tsdbConfig
reloader reloaderConfig
reqLogConfig *extflag.PathOrContent
objStore extflag.PathOrContent
shipper shipperConfig
limitMinTime thanosmodel.TimeOrDurationValue
http httpConfig
grpc grpcConfig
prometheus prometheusConfig
tsdb tsdbConfig
reloader reloaderConfig
reqLogConfig *extflag.PathOrContent
objStore extflag.PathOrContent
shipper shipperConfig
limitMinTime thanosmodel.TimeOrDurationValue
storeRateLimits store.RateLimits
}

func (sc *sidecarConfig) registerFlag(cmd extkingpin.FlagClause) {
@@ -494,6 +496,7 @@ func (sc *sidecarConfig) registerFlag(cmd extkingpin.FlagClause) {
sc.reqLogConfig = extkingpin.RegisterRequestLoggingFlags(cmd)
sc.objStore = *extkingpin.RegisterCommonObjStoreFlags(cmd, "", false)
sc.shipper.registerFlag(cmd)
sc.storeRateLimits.RegisterFlags(cmd)
cmd.Flag("min-time", "Start of time range limit to serve. Thanos sidecar will serve only metrics, which happened later than this value. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1d or 2h45m. Valid duration units are ms, s, m, h, d, w, y.").
Default("0000-01-01T00:00:00Z").SetValue(&sc.limitMinTime)
}
5 changes: 4 additions & 1 deletion cmd/thanos/store.go
@@ -58,6 +58,7 @@ type storeConfig struct {
indexCacheSizeBytes units.Base2Bytes
chunkPoolSize units.Base2Bytes
seriesBatchSize int
storeRateLimits store.RateLimits
maxSampleCount uint64
maxTouchedSeriesCount uint64
maxDownloadedBytes units.Base2Bytes
@@ -84,6 +85,7 @@ type storeConfig struct {
func (sc *storeConfig) registerFlag(cmd extkingpin.FlagClause) {
sc.httpConfig = *sc.httpConfig.registerFlag(cmd)
sc.grpcConfig = *sc.grpcConfig.registerFlag(cmd)
sc.storeRateLimits.RegisterFlags(cmd)

cmd.Flag("data-dir", "Local data directory used for caching purposes (index-header, in-mem cache items and meta.jsons). If removed, no data will be lost, just store will have to rebuild the cache. NOTE: Putting raw blocks here will not cause the store to read them. For such use cases use Prometheus + sidecar.").
Default("./data").StringVar(&sc.dataDir)
@@ -428,8 +430,9 @@ func runStore(
return errors.Wrap(err, "setup gRPC server")
}

storeServer := store.NewRateLimitedStoreServer(store.NewInstrumentedStoreServer(reg, bs), conf.storeRateLimits)
s := grpcserver.New(logger, reg, tracer, grpcLogOpts, tagOpts, conf.component, grpcProbe,
grpcserver.WithServer(store.RegisterStoreServer(store.NewInstrumentedStoreServer(reg, bs))),
grpcserver.WithServer(store.RegisterStoreServer(storeServer)),
grpcserver.WithServer(info.RegisterInfoServer(infoSrv)),
grpcserver.WithListen(conf.grpcConfig.bindAddress),
grpcserver.WithGracePeriod(time.Duration(conf.grpcConfig.gracePeriod)),
8 changes: 8 additions & 0 deletions docs/components/query.md
@@ -429,6 +429,14 @@ Flags:
that are always used, even if the health check
fails. Useful if you have a caching layer on
top.
--store.grpc.chunks-limit=0
The maximum chunks allowed for a single Series
request. The Series call fails if this limit is
exceeded. 0 means no limit.
--store.grpc.series-limit=0
The maximum series allowed for a single Series
request. The Series call fails if this limit is
exceeded. 0 means no limit.
--store.response-timeout=0ms
If a Store doesn't send any data in this
specified duration then a Store will be ignored
8 changes: 8 additions & 0 deletions docs/components/receive.md
@@ -332,6 +332,14 @@ Flags:
Path to YAML file with request logging
configuration. See format details:
https://thanos.io/tip/thanos/logging.md/#configuration
--store.grpc.chunks-limit=0
The maximum chunks allowed for a single Series
request. The Series call fails if this limit is
exceeded. 0 means no limit.
--store.grpc.series-limit=0
The maximum series allowed for a single Series
request. The Series call fails if this limit is
exceeded. 0 means no limit.
--tracing.config=<content>
Alternative to 'tracing.config-file' flag
(mutually exclusive). Content of YAML file
8 changes: 8 additions & 0 deletions docs/components/rule.md
@@ -453,6 +453,14 @@ Flags:
Works only if compaction is disabled on
Prometheus. Do it once and then disable the
flag when done.
--store.grpc.chunks-limit=0
The maximum chunks allowed for a single Series
request. The Series call fails if this limit is
exceeded. 0 means no limit.
--store.grpc.series-limit=0
The maximum series allowed for a single Series
request. The Series call fails if this limit is
exceeded. 0 means no limit.
--tracing.config=<content>
Alternative to 'tracing.config-file' flag
(mutually exclusive). Content of YAML file
8 changes: 8 additions & 0 deletions docs/components/sidecar.md
@@ -174,6 +174,14 @@ Flags:
Works only if compaction is disabled on
Prometheus. Do it once and then disable the
flag when done.
--store.grpc.chunks-limit=0
The maximum chunks allowed for a single Series
request. The Series call fails if this limit is
exceeded. 0 means no limit.
--store.grpc.series-limit=0
The maximum series allowed for a single Series
request. The Series call fails if this limit is
exceeded. 0 means no limit.
--tracing.config=<content>
Alternative to 'tracing.config-file' flag
(mutually exclusive). Content of YAML file
8 changes: 8 additions & 0 deletions docs/components/store.md
@@ -162,12 +162,20 @@ Flags:
If true, Store Gateway will lazy memory map
index-header only once the block is required by
a query.
--store.grpc.chunks-limit=0
The maximum chunks allowed for a single Series
request. The Series call fails if this limit is
exceeded. 0 means no limit.
--store.grpc.downloaded-bytes-limit=0
Maximum amount of downloaded (either
fetched or touched) bytes in a single
Series/LabelNames/LabelValues call. The Series
call fails if this limit is exceeded. 0 means
no limit.
--store.grpc.series-limit=0
The maximum series allowed for a single Series
request. The Series call fails if this limit is
exceeded. 0 means no limit.
--store.grpc.series-max-concurrency=20
Maximum number of concurrent Series calls.
--store.grpc.series-sample-limit=0
99 changes: 99 additions & 0 deletions pkg/store/ratelimit.go
@@ -0,0 +1,99 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package store

import (
"github.com/pkg/errors"

"github.com/thanos-io/thanos/pkg/extkingpin"
"github.com/thanos-io/thanos/pkg/store/storepb"
)

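// RateLimits configures the per-request limits that can be applied to Series calls.
// A zero value for a field means the corresponding limit is disabled.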
type RateLimits struct {
SeriesPerRequest uint64
ChunksPerRequest uint64
}

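// RegisterFlags registers the rate-limit flags on the given command. Both limits default to 0 (disabled).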
func (l *RateLimits) RegisterFlags(cmd extkingpin.FlagClause) {
cmd.Flag("store.grpc.series-limit", "The maximum series allowed for a single Series request. The Series call fails if this limit is exceeded. 0 means no limit.").Default("0").Uint64Var(&l.SeriesPerRequest)
cmd.Flag("store.grpc.chunks-limit", "The maximum chunks allowed for a single Series request, The Series call fails if this limit is exceeded. 0 means no limit.").Default("0").Uint64Var(&l.SeriesPerRequest)
}

// rateLimitedStoreServer is a storepb.StoreServer that can apply rate limits against Series requests.
type rateLimitedStoreServer struct {
storepb.StoreServer
rateLimits RateLimits
}

// NewRateLimitedStoreServer creates a new rateLimitedStoreServer.
func NewRateLimitedStoreServer(store storepb.StoreServer, rateLimits RateLimits) storepb.StoreServer {
return &rateLimitedStoreServer{
StoreServer: store,
rateLimits: rateLimits,
}
}

func (s *rateLimitedStoreServer) Series(req *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error {
rateLimitedSrv := newRateLimitedServer(newInstrumentedServer(srv), s.rateLimits)
if err := s.StoreServer.Series(req, rateLimitedSrv); err != nil {
return err
}

return nil
}

// rateLimitedServer is a storepb.Store_SeriesServer that enforces the configured limits on sent series and chunks.
type rateLimitedServer struct {
*instrumentedServer
limiters []limiterFunc
}

func newRateLimitedServer(upstream *instrumentedServer, rateLimits RateLimits) *rateLimitedServer {
limiters := []limiterFunc{
seriesLimiter(upstream, rateLimits.SeriesPerRequest),
chunksLimiter(upstream, rateLimits.ChunksPerRequest),
}

return &rateLimitedServer{
instrumentedServer: upstream,
limiters: limiters,
}
}

func (i *rateLimitedServer) Send(response *storepb.SeriesResponse) error {
for _, limiter := range i.limiters {
if err := limiter(response); err != nil {
return err
}
}
return i.instrumentedServer.Send(response)
}

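// limiterFunc is consulted before each response is sent and returns an error once a configured limit has been exceeded.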
type limiterFunc func(response *storepb.SeriesResponse) error

func nopLimiter(*storepb.SeriesResponse) error { return nil }

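// seriesLimiter fails the Series call once the wrapped instrumentedServer has already sent at least limit series. A limit of 0 disables the check.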
func seriesLimiter(server *instrumentedServer, limit uint64) limiterFunc {
if limit == 0 {
return nopLimiter
}
return func(response *storepb.SeriesResponse) error {
if server.seriesSent >= float64(limit) {
return errors.Errorf("store series limit of %d exceeded", limit)
}
return nil
}
}

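// chunksLimiter fails the Series call once the wrapped instrumentedServer has already sent at least limit chunks. A limit of 0 disables the check.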
func chunksLimiter(server *instrumentedServer, limit uint64) limiterFunc {
if limit == 0 {
return nopLimiter
}
return func(response *storepb.SeriesResponse) error {
if server.chunksSent >= float64(limit) {
return errors.Errorf("store chunks limit of %d exceeded", limit)
}
return nil
}
}
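
On the configuration side, a small sketch using the standard library flag package as a stand-in for the extkingpin clause above (the flag names match the commit; everything else is illustrative) shows the intended binding: each flag writes to its own RateLimits field and defaults to 0, which leaves that limit disabled.

package main

import (
	"flag"
	"fmt"
)

// RateLimitsSketch mirrors store.RateLimits: zero values mean the limit is disabled.
type RateLimitsSketch struct {
	SeriesPerRequest uint64
	ChunksPerRequest uint64
}

func main() {
	var limits RateLimitsSketch
	// Each flag is bound to its own field and defaults to 0 (disabled).
	flag.Uint64Var(&limits.SeriesPerRequest, "store.grpc.series-limit", 0,
		"The maximum series allowed for a single Series request. 0 means no limit.")
	flag.Uint64Var(&limits.ChunksPerRequest, "store.grpc.chunks-limit", 0,
		"The maximum chunks allowed for a single Series request. 0 means no limit.")
	flag.Parse()

	fmt.Printf("series limit: %d, chunks limit: %d\n", limits.SeriesPerRequest, limits.ChunksPerRequest)
}
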
