diff --git a/CHANGELOG.md b/CHANGELOG.md index 7f05713a31..8400e52888 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -52,6 +52,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re - [#5561](https://github.com/thanos-io/thanos/pull/5561) Query Frontend: Support instant query vertical sharding. - [#5453](https://github.com/thanos-io/thanos/pull/5453) Compact: Skip erroneous empty non `*AggrChunk` chunks during 1h downsampling of 5m resolution blocks. - [#5607](https://github.com/thanos-io/thanos/pull/5607) Query: Support custom lookback delta from request in query api. +- [#5565](https://github.com/thanos-io/thanos/pull/5565) Receive: Allow remote write request limits to be defined per file and tenant. ### Changed diff --git a/cmd/thanos/receive.go b/cmd/thanos/receive.go index 140882d35a..a4d52ab1e0 100644 --- a/cmd/thanos/receive.go +++ b/cmd/thanos/receive.go @@ -194,6 +194,18 @@ func runReceive( return errors.Wrap(err, "parse relabel configuration") } + var limitsConfig *receive.RootLimitsConfig + if conf.limitsConfig != nil { + limitsContentYaml, err := conf.limitsConfig.Content() + if err != nil { + return errors.Wrap(err, "get content of limit configuration") + } + limitsConfig, err = receive.ParseRootLimitConfig(limitsContentYaml) + if err != nil { + return errors.Wrap(err, "parse limit configuration") + } + } + // Impose active series limit only if Receiver is in Router or RouterIngestor mode, and config is provided. 
seriesLimitSupported := (receiveMode == receive.RouterOnly || receiveMode == receive.RouterIngestor) && conf.maxPerTenantLimit != 0 @@ -210,31 +222,28 @@ func runReceive( ) writer := receive.NewWriter(log.With(logger, "component", "receive-writer"), dbs) webHandler := receive.NewHandler(log.With(logger, "component", "receive-handler"), &receive.Options{ - Writer: writer, - ListenAddress: conf.rwAddress, - Registry: reg, - Endpoint: conf.endpoint, - TenantHeader: conf.tenantHeader, - TenantField: conf.tenantField, - DefaultTenantID: conf.defaultTenantID, - ReplicaHeader: conf.replicaHeader, - ReplicationFactor: conf.replicationFactor, - RelabelConfigs: relabelConfig, - ReceiverMode: receiveMode, - Tracer: tracer, - TLSConfig: rwTLSConfig, - DialOpts: dialOpts, - ForwardTimeout: time.Duration(*conf.forwardTimeout), - TSDBStats: dbs, - SeriesLimitSupported: seriesLimitSupported, - MaxPerTenantLimit: conf.maxPerTenantLimit, - MetaMonitoringUrl: conf.metaMonitoringUrl, - MetaMonitoringHttpClient: conf.metaMonitoringHttpClient, - MetaMonitoringLimitQuery: conf.metaMonitoringLimitQuery, - WriteSeriesLimit: conf.writeSeriesLimit, - WriteSamplesLimit: conf.writeSamplesLimit, - WriteRequestSizeLimit: conf.writeRequestSizeLimit, - WriteRequestConcurrencyLimit: conf.writeRequestConcurrencyLimit, + Writer: writer, + ListenAddress: conf.rwAddress, + Registry: reg, + Endpoint: conf.endpoint, + TenantHeader: conf.tenantHeader, + TenantField: conf.tenantField, + DefaultTenantID: conf.defaultTenantID, + ReplicaHeader: conf.replicaHeader, + ReplicationFactor: conf.replicationFactor, + RelabelConfigs: relabelConfig, + ReceiverMode: receiveMode, + Tracer: tracer, + TLSConfig: rwTLSConfig, + DialOpts: dialOpts, + ForwardTimeout: time.Duration(*conf.forwardTimeout), + TSDBStats: dbs, + LimitsConfig: limitsConfig, + SeriesLimitSupported: seriesLimitSupported, + MaxPerTenantLimit: conf.maxPerTenantLimit, + MetaMonitoringUrl: conf.metaMonitoringUrl, + MetaMonitoringHttpClient: 
conf.metaMonitoringHttpClient, + MetaMonitoringLimitQuery: conf.metaMonitoringLimitQuery, }) grpcProbe := prober.NewGRPC() @@ -825,10 +834,7 @@ type receiveConfig struct { reqLogConfig *extflag.PathOrContent relabelConfigPath *extflag.PathOrContent - writeSeriesLimit int64 - writeSamplesLimit int64 - writeRequestSizeLimit int64 - writeRequestConcurrencyLimit int + limitsConfig *extflag.PathOrContent } func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) { @@ -936,28 +942,7 @@ func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) { rc.reqLogConfig = extkingpin.RegisterRequestLoggingFlags(cmd) - // TODO(douglascamata): Allow all these limits to be configured per tenant - // and move the configuration to a file. Then this is done, remove the - // "hidden" modifier on all these flags. - cmd.Flag("receive.write-request-limits.max-series", - "The maximum amount of series accepted in remote write requests."+ - "The default is no limit, represented by 0."). - Default("0").Hidden().Int64Var(&rc.writeSeriesLimit) - - cmd.Flag("receive.write-request-limits.max-samples", - "The maximum amount of samples accepted in remote write requests."+ - "The default is no limit, represented by 0."). - Default("0").Hidden().Int64Var(&rc.writeSamplesLimit) - - cmd.Flag("receive.write-request-limits.max-size-bytes", - "The maximum size (in bytes) of remote write requests."+ - "The default is no limit, represented by 0."). - Default("0").Hidden().Int64Var(&rc.writeRequestSizeLimit) - - cmd.Flag("receive.write-request-limits.max-concurrency", - "The maximum amount of remote write requests that will be concurrently processed while others wait."+ - "The default is no limit, represented by 0."). 
- Default("0").Hidden().IntVar(&rc.writeRequestConcurrencyLimit) + rc.limitsConfig = extflag.RegisterPathOrContent(cmd, "receive.limits-config", "YAML file that contains limit configuration.", extflag.WithEnvSubstitution(), extflag.WithHidden()) } // determineMode returns the ReceiverMode that this receiver is configured to run in. diff --git a/docs/components/receive.md b/docs/components/receive.md index 303db597f6..c292de6369 100644 --- a/docs/components/receive.md +++ b/docs/components/receive.md @@ -84,17 +84,68 @@ Thanos Receive has some limits and gates that can be configured to control resou - **Limits**: if a request hits any configured limit the client will receive an error response from the server. - **Gates**: if a request hits a gate without capacity it will wait until the gate's capacity is replenished to be processed. It doesn't trigger an error response from the server. +To configure the gates and limits you can use one of the two options: + +- `--receive.limits-config-file=<file-path>`: where `<file-path>` is the path to the YAML file. +- `--receive.limits-config=<content>`: where `<content>` is the content of the YAML file. + +By default all the limits and gates are **disabled**. + +### Understanding the configuration file + +The configuration file follows a few standards: + +1. The value `0` (zero) is used to explicitly define "there is no limit" (infinite limit). +2. In the configuration of default limits (in the `default` section) or global limits (in the `global` section), a value that is not present means "no limit". +3. In the configuration of per tenant limits (in the `tenants` section), a value that is not present means they are the same as the default. + +All the configuration for the remote write endpoint of Receive is contained in the `write` key. Inside it there are 3 subsections: + +- `global`: limits and/or gates that are applied considering all the requests. +- `default`: the default values for limits in case a given tenant doesn't have any specified. 
+- `tenants`: the limits for a given tenant. + +From the example configuration below, it's understood that: + +1. This Receive instance has a max concurrency of 30. +2. This Receive instance has some default request limits that apply to all tenants, **unless** a given tenant has their own limits (i.e. the `acme` tenant and partially for the `ajax` tenant). +3. Tenant `acme` has no request limits. +4. Tenant `ajax` has a request series limit of 50000 and samples limit of 500. Their request size bytes limit is inherited from the default, 1024 bytes. + +The next sections explain what each configuration value means. + +```yaml mdox-exec="cat pkg/receive/testdata/limits_config/good_limits.yaml" +write: + global: + max_concurrency: 30 + default: + request: + size_bytes_limit: 1024 + series_limit: 1000 + samples_limit: 10 + tenants: + acme: + request: + size_bytes_limit: 0 + series_limit: 0 + samples_limit: 0 + ajax: + request: + series_limit: 50000 + samples_limit: 500 +``` + **IMPORTANT**: this feature is experimental and a work-in-progres. It might change in the near future, i.e. configuration might move to a file (to allow easy configuration of different request limits per tenant) or its structure could change. -### Request limits +### Remote write request limits Thanos Receive supports setting limits on the incoming remote write request sizes. These limits should help you to prevent a single tenant from being able to send big requests and possibly crash the Receive. -These limits are applied per request and can be configured with the following command line arguments: +These limits are applied per request and can be configured within the `request` key: -- `--receive.write-request-limits.max-size-bytes`: the maximum body size. -- `--receive.write-request-limits.max-series`: the maximum amount of series in a single remote write request. 
- `--receive.write-request-limits.max-samples`: the maximum amount of samples in a single remote write request (summed from all series). +- `size_bytes_limit`: the maximum body size. +- `series_limit`: the maximum amount of series in a single remote write request. +- `samples_limit`: the maximum amount of samples in a single remote write request (summed from all series). Any request above these limits will cause an 413 HTTP response (*Entity Too Large*) and should not be retried without modifications. @@ -105,15 +156,11 @@ Future work that can improve this scenario: - Proper handling of 413 responses in clients, given Receive can somehow communicate which limit was reached. - Including in the 413 response which are the current limits that apply to the tenant. -By default all these limits are disabled. - -## Request gates - -The available request gates in Thanos Receive can be configured with the following command line arguments: +### Remote write request gates -- `--receive.write-request-limits.max-concurrency`: the maximum amount of remote write requests that will be concurrently worked on. Any request request that would exceed this limit will be accepted, but wait until the gate allows it to be processed. +The available request gates in Thanos Receive can be configured within the `global` key: -By default all gates are disabled. +- `max_concurrency`: the maximum amount of remote write requests that will be concurrently worked on. Any request that would exceed this limit will be accepted, but wait until the gate allows it to be processed. 
## Active Series Limiting (experimental) diff --git a/pkg/receive/handler.go b/pkg/receive/handler.go index 32468629e3..1403a51f2a 100644 --- a/pkg/receive/handler.go +++ b/pkg/receive/handler.go @@ -21,8 +21,6 @@ import ( extflag "github.com/efficientgo/tools/extkingpin" "github.com/thanos-io/thanos/pkg/api" statusapi "github.com/thanos-io/thanos/pkg/api/status" - "github.com/thanos-io/thanos/pkg/extprom" - "github.com/thanos-io/thanos/pkg/gate" "github.com/thanos-io/thanos/pkg/httpconfig" "github.com/thanos-io/thanos/pkg/logging" "github.com/thanos-io/thanos/pkg/promclient" @@ -89,31 +87,28 @@ var ( // Options for the web Handler. type Options struct { - Writer *Writer - ListenAddress string - Registry *prometheus.Registry - TenantHeader string - TenantField string - DefaultTenantID string - ReplicaHeader string - Endpoint string - ReplicationFactor uint64 - ReceiverMode ReceiverMode - Tracer opentracing.Tracer - TLSConfig *tls.Config - DialOpts []grpc.DialOption - ForwardTimeout time.Duration - RelabelConfigs []*relabel.Config - TSDBStats TSDBStats - SeriesLimitSupported bool - MaxPerTenantLimit uint64 - MetaMonitoringUrl *url.URL - MetaMonitoringHttpClient *extflag.PathOrContent - MetaMonitoringLimitQuery string - WriteSeriesLimit int64 - WriteSamplesLimit int64 - WriteRequestSizeLimit int64 - WriteRequestConcurrencyLimit int + Writer *Writer + ListenAddress string + Registry *prometheus.Registry + TenantHeader string + TenantField string + DefaultTenantID string + ReplicaHeader string + Endpoint string + ReplicationFactor uint64 + ReceiverMode ReceiverMode + Tracer opentracing.Tracer + TLSConfig *tls.Config + DialOpts []grpc.DialOption + ForwardTimeout time.Duration + RelabelConfigs []*relabel.Config + TSDBStats TSDBStats + LimitsConfig *RootLimitsConfig + SeriesLimitSupported bool + MaxPerTenantLimit uint64 + MetaMonitoringUrl *url.URL + MetaMonitoringHttpClient *extflag.PathOrContent + MetaMonitoringLimitQuery string } // activeSeriesLimiter encompasses 
active series limiting logic. @@ -145,8 +140,7 @@ type Handler struct { writeSamplesTotal *prometheus.HistogramVec writeTimeseriesTotal *prometheus.HistogramVec - writeGate gate.Gate - requestLimiter requestLimiter + limiter *limiter } func NewHandler(logger log.Logger, o *Options) *Handler { @@ -172,13 +166,7 @@ func NewHandler(logger log.Logger, o *Options) *Handler { Max: 30 * time.Second, Jitter: true, }, - writeGate: gate.NewNoop(), - requestLimiter: newRequestLimiter( - o.WriteRequestSizeLimit, - o.WriteSeriesLimit, - o.WriteSamplesLimit, - registerer, - ), + limiter: newLimiter(o.LimitsConfig, registerer), forwardRequests: promauto.With(registerer).NewCounterVec( prometheus.CounterOpts{ Name: "thanos_receive_forward_requests_total", @@ -217,13 +205,6 @@ func NewHandler(logger log.Logger, o *Options) *Handler { ), } - if o.WriteRequestConcurrencyLimit > 0 { - h.writeGate = gate.New( - extprom.WrapRegistererWithPrefix("thanos_receive_write_request_concurrent_", registerer), - o.WriteRequestConcurrencyLimit, - ) - } - h.forwardRequests.WithLabelValues(labelSuccess) h.forwardRequests.WithLabelValues(labelError) h.replications.WithLabelValues(labelSuccess) @@ -448,15 +429,14 @@ func (h *Handler) receiveHTTP(w http.ResponseWriter, r *http.Request) { tLogger := log.With(h.logger, "tenant", tenant) tracing.DoInSpan(r.Context(), "receive_write_gate_ismyturn", func(ctx context.Context) { - err = h.writeGate.Start(r.Context()) + err = h.limiter.writeGate.Start(r.Context()) }) if err != nil { level.Error(tLogger).Log("err", err, "msg", "internal server error") http.Error(w, err.Error(), http.StatusInternalServerError) return } - - defer h.writeGate.Done() + defer h.limiter.writeGate.Done() under, err := h.ActiveSeriesLimit.isUnderLimit(tenant, tLogger) if err != nil { @@ -469,11 +449,12 @@ func (h *Handler) receiveHTTP(w http.ResponseWriter, r *http.Request) { return } + requestLimiter := h.limiter.requestLimiter // io.ReadAll dynamically adjust the byte slice for read 
data, starting from 512B. // Since this is receive hot path, grow upfront saving allocations and CPU time. compressed := bytes.Buffer{} if r.ContentLength >= 0 { - if !h.requestLimiter.AllowSizeBytes(tenant, r.ContentLength) { + if !requestLimiter.AllowSizeBytes(tenant, r.ContentLength) { http.Error(w, "write request too large", http.StatusRequestEntityTooLarge) return } @@ -493,7 +474,7 @@ func (h *Handler) receiveHTTP(w http.ResponseWriter, r *http.Request) { return } - if !h.requestLimiter.AllowSizeBytes(tenant, int64(len(reqBuf))) { + if !requestLimiter.AllowSizeBytes(tenant, int64(len(reqBuf))) { http.Error(w, "write request too large", http.StatusRequestEntityTooLarge) return } @@ -529,7 +510,7 @@ func (h *Handler) receiveHTTP(w http.ResponseWriter, r *http.Request) { return } - if !h.requestLimiter.AllowSeries(tenant, int64(len(wreq.Timeseries))) { + if !requestLimiter.AllowSeries(tenant, int64(len(wreq.Timeseries))) { http.Error(w, "too many timeseries", http.StatusRequestEntityTooLarge) return } @@ -538,7 +519,7 @@ func (h *Handler) receiveHTTP(w http.ResponseWriter, r *http.Request) { for _, timeseries := range wreq.Timeseries { totalSamples += len(timeseries.Samples) } - if !h.requestLimiter.AllowSamples(tenant, int64(totalSamples)) { + if !requestLimiter.AllowSamples(tenant, int64(totalSamples)) { http.Error(w, "too many samples", http.StatusRequestEntityTooLarge) return } diff --git a/pkg/receive/handler_test.go b/pkg/receive/handler_test.go index 77f577ca66..81406b3585 100644 --- a/pkg/receive/handler_test.go +++ b/pkg/receive/handler_test.go @@ -774,8 +774,22 @@ func TestReceiveWriteRequestLimits(t *testing.T) { } handlers, _ := newTestHandlerHashring(appendables, 3) handler := handlers[0] - handler.requestLimiter = newRequestLimiter(int64(1*units.Megabyte), 20, 200, nil) tenant := "test" + handler.limiter = newLimiter( + &RootLimitsConfig{ + WriteLimits: writeLimitsConfig{ + TenantsLimits: tenantsWriteLimitsConfig{ + tenant: &writeLimitConfig{ + 
RequestLimits: newEmptyRequestLimitsConfig(). + SetSizeBytesLimit(int64(1 * units.Megabyte)). + SetSeriesLimit(20). + SetSamplesLimit(200), + }, + }, + }, + }, + nil, + ) wreq := &prompb.WriteRequest{ Timeseries: []prompb.TimeSeries{}, diff --git a/pkg/receive/limiter.go b/pkg/receive/limiter.go index bd06d5d31b..c90a79ab0f 100644 --- a/pkg/receive/limiter.go +++ b/pkg/receive/limiter.go @@ -5,92 +5,43 @@ package receive import ( "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/thanos-io/thanos/pkg/extprom" + "github.com/thanos-io/thanos/pkg/gate" ) -const ( - seriesLimitName = "series" - samplesLimitName = "samples" - sizeBytesLimitName = "body_size" -) - -type requestLimiter struct { - sizeBytesLimit int64 - seriesLimit int64 - samplesLimit int64 - limitsHit *prometheus.SummaryVec - configuredLimits *prometheus.GaugeVec +type limiter struct { + requestLimiter requestLimiter + writeGate gate.Gate + // TODO: extract active series limiting logic into a self-contained type and + // move it here. 
} -func newRequestLimiter(sizeBytesLimit, seriesLimit, samplesLimit int64, reg prometheus.Registerer) requestLimiter { - limiter := requestLimiter{ - sizeBytesLimit: sizeBytesLimit, - seriesLimit: seriesLimit, - samplesLimit: samplesLimit, - limitsHit: promauto.With(reg).NewSummaryVec( - prometheus.SummaryOpts{ - Namespace: "thanos", - Subsystem: "receive", - Name: "write_limits_hit", - Help: "Summary of how far beyond the limit a refused remote write request was.", - Objectives: map[float64]float64{0.50: 0.1, 0.95: 0.1, 0.99: 0.001}, - }, []string{"tenant", "limit"}, - ), - configuredLimits: promauto.With(reg).NewGaugeVec( - prometheus.GaugeOpts{ - Namespace: "thanos", - Subsystem: "receive", - Name: "write_limits", - Help: "The configured write limits.", - }, []string{"limit"}, - ), - } - limiter.configuredLimits.WithLabelValues(sizeBytesLimitName).Set(float64(sizeBytesLimit)) - limiter.configuredLimits.WithLabelValues(seriesLimitName).Set(float64(seriesLimit)) - limiter.configuredLimits.WithLabelValues(samplesLimitName).Set(float64(samplesLimit)) - return limiter +type requestLimiter interface { + AllowSizeBytes(tenant string, contentLengthBytes int64) bool + AllowSeries(tenant string, amount int64) bool + AllowSamples(tenant string, amount int64) bool } -func (l *requestLimiter) AllowSizeBytes(tenant string, contentLengthBytes int64) bool { - if l.sizeBytesLimit <= 0 { - return true - } - - // This happens when the content length is unknown, then we allow it. - if contentLengthBytes < 0 { - return true +func newLimiter(root *RootLimitsConfig, reg prometheus.Registerer) *limiter { + limiter := &limiter{ + writeGate: gate.NewNoop(), + requestLimiter: &noopRequestLimiter{}, } - allowed := l.sizeBytesLimit >= contentLengthBytes - if !allowed { - l.limitsHit. - WithLabelValues(tenant, sizeBytesLimitName). 
- Observe(float64(contentLengthBytes - l.sizeBytesLimit)) + if root == nil { + return limiter } - return allowed -} -func (l *requestLimiter) AllowSeries(tenant string, amount int64) bool { - if l.seriesLimit <= 0 { - return true - } - allowed := l.seriesLimit >= amount - if !allowed { - l.limitsHit. - WithLabelValues(tenant, seriesLimitName). - Observe(float64(amount - l.seriesLimit)) + maxWriteConcurrency := root.WriteLimits.GlobalLimits.MaxConcurrency + if maxWriteConcurrency > 0 { + limiter.writeGate = gate.New( + extprom.WrapRegistererWithPrefix( + "thanos_receive_write_request_concurrent_", + reg, + ), + int(maxWriteConcurrency), + ) } - return allowed -} + limiter.requestLimiter = newConfigRequestLimiter(reg, &root.WriteLimits) -func (l *requestLimiter) AllowSamples(tenant string, amount int64) bool { - if l.samplesLimit <= 0 { - return true - } - allowed := l.samplesLimit >= amount - if !allowed { - l.limitsHit. - WithLabelValues(tenant, samplesLimitName). - Observe(float64(amount - l.samplesLimit)) - } - return allowed + return limiter } diff --git a/pkg/receive/limiter_config.go b/pkg/receive/limiter_config.go new file mode 100644 index 0000000000..8e84e1653b --- /dev/null +++ b/pkg/receive/limiter_config.go @@ -0,0 +1,96 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package receive + +import ( + "github.com/thanos-io/thanos/pkg/errors" + "gopkg.in/yaml.v2" +) + +// RootLimitsConfig is the root configuration for limits. +type RootLimitsConfig struct { + // WriteLimits hold the limits for writing data. + WriteLimits writeLimitsConfig `yaml:"write"` +} + +// ParseRootLimitConfig parses the root limit configuration. Even though +// the result is a pointer, it will only be nil if an error is returned. 
+func ParseRootLimitConfig(content []byte) (*RootLimitsConfig, error) { + var root RootLimitsConfig + if err := yaml.UnmarshalStrict(content, &root); err != nil { + return nil, errors.Wrapf(err, "parsing config YAML file") + } + return &root, nil +} + +type writeLimitsConfig struct { + // GlobalLimits are limits that are shared across all tenants. + GlobalLimits globalLimitsConfig `yaml:"global"` + // DefaultLimits are the default limits for tenants without specified limits. + DefaultLimits defaultLimitsConfig `yaml:"default"` + // TenantsLimits are the limits per tenant. + TenantsLimits tenantsWriteLimitsConfig `yaml:"tenants"` +} + +type globalLimitsConfig struct { + // MaxConcurrency represents the maximum concurrency during write operations. + MaxConcurrency int64 `yaml:"max_concurrency"` +} + +type defaultLimitsConfig struct { + // RequestLimits holds the different per-request limits. + RequestLimits requestLimitsConfig `yaml:"request"` + // HeadSeriesConfig *headSeriesLimiter `yaml:"head_series"` +} + +type tenantsWriteLimitsConfig map[string]*writeLimitConfig + +// A tenant might not always have limits configured, so things here must +// use pointers. +type writeLimitConfig struct { + // RequestLimits holds the different per-request limits. + RequestLimits *requestLimitsConfig `yaml:"request"` + // HeadSeriesConfig *headSeriesLimiter `yaml:"head_series"` +} + +type requestLimitsConfig struct { + SizeBytesLimit *int64 `yaml:"size_bytes_limit"` + SeriesLimit *int64 `yaml:"series_limit"` + SamplesLimit *int64 `yaml:"samples_limit"` +} + +func newEmptyRequestLimitsConfig() *requestLimitsConfig { + return &requestLimitsConfig{} +} + +// OverlayWith overlays the current configuration with another one. This means +// that limit values that are not set (have a nil value) will be overwritten in +// the caller. 
+func (rl *requestLimitsConfig) OverlayWith(other *requestLimitsConfig) *requestLimitsConfig { + if rl.SamplesLimit == nil { + rl.SamplesLimit = other.SamplesLimit + } + if rl.SeriesLimit == nil { + rl.SeriesLimit = other.SeriesLimit + } + if rl.SizeBytesLimit == nil { + rl.SizeBytesLimit = other.SizeBytesLimit + } + return rl +} + +func (rl *requestLimitsConfig) SetSizeBytesLimit(value int64) *requestLimitsConfig { + rl.SizeBytesLimit = &value + return rl +} + +func (rl *requestLimitsConfig) SetSeriesLimit(value int64) *requestLimitsConfig { + rl.SeriesLimit = &value + return rl +} + +func (rl *requestLimitsConfig) SetSamplesLimit(value int64) *requestLimitsConfig { + rl.SamplesLimit = &value + return rl +} diff --git a/pkg/receive/limiter_config_test.go b/pkg/receive/limiter_config_test.go new file mode 100644 index 0000000000..a58940dd9a --- /dev/null +++ b/pkg/receive/limiter_config_test.go @@ -0,0 +1,64 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package receive + +import ( + "os" + "path" + "testing" + + "github.com/thanos-io/thanos/pkg/testutil" +) + +func TestParseLimiterConfig(t *testing.T) { + tests := []struct { + name string + configFileName string + want *RootLimitsConfig + wantErr bool + }{ + { + name: "Parses a configuration without issues", + configFileName: "good_limits.yaml", + wantErr: false, + want: &RootLimitsConfig{ + WriteLimits: writeLimitsConfig{ + GlobalLimits: globalLimitsConfig{MaxConcurrency: 30}, + DefaultLimits: defaultLimitsConfig{ + RequestLimits: *newEmptyRequestLimitsConfig(). + SetSizeBytesLimit(1024). + SetSeriesLimit(1000). + SetSamplesLimit(10), + }, + TenantsLimits: tenantsWriteLimitsConfig{ + "acme": &writeLimitConfig{ + RequestLimits: newEmptyRequestLimitsConfig(). + SetSizeBytesLimit(0). + SetSeriesLimit(0). + SetSamplesLimit(0), + }, + "ajax": &writeLimitConfig{ + RequestLimits: newEmptyRequestLimitsConfig(). + SetSeriesLimit(50000). 
+ SetSamplesLimit(500), + }, + }, + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + filePath := path.Join("testdata", "limits_config", tt.configFileName) + fileContent, err := os.ReadFile(filePath) + if err != nil { + t.Fatalf("couldn't read test limits configuration file '%s': %s", filePath, err) + } + + got, err := ParseRootLimitConfig(fileContent) + testutil.Ok(t, err) + testutil.Equals(t, tt.want, got) + }) + } +} diff --git a/pkg/receive/request_limiter.go b/pkg/receive/request_limiter.go new file mode 100644 index 0000000000..8479c95960 --- /dev/null +++ b/pkg/receive/request_limiter.go @@ -0,0 +1,142 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package receive + +import ( + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +const ( + seriesLimitName = "series" + samplesLimitName = "samples" + sizeBytesLimitName = "body_size" +) + +var unlimitedRequestLimitsConfig = newEmptyRequestLimitsConfig(). + SetSizeBytesLimit(0). + SetSeriesLimit(0). + SetSamplesLimit(0) + +type configRequestLimiter struct { + tenantLimits map[string]*requestLimitsConfig + cachedDefaultLimits *requestLimitsConfig + limitsHit *prometheus.SummaryVec + configuredLimits *prometheus.GaugeVec +} + +func newConfigRequestLimiter(reg prometheus.Registerer, writeLimits *writeLimitsConfig) *configRequestLimiter { + // Merge the default limits configuration with an unlimited configuration + // to ensure the nils are overwritten with zeroes. + defaultRequestLimits := writeLimits.DefaultLimits.RequestLimits.OverlayWith(unlimitedRequestLimitsConfig) + + // Load up the request limits into a map with the tenant name as key and + // merge with the defaults to provide easy and fast access when checking + // limits. + // The merge with the default happen because a tenant limit that isn't + // present means the value is inherited from the default configuration. 
+ tenantsLimits := writeLimits.TenantsLimits + tenantRequestLimits := make(map[string]*requestLimitsConfig) + for tenant, limitConfig := range tenantsLimits { + tenantRequestLimits[tenant] = limitConfig.RequestLimits.OverlayWith(defaultRequestLimits) + } + + limiter := configRequestLimiter{ + tenantLimits: tenantRequestLimits, + cachedDefaultLimits: defaultRequestLimits, + } + limiter.limitsHit = promauto.With(reg).NewSummaryVec( + prometheus.SummaryOpts{ + Namespace: "thanos", + Subsystem: "receive", + Name: "write_limits_hit", + Help: "Summary of how much beyond the limit a refused remote write request was.", + Objectives: map[float64]float64{0.50: 0.1, 0.95: 0.1, 0.99: 0.001}, + }, []string{"tenant", "limit"}, + ) + limiter.configuredLimits = promauto.With(reg).NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "thanos", + Subsystem: "receive", + Name: "write_limits", + Help: "The configured write limits.", + }, []string{"tenant", "limit"}, + ) + for tenant, limits := range tenantRequestLimits { + limiter.configuredLimits.WithLabelValues(tenant, sizeBytesLimitName).Set(float64(*limits.SizeBytesLimit)) + limiter.configuredLimits.WithLabelValues(tenant, seriesLimitName).Set(float64(*limits.SeriesLimit)) + limiter.configuredLimits.WithLabelValues(tenant, samplesLimitName).Set(float64(*limits.SamplesLimit)) + } + limiter.configuredLimits.WithLabelValues("", sizeBytesLimitName).Set(float64(*defaultRequestLimits.SizeBytesLimit)) + limiter.configuredLimits.WithLabelValues("", seriesLimitName).Set(float64(*defaultRequestLimits.SeriesLimit)) + limiter.configuredLimits.WithLabelValues("", samplesLimitName).Set(float64(*defaultRequestLimits.SamplesLimit)) + + return &limiter +} + +func (l *configRequestLimiter) AllowSizeBytes(tenant string, contentLengthBytes int64) bool { + limit := l.limitsFor(tenant).SizeBytesLimit + if *limit <= 0 { + return true + } + + allowed := *limit >= contentLengthBytes + if !allowed { + l.limitsHit. + WithLabelValues(tenant, sizeBytesLimitName). 
+ Observe(float64(contentLengthBytes - *limit)) + } + return allowed +} + +func (l *configRequestLimiter) AllowSeries(tenant string, amount int64) bool { + limit := l.limitsFor(tenant).SeriesLimit + if *limit <= 0 { + return true + } + + allowed := *limit >= amount + if !allowed { + l.limitsHit. + WithLabelValues(tenant, seriesLimitName). + Observe(float64(amount - *limit)) + } + return allowed +} + +func (l *configRequestLimiter) AllowSamples(tenant string, amount int64) bool { + limit := l.limitsFor(tenant).SamplesLimit + if *limit <= 0 { + return true + } + allowed := *limit >= amount + if !allowed { + l.limitsHit. + WithLabelValues(tenant, samplesLimitName). + Observe(float64(amount - *limit)) + } + return allowed +} + +func (l *configRequestLimiter) limitsFor(tenant string) *requestLimitsConfig { + limits, ok := l.tenantLimits[tenant] + if !ok { + limits = l.cachedDefaultLimits + } + return limits +} + +type noopRequestLimiter struct{} + +func (l *noopRequestLimiter) AllowSizeBytes(tenant string, contentLengthBytes int64) bool { + return true +} + +func (l *noopRequestLimiter) AllowSeries(tenant string, amount int64) bool { + return true +} + +func (l *noopRequestLimiter) AllowSamples(tenant string, amount int64) bool { + return true +} diff --git a/pkg/receive/limiter_test.go b/pkg/receive/request_limiter_test.go similarity index 51% rename from pkg/receive/limiter_test.go rename to pkg/receive/request_limiter_test.go index 05cf6bdeb3..fc5a968c73 100644 --- a/pkg/receive/limiter_test.go +++ b/pkg/receive/request_limiter_test.go @@ -3,11 +3,65 @@ package receive -import "testing" +import ( + "testing" + + "github.com/thanos-io/thanos/pkg/testutil" +) + +func TestRequestLimiter_limitsFor(t *testing.T) { + tenantWithLimits := "limited-tenant" + tenantWithoutLimits := "unlimited-tenant" + + limits := writeLimitsConfig{ + DefaultLimits: defaultLimitsConfig{ + RequestLimits: *newEmptyRequestLimitsConfig(). 
+ SetSeriesLimit(10), + }, + TenantsLimits: tenantsWriteLimitsConfig{ + tenantWithLimits: &writeLimitConfig{ + RequestLimits: newEmptyRequestLimitsConfig(). + SetSeriesLimit(30), + }, + }, + } + tests := []struct { + name string + tenant string + wantLimits *requestLimitsConfig + }{ + { + name: "Gets the default limits when tenant's limits aren't present", + tenant: tenantWithoutLimits, + wantLimits: newEmptyRequestLimitsConfig(). + SetSeriesLimit(10). + SetSamplesLimit(0). + SetSizeBytesLimit(0), + }, + { + name: "Gets the tenant's limits when it is present", + tenant: tenantWithLimits, + wantLimits: newEmptyRequestLimitsConfig(). + SetSeriesLimit(30). + SetSamplesLimit(0). + SetSizeBytesLimit(0), + }, + } + + requestLimiter := newConfigRequestLimiter(nil, &limits) + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + limits := requestLimiter.limitsFor(tt.tenant) + testutil.Equals(t, tt.wantLimits, limits) + }) + } +} func TestRequestLimiter_AllowRequestBodySizeBytes(t *testing.T) { tests := []struct { name string + defaultLimits *requestLimitsConfig sizeByteLimit int64 sizeBytes int64 want bool @@ -45,10 +99,19 @@ func TestRequestLimiter_AllowRequestBodySizeBytes(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - l := newRequestLimiter(tt.sizeByteLimit, 0, 0, nil) - if got := l.AllowSizeBytes("tenant", tt.sizeBytes); got != tt.want { - t.Errorf("unexpected AllowRequestBodySizeBytes result (body size: %v), got %v, want %v", tt.sizeBytes, got, tt.want) + tenant := "tenant" + limits := writeLimitsConfig{ + DefaultLimits: defaultLimitsConfig{ + RequestLimits: *newEmptyRequestLimitsConfig().SetSeriesLimit(10), + }, + TenantsLimits: tenantsWriteLimitsConfig{ + tenant: &writeLimitConfig{ + RequestLimits: newEmptyRequestLimitsConfig().SetSizeBytesLimit(tt.sizeByteLimit), + }, + }, } + l := newConfigRequestLimiter(nil, &limits) + testutil.Equals(t, tt.want, l.AllowSizeBytes(tenant, tt.sizeBytes)) }) } } @@ -93,10 +156,20 @@ 
func TestRequestLimiter_AllowSeries(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - l := newRequestLimiter(0, tt.seriesLimit, 0, nil) - if got := l.AllowSeries("tenant", tt.series); got != tt.want { - t.Errorf("unexpected AllowSeries result (series: %v), got %v, want %v", tt.series, got, tt.want) + tenant := "tenant" + limits := writeLimitsConfig{ + DefaultLimits: defaultLimitsConfig{ + RequestLimits: *newEmptyRequestLimitsConfig().SetSeriesLimit(10), + }, + TenantsLimits: tenantsWriteLimitsConfig{ + tenant: &writeLimitConfig{ + RequestLimits: newEmptyRequestLimitsConfig().SetSeriesLimit(tt.seriesLimit), + }, + }, } + + l := newConfigRequestLimiter(nil, &limits) + testutil.Equals(t, tt.want, l.AllowSeries(tenant, tt.series)) }) } } @@ -141,10 +214,20 @@ func TestRequestLimiter_AllowSamples(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - l := newRequestLimiter(0, 0, tt.samplesLimit, nil) - if got := l.AllowSamples("tenant", tt.samples); got != tt.want { - t.Errorf("unexpected AllowSamples result (samples: %v), got %v, want %v", tt.samples, got, tt.want) + tenant := "tenant" + limits := writeLimitsConfig{ + DefaultLimits: defaultLimitsConfig{ + RequestLimits: *newEmptyRequestLimitsConfig().SetSeriesLimit(10), + }, + TenantsLimits: tenantsWriteLimitsConfig{ + tenant: &writeLimitConfig{ + RequestLimits: newEmptyRequestLimitsConfig().SetSamplesLimit(tt.samplesLimit), + }, + }, } + + l := newConfigRequestLimiter(nil, &limits) + testutil.Equals(t, tt.want, l.AllowSamples("tenant", tt.samples)) }) } } diff --git a/pkg/receive/testdata/limits_config/good_limits.yaml b/pkg/receive/testdata/limits_config/good_limits.yaml new file mode 100644 index 0000000000..66017cd59c --- /dev/null +++ b/pkg/receive/testdata/limits_config/good_limits.yaml @@ -0,0 +1,18 @@ +write: + global: + max_concurrency: 30 + default: + request: + size_bytes_limit: 1024 + series_limit: 1000 + samples_limit: 10 + tenants: + acme: + 
request: + size_bytes_limit: 0 + series_limit: 0 + samples_limit: 0 + ajax: + request: + series_limit: 50000 + samples_limit: 500