Skip to content

Commit

Permalink
Receive: Reload tenant limit configuration on file change (thanos-io#…
Browse files Browse the repository at this point in the history
…5673)

* Create a PathOrContent reloader

Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com>

* Add docs to staticPathContent.Rewrite

Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com>

* Run goimports

Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com>

* Properly cancel the context in the test

Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com>

* Watch parent directory of file

This helps handling deletes and other situations.

Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com>

* Remove useless ctx.Done()

Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com>

* Add a debounce timer to config reload

It helps managing situations where a create event is followed by a write or when a big file write is sent by the fsnotify backend as many write events.

Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com>

* Fix event.Op bitmask check

Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com>

* Update lastReload

Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com>

* Fix debouncer for path content reloader

Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com>

* Improve documentation of the PathContentRealoder

Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com>

* Dain reload timer before resetting

Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com>

* Run tests in parallel

Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com>

* Simplify debouncing logic

Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com>

* Add more tests to file reloader

Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com>

* Simplify condition for triggering reload

Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com>

* Use absolute path to config file

Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com>

* Get rid of parallel test

Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com>

* Put back 2s wait between fs operations

Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com>

* Remove useless sleep

Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com>

* Stop reloadTimer when context cancelled

Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com>

* Remove unused fucntion

Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com>

* Add missing copyright to test file

Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com>

* Auto-reload tenant limit config on file changes

Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com>

* Wrap error when reloading config

Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com>

* Move limiter config reloader and update logs

Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com>

* Get rid of useless types and allocations

Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com>

* Remove errorChan from config reload starter

Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com>

* Retrigger CI

Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com>

* Use UnRegisterer in the Limiter

To ensure that limit reloads will be able to re-register their metrics.

Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com>

* Better guard against nil registerer in the limiter

Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com>

* Remove wrong nil guard

Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com>

* Retrigger CI

Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com>

Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com>
Signed-off-by: utukj <utukphd@gmail.com>
  • Loading branch information
douglascamata authored and utukJ committed Oct 18, 2022
1 parent ad11a03 commit 32ca327
Show file tree
Hide file tree
Showing 17 changed files with 646 additions and 97 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Expand Up @@ -29,6 +29,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#5734](https://github.com/thanos-io/thanos/pull/5734) Store: Support disable block viewer UI.
- [#5411](https://github.com/thanos-io/thanos/pull/5411) Tracing: Add OpenTelemetry Protocol exporter.
- [#5779](https://github.com/thanos-io/thanos/pull/5779) Objstore: Support specifying S3 storage class.
- [#5673](https://github.com/thanos-io/thanos/pull/5673) Receive: Reload tenant limit configuration on file change.

### Changed

Expand Down
46 changes: 33 additions & 13 deletions cmd/thanos/receive.go
Expand Up @@ -192,19 +192,6 @@ func runReceive(
return errors.Wrap(err, "parse relabel configuration")
}

var limitsConfig *receive.RootLimitsConfig
if conf.limitsConfig != nil {
limitsContentYaml, err := conf.limitsConfig.Content()
if err != nil {
return errors.Wrap(err, "get content of limit configuration")
}
limitsConfig, err = receive.ParseRootLimitConfig(limitsContentYaml)
if err != nil {
return errors.Wrap(err, "parse limit configuration")
}
}
limiter := receive.NewLimiter(limitsConfig, reg, receiveMode, log.With(logger, "component", "receive-limiter"))

dbs := receive.NewMultiTSDB(
conf.dataDir,
logger,
Expand All @@ -217,6 +204,23 @@ func runReceive(
hashFunc,
)
writer := receive.NewWriter(log.With(logger, "component", "receive-writer"), dbs)

var limitsConfig *receive.RootLimitsConfig
if conf.limitsConfig != nil {
limitsContentYaml, err := conf.limitsConfig.Content()
if err != nil {
return errors.Wrap(err, "get content of limit configuration")
}
limitsConfig, err = receive.ParseRootLimitConfig(limitsContentYaml)
if err != nil {
return errors.Wrap(err, "parse limit configuration")
}
}
limiter, err := receive.NewLimiter(conf.limitsConfig, reg, receiveMode, log.With(logger, "component", "receive-limiter"))
if err != nil {
return errors.Wrap(err, "creating limiter")
}

webHandler := receive.NewHandler(log.With(logger, "component", "receive-handler"), &receive.Options{
Writer: writer,
ListenAddress: conf.rwAddress,
Expand Down Expand Up @@ -399,6 +403,22 @@ func runReceive(
})
}

{
if limiter.CanReload() {
ctx, cancel := context.WithCancel(context.Background())
g.Add(func() error {
level.Debug(logger).Log("msg", "limits config initialized with file watcher.")
if err := limiter.StartConfigReloader(ctx); err != nil {
return err
}
<-ctx.Done()
return nil
}, func(err error) {
cancel()
})
}
}

level.Info(logger).Log("msg", "starting receiver")
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion docs/components/receive.md
Expand Up @@ -86,7 +86,7 @@ Thanos Receive has some limits and gates that can be configured to control resou

To configure the gates and limits you can use one of the two options:

- `--receive.limits-config-file=<file-path>`: where `<file-path>` is the path to the YAML file.
- `--receive.limits-config-file=<file-path>`: where `<file-path>` is the path to the YAML file. Any modification to the indicated file will trigger a configuration reload. If the updated configuration is invalid an error will be logged and it won't replace the previous valid configuration.
- `--receive.limits-config=<content>`: where `<content>` is the content of YAML file.

By default all the limits and gates are **disabled**.
Expand Down
8 changes: 3 additions & 5 deletions go.mod
Expand Up @@ -19,7 +19,7 @@ require (
github.com/davecgh/go-spew v1.1.1
github.com/dustin/go-humanize v1.0.0
github.com/efficientgo/e2e v0.13.1-0.20220923082810-8fa9daa8af8a
github.com/efficientgo/tools/extkingpin v0.0.0-20220801101838-3312908f6a9d
github.com/efficientgo/tools/extkingpin v0.0.0-20220817170617-6c25e3b627dd
github.com/facette/natsort v0.0.0-20181210072756-2cd4dd1e2dcb
github.com/fatih/structtag v1.2.0
github.com/felixge/fgprof v0.9.2
Expand Down Expand Up @@ -108,6 +108,7 @@ require (

require (
github.com/efficientgo/core v1.0.0-rc.0
github.com/efficientgo/tools/core v0.0.0-20220817170617-6c25e3b627dd
github.com/minio/sha256-simd v1.0.0
)

Expand All @@ -127,10 +128,7 @@ require (
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.10.0
)

require (
github.com/efficientgo/tools/core v0.0.0-20220817170617-6c25e3b627dd
go.opentelemetry.io/contrib/propagators/autoprop v0.34.0
)
require go.opentelemetry.io/contrib/propagators/autoprop v0.34.0

require (
github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapping v0.32.3 // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Expand Up @@ -252,8 +252,8 @@ github.com/efficientgo/e2e v0.13.1-0.20220923082810-8fa9daa8af8a h1:cnJajqeh/Hjv
github.com/efficientgo/e2e v0.13.1-0.20220923082810-8fa9daa8af8a/go.mod h1:Hi+sz0REtlhVZ8zcdeTC3j6LUEEpJpPtNjOaOKuNcgI=
github.com/efficientgo/tools/core v0.0.0-20220817170617-6c25e3b627dd h1:svR6KxSP1xiPw10RN4Pd7g6BAVkEcNN628PAqZH31mM=
github.com/efficientgo/tools/core v0.0.0-20220817170617-6c25e3b627dd/go.mod h1:OmVcnJopJL8d3X3sSXTiypGoUSgFq1aDGmlrdi9dn/M=
github.com/efficientgo/tools/extkingpin v0.0.0-20220801101838-3312908f6a9d h1:WZV/mrUyKS9w9r+Jdw+zq/tdGAb5LwB+H37EkMLhEMA=
github.com/efficientgo/tools/extkingpin v0.0.0-20220801101838-3312908f6a9d/go.mod h1:ZV0utlglOczUWv3ih2AbqPSoLoFzdplUYxwV62eZi6Q=
github.com/efficientgo/tools/extkingpin v0.0.0-20220817170617-6c25e3b627dd h1:VaYzzXeUbC5fVheskcKVNOyJMEYD+HgrJNzIAg/mRIM=
github.com/efficientgo/tools/extkingpin v0.0.0-20220817170617-6c25e3b627dd/go.mod h1:ZV0utlglOczUWv3ih2AbqPSoLoFzdplUYxwV62eZi6Q=
github.com/elastic/go-sysinfo v1.1.1/go.mod h1:i1ZYdU10oLNfRzq4vq62BEwD2fH8KaWh6eh0ikPT9F0=
github.com/elastic/go-sysinfo v1.8.1 h1:4Yhj+HdV6WjbCRgGdZpPJ8lZQlXZLKDAeIkmQ/VRvi4=
github.com/elastic/go-sysinfo v1.8.1/go.mod h1:JfllUnzoQV/JRYymbH3dO1yggI3mV2oTKSXsDHM+uIM=
Expand Down
128 changes: 128 additions & 0 deletions pkg/extkingpin/path_content_reloader.go
@@ -0,0 +1,128 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package extkingpin

import (
"context"
"fmt"
"os"
"path"
"path/filepath"
"time"

"github.com/fsnotify/fsnotify"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/pkg/errors"
)

type fileContent interface {
Content() ([]byte, error)
Path() string
}

// PathContentReloader starts a file watcher that monitors the file indicated by fileContent.Path() and runs
// reloadFunc whenever a change is detected.
// A debounce timer can be configured via opts to handle situations where many "write" events are received together or
// a "create" event is followed up by a "write" event, for example. Files will be effectively reloaded at the latest
// after 2 times the debounce timer. By default the debouncer timer is 1 second.
// To ensure renames and deletes are properly handled, the file watcher is put at the file's parent folder. See
// https://github.com/fsnotify/fsnotify/issues/214 for more details.
func PathContentReloader(ctx context.Context, fileContent fileContent, logger log.Logger, reloadFunc func(), debounceTime time.Duration) error {
filePath, err := filepath.Abs(fileContent.Path())
if err != nil {
return errors.Wrap(err, "getting absolute file path")
}

watcher, err := fsnotify.NewWatcher()
if filePath == "" {
level.Debug(logger).Log("msg", "no path detected for config reload")
}
if err != nil {
return errors.Wrap(err, "creating file watcher")
}
go func() {
var reloadTimer *time.Timer
if debounceTime != 0 {
reloadTimer = time.AfterFunc(debounceTime, func() {
reloadFunc()
level.Debug(logger).Log("msg", "configuration reloaded after debouncing")
})
}
defer watcher.Close()
for {
select {
case <-ctx.Done():
if reloadTimer != nil {
reloadTimer.Stop()
}
return
case event := <-watcher.Events:
// fsnotify sometimes sends a bunch of events without name or operation.
// It's unclear what they are and why they are sent - filter them out.
if event.Name == "" {
break
}
// We are watching the file's parent folder (more details on this is done can be found below), but are
// only interested in changed to the target file. Discard every other file as quickly as possible.
if event.Name != filePath {
break
}
// We only react to files being written or created.
// On chmod or remove we have nothing to do.
// On rename we have the old file name (not useful). A create event for the new file will come later.
if event.Op&fsnotify.Write == 0 && event.Op&fsnotify.Create == 0 {
break
}
level.Debug(logger).Log("msg", fmt.Sprintf("change detected for %s", filePath), "eventName", event.Name, "eventOp", event.Op)
if reloadTimer != nil {
reloadTimer.Reset(debounceTime)
}
case err := <-watcher.Errors:
level.Error(logger).Log("msg", "watcher error", "error", err)
}
}
}()
// We watch the file's parent folder and not the file itself to better handle DELETE and RENAME events. Check
// https://github.com/fsnotify/fsnotify/issues/214 for more details.
if err := watcher.Add(path.Dir(filePath)); err != nil {
return errors.Wrapf(err, "adding path %s to file watcher", filePath)
}
return nil
}

type staticPathContent struct {
content []byte
path string
}

var _ fileContent = (*staticPathContent)(nil)

// Content returns the cached content.
func (t *staticPathContent) Content() ([]byte, error) {
return t.content, nil
}

// Path returns the path to the file that contains the content.
func (t *staticPathContent) Path() string {
return t.path
}

// NewStaticPathContent creates a new content that can be used to serve a static configuration. It copies the
// configuration from `fromPath` into `destPath` to avoid confusion with file watchers.
func NewStaticPathContent(fromPath string) (*staticPathContent, error) {
content, err := os.ReadFile(fromPath)
if err != nil {
return nil, errors.Wrapf(err, "could not load test content: %s", fromPath)
}
return &staticPathContent{content, fromPath}, nil
}

// Rewrite rewrites the file backing this staticPathContent and swaps the local content cache. The file writing
// is needed to trigger the file system monitor.
func (t *staticPathContent) Rewrite(newContent []byte) error {
t.content = newContent
// Write the file to ensure possible file watcher reloaders get triggered.
return os.WriteFile(t.path, newContent, 0666)
}
105 changes: 105 additions & 0 deletions pkg/extkingpin/path_content_reloader_test.go
@@ -0,0 +1,105 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package extkingpin

import (
"context"
"os"
"path"
"sync"
"testing"
"time"

"github.com/go-kit/log"
"github.com/thanos-io/thanos/pkg/testutil"
)

func TestPathContentReloader(t *testing.T) {
type args struct {
runSteps func(t *testing.T, testFile string, pathContent *staticPathContent)
}
tests := []struct {
name string
args args
wantReloads int
}{
{
name: "Many operations, only rewrite triggers one reload",
args: args{
runSteps: func(t *testing.T, testFile string, pathContent *staticPathContent) {
testutil.Ok(t, os.Chmod(testFile, 0777))
testutil.Ok(t, os.Remove(testFile))
testutil.Ok(t, pathContent.Rewrite([]byte("test modified")))
},
},
wantReloads: 1,
},
{
name: "Many operations, only rename triggers one reload",
args: args{
runSteps: func(t *testing.T, testFile string, pathContent *staticPathContent) {
testutil.Ok(t, os.Chmod(testFile, 0777))
testutil.Ok(t, os.Rename(testFile, testFile+".tmp"))
testutil.Ok(t, os.Rename(testFile+".tmp", testFile))
},
},
wantReloads: 1,
},
{
name: "Many operations, two rewrites trigger two reloads",
args: args{
runSteps: func(t *testing.T, testFile string, pathContent *staticPathContent) {
testutil.Ok(t, os.Chmod(testFile, 0777))
testutil.Ok(t, os.Remove(testFile))
testutil.Ok(t, pathContent.Rewrite([]byte("test modified")))
time.Sleep(2 * time.Second)
testutil.Ok(t, pathContent.Rewrite([]byte("test modified again")))
},
},
wantReloads: 1,
},
{
name: "Chmod doesn't trigger reload",
args: args{
runSteps: func(t *testing.T, testFile string, pathContent *staticPathContent) {
testutil.Ok(t, os.Chmod(testFile, 0777))
},
},
wantReloads: 0,
},
{
name: "Remove doesn't trigger reload",
args: args{
runSteps: func(t *testing.T, testFile string, pathContent *staticPathContent) {
testutil.Ok(t, os.Remove(testFile))
},
},
wantReloads: 0,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
testFile := path.Join(t.TempDir(), "test")
testutil.Ok(t, os.WriteFile(testFile, []byte("test"), 0666))
pathContent, err := NewStaticPathContent(testFile)
testutil.Ok(t, err)

wg := &sync.WaitGroup{}
wg.Add(tt.wantReloads)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
reloadCount := 0
err = PathContentReloader(ctx, pathContent, log.NewLogfmtLogger(os.Stdout), func() {
reloadCount++
wg.Done()
}, 100*time.Millisecond)
testutil.Ok(t, err)

tt.args.runSteps(t, testFile, pathContent)
wg.Wait()
testutil.Equals(t, tt.wantReloads, reloadCount)
})
}
}

0 comments on commit 32ca327

Please sign in to comment.