From 642b6090fabaedba047eb76fdeb165e5e4085a4c Mon Sep 17 00:00:00 2001 From: Kemal Akkoyun Date: Tue, 14 Jul 2020 16:13:02 +0200 Subject: [PATCH 1/7] Add timeout for uploads Add more logs for receive Signed-off-by: Kemal Akkoyun --- cmd/thanos/receive.go | 55 +++++++++++++++++++++++++++------------- pkg/receive/multitsdb.go | 5 ++-- pkg/server/grpc/grpc.go | 15 ++++++----- pkg/server/http/http.go | 7 +++-- 4 files changed, 55 insertions(+), 27 deletions(-) diff --git a/cmd/thanos/receive.go b/cmd/thanos/receive.go index e63c3cf0e5..83f9ed15e3 100644 --- a/cmd/thanos/receive.go +++ b/cmd/thanos/receive.go @@ -15,13 +15,13 @@ import ( "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" "github.com/oklog/run" - opentracing "github.com/opentracing/opentracing-go" + "github.com/opentracing/opentracing-go" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/tsdb" - kingpin "gopkg.in/alecthomas/kingpin.v2" + "gopkg.in/alecthomas/kingpin.v2" "github.com/thanos-io/thanos/pkg/component" "github.com/thanos-io/thanos/pkg/extflag" @@ -84,6 +84,8 @@ func registerReceive(m map[string]setupFunc, app *kingpin.Application) { forwardTimeout := modelDuration(cmd.Flag("receive-forward-timeout", "Timeout for forward requests.").Default("5s").Hidden()) + uploadTimeout := modelDuration(cmd.Flag("receive.upload-timeout", "Timeout for the initial and last upload request.").Default("10m").Hidden()) + tsdbMinBlockDuration := modelDuration(cmd.Flag("tsdb.min-block-duration", "Min duration for local TSDB blocks").Default("2h").Hidden()) tsdbMaxBlockDuration := modelDuration(cmd.Flag("tsdb.max-block-duration", "Max duration for local TSDB blocks").Default("2h").Hidden()) ignoreBlockSize := cmd.Flag("shipper.ignore-unequal-block-size", "If true receive will not require min and max block size flags to be set to the same value. 
Only use this if you want to keep long retention and compaction enabled, as in the worst case it can result in ~2h data loss for your Thanos bucket storage.").Default("false").Hidden().Bool() @@ -163,8 +165,9 @@ func registerReceive(m map[string]setupFunc, app *kingpin.Application) { *replicaHeader, *replicationFactor, time.Duration(*forwardTimeout), - comp, + time.Duration(*uploadTimeout), *allowOutOfOrderUpload, + comp, ) } } @@ -202,8 +205,9 @@ func runReceive( replicaHeader string, replicationFactor uint64, forwardTimeout time.Duration, - comp component.SourceStoreAPI, + uploadTimeout time.Duration, allowOutOfOrderUpload bool, + comp component.SourceStoreAPI, ) error { logger = log.With(logger, "component", "receive") level.Warn(logger).Log("msg", "setting up receive") @@ -294,6 +298,7 @@ func runReceive( level.Debug(logger).Log("msg", "setting up tsdb") { + log.With(logger, "component", "storage") dbUpdatesStarted := promauto.With(reg).NewCounter(prometheus.CounterOpts{ Name: "thanos_receive_multi_db_updates_attempted_total", Help: "Number of Multi DB attempted reloads with flush and potential upload due to hashring changes", @@ -311,12 +316,15 @@ func runReceive( // Before quitting, ensure the WAL is flushed and the DBs are closed. 
defer func() { + level.Info(logger).Log("msg", "shutting down Multi TSDB") if err := dbs.Flush(); err != nil { - level.Warn(logger).Log("err", err, "msg", "failed to flush storage") + level.Error(logger).Log("err", err, "msg", "failed to flush storage") } if err := dbs.Close(); err != nil { - level.Warn(logger).Log("err", err, "msg", "failed to close multi db") + level.Error(logger).Log("err", err, "msg", "failed to close storage") + return } + level.Info(logger).Log("msg", "Multi TSDB is closed") }() for { @@ -328,7 +336,7 @@ func runReceive( return nil } dbUpdatesStarted.Inc() - level.Info(logger).Log("msg", "updating Multi DB") + level.Info(logger).Log("msg", "updating Multi TSDB") if err := dbs.Flush(); err != nil { return errors.Wrap(err, "flushing storage") @@ -341,7 +349,7 @@ func runReceive( <-uploadDone } statusProber.Ready() - level.Info(logger).Log("msg", "tsdb started, and server is ready to receive web requests") + level.Info(logger).Log("msg", "Multi TSDB started, and server is ready to receive web requests") dbUpdatesCompleted.Inc() dbReady <- struct{}{} } @@ -394,7 +402,7 @@ func runReceive( return nil } webHandler.Hashring(h) - msg := "hashring has changed; server is not ready to receive web requests." 
+ msg := "hashring has changed; server is not ready to receive web requests" statusProber.NotReady(errors.New(msg)) level.Info(logger).Log("msg", msg) hashringChangedChan <- struct{}{} @@ -489,20 +497,27 @@ func runReceive( } if upload { - level.Debug(logger).Log("msg", "upload enabled") - if err := dbs.Sync(context.Background()); err != nil { - level.Warn(logger).Log("msg", "initial upload failed", "err", err) + logger := log.With(logger, "component", "uploader") + { + level.Info(logger).Log("msg", "upload enabled, starting initial sync") + ctx, cancel := context.WithTimeout(context.Background(), uploadTimeout) + if err := dbs.Sync(ctx); err != nil { + cancel() + level.Warn(logger).Log("msg", "initial upload failed", "err", err) + } + cancel() + level.Info(logger).Log("msg", "initial sync done") } - { // Run the uploader in a loop. ctx, cancel := context.WithCancel(context.Background()) g.Add(func() error { return runutil.Repeat(30*time.Second, ctx.Done(), func() error { + level.Debug(logger).Log("msg", "recurring upload starting") if err := dbs.Sync(ctx); err != nil { - level.Warn(logger).Log("msg", "interval upload failed", "err", err) + level.Warn(logger).Log("msg", "recurring upload failed", "err", err) } - + level.Debug(logger).Log("msg", "upload done") return nil }) }, func(error) { @@ -521,9 +536,15 @@ func runReceive( // Before quitting, ensure all blocks are uploaded. 
defer func() { <-uploadC - if err := dbs.Sync(context.Background()); err != nil { - level.Warn(logger).Log("msg", "on demnad upload failed", "err", err) + level.Info(logger).Log("msg", "uploading the last cut block before exiting") + dctx, dCancel := context.WithTimeout(context.Background(), uploadTimeout) + if err := dbs.Sync(dctx); err != nil { + dCancel() + level.Error(logger).Log("msg", "on demand upload failed", "err", err) + return } + dCancel() + level.Info(logger).Log("msg", "the last cut block is uploaded") }() defer close(uploadDone) for { diff --git a/pkg/receive/multitsdb.go b/pkg/receive/multitsdb.go index 7708a6e9e4..420a9e5ff9 100644 --- a/pkg/receive/multitsdb.go +++ b/pkg/receive/multitsdb.go @@ -18,13 +18,14 @@ import ( "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb" terrors "github.com/prometheus/prometheus/tsdb/errors" + "golang.org/x/sync/errgroup" + "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/component" "github.com/thanos-io/thanos/pkg/objstore" "github.com/thanos-io/thanos/pkg/shipper" "github.com/thanos-io/thanos/pkg/store" "github.com/thanos-io/thanos/pkg/store/storepb" - "golang.org/x/sync/errgroup" ) type MultiTSDB struct { @@ -57,7 +58,7 @@ func NewMultiTSDB( return &MultiTSDB{ dataDir: dataDir, - logger: l, + logger: log.With(l, "component", "multi-tsdb"), reg: reg, tsdbOpts: tsdbOpts, mtx: &sync.RWMutex{}, diff --git a/pkg/server/grpc/grpc.go b/pkg/server/grpc/grpc.go index d90fe59334..897f15cbe3 100644 --- a/pkg/server/grpc/grpc.go +++ b/pkg/server/grpc/grpc.go @@ -18,16 +18,17 @@ import ( "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" - "github.com/thanos-io/thanos/pkg/component" - "github.com/thanos-io/thanos/pkg/prober" - "github.com/thanos-io/thanos/pkg/rules/rulespb" - "github.com/thanos-io/thanos/pkg/store/storepb" - "github.com/thanos-io/thanos/pkg/tracing" 
"google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials" grpc_health "google.golang.org/grpc/health/grpc_health_v1" "google.golang.org/grpc/status" + + "github.com/thanos-io/thanos/pkg/component" + "github.com/thanos-io/thanos/pkg/prober" + "github.com/thanos-io/thanos/pkg/rules/rulespb" + "github.com/thanos-io/thanos/pkg/store/storepb" + "github.com/thanos-io/thanos/pkg/tracing" ) // A Server defines parameters to serve RPC requests, a wrapper around grpc.Server. @@ -123,10 +124,11 @@ func (s *Server) ListenAndServe() error { // Shutdown gracefully shuts down the server by waiting, // for specified amount of time (by gracePeriod) for connections to return to idle and then shut down. func (s *Server) Shutdown(err error) { - defer level.Info(s.logger).Log("msg", "internal server shutdown", "err", err) + level.Info(s.logger).Log("msg", "internal server is shutting down", "err", err) if s.opts.gracePeriod == 0 { s.srv.Stop() + level.Info(s.logger).Log("msg", "internal server is shutdown", "err", err) return } @@ -147,6 +149,7 @@ func (s *Server) Shutdown(err error) { case <-stopped: cancel() } + level.Info(s.logger).Log("msg", "internal server is shutdown", "err", err) } // ReadWriteStoreServer is a StoreServer and a WriteableStoreServer. diff --git a/pkg/server/http/http.go b/pkg/server/http/http.go index 162da79a77..6de4e80f8f 100644 --- a/pkg/server/http/http.go +++ b/pkg/server/http/http.go @@ -13,6 +13,7 @@ import ( "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" + "github.com/thanos-io/thanos/pkg/component" "github.com/thanos-io/thanos/pkg/prober" ) @@ -60,15 +61,15 @@ func (s *Server) ListenAndServe() error { // Shutdown gracefully shuts down the server by waiting, // for specified amount of time (by gracePeriod) for connections to return to idle and then shut down. 
func (s *Server) Shutdown(err error) { + level.Info(s.logger).Log("msg", "internal server is shutting down", "err", err) if err == http.ErrServerClosed { level.Warn(s.logger).Log("msg", "internal server closed unexpectedly") return } - defer level.Info(s.logger).Log("msg", "internal server shutdown", "err", err) - if s.opts.gracePeriod == 0 { s.srv.Close() + level.Info(s.logger).Log("msg", "internal server is shutdown", "err", err) return } @@ -77,7 +78,9 @@ func (s *Server) Shutdown(err error) { if err := s.srv.Shutdown(ctx); err != nil { level.Error(s.logger).Log("msg", "internal server shut down failed", "err", err) + return } + level.Info(s.logger).Log("msg", "internal server is shutdown", "err", err) } // Handle registers the handler for the given pattern. From 437c7b6357a39a69be401fd28a2745eabb05561e Mon Sep 17 00:00:00 2001 From: Kemal Akkoyun Date: Tue, 14 Jul 2020 17:15:13 +0200 Subject: [PATCH 2/7] Add timeout for all uploads Merge uploader routines Address review issues Signed-off-by: Kemal Akkoyun --- CHANGELOG.md | 1 + cmd/thanos/receive.go | 82 ++++++++++++++++++++++--------------------- 2 files changed, 43 insertions(+), 40 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c8a737ca62..0125f67cde 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,6 +22,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel - [#2832](https://github.com/thanos-io/thanos/pull/2832) ui: React: Add runtime and build info page - [#2305](https://github.com/thanos-io/thanos/pull/2305) Receive,Sidecar,Ruler: Propagate correct (stricter) MinTime for no-block TSDBs. +- [#2892](https://github.com/thanos-io/thanos/pull/2892) receive: Add time-out for each block upload. And receiver fails when the initial upload fails. 
## [v0.14.0](https://github.com/thanos-io/thanos/releases/tag/v0.14.0) - 2020.07.10 diff --git a/cmd/thanos/receive.go b/cmd/thanos/receive.go index 83f9ed15e3..8cad91556c 100644 --- a/cmd/thanos/receive.go +++ b/cmd/thanos/receive.go @@ -82,9 +82,9 @@ func registerReceive(m map[string]setupFunc, app *kingpin.Application) { replicationFactor := cmd.Flag("receive.replication-factor", "How many times to replicate incoming write requests.").Default("1").Uint64() - forwardTimeout := modelDuration(cmd.Flag("receive-forward-timeout", "Timeout for forward requests.").Default("5s").Hidden()) + forwardTimeout := modelDuration(cmd.Flag("receive-forward-timeout", "Timeout for each forward request.").Default("5s").Hidden()) - uploadTimeout := modelDuration(cmd.Flag("receive.upload-timeout", "Timeout for the initial and last upload request.").Default("10m").Hidden()) + uploadTimeout := modelDuration(cmd.Flag("receive.upload-timeout", "Timeout for each block upload request.").Default("10m").Hidden()) tsdbMinBlockDuration := modelDuration(cmd.Flag("tsdb.min-block-duration", "Min duration for local TSDB blocks").Default("2h").Hidden()) tsdbMaxBlockDuration := modelDuration(cmd.Flag("tsdb.max-block-duration", "Max duration for local TSDB blocks").Default("2h").Hidden()) @@ -316,15 +316,17 @@ func runReceive( // Before quitting, ensure the WAL is flushed and the DBs are closed. 
defer func() { - level.Info(logger).Log("msg", "shutting down Multi TSDB") + level.Info(logger).Log("msg", "shutting down storage") if err := dbs.Flush(); err != nil { level.Error(logger).Log("err", err, "msg", "failed to flush storage") + } else { + level.Info(logger).Log("msg", "storage is flushed successfully") } if err := dbs.Close(); err != nil { level.Error(logger).Log("err", err, "msg", "failed to close storage") return } - level.Info(logger).Log("msg", "Multi TSDB is closed") + level.Info(logger).Log("msg", "storage is closed") }() for { @@ -336,7 +338,7 @@ func runReceive( return nil } dbUpdatesStarted.Inc() - level.Info(logger).Log("msg", "updating Multi TSDB") + level.Info(logger).Log("msg", "updating storage") if err := dbs.Flush(); err != nil { return errors.Wrap(err, "flushing storage") @@ -349,7 +351,7 @@ func runReceive( <-uploadDone } statusProber.Ready() - level.Info(logger).Log("msg", "Multi TSDB started, and server is ready to receive web requests") + level.Info(logger).Log("msg", "storage started, and server is ready to receive web requests") dbUpdatesCompleted.Inc() dbReady <- struct{}{} } @@ -498,69 +500,69 @@ func runReceive( if upload { logger := log.With(logger, "component", "uploader") - { - level.Info(logger).Log("msg", "upload enabled, starting initial sync") - ctx, cancel := context.WithTimeout(context.Background(), uploadTimeout) + upload := func(ctx context.Context) error { + level.Debug(logger).Log("msg", "upload starting") + start := time.Now() + ctx, cancel := context.WithTimeout(ctx, uploadTimeout) + defer cancel() + if err := dbs.Sync(ctx); err != nil { - cancel() - level.Warn(logger).Log("msg", "initial upload failed", "err", err) + level.Warn(logger).Log("msg", "upload failed", "elapsed", time.Since(start), "err", err) + } else { + level.Debug(logger).Log("msg", "upload done", "elapsed", time.Since(start)) } - cancel() - level.Info(logger).Log("msg", "initial sync done") + + return nil } { - // Run the uploader in a loop. 
- ctx, cancel := context.WithCancel(context.Background()) - g.Add(func() error { - return runutil.Repeat(30*time.Second, ctx.Done(), func() error { - level.Debug(logger).Log("msg", "recurring upload starting") - if err := dbs.Sync(ctx); err != nil { - level.Warn(logger).Log("msg", "recurring upload failed", "err", err) - } - level.Debug(logger).Log("msg", "upload done") - return nil - }) - }, func(error) { - cancel() - }) + level.Info(logger).Log("msg", "upload enabled, starting initial sync") + if err := upload(context.Background()); err != nil { + return errors.Wrapf(err, "initial upload failed") + } + level.Info(logger).Log("msg", "initial sync done") } - { - // Upload on demand. ctx, cancel := context.WithCancel(context.Background()) g.Add(func() error { // Ensure we clean up everything properly. defer func() { runutil.CloseWithLogOnErr(logger, bkt, "bucket client") }() + // Before quitting, ensure all blocks are uploaded. defer func() { - <-uploadC - level.Info(logger).Log("msg", "uploading the last cut block before exiting") + <-uploadC // Closed by storage routine when it's done. + level.Info(logger).Log("msg", "uploading the final cut block before exiting") dctx, dCancel := context.WithTimeout(context.Background(), uploadTimeout) if err := dbs.Sync(dctx); err != nil { dCancel() - level.Error(logger).Log("msg", "on demand upload failed", "err", err) + level.Error(logger).Log("msg", "the final upload failed", "err", err) return } dCancel() - level.Info(logger).Log("msg", "the last cut block is uploaded") + level.Info(logger).Log("msg", "the final cut block was uploaded") }() + defer close(uploadDone) + + // Run the uploader in a loop. + tick := time.NewTicker(30 * time.Second) + defer tick.Stop() + for { - select { - case <-ctx.Done(): - return nil - default: - } select { case <-ctx.Done(): return nil case <-uploadC: - if err := dbs.Sync(ctx); err != nil { - level.Warn(logger).Log("err", err) + // Upload on demand. 
+ if err := upload(ctx); err != nil { + level.Warn(logger).Log("msg", "on demand upload failed", "err", err) } uploadDone <- struct{}{} + case <-tick.C: + if err := upload(ctx); err != nil { + level.Warn(logger).Log("msg", "recurring upload failed", "err", err) + } } } }, func(error) { From a3586b2134467cc213a5acd2ba05835583457184 Mon Sep 17 00:00:00 2001 From: Kemal Akkoyun Date: Tue, 14 Jul 2020 17:40:13 +0200 Subject: [PATCH 3/7] Fix error handling Signed-off-by: Kemal Akkoyun --- cmd/thanos/receive.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/cmd/thanos/receive.go b/cmd/thanos/receive.go index 8cad91556c..4cf44b2931 100644 --- a/cmd/thanos/receive.go +++ b/cmd/thanos/receive.go @@ -508,16 +508,15 @@ func runReceive( if err := dbs.Sync(ctx); err != nil { level.Warn(logger).Log("msg", "upload failed", "elapsed", time.Since(start), "err", err) - } else { - level.Debug(logger).Log("msg", "upload done", "elapsed", time.Since(start)) + return err } - + level.Debug(logger).Log("msg", "upload done", "elapsed", time.Since(start)) return nil } { level.Info(logger).Log("msg", "upload enabled, starting initial sync") if err := upload(context.Background()); err != nil { - return errors.Wrapf(err, "initial upload failed") + return errors.Wrap(err, "initial upload failed") } level.Info(logger).Log("msg", "initial sync done") } From b97724886402c733fc15495400be1605ea3b0ab5 Mon Sep 17 00:00:00 2001 From: Kemal Akkoyun Date: Wed, 15 Jul 2020 08:39:48 +0200 Subject: [PATCH 4/7] Fix white noise Signed-off-by: Kemal Akkoyun --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0125f67cde..48757006b2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,7 +22,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel - [#2832](https://github.com/thanos-io/thanos/pull/2832) ui: React: Add runtime and build info page - 
[#2305](https://github.com/thanos-io/thanos/pull/2305) Receive,Sidecar,Ruler: Propagate correct (stricter) MinTime for no-block TSDBs. -- [#2892](https://github.com/thanos-io/thanos/pull/2892) receive: Add time-out for each block upload. And receiver fails when the initial upload fails. +- [#2892](https://github.com/thanos-io/thanos/pull/2892) receive: Add time-out for each block upload. And receiver fails when the initial upload fails. ## [v0.14.0](https://github.com/thanos-io/thanos/releases/tag/v0.14.0) - 2020.07.10 From 6c7457e9608bd96115c6e56d2d9b2267e1ffb075 Mon Sep 17 00:00:00 2001 From: Kemal Akkoyun Date: Mon, 20 Jul 2020 10:35:42 +0200 Subject: [PATCH 5/7] Address review issues Signed-off-by: Kemal Akkoyun --- pkg/server/grpc/grpc.go | 3 ++- pkg/server/http/http.go | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/pkg/server/grpc/grpc.go b/pkg/server/grpc/grpc.go index 897f15cbe3..2300c98968 100644 --- a/pkg/server/grpc/grpc.go +++ b/pkg/server/grpc/grpc.go @@ -146,10 +146,11 @@ func (s *Server) Shutdown(err error) { case <-ctx.Done(): level.Info(s.logger).Log("msg", "grace period exceeded enforcing shutdown") s.srv.Stop() + return case <-stopped: cancel() } - level.Info(s.logger).Log("msg", "internal server is shutdown", "err", err) + level.Info(s.logger).Log("msg", "internal server is shutdown gracefully", "err", err) } // ReadWriteStoreServer is a StoreServer and a WriteableStoreServer. diff --git a/pkg/server/http/http.go b/pkg/server/http/http.go index 6de4e80f8f..b916819132 100644 --- a/pkg/server/http/http.go +++ b/pkg/server/http/http.go @@ -80,7 +80,7 @@ func (s *Server) Shutdown(err error) { level.Error(s.logger).Log("msg", "internal server shut down failed", "err", err) return } - level.Info(s.logger).Log("msg", "internal server is shutdown", "err", err) + level.Info(s.logger).Log("msg", "internal server is shutdown gracefully", "err", err) } // Handle registers the handler for the given pattern. 
From d9ced69b2c7b1a5abcac10b127b87d715444e50c Mon Sep 17 00:00:00 2001 From: Kemal Akkoyun Date: Mon, 20 Jul 2020 11:28:16 +0200 Subject: [PATCH 6/7] Remove upload timeouts, addressed in another PR Signed-off-by: Kemal Akkoyun --- CHANGELOG.md | 2 +- cmd/thanos/receive.go | 14 ++++---------- 2 files changed, 5 insertions(+), 11 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 48757006b2..4eb9246d25 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,7 +22,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel - [#2832](https://github.com/thanos-io/thanos/pull/2832) ui: React: Add runtime and build info page - [#2305](https://github.com/thanos-io/thanos/pull/2305) Receive,Sidecar,Ruler: Propagate correct (stricter) MinTime for no-block TSDBs. -- [#2892](https://github.com/thanos-io/thanos/pull/2892) receive: Add time-out for each block upload. And receiver fails when the initial upload fails. +- [#2892](https://github.com/thanos-io/thanos/pull/2892) Receive: Receiver fails when the initial upload fails. 
## [v0.14.0](https://github.com/thanos-io/thanos/releases/tag/v0.14.0) - 2020.07.10 diff --git a/cmd/thanos/receive.go b/cmd/thanos/receive.go index 4cf44b2931..d5da7d23b4 100644 --- a/cmd/thanos/receive.go +++ b/cmd/thanos/receive.go @@ -84,8 +84,6 @@ func registerReceive(m map[string]setupFunc, app *kingpin.Application) { forwardTimeout := modelDuration(cmd.Flag("receive-forward-timeout", "Timeout for each forward request.").Default("5s").Hidden()) - uploadTimeout := modelDuration(cmd.Flag("receive.upload-timeout", "Timeout for each block upload request.").Default("10m").Hidden()) - tsdbMinBlockDuration := modelDuration(cmd.Flag("tsdb.min-block-duration", "Min duration for local TSDB blocks").Default("2h").Hidden()) tsdbMaxBlockDuration := modelDuration(cmd.Flag("tsdb.max-block-duration", "Max duration for local TSDB blocks").Default("2h").Hidden()) ignoreBlockSize := cmd.Flag("shipper.ignore-unequal-block-size", "If true receive will not require min and max block size flags to be set to the same value. 
Only use this if you want to keep long retention and compaction enabled, as in the worst case it can result in ~2h data loss for your Thanos bucket storage.").Default("false").Hidden().Bool() @@ -165,7 +163,6 @@ func registerReceive(m map[string]setupFunc, app *kingpin.Application) { *replicaHeader, *replicationFactor, time.Duration(*forwardTimeout), - time.Duration(*uploadTimeout), *allowOutOfOrderUpload, comp, ) @@ -205,7 +202,6 @@ func runReceive( replicaHeader string, replicationFactor uint64, forwardTimeout time.Duration, - uploadTimeout time.Duration, allowOutOfOrderUpload bool, comp component.SourceStoreAPI, ) error { @@ -503,8 +499,6 @@ func runReceive( upload := func(ctx context.Context) error { level.Debug(logger).Log("msg", "upload starting") start := time.Now() - ctx, cancel := context.WithTimeout(ctx, uploadTimeout) - defer cancel() if err := dbs.Sync(ctx); err != nil { level.Warn(logger).Log("msg", "upload failed", "elapsed", time.Since(start), "err", err) @@ -532,13 +526,13 @@ func runReceive( defer func() { <-uploadC // Closed by storage routine when it's done. level.Info(logger).Log("msg", "uploading the final cut block before exiting") - dctx, dCancel := context.WithTimeout(context.Background(), uploadTimeout) - if err := dbs.Sync(dctx); err != nil { - dCancel() + ctx, cancel := context.WithCancel(context.Background()) + if err := dbs.Sync(ctx); err != nil { + cancel() level.Error(logger).Log("msg", "the final upload failed", "err", err) return } - dCancel() + cancel() level.Info(logger).Log("msg", "the final cut block was uploaded") }() From 40c5e9ea7c736a8b4755f16614f3c8680be5600a Mon Sep 17 00:00:00 2001 From: Kemal Akkoyun Date: Tue, 21 Jul 2020 16:54:45 +0200 Subject: [PATCH 7/7] Trigger CI Signed-off-by: Kemal Akkoyun