Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor of commands and flag parsing #2267

Merged
merged 1 commit into from May 11, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
127 changes: 127 additions & 0 deletions cmd/thanos/config.go
@@ -0,0 +1,127 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package main

import (
"net/url"
"time"

"github.com/prometheus/common/model"
"gopkg.in/alecthomas/kingpin.v2"
)

type grpcConfig struct {
bindAddress string
gracePeriod model.Duration
tlsSrvCert string
tlsSrvKey string
tlsSrvClientCA string
}

func (gc *grpcConfig) registerFlag(cmd *kingpin.CmdClause) *grpcConfig {
cmd.Flag("grpc-address",
"Listen ip:port address for gRPC endpoints (StoreAPI). Make sure this address is routable from other components.").
Default("0.0.0.0:10901").StringVar(&gc.bindAddress)
cmd.Flag("grpc-grace-period",
"Time to wait after an interrupt received for GRPC Server.").
Default("2m").SetValue(&gc.gracePeriod)
cmd.Flag("grpc-server-tls-cert",
"TLS Certificate for gRPC server, leave blank to disable TLS").
Default("").StringVar(&gc.tlsSrvCert)
cmd.Flag("grpc-server-tls-key",
"TLS Key for the gRPC server, leave blank to disable TLS").
Default("").StringVar(&gc.tlsSrvKey)
cmd.Flag("grpc-server-tls-client-ca",
"TLS CA to verify clients against. If no client CA is specified, there is no client verification on server side. (tls.NoClientCert)").
Default("").StringVar(&gc.tlsSrvClientCA)
return gc
}

type httpConfig struct {
bindAddress string
gracePeriod model.Duration
}

func (hc *httpConfig) registerFlag(cmd *kingpin.CmdClause) *httpConfig {
cmd.Flag("http-address",
"Listen host:port for HTTP endpoints.").
Default("0.0.0.0:10902").StringVar(&hc.bindAddress)
cmd.Flag("http-grace-period",
"Time to wait after an interrupt received for HTTP Server.").
Default("2m").SetValue(&hc.gracePeriod)
return hc
}

type prometheusConfig struct {
url *url.URL
readyTimeout time.Duration
}

func (pc *prometheusConfig) registerFlag(cmd *kingpin.CmdClause) *prometheusConfig {
cmd.Flag("prometheus.url",
"URL at which to reach Prometheus's API. For better performance use local network.").
Default("http://localhost:9090").URLVar(&pc.url)
cmd.Flag("prometheus.ready_timeout",
"Maximum time to wait for the Prometheus instance to start up").
Default("10m").DurationVar(&pc.readyTimeout)
return pc
}

type connConfig struct {
maxIdleConns int
maxIdleConnsPerHost int
}

func (cc *connConfig) registerFlag(cmd *kingpin.CmdClause) *connConfig {
cmd.Flag("receive.connection-pool-size",
"Controls the http MaxIdleConns. Default is 0, which is unlimited").
IntVar(&cc.maxIdleConns)
cmd.Flag("receive.connection-pool-size-per-host",
"Controls the http MaxIdleConnsPerHost").
Default("100").IntVar(&cc.maxIdleConnsPerHost)
return cc
}

type tsdbConfig struct {
path string
}

func (tc *tsdbConfig) registerFlag(cmd *kingpin.CmdClause) *tsdbConfig {
cmd.Flag("tsdb.path", "Data directory of TSDB.").Default("./data").StringVar(&tc.path)
return tc
}

type reloaderConfig struct {
confFile string
envVarConfFile string
ruleDirectories []string
}

func (rc *reloaderConfig) registerFlag(cmd *kingpin.CmdClause) *reloaderConfig {
cmd.Flag("reloader.config-file",
"Config file watched by the reloader.").
Default("").StringVar(&rc.confFile)
cmd.Flag("reloader.config-envsubst-file",
"Output file for environment variable substituted config file.").
Default("").StringVar(&rc.envVarConfFile)
cmd.Flag("reloader.rule-dir",
"Rule directories for the reloader to refresh (repeated field).").
StringsVar(&rc.ruleDirectories)
return rc
}

type shipperConfig struct {
uploadCompacted bool
ignoreBlockSize bool
}

func (sc *shipperConfig) registerFlag(cmd *kingpin.CmdClause) *shipperConfig {
cmd.Flag("shipper.upload-compacted",
"If true sidecar will try to upload compacted blocks as well. Useful for migration purposes. Works only if compaction is disabled on Prometheus. Do it once and then disable the flag when done.").
Default("false").BoolVar(&sc.uploadCompacted)
cmd.Flag("shipper.ignore-unequal-block-size",
"If true sidecar will not require prometheus min and max block size flags to be set to the same value. Only use this if you want to keep long retention and compaction enabled on your Prometheus instance, as in the worst case it can result in ~2h data loss for your Thanos bucket storage.").
Default("false").Hidden().BoolVar(&sc.ignoreBlockSize)
return sc
}
3 changes: 1 addition & 2 deletions cmd/thanos/flags.go
Expand Up @@ -7,9 +7,8 @@ import (
"fmt"
"strings"

"github.com/thanos-io/thanos/pkg/extflag"

"github.com/prometheus/common/model"
"github.com/thanos-io/thanos/pkg/extflag"
"gopkg.in/alecthomas/kingpin.v2"
)

Expand Down
138 changes: 53 additions & 85 deletions cmd/thanos/sidecar.go
Expand Up @@ -44,72 +44,27 @@ import (

func registerSidecar(m map[string]setupFunc, app *kingpin.Application) {
cmd := app.Command(component.Sidecar.String(), "sidecar for Prometheus server")

httpBindAddr, httpGracePeriod := regHTTPFlags(cmd)
grpcBindAddr, grpcGracePeriod, grpcCert, grpcKey, grpcClientCA := regGRPCFlags(cmd)

promURL := cmd.Flag("prometheus.url", "URL at which to reach Prometheus's API. For better performance use local network.").
Default("http://localhost:9090").URL()

promReadyTimeout := cmd.Flag("prometheus.ready_timeout", "Maximum time to wait for the Prometheus instance to start up").
Default("10m").Duration()

connectionPoolSize := cmd.Flag("receive.connection-pool-size", "Controls the http MaxIdleConns. Default is 0, which is unlimited").Int()
connectionPoolSizePerHost := cmd.Flag("receive.connection-pool-size-per-host", "Controls the http MaxIdleConnsPerHost").Default("100").Int()

dataDir := cmd.Flag("tsdb.path", "Data directory of TSDB.").
Default("./data").String()

reloaderCfgFile := cmd.Flag("reloader.config-file", "Config file watched by the reloader.").
Default("").String()

reloaderCfgOutputFile := cmd.Flag("reloader.config-envsubst-file", "Output file for environment variable substituted config file.").
Default("").String()

reloaderRuleDirs := cmd.Flag("reloader.rule-dir", "Rule directories for the reloader to refresh (repeated field).").Strings()

objStoreConfig := regCommonObjStoreFlags(cmd, "", false)

uploadCompacted := cmd.Flag("shipper.upload-compacted", "If true sidecar will try to upload compacted blocks as well. Useful for migration purposes. Works only if compaction is disabled on Prometheus. Do it once and then disable the flag when done.").Default("false").Bool()

ignoreBlockSize := cmd.Flag("shipper.ignore-unequal-block-size", "If true sidecar will not require prometheus min and max block size flags to be set to the same value. Only use this if you want to keep long retention and compaction enabled on your Prometheus instance, as in the worst case it can result in ~2h data loss for your Thanos bucket storage.").Default("false").Hidden().Bool()

minTime := thanosmodel.TimeOrDuration(cmd.Flag("min-time", "Start of time range limit to serve. Thanos sidecar will serve only metrics, which happened later than this value. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1d or 2h45m. Valid duration units are ms, s, m, h, d, w, y.").
Default("0000-01-01T00:00:00Z"))
conf := &sidecarConfig{}
conf.registerFlag(cmd)

m[component.Sidecar.String()] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ <-chan struct{}, _ bool) error {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can now totally remove this item. Instead of registration we can invoke run directly I think 👍

rl := reloader.New(
log.With(logger, "component", "reloader"),
extprom.WrapRegistererWithPrefix("thanos_sidecar_", reg),
reloader.ReloadURLFromBase(*promURL),
*reloaderCfgFile,
*reloaderCfgOutputFile,
*reloaderRuleDirs,
reloader.ReloadURLFromBase(conf.prometheus.url),
conf.reloader.confFile,
conf.reloader.envVarConfFile,
conf.reloader.ruleDirectories,
)

return runSidecar(
g,
logger,
reg,
tracer,
*grpcBindAddr,
time.Duration(*grpcGracePeriod),
*grpcCert,
*grpcKey,
*grpcClientCA,
*httpBindAddr,
time.Duration(*httpGracePeriod),
*promURL,
*promReadyTimeout,
*dataDir,
objStoreConfig,
rl,
*uploadCompacted,
*ignoreBlockSize,
component.Sidecar,
*minTime,
*connectionPoolSize,
*connectionPoolSizePerHost,
*conf,
)
}
}
Expand All @@ -119,37 +74,22 @@ func runSidecar(
logger log.Logger,
reg *prometheus.Registry,
tracer opentracing.Tracer,
grpcBindAddr string,
grpcGracePeriod time.Duration,
grpcCert string,
grpcKey string,
grpcClientCA string,
httpBindAddr string,
httpGracePeriod time.Duration,
promURL *url.URL,
promReadyTimeout time.Duration,
dataDir string,
objStoreConfig *extflag.PathOrContent,
reloader *reloader.Reloader,
uploadCompacted bool,
ignoreBlockSize bool,
comp component.Component,
limitMinTime thanosmodel.TimeOrDurationValue,
connectionPoolSize int,
connectionPoolSizePerHost int,
conf sidecarConfig,
) error {
var m = &promMetadata{
promURL: promURL,
promURL: conf.prometheus.url,

// Start out with the full time range. The shipper will constrain it later.
// TODO(fabxc): minimum timestamp is never adjusted if shipping is disabled.
mint: limitMinTime.PrometheusTimestamp(),
mint: conf.limitMinTime.PrometheusTimestamp(),
maxt: math.MaxInt64,

limitMinTime: limitMinTime,
limitMinTime: conf.limitMinTime,
}

confContentYaml, err := objStoreConfig.Content()
confContentYaml, err := conf.objStore.Content()
if err != nil {
return errors.Wrap(err, "getting object store config")
}
Expand All @@ -169,8 +109,8 @@ func runSidecar(
)

srv := httpserver.New(logger, reg, comp, httpProbe,
httpserver.WithListen(httpBindAddr),
httpserver.WithGracePeriod(httpGracePeriod),
httpserver.WithListen(conf.http.bindAddress),
httpserver.WithGracePeriod(time.Duration(conf.http.gracePeriod)),
)

g.Add(func() error {
Expand Down Expand Up @@ -200,7 +140,7 @@ func runSidecar(
// Only check Prometheus's flags when upload is enabled.
if uploads {
// Check prometheus's flags to ensure sane sidecar flags.
if err := validatePrometheus(ctx, logger, ignoreBlockSize, m); err != nil {
if err := validatePrometheus(ctx, logger, conf.shipper.ignoreBlockSize, m); err != nil {
return errors.Wrap(err, "validate Prometheus flags")
}
}
Expand Down Expand Up @@ -266,23 +206,24 @@ func runSidecar(

{
t := exthttp.NewTransport()
t.MaxIdleConnsPerHost = connectionPoolSizePerHost
t.MaxIdleConns = connectionPoolSize
t.MaxIdleConnsPerHost = conf.connection.maxIdleConnsPerHost
t.MaxIdleConns = conf.connection.maxIdleConns
c := &http.Client{Transport: tracing.HTTPTripperware(logger, t)}

promStore, err := store.NewPrometheusStore(logger, c, promURL, component.Sidecar, m.Labels, m.Timestamps)
promStore, err := store.NewPrometheusStore(logger, c, conf.prometheus.url, component.Sidecar, m.Labels, m.Timestamps)
if err != nil {
return errors.Wrap(err, "create Prometheus store")
}

tlsCfg, err := tls.NewServerConfig(log.With(logger, "protocol", "gRPC"), grpcCert, grpcKey, grpcClientCA)
tlsCfg, err := tls.NewServerConfig(log.With(logger, "protocol", "gRPC"),
conf.grpc.tlsSrvCert, conf.grpc.tlsSrvKey, conf.grpc.tlsSrvClientCA)
if err != nil {
return errors.Wrap(err, "setup gRPC server")
}

s := grpcserver.New(logger, reg, tracer, comp, grpcProbe, promStore,
grpcserver.WithListen(grpcBindAddr),
grpcserver.WithGracePeriod(grpcGracePeriod),
grpcserver.WithListen(conf.grpc.bindAddress),
grpcserver.WithGracePeriod(time.Duration(conf.grpc.gracePeriod)),
grpcserver.WithTLSConfig(tlsCfg),
)
g.Add(func() error {
Expand All @@ -309,14 +250,15 @@ func runSidecar(
}
}()

if err := promclient.IsWALDirAccessible(dataDir); err != nil {
if err := promclient.IsWALDirAccessible(conf.tsdb.path); err != nil {
level.Error(logger).Log("err", err)
}

ctx, cancel := context.WithCancel(context.Background())
g.Add(func() error {
defer runutil.CloseWithLogOnErr(logger, bkt, "bucket client")

promReadyTimeout := conf.prometheus.readyTimeout
extLabelsCtx, cancel := context.WithTimeout(ctx, promReadyTimeout)
defer cancel()

Expand All @@ -330,10 +272,10 @@ func runSidecar(
}

var s *shipper.Shipper
if uploadCompacted {
s = shipper.NewWithCompacted(logger, reg, dataDir, bkt, m.Labels, metadata.SidecarSource)
if conf.shipper.uploadCompacted {
s = shipper.NewWithCompacted(logger, reg, conf.tsdb.path, bkt, m.Labels, metadata.SidecarSource)
} else {
s = shipper.New(logger, reg, dataDir, bkt, m.Labels, metadata.SidecarSource)
s = shipper.New(logger, reg, conf.tsdb.path, bkt, m.Labels, metadata.SidecarSource)
}

return runutil.Repeat(30*time.Second, ctx.Done(), func() error {
Expand Down Expand Up @@ -458,3 +400,29 @@ func (s *promMetadata) Timestamps() (mint int64, maxt int64) {

return s.mint, s.maxt
}

type sidecarConfig struct {
http httpConfig
grpc grpcConfig
prometheus prometheusConfig
connection connConfig
tsdb tsdbConfig
reloader reloaderConfig
objStore extflag.PathOrContent
shipper shipperConfig
limitMinTime thanosmodel.TimeOrDurationValue
}

func (sc *sidecarConfig) registerFlag(cmd *kingpin.CmdClause) *sidecarConfig {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking forward for some tool for this, but let's do in separate PR (:

sc.http.registerFlag(cmd)
sc.grpc.registerFlag(cmd)
sc.prometheus.registerFlag(cmd)
sc.connection.registerFlag(cmd)
sc.tsdb.registerFlag(cmd)
sc.reloader.registerFlag(cmd)
sc.objStore = *regCommonObjStoreFlags(cmd, "", false)
sc.shipper.registerFlag(cmd)
cmd.Flag("min-time", "Start of time range limit to serve. Thanos sidecar will serve only metrics, which happened later than this value. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1d or 2h45m. Valid duration units are ms, s, m, h, d, w, y.").
Default("0000-01-01T00:00:00Z").SetValue(&sc.limitMinTime)
return sc
}