Skip to content

Commit

Permalink
Simplify prober (#1694)
Browse files Browse the repository at this point in the history
* Simplify prober

Signed-off-by: Kemal Akkoyun <kakkoyun@gmail.com>

* Preserve existing behaviour

Signed-off-by: Kemal Akkoyun <kakkoyun@gmail.com>

* Add CHANGELOG entry

Signed-off-by: Kemal Akkoyun <kakkoyun@gmail.com>
  • Loading branch information
kakkoyun authored and bwplotka committed Nov 26, 2019
1 parent e5d96c8 commit 825f119
Show file tree
Hide file tree
Showing 12 changed files with 262 additions and 247 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Expand Up @@ -38,6 +38,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel
- [#1666](https://github.com/thanos-io/thanos/pull/1666) `thanos_compact_group_compactions_total` now counts block compactions, i.e. operations that resulted in a compacted block. The old behaviour
is now exposed by new metrics: `thanos_compact_group_compaction_runs_started_total` and `thanos_compact_group_compaction_runs_completed_total`, which count compaction runs overall.
- [#1748](https://github.com/thanos-io/thanos/pull/1748) Updated all dependencies.
- [#1694](https://github.com/thanos-io/thanos/pull/1694) `prober_ready` and `prober_healthy` metrics are removed in favor of `status`. `status` now exposes the same metric with a label, `check`. `check` can be "healthy" or "ready" depending on the status of the probe.

## [v0.8.1](https://github.com/thanos-io/thanos/releases/tag/v0.8.1) - 2019.10.14

Expand Down
13 changes: 11 additions & 2 deletions cmd/thanos/bucket.go
Expand Up @@ -320,7 +320,7 @@ func registerBucketWeb(m map[string]setupFunc, root *kingpin.CmdClause, name str
m[name+" web"] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, _ opentracing.Tracer, _ bool) error {
ctx, cancel := context.WithCancel(context.Background())

statusProber := prober.NewProber(component.Bucket, logger, prometheus.WrapRegistererWithPrefix("thanos_", reg))
statusProber := prober.New(component.Bucket, logger, prometheus.WrapRegistererWithPrefix("thanos_", reg))
// Initiate HTTP listener providing metrics endpoint and readiness/liveness probes.
srv := httpserver.New(logger, reg, component.Bucket, statusProber,
httpserver.WithListen(*httpBindAddr),
Expand Down Expand Up @@ -356,7 +356,16 @@ func registerBucketWeb(m map[string]setupFunc, root *kingpin.CmdClause, name str
cancel()
})

g.Add(srv.ListenAndServe, srv.Shutdown)
g.Add(func() error {
statusProber.Healthy()

return srv.ListenAndServe()
}, func(err error) {
statusProber.NotReady(err)
defer statusProber.NotHealthy(err)

srv.Shutdown(err)
})

return nil
}
Expand Down
15 changes: 12 additions & 3 deletions cmd/thanos/compact.go
Expand Up @@ -180,14 +180,23 @@ func runCompact(

downsampleMetrics := newDownsampleMetrics(reg)

statusProber := prober.NewProber(component, logger, prometheus.WrapRegistererWithPrefix("thanos_", reg))
statusProber := prober.New(component, logger, prometheus.WrapRegistererWithPrefix("thanos_", reg))
// Initiate HTTP listener providing metrics endpoint and readiness/liveness probes.
srv := httpserver.New(logger, reg, component, statusProber,
httpserver.WithListen(httpBindAddr),
httpserver.WithGracePeriod(httpGracePeriod),
)

g.Add(srv.ListenAndServe, srv.Shutdown)
g.Add(func() error {
statusProber.Healthy()

return srv.ListenAndServe()
}, func(err error) {
statusProber.NotReady(err)
defer statusProber.NotHealthy(err)

srv.Shutdown(err)
})

confContentYaml, err := objStoreConfig.Content()
if err != nil {
Expand Down Expand Up @@ -350,7 +359,7 @@ func runCompact(
})

level.Info(logger).Log("msg", "starting compact node")
statusProber.SetReady()
statusProber.Ready()
return nil
}

Expand Down
15 changes: 12 additions & 3 deletions cmd/thanos/downsample.go
Expand Up @@ -96,14 +96,14 @@ func runDownsample(
}()

metrics := newDownsampleMetrics(reg)
statusProber := prober.NewProber(comp, logger, prometheus.WrapRegistererWithPrefix("thanos_", reg))
statusProber := prober.New(comp, logger, prometheus.WrapRegistererWithPrefix("thanos_", reg))
// Start cycle of syncing blocks from the bucket and garbage collecting the bucket.
{
ctx, cancel := context.WithCancel(context.Background())

g.Add(func() error {
defer runutil.CloseWithLogOnErr(logger, bkt, "bucket client")
statusProber.SetReady()
statusProber.Ready()

level.Info(logger).Log("msg", "start first pass of downsampling")

Expand All @@ -128,7 +128,16 @@ func runDownsample(
httpserver.WithListen(httpBindAddr),
httpserver.WithGracePeriod(httpGracePeriod),
)
g.Add(srv.ListenAndServe, srv.Shutdown)
g.Add(func() error {
statusProber.Healthy()

return srv.ListenAndServe()
}, func(err error) {
statusProber.NotReady(err)
defer statusProber.NotHealthy(err)

srv.Shutdown(err)
})

level.Info(logger).Log("msg", "starting downsample node")
return nil
Expand Down
19 changes: 14 additions & 5 deletions cmd/thanos/query.go
Expand Up @@ -355,7 +355,7 @@ func runQuery(
}
// Start query API + UI HTTP server.

statusProber := prober.NewProber(comp, logger, reg)
statusProber := prober.New(comp, logger, reg)
{
router := route.New()

Expand Down Expand Up @@ -386,7 +386,16 @@ func runQuery(
)
srv.Handle("/", router)

g.Add(srv.ListenAndServe, srv.Shutdown)
g.Add(func() error {
statusProber.Healthy()

return srv.ListenAndServe()
}, func(err error) {
statusProber.NotReady(err)
defer statusProber.NotHealthy(err)

srv.Shutdown(err)
})
}
// Start query (proxy) gRPC StoreAPI.
{
Expand All @@ -402,10 +411,10 @@ func runQuery(
)

g.Add(func() error {
statusProber.SetReady()
statusProber.Ready()
return s.ListenAndServe()
}, func(err error) {
statusProber.SetNotReady(err)
}, func(error) {
statusProber.NotReady(err)
s.Shutdown(err)
})
}
Expand Down
17 changes: 13 additions & 4 deletions cmd/thanos/receive.go
Expand Up @@ -198,7 +198,7 @@ func runReceive(
TLSClientConfig: rwTLSClientConfig,
})

statusProber := prober.NewProber(comp, logger, prometheus.WrapRegistererWithPrefix("thanos_", reg))
statusProber := prober.New(comp, logger, prometheus.WrapRegistererWithPrefix("thanos_", reg))
confContentYaml, err := objStoreConfig.Content()
if err != nil {
return err
Expand Down Expand Up @@ -268,7 +268,7 @@ func runReceive(
level.Info(logger).Log("msg", "tsdb started")
localStorage.Set(db.Get(), startTimeMargin)
webHandler.SetWriter(receive.NewWriter(log.With(logger, "component", "receive-writer"), localStorage))
statusProber.SetReady()
statusProber.Ready()
level.Info(logger).Log("msg", "server is ready to receive web requests.")
dbReady <- struct{}{}
}
Expand Down Expand Up @@ -318,7 +318,7 @@ func runReceive(
webHandler.SetWriter(nil)
webHandler.Hashring(h)
msg := "hashring has changed; server is not ready to receive web requests."
statusProber.SetNotReady(errors.New(msg))
statusProber.NotReady(errors.New(msg))
level.Info(logger).Log("msg", msg)
updateDB <- struct{}{}
case <-cancel:
Expand All @@ -337,7 +337,16 @@ func runReceive(
httpserver.WithListen(httpBindAddr),
httpserver.WithGracePeriod(httpGracePeriod),
)
g.Add(srv.ListenAndServe, srv.Shutdown)
g.Add(func() error {
statusProber.Healthy()

return srv.ListenAndServe()
}, func(err error) {
statusProber.NotReady(err)
defer statusProber.NotHealthy(err)

srv.Shutdown(err)
})

level.Debug(logger).Log("msg", "setting up grpc server")
{
Expand Down
18 changes: 14 additions & 4 deletions cmd/thanos/rule.go
Expand Up @@ -500,7 +500,7 @@ func runRule(
cancel()
})
}
statusProber := prober.NewProber(comp, logger, prometheus.WrapRegistererWithPrefix("thanos_", reg))
statusProber := prober.New(comp, logger, prometheus.WrapRegistererWithPrefix("thanos_", reg))
// Start gRPC server.
{
store := store.NewTSDBStore(logger, reg, db, component.Rule, lset)
Expand All @@ -515,11 +515,12 @@ func runRule(
grpcserver.WithGracePeriod(grpcGracePeriod),
grpcserver.WithTLSConfig(tlsCfg),
)

g.Add(func() error {
statusProber.SetReady()
statusProber.Ready()
return s.ListenAndServe()
}, func(err error) {
statusProber.SetNotReady(err)
statusProber.NotReady(err)
s.Shutdown(err)
})
}
Expand Down Expand Up @@ -558,7 +559,16 @@ func runRule(
)
srv.Handle("/", router)

g.Add(srv.ListenAndServe, srv.Shutdown)
g.Add(func() error {
statusProber.Healthy()

return srv.ListenAndServe()
}, func(err error) {
statusProber.NotReady(err)
defer statusProber.NotHealthy(err)

srv.Shutdown(err)
})
}

confContentYaml, err := objStoreConfig.Content()
Expand Down
21 changes: 15 additions & 6 deletions cmd/thanos/sidecar.go
Expand Up @@ -140,13 +140,22 @@ func runSidecar(
}

// Initiate HTTP listener providing metrics endpoint and readiness/liveness probes.
statusProber := prober.NewProber(comp, logger, prometheus.WrapRegistererWithPrefix("thanos_", reg))
statusProber := prober.New(comp, logger, prometheus.WrapRegistererWithPrefix("thanos_", reg))
srv := httpserver.New(logger, reg, comp, statusProber,
httpserver.WithListen(httpBindAddr),
httpserver.WithGracePeriod(httpGracePeriod),
)

g.Add(srv.ListenAndServe, srv.Shutdown)
g.Add(func() error {
statusProber.Healthy()

return srv.ListenAndServe()
}, func(err error) {
statusProber.NotReady(err)
defer statusProber.NotHealthy(err)

srv.Shutdown(err)
})

// Setup all the concurrent groups.
{
Expand Down Expand Up @@ -179,7 +188,7 @@ func runSidecar(
"err", err,
)
promUp.Set(0)
statusProber.SetNotReady(err)
statusProber.NotReady(err)
return err
}

Expand All @@ -188,7 +197,7 @@ func runSidecar(
"external_labels", m.Labels().String(),
)
promUp.Set(1)
statusProber.SetReady()
statusProber.Ready()
lastHeartbeat.Set(float64(time.Now().UnixNano()) / 1e9)
return nil
})
Expand Down Expand Up @@ -247,10 +256,10 @@ func runSidecar(
grpcserver.WithTLSConfig(tlsCfg),
)
g.Add(func() error {
statusProber.SetReady()
statusProber.Ready()
return s.ListenAndServe()
}, func(err error) {
statusProber.SetNotReady(err)
statusProber.NotReady(err)
s.Shutdown(err)
})
}
Expand Down
17 changes: 13 additions & 4 deletions cmd/thanos/store.go
Expand Up @@ -132,13 +132,22 @@ func runStore(
advertiseCompatibilityLabel bool,
) error {
// Initiate HTTP listener providing metrics endpoint and readiness/liveness probes.
statusProber := prober.NewProber(component, logger, prometheus.WrapRegistererWithPrefix("thanos_", reg))
statusProber := prober.New(component, logger, prometheus.WrapRegistererWithPrefix("thanos_", reg))
srv := httpserver.New(logger, reg, component, statusProber,
httpserver.WithListen(httpBindAddr),
httpserver.WithGracePeriod(httpGracePeriod),
)

g.Add(srv.ListenAndServe, srv.Shutdown)
g.Add(func() error {
statusProber.Healthy()

return srv.ListenAndServe()
}, func(err error) {
statusProber.NotReady(err)
defer statusProber.NotHealthy(err)

srv.Shutdown(err)
})

confContentYaml, err := objStoreConfig.Content()
if err != nil {
Expand Down Expand Up @@ -241,10 +250,10 @@ func runStore(

g.Add(func() error {
<-bucketStoreReady
statusProber.SetReady()
statusProber.Ready()
return s.ListenAndServe()
}, func(err error) {
statusProber.SetNotReady(err)
statusProber.NotReady(err)
s.Shutdown(err)
})
}
Expand Down

0 comments on commit 825f119

Please sign in to comment.