diff --git a/CHANGELOG.md b/CHANGELOG.md index c8a737ca62..4eb9246d25 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,6 +22,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel - [#2832](https://github.com/thanos-io/thanos/pull/2832) ui: React: Add runtime and build info page - [#2305](https://github.com/thanos-io/thanos/pull/2305) Receive,Sidecar,Ruler: Propagate correct (stricter) MinTime for no-block TSDBs. +- [#2892](https://github.com/thanos-io/thanos/pull/2892) Receive: Receiver fails when the initial upload fails. ## [v0.14.0](https://github.com/thanos-io/thanos/releases/tag/v0.14.0) - 2020.07.10 diff --git a/cmd/thanos/receive.go b/cmd/thanos/receive.go index e63c3cf0e5..d5da7d23b4 100644 --- a/cmd/thanos/receive.go +++ b/cmd/thanos/receive.go @@ -15,13 +15,13 @@ import ( "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" "github.com/oklog/run" - opentracing "github.com/opentracing/opentracing-go" + "github.com/opentracing/opentracing-go" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/tsdb" - kingpin "gopkg.in/alecthomas/kingpin.v2" + "gopkg.in/alecthomas/kingpin.v2" "github.com/thanos-io/thanos/pkg/component" "github.com/thanos-io/thanos/pkg/extflag" @@ -82,7 +82,7 @@ func registerReceive(m map[string]setupFunc, app *kingpin.Application) { replicationFactor := cmd.Flag("receive.replication-factor", "How many times to replicate incoming write requests.").Default("1").Uint64() - forwardTimeout := modelDuration(cmd.Flag("receive-forward-timeout", "Timeout for forward requests.").Default("5s").Hidden()) + forwardTimeout := modelDuration(cmd.Flag("receive-forward-timeout", "Timeout for each forward request.").Default("5s").Hidden()) tsdbMinBlockDuration := modelDuration(cmd.Flag("tsdb.min-block-duration", "Min duration for local TSDB 
blocks").Default("2h").Hidden()) tsdbMaxBlockDuration := modelDuration(cmd.Flag("tsdb.max-block-duration", "Max duration for local TSDB blocks").Default("2h").Hidden()) @@ -163,8 +163,8 @@ func registerReceive(m map[string]setupFunc, app *kingpin.Application) { *replicaHeader, *replicationFactor, time.Duration(*forwardTimeout), - comp, *allowOutOfOrderUpload, + comp, ) } } @@ -202,8 +202,8 @@ func runReceive( replicaHeader string, replicationFactor uint64, forwardTimeout time.Duration, - comp component.SourceStoreAPI, allowOutOfOrderUpload bool, + comp component.SourceStoreAPI, ) error { logger = log.With(logger, "component", "receive") level.Warn(logger).Log("msg", "setting up receive") @@ -294,6 +294,7 @@ func runReceive( level.Debug(logger).Log("msg", "setting up tsdb") { + logger := log.With(logger, "component", "storage") dbUpdatesStarted := promauto.With(reg).NewCounter(prometheus.CounterOpts{ Name: "thanos_receive_multi_db_updates_attempted_total", Help: "Number of Multi DB attempted reloads with flush and potential upload due to hashring changes", @@ -311,12 +312,17 @@ func runReceive( // Before quitting, ensure the WAL is flushed and the DBs are closed. 
defer func() { + level.Info(logger).Log("msg", "shutting down storage") if err := dbs.Flush(); err != nil { - level.Warn(logger).Log("err", err, "msg", "failed to flush storage") + level.Error(logger).Log("err", err, "msg", "failed to flush storage") + } else { + level.Info(logger).Log("msg", "storage is flushed successfully") } if err := dbs.Close(); err != nil { - level.Warn(logger).Log("err", err, "msg", "failed to close multi db") + level.Error(logger).Log("err", err, "msg", "failed to close storage") + return } + level.Info(logger).Log("msg", "storage is closed") }() for { @@ -328,7 +334,7 @@ func runReceive( return nil } dbUpdatesStarted.Inc() - level.Info(logger).Log("msg", "updating Multi DB") + level.Info(logger).Log("msg", "updating storage") if err := dbs.Flush(); err != nil { return errors.Wrap(err, "flushing storage") @@ -341,7 +347,7 @@ func runReceive( <-uploadDone } statusProber.Ready() - level.Info(logger).Log("msg", "tsdb started, and server is ready to receive web requests") + level.Info(logger).Log("msg", "storage started, and server is ready to receive web requests") dbUpdatesCompleted.Inc() dbReady <- struct{}{} } @@ -394,7 +400,7 @@ func runReceive( return nil } webHandler.Hashring(h) - msg := "hashring has changed; server is not ready to receive web requests." 
+ msg := "hashring has changed; server is not ready to receive web requests" statusProber.NotReady(errors.New(msg)) level.Info(logger).Log("msg", msg) hashringChangedChan <- struct{}{} @@ -489,57 +495,67 @@ func runReceive( } if upload { - level.Debug(logger).Log("msg", "upload enabled") - if err := dbs.Sync(context.Background()); err != nil { - level.Warn(logger).Log("msg", "initial upload failed", "err", err) - } + logger := log.With(logger, "component", "uploader") + upload := func(ctx context.Context) error { + level.Debug(logger).Log("msg", "upload starting") + start := time.Now() + if err := dbs.Sync(ctx); err != nil { + level.Warn(logger).Log("msg", "upload failed", "elapsed", time.Since(start), "err", err) + return err + } + level.Debug(logger).Log("msg", "upload done", "elapsed", time.Since(start)) + return nil + } { - // Run the uploader in a loop. - ctx, cancel := context.WithCancel(context.Background()) - g.Add(func() error { - return runutil.Repeat(30*time.Second, ctx.Done(), func() error { - if err := dbs.Sync(ctx); err != nil { - level.Warn(logger).Log("msg", "interval upload failed", "err", err) - } - - return nil - }) - }, func(error) { - cancel() - }) + level.Info(logger).Log("msg", "upload enabled, starting initial sync") + if err := upload(context.Background()); err != nil { + return errors.Wrap(err, "initial upload failed") + } + level.Info(logger).Log("msg", "initial sync done") } - { - // Upload on demand. ctx, cancel := context.WithCancel(context.Background()) g.Add(func() error { // Ensure we clean up everything properly. defer func() { runutil.CloseWithLogOnErr(logger, bkt, "bucket client") }() + // Before quitting, ensure all blocks are uploaded. defer func() { - <-uploadC - if err := dbs.Sync(context.Background()); err != nil { - level.Warn(logger).Log("msg", "on demnad upload failed", "err", err) + <-uploadC // Closed by storage routine when it's done. 
+ level.Info(logger).Log("msg", "uploading the final cut block before exiting") + ctx, cancel := context.WithCancel(context.Background()) + if err := dbs.Sync(ctx); err != nil { + cancel() + level.Error(logger).Log("msg", "the final upload failed", "err", err) + return } + cancel() + level.Info(logger).Log("msg", "the final cut block was uploaded") }() + defer close(uploadDone) + + // Run the uploader in a loop. + tick := time.NewTicker(30 * time.Second) + defer tick.Stop() + for { - select { - case <-ctx.Done(): - return nil - default: - } select { case <-ctx.Done(): return nil case <-uploadC: - if err := dbs.Sync(ctx); err != nil { - level.Warn(logger).Log("err", err) + // Upload on demand. + if err := upload(ctx); err != nil { + level.Warn(logger).Log("msg", "on demand upload failed", "err", err) } uploadDone <- struct{}{} + case <-tick.C: + if err := upload(ctx); err != nil { + level.Warn(logger).Log("msg", "recurring upload failed", "err", err) + } } } }, func(error) { diff --git a/pkg/receive/multitsdb.go b/pkg/receive/multitsdb.go index 7708a6e9e4..420a9e5ff9 100644 --- a/pkg/receive/multitsdb.go +++ b/pkg/receive/multitsdb.go @@ -18,13 +18,14 @@ import ( "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb" terrors "github.com/prometheus/prometheus/tsdb/errors" + "golang.org/x/sync/errgroup" + "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/component" "github.com/thanos-io/thanos/pkg/objstore" "github.com/thanos-io/thanos/pkg/shipper" "github.com/thanos-io/thanos/pkg/store" "github.com/thanos-io/thanos/pkg/store/storepb" - "golang.org/x/sync/errgroup" ) type MultiTSDB struct { @@ -57,7 +58,7 @@ func NewMultiTSDB( return &MultiTSDB{ dataDir: dataDir, - logger: l, + logger: log.With(l, "component", "multi-tsdb"), reg: reg, tsdbOpts: tsdbOpts, mtx: &sync.RWMutex{}, diff --git a/pkg/server/grpc/grpc.go b/pkg/server/grpc/grpc.go index d90fe59334..2300c98968 100644 --- a/pkg/server/grpc/grpc.go +++ 
b/pkg/server/grpc/grpc.go @@ -18,16 +18,17 @@ import ( "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" - "github.com/thanos-io/thanos/pkg/component" - "github.com/thanos-io/thanos/pkg/prober" - "github.com/thanos-io/thanos/pkg/rules/rulespb" - "github.com/thanos-io/thanos/pkg/store/storepb" - "github.com/thanos-io/thanos/pkg/tracing" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials" grpc_health "google.golang.org/grpc/health/grpc_health_v1" "google.golang.org/grpc/status" + + "github.com/thanos-io/thanos/pkg/component" + "github.com/thanos-io/thanos/pkg/prober" + "github.com/thanos-io/thanos/pkg/rules/rulespb" + "github.com/thanos-io/thanos/pkg/store/storepb" + "github.com/thanos-io/thanos/pkg/tracing" ) // A Server defines parameters to serve RPC requests, a wrapper around grpc.Server. @@ -123,10 +124,11 @@ func (s *Server) ListenAndServe() error { // Shutdown gracefully shuts down the server by waiting, // for specified amount of time (by gracePeriod) for connections to return to idle and then shut down. func (s *Server) Shutdown(err error) { - defer level.Info(s.logger).Log("msg", "internal server shutdown", "err", err) + level.Info(s.logger).Log("msg", "internal server is shutting down", "err", err) if s.opts.gracePeriod == 0 { s.srv.Stop() + level.Info(s.logger).Log("msg", "internal server is shutdown", "err", err) return } @@ -144,9 +146,11 @@ func (s *Server) Shutdown(err error) { case <-ctx.Done(): level.Info(s.logger).Log("msg", "grace period exceeded enforcing shutdown") s.srv.Stop() + return case <-stopped: cancel() } + level.Info(s.logger).Log("msg", "internal server is shutdown gracefully", "err", err) } // ReadWriteStoreServer is a StoreServer and a WriteableStoreServer. 
diff --git a/pkg/server/http/http.go b/pkg/server/http/http.go index 162da79a77..b916819132 100644 --- a/pkg/server/http/http.go +++ b/pkg/server/http/http.go @@ -13,6 +13,7 @@ import ( "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" + "github.com/thanos-io/thanos/pkg/component" "github.com/thanos-io/thanos/pkg/prober" ) @@ -60,15 +61,15 @@ func (s *Server) ListenAndServe() error { // Shutdown gracefully shuts down the server by waiting, // for specified amount of time (by gracePeriod) for connections to return to idle and then shut down. func (s *Server) Shutdown(err error) { + level.Info(s.logger).Log("msg", "internal server is shutting down", "err", err) if err == http.ErrServerClosed { level.Warn(s.logger).Log("msg", "internal server closed unexpectedly") return } - defer level.Info(s.logger).Log("msg", "internal server shutdown", "err", err) - if s.opts.gracePeriod == 0 { s.srv.Close() + level.Info(s.logger).Log("msg", "internal server is shutdown", "err", err) return } @@ -77,7 +78,9 @@ func (s *Server) Shutdown(err error) { if err := s.srv.Shutdown(ctx); err != nil { level.Error(s.logger).Log("msg", "internal server shut down failed", "err", err) + return } + level.Info(s.logger).Log("msg", "internal server is shutdown gracefully", "err", err) } // Handle registers the handler for the given pattern.