diff --git a/cmd/thanos/sidecar.go b/cmd/thanos/sidecar.go index 9b8c2feded..6bface0858 100644 --- a/cmd/thanos/sidecar.go +++ b/cmd/thanos/sidecar.go @@ -362,7 +362,7 @@ func runSidecar( s := shipper.New(logger, reg, conf.tsdb.path, bkt, m.Labels, metadata.SidecarSource, uploadCompactedFunc, conf.shipper.allowOutOfOrderUpload, metadata.HashFunc(conf.shipper.hashFunc), conf.shipper.metaFileName) - return runutil.Repeat(30*time.Second, ctx.Done(), func() error { + return runutil.RepeatWithJitter(ctx, 30*time.Second, 0.2, func() error { if uploaded, err := s.Sync(ctx); err != nil { level.Warn(logger).Log("err", err, "uploaded", uploaded) } diff --git a/pkg/runutil/runutil.go b/pkg/runutil/runutil.go index 809dfce36b..f7d638fdfb 100644 --- a/pkg/runutil/runutil.go +++ b/pkg/runutil/runutil.go @@ -12,7 +12,10 @@ // err := runutil.Repeat(10*time.Second, stopc, func() error { // // ... // }) -// +//err := runutil.RepeatWithJitter(30*time.Second, ctx, 0.05, func() error { +// Your code here +//}) + // Retry starts executing closure function f until no error is returned from f: // // err := runutil.Retry(10*time.Second, stopc, func() error { @@ -50,8 +53,10 @@ package runutil import ( + "context" "fmt" "io" + "math/rand" "os" "path/filepath" "strings" @@ -99,6 +104,32 @@ func Repeat(interval time.Duration, stopc <-chan struct{}, f func() error) error } } +// RepeatWithJitter executes f with a random jitter added to the interval between each execution. +// It continues until ctx is done or f returns an error. +// The jitter factor should be between 0 and 1, where 0 means no jitter and 1 means the interval can vary from 0 to 2 times the original interval. +func RepeatWithJitter(ctx context.Context, interval time.Duration, jitterFactor float64, f func() error) error { + for { + select { + case <-ctx.Done(): + return nil + default: + if err := f(); err != nil { + return err + } + + jitter := time.Duration(float64(interval) * jitterFactor) + + jitteredInterval := interval + time.Duration(rand.Float64()*float64(jitter)) + + select { + case <-ctx.Done(): + return nil + case <-time.After(jitteredInterval): + } + } + } +} + // Retry executes f every interval seconds until timeout or no error is returned from f. func Retry(interval time.Duration, stopc <-chan struct{}, f func() error) error { return RetryWithLog(log.NewNopLogger(), interval, stopc, f)