Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

query: Add readiness probe to query #1534

Merged
merged 2 commits into from
Sep 18, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 2 additions & 2 deletions cmd/thanos/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,8 +169,8 @@ func runCompact(

statusProber := prober.NewProber(component, logger, prometheus.WrapRegistererWithPrefix("thanos_", reg))
// Initiate default HTTP listener providing metrics endpoint and readiness/liveness probes.
if err := defaultHTTPListener(g, logger, reg, httpBindAddr, statusProber); err != nil {
return errors.Wrap(err, "create readiness prober")
if err := defaultHTTPListener(component, g, logger, reg, nil, httpBindAddr, statusProber); err != nil {
return errors.Wrap(err, "create default HTTP server with readiness prober")
}

confContentYaml, err := objStoreConfig.Content()
Expand Down
14 changes: 9 additions & 5 deletions cmd/thanos/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/prometheus/common/version"
"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/prober"
"github.com/thanos-io/thanos/pkg/runutil"
"github.com/thanos-io/thanos/pkg/store/storepb"
Expand Down Expand Up @@ -74,7 +75,7 @@ func main() {
cmds := map[string]setupFunc{}
registerSidecar(cmds, app)
registerStore(cmds, app, "store")
registerQuery(cmds, app, "query")
registerQuery(cmds, app)
kakkoyun marked this conversation as resolved.
Show resolved Hide resolved
registerRule(cmds, app, "rule")
registerCompact(cmds, app)
registerBucket(cmds, app, "bucket")
Expand Down Expand Up @@ -355,24 +356,27 @@ func metricHTTPListenGroup(g *run.Group, logger log.Logger, reg *prometheus.Regi

// defaultHTTPListener starts a run.Group that servers HTTP endpoint with default endpoints providing Prometheus metrics,
kakkoyun marked this conversation as resolved.
Show resolved Hide resolved
// profiling and liveness/readiness probes.
func defaultHTTPListener(g *run.Group, logger log.Logger, reg *prometheus.Registry, httpBindAddr string, readinessProber *prober.Prober) error {
func defaultHTTPListener(comp component.Component, g *run.Group, logger log.Logger, reg *prometheus.Registry, router http.Handler, httpBindAddr string, readinessProber *prober.Prober) error {
kakkoyun marked this conversation as resolved.
Show resolved Hide resolved
mux := http.NewServeMux()
registerMetrics(mux, reg)
registerProfile(mux)
readinessProber.RegisterInMux(mux)
if router != nil {
mux.Handle("/", router)
}

l, err := net.Listen("tcp", httpBindAddr)
if err != nil {
return errors.Wrap(err, "listen metrics address")
}

g.Add(func() error {
level.Info(logger).Log("msg", "listening for metrics", "address", httpBindAddr)
level.Info(logger).Log("msg", fmt.Sprintf("listening for %s and metrics", comp.String()), "address", httpBindAddr)
kakkoyun marked this conversation as resolved.
Show resolved Hide resolved
readinessProber.SetHealthy()
return errors.Wrap(http.Serve(l, mux), "serve metrics")
return errors.Wrap(http.Serve(l, mux), fmt.Sprintf("serve %s and metrics", comp.String()))
kakkoyun marked this conversation as resolved.
Show resolved Hide resolved
}, func(err error) {
readinessProber.SetNotHealthy(err)
runutil.CloseWithLogOnErr(logger, l, "metric listener")
runutil.CloseWithLogOnErr(logger, l, fmt.Sprintf("%s and metric listener", comp.String()))
kakkoyun marked this conversation as resolved.
Show resolved Hide resolved
})
return nil
}
38 changes: 13 additions & 25 deletions cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/thanos-io/thanos/pkg/discovery/dns"
"github.com/thanos-io/thanos/pkg/extprom"
extpromhttp "github.com/thanos-io/thanos/pkg/extprom/http"
"github.com/thanos-io/thanos/pkg/prober"
"github.com/thanos-io/thanos/pkg/query"
v1 "github.com/thanos-io/thanos/pkg/query/api"
"github.com/thanos-io/thanos/pkg/runutil"
Expand All @@ -42,8 +43,9 @@ import (
)

// registerQuery registers a query command.
func registerQuery(m map[string]setupFunc, app *kingpin.Application, name string) {
cmd := app.Command(name, "query node exposing PromQL enabled Query API with data retrieved from multiple store nodes")
func registerQuery(m map[string]setupFunc, app *kingpin.Application) {
comp := component.Query
cmd := app.Command(comp.String(), "query node exposing PromQL enabled Query API with data retrieved from multiple store nodes")

grpcBindAddr, httpBindAddr, srvCert, srvKey, srvClientCA := regCommonServerFlags(cmd)

Expand Down Expand Up @@ -99,7 +101,7 @@ func registerQuery(m map[string]setupFunc, app *kingpin.Application, name string

storeResponseTimeout := modelDuration(cmd.Flag("store.response-timeout", "If a Store doesn't send any data in this specified duration then a Store will be ignored and partial data will be returned if it's enabled. 0 disables timeout.").Default("0ms"))

m[name] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ bool) error {
m[comp.String()] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ bool) error {
selectorLset, err := parseFlagLabels(*selectorLabels)
if err != nil {
return errors.Wrap(err, "parse federation labels")
Expand Down Expand Up @@ -156,6 +158,7 @@ func registerQuery(m map[string]setupFunc, app *kingpin.Application, name string
*dnsSDResolver,
time.Duration(*unhealthyStoreTimeout),
time.Duration(*instantDefaultMaxSourceResolution),
component.Query,
)
}
}
Expand Down Expand Up @@ -274,6 +277,7 @@ func runQuery(
dnsSDResolver string,
unhealthyStoreTimeout time.Duration,
instantDefaultMaxSourceResolution time.Duration,
comp component.Component,
) error {
// TODO(bplotka in PR #513 review): Move arguments into struct.
duplicatedStores := prometheus.NewCounter(prometheus.CounterOpts{
Expand Down Expand Up @@ -385,6 +389,8 @@ func runQuery(
})
}
// Start query API + UI HTTP server.

statusProber := prober.NewProber(comp, logger, reg)
{
router := route.New()

Expand All @@ -408,29 +414,10 @@ func runQuery(

api.Register(router.WithPrefix(path.Join(webRoutePrefix, "/api/v1")), tracer, logger, ins)

router.Get("/-/healthy", func(w http.ResponseWriter, r *http.Request) {
kakkoyun marked this conversation as resolved.
Show resolved Hide resolved
w.WriteHeader(http.StatusOK)
if _, err := fmt.Fprintf(w, "Thanos Querier is Healthy.\n"); err != nil {
level.Error(logger).Log("msg", "Could not write health check response.")
}
})

mux := http.NewServeMux()
registerMetrics(mux, reg)
registerProfile(mux)
mux.Handle("/", router)

l, err := net.Listen("tcp", httpBindAddr)
if err != nil {
return errors.Wrapf(err, "listen HTTP on address %s", httpBindAddr)
// Initiate default HTTP listener providing metrics endpoint and readiness/liveness probes.
if err := defaultHTTPListener(comp, g, logger, reg, router, httpBindAddr, statusProber); err != nil {
return errors.Wrap(err, "create default HTTP server with readiness prober")
}

g.Add(func() error {
level.Info(logger).Log("msg", "Listening for query and metrics", "address", httpBindAddr)
return errors.Wrap(http.Serve(l, mux), "serve query")
}, func(error) {
runutil.CloseWithLogOnErr(logger, l, "query and metric listener")
})
}
// Start query (proxy) gRPC StoreAPI.
{
Expand All @@ -448,6 +435,7 @@ func runQuery(

g.Add(func() error {
level.Info(logger).Log("msg", "Listening for StoreAPI gRPC", "address", grpcBindAddr)
statusProber.SetReady()
return errors.Wrap(s.Serve(l), "serve gRPC")
}, func(error) {
s.Stop()
Expand Down
4 changes: 2 additions & 2 deletions cmd/thanos/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,8 @@ func runSidecar(

statusProber := prober.NewProber(comp, logger, prometheus.WrapRegistererWithPrefix("thanos_", reg))
// Initiate default HTTP listener providing metrics endpoint and readiness/liveness probes.
if err := defaultHTTPListener(g, logger, reg, httpBindAddr, statusProber); err != nil {
return errors.Wrap(err, "create readiness prober")
if err := defaultHTTPListener(comp, g, logger, reg, nil, httpBindAddr, statusProber); err != nil {
return errors.Wrap(err, "create default HTTP server with readiness prober")
}

// Setup all the concurrent groups.
Expand Down
4 changes: 4 additions & 0 deletions tutorials/kubernetes-demo/manifests/thanos-querier.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,12 @@ spec:
containerPort: 10901
livenessProbe:
httpGet:
port: http
path: /-/healthy
readinessProbe:
httpGet:
port: http
path: /-/ready
---
apiVersion: v1
kind: Service
Expand Down