Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cmd/thanos/receive: remote-write client+server TLS #1668

Merged
merged 1 commit into from
Oct 23, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
67 changes: 61 additions & 6 deletions cmd/thanos/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,18 +246,30 @@ func registerMetrics(mux *http.ServeMux, g prometheus.Gatherer) {
mux.Handle("/metrics", promhttp.HandlerFor(g, promhttp.HandlerOpts{}))
}

func defaultGRPCServerOpts(logger log.Logger, cert, key, clientCA string) ([]grpc.ServerOption, error) {
func defaultGRPCTLSServerOpts(logger log.Logger, cert, key, clientCA string) ([]grpc.ServerOption, error) {
opts := []grpc.ServerOption{}
tlsCfg, err := defaultTLSServerOpts(log.With(logger, "protocol", "gRPC"), cert, key, clientCA)
if err != nil {
return opts, err
}
if tlsCfg != nil {
opts = append(opts, grpc.Creds(credentials.NewTLS(tlsCfg)))
}
return opts, nil
}

func defaultTLSServerOpts(logger log.Logger, cert, key, clientCA string) (*tls.Config, error) {
if key == "" && cert == "" {
if clientCA != "" {
return nil, errors.New("when a client CA is used a server key and certificate must also be provided")
}

level.Info(logger).Log("msg", "disabled TLS, key and cert must be set to enable")
return opts, nil
return nil, nil
}

level.Info(logger).Log("msg", "enabling server side TLS")

if key == "" || cert == "" {
return nil, errors.New("both server key and certificate must be provided")
}
Expand All @@ -271,8 +283,6 @@ func defaultGRPCServerOpts(logger log.Logger, cert, key, clientCA string) ([]grp
return nil, errors.Wrap(err, "server credentials")
}

level.Info(logger).Log("msg", "enabled gRPC server side TLS")

tlsCfg.Certificates = []tls.Certificate{tlsCert}

if clientCA != "" {
Expand All @@ -288,10 +298,55 @@ func defaultGRPCServerOpts(logger log.Logger, cert, key, clientCA string) ([]grp
tlsCfg.ClientCAs = certPool
tlsCfg.ClientAuth = tls.RequireAndVerifyClientCert

level.Info(logger).Log("msg", "gRPC server TLS client verification enabled")
level.Info(logger).Log("msg", "server TLS client verification enabled")
}

return append(opts, grpc.Creds(credentials.NewTLS(tlsCfg))), nil
return tlsCfg, nil
}

func defaultTLSClientOpts(logger log.Logger, cert, key, caCert, serverName string) (*tls.Config, error) {
var certPool *x509.CertPool
if caCert != "" {
caPEM, err := ioutil.ReadFile(caCert)
if err != nil {
return nil, errors.Wrap(err, "reading client CA")
}

certPool = x509.NewCertPool()
if !certPool.AppendCertsFromPEM(caPEM) {
return nil, errors.Wrap(err, "building client CA")
}
level.Info(logger).Log("msg", "TLS client using provided certificate pool")
} else {
var err error
certPool, err = x509.SystemCertPool()
if err != nil {
return nil, errors.Wrap(err, "reading system certificate pool")
}
level.Info(logger).Log("msg", "TLS client using system certificate pool")
}

tlsCfg := &tls.Config{
RootCAs: certPool,
}

if serverName != "" {
tlsCfg.ServerName = serverName
}

if (key != "") != (cert != "") {
return nil, errors.New("both client key and certificate must be provided")
}

if cert != "" {
cert, err := tls.LoadX509KeyPair(cert, key)
if err != nil {
return nil, errors.Wrap(err, "client credentials")
}
tlsCfg.Certificates = []tls.Certificate{cert}
level.Info(logger).Log("msg", "TLS client authentication enabled")
}
return tlsCfg, nil
}

func newStoreGRPCServer(logger log.Logger, reg prometheus.Registerer, tracer opentracing.Tracer, srv storepb.StoreServer, opts []grpc.ServerOption) *grpc.Server {
Expand Down
54 changes: 7 additions & 47 deletions cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,7 @@ package main

import (
"context"
"crypto/tls"
"crypto/x509"
"fmt"
"io/ioutil"
"math"
"net"
"net/http"
Expand Down Expand Up @@ -164,7 +161,7 @@ func registerQuery(m map[string]setupFunc, app *kingpin.Application) {
}
}

func storeClientGRPCOpts(logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, secure bool, cert, key, caCert string, serverName string) ([]grpc.DialOption, error) {
func storeClientGRPCOpts(logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, secure bool, cert, key, caCert, serverName string) ([]grpc.DialOption, error) {
grpcMets := grpc_prometheus.NewClientMetrics()
grpcMets.EnableClientHandlingTimeHistogram(
grpc_prometheus.WithHistogramBuckets([]float64{
Expand Down Expand Up @@ -199,50 +196,13 @@ func storeClientGRPCOpts(logger log.Logger, reg *prometheus.Registry, tracer ope
return append(dialOpts, grpc.WithInsecure()), nil
}

level.Info(logger).Log("msg", "Enabling client to server TLS")
level.Info(logger).Log("msg", "enabling client to server TLS")

var certPool *x509.CertPool

if caCert != "" {
caPEM, err := ioutil.ReadFile(caCert)
if err != nil {
return nil, errors.Wrap(err, "reading client CA")
}

certPool = x509.NewCertPool()
if !certPool.AppendCertsFromPEM(caPEM) {
return nil, errors.Wrap(err, "building client CA")
}
level.Info(logger).Log("msg", "TLS Client using provided certificate pool")
} else {
var err error
certPool, err = x509.SystemCertPool()
if err != nil {
return nil, errors.Wrap(err, "reading system certificate pool")
}
level.Info(logger).Log("msg", "TLS Client using system certificate pool")
}

tlsCfg := &tls.Config{
RootCAs: certPool,
}

if serverName != "" {
tlsCfg.ServerName = serverName
}

if cert != "" {
cert, err := tls.LoadX509KeyPair(cert, key)
if err != nil {
return nil, errors.Wrap(err, "client credentials")
}
tlsCfg.Certificates = []tls.Certificate{cert}
level.Info(logger).Log("msg", "TLS Client authentication enabled")
tlsCfg, err := defaultTLSClientOpts(logger, cert, key, caCert, serverName)
if err != nil {
return nil, err
}

creds := credentials.NewTLS(tlsCfg)

return append(dialOpts, grpc.WithTransportCredentials(creds)), nil
return append(dialOpts, grpc.WithTransportCredentials(credentials.NewTLS(tlsCfg))), nil
}

// runQuery starts a server that exposes PromQL Query API. It is responsible for querying configured
Expand Down Expand Up @@ -428,7 +388,7 @@ func runQuery(
}
logger := log.With(logger, "component", component.Query.String())

opts, err := defaultGRPCServerOpts(logger, srvCert, srvKey, srvClientCA)
opts, err := defaultGRPCTLSServerOpts(logger, srvCert, srvKey, srvClientCA)
if err != nil {
return errors.Wrap(err, "build gRPC server")
}
Expand Down
57 changes: 44 additions & 13 deletions cmd/thanos/receive.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,18 @@ func registerReceive(m map[string]setupFunc, app *kingpin.Application) {
comp := component.Receive
cmd := app.Command(comp.String(), "Accept Prometheus remote write API requests and write to local tsdb (EXPERIMENTAL, this may change drastically without notice)")

grpcBindAddr, cert, key, clientCA := regGRPCFlags(cmd)
httpBindAddr := regHTTPAddrFlag(cmd)
grpcBindAddr, grpcCert, grpcKey, grpcClientCA := regGRPCFlags(cmd)

remoteWriteAddress := cmd.Flag("remote-write.address", "Address to listen on for remote write requests.").
rwAddress := cmd.Flag("remote-write.address", "Address to listen on for remote write requests.").
Default("0.0.0.0:19291").String()
rwServerCert := cmd.Flag("remote-write.server-tls-cert", "TLS Certificate for HTTP server, leave blank to disable TLS").Default("").String()
rwServerKey := cmd.Flag("remote-write.server-tls-key", "TLS Key for the HTTP server, leave blank to disable TLS").Default("").String()
rwServerClientCA := cmd.Flag("remote-write.server-tls-client-ca", "TLS CA to verify clients against. If no client CA is specified, there is no client verification on server side. (tls.NoClientCert)").Default("").String()
rwClientCert := cmd.Flag("remote-write.client-tls-cert", "TLS Certificates to use to identify this client to the server").Default("").String()
rwClientKey := cmd.Flag("remote-write.client-tls-key", "TLS Key for the client's certificate").Default("").String()
rwClientServerCA := cmd.Flag("remote-write.client-tls-ca", "TLS CA Certificates to use to verify servers").Default("").String()
rwClientServerName := cmd.Flag("remote-write.client-server-name", "Server name to verify the hostname on the returned gRPC certificates. See https://tools.ietf.org/html/rfc4366#section-3.1").Default("").String()

dataDir := cmd.Flag("tsdb.path", "Data directory of TSDB.").
Default("./data").String()
Expand Down Expand Up @@ -87,7 +94,7 @@ func registerReceive(m map[string]setupFunc, app *kingpin.Application) {
if hostname == "" || err != nil {
return errors.New("--receive.local-endpoint is empty and host could not be determined.")
}
parts := strings.Split(*remoteWriteAddress, ":")
parts := strings.Split(*rwAddress, ":")
port := parts[len(parts)-1]
*local = fmt.Sprintf("http://%s:%s/api/v1/receive", hostname, port)
}
Expand All @@ -98,11 +105,18 @@ func registerReceive(m map[string]setupFunc, app *kingpin.Application) {
reg,
tracer,
*grpcBindAddr,
*cert,
*key,
*clientCA,
*grpcCert,
*grpcKey,
*grpcClientCA,
*httpBindAddr,
*remoteWriteAddress,
*rwAddress,
*rwServerCert,
*rwServerKey,
*rwServerClientCA,
*rwClientCert,
*rwClientKey,
*rwClientServerCA,
*rwClientServerName,
*dataDir,
objStoreConfig,
lset,
Expand All @@ -124,11 +138,18 @@ func runReceive(
reg *prometheus.Registry,
tracer opentracing.Tracer,
grpcBindAddr string,
cert string,
key string,
clientCA string,
grpcCert string,
grpcKey string,
grpcClientCA string,
httpBindAddr string,
remoteWriteAddress string,
rwAddress string,
rwServerCert string,
rwServerKey string,
rwServerClientCA string,
rwClientCert string,
rwClientKey string,
rwClientServerCA string,
rwClientServerName string,
dataDir string,
objStoreConfig *extflag.PathOrContent,
lset labels.Labels,
Expand All @@ -153,14 +174,24 @@ func runReceive(
}

localStorage := &tsdb.ReadyStorage{}
rwTLSConfig, err := defaultTLSServerOpts(log.With(logger, "protocol", "HTTP"), rwServerCert, rwServerKey, rwServerClientCA)
if err != nil {
return err
}
rwTLSClientConfig, err := defaultTLSClientOpts(logger, rwClientCert, rwClientKey, rwClientServerCA, rwClientServerName)
if err != nil {
return err
}
webHandler := receive.NewHandler(log.With(logger, "component", "receive-handler"), &receive.Options{
ListenAddress: remoteWriteAddress,
ListenAddress: rwAddress,
Registry: reg,
Endpoint: endpoint,
TenantHeader: tenantHeader,
ReplicaHeader: replicaHeader,
ReplicationFactor: replicationFactor,
Tracer: tracer,
TLSConfig: rwTLSConfig,
TLSClientConfig: rwTLSClientConfig,
})

statusProber := prober.NewProber(comp, logger, prometheus.WrapRegistererWithPrefix("thanos_", reg))
Expand Down Expand Up @@ -317,7 +348,7 @@ func runReceive(
startGRPC := make(chan struct{})
g.Add(func() error {
defer close(startGRPC)
opts, err := defaultGRPCServerOpts(logger, cert, key, clientCA)
opts, err := defaultGRPCTLSServerOpts(logger, grpcCert, grpcKey, grpcClientCA)
if err != nil {
return errors.Wrap(err, "setup gRPC server")
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/thanos/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -506,7 +506,7 @@ func runRule(

store := store.NewTSDBStore(logger, reg, db, component.Rule, lset)

opts, err := defaultGRPCServerOpts(logger, cert, key, clientCA)
opts, err := defaultGRPCTLSServerOpts(logger, cert, key, clientCA)
if err != nil {
return errors.Wrap(err, "setup gRPC options")
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/thanos/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ func runSidecar(
return errors.Wrap(err, "create Prometheus store")
}

opts, err := defaultGRPCServerOpts(logger, cert, key, clientCA)
opts, err := defaultGRPCTLSServerOpts(logger, cert, key, clientCA)
if err != nil {
return errors.Wrap(err, "setup gRPC server")
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/thanos/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ func runStore(
return errors.Wrap(err, "listen API address")
}

opts, err := defaultGRPCServerOpts(logger, cert, key, clientCA)
opts, err := defaultGRPCTLSServerOpts(logger, cert, key, clientCA)
if err != nil {
return errors.Wrap(err, "grpc server options")
}
Expand Down
14 changes: 10 additions & 4 deletions pkg/receive/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package receive
import (
"bytes"
"context"
"crypto/tls"
"fmt"
"io/ioutil"
stdlog "log"
Expand Down Expand Up @@ -49,6 +50,8 @@ type Options struct {
ReplicaHeader string
ReplicationFactor uint64
Tracer opentracing.Tracer
TLSConfig *tls.Config
TLSClientConfig *tls.Config
}

// Handler serves a Prometheus remote write receiving HTTP endpoint.
Expand All @@ -72,9 +75,11 @@ func NewHandler(logger log.Logger, o *Options) *Handler {
logger = log.NewNopLogger()
}

client := &http.Client{}
transport := http.DefaultTransport.(*http.Transport)
transport.TLSClientConfig = o.TLSClientConfig
client := &http.Client{Transport: transport}
if o.Tracer != nil {
client.Transport = tracing.HTTPTripperware(logger, http.DefaultTransport)
client.Transport = tracing.HTTPTripperware(logger, client.Transport)
}

h := &Handler{
Expand Down Expand Up @@ -186,8 +191,9 @@ func (h *Handler) Run() error {
errlog := stdlog.New(log.NewStdlibAdapter(level.Error(h.logger)), "", 0)

httpSrv := &http.Server{
Handler: nethttp.Middleware(opentracing.GlobalTracer(), mux, operationName),
ErrorLog: errlog,
Handler: nethttp.Middleware(opentracing.GlobalTracer(), mux, operationName),
ErrorLog: errlog,
TLSConfig: h.options.TLSConfig,
}

return httpSrv.Serve(h.listener)
Expand Down