From 24e1cc0faf219049174020955f8e3c8251106d87 Mon Sep 17 00:00:00 2001 From: Douglas Camata <159076+douglascamata@users.noreply.github.com> Date: Mon, 17 Oct 2022 17:52:00 +0200 Subject: [PATCH] Receive: Reload tenant limit configuration on file change (#5673) * Create a PathOrContent reloader Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com> * Add docs to staticPathContent.Rewrite Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com> * Run goimports Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com> * Properly cancel the context in the test Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com> * Watch parent directory of file This helps handling deletes and other situations. Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com> * Remove useless ctx.Done() Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com> * Add a debounce timer to config reload It helps managing situations where a create event is followed by a write or when a big file write is sent by the fsnotify backend as many write events. 
Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com> * Fix event.Op bitmask check Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com> * Update lastReload Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com> * Fix debouncer for path content reloader Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com> * Improve documentation of the PathContentReloader Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com> * Drain reload timer before resetting Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com> * Run tests in parallel Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com> * Simplify debouncing logic Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com> * Add more tests to file reloader Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com> * Simplify condition for triggering reload Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com> * Use absolute path to config file Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com> * Get rid of parallel test Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com> * Put back 2s wait between fs operations Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com> * Remove useless sleep Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com> * Stop reloadTimer when context cancelled Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com> * Remove unused function Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com> * Add missing copyright to test file Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com> * Auto-reload tenant limit config on file changes Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com> * Wrap 
error when reloading config Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com> * Move limiter config reloader and update logs Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com> * Get rid of useless types and allocations Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com> * Remove errorChan from config reload starter Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com> * Retrigger CI Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com> * Use UnRegisterer in the Limiter To ensure that limit reloads will be able to re-register their metrics. Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com> * Better guard against nil registerer in the limiter Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com> * Remove wrong nil guard Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com> * Retrigger CI Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com> Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com> Signed-off-by: utukj --- CHANGELOG.md | 1 + cmd/thanos/receive.go | 46 +++-- docs/components/receive.md | 2 +- go.mod | 8 +- go.sum | 4 +- pkg/extkingpin/path_content_reloader.go | 128 ++++++++++++ pkg/extkingpin/path_content_reloader_test.go | 105 ++++++++++ pkg/receive/handler.go | 22 +- pkg/receive/handler_test.go | 38 ++-- pkg/receive/limiter.go | 189 ++++++++++++++++-- pkg/receive/limiter_config.go | 4 +- pkg/receive/limiter_config_test.go | 6 +- pkg/receive/limiter_test.go | 100 +++++++++ pkg/receive/request_limiter.go | 31 +-- pkg/receive/request_limiter_test.go | 20 +- pkg/receive/testdata/limits.yaml | 22 ++ .../limits_config/invalid_limits.yaml | 17 ++ 17 files changed, 646 insertions(+), 97 deletions(-) create mode 100644 pkg/extkingpin/path_content_reloader.go create mode 100644 
pkg/extkingpin/path_content_reloader_test.go create mode 100644 pkg/receive/limiter_test.go create mode 100644 pkg/receive/testdata/limits.yaml create mode 100644 pkg/receive/testdata/limits_config/invalid_limits.yaml diff --git a/CHANGELOG.md b/CHANGELOG.md index 9ed82d6525..6e1d2143c3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -29,6 +29,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re - [#5734](https://github.com/thanos-io/thanos/pull/5734) Store: Support disable block viewer UI. - [#5411](https://github.com/thanos-io/thanos/pull/5411) Tracing: Add OpenTelemetry Protocol exporter. - [#5779](https://github.com/thanos-io/thanos/pull/5779) Objstore: Support specifying S3 storage class. +- [#5673](https://github.com/thanos-io/thanos/pull/5673) Receive: Reload tenant limit configuration on file change. ### Changed diff --git a/cmd/thanos/receive.go b/cmd/thanos/receive.go index 5c47b91dd5..d86b560983 100644 --- a/cmd/thanos/receive.go +++ b/cmd/thanos/receive.go @@ -192,19 +192,6 @@ func runReceive( return errors.Wrap(err, "parse relabel configuration") } - var limitsConfig *receive.RootLimitsConfig - if conf.limitsConfig != nil { - limitsContentYaml, err := conf.limitsConfig.Content() - if err != nil { - return errors.Wrap(err, "get content of limit configuration") - } - limitsConfig, err = receive.ParseRootLimitConfig(limitsContentYaml) - if err != nil { - return errors.Wrap(err, "parse limit configuration") - } - } - limiter := receive.NewLimiter(limitsConfig, reg, receiveMode, log.With(logger, "component", "receive-limiter")) - dbs := receive.NewMultiTSDB( conf.dataDir, logger, @@ -217,6 +204,23 @@ func runReceive( hashFunc, ) writer := receive.NewWriter(log.With(logger, "component", "receive-writer"), dbs) + + var limitsConfig *receive.RootLimitsConfig + if conf.limitsConfig != nil { + limitsContentYaml, err := conf.limitsConfig.Content() + if err != nil { + return errors.Wrap(err, "get content of limit configuration") + 
} + limitsConfig, err = receive.ParseRootLimitConfig(limitsContentYaml) + if err != nil { + return errors.Wrap(err, "parse limit configuration") + } + } + limiter, err := receive.NewLimiter(conf.limitsConfig, reg, receiveMode, log.With(logger, "component", "receive-limiter")) + if err != nil { + return errors.Wrap(err, "creating limiter") + } + webHandler := receive.NewHandler(log.With(logger, "component", "receive-handler"), &receive.Options{ Writer: writer, ListenAddress: conf.rwAddress, @@ -399,6 +403,22 @@ func runReceive( }) } + { + if limiter.CanReload() { + ctx, cancel := context.WithCancel(context.Background()) + g.Add(func() error { + level.Debug(logger).Log("msg", "limits config initialized with file watcher.") + if err := limiter.StartConfigReloader(ctx); err != nil { + return err + } + <-ctx.Done() + return nil + }, func(err error) { + cancel() + }) + } + } + level.Info(logger).Log("msg", "starting receiver") return nil } diff --git a/docs/components/receive.md b/docs/components/receive.md index 6fa13938e9..ef4e39e35e 100644 --- a/docs/components/receive.md +++ b/docs/components/receive.md @@ -86,7 +86,7 @@ Thanos Receive has some limits and gates that can be configured to control resou To configure the gates and limits you can use one of the two options: -- `--receive.limits-config-file=`: where `` is the path to the YAML file. +- `--receive.limits-config-file=`: where `` is the path to the YAML file. Any modification to the indicated file will trigger a configuration reload. If the updated configuration is invalid an error will be logged and it won't replace the previous valid configuration. - `--receive.limits-config=`: where `` is the content of YAML file. By default all the limits and gates are **disabled**. 
diff --git a/go.mod b/go.mod index 13743c8020..bee3e97fe7 100644 --- a/go.mod +++ b/go.mod @@ -19,7 +19,7 @@ require ( github.com/davecgh/go-spew v1.1.1 github.com/dustin/go-humanize v1.0.0 github.com/efficientgo/e2e v0.13.1-0.20220923082810-8fa9daa8af8a - github.com/efficientgo/tools/extkingpin v0.0.0-20220801101838-3312908f6a9d + github.com/efficientgo/tools/extkingpin v0.0.0-20220817170617-6c25e3b627dd github.com/facette/natsort v0.0.0-20181210072756-2cd4dd1e2dcb github.com/fatih/structtag v1.2.0 github.com/felixge/fgprof v0.9.2 @@ -108,6 +108,7 @@ require ( require ( github.com/efficientgo/core v1.0.0-rc.0 + github.com/efficientgo/tools/core v0.0.0-20220817170617-6c25e3b627dd github.com/minio/sha256-simd v1.0.0 ) @@ -127,10 +128,7 @@ require ( go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.10.0 ) -require ( - github.com/efficientgo/tools/core v0.0.0-20220817170617-6c25e3b627dd - go.opentelemetry.io/contrib/propagators/autoprop v0.34.0 -) +require go.opentelemetry.io/contrib/propagators/autoprop v0.34.0 require ( github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapping v0.32.3 // indirect diff --git a/go.sum b/go.sum index 5ee9bab6be..97fc0d0411 100644 --- a/go.sum +++ b/go.sum @@ -252,8 +252,8 @@ github.com/efficientgo/e2e v0.13.1-0.20220923082810-8fa9daa8af8a h1:cnJajqeh/Hjv github.com/efficientgo/e2e v0.13.1-0.20220923082810-8fa9daa8af8a/go.mod h1:Hi+sz0REtlhVZ8zcdeTC3j6LUEEpJpPtNjOaOKuNcgI= github.com/efficientgo/tools/core v0.0.0-20220817170617-6c25e3b627dd h1:svR6KxSP1xiPw10RN4Pd7g6BAVkEcNN628PAqZH31mM= github.com/efficientgo/tools/core v0.0.0-20220817170617-6c25e3b627dd/go.mod h1:OmVcnJopJL8d3X3sSXTiypGoUSgFq1aDGmlrdi9dn/M= -github.com/efficientgo/tools/extkingpin v0.0.0-20220801101838-3312908f6a9d h1:WZV/mrUyKS9w9r+Jdw+zq/tdGAb5LwB+H37EkMLhEMA= -github.com/efficientgo/tools/extkingpin v0.0.0-20220801101838-3312908f6a9d/go.mod h1:ZV0utlglOczUWv3ih2AbqPSoLoFzdplUYxwV62eZi6Q= 
+github.com/efficientgo/tools/extkingpin v0.0.0-20220817170617-6c25e3b627dd h1:VaYzzXeUbC5fVheskcKVNOyJMEYD+HgrJNzIAg/mRIM= +github.com/efficientgo/tools/extkingpin v0.0.0-20220817170617-6c25e3b627dd/go.mod h1:ZV0utlglOczUWv3ih2AbqPSoLoFzdplUYxwV62eZi6Q= github.com/elastic/go-sysinfo v1.1.1/go.mod h1:i1ZYdU10oLNfRzq4vq62BEwD2fH8KaWh6eh0ikPT9F0= github.com/elastic/go-sysinfo v1.8.1 h1:4Yhj+HdV6WjbCRgGdZpPJ8lZQlXZLKDAeIkmQ/VRvi4= github.com/elastic/go-sysinfo v1.8.1/go.mod h1:JfllUnzoQV/JRYymbH3dO1yggI3mV2oTKSXsDHM+uIM= diff --git a/pkg/extkingpin/path_content_reloader.go b/pkg/extkingpin/path_content_reloader.go new file mode 100644 index 0000000000..68c2cd252c --- /dev/null +++ b/pkg/extkingpin/path_content_reloader.go @@ -0,0 +1,128 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package extkingpin + +import ( + "context" + "fmt" + "os" + "path" + "path/filepath" + "time" + + "github.com/fsnotify/fsnotify" + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/pkg/errors" +) + +type fileContent interface { + Content() ([]byte, error) + Path() string +} + +// PathContentReloader starts a file watcher that monitors the file indicated by fileContent.Path() and runs +// reloadFunc whenever a change is detected. +// A debounce timer can be configured via opts to handle situations where many "write" events are received together or +// a "create" event is followed up by a "write" event, for example. Files will be effectively reloaded at the latest +// after 2 times the debounce timer. By default the debouncer timer is 1 second. +// To ensure renames and deletes are properly handled, the file watcher is put at the file's parent folder. See +// https://github.com/fsnotify/fsnotify/issues/214 for more details. 
+func PathContentReloader(ctx context.Context, fileContent fileContent, logger log.Logger, reloadFunc func(), debounceTime time.Duration) error { + filePath, err := filepath.Abs(fileContent.Path()) + if err != nil { + return errors.Wrap(err, "getting absolute file path") + } + + watcher, err := fsnotify.NewWatcher() + if filePath == "" { + level.Debug(logger).Log("msg", "no path detected for config reload") + } + if err != nil { + return errors.Wrap(err, "creating file watcher") + } + go func() { + var reloadTimer *time.Timer + if debounceTime != 0 { + reloadTimer = time.AfterFunc(debounceTime, func() { + reloadFunc() + level.Debug(logger).Log("msg", "configuration reloaded after debouncing") + }) + } + defer watcher.Close() + for { + select { + case <-ctx.Done(): + if reloadTimer != nil { + reloadTimer.Stop() + } + return + case event := <-watcher.Events: + // fsnotify sometimes sends a bunch of events without name or operation. + // It's unclear what they are and why they are sent - filter them out. + if event.Name == "" { + break + } + // We are watching the file's parent folder (more details on why this is done can be found below), but are + // only interested in changes to the target file. Discard every other file as quickly as possible. + if event.Name != filePath { + break + } + // We only react to files being written or created. + // On chmod or remove we have nothing to do. + // On rename we have the old file name (not useful). A create event for the new file will come later. + if event.Op&fsnotify.Write == 0 && event.Op&fsnotify.Create == 0 { + break + } + level.Debug(logger).Log("msg", fmt.Sprintf("change detected for %s", filePath), "eventName", event.Name, "eventOp", event.Op) + if reloadTimer != nil { + reloadTimer.Reset(debounceTime) + } + case err := <-watcher.Errors: + level.Error(logger).Log("msg", "watcher error", "error", err) + } + } + }() + // We watch the file's parent folder and not the file itself to better handle DELETE and RENAME events. 
Check + // https://github.com/fsnotify/fsnotify/issues/214 for more details. + if err := watcher.Add(path.Dir(filePath)); err != nil { + return errors.Wrapf(err, "adding path %s to file watcher", filePath) + } + return nil +} + +type staticPathContent struct { + content []byte + path string +} + +var _ fileContent = (*staticPathContent)(nil) + +// Content returns the cached content. +func (t *staticPathContent) Content() ([]byte, error) { + return t.content, nil +} + +// Path returns the path to the file that contains the content. +func (t *staticPathContent) Path() string { + return t.path +} + +// NewStaticPathContent creates a new content that can be used to serve a static configuration. It copies the +// configuration from `fromPath` into `destPath` to avoid confusion with file watchers. +func NewStaticPathContent(fromPath string) (*staticPathContent, error) { + content, err := os.ReadFile(fromPath) + if err != nil { + return nil, errors.Wrapf(err, "could not load test content: %s", fromPath) + } + return &staticPathContent{content, fromPath}, nil +} + +// Rewrite rewrites the file backing this staticPathContent and swaps the local content cache. The file writing +// is needed to trigger the file system monitor. +func (t *staticPathContent) Rewrite(newContent []byte) error { + t.content = newContent + // Write the file to ensure possible file watcher reloaders get triggered. + return os.WriteFile(t.path, newContent, 0666) +} diff --git a/pkg/extkingpin/path_content_reloader_test.go b/pkg/extkingpin/path_content_reloader_test.go new file mode 100644 index 0000000000..fb20f83d5c --- /dev/null +++ b/pkg/extkingpin/path_content_reloader_test.go @@ -0,0 +1,105 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. 
+ +package extkingpin + +import ( + "context" + "os" + "path" + "sync" + "testing" + "time" + + "github.com/go-kit/log" + "github.com/thanos-io/thanos/pkg/testutil" +) + +func TestPathContentReloader(t *testing.T) { + type args struct { + runSteps func(t *testing.T, testFile string, pathContent *staticPathContent) + } + tests := []struct { + name string + args args + wantReloads int + }{ + { + name: "Many operations, only rewrite triggers one reload", + args: args{ + runSteps: func(t *testing.T, testFile string, pathContent *staticPathContent) { + testutil.Ok(t, os.Chmod(testFile, 0777)) + testutil.Ok(t, os.Remove(testFile)) + testutil.Ok(t, pathContent.Rewrite([]byte("test modified"))) + }, + }, + wantReloads: 1, + }, + { + name: "Many operations, only rename triggers one reload", + args: args{ + runSteps: func(t *testing.T, testFile string, pathContent *staticPathContent) { + testutil.Ok(t, os.Chmod(testFile, 0777)) + testutil.Ok(t, os.Rename(testFile, testFile+".tmp")) + testutil.Ok(t, os.Rename(testFile+".tmp", testFile)) + }, + }, + wantReloads: 1, + }, + { + name: "Many operations, two rewrites trigger two reloads", + args: args{ + runSteps: func(t *testing.T, testFile string, pathContent *staticPathContent) { + testutil.Ok(t, os.Chmod(testFile, 0777)) + testutil.Ok(t, os.Remove(testFile)) + testutil.Ok(t, pathContent.Rewrite([]byte("test modified"))) + time.Sleep(2 * time.Second) + testutil.Ok(t, pathContent.Rewrite([]byte("test modified again"))) + }, + }, + wantReloads: 1, + }, + { + name: "Chmod doesn't trigger reload", + args: args{ + runSteps: func(t *testing.T, testFile string, pathContent *staticPathContent) { + testutil.Ok(t, os.Chmod(testFile, 0777)) + }, + }, + wantReloads: 0, + }, + { + name: "Remove doesn't trigger reload", + args: args{ + runSteps: func(t *testing.T, testFile string, pathContent *staticPathContent) { + testutil.Ok(t, os.Remove(testFile)) + }, + }, + wantReloads: 0, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t 
*testing.T) { + testFile := path.Join(t.TempDir(), "test") + testutil.Ok(t, os.WriteFile(testFile, []byte("test"), 0666)) + pathContent, err := NewStaticPathContent(testFile) + testutil.Ok(t, err) + + wg := &sync.WaitGroup{} + wg.Add(tt.wantReloads) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + reloadCount := 0 + err = PathContentReloader(ctx, pathContent, log.NewLogfmtLogger(os.Stdout), func() { + reloadCount++ + wg.Done() + }, 100*time.Millisecond) + testutil.Ok(t, err) + + tt.args.runSteps(t, testFile, pathContent) + wg.Wait() + testutil.Equals(t, tt.wantReloads, reloadCount) + }) + } +} diff --git a/pkg/receive/handler.go b/pkg/receive/handler.go index 156bb74566..12afb752b8 100644 --- a/pkg/receive/handler.go +++ b/pkg/receive/handler.go @@ -17,10 +17,6 @@ import ( "sync" "time" - "github.com/thanos-io/thanos/pkg/api" - statusapi "github.com/thanos-io/thanos/pkg/api/status" - "github.com/thanos-io/thanos/pkg/logging" - "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/gogo/protobuf/proto" @@ -35,6 +31,9 @@ import ( "github.com/prometheus/prometheus/model/relabel" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb" + "github.com/thanos-io/thanos/pkg/api" + statusapi "github.com/thanos-io/thanos/pkg/api/status" + "github.com/thanos-io/thanos/pkg/logging" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -99,7 +98,7 @@ type Options struct { ForwardTimeout time.Duration RelabelConfigs []*relabel.Config TSDBStats TSDBStats - Limiter *limiter + Limiter *Limiter } // Handler serves a Prometheus remote write receiving HTTP endpoint. 
@@ -124,7 +123,7 @@ type Handler struct { writeSamplesTotal *prometheus.HistogramVec writeTimeseriesTotal *prometheus.HistogramVec - limiter *limiter + Limiter *Limiter } func NewHandler(logger log.Logger, o *Options) *Handler { @@ -150,7 +149,7 @@ func NewHandler(logger log.Logger, o *Options) *Handler { Max: 30 * time.Second, Jitter: true, }, - limiter: o.Limiter, + Limiter: o.Limiter, forwardRequests: promauto.With(registerer).NewCounterVec( prometheus.CounterOpts{ Name: "thanos_receive_forward_requests_total", @@ -407,17 +406,18 @@ func (h *Handler) receiveHTTP(w http.ResponseWriter, r *http.Request) { tLogger := log.With(h.logger, "tenant", tenant) + writeGate := h.Limiter.WriteGate() tracing.DoInSpan(r.Context(), "receive_write_gate_ismyturn", func(ctx context.Context) { - err = h.limiter.writeGate.Start(r.Context()) + err = writeGate.Start(r.Context()) }) + defer writeGate.Done() if err != nil { level.Error(tLogger).Log("err", err, "msg", "internal server error") http.Error(w, err.Error(), http.StatusInternalServerError) return } - defer h.limiter.writeGate.Done() - under, err := h.limiter.HeadSeriesLimiter.isUnderLimit(tenant) + under, err := h.Limiter.HeadSeriesLimiter.isUnderLimit(tenant) if err != nil { level.Error(tLogger).Log("msg", "error while limiting", "err", err.Error()) } @@ -428,7 +428,7 @@ func (h *Handler) receiveHTTP(w http.ResponseWriter, r *http.Request) { return } - requestLimiter := h.limiter.requestLimiter + requestLimiter := h.Limiter.RequestLimiter() // io.ReadAll dynamically adjust the byte slice for read data, starting from 512B. // Since this is receive hot path, grow upfront saving allocations and CPU time. 
compressed := bytes.Buffer{} diff --git a/pkg/receive/handler_test.go b/pkg/receive/handler_test.go index 44076de141..4a2a536038 100644 --- a/pkg/receive/handler_test.go +++ b/pkg/receive/handler_test.go @@ -13,6 +13,7 @@ import ( "net/http" "net/http/httptest" "os" + "path" "path/filepath" "runtime" "runtime/pprof" @@ -21,6 +22,8 @@ import ( "testing" "time" + "gopkg.in/yaml.v3" + "github.com/alecthomas/units" "github.com/go-kit/log" "github.com/gogo/protobuf/proto" @@ -40,6 +43,7 @@ import ( "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/errutil" + "github.com/thanos-io/thanos/pkg/extkingpin" "github.com/thanos-io/thanos/pkg/runutil" "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/store/storepb" @@ -362,6 +366,7 @@ func newTestHandlerHashring(appendables []*fakeAppendable, replicationFactor uin }, } + limiter, _ := NewLimiter(NewNopConfig(), nil, RouterIngestor, log.NewNopLogger()) for i := range appendables { h := NewHandler(nil, &Options{ TenantHeader: DefaultTenantHeader, @@ -369,7 +374,7 @@ func newTestHandlerHashring(appendables []*fakeAppendable, replicationFactor uin ReplicationFactor: replicationFactor, ForwardTimeout: 5 * time.Second, Writer: NewWriter(log.NewNopLogger(), newFakeTenantAppendable(appendables[i])), - Limiter: NewLimiter(nil, nil, RouterIngestor, nil), + Limiter: limiter, }) handlers = append(handlers, h) h.peers = peers @@ -775,23 +780,28 @@ func TestReceiveWriteRequestLimits(t *testing.T) { } handlers, _ := newTestHandlerHashring(appendables, 3) handler := handlers[0] + tenant := "test" - handler.limiter = NewLimiter( - &RootLimitsConfig{ - WriteLimits: WriteLimitsConfig{ - TenantsLimits: TenantsWriteLimitsConfig{ - tenant: &WriteLimitConfig{ - RequestLimits: newEmptyRequestLimitsConfig(). - SetSizeBytesLimit(int64(1 * units.Megabyte)). - SetSeriesLimit(20). 
- SetSamplesLimit(200), - }, + tenantConfig, err := yaml.Marshal(&RootLimitsConfig{ + WriteLimits: WriteLimitsConfig{ + TenantsLimits: TenantsWriteLimitsConfig{ + tenant: &WriteLimitConfig{ + RequestLimits: NewEmptyRequestLimitsConfig(). + SetSizeBytesLimit(int64(1 * units.Megabyte)). + SetSeriesLimit(20). + SetSamplesLimit(200), }, }, }, - nil, - RouterIngestor, - log.NewNopLogger(), + }) + if err != nil { + t.Fatal("handler: failed to generate limit configuration") + } + tmpLimitsPath := path.Join(t.TempDir(), "limits.yaml") + testutil.Ok(t, os.WriteFile(tmpLimitsPath, tenantConfig, 0666)) + limitConfig, _ := extkingpin.NewStaticPathContent(tmpLimitsPath) + handler.Limiter, _ = NewLimiter( + limitConfig, nil, RouterIngestor, log.NewNopLogger(), ) wreq := &prompb.WriteRequest{ diff --git a/pkg/receive/limiter.go b/pkg/receive/limiter.go index bc3c4d8358..ff5bbe3199 100644 --- a/pkg/receive/limiter.go +++ b/pkg/receive/limiter.go @@ -5,59 +5,204 @@ package receive import ( "context" + "fmt" + "sync" + "time" "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/thanos-io/thanos/pkg/extkingpin" + + "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/thanos-io/thanos/pkg/extprom" "github.com/thanos-io/thanos/pkg/gate" ) -type limiter struct { - requestLimiter requestLimiter - writeGate gate.Gate - HeadSeriesLimiter headSeriesLimiter +// Limiter is responsible for managing the configuration and initialization of +// different types that apply limits to the Receive instance. 
+type Limiter struct { + sync.RWMutex + requestLimiter requestLimiter + HeadSeriesLimiter headSeriesLimiter + writeGate gate.Gate + registerer prometheus.Registerer + configPathOrContent fileContent + logger log.Logger + configReloadCounter prometheus.Counter + configReloadFailedCounter prometheus.Counter + receiverMode ReceiverMode +} + +// headSeriesLimiter encompasses active/head series limiting logic. +type headSeriesLimiter interface { + QueryMetaMonitoring(context.Context) error + isUnderLimit(tenant string) (bool, error) } -// requestLimiter encompasses logic for limiting remote write requests. type requestLimiter interface { AllowSizeBytes(tenant string, contentLengthBytes int64) bool AllowSeries(tenant string, amount int64) bool AllowSamples(tenant string, amount int64) bool } -// headSeriesLimiter encompasses active/head series limiting logic. -type headSeriesLimiter interface { - QueryMetaMonitoring(context.Context) error - isUnderLimit(tenant string) (bool, error) +// fileContent is an interface to avoid a direct dependency on kingpin or extkingpin. +type fileContent interface { + Content() ([]byte, error) + Path() string } -func NewLimiter(root *RootLimitsConfig, reg prometheus.Registerer, r ReceiverMode, logger log.Logger) *limiter { - limiter := &limiter{ +// NewLimiter creates a new *Limiter given a configuration and prometheus +// registerer. 
+func NewLimiter(configFile fileContent, reg prometheus.Registerer, r ReceiverMode, logger log.Logger) (*Limiter, error) { + limiter := &Limiter{ writeGate: gate.NewNoop(), requestLimiter: &noopRequestLimiter{}, HeadSeriesLimiter: NewNopSeriesLimit(), + logger: logger, + receiverMode: r, + } + + if reg != nil { + limiter.registerer = NewUnRegisterer(reg) + limiter.configReloadCounter = promauto.With(limiter.registerer).NewCounter( + prometheus.CounterOpts{ + Namespace: "thanos", + Subsystem: "receive", + Name: "limits_config_reload_total", + Help: "How many times the limit configuration was reloaded", + }, + ) + limiter.configReloadFailedCounter = promauto.With(limiter.registerer).NewCounter( + prometheus.CounterOpts{ + Namespace: "thanos", + Subsystem: "receive", + Name: "limits_config_reload_err_total", + Help: "How many times the limit configuration failed to reload.", + }, + ) + } + + if configFile == nil { + return limiter, nil + } + + limiter.configPathOrContent = configFile + if err := limiter.loadConfig(); err != nil { + return nil, errors.Wrap(err, "load tenant limits config") + } + + return limiter, nil +} + +// StartConfigReloader starts the automatic configuration reloader based off of +// the file indicated by pathOrContent. It starts a Go routine in the given +// *run.Group. 
+func (l *Limiter) StartConfigReloader(ctx context.Context) error { + if !l.CanReload() { + return nil } - if root == nil { - return limiter + + return extkingpin.PathContentReloader(ctx, l.configPathOrContent, l.logger, func() { + level.Info(l.logger).Log("msg", "reloading limit config") + if err := l.loadConfig(); err != nil { + if failedReload := l.configReloadCounter; failedReload != nil { + failedReload.Inc() + } + errMsg := fmt.Sprintf("error reloading tenant limits config from %s", l.configPathOrContent.Path()) + level.Error(l.logger).Log("msg", errMsg, "err", err) + } + if reloadCounter := l.configReloadCounter; reloadCounter != nil { + reloadCounter.Inc() + } + }, 1*time.Second) +} + +func (l *Limiter) CanReload() bool { + if l.configPathOrContent == nil { + return false } + if l.configPathOrContent.Path() == "" { + return false + } + return true +} - maxWriteConcurrency := root.WriteLimits.GlobalLimits.MaxConcurrency +func (l *Limiter) loadConfig() error { + config, err := ParseLimitConfigContent(l.configPathOrContent) + if err != nil { + return err + } + l.Lock() + defer l.Unlock() + maxWriteConcurrency := config.WriteLimits.GlobalLimits.MaxConcurrency if maxWriteConcurrency > 0 { - limiter.writeGate = gate.New( + l.writeGate = gate.New( extprom.WrapRegistererWithPrefix( "thanos_receive_write_request_concurrent_", - reg, + l.registerer, ), int(maxWriteConcurrency), ) } - limiter.requestLimiter = newConfigRequestLimiter(reg, &root.WriteLimits) - - // Impose active series limit only if Receiver is in Router or RouterIngestor mode, and config is provided. 
- seriesLimitSupported := (r == RouterOnly || r == RouterIngestor) && (len(root.WriteLimits.TenantsLimits) != 0 || root.WriteLimits.DefaultLimits.HeadSeriesLimit != 0) + l.requestLimiter = newConfigRequestLimiter( + l.registerer, + &config.WriteLimits, + ) + seriesLimitSupported := (l.receiverMode == RouterOnly || l.receiverMode == RouterIngestor) && (len(config.WriteLimits.TenantsLimits) != 0 || config.WriteLimits.DefaultLimits.HeadSeriesLimit != 0) if seriesLimitSupported { - limiter.HeadSeriesLimiter = NewHeadSeriesLimit(root.WriteLimits, reg, logger) + l.HeadSeriesLimiter = NewHeadSeriesLimit(config.WriteLimits, l.registerer, l.logger) } + return nil +} + +// RequestLimiter is a safe getter for the request limiter. +func (l *Limiter) RequestLimiter() requestLimiter { + l.RLock() + defer l.RUnlock() + return l.requestLimiter +} + +// WriteGate is a safe getter for the write gate. +func (l *Limiter) WriteGate() gate.Gate { + l.RLock() + defer l.RUnlock() + return l.writeGate +} + +// ParseLimitConfigContent parses the limit configuration from the path or +// content. +func ParseLimitConfigContent(limitsConfig fileContent) (*RootLimitsConfig, error) { + if limitsConfig == nil { + return &RootLimitsConfig{}, nil + } + limitsContentYaml, err := limitsConfig.Content() + if err != nil { + return nil, errors.Wrap(err, "get content of limit configuration") + } + parsedConfig, err := ParseRootLimitConfig(limitsContentYaml) + if err != nil { + return nil, errors.Wrap(err, "parse limit configuration") + } + return parsedConfig, nil +} + +type nopConfigContent struct{} + +var _ fileContent = (*nopConfigContent)(nil) + +// Content returns no content and no error. +func (n nopConfigContent) Content() ([]byte, error) { + return nil, nil +} + +// Path returns an empty path. +func (n nopConfigContent) Path() string { + return "" +} - return limiter +// NewNopConfig creates a no-op config content (no configuration). 
+func NewNopConfig() nopConfigContent { + return nopConfigContent{} } diff --git a/pkg/receive/limiter_config.go b/pkg/receive/limiter_config.go index 67aa5ef93a..c3bd330b6e 100644 --- a/pkg/receive/limiter_config.go +++ b/pkg/receive/limiter_config.go @@ -78,6 +78,7 @@ type DefaultLimitsConfig struct { HeadSeriesLimit uint64 `yaml:"head_series_limit"` } +// TenantsWriteLimitsConfig is a map of tenant IDs to their *WriteLimitConfig. type TenantsWriteLimitsConfig map[string]*WriteLimitConfig // A tenant might not always have limits configured, so things here must @@ -110,8 +111,7 @@ type requestLimitsConfig struct { SamplesLimit *int64 `yaml:"samples_limit"` } -// Utils for initializing. -func newEmptyRequestLimitsConfig() *requestLimitsConfig { +func NewEmptyRequestLimitsConfig() *requestLimitsConfig { return &requestLimitsConfig{} } diff --git a/pkg/receive/limiter_config_test.go b/pkg/receive/limiter_config_test.go index b080680162..3e32ea41e8 100644 --- a/pkg/receive/limiter_config_test.go +++ b/pkg/receive/limiter_config_test.go @@ -35,7 +35,7 @@ func TestParseLimiterConfig(t *testing.T) { }, }, DefaultLimits: DefaultLimitsConfig{ - RequestLimits: *newEmptyRequestLimitsConfig(). + RequestLimits: *NewEmptyRequestLimitsConfig(). SetSizeBytesLimit(1024). SetSeriesLimit(1000). SetSamplesLimit(10), @@ -44,7 +44,7 @@ func TestParseLimiterConfig(t *testing.T) { TenantsLimits: TenantsWriteLimitsConfig{ "acme": NewEmptyWriteLimitConfig(). SetRequestLimits( - newEmptyRequestLimitsConfig(). + NewEmptyRequestLimitsConfig(). SetSizeBytesLimit(0). SetSeriesLimit(0). SetSamplesLimit(0), @@ -52,7 +52,7 @@ func TestParseLimiterConfig(t *testing.T) { SetHeadSeriesLimit(2000), "ajax": NewEmptyWriteLimitConfig(). SetRequestLimits( - newEmptyRequestLimitsConfig(). + NewEmptyRequestLimitsConfig(). SetSeriesLimit(50000). 
SetSamplesLimit(500), ), diff --git a/pkg/receive/limiter_test.go b/pkg/receive/limiter_test.go new file mode 100644 index 0000000000..be7e8790c1 --- /dev/null +++ b/pkg/receive/limiter_test.go @@ -0,0 +1,100 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package receive + +import ( + "context" + "os" + "path" + "testing" + "time" + + "github.com/thanos-io/thanos/pkg/extkingpin" + + "github.com/efficientgo/tools/core/pkg/testutil" + "github.com/go-kit/log" +) + +func TestLimiter_StartConfigReloader(t *testing.T) { + origLimitsFile, err := os.ReadFile(path.Join("testdata", "limits_config", "good_limits.yaml")) + testutil.Ok(t, err) + copyLimitsFile := path.Join(t.TempDir(), "limits.yaml") + testutil.Ok(t, os.WriteFile(copyLimitsFile, origLimitsFile, 0666)) + + goodLimits, err := extkingpin.NewStaticPathContent(copyLimitsFile) + if err != nil { + t.Fatalf("error trying to save static limit config: %s", err) + } + invalidLimitsPath := path.Join("./testdata", "limits_config", "invalid_limits.yaml") + invalidLimits, err := os.ReadFile(invalidLimitsPath) + if err != nil { + t.Fatalf("could not load test content at %s: %s", invalidLimitsPath, err) + } + + limiter, err := NewLimiter(goodLimits, nil, RouterIngestor, log.NewLogfmtLogger(os.Stdout)) + testutil.Ok(t, err) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + err = limiter.StartConfigReloader(ctx) + testutil.Ok(t, err) + + time.Sleep(1 * time.Second) + testutil.Ok(t, goodLimits.Rewrite(invalidLimits)) +} + +type emptyPathFile struct{} + +func (e emptyPathFile) Content() ([]byte, error) { + return []byte{}, nil +} + +func (e emptyPathFile) Path() string { + return "" +} + +func TestLimiter_CanReload(t *testing.T) { + validLimitsPath, err := extkingpin.NewStaticPathContent( + path.Join("testdata", "limits_config", "good_limits.yaml"), + ) + testutil.Ok(t, err) + emptyLimitsPath := emptyPathFile{} + + type args struct { + configFilePath fileContent + 
} + tests := []struct { + name string + args args + wantReload bool + }{ + { + name: "Nil config file path cannot be reloaded", + args: args{configFilePath: nil}, + wantReload: false, + }, + { + name: "Empty config file path cannot be reloaded", + args: args{configFilePath: emptyLimitsPath}, + wantReload: false, + }, + { + name: "Valid config file path can be reloaded", + args: args{configFilePath: validLimitsPath}, + wantReload: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + configFile := tt.args.configFilePath + limiter, err := NewLimiter(configFile, nil, RouterIngestor, log.NewLogfmtLogger(os.Stdout)) + testutil.Ok(t, err) + if tt.wantReload { + testutil.Assert(t, limiter.CanReload()) + } else { + testutil.Assert(t, !limiter.CanReload()) + } + }) + } +} diff --git a/pkg/receive/request_limiter.go b/pkg/receive/request_limiter.go index de7554de2f..7da0c64a6d 100644 --- a/pkg/receive/request_limiter.go +++ b/pkg/receive/request_limiter.go @@ -14,7 +14,7 @@ const ( sizeBytesLimitName = "body_size" ) -var unlimitedRequestLimitsConfig = newEmptyRequestLimitsConfig(). +var unlimitedRequestLimitsConfig = NewEmptyRequestLimitsConfig(). SetSizeBytesLimit(0). SetSeriesLimit(0). 
SetSamplesLimit(0) @@ -49,7 +49,12 @@ func newConfigRequestLimiter(reg prometheus.Registerer, writeLimits *WriteLimits tenantLimits: tenantRequestLimits, cachedDefaultLimits: defaultRequestLimits, } - limiter.limitsHit = promauto.With(reg).NewSummaryVec( + limiter.registerMetrics(reg) + return &limiter +} + +func (l *configRequestLimiter) registerMetrics(reg prometheus.Registerer) { + l.limitsHit = promauto.With(reg).NewSummaryVec( prometheus.SummaryOpts{ Namespace: "thanos", Subsystem: "receive", @@ -58,7 +63,7 @@ func newConfigRequestLimiter(reg prometheus.Registerer, writeLimits *WriteLimits Objectives: map[float64]float64{0.50: 0.1, 0.95: 0.1, 0.99: 0.001}, }, []string{"tenant", "limit"}, ) - limiter.configuredLimits = promauto.With(reg).NewGaugeVec( + l.configuredLimits = promauto.With(reg).NewGaugeVec( prometheus.GaugeOpts{ Namespace: "thanos", Subsystem: "receive", @@ -66,16 +71,14 @@ func newConfigRequestLimiter(reg prometheus.Registerer, writeLimits *WriteLimits Help: "The configured write limits.", }, []string{"tenant", "limit"}, ) - for tenant, limits := range tenantRequestLimits { - limiter.configuredLimits.WithLabelValues(tenant, sizeBytesLimitName).Set(float64(*limits.SizeBytesLimit)) - limiter.configuredLimits.WithLabelValues(tenant, seriesLimitName).Set(float64(*limits.SeriesLimit)) - limiter.configuredLimits.WithLabelValues(tenant, samplesLimitName).Set(float64(*limits.SamplesLimit)) + for tenant, limits := range l.tenantLimits { + l.configuredLimits.WithLabelValues(tenant, sizeBytesLimitName).Set(float64(*limits.SizeBytesLimit)) + l.configuredLimits.WithLabelValues(tenant, seriesLimitName).Set(float64(*limits.SeriesLimit)) + l.configuredLimits.WithLabelValues(tenant, samplesLimitName).Set(float64(*limits.SamplesLimit)) } - limiter.configuredLimits.WithLabelValues("", sizeBytesLimitName).Set(float64(*defaultRequestLimits.SizeBytesLimit)) - limiter.configuredLimits.WithLabelValues("", seriesLimitName).Set(float64(*defaultRequestLimits.SeriesLimit)) 
- limiter.configuredLimits.WithLabelValues("", samplesLimitName).Set(float64(*defaultRequestLimits.SamplesLimit)) - - return &limiter + l.configuredLimits.WithLabelValues("", sizeBytesLimitName).Set(float64(*l.cachedDefaultLimits.SizeBytesLimit)) + l.configuredLimits.WithLabelValues("", seriesLimitName).Set(float64(*l.cachedDefaultLimits.SeriesLimit)) + l.configuredLimits.WithLabelValues("", samplesLimitName).Set(float64(*l.cachedDefaultLimits.SamplesLimit)) } func (l *configRequestLimiter) AllowSizeBytes(tenant string, contentLengthBytes int64) bool { @@ -100,7 +103,7 @@ func (l *configRequestLimiter) AllowSeries(tenant string, amount int64) bool { } allowed := *limit >= amount - if !allowed { + if !allowed && l.limitsHit != nil { l.limitsHit. WithLabelValues(tenant, seriesLimitName). Observe(float64(amount - *limit)) @@ -114,7 +117,7 @@ func (l *configRequestLimiter) AllowSamples(tenant string, amount int64) bool { return true } allowed := *limit >= amount - if !allowed { + if !allowed && l.limitsHit != nil { l.limitsHit. WithLabelValues(tenant, samplesLimitName). Observe(float64(amount - *limit)) diff --git a/pkg/receive/request_limiter_test.go b/pkg/receive/request_limiter_test.go index e654cd1cdf..dfbea066d9 100644 --- a/pkg/receive/request_limiter_test.go +++ b/pkg/receive/request_limiter_test.go @@ -15,12 +15,12 @@ func TestRequestLimiter_limitsFor(t *testing.T) { limits := WriteLimitsConfig{ DefaultLimits: DefaultLimitsConfig{ - RequestLimits: *newEmptyRequestLimitsConfig(). + RequestLimits: *NewEmptyRequestLimitsConfig(). SetSeriesLimit(10), }, TenantsLimits: TenantsWriteLimitsConfig{ tenantWithLimits: &WriteLimitConfig{ - RequestLimits: newEmptyRequestLimitsConfig(). + RequestLimits: NewEmptyRequestLimitsConfig(). SetSeriesLimit(30), }, }, @@ -33,7 +33,7 @@ func TestRequestLimiter_limitsFor(t *testing.T) { { name: "Gets the default limits when tenant's limits aren't present", tenant: tenantWithoutLimits, - wantLimits: newEmptyRequestLimitsConfig(). 
+ wantLimits: NewEmptyRequestLimitsConfig(). SetSeriesLimit(10). SetSamplesLimit(0). SetSizeBytesLimit(0), @@ -41,7 +41,7 @@ func TestRequestLimiter_limitsFor(t *testing.T) { { name: "Gets the tenant's limits when it is present", tenant: tenantWithLimits, - wantLimits: newEmptyRequestLimitsConfig(). + wantLimits: NewEmptyRequestLimitsConfig(). SetSeriesLimit(30). SetSamplesLimit(0). SetSizeBytesLimit(0), @@ -102,11 +102,11 @@ func TestRequestLimiter_AllowRequestBodySizeBytes(t *testing.T) { tenant := "tenant" limits := WriteLimitsConfig{ DefaultLimits: DefaultLimitsConfig{ - RequestLimits: *newEmptyRequestLimitsConfig().SetSeriesLimit(10), + RequestLimits: *NewEmptyRequestLimitsConfig().SetSeriesLimit(10), }, TenantsLimits: TenantsWriteLimitsConfig{ tenant: &WriteLimitConfig{ - RequestLimits: newEmptyRequestLimitsConfig().SetSizeBytesLimit(tt.sizeByteLimit), + RequestLimits: NewEmptyRequestLimitsConfig().SetSizeBytesLimit(tt.sizeByteLimit), }, }, } @@ -159,11 +159,11 @@ func TestRequestLimiter_AllowSeries(t *testing.T) { tenant := "tenant" limits := WriteLimitsConfig{ DefaultLimits: DefaultLimitsConfig{ - RequestLimits: *newEmptyRequestLimitsConfig().SetSeriesLimit(10), + RequestLimits: *NewEmptyRequestLimitsConfig().SetSeriesLimit(10), }, TenantsLimits: TenantsWriteLimitsConfig{ tenant: &WriteLimitConfig{ - RequestLimits: newEmptyRequestLimitsConfig().SetSeriesLimit(tt.seriesLimit), + RequestLimits: NewEmptyRequestLimitsConfig().SetSeriesLimit(tt.seriesLimit), }, }, } @@ -217,11 +217,11 @@ func TestRequestLimiter_AllowSamples(t *testing.T) { tenant := "tenant" limits := WriteLimitsConfig{ DefaultLimits: DefaultLimitsConfig{ - RequestLimits: *newEmptyRequestLimitsConfig().SetSeriesLimit(10), + RequestLimits: *NewEmptyRequestLimitsConfig().SetSeriesLimit(10), }, TenantsLimits: TenantsWriteLimitsConfig{ tenant: &WriteLimitConfig{ - RequestLimits: newEmptyRequestLimitsConfig().SetSamplesLimit(tt.samplesLimit), + RequestLimits: 
NewEmptyRequestLimitsConfig().SetSamplesLimit(tt.samplesLimit), }, }, } diff --git a/pkg/receive/testdata/limits.yaml b/pkg/receive/testdata/limits.yaml new file mode 100644 index 0000000000..2345756179 --- /dev/null +++ b/pkg/receive/testdata/limits.yaml @@ -0,0 +1,22 @@ +write: + global: + max_concurrency: 30 + meta_monitoring_url: "http://localhost:9090" + meta_monitoring_limit_query: "sum(prometheus_tsdb_head_series) by (tenant)" + default: + request: + size_bytes_limit: 1024 + series_limit: 1000 + samples_limit: 10 + head_series_limit: 1000 + tenants: + acme: + request: + size_bytes_limit: 0 + series_limit: 0 + samples_limit: 0 + head_series_limit: 2000 + ajax: + request: + series_limit: 50000 + samples_limit: 500 diff --git a/pkg/receive/testdata/limits_config/invalid_limits.yaml b/pkg/receive/testdata/limits_config/invalid_limits.yaml new file mode 100644 index 0000000000..74db0453f8 --- /dev/null +++ b/pkg/receive/testdata/limits_config/invalid_limits.yaml @@ -0,0 +1,17 @@ +write: + global: + max_concurrency: 30 + request: + size_bytes_limit: 1024 + series_limit: 1000 + samples_limit: 10 + tenants: + acme: + request: + size_bytes_limit: 0 + series_limit: 0 + samples_limit: 0 + ajax: + request: + series_limit: 50000 + samples_limit: 500