Skip to content

Commit

Permalink
Receive: Allow remote write request limits to be defined per file and…
Browse files Browse the repository at this point in the history
… tenant (thanos-io#5565)

* Allow per-tenant limits to be configured via file

Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com>

* Refactor Receive's limiting logic

Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com>

* Fix some methods that were in plural

Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com>

* Improve metric description

Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com>

* Add a TODO for later

Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com>

* Do some cleanup after moving limits to config file

Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com>

* Isolate rest of limiting logic from the handler

Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com>

* Small refactor to the request limiter

Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com>

* Rename MergeWith -> OverlayWith

Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com>

* Update changelog

Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com>

* Update documentation

Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com>

* Add missing copyright notice to few files

Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com>

* Fix test after change in config file tenants

Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com>

* Retrigger CI because of bundled-Cortex failing test

Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com>

* Expose default limits as metrics

Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com>

* Retrigger CI

Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com>

* Replace comment with a TODOs

Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com>

* Fix changelog after bad merge

Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com>

Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com>
Signed-off-by: GitHub <noreply@github.com>
Signed-off-by: Prakul Jain <prakul.jain@udaan.com>
  • Loading branch information
douglascamata authored and prajain12 committed Sep 6, 2022
1 parent 3e5a9e7 commit 063b459
Show file tree
Hide file tree
Showing 11 changed files with 583 additions and 201 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Expand Up @@ -52,6 +52,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#5561](https://github.com/thanos-io/thanos/pull/5561) Query Frontend: Support instant query vertical sharding.
- [#5453](https://github.com/thanos-io/thanos/pull/5453) Compact: Skip erroneous empty non `*AggrChunk` chunks during 1h downsampling of 5m resolution blocks.
- [#5607](https://github.com/thanos-io/thanos/pull/5607) Query: Support custom lookback delta from request in query api.
- [#5565](https://github.com/thanos-io/thanos/pull/5565) Receive: Allow remote write request limits to be defined per file and tenant.

### Changed

Expand Down
87 changes: 36 additions & 51 deletions cmd/thanos/receive.go
Expand Up @@ -194,6 +194,18 @@ func runReceive(
return errors.Wrap(err, "parse relabel configuration")
}

var limitsConfig *receive.RootLimitsConfig
if conf.limitsConfig != nil {
limitsContentYaml, err := conf.limitsConfig.Content()
if err != nil {
return errors.Wrap(err, "get content of limit configuration")
}
limitsConfig, err = receive.ParseRootLimitConfig(limitsContentYaml)
if err != nil {
return errors.Wrap(err, "parse limit configuration")
}
}

// Impose active series limit only if Receiver is in Router or RouterIngestor mode, and config is provided.
seriesLimitSupported := (receiveMode == receive.RouterOnly || receiveMode == receive.RouterIngestor) && conf.maxPerTenantLimit != 0

Expand All @@ -210,31 +222,28 @@ func runReceive(
)
writer := receive.NewWriter(log.With(logger, "component", "receive-writer"), dbs)
webHandler := receive.NewHandler(log.With(logger, "component", "receive-handler"), &receive.Options{
Writer: writer,
ListenAddress: conf.rwAddress,
Registry: reg,
Endpoint: conf.endpoint,
TenantHeader: conf.tenantHeader,
TenantField: conf.tenantField,
DefaultTenantID: conf.defaultTenantID,
ReplicaHeader: conf.replicaHeader,
ReplicationFactor: conf.replicationFactor,
RelabelConfigs: relabelConfig,
ReceiverMode: receiveMode,
Tracer: tracer,
TLSConfig: rwTLSConfig,
DialOpts: dialOpts,
ForwardTimeout: time.Duration(*conf.forwardTimeout),
TSDBStats: dbs,
SeriesLimitSupported: seriesLimitSupported,
MaxPerTenantLimit: conf.maxPerTenantLimit,
MetaMonitoringUrl: conf.metaMonitoringUrl,
MetaMonitoringHttpClient: conf.metaMonitoringHttpClient,
MetaMonitoringLimitQuery: conf.metaMonitoringLimitQuery,
WriteSeriesLimit: conf.writeSeriesLimit,
WriteSamplesLimit: conf.writeSamplesLimit,
WriteRequestSizeLimit: conf.writeRequestSizeLimit,
WriteRequestConcurrencyLimit: conf.writeRequestConcurrencyLimit,
Writer: writer,
ListenAddress: conf.rwAddress,
Registry: reg,
Endpoint: conf.endpoint,
TenantHeader: conf.tenantHeader,
TenantField: conf.tenantField,
DefaultTenantID: conf.defaultTenantID,
ReplicaHeader: conf.replicaHeader,
ReplicationFactor: conf.replicationFactor,
RelabelConfigs: relabelConfig,
ReceiverMode: receiveMode,
Tracer: tracer,
TLSConfig: rwTLSConfig,
DialOpts: dialOpts,
ForwardTimeout: time.Duration(*conf.forwardTimeout),
TSDBStats: dbs,
LimitsConfig: limitsConfig,
SeriesLimitSupported: seriesLimitSupported,
MaxPerTenantLimit: conf.maxPerTenantLimit,
MetaMonitoringUrl: conf.metaMonitoringUrl,
MetaMonitoringHttpClient: conf.metaMonitoringHttpClient,
MetaMonitoringLimitQuery: conf.metaMonitoringLimitQuery,
})

grpcProbe := prober.NewGRPC()
Expand Down Expand Up @@ -825,10 +834,7 @@ type receiveConfig struct {
reqLogConfig *extflag.PathOrContent
relabelConfigPath *extflag.PathOrContent

writeSeriesLimit int64
writeSamplesLimit int64
writeRequestSizeLimit int64
writeRequestConcurrencyLimit int
limitsConfig *extflag.PathOrContent
}

func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) {
Expand Down Expand Up @@ -936,28 +942,7 @@ func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) {

rc.reqLogConfig = extkingpin.RegisterRequestLoggingFlags(cmd)

// TODO(douglascamata): Allow all these limits to be configured per tenant
// and move the configuration to a file. Then this is done, remove the
// "hidden" modifier on all these flags.
cmd.Flag("receive.write-request-limits.max-series",
"The maximum amount of series accepted in remote write requests."+
"The default is no limit, represented by 0.").
Default("0").Hidden().Int64Var(&rc.writeSeriesLimit)

cmd.Flag("receive.write-request-limits.max-samples",
"The maximum amount of samples accepted in remote write requests."+
"The default is no limit, represented by 0.").
Default("0").Hidden().Int64Var(&rc.writeSamplesLimit)

cmd.Flag("receive.write-request-limits.max-size-bytes",
"The maximum size (in bytes) of remote write requests."+
"The default is no limit, represented by 0.").
Default("0").Hidden().Int64Var(&rc.writeRequestSizeLimit)

cmd.Flag("receive.write-request-limits.max-concurrency",
"The maximum amount of remote write requests that will be concurrently processed while others wait."+
"The default is no limit, represented by 0.").
Default("0").Hidden().IntVar(&rc.writeRequestConcurrencyLimit)
rc.limitsConfig = extflag.RegisterPathOrContent(cmd, "receive.limits-config", "YAML file that contains limit configuration.", extflag.WithEnvSubstitution(), extflag.WithHidden())
}

// determineMode returns the ReceiverMode that this receiver is configured to run in.
Expand Down
71 changes: 59 additions & 12 deletions docs/components/receive.md
Expand Up @@ -84,17 +84,68 @@ Thanos Receive has some limits and gates that can be configured to control resou
- **Limits**: if a request hits any configured limit the client will receive an error response from the server.
- **Gates**: if a request hits a gate without capacity it will wait until the gate's capacity is replenished to be processed. It doesn't trigger an error response from the server.

To configure the gates and limits you can use one of the two options:

- `--receive.limits-config-file=<file-path>`: where `<file-path>` is the path to the YAML file.
- `--receive.limits-config=<content>`: where `<content>` is the content of YAML file.

By default all the limits and gates are **disabled**.

### Understanding the configuration file

The configuration file follows a few standards:

1. The value `0` (zero) is used to explicitly define "there is no limit" (infinite limit).
2. In the configuration of default limits (in the `default` section) or global limits (in the `global` section), a value that is not present means "no limit".
3. In the configuration of per tenant limits (in the `tenants` section), a value that is not present means they are the same as the default.

All the configuration for the remote write endpoint of Receive is contained in the `write` key. Inside it there are 3 subsections:

- `global`: limits and/or gates that are applied considering all the requests.
- `default`: the default values for limits in case a given tenant doesn't have any specified.
- `tenants`: the limits for a given tenant.

From the example configuration below, it's understood that:

1. This Receive instance has a max concurrency of 30.
2. This Receive instance has some default request limits that apply of all tenants, **unless** a given tenant has their own limits (i.e. the `acme` tenant and partially for the `ajax` tenant).
3. Tenant `acme` has no request limits.
4. Tenant `ajax` has a request series limit of 50000 and samples limit of 500. Their request size bytes limit is inherited from the default, 1024 bytes.

The next sections explain what each configuration value means.

```yaml mdox-exec="cat pkg/receive/testdata/limits_config/good_limits.yaml"
write:
global:
max_concurrency: 30
default:
request:
size_bytes_limit: 1024
series_limit: 1000
samples_limit: 10
tenants:
acme:
request:
size_bytes_limit: 0
series_limit: 0
samples_limit: 0
ajax:
request:
series_limit: 50000
samples_limit: 500
```

**IMPORTANT**: this feature is experimental and a work-in-progres. It might change in the near future, i.e. configuration might move to a file (to allow easy configuration of different request limits per tenant) or its structure could change.

### Request limits
### Remote write request limits

Thanos Receive supports setting limits on the incoming remote write request sizes. These limits should help you to prevent a single tenant from being able to send big requests and possibly crash the Receive.

These limits are applied per request and can be configured with the following command line arguments:
These limits are applied per request and can be configured within the `request` key:

- `--receive.write-request-limits.max-size-bytes`: the maximum body size.
- `--receive.write-request-limits.max-series`: the maximum amount of series in a single remote write request.
- `--receive.write-request-limits.max-samples`: the maximum amount of samples in a single remote write request (summed from all series).
- `size_bytes_limit`: the maximum body size.
- `series_limit`: the maximum amount of series in a single remote write request.
- `samples_limit`: the maximum amount of samples in a single remote write request (summed from all series).

Any request above these limits will cause an 413 HTTP response (*Entity Too Large*) and should not be retried without modifications.

Expand All @@ -105,15 +156,11 @@ Future work that can improve this scenario:
- Proper handling of 413 responses in clients, given Receive can somehow communicate which limit was reached.
- Including in the 413 response which are the current limits that apply to the tenant.

By default all these limits are disabled.

## Request gates

The available request gates in Thanos Receive can be configured with the following command line arguments:
### Remote write request gates

- `--receive.write-request-limits.max-concurrency`: the maximum amount of remote write requests that will be concurrently worked on. Any request request that would exceed this limit will be accepted, but wait until the gate allows it to be processed.
The available request gates in Thanos Receive can be configured within the `global` key:

By default all gates are disabled.
- `max_concurrency`: the maximum amount of remote write requests that will be concurrently worked on. Any request request that would exceed this limit will be accepted, but wait until the gate allows it to be processed.

## Active Series Limiting (experimental)

Expand Down
81 changes: 31 additions & 50 deletions pkg/receive/handler.go
Expand Up @@ -21,8 +21,6 @@ import (
extflag "github.com/efficientgo/tools/extkingpin"
"github.com/thanos-io/thanos/pkg/api"
statusapi "github.com/thanos-io/thanos/pkg/api/status"
"github.com/thanos-io/thanos/pkg/extprom"
"github.com/thanos-io/thanos/pkg/gate"
"github.com/thanos-io/thanos/pkg/httpconfig"
"github.com/thanos-io/thanos/pkg/logging"
"github.com/thanos-io/thanos/pkg/promclient"
Expand Down Expand Up @@ -89,31 +87,28 @@ var (

// Options for the web Handler.
type Options struct {
Writer *Writer
ListenAddress string
Registry *prometheus.Registry
TenantHeader string
TenantField string
DefaultTenantID string
ReplicaHeader string
Endpoint string
ReplicationFactor uint64
ReceiverMode ReceiverMode
Tracer opentracing.Tracer
TLSConfig *tls.Config
DialOpts []grpc.DialOption
ForwardTimeout time.Duration
RelabelConfigs []*relabel.Config
TSDBStats TSDBStats
SeriesLimitSupported bool
MaxPerTenantLimit uint64
MetaMonitoringUrl *url.URL
MetaMonitoringHttpClient *extflag.PathOrContent
MetaMonitoringLimitQuery string
WriteSeriesLimit int64
WriteSamplesLimit int64
WriteRequestSizeLimit int64
WriteRequestConcurrencyLimit int
Writer *Writer
ListenAddress string
Registry *prometheus.Registry
TenantHeader string
TenantField string
DefaultTenantID string
ReplicaHeader string
Endpoint string
ReplicationFactor uint64
ReceiverMode ReceiverMode
Tracer opentracing.Tracer
TLSConfig *tls.Config
DialOpts []grpc.DialOption
ForwardTimeout time.Duration
RelabelConfigs []*relabel.Config
TSDBStats TSDBStats
LimitsConfig *RootLimitsConfig
SeriesLimitSupported bool
MaxPerTenantLimit uint64
MetaMonitoringUrl *url.URL
MetaMonitoringHttpClient *extflag.PathOrContent
MetaMonitoringLimitQuery string
}

// activeSeriesLimiter encompasses active series limiting logic.
Expand Down Expand Up @@ -145,8 +140,7 @@ type Handler struct {
writeSamplesTotal *prometheus.HistogramVec
writeTimeseriesTotal *prometheus.HistogramVec

writeGate gate.Gate
requestLimiter requestLimiter
limiter *limiter
}

func NewHandler(logger log.Logger, o *Options) *Handler {
Expand All @@ -172,13 +166,7 @@ func NewHandler(logger log.Logger, o *Options) *Handler {
Max: 30 * time.Second,
Jitter: true,
},
writeGate: gate.NewNoop(),
requestLimiter: newRequestLimiter(
o.WriteRequestSizeLimit,
o.WriteSeriesLimit,
o.WriteSamplesLimit,
registerer,
),
limiter: newLimiter(o.LimitsConfig, registerer),
forwardRequests: promauto.With(registerer).NewCounterVec(
prometheus.CounterOpts{
Name: "thanos_receive_forward_requests_total",
Expand Down Expand Up @@ -217,13 +205,6 @@ func NewHandler(logger log.Logger, o *Options) *Handler {
),
}

if o.WriteRequestConcurrencyLimit > 0 {
h.writeGate = gate.New(
extprom.WrapRegistererWithPrefix("thanos_receive_write_request_concurrent_", registerer),
o.WriteRequestConcurrencyLimit,
)
}

h.forwardRequests.WithLabelValues(labelSuccess)
h.forwardRequests.WithLabelValues(labelError)
h.replications.WithLabelValues(labelSuccess)
Expand Down Expand Up @@ -448,15 +429,14 @@ func (h *Handler) receiveHTTP(w http.ResponseWriter, r *http.Request) {
tLogger := log.With(h.logger, "tenant", tenant)

tracing.DoInSpan(r.Context(), "receive_write_gate_ismyturn", func(ctx context.Context) {
err = h.writeGate.Start(r.Context())
err = h.limiter.writeGate.Start(r.Context())
})
if err != nil {
level.Error(tLogger).Log("err", err, "msg", "internal server error")
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

defer h.writeGate.Done()
defer h.limiter.writeGate.Done()

under, err := h.ActiveSeriesLimit.isUnderLimit(tenant, tLogger)
if err != nil {
Expand All @@ -469,11 +449,12 @@ func (h *Handler) receiveHTTP(w http.ResponseWriter, r *http.Request) {
return
}

requestLimiter := h.limiter.requestLimiter
// io.ReadAll dynamically adjust the byte slice for read data, starting from 512B.
// Since this is receive hot path, grow upfront saving allocations and CPU time.
compressed := bytes.Buffer{}
if r.ContentLength >= 0 {
if !h.requestLimiter.AllowSizeBytes(tenant, r.ContentLength) {
if !requestLimiter.AllowSizeBytes(tenant, r.ContentLength) {
http.Error(w, "write request too large", http.StatusRequestEntityTooLarge)
return
}
Expand All @@ -493,7 +474,7 @@ func (h *Handler) receiveHTTP(w http.ResponseWriter, r *http.Request) {
return
}

if !h.requestLimiter.AllowSizeBytes(tenant, int64(len(reqBuf))) {
if !requestLimiter.AllowSizeBytes(tenant, int64(len(reqBuf))) {
http.Error(w, "write request too large", http.StatusRequestEntityTooLarge)
return
}
Expand Down Expand Up @@ -529,7 +510,7 @@ func (h *Handler) receiveHTTP(w http.ResponseWriter, r *http.Request) {
return
}

if !h.requestLimiter.AllowSeries(tenant, int64(len(wreq.Timeseries))) {
if !requestLimiter.AllowSeries(tenant, int64(len(wreq.Timeseries))) {
http.Error(w, "too many timeseries", http.StatusRequestEntityTooLarge)
return
}
Expand All @@ -538,7 +519,7 @@ func (h *Handler) receiveHTTP(w http.ResponseWriter, r *http.Request) {
for _, timeseries := range wreq.Timeseries {
totalSamples += len(timeseries.Samples)
}
if !h.requestLimiter.AllowSamples(tenant, int64(totalSamples)) {
if !requestLimiter.AllowSamples(tenant, int64(totalSamples)) {
http.Error(w, "too many samples", http.StatusRequestEntityTooLarge)
return
}
Expand Down

0 comments on commit 063b459

Please sign in to comment.