Skip to content

Commit

Permalink
.*: Add new http-grace-period flag (#1680)
Browse files Browse the repository at this point in the history
* Add new http-grace-period flag

Signed-off-by: Kemal Akkoyun <kakkoyun@gmail.com>

* Update CHANGELOG

Signed-off-by: Kemal Akkoyun <kakkoyun@gmail.com>

* Update docs

Signed-off-by: Kemal Akkoyun <kakkoyun@gmail.com>

* Update pkg/server/http.go

Co-Authored-By: Bartlomiej Plotka <bwplotka@gmail.com>
Signed-off-by: Kemal Akkoyun <kakkoyun@gmail.com>

* Rename initializer for HTTP server

Signed-off-by: Kemal Akkoyun <kakkoyun@gmail.com>
Signed-off-by: Giedrius Statkevičius <giedriuswork@gmail.com>
  • Loading branch information
kakkoyun authored and GiedriusS committed Oct 28, 2019
1 parent 4cee079 commit 13238d8
Show file tree
Hide file tree
Showing 17 changed files with 226 additions and 74 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Expand Up @@ -15,6 +15,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel

- [#1660](https://github.com/thanos-io/thanos/pull/1660) Add a new `--prometheus.ready_timeout` CLI option to the sidecar to set how long to wait until Prometheus starts up.
- [#1573](https://github.com/thanos-io/thanos/pull/1573) `AliYun OSS` object storage, see [documents](docs/storage.md#aliyun-oss-configuration) for further information.
- [#1680](https://github.com/thanos-io/thanos/pull/1680) Add a new `--http-grace-period` CLI option to components which serve HTTP to set how long to wait until HTTP Server shuts down.

### Fixed

Expand Down
13 changes: 10 additions & 3 deletions cmd/thanos/compact.go
Expand Up @@ -11,6 +11,7 @@ import (
"time"

"github.com/thanos-io/thanos/pkg/extflag"
"github.com/thanos-io/thanos/pkg/server"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
Expand Down Expand Up @@ -79,6 +80,7 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application) {
Hidden().Default("false").Bool()

httpAddr := regHTTPAddrFlag(cmd)
httpGracePeriod := regHTTPGracePeriodFlag(cmd)

dataDir := cmd.Flag("data-dir", "Data directory in which to cache blocks and process compactions.").
Default("./data").String()
Expand Down Expand Up @@ -116,6 +118,7 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application) {
m[component.Compact.String()] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ bool) error {
return runCompact(g, logger, reg,
*httpAddr,
time.Duration(*httpGracePeriod),
*dataDir,
objStoreConfig,
time.Duration(*consistencyDelay),
Expand Down Expand Up @@ -143,6 +146,7 @@ func runCompact(
logger log.Logger,
reg *prometheus.Registry,
httpBindAddr string,
httpGracePeriod time.Duration,
dataDir string,
objStoreConfig *extflag.PathOrContent,
consistencyDelay time.Duration,
Expand Down Expand Up @@ -175,9 +179,12 @@ func runCompact(

statusProber := prober.NewProber(component, logger, prometheus.WrapRegistererWithPrefix("thanos_", reg))
// Initiate HTTP listener providing metrics endpoint and readiness/liveness probes.
if err := scheduleHTTPServer(g, logger, reg, statusProber, httpBindAddr, nil, component); err != nil {
return errors.Wrap(err, "schedule HTTP server with probes")
}
srv := server.NewHTTP(logger, reg, component, statusProber,
server.WithListen(httpBindAddr),
server.WithGracePeriod(httpGracePeriod),
)

g.Add(srv.ListenAndServe, srv.Shutdown)

confContentYaml, err := objStoreConfig.Content()
if err != nil {
Expand Down
13 changes: 9 additions & 4 deletions cmd/thanos/downsample.go
Expand Up @@ -7,6 +7,7 @@ import (
"time"

"github.com/thanos-io/thanos/pkg/extflag"
"github.com/thanos-io/thanos/pkg/server"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
Expand Down Expand Up @@ -34,14 +35,15 @@ func registerDownsample(m map[string]setupFunc, app *kingpin.Application) {
cmd := app.Command(comp.String(), "continuously downsamples blocks in an object store bucket")

httpAddr := regHTTPAddrFlag(cmd)
httpGracePeriod := regHTTPGracePeriodFlag(cmd)

dataDir := cmd.Flag("data-dir", "Data directory in which to cache blocks and process downsamplings.").
Default("./data").String()

objStoreConfig := regCommonObjStoreFlags(cmd, "", true)

m[comp.String()] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ bool) error {
return runDownsample(g, logger, reg, *httpAddr, *dataDir, objStoreConfig, comp)
return runDownsample(g, logger, reg, *httpAddr, time.Duration(*httpGracePeriod), *dataDir, objStoreConfig, comp)
}
}

Expand Down Expand Up @@ -73,6 +75,7 @@ func runDownsample(
logger log.Logger,
reg *prometheus.Registry,
httpBindAddr string,
httpGracePeriod time.Duration,
dataDir string,
objStoreConfig *extflag.PathOrContent,
comp component.Component,
Expand Down Expand Up @@ -123,9 +126,11 @@ func runDownsample(
}

// Initiate HTTP listener providing metrics endpoint and readiness/liveness probes.
if err := scheduleHTTPServer(g, logger, reg, statusProber, httpBindAddr, nil, comp); err != nil {
return errors.Wrap(err, "schedule HTTP server with probe")
}
srv := server.NewHTTP(logger, reg, comp, statusProber,
server.WithListen(httpBindAddr),
server.WithGracePeriod(httpGracePeriod),
)
g.Add(srv.ListenAndServe, srv.Shutdown)

level.Info(logger).Log("msg", "starting downsample node")
return nil
Expand Down
14 changes: 9 additions & 5 deletions cmd/thanos/flags.go
Expand Up @@ -10,6 +10,13 @@ import (
"gopkg.in/alecthomas/kingpin.v2"
)

func modelDuration(flags *kingpin.FlagClause) *model.Duration {
value := new(model.Duration)
flags.SetValue(value)

return value
}

func regGRPCFlags(cmd *kingpin.CmdClause) (
grpcBindAddr *string,
grpcTLSSrvCert *string,
Expand All @@ -33,11 +40,8 @@ func regHTTPAddrFlag(cmd *kingpin.CmdClause) *string {
return cmd.Flag("http-address", "Listen host:port for HTTP endpoints.").Default("0.0.0.0:10902").String()
}

func modelDuration(flags *kingpin.FlagClause) *model.Duration {
value := new(model.Duration)
flags.SetValue(value)

return value
func regHTTPGracePeriodFlag(cmd *kingpin.CmdClause) *model.Duration {
return modelDuration(cmd.Flag("http-grace-period", "Time to wait after an interrupt received for HTTP Server.").Default("5s"))
}

func regCommonObjStoreFlags(cmd *kingpin.CmdClause, suffix string, required bool, extraDesc ...string) *extflag.PathOrContent {
Expand Down
46 changes: 0 additions & 46 deletions cmd/thanos/main.go
Expand Up @@ -8,9 +8,6 @@ import (
"io"
"io/ioutil"
"math"
"net"
"net/http"
"net/http/pprof"
"os"
"os/signal"
"path/filepath"
Expand All @@ -29,11 +26,7 @@ import (
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/prometheus/common/version"
"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/prober"
"github.com/thanos-io/thanos/pkg/runutil"
"github.com/thanos-io/thanos/pkg/store/storepb"
"github.com/thanos-io/thanos/pkg/tracing"
"github.com/thanos-io/thanos/pkg/tracing/client"
Expand Down Expand Up @@ -230,18 +223,6 @@ func interrupt(logger log.Logger, cancel <-chan struct{}) error {
}
}

func registerProfile(mux *http.ServeMux) {
mux.HandleFunc("/debug/pprof/", pprof.Index)
mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
mux.HandleFunc("/debug/pprof/profile", pprof.Profile)
mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
mux.HandleFunc("/debug/pprof/trace", pprof.Trace)
}

func registerMetrics(mux *http.ServeMux, g prometheus.Gatherer) {
mux.Handle("/metrics", promhttp.HandlerFor(g, promhttp.HandlerOpts{}))
}

func defaultGRPCTLSServerOpts(logger log.Logger, cert, key, clientCA string) ([]grpc.ServerOption, error) {
opts := []grpc.ServerOption{}
tlsCfg, err := defaultTLSServerOpts(log.With(logger, "protocol", "gRPC"), cert, key, clientCA)
Expand Down Expand Up @@ -383,30 +364,3 @@ func newStoreGRPCServer(logger log.Logger, reg prometheus.Registerer, tracer ope

return s
}

// scheduleHTTPServer starts a run.Group that servers HTTP endpoint with default endpoints providing Prometheus metrics,
// profiling and liveness/readiness probes.
func scheduleHTTPServer(g *run.Group, logger log.Logger, reg *prometheus.Registry, readinessProber *prober.Prober, httpBindAddr string, handler http.Handler, comp component.Component) error {
mux := http.NewServeMux()
registerMetrics(mux, reg)
registerProfile(mux)
readinessProber.RegisterInMux(mux)
if handler != nil {
mux.Handle("/", handler)
}

l, err := net.Listen("tcp", httpBindAddr)
if err != nil {
return errors.Wrap(err, "listen metrics address")
}

g.Add(func() error {
level.Info(logger).Log("msg", "listening for requests and metrics", "component", comp.String(), "address", httpBindAddr)
readinessProber.SetHealthy()
return errors.Wrapf(http.Serve(l, mux), "serve %s and metrics", comp.String())
}, func(err error) {
readinessProber.SetNotHealthy(err)
runutil.CloseWithLogOnErr(logger, l, "%s and metric listener", comp.String())
})
return nil
}
14 changes: 11 additions & 3 deletions cmd/thanos/query.go
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/thanos-io/thanos/pkg/query"
v1 "github.com/thanos-io/thanos/pkg/query/api"
"github.com/thanos-io/thanos/pkg/runutil"
"github.com/thanos-io/thanos/pkg/server"
"github.com/thanos-io/thanos/pkg/store"
"github.com/thanos-io/thanos/pkg/tracing"
"github.com/thanos-io/thanos/pkg/ui"
Expand All @@ -45,6 +46,7 @@ func registerQuery(m map[string]setupFunc, app *kingpin.Application) {
cmd := app.Command(comp.String(), "query node exposing PromQL enabled Query API with data retrieved from multiple store nodes")

httpBindAddr := regHTTPAddrFlag(cmd)
httpGracePeriod := regHTTPGracePeriodFlag(cmd)
grpcBindAddr, srvCert, srvKey, srvClientCA := regGRPCFlags(cmd)

secure := cmd.Flag("grpc-client-tls-secure", "Use TLS when talking to the gRPC server").Default("false").Bool()
Expand Down Expand Up @@ -140,6 +142,7 @@ func registerQuery(m map[string]setupFunc, app *kingpin.Application) {
*caCert,
*serverName,
*httpBindAddr,
time.Duration(*httpGracePeriod),
*webRoutePrefix,
*webExternalPrefix,
*webPrefixHeaderName,
Expand Down Expand Up @@ -222,6 +225,7 @@ func runQuery(
caCert string,
serverName string,
httpBindAddr string,
httpGracePeriod time.Duration,
webRoutePrefix string,
webExternalPrefix string,
webPrefixHeaderName string,
Expand Down Expand Up @@ -376,9 +380,13 @@ func runQuery(
api.Register(router.WithPrefix(path.Join(webRoutePrefix, "/api/v1")), tracer, logger, ins)

// Initiate HTTP listener providing metrics endpoint and readiness/liveness probes.
if err := scheduleHTTPServer(g, logger, reg, statusProber, httpBindAddr, router, comp); err != nil {
return errors.Wrap(err, "schedule HTTP server with probes")
}
srv := server.NewHTTP(logger, reg, comp, statusProber,
server.WithListen(httpBindAddr),
server.WithGracePeriod(httpGracePeriod),
)
srv.Handle("/", router)

g.Add(srv.ListenAndServe, srv.Shutdown)
}
// Start query (proxy) gRPC StoreAPI.
{
Expand Down
12 changes: 9 additions & 3 deletions cmd/thanos/receive.go
Expand Up @@ -9,6 +9,7 @@ import (
"time"

"github.com/thanos-io/thanos/pkg/extflag"
"github.com/thanos-io/thanos/pkg/server"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
Expand Down Expand Up @@ -36,6 +37,7 @@ func registerReceive(m map[string]setupFunc, app *kingpin.Application) {
cmd := app.Command(comp.String(), "Accept Prometheus remote write API requests and write to local tsdb (EXPERIMENTAL, this may change drastically without notice)")

httpBindAddr := regHTTPAddrFlag(cmd)
httpGracePeriod := regHTTPGracePeriodFlag(cmd)
grpcBindAddr, grpcCert, grpcKey, grpcClientCA := regGRPCFlags(cmd)

rwAddress := cmd.Flag("remote-write.address", "Address to listen on for remote write requests.").
Expand Down Expand Up @@ -109,6 +111,7 @@ func registerReceive(m map[string]setupFunc, app *kingpin.Application) {
*grpcKey,
*grpcClientCA,
*httpBindAddr,
time.Duration(*httpGracePeriod),
*rwAddress,
*rwServerCert,
*rwServerKey,
Expand Down Expand Up @@ -142,6 +145,7 @@ func runReceive(
grpcKey string,
grpcClientCA string,
httpBindAddr string,
httpGracePeriod time.Duration,
rwAddress string,
rwServerCert string,
rwServerKey string,
Expand Down Expand Up @@ -335,9 +339,11 @@ func runReceive(

level.Debug(logger).Log("msg", "setting up http server")
// Initiate HTTP listener providing metrics endpoint and readiness/liveness probes.
if err := scheduleHTTPServer(g, logger, reg, statusProber, httpBindAddr, nil, comp); err != nil {
return errors.Wrap(err, "schedule HTTP server with probes")
}
srv := server.NewHTTP(logger, reg, comp, statusProber,
server.WithListen(httpBindAddr),
server.WithGracePeriod(httpGracePeriod),
)
g.Add(srv.ListenAndServe, srv.Shutdown)

level.Debug(logger).Log("msg", "setting up grpc server")
{
Expand Down
14 changes: 11 additions & 3 deletions cmd/thanos/rule.go
Expand Up @@ -18,6 +18,7 @@ import (
"time"

"github.com/thanos-io/thanos/pkg/extflag"
"github.com/thanos-io/thanos/pkg/server"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
Expand Down Expand Up @@ -62,6 +63,7 @@ func registerRule(m map[string]setupFunc, app *kingpin.Application) {
cmd := app.Command(comp.String(), "ruler evaluating Prometheus rules against given Query nodes, exposing Store API and storing old blocks in bucket")

httpBindAddr := regHTTPAddrFlag(cmd)
httpGracePeriod := regHTTPGracePeriodFlag(cmd)
grpcBindAddr, cert, key, clientCA := regGRPCFlags(cmd)

labelStrs := cmd.Flag("label", "Labels to be applied to all generated metrics (repeated). Similar to external labels for Prometheus, used to identify ruler and its blocks as unique source.").
Expand Down Expand Up @@ -161,6 +163,7 @@ func registerRule(m map[string]setupFunc, app *kingpin.Application) {
*key,
*clientCA,
*httpBindAddr,
time.Duration(*httpGracePeriod),
*webRoutePrefix,
*webExternalPrefix,
*webPrefixHeaderName,
Expand Down Expand Up @@ -196,6 +199,7 @@ func runRule(
key string,
clientCA string,
httpBindAddr string,
httpGracePeriod time.Duration,
webRoutePrefix string,
webExternalPrefix string,
webPrefixHeaderName string,
Expand Down Expand Up @@ -548,9 +552,13 @@ func runRule(
api.Register(router.WithPrefix(path.Join(webRoutePrefix, "/api/v1")), tracer, logger, ins)

// Initiate HTTP listener providing metrics endpoint and readiness/liveness probes.
if err := scheduleHTTPServer(g, logger, reg, statusProber, httpBindAddr, router, comp); err != nil {
return errors.Wrap(err, "schedule HTTP server with probes")
}
srv := server.NewHTTP(logger, reg, comp, statusProber,
server.WithListen(httpBindAddr),
server.WithGracePeriod(httpGracePeriod),
)
srv.Handle("/", router)

g.Add(srv.ListenAndServe, srv.Shutdown)
}

confContentYaml, err := objStoreConfig.Content()
Expand Down
15 changes: 11 additions & 4 deletions cmd/thanos/sidecar.go
Expand Up @@ -9,6 +9,7 @@ import (
"time"

"github.com/thanos-io/thanos/pkg/extflag"
"github.com/thanos-io/thanos/pkg/server"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
Expand Down Expand Up @@ -36,6 +37,7 @@ func registerSidecar(m map[string]setupFunc, app *kingpin.Application) {
cmd := app.Command(component.Sidecar.String(), "sidecar for Prometheus server")

httpBindAddr := regHTTPAddrFlag(cmd)
httpGracePeriod := regHTTPGracePeriodFlag(cmd)
grpcBindAddr, cert, key, clientCA := regGRPCFlags(cmd)

promURL := cmd.Flag("prometheus.url", "URL at which to reach Prometheus's API. For better performance use local network.").
Expand Down Expand Up @@ -81,6 +83,7 @@ func registerSidecar(m map[string]setupFunc, app *kingpin.Application) {
*key,
*clientCA,
*httpBindAddr,
time.Duration(*httpGracePeriod),
*promURL,
*promReadyTimeout,
*dataDir,
Expand All @@ -103,6 +106,7 @@ func runSidecar(
key string,
clientCA string,
httpBindAddr string,
httpGracePeriod time.Duration,
promURL *url.URL,
promReadyTimeout time.Duration,
dataDir string,
Expand Down Expand Up @@ -134,11 +138,14 @@ func runSidecar(
uploads = false
}

statusProber := prober.NewProber(comp, logger, prometheus.WrapRegistererWithPrefix("thanos_", reg))
// Initiate HTTP listener providing metrics endpoint and readiness/liveness probes.
if err := scheduleHTTPServer(g, logger, reg, statusProber, httpBindAddr, nil, comp); err != nil {
return errors.Wrap(err, "schedule HTTP server with probes")
}
statusProber := prober.NewProber(comp, logger, prometheus.WrapRegistererWithPrefix("thanos_", reg))
srv := server.NewHTTP(logger, reg, comp, statusProber,
server.WithListen(httpBindAddr),
server.WithGracePeriod(httpGracePeriod),
)

g.Add(srv.ListenAndServe, srv.Shutdown)

// Setup all the concurrent groups.
{
Expand Down

0 comments on commit 13238d8

Please sign in to comment.