diff --git a/CHANGELOG.md b/CHANGELOG.md index 331a702fa32..b40e93880e3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel - [#1660](https://github.com/thanos-io/thanos/pull/1660) Add a new `--prometheus.ready_timeout` CLI option to the sidecar to set how long to wait until Prometheus starts up. - [#1573](https://github.com/thanos-io/thanos/pull/1573) `AliYun OSS` object storage, see [documents](docs/storage.md#aliyun-oss-configuration) for further information. +- [#1680](https://github.com/thanos-io/thanos/pull/1680) Add a new `--http-grace-period` CLI option to components which serve HTTP to set how long to wait until HTTP Server shuts down. ### Fixed diff --git a/cmd/thanos/compact.go b/cmd/thanos/compact.go index 4bbb68cea2d..09ea8fba938 100644 --- a/cmd/thanos/compact.go +++ b/cmd/thanos/compact.go @@ -11,6 +11,7 @@ import ( "time" "github.com/thanos-io/thanos/pkg/extflag" + "github.com/thanos-io/thanos/pkg/server" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" @@ -79,6 +80,7 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application) { Hidden().Default("false").Bool() httpAddr := regHTTPAddrFlag(cmd) + httpGracePeriod := regHTTPGracePeriodFlag(cmd) dataDir := cmd.Flag("data-dir", "Data directory in which to cache blocks and process compactions."). Default("./data").String() @@ -116,6 +118,7 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application) { m[component.Compact.String()] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ bool) error { return runCompact(g, logger, reg, *httpAddr, + time.Duration(*httpGracePeriod), *dataDir, objStoreConfig, time.Duration(*consistencyDelay), @@ -143,6 +146,7 @@ func runCompact( logger log.Logger, reg *prometheus.Registry, httpBindAddr string, + httpGracePeriod time.Duration, dataDir string, objStoreConfig *extflag.PathOrContent, consistencyDelay time.Duration, @@ -175,9 +179,12 @@ func runCompact( statusProber := prober.NewProber(component, logger, prometheus.WrapRegistererWithPrefix("thanos_", reg)) // Initiate HTTP listener providing metrics endpoint and readiness/liveness probes. - if err := scheduleHTTPServer(g, logger, reg, statusProber, httpBindAddr, nil, component); err != nil { - return errors.Wrap(err, "schedule HTTP server with probes") - } + srv := server.NewHTTP(logger, reg, component, statusProber, + server.WithListen(httpBindAddr), + server.WithGracePeriod(httpGracePeriod), + ) + + g.Add(srv.ListenAndServe, srv.Shutdown) confContentYaml, err := objStoreConfig.Content() if err != nil { diff --git a/cmd/thanos/downsample.go b/cmd/thanos/downsample.go index ef5f3fbef64..9e360cec679 100644 --- a/cmd/thanos/downsample.go +++ b/cmd/thanos/downsample.go @@ -7,6 +7,7 @@ import ( "time" "github.com/thanos-io/thanos/pkg/extflag" + "github.com/thanos-io/thanos/pkg/server" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" @@ -34,6 +35,7 @@ func registerDownsample(m map[string]setupFunc, app *kingpin.Application) { cmd := app.Command(comp.String(), "continuously downsamples blocks in an object store bucket") httpAddr := regHTTPAddrFlag(cmd) + httpGracePeriod := regHTTPGracePeriodFlag(cmd) dataDir := cmd.Flag("data-dir", "Data directory in which to cache blocks and process downsamplings."). Default("./data").String() @@ -41,7 +43,7 @@ func registerDownsample(m map[string]setupFunc, app *kingpin.Application) { objStoreConfig := regCommonObjStoreFlags(cmd, "", true) m[comp.String()] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ bool) error { - return runDownsample(g, logger, reg, *httpAddr, *dataDir, objStoreConfig, comp) + return runDownsample(g, logger, reg, *httpAddr, time.Duration(*httpGracePeriod), *dataDir, objStoreConfig, comp) } } @@ -73,6 +75,7 @@ func runDownsample( logger log.Logger, reg *prometheus.Registry, httpBindAddr string, + httpGracePeriod time.Duration, dataDir string, objStoreConfig *extflag.PathOrContent, comp component.Component, @@ -123,9 +126,11 @@ func runDownsample( } // Initiate HTTP listener providing metrics endpoint and readiness/liveness probes. - if err := scheduleHTTPServer(g, logger, reg, statusProber, httpBindAddr, nil, comp); err != nil { - return errors.Wrap(err, "schedule HTTP server with probe") - } + srv := server.NewHTTP(logger, reg, comp, statusProber, + server.WithListen(httpBindAddr), + server.WithGracePeriod(httpGracePeriod), + ) + g.Add(srv.ListenAndServe, srv.Shutdown) level.Info(logger).Log("msg", "starting downsample node") return nil diff --git a/cmd/thanos/flags.go b/cmd/thanos/flags.go index f8cd6570cc1..135552e6f6a 100644 --- a/cmd/thanos/flags.go +++ b/cmd/thanos/flags.go @@ -10,6 +10,13 @@ import ( "gopkg.in/alecthomas/kingpin.v2" ) +func modelDuration(flags *kingpin.FlagClause) *model.Duration { + value := new(model.Duration) + flags.SetValue(value) + + return value +} + func regGRPCFlags(cmd *kingpin.CmdClause) ( grpcBindAddr *string, grpcTLSSrvCert *string, @@ -33,11 +40,8 @@ func regHTTPAddrFlag(cmd *kingpin.CmdClause) *string { return cmd.Flag("http-address", "Listen host:port for HTTP endpoints.").Default("0.0.0.0:10902").String() } -func modelDuration(flags *kingpin.FlagClause) *model.Duration { - value := new(model.Duration) - flags.SetValue(value) - - return value +func regHTTPGracePeriodFlag(cmd *kingpin.CmdClause) *model.Duration { + return modelDuration(cmd.Flag("http-grace-period", "Time to wait after an interrupt received for HTTP Server.").Default("5s")) } func regCommonObjStoreFlags(cmd *kingpin.CmdClause, suffix string, required bool, extraDesc ...string) *extflag.PathOrContent { diff --git a/cmd/thanos/main.go b/cmd/thanos/main.go index f232c9b5d63..3e93628e9b8 100644 --- a/cmd/thanos/main.go +++ b/cmd/thanos/main.go @@ -8,9 +8,6 @@ import ( "io" "io/ioutil" "math" - "net" - "net/http" - "net/http/pprof" "os" "os/signal" "path/filepath" @@ -29,11 +26,7 @@ import ( "github.com/opentracing/opentracing-go" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/prometheus/common/version" - "github.com/thanos-io/thanos/pkg/component" - "github.com/thanos-io/thanos/pkg/prober" - "github.com/thanos-io/thanos/pkg/runutil" "github.com/thanos-io/thanos/pkg/store/storepb" "github.com/thanos-io/thanos/pkg/tracing" "github.com/thanos-io/thanos/pkg/tracing/client" @@ -230,18 +223,6 @@ func interrupt(logger log.Logger, cancel <-chan struct{}) error { } } -func registerProfile(mux *http.ServeMux) { - mux.HandleFunc("/debug/pprof/", pprof.Index) - mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline) - mux.HandleFunc("/debug/pprof/profile", pprof.Profile) - mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol) - mux.HandleFunc("/debug/pprof/trace", pprof.Trace) -} - -func registerMetrics(mux *http.ServeMux, g prometheus.Gatherer) { - mux.Handle("/metrics", promhttp.HandlerFor(g, promhttp.HandlerOpts{})) -} - func defaultGRPCTLSServerOpts(logger log.Logger, cert, key, clientCA string) ([]grpc.ServerOption, error) { opts := []grpc.ServerOption{} tlsCfg, err := defaultTLSServerOpts(log.With(logger, "protocol", "gRPC"), cert, key, clientCA) @@ -383,30 +364,3 @@ func newStoreGRPCServer(logger log.Logger, reg prometheus.Registerer, tracer ope return s } - -// scheduleHTTPServer starts a run.Group that servers HTTP endpoint with default endpoints providing Prometheus metrics, -// profiling and liveness/readiness probes. -func scheduleHTTPServer(g *run.Group, logger log.Logger, reg *prometheus.Registry, readinessProber *prober.Prober, httpBindAddr string, handler http.Handler, comp component.Component) error { - mux := http.NewServeMux() - registerMetrics(mux, reg) - registerProfile(mux) - readinessProber.RegisterInMux(mux) - if handler != nil { - mux.Handle("/", handler) - } - - l, err := net.Listen("tcp", httpBindAddr) - if err != nil { - return errors.Wrap(err, "listen metrics address") - } - - g.Add(func() error { - level.Info(logger).Log("msg", "listening for requests and metrics", "component", comp.String(), "address", httpBindAddr) - readinessProber.SetHealthy() - return errors.Wrapf(http.Serve(l, mux), "serve %s and metrics", comp.String()) - }, func(err error) { - readinessProber.SetNotHealthy(err) - runutil.CloseWithLogOnErr(logger, l, "%s and metric listener", comp.String()) - }) - return nil -} diff --git a/cmd/thanos/query.go b/cmd/thanos/query.go index e4c98de02b1..c9d93b4feb1 100644 --- a/cmd/thanos/query.go +++ b/cmd/thanos/query.go @@ -31,6 +31,7 @@ import ( "github.com/thanos-io/thanos/pkg/query" v1 "github.com/thanos-io/thanos/pkg/query/api" "github.com/thanos-io/thanos/pkg/runutil" + "github.com/thanos-io/thanos/pkg/server" "github.com/thanos-io/thanos/pkg/store" "github.com/thanos-io/thanos/pkg/tracing" "github.com/thanos-io/thanos/pkg/ui" @@ -45,6 +46,7 @@ func registerQuery(m map[string]setupFunc, app *kingpin.Application) { cmd := app.Command(comp.String(), "query node exposing PromQL enabled Query API with data retrieved from multiple store nodes") httpBindAddr := regHTTPAddrFlag(cmd) + httpGracePeriod := regHTTPGracePeriodFlag(cmd) grpcBindAddr, srvCert, srvKey, srvClientCA := regGRPCFlags(cmd) secure := cmd.Flag("grpc-client-tls-secure", "Use TLS when talking to the gRPC server").Default("false").Bool() @@ -140,6 +142,7 @@ func registerQuery(m map[string]setupFunc, app *kingpin.Application) { *caCert, *serverName, *httpBindAddr, + time.Duration(*httpGracePeriod), *webRoutePrefix, *webExternalPrefix, *webPrefixHeaderName, @@ -222,6 +225,7 @@ func runQuery( caCert string, serverName string, httpBindAddr string, + httpGracePeriod time.Duration, webRoutePrefix string, webExternalPrefix string, webPrefixHeaderName string, @@ -376,9 +380,13 @@ func runQuery( api.Register(router.WithPrefix(path.Join(webRoutePrefix, "/api/v1")), tracer, logger, ins) // Initiate HTTP listener providing metrics endpoint and readiness/liveness probes. - if err := scheduleHTTPServer(g, logger, reg, statusProber, httpBindAddr, router, comp); err != nil { - return errors.Wrap(err, "schedule HTTP server with probes") - } + srv := server.NewHTTP(logger, reg, comp, statusProber, + server.WithListen(httpBindAddr), + server.WithGracePeriod(httpGracePeriod), + ) + srv.Handle("/", router) + + g.Add(srv.ListenAndServe, srv.Shutdown) } // Start query (proxy) gRPC StoreAPI. { diff --git a/cmd/thanos/receive.go b/cmd/thanos/receive.go index a6858d3bf3f..649b03ef5c8 100644 --- a/cmd/thanos/receive.go +++ b/cmd/thanos/receive.go @@ -9,6 +9,7 @@ import ( "time" "github.com/thanos-io/thanos/pkg/extflag" + "github.com/thanos-io/thanos/pkg/server" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" @@ -36,6 +37,7 @@ func registerReceive(m map[string]setupFunc, app *kingpin.Application) { cmd := app.Command(comp.String(), "Accept Prometheus remote write API requests and write to local tsdb (EXPERIMENTAL, this may change drastically without notice)") httpBindAddr := regHTTPAddrFlag(cmd) + httpGracePeriod := regHTTPGracePeriodFlag(cmd) grpcBindAddr, grpcCert, grpcKey, grpcClientCA := regGRPCFlags(cmd) rwAddress := cmd.Flag("remote-write.address", "Address to listen on for remote write requests."). @@ -109,6 +111,7 @@ func registerReceive(m map[string]setupFunc, app *kingpin.Application) { *grpcKey, *grpcClientCA, *httpBindAddr, + time.Duration(*httpGracePeriod), *rwAddress, *rwServerCert, *rwServerKey, @@ -142,6 +145,7 @@ func runReceive( grpcKey string, grpcClientCA string, httpBindAddr string, + httpGracePeriod time.Duration, rwAddress string, rwServerCert string, rwServerKey string, @@ -335,9 +339,11 @@ func runReceive( level.Debug(logger).Log("msg", "setting up http server") // Initiate HTTP listener providing metrics endpoint and readiness/liveness probes. - if err := scheduleHTTPServer(g, logger, reg, statusProber, httpBindAddr, nil, comp); err != nil { - return errors.Wrap(err, "schedule HTTP server with probes") - } + srv := server.NewHTTP(logger, reg, comp, statusProber, + server.WithListen(httpBindAddr), + server.WithGracePeriod(httpGracePeriod), + ) + g.Add(srv.ListenAndServe, srv.Shutdown) level.Debug(logger).Log("msg", "setting up grpc server") { diff --git a/cmd/thanos/rule.go b/cmd/thanos/rule.go index 5e76f5bc1f6..dedb979b2c3 100644 --- a/cmd/thanos/rule.go +++ b/cmd/thanos/rule.go @@ -18,6 +18,7 @@ import ( "time" "github.com/thanos-io/thanos/pkg/extflag" + "github.com/thanos-io/thanos/pkg/server" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" @@ -62,6 +63,7 @@ func registerRule(m map[string]setupFunc, app *kingpin.Application) { cmd := app.Command(comp.String(), "ruler evaluating Prometheus rules against given Query nodes, exposing Store API and storing old blocks in bucket") httpBindAddr := regHTTPAddrFlag(cmd) + httpGracePeriod := regHTTPGracePeriodFlag(cmd) grpcBindAddr, cert, key, clientCA := regGRPCFlags(cmd) labelStrs := cmd.Flag("label", "Labels to be applied to all generated metrics (repeated). Similar to external labels for Prometheus, used to identify ruler and its blocks as unique source."). @@ -161,6 +163,7 @@ func registerRule(m map[string]setupFunc, app *kingpin.Application) { *key, *clientCA, *httpBindAddr, + time.Duration(*httpGracePeriod), *webRoutePrefix, *webExternalPrefix, *webPrefixHeaderName, @@ -196,6 +199,7 @@ func runRule( key string, clientCA string, httpBindAddr string, + httpGracePeriod time.Duration, webRoutePrefix string, webExternalPrefix string, webPrefixHeaderName string, @@ -548,9 +552,13 @@ func runRule( api.Register(router.WithPrefix(path.Join(webRoutePrefix, "/api/v1")), tracer, logger, ins) // Initiate HTTP listener providing metrics endpoint and readiness/liveness probes. - if err := scheduleHTTPServer(g, logger, reg, statusProber, httpBindAddr, router, comp); err != nil { - return errors.Wrap(err, "schedule HTTP server with probes") - } + srv := server.NewHTTP(logger, reg, comp, statusProber, + server.WithListen(httpBindAddr), + server.WithGracePeriod(httpGracePeriod), + ) + srv.Handle("/", router) + + g.Add(srv.ListenAndServe, srv.Shutdown) } confContentYaml, err := objStoreConfig.Content() diff --git a/cmd/thanos/sidecar.go b/cmd/thanos/sidecar.go index 416fb67d2d5..e9668065a67 100644 --- a/cmd/thanos/sidecar.go +++ b/cmd/thanos/sidecar.go @@ -9,6 +9,7 @@ import ( "time" "github.com/thanos-io/thanos/pkg/extflag" + "github.com/thanos-io/thanos/pkg/server" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" @@ -36,6 +37,7 @@ func registerSidecar(m map[string]setupFunc, app *kingpin.Application) { cmd := app.Command(component.Sidecar.String(), "sidecar for Prometheus server") httpBindAddr := regHTTPAddrFlag(cmd) + httpGracePeriod := regHTTPGracePeriodFlag(cmd) grpcBindAddr, cert, key, clientCA := regGRPCFlags(cmd) promURL := cmd.Flag("prometheus.url", "URL at which to reach Prometheus's API. For better performance use local network."). @@ -81,6 +83,7 @@ func registerSidecar(m map[string]setupFunc, app *kingpin.Application) { *key, *clientCA, *httpBindAddr, + time.Duration(*httpGracePeriod), *promURL, *promReadyTimeout, *dataDir, @@ -103,6 +106,7 @@ func runSidecar( key string, clientCA string, httpBindAddr string, + httpGracePeriod time.Duration, promURL *url.URL, promReadyTimeout time.Duration, dataDir string, @@ -134,11 +138,14 @@ func runSidecar( uploads = false } - statusProber := prober.NewProber(comp, logger, prometheus.WrapRegistererWithPrefix("thanos_", reg)) // Initiate HTTP listener providing metrics endpoint and readiness/liveness probes. - if err := scheduleHTTPServer(g, logger, reg, statusProber, httpBindAddr, nil, comp); err != nil { - return errors.Wrap(err, "schedule HTTP server with probes") - } + statusProber := prober.NewProber(comp, logger, prometheus.WrapRegistererWithPrefix("thanos_", reg)) + srv := server.NewHTTP(logger, reg, comp, statusProber, + server.WithListen(httpBindAddr), + server.WithGracePeriod(httpGracePeriod), + ) + + g.Add(srv.ListenAndServe, srv.Shutdown) // Setup all the concurrent groups. { diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go index 6326b381041..54e8a631247 100644 --- a/cmd/thanos/store.go +++ b/cmd/thanos/store.go @@ -6,6 +6,7 @@ import ( "time" "github.com/thanos-io/thanos/pkg/extflag" + "github.com/thanos-io/thanos/pkg/server" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" @@ -30,6 +31,7 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application) { cmd := app.Command(component.Store.String(), "store node giving access to blocks in a bucket provider. Now supported GCS, S3, Azure, Swift and Tencent COS.") httpBindAddr := regHTTPAddrFlag(cmd) + httpGracePeriod := regHTTPGracePeriodFlag(cmd) grpcBindAddr, cert, key, clientCA := regGRPCFlags(cmd) dataDir := cmd.Flag("data-dir", "Data directory in which to cache remote blocks."). @@ -83,6 +85,7 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application) { *key, *clientCA, *httpBindAddr, + time.Duration(*httpGracePeriod), uint64(*indexCacheSize), uint64(*chunkPoolSize), uint64(*maxSampleCount), @@ -114,6 +117,7 @@ func runStore( key string, clientCA string, httpBindAddr string, + httpGracePeriod time.Duration, indexCacheSizeBytes uint64, chunkPoolSizeBytes uint64, maxSampleCount uint64, @@ -128,9 +132,12 @@ func runStore( ) error { // Initiate HTTP listener providing metrics endpoint and readiness/liveness probes. statusProber := prober.NewProber(component, logger, prometheus.WrapRegistererWithPrefix("thanos_", reg)) - if err := scheduleHTTPServer(g, logger, reg, statusProber, httpBindAddr, nil, component); err != nil { - return errors.Wrap(err, "schedule HTTP server") - } + srv := server.NewHTTP(logger, reg, component, statusProber, + server.WithListen(httpBindAddr), + server.WithGracePeriod(httpGracePeriod), + ) + + g.Add(srv.ListenAndServe, srv.Shutdown) confContentYaml, err := objStoreConfig.Content() if err != nil { diff --git a/docs/components/compact.md b/docs/components/compact.md index b39b65ee65c..df1056f571e 100644 --- a/docs/components/compact.md +++ b/docs/components/compact.md @@ -84,6 +84,8 @@ Flags: https://thanos.io/tracing.md/#configuration --http-address="0.0.0.0:10902" Listen host:port for HTTP endpoints. + --http-grace-period=5s Time to wait after an interrupt received for HTTP + Server. --data-dir="./data" Data directory in which to cache blocks and process compactions. --objstore.config-file= diff --git a/docs/components/query.md b/docs/components/query.md index c10c03b3285..3ebf692471a 100644 --- a/docs/components/query.md +++ b/docs/components/query.md @@ -259,6 +259,8 @@ Flags: https://thanos.io/tracing.md/#configuration --http-address="0.0.0.0:10902" Listen host:port for HTTP endpoints. + --http-grace-period=5s Time to wait after an interrupt received for + HTTP Server. --grpc-address="0.0.0.0:10901" Listen ip:port address for gRPC endpoints (StoreAPI). Make sure this address is routable diff --git a/docs/components/rule.md b/docs/components/rule.md index ba766090e07..a697dc02fbb 100644 --- a/docs/components/rule.md +++ b/docs/components/rule.md @@ -168,6 +168,8 @@ Flags: https://thanos.io/tracing.md/#configuration --http-address="0.0.0.0:10902" Listen host:port for HTTP endpoints. + --http-grace-period=5s Time to wait after an interrupt received for + HTTP Server. --grpc-address="0.0.0.0:10901" Listen ip:port address for gRPC endpoints (StoreAPI). Make sure this address is routable diff --git a/docs/components/sidecar.md b/docs/components/sidecar.md index a1d1a92112c..5546ad20f20 100644 --- a/docs/components/sidecar.md +++ b/docs/components/sidecar.md @@ -101,6 +101,8 @@ Flags: https://thanos.io/tracing.md/#configuration --http-address="0.0.0.0:10902" Listen host:port for HTTP endpoints. + --http-grace-period=5s Time to wait after an interrupt received for + HTTP Server. --grpc-address="0.0.0.0:10901" Listen ip:port address for gRPC endpoints (StoreAPI). Make sure this address is routable diff --git a/docs/components/store.md b/docs/components/store.md index 9e5b7fe9433..6f11cebc9d5 100644 --- a/docs/components/store.md +++ b/docs/components/store.md @@ -51,6 +51,8 @@ Flags: https://thanos.io/tracing.md/#configuration --http-address="0.0.0.0:10902" Listen host:port for HTTP endpoints. + --http-grace-period=5s Time to wait after an interrupt received for + HTTP Server. --grpc-address="0.0.0.0:10901" Listen ip:port address for gRPC endpoints (StoreAPI). Make sure this address is routable diff --git a/pkg/server/http.go b/pkg/server/http.go new file mode 100644 index 00000000000..344ebac78ff --- /dev/null +++ b/pkg/server/http.go @@ -0,0 +1,102 @@ +package server + +import ( + "context" + "net" + "net/http" + "net/http/pprof" + "time" + + "github.com/thanos-io/thanos/pkg/component" + "github.com/thanos-io/thanos/pkg/prober" + + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" + "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" +) + +type Server struct { + logger log.Logger + comp component.Component + prober *prober.Prober + + mux *http.ServeMux + srv *http.Server + listener net.Listener + + opts options +} + +func NewHTTP(logger log.Logger, reg *prometheus.Registry, comp component.Component, prober *prober.Prober, opts ...Option) Server { + options := options{ + gracePeriod: 5 * time.Second, + listen: "0.0.0.0:10902", + } + + for _, o := range opts { + o.apply(&options) + } + + mux := http.NewServeMux() + registerMetrics(mux, reg) + registerProfiler(mux) + prober.RegisterInMux(mux) + + return Server{ + logger: log.With(logger, "service", "http/server"), + comp: comp, + prober: prober, + mux: mux, + srv: &http.Server{Addr: options.listen, Handler: mux}, + opts: options, + } +} + +func (s *Server) ListenAndServe() error { + l, err := net.Listen("tcp", s.opts.listen) + if err != nil { + return errors.Wrap(err, "listen metrics address") + } + s.listener = l + s.prober.SetHealthy() + level.Info(s.logger).Log("msg", "listening for requests and metrics", "component", s.comp.String(), "address", s.opts.listen) + + return errors.Wrapf(http.Serve(l, s.mux), "serve %s and metrics", s.comp.String()) +} + +func (s *Server) Shutdown(err error) { + s.prober.SetNotReady(err) + defer s.prober.SetNotHealthy(err) + + if err == http.ErrServerClosed { + level.Warn(s.logger).Log("msg", "internal server closed unexpectedly") + return + } + + ctx, cancel := context.WithTimeout(context.Background(), s.opts.gracePeriod) + defer cancel() + + level.Info(s.logger).Log("msg", "server shut down internal server") + + if err := s.srv.Shutdown(ctx); err != nil { + level.Error(s.logger).Log("msg", "server shut down failed", "err", err, "component", s.comp.String()) + } +} + +func (s *Server) Handle(pattern string, handler http.Handler) { + s.mux.Handle(pattern, handler) +} + +func registerProfiler(mux *http.ServeMux) { + mux.HandleFunc("/debug/pprof/", pprof.Index) + mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline) + mux.HandleFunc("/debug/pprof/profile", pprof.Profile) + mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol) + mux.HandleFunc("/debug/pprof/trace", pprof.Trace) +} + +func registerMetrics(mux *http.ServeMux, g prometheus.Gatherer) { + mux.Handle("/metrics", promhttp.HandlerFor(g, promhttp.HandlerOpts{})) +} diff --git a/pkg/server/option.go b/pkg/server/option.go new file mode 100644 index 00000000000..702b6dbeccc --- /dev/null +++ b/pkg/server/option.go @@ -0,0 +1,33 @@ +package server + +import ( + "time" +) + +type options struct { + gracePeriod time.Duration + listen string +} + +// Option overrides behavior of Server. +type Option interface { + apply(*options) +} + +type optionFunc func(*options) + +func (f optionFunc) apply(o *options) { + f(o) +} + +func WithGracePeriod(t time.Duration) Option { + return optionFunc(func(o *options) { + o.gracePeriod = t + }) +} + +func WithListen(s string) Option { + return optionFunc(func(o *options) { + o.listen = s + }) +}