Skip to content

Commit

Permalink
receive: Add liveness and readiness probe (thanos-io#1537)
Browse files Browse the repository at this point in the history
* Add prober to receive

Signed-off-by: Kemal Akkoyun <kakkoyun@gmail.com>

* Add changelog entries

Signed-off-by: Kemal Akkoyun <kakkoyun@gmail.com>

* Update README

Signed-off-by: Kemal Akkoyun <kakkoyun@gmail.com>

* Remove default

Signed-off-by: Kemal Akkoyun <kakkoyun@gmail.com>

* Wait hashring to be ready

Signed-off-by: Kemal Akkoyun <kakkoyun@gmail.com>
  • Loading branch information
kakkoyun authored and ivan-kiselev committed Sep 26, 2019
1 parent 3fab5d9 commit 3cfc0bf
Show file tree
Hide file tree
Showing 6 changed files with 37 additions and 21 deletions.
8 changes: 5 additions & 3 deletions CHANGELOG.md
Expand Up @@ -13,12 +13,14 @@ We use *breaking* word for marking changes that are not backward compatible (rel

## Added
- [#1538](https://github.com/thanos-io/thanos/pull/1538) Added `/-/ready` and `/-/healthy` endpoints to Thanos Rule.
-[1533](https://github.com/thanos-io/thanos/pull/1533) Thanos inspect now supports the timeout flag.
- [#1537](https://github.com/thanos-io/thanos/pull/1537) Added `/-/ready` and `/-/healthy` endpoints to Thanos Receive.
- [#1534](https://github.com/thanos-io/thanos/pull/1534) Added `/-/ready` endpoint to Thanos Query.
- [#1533](https://github.com/thanos-io/thanos/pull/1533) Thanos inspect now supports the timeout flag.

### Fixed

-[#1525](https://github.com/thanos-io/thanos/pull/1525) Thanos now deletes block's file in correct order allowing to detect partial blocks without problems.
-[#1505](https://github.com/thanos-io/thanos/pull/1505) Thanos store now removes invalid local cache blocks.
- [#1525](https://github.com/thanos-io/thanos/pull/1525) Thanos now deletes block's file in correct order allowing to detect partial blocks without problems.
- [#1505](https://github.com/thanos-io/thanos/pull/1505) Thanos store now removes invalid local cache blocks.

## v0.7.0 - 2019.09.02

Expand Down
4 changes: 2 additions & 2 deletions cmd/thanos/compact.go
Expand Up @@ -168,9 +168,9 @@ func runCompact(
downsampleMetrics := newDownsampleMetrics(reg)

statusProber := prober.NewProber(component, logger, prometheus.WrapRegistererWithPrefix("thanos_", reg))
// Initiate default HTTP listener providing metrics endpoint and readiness/liveness probes.
// Initiate HTTP listener providing metrics endpoint and readiness/liveness probes.
if err := scheduleHTTPServer(g, logger, reg, statusProber, httpBindAddr, nil, component); err != nil {
return errors.Wrap(err, "create default HTTP server with readiness prober")
return errors.Wrap(err, "schedule HTTP server with probes")
}

confContentYaml, err := objStoreConfig.Content()
Expand Down
2 changes: 1 addition & 1 deletion cmd/thanos/main.go
Expand Up @@ -80,7 +80,7 @@ func main() {
registerCompact(cmds, app)
registerBucket(cmds, app, "bucket")
registerDownsample(cmds, app, "downsample")
registerReceive(cmds, app, "receive")
registerReceive(cmds, app)
registerChecks(cmds, app, "check")

cmd, err := app.Parse(os.Args[1:])
Expand Down
4 changes: 2 additions & 2 deletions cmd/thanos/query.go
Expand Up @@ -414,9 +414,9 @@ func runQuery(

api.Register(router.WithPrefix(path.Join(webRoutePrefix, "/api/v1")), tracer, logger, ins)

// Initiate default HTTP listener providing metrics endpoint and readiness/liveness probes.
// Initiate HTTP listener providing metrics endpoint and readiness/liveness probes.
if err := scheduleHTTPServer(g, logger, reg, statusProber, httpBindAddr, router, comp); err != nil {
return errors.Wrap(err, "create default HTTP server with readiness prober")
return errors.Wrap(err, "schedule HTTP server with probes")
}
}
// Start query (proxy) gRPC StoreAPI.
Expand Down
36 changes: 25 additions & 11 deletions cmd/thanos/receive.go
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/objstore/client"
"github.com/thanos-io/thanos/pkg/prober"
"github.com/thanos-io/thanos/pkg/receive"
"github.com/thanos-io/thanos/pkg/runutil"
"github.com/thanos-io/thanos/pkg/shipper"
Expand All @@ -28,11 +29,12 @@ import (
kingpin "gopkg.in/alecthomas/kingpin.v2"
)

func registerReceive(m map[string]setupFunc, app *kingpin.Application, name string) {
cmd := app.Command(name, "Accept Prometheus remote write API requests and write to local tsdb (EXPERIMENTAL, this may change drastically without notice)")
func registerReceive(m map[string]setupFunc, app *kingpin.Application) {
comp := component.Receive
cmd := app.Command(comp.String(), "Accept Prometheus remote write API requests and write to local tsdb (EXPERIMENTAL, this may change drastically without notice)")

grpcBindAddr, cert, key, clientCA := regGRPCFlags(cmd)
httpMetricsBindAddr := regHTTPAddrFlag(cmd)
httpBindAddr := regHTTPAddrFlag(cmd)

remoteWriteAddress := cmd.Flag("remote-write.address", "Address to listen on for remote write requests.").
Default("0.0.0.0:19291").String()
Expand Down Expand Up @@ -62,7 +64,7 @@ func registerReceive(m map[string]setupFunc, app *kingpin.Application, name stri

tsdbBlockDuration := modelDuration(cmd.Flag("tsdb.block-duration", "Duration for local TSDB blocks").Default("2h").Hidden())

m[name] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ bool) error {
m[comp.String()] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ bool) error {
lset, err := parseFlagLabels(*labelStrs)
if err != nil {
return errors.Wrap(err, "parse labels")
Expand Down Expand Up @@ -97,7 +99,7 @@ func registerReceive(m map[string]setupFunc, app *kingpin.Application, name stri
*cert,
*key,
*clientCA,
*httpMetricsBindAddr,
*httpBindAddr,
*remoteWriteAddress,
*dataDir,
objStoreConfig,
Expand All @@ -109,6 +111,7 @@ func registerReceive(m map[string]setupFunc, app *kingpin.Application, name stri
*replicaHeader,
*replicationFactor,
*tsdbBlockDuration,
comp,
)
}
}
Expand All @@ -122,7 +125,7 @@ func runReceive(
cert string,
key string,
clientCA string,
httpMetricsBindAddr string,
httpBindAddr string,
remoteWriteAddress string,
dataDir string,
objStoreConfig *pathOrContent,
Expand All @@ -134,6 +137,7 @@ func runReceive(
replicaHeader string,
replicationFactor uint64,
tsdbBlockDuration model.Duration,
comp component.Component,
) error {
logger = log.With(logger, "component", "receive")
level.Warn(logger).Log("msg", "setting up receive; the Thanos receive component is EXPERIMENTAL, it may break significantly without notice")
Expand All @@ -159,6 +163,8 @@ func runReceive(
ReplicationFactor: replicationFactor,
})

statusProber := prober.NewProber(comp, logger, prometheus.WrapRegistererWithPrefix("thanos_", reg))

// Start all components while we wait for TSDB to open but only load
// initial config and mark ourselves as ready after it completed.
dbOpen := make(chan struct{})
Expand Down Expand Up @@ -198,6 +204,7 @@ func runReceive(
)
}

hashringReady := make(chan struct{})
level.Debug(logger).Log("msg", "setting up hashring")
{
updates := make(chan receive.Hashring)
Expand Down Expand Up @@ -227,15 +234,20 @@ func runReceive(
func() error {
select {
case h := <-updates:
close(hashringReady)
webHandler.Hashring(h)
statusProber.SetReady()
case <-cancel:
close(hashringReady)
return nil
}
select {
// If any new hashring is received, then mark the handler as unready, but keep it alive.
case <-updates:
msg := "hashring has changed; server is not ready to receive web requests."
webHandler.Hashring(nil)
level.Info(logger).Log("msg", "hashring has changed; server is not ready to receive web requests.")
statusProber.SetNotReady(errors.New(msg))
level.Info(logger).Log("msg", msg)
case <-cancel:
return nil
}
Expand All @@ -248,9 +260,10 @@ func runReceive(
)
}

level.Debug(logger).Log("msg", "setting up metric http listen-group")
if err := metricHTTPListenGroup(g, logger, reg, httpMetricsBindAddr); err != nil {
return err
level.Debug(logger).Log("msg", "setting up http server")
// Initiate HTTP listener providing metrics endpoint and readiness/liveness probes.
if err := scheduleHTTPServer(g, logger, reg, statusProber, httpBindAddr, nil, comp); err != nil {
return errors.Wrap(err, "schedule HTTP server with probes")
}

level.Debug(logger).Log("msg", "setting up grpc server")
Expand All @@ -277,6 +290,8 @@ func runReceive(
}
s := newStoreGRPCServer(logger, reg, tracer, tsdbStore, opts)

// Wait hashring to be ready before start serving metrics
<-hashringReady
level.Info(logger).Log("msg", "listening for StoreAPI gRPC", "address", grpcBindAddr)
return errors.Wrap(s.Serve(l), "serve gRPC")
}, func(error) {
Expand Down Expand Up @@ -343,6 +358,5 @@ func runReceive(
}

level.Info(logger).Log("msg", "starting receiver")

return nil
}
4 changes: 2 additions & 2 deletions cmd/thanos/sidecar.go
Expand Up @@ -120,9 +120,9 @@ func runSidecar(
}

statusProber := prober.NewProber(comp, logger, prometheus.WrapRegistererWithPrefix("thanos_", reg))
// Initiate default HTTP listener providing metrics endpoint and readiness/liveness probes.
// Initiate HTTP listener providing metrics endpoint and readiness/liveness probes.
if err := scheduleHTTPServer(g, logger, reg, statusProber, httpBindAddr, nil, comp); err != nil {
return errors.Wrap(err, "create default HTTP server with readiness prober")
return errors.Wrap(err, "schedule HTTP server with probes")
}

// Setup all the concurrent groups.
Expand Down

0 comments on commit 3cfc0bf

Please sign in to comment.