diff --git a/CHANGELOG.md b/CHANGELOG.md
index a0d7f6f046..142465b7ce 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -14,6 +14,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
 - [#7326](https://github.com/thanos-io/thanos/pull/7326) Query: fixing exemplars proxy when querying stores with multiple tenants.
 - [#7335](https://github.com/thanos-io/thanos/pull/7335) Dependency: Update minio-go to v7.0.70 which includes support for EKS Pod Identity.
+- [#7403](https://github.com/thanos-io/thanos/pull/7403) Sidecar: fix startup sequence
 
 ### Added
 
diff --git a/cmd/thanos/sidecar.go b/cmd/thanos/sidecar.go
index df8a343a88..2d51063980 100644
--- a/cmd/thanos/sidecar.go
+++ b/cmd/thanos/sidecar.go
@@ -135,10 +135,9 @@ func runSidecar(
 		return errors.Wrap(err, "getting object store config")
 	}
 
-	var uploads = true
-	if len(confContentYaml) == 0 {
+	var uploads = len(confContentYaml) != 0
+	if !uploads {
 		level.Info(logger).Log("msg", "no supported bucket was configured, uploads will be disabled")
-		uploads = false
 	}
 
 	grpcProbe := prober.NewGRPC()
@@ -149,111 +148,119 @@ func runSidecar(
 		prober.NewInstrumentation(comp, logger, extprom.WrapRegistererWithPrefix("thanos_", reg)),
 	)
 
-	srv := httpserver.New(logger, reg, comp, httpProbe,
-		httpserver.WithListen(conf.http.bindAddress),
-		httpserver.WithGracePeriod(time.Duration(conf.http.gracePeriod)),
-		httpserver.WithTLSConfig(conf.http.tlsConfig),
-	)
+	// Setup the HTTP server.
+	{
+		srv := httpserver.New(logger, reg, comp, httpProbe,
+			httpserver.WithListen(conf.http.bindAddress),
+			httpserver.WithGracePeriod(time.Duration(conf.http.gracePeriod)),
+			httpserver.WithTLSConfig(conf.http.tlsConfig),
+		)
 
-	g.Add(func() error {
-		statusProber.Healthy()
+		g.Add(func() error {
+			statusProber.Healthy()
+			return srv.ListenAndServe()
+		}, func(err error) {
-		return srv.ListenAndServe()
-	}, func(err error) {
-		statusProber.NotReady(err)
-		defer statusProber.NotHealthy(err)
+			statusProber.NotReady(err)
+			defer statusProber.NotHealthy(err)
 
-		srv.Shutdown(err)
-	})
+			srv.Shutdown(err)
+		})
+	}
 
-	// Setup all the concurrent groups.
+	// Once we have loaded external labels from prometheus we can use this to signal the servers
+	// that they can start now.
+	readyToStartGRPC := make(chan struct{})
+
+	// Setup Prometheus Heartbeats.
 	{
 		promUp := promauto.With(reg).NewGauge(prometheus.GaugeOpts{
 			Name: "thanos_sidecar_prometheus_up",
 			Help: "Boolean indicator whether the sidecar can reach its Prometheus peer.",
 		})
 
-		ctx := context.Background()
-		// Only check Prometheus's flags when upload is enabled.
-		if uploads {
-			// Check prometheus's flags to ensure same sidecar flags.
-			// We retry infinitely until we validated prometheus flags
+		ctx, cancel := context.WithCancel(context.Background())
+		g.Add(func() error {
+			// Only check Prometheus's flags when upload is enabled.
+			if uploads {
+				// Check prometheus's flags to ensure same sidecar flags.
+				// We retry infinitely until we validated prometheus flags
+				err := runutil.Retry(conf.prometheus.getConfigInterval, ctx.Done(), func() error {
+					iterCtx, iterCancel := context.WithTimeout(context.Background(), conf.prometheus.getConfigTimeout)
+					defer iterCancel()
+
+					if err := validatePrometheus(iterCtx, m.client, logger, conf.shipper.ignoreBlockSize, m); err != nil {
+						level.Warn(logger).Log(
+							"msg", "failed to validate prometheus flags. Is Prometheus running? Retrying",
+							"err", err,
+						)
+						return err
+					}
+
+					level.Info(logger).Log(
+						"msg", "successfully validated prometheus flags",
+					)
+					return nil
+				})
+				if err != nil {
+					return errors.Wrap(err, "failed to validate prometheus flags")
+				}
+			}
+
+			// We retry infinitely until we reach and fetch BuildVersion from our Prometheus.
 			err := runutil.Retry(conf.prometheus.getConfigInterval, ctx.Done(), func() error {
 				iterCtx, iterCancel := context.WithTimeout(context.Background(), conf.prometheus.getConfigTimeout)
 				defer iterCancel()
 
-				if err := validatePrometheus(iterCtx, m.client, logger, conf.shipper.ignoreBlockSize, m); err != nil {
+				if err := m.BuildVersion(iterCtx); err != nil {
 					level.Warn(logger).Log(
-						"msg", "failed to validate prometheus flags. Is Prometheus running? Retrying",
+						"msg", "failed to fetch prometheus version. Is Prometheus running? Retrying",
 						"err", err,
 					)
 					return err
 				}
 
 				level.Info(logger).Log(
-					"msg", "successfully validated prometheus flags",
+					"msg", "successfully loaded prometheus version",
 				)
 				return nil
 			})
 			if err != nil {
-				return errors.Wrap(err, "failed to validate prometheus flags")
+				return errors.Wrap(err, "failed to get prometheus version")
 			}
-		}
-
-		// We retry infinitely until we reach and fetch BuildVersion from our Prometheus.
-		err := runutil.Retry(conf.prometheus.getConfigInterval, ctx.Done(), func() error {
-			iterCtx, iterCancel := context.WithTimeout(context.Background(), conf.prometheus.getConfigTimeout)
-			defer iterCancel()
-
-			if err := m.BuildVersion(iterCtx); err != nil {
-				level.Warn(logger).Log(
-					"msg", "failed to fetch prometheus version. Is Prometheus running? Retrying",
-					"err", err,
-				)
-				return err
-			}
-
-			level.Info(logger).Log(
-				"msg", "successfully loaded prometheus version",
-			)
-			return nil
-		})
-		if err != nil {
-			return errors.Wrap(err, "failed to get prometheus version")
-		}
+			// Blocking query of external labels before joining as a Source Peer into gossip.
+			// We retry infinitely until we reach and fetch labels from our Prometheus.
+			err = runutil.Retry(conf.prometheus.getConfigInterval, ctx.Done(), func() error {
+				iterCtx, iterCancel := context.WithTimeout(context.Background(), conf.prometheus.getConfigTimeout)
+				defer iterCancel()
 
-		// Blocking query of external labels before joining as a Source Peer into gossip.
-		// We retry infinitely until we reach and fetch labels from our Prometheus.
-		err = runutil.Retry(conf.prometheus.getConfigInterval, ctx.Done(), func() error {
-			iterCtx, iterCancel := context.WithTimeout(context.Background(), conf.prometheus.getConfigTimeout)
-			defer iterCancel()
+				if err := m.UpdateLabels(iterCtx); err != nil {
+					level.Warn(logger).Log(
+						"msg", "failed to fetch initial external labels. Is Prometheus running? Retrying",
+						"err", err,
+					)
+					return err
+				}
 
-			if err := m.UpdateLabels(iterCtx); err != nil {
-				level.Warn(logger).Log(
-					"msg", "failed to fetch initial external labels. Is Prometheus running? Retrying",
-					"err", err,
+				level.Info(logger).Log(
+					"msg", "successfully loaded prometheus external labels",
+					"external_labels", m.Labels().String(),
 				)
-				return err
+				return nil
+			})
+			if err != nil {
+				return errors.Wrap(err, "initial external labels query")
 			}
 
-			level.Info(logger).Log(
-				"msg", "successfully loaded prometheus external labels",
-				"external_labels", m.Labels().String(),
-			)
-			return nil
-		})
-		if err != nil {
-			return errors.Wrap(err, "initial external labels query")
-		}
+			if len(m.Labels()) == 0 {
+				return errors.New("no external labels configured on Prometheus server, uniquely identifying external labels must be configured; see https://thanos.io/tip/thanos/storage.md#external-labels for details.")
+			}
+			promUp.Set(1)
+			statusProber.Ready()
 
-		if len(m.Labels()) == 0 {
-			return errors.New("no external labels configured on Prometheus server, uniquely identifying external labels must be configured; see https://thanos.io/tip/thanos/storage.md#external-labels for details.")
-		}
-		promUp.Set(1)
-		statusProber.Ready()
+			close(readyToStartGRPC)
 
-		ctx, cancel := context.WithCancel(context.Background())
-		g.Add(func() error {
 			// Periodically query the Prometheus config. We use this as a heartbeat as well as for updating
 			// the external labels we apply.
 			return runutil.Repeat(conf.prometheus.getConfigInterval, ctx.Done(), func() error {
@@ -275,6 +282,8 @@ func runSidecar(
 			cancel()
 		})
 	}
+
+	// Setup the Reloader.
 	{
 		ctx, cancel := context.WithCancel(context.Background())
 		g.Add(func() error {
@@ -283,6 +292,8 @@ func runSidecar(
 			cancel()
 		})
 	}
+
+	// Setup the gRPC server.
 	{
 		c := promclient.NewWithTracingClient(logger, httpClient, clientconfig.ThanosUserAgent)
 
@@ -336,15 +347,23 @@ func runSidecar(
 			grpcserver.WithMaxConnAge(conf.grpc.maxConnectionAge),
 			grpcserver.WithTLSConfig(tlsCfg),
 		)
+
+		ctx, cancel := context.WithCancel(context.Background())
 		g.Add(func() error {
+			select {
+			case <-ctx.Done():
+				return ctx.Err()
+			case <-readyToStartGRPC:
+			}
+
 			statusProber.Ready()
 			return s.ListenAndServe()
 		}, func(err error) {
+			cancel()
 			statusProber.NotReady(err)
 			s.Shutdown(err)
 		})
 	}
-
 	if uploads {
 		// The background shipper continuously scans the data directory and uploads
 		// new blocks to Google Cloud Storage or an S3-compatible storage service.