From fd635313e0e010c50d3896c840c9b7f186bda086 Mon Sep 17 00:00:00 2001 From: Vanshikav123 Date: Tue, 26 Mar 2024 21:50:23 +0530 Subject: [PATCH 1/7] random delay Signed-off-by: Vanshikav123 --- cmd/thanos/config.go | 1 + cmd/thanos/sidecar.go | 29 +++++++++++++++++++++++++++++ docs/components/sidecar.md | 1 + 3 files changed, 31 insertions(+) diff --git a/cmd/thanos/config.go b/cmd/thanos/config.go index a6abd7c3c9..7f7735af08 100644 --- a/cmd/thanos/config.go +++ b/cmd/thanos/config.go @@ -164,6 +164,7 @@ type shipperConfig struct { allowOutOfOrderUpload bool hashFunc string metaFileName string + uploadJitter time.Duration } func (sc *shipperConfig) registerFlag(cmd extkingpin.FlagClause) *shipperConfig { diff --git a/cmd/thanos/sidecar.go b/cmd/thanos/sidecar.go index 74ab3090fa..ae1cc42b6a 100644 --- a/cmd/thanos/sidecar.go +++ b/cmd/thanos/sidecar.go @@ -7,6 +7,7 @@ import ( "context" "fmt" "math" + "math/rand" "net/http" "net/url" "sync" @@ -363,6 +364,9 @@ func runSidecar( uploadCompactedFunc, conf.shipper.allowOutOfOrderUpload, metadata.HashFunc(conf.shipper.hashFunc), conf.shipper.metaFileName) return runutil.Repeat(30*time.Second, ctx.Done(), func() error { + // Generate random delay using upload jitter. + jitter := time.Duration(rand.Int63n(int64(conf.shipper.uploadJitter))) + time.Sleep(jitter) if uploaded, err := s.Sync(ctx); err != nil { level.Warn(logger).Log("err", err, "uploaded", uploaded) } @@ -506,7 +510,30 @@ type sidecarConfig struct { storeRateLimits store.SeriesSelectLimits } +type durationValue time.Duration + +// Set implements the Set method of the kingpin.Value interface. +func (d *durationValue) Set(value string) error { + duration, err := time.ParseDuration(value) + if err != nil { + return err + } + *d = durationValue(duration) + return nil +} + +// String implements the String method of the kingpin.Value interface. +func (d *durationValue) String() string { + return time.Duration(*d).String() +} + +// Register a flag with the provided duration value. +func registerDurationFlag(cmd extkingpin.FlagClause, value *time.Duration, flagName string, defaultValue string, help string) { + cmd.Flag(flagName, help).Default(defaultValue).SetValue((*durationValue)(value)) +} + func (sc *sidecarConfig) registerFlag(cmd extkingpin.FlagClause) { + var uploadJitter time.Duration sc.http.registerFlag(cmd) sc.grpc.registerFlag(cmd) sc.prometheus.registerFlag(cmd) @@ -518,4 +545,6 @@ func (sc *sidecarConfig) registerFlag(cmd extkingpin.FlagClause) { sc.storeRateLimits.RegisterFlags(cmd) cmd.Flag("min-time", "Start of time range limit to serve. Thanos sidecar will serve only metrics, which happened later than this value. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1d or 2h45m. Valid duration units are ms, s, m, h, d, w, y."). Default("0000-01-01T00:00:00Z").SetValue(&sc.limitMinTime) + registerDurationFlag(cmd, &uploadJitter, "upload-jitter", "0s", "Maximum random delay before uploading blocks.") + sc.shipper.uploadJitter = uploadJitter } diff --git a/docs/components/sidecar.md b/docs/components/sidecar.md index d41a920aa2..2e07e57d7f 100644 --- a/docs/components/sidecar.md +++ b/docs/components/sidecar.md @@ -228,6 +228,7 @@ Flags: configuration. See format details: https://thanos.io/tip/thanos/tracing.md/#configuration --tsdb.path="./data" Data directory of TSDB. + --upload-jitter=0s Maximum random delay before uploading blocks. --version Show application version. ``` From e09f3f89d4a60ef0d9c1c17256a501fb193a7174 Mon Sep 17 00:00:00 2001 From: Vanshikav123 Date: Thu, 4 Apr 2024 00:07:15 +0530 Subject: [PATCH 2/7] added new jitter function Signed-off-by: Vanshikav123 --- cmd/thanos/sidecar.go | 48 ++++++++++++++++++------------------------ pkg/runutil/runutil.go | 34 +++++++++++++++++++++++++++++- 2 files changed, 54 insertions(+), 28 deletions(-) diff --git a/cmd/thanos/sidecar.go b/cmd/thanos/sidecar.go index ae1cc42b6a..752ab1cffe 100644 --- a/cmd/thanos/sidecar.go +++ b/cmd/thanos/sidecar.go @@ -106,6 +106,19 @@ func registerSidecar(app *extkingpin.App) { }) } +// DurationWithJitter returns random duration from "input - input*variancePerc" to "input + input*variancePerc" interval. +func DurationWithJitter(input time.Duration, variancePerc float64) time.Duration { + + if input == 0 { + return 0 + } + + variance := int64(float64(input) * variancePerc) + jitter := rand.Int63n(variance*2) - variance + + return input + time.Duration(jitter) +} + func runSidecar( g *run.Group, logger log.Logger, @@ -363,10 +376,12 @@ func runSidecar( s := shipper.New(logger, reg, conf.tsdb.path, bkt, m.Labels, metadata.SidecarSource, uploadCompactedFunc, conf.shipper.allowOutOfOrderUpload, metadata.HashFunc(conf.shipper.hashFunc), conf.shipper.metaFileName) - return runutil.Repeat(30*time.Second, ctx.Done(), func() error { + return runutil.RepeatWithJitter(30*time.Second, ctx, 0.05, func() error { // Generate random delay using upload jitter. - jitter := time.Duration(rand.Int63n(int64(conf.shipper.uploadJitter))) - time.Sleep(jitter) + jitter := DurationWithJitter(conf.shipper.uploadJitter, 0.05) + if jitter > 0 { + time.Sleep(jitter) + } if uploaded, err := s.Sync(ctx); err != nil { level.Warn(logger).Log("err", err, "uploaded", uploaded) } @@ -510,28 +525,6 @@ type sidecarConfig struct { storeRateLimits store.SeriesSelectLimits } -type durationValue time.Duration - -// Set implements the Set method of the kingpin.Value interface. -func (d *durationValue) Set(value string) error { - duration, err := time.ParseDuration(value) - if err != nil { - return err - } - *d = durationValue(duration) - return nil -} - -// String implements the String method of the kingpin.Value interface. -func (d *durationValue) String() string { - return time.Duration(*d).String() -} - -// Register a flag with the provided duration value. -func registerDurationFlag(cmd extkingpin.FlagClause, value *time.Duration, flagName string, defaultValue string, help string) { - cmd.Flag(flagName, help).Default(defaultValue).SetValue((*durationValue)(value)) -} - func (sc *sidecarConfig) registerFlag(cmd extkingpin.FlagClause) { var uploadJitter time.Duration sc.http.registerFlag(cmd) @@ -545,6 +538,7 @@ func (sc *sidecarConfig) registerFlag(cmd extkingpin.FlagClause) { sc.storeRateLimits.RegisterFlags(cmd) cmd.Flag("min-time", "Start of time range limit to serve. Thanos sidecar will serve only metrics, which happened later than this value. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1d or 2h45m. Valid duration units are ms, s, m, h, d, w, y."). Default("0000-01-01T00:00:00Z").SetValue(&sc.limitMinTime) - registerDurationFlag(cmd, &uploadJitter, "upload-jitter", "0s", "Maximum random delay before uploading blocks.") - sc.shipper.uploadJitter = uploadJitter + conf := &sidecarConfig{} + cmd.Flag("upload-jitter", "Maximum random delay before uploading blocks.").Default("0s").DurationVar(&uploadJitter) + conf.shipper.uploadJitter = uploadJitter } diff --git a/pkg/runutil/runutil.go b/pkg/runutil/runutil.go index 809dfce36b..2a15d46673 100644 --- a/pkg/runutil/runutil.go +++ b/pkg/runutil/runutil.go @@ -12,7 +12,10 @@ // err := runutil.Repeat(10*time.Second, stopc, func() error { // // ... // }) -// +//err := runutil.RepeatWithJitter(30*time.Second, ctx, 0.05, func() error { +// Your code here +//}) + // Retry starts executing closure function f until no error is returned from f: // // err := runutil.Retry(10*time.Second, stopc, func() error { @@ -50,8 +53,10 @@ package runutil import ( + "context" "fmt" "io" + "math/rand" "os" "path/filepath" "strings" @@ -99,6 +104,33 @@ func Repeat(interval time.Duration, stopc <-chan struct{}, f func() error) error } } +// RepeatWithJitter executes f with a random jitter added to the interval between each execution. +// It continues until ctx is done or f returns an error. +// The jitter factor should be between 0 and 1, where 0 means no jitter and 1 means the interval can vary from 0 to 2 times the original interval. +func RepeatWithJitter(interval time.Duration, ctx context.Context, jitterFactor float64, f func() error) error { + for { + select { + case <-ctx.Done(): + return nil + default: + + if err := f(); err != nil { + return err + } + + jitter := time.Duration(float64(interval) * jitterFactor) + + jitteredInterval := interval + time.Duration(rand.Float64()*float64(jitter)) + + select { + case <-ctx.Done(): + return nil + case <-time.After(jitteredInterval): + } + } + } +} + // Retry executes f every interval seconds until timeout or no error is returned from f. func Retry(interval time.Duration, stopc <-chan struct{}, f func() error) error { return RetryWithLog(log.NewNopLogger(), interval, stopc, f) From cced736f7716c419a5afe797b46a5c83b09a58ae Mon Sep 17 00:00:00 2001 From: Vanshikav123 Date: Thu, 4 Apr 2024 17:46:57 +0530 Subject: [PATCH 3/7] resolved review changes Signed-off-by: Vanshikav123 --- cmd/thanos/sidecar.go | 5 +---- pkg/runutil/runutil.go | 1 - 2 files changed, 1 insertion(+), 5 deletions(-) diff --git a/cmd/thanos/sidecar.go b/cmd/thanos/sidecar.go index 752ab1cffe..6245a0eb4f 100644 --- a/cmd/thanos/sidecar.go +++ b/cmd/thanos/sidecar.go @@ -108,7 +108,6 @@ func registerSidecar(app *extkingpin.App) { // DurationWithJitter returns random duration from "input - input*variancePerc" to "input + input*variancePerc" interval. func DurationWithJitter(input time.Duration, variancePerc float64) time.Duration { - if input == 0 { return 0 } @@ -526,7 +525,6 @@ type sidecarConfig struct { } func (sc *sidecarConfig) registerFlag(cmd extkingpin.FlagClause) { - var uploadJitter time.Duration sc.http.registerFlag(cmd) sc.grpc.registerFlag(cmd) sc.prometheus.registerFlag(cmd) @@ -539,6 +537,5 @@ func (sc *sidecarConfig) registerFlag(cmd extkingpin.FlagClause) { cmd.Flag("min-time", "Start of time range limit to serve. Thanos sidecar will serve only metrics, which happened later than this value. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1d or 2h45m. Valid duration units are ms, s, m, h, d, w, y."). Default("0000-01-01T00:00:00Z").SetValue(&sc.limitMinTime) conf := &sidecarConfig{} - cmd.Flag("upload-jitter", "Maximum random delay before uploading blocks.").Default("0s").DurationVar(&uploadJitter) - conf.shipper.uploadJitter = uploadJitter + cmd.Flag("upload-jitter", "Maximum random delay before uploading blocks.").Default("0s").DurationVar(&conf.shipper.uploadJitter) } diff --git a/pkg/runutil/runutil.go b/pkg/runutil/runutil.go index 2a15d46673..46b36cd43b 100644 --- a/pkg/runutil/runutil.go +++ b/pkg/runutil/runutil.go @@ -113,7 +113,6 @@ func RepeatWithJitter(interval time.Duration, ctx context.Context, jitterFactor case <-ctx.Done(): return nil default: - if err := f(); err != nil { return err } From 24eafbe32bd7f6105e2aa7980152ec4030a52428 Mon Sep 17 00:00:00 2001 From: Vanshikav123 Date: Fri, 5 Apr 2024 22:42:47 +0530 Subject: [PATCH 4/7] remove unnecessary jitter Signed-off-by: Vanshikav123 --- cmd/thanos/sidecar.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/cmd/thanos/sidecar.go b/cmd/thanos/sidecar.go index 6245a0eb4f..99849d56f2 100644 --- a/cmd/thanos/sidecar.go +++ b/cmd/thanos/sidecar.go @@ -376,11 +376,6 @@ func runSidecar( uploadCompactedFunc, conf.shipper.allowOutOfOrderUpload, metadata.HashFunc(conf.shipper.hashFunc), conf.shipper.metaFileName) return runutil.RepeatWithJitter(30*time.Second, ctx, 0.05, func() error { - // Generate random delay using upload jitter. - jitter := DurationWithJitter(conf.shipper.uploadJitter, 0.05) - if jitter > 0 { - time.Sleep(jitter) - } if uploaded, err := s.Sync(ctx); err != nil { level.Warn(logger).Log("err", err, "uploaded", uploaded) } From 3566f56dbe9be73873cdd536076ae5b23ab51bb6 Mon Sep 17 00:00:00 2001 From: Vanshikav123 Date: Fri, 5 Apr 2024 23:57:34 +0530 Subject: [PATCH 5/7] final changes Signed-off-by: Vanshikav123 --- cmd/thanos/config.go | 1 - cmd/thanos/sidecar.go | 17 +---------------- 2 files changed, 1 insertion(+), 17 deletions(-) diff --git a/cmd/thanos/config.go b/cmd/thanos/config.go index 7f7735af08..a6abd7c3c9 100644 --- a/cmd/thanos/config.go +++ b/cmd/thanos/config.go @@ -164,7 +164,6 @@ type shipperConfig struct { allowOutOfOrderUpload bool hashFunc string metaFileName string - uploadJitter time.Duration } func (sc *shipperConfig) registerFlag(cmd extkingpin.FlagClause) *shipperConfig { diff --git a/cmd/thanos/sidecar.go b/cmd/thanos/sidecar.go index 99849d56f2..eec705cbe8 100644 --- a/cmd/thanos/sidecar.go +++ b/cmd/thanos/sidecar.go @@ -7,7 +7,6 @@ import ( "context" "fmt" "math" - "math/rand" "net/http" "net/url" "sync" @@ -106,18 +105,6 @@ func registerSidecar(app *extkingpin.App) { }) } -// DurationWithJitter returns random duration from "input - input*variancePerc" to "input + input*variancePerc" interval. -func DurationWithJitter(input time.Duration, variancePerc float64) time.Duration { - if input == 0 { - return 0 - } - - variance := int64(float64(input) * variancePerc) - jitter := rand.Int63n(variance*2) - variance - - return input + time.Duration(jitter) -} - func runSidecar( g *run.Group, logger log.Logger, @@ -375,7 +362,7 @@ func runSidecar( s := shipper.New(logger, reg, conf.tsdb.path, bkt, m.Labels, metadata.SidecarSource, uploadCompactedFunc, conf.shipper.allowOutOfOrderUpload, metadata.HashFunc(conf.shipper.hashFunc), conf.shipper.metaFileName) - return runutil.RepeatWithJitter(30*time.Second, ctx, 0.05, func() error { + return runutil.RepeatWithJitter(30*time.Second, ctx, 0.2, func() error { if uploaded, err := s.Sync(ctx); err != nil { level.Warn(logger).Log("err", err, "uploaded", uploaded) } @@ -531,6 +518,4 @@ func (sc *sidecarConfig) registerFlag(cmd extkingpin.FlagClause) { sc.storeRateLimits.RegisterFlags(cmd) cmd.Flag("min-time", "Start of time range limit to serve. Thanos sidecar will serve only metrics, which happened later than this value. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1d or 2h45m. Valid duration units are ms, s, m, h, d, w, y."). Default("0000-01-01T00:00:00Z").SetValue(&sc.limitMinTime) - conf := &sidecarConfig{} - cmd.Flag("upload-jitter", "Maximum random delay before uploading blocks.").Default("0s").DurationVar(&conf.shipper.uploadJitter) } From ab01ce948e4bf670d1084763986259acbf031be1 Mon Sep 17 00:00:00 2001 From: Vanshikav123 Date: Mon, 8 Apr 2024 15:24:00 +0530 Subject: [PATCH 6/7] doc changes Signed-off-by: Vanshikav123 --- docs/components/sidecar.md | 1 - 1 file changed, 1 deletion(-) diff --git a/docs/components/sidecar.md b/docs/components/sidecar.md index 2e07e57d7f..d41a920aa2 100644 --- a/docs/components/sidecar.md +++ b/docs/components/sidecar.md @@ -228,7 +228,6 @@ Flags: configuration. See format details: https://thanos.io/tip/thanos/tracing.md/#configuration --tsdb.path="./data" Data directory of TSDB. - --upload-jitter=0s Maximum random delay before uploading blocks. --version Show application version. ``` From da63e089d748ac8e203917dc2f776812e9ab92c5 Mon Sep 17 00:00:00 2001 From: Vanshikav123 Date: Mon, 8 Apr 2024 16:18:26 +0530 Subject: [PATCH 7/7] context position Signed-off-by: Vanshikav123 --- cmd/thanos/sidecar.go | 2 +- pkg/runutil/runutil.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/cmd/thanos/sidecar.go b/cmd/thanos/sidecar.go index eec705cbe8..735f9f5163 100644 --- a/cmd/thanos/sidecar.go +++ b/cmd/thanos/sidecar.go @@ -362,7 +362,7 @@ func runSidecar( s := shipper.New(logger, reg, conf.tsdb.path, bkt, m.Labels, metadata.SidecarSource, uploadCompactedFunc, conf.shipper.allowOutOfOrderUpload, metadata.HashFunc(conf.shipper.hashFunc), conf.shipper.metaFileName) - return runutil.RepeatWithJitter(30*time.Second, ctx, 0.2, func() error { + return runutil.RepeatWithJitter(ctx, 30*time.Second, 0.2, func() error { if uploaded, err := s.Sync(ctx); err != nil { level.Warn(logger).Log("err", err, "uploaded", uploaded) } diff --git a/pkg/runutil/runutil.go b/pkg/runutil/runutil.go index 46b36cd43b..f7d638fdfb 100644 --- a/pkg/runutil/runutil.go +++ b/pkg/runutil/runutil.go @@ -107,7 +107,7 @@ func Repeat(interval time.Duration, stopc <-chan struct{}, f func() error) error // RepeatWithJitter executes f with a random jitter added to the interval between each execution. // It continues until ctx is done or f returns an error. // The jitter factor should be between 0 and 1, where 0 means no jitter and 1 means the interval can vary from 0 to 2 times the original interval. -func RepeatWithJitter(interval time.Duration, ctx context.Context, jitterFactor float64, f func() error) error { +func RepeatWithJitter(ctx context.Context, interval time.Duration, jitterFactor float64, f func() error) error { for { select { case <-ctx.Done():