Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: Initialize gRPC server metrics #1478

Merged
merged 3 commits into from Sep 3, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
7 changes: 4 additions & 3 deletions CHANGELOG.md
Expand Up @@ -20,9 +20,10 @@ Accepted into CNCF:

### Added

- [#1478](https://github.com/thanos-io/thanos/pull/1478) Thanos components now exposes gRPC server metrics as soon as server starts, to provide more reliable data for instrumentation.
- [#1378](https://github.com/thanos-io/thanos/pull/1378) Thanos Receive now exposes `thanos_receive_config_hash`, `thanos_receive_config_last_reload_successful` and `thanos_receive_config_last_reload_success_timestamp_seconds` metrics to track latest configuration change
- [#1268](https://github.com/thanos-io/thanos/pull/1268) Thanos Sidecar added support for newest Prometheus streaming remote read added [here](https://github.com/prometheus/prometheus/pull/5703). This massively improves memory required by single
request for both Prometheus and sidecar. Single requests now should take constant amount of memory on sidecar, so resource consumption prediction is now straightforward. This will be used if you have Prometheus `2.13` or `2.12-master`.
- [#1268](https://github.com/thanos-io/thanos/pull/1268) Thanos Sidecar added support for newest Prometheus streaming remote read added [here](https://github.com/prometheus/prometheus/pull/5703). This massively improves memory required by single
request for both Prometheus and sidecar. Single requests now should take constant amount of memory on sidecar, so resource consumption prediction is now straightforward. This will be used if you have Prometheus `2.13` or `2.12-master`.
- [#1358](https://github.com/thanos-io/thanos/pull/1358) Added `part_size` configuration option for HTTP multipart requests minimum part size for S3 storage type
- [#1363](https://github.com/thanos-io/thanos/pull/1363) Thanos Receive now exposes `thanos_receive_hashring_nodes` and `thanos_receive_hashring_tenants` metrics to monitor status of hash-rings
- [#1395](https://github.com/thanos-io/thanos/pull/1395) Thanos Sidecar added `/-/ready` and `/-/healthy` endpoints to Thanos sidecar.
Expand All @@ -39,7 +40,7 @@ Accepted into CNCF:
- [BUGFIX] prometheus_tsdb_compactions_failed_total is now incremented on any compaction failure. tsdb#613
- [BUGFIX] PromQL: Correctly display {__name__="a"}.
- [#1338](https://github.com/thanos-io/thanos/pull/1338) Thanos Query still warns on store API duplicate, but allows a single one from duplicated set. This is gracefully warn about the problematic logic and not disrupt immediately.
- [#1385](https://github.com/thanos-io/thanos/pull/1385) Thanos Compact exposes flag to disable downsampling `downsampling.disable`.
- [#1385](https://github.com/thanos-io/thanos/pull/1385) Thanos Compact exposes flag to disable downsampling `downsampling.disable`.

### Fixed

Expand Down
77 changes: 42 additions & 35 deletions cmd/thanos/main.go
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/prometheus/common/version"
"github.com/thanos-io/thanos/pkg/prober"
"github.com/thanos-io/thanos/pkg/runutil"
"github.com/thanos-io/thanos/pkg/store/storepb"
"github.com/thanos-io/thanos/pkg/tracing"
"github.com/thanos-io/thanos/pkg/tracing/client"
"go.uber.org/automaxprocs/maxprocs"
Expand Down Expand Up @@ -244,41 +245,8 @@ func registerMetrics(mux *http.ServeMux, g prometheus.Gatherer) {
mux.Handle("/metrics", promhttp.HandlerFor(g, promhttp.HandlerOpts{}))
}

// defaultGRPCServerOpts returns default gRPC server opts that includes:
// - request histogram
// - tracing
// - panic recovery with panic counter
func defaultGRPCServerOpts(logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, cert, key, clientCA string) ([]grpc.ServerOption, error) {
met := grpc_prometheus.NewServerMetrics()
met.EnableHandlingTimeHistogram(
grpc_prometheus.WithHistogramBuckets([]float64{
0.001, 0.01, 0.05, 0.1, 0.2, 0.4, 0.8, 1.6, 3.2, 6.4,
}),
)

panicsTotal := prometheus.NewCounter(prometheus.CounterOpts{
Name: "thanos_grpc_req_panics_recovered_total",
Help: "Total number of gRPC requests recovered from internal panic.",
})
grpcPanicRecoveryHandler := func(p interface{}) (err error) {
panicsTotal.Inc()
level.Error(logger).Log("msg", "recovered from panic", "panic", p, "stack", debug.Stack())
return status.Errorf(codes.Internal, "%s", p)
}
reg.MustRegister(met, panicsTotal)
opts := []grpc.ServerOption{
grpc.MaxSendMsgSize(math.MaxInt32),
grpc_middleware.WithUnaryServerChain(
met.UnaryServerInterceptor(),
tracing.UnaryServerInterceptor(tracer),
grpc_recovery.UnaryServerInterceptor(grpc_recovery.WithRecoveryHandler(grpcPanicRecoveryHandler)),
),
grpc_middleware.WithStreamServerChain(
met.StreamServerInterceptor(),
tracing.StreamServerInterceptor(tracer),
grpc_recovery.StreamServerInterceptor(grpc_recovery.WithRecoveryHandler(grpcPanicRecoveryHandler)),
),
}
func defaultGRPCServerOpts(logger log.Logger, cert, key, clientCA string) ([]grpc.ServerOption, error) {
opts := []grpc.ServerOption{}

if key == "" && cert == "" {
if clientCA != "" {
Expand Down Expand Up @@ -325,6 +293,45 @@ func defaultGRPCServerOpts(logger log.Logger, reg *prometheus.Registry, tracer o
return append(opts, grpc.Creds(credentials.NewTLS(tlsCfg))), nil
}

func newStoreGRPCServer(logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, srv storepb.StoreServer, opts []grpc.ServerOption) *grpc.Server {
met := grpc_prometheus.NewServerMetrics()
met.EnableHandlingTimeHistogram(
grpc_prometheus.WithHistogramBuckets([]float64{
0.001, 0.01, 0.05, 0.1, 0.2, 0.4, 0.8, 1.6, 3.2, 6.4,
}),
)
panicsTotal := prometheus.NewCounter(prometheus.CounterOpts{
Name: "thanos_grpc_req_panics_recovered_total",
Help: "Total number of gRPC requests recovered from internal panic.",
})
reg.MustRegister(met, panicsTotal)

grpcPanicRecoveryHandler := func(p interface{}) (err error) {
panicsTotal.Inc()
level.Error(logger).Log("msg", "recovered from panic", "panic", p, "stack", debug.Stack())
return status.Errorf(codes.Internal, "%s", p)
}
opts = append(opts,
grpc.MaxSendMsgSize(math.MaxInt32),
grpc_middleware.WithUnaryServerChain(
met.UnaryServerInterceptor(),
tracing.UnaryServerInterceptor(tracer),
grpc_recovery.UnaryServerInterceptor(grpc_recovery.WithRecoveryHandler(grpcPanicRecoveryHandler)),
),
grpc_middleware.WithStreamServerChain(
met.StreamServerInterceptor(),
tracing.StreamServerInterceptor(tracer),
grpc_recovery.StreamServerInterceptor(grpc_recovery.WithRecoveryHandler(grpcPanicRecoveryHandler)),
),
)

s := grpc.NewServer(opts...)
storepb.RegisterStoreServer(s, srv)
met.InitializeMetrics(s)

return s
}

// TODO Remove once all components are migrated to the new defaultHTTPListener.
// metricHTTPListenGroup is a run.Group that servers HTTP endpoint with only Prometheus metrics.
func metricHTTPListenGroup(g *run.Group, logger log.Logger, reg *prometheus.Registry, httpBindAddr string) error {
Expand Down
7 changes: 2 additions & 5 deletions cmd/thanos/query.go
Expand Up @@ -34,7 +34,6 @@ import (
v1 "github.com/thanos-io/thanos/pkg/query/api"
"github.com/thanos-io/thanos/pkg/runutil"
"github.com/thanos-io/thanos/pkg/store"
"github.com/thanos-io/thanos/pkg/store/storepb"
"github.com/thanos-io/thanos/pkg/tracing"
"github.com/thanos-io/thanos/pkg/ui"
"google.golang.org/grpc"
Expand Down Expand Up @@ -441,13 +440,11 @@ func runQuery(
}
logger := log.With(logger, "component", component.Query.String())

opts, err := defaultGRPCServerOpts(logger, reg, tracer, srvCert, srvKey, srvClientCA)
opts, err := defaultGRPCServerOpts(logger, srvCert, srvKey, srvClientCA)
if err != nil {
return errors.Wrapf(err, "build gRPC server")
}

s := grpc.NewServer(opts...)
storepb.RegisterStoreServer(s, proxy)
s := newStoreGRPCServer(logger, reg, tracer, proxy, opts)

g.Add(func() error {
level.Info(logger).Log("msg", "Listening for StoreAPI gRPC", "address", grpcBindAddr)
Expand Down
6 changes: 2 additions & 4 deletions cmd/thanos/receive.go
Expand Up @@ -24,7 +24,6 @@ import (
"github.com/thanos-io/thanos/pkg/runutil"
"github.com/thanos-io/thanos/pkg/shipper"
"github.com/thanos-io/thanos/pkg/store"
"github.com/thanos-io/thanos/pkg/store/storepb"
"google.golang.org/grpc"
kingpin "gopkg.in/alecthomas/kingpin.v2"
)
Expand Down Expand Up @@ -267,12 +266,11 @@ func runReceive(
db := localStorage.Get()
tsdbStore := store.NewTSDBStore(log.With(logger, "component", "thanos-tsdb-store"), reg, db, component.Receive, lset)

opts, err := defaultGRPCServerOpts(logger, reg, tracer, cert, key, clientCA)
opts, err := defaultGRPCServerOpts(logger, cert, key, clientCA)
if err != nil {
return errors.Wrap(err, "setup gRPC server")
}
s = grpc.NewServer(opts...)
storepb.RegisterStoreServer(s, tsdbStore)
s := newStoreGRPCServer(logger, reg, tracer, tsdbStore, opts)

level.Info(logger).Log("msg", "listening for StoreAPI gRPC", "address", grpcBindAddr)
return errors.Wrap(s.Serve(l), "serve gRPC")
Expand Down
6 changes: 2 additions & 4 deletions cmd/thanos/rule.go
Expand Up @@ -50,7 +50,6 @@ import (
"github.com/thanos-io/thanos/pkg/store/storepb"
"github.com/thanos-io/thanos/pkg/tracing"
"github.com/thanos-io/thanos/pkg/ui"
"google.golang.org/grpc"
kingpin "gopkg.in/alecthomas/kingpin.v2"
)

Expand Down Expand Up @@ -493,12 +492,11 @@ func runRule(

store := store.NewTSDBStore(logger, reg, db, component.Rule, lset)

opts, err := defaultGRPCServerOpts(logger, reg, tracer, cert, key, clientCA)
opts, err := defaultGRPCServerOpts(logger, cert, key, clientCA)
if err != nil {
return errors.Wrap(err, "setup gRPC options")
}
s := grpc.NewServer(opts...)
storepb.RegisterStoreServer(s, store)
s := newStoreGRPCServer(logger, reg, tracer, store, opts)

g.Add(func() error {
return errors.Wrap(s.Serve(l), "serve gRPC")
Expand Down
6 changes: 2 additions & 4 deletions cmd/thanos/sidecar.go
Expand Up @@ -26,7 +26,6 @@ import (
"github.com/thanos-io/thanos/pkg/shipper"
"github.com/thanos-io/thanos/pkg/store"
"github.com/thanos-io/thanos/pkg/store/storepb"
"google.golang.org/grpc"
"gopkg.in/alecthomas/kingpin.v2"
)

Expand Down Expand Up @@ -220,12 +219,11 @@ func runSidecar(
return errors.Wrap(err, "create Prometheus store")
}

opts, err := defaultGRPCServerOpts(logger, reg, tracer, cert, key, clientCA)
opts, err := defaultGRPCServerOpts(logger, cert, key, clientCA)
if err != nil {
return errors.Wrap(err, "setup gRPC server")
}
s := grpc.NewServer(opts...)
storepb.RegisterStoreServer(s, promStore)
s := newStoreGRPCServer(logger, reg, tracer, promStore, opts)

g.Add(func() error {
level.Info(logger).Log("msg", "Listening for StoreAPI gRPC", "address", grpcBindAddr)
Expand Down
8 changes: 2 additions & 6 deletions cmd/thanos/store.go
Expand Up @@ -16,8 +16,6 @@ import (
"github.com/thanos-io/thanos/pkg/runutil"
"github.com/thanos-io/thanos/pkg/store"
storecache "github.com/thanos-io/thanos/pkg/store/cache"
"github.com/thanos-io/thanos/pkg/store/storepb"
"google.golang.org/grpc"
kingpin "gopkg.in/alecthomas/kingpin.v2"
)

Expand Down Expand Up @@ -187,13 +185,11 @@ func runStore(
return errors.Wrap(err, "listen API address")
}

opts, err := defaultGRPCServerOpts(logger, reg, tracer, cert, key, clientCA)
opts, err := defaultGRPCServerOpts(logger, cert, key, clientCA)
if err != nil {
return errors.Wrap(err, "grpc server options")
}

s := grpc.NewServer(opts...)
storepb.RegisterStoreServer(s, bs)
s := newStoreGRPCServer(logger, reg, tracer, bs, opts)

g.Add(func() error {
level.Info(logger).Log("msg", "Listening for StoreAPI gRPC", "address", grpcBindAddr)
Expand Down