Skip to content

Commit

Permalink
cmd/thanos/receive: remote-write client+server TLS
Browse files Browse the repository at this point in the history
This commit gives the Thanos receive component the capability to use TLS
in both the remote-write client and server. This means that Thanos
receive can now authenticate all requests.

In order to accomplish this change, this commit abstracts the majority
of the logic of `defaultGRPCServerOpts` into a reusable func for gRPC
and HTTP servers and creates a similar func for TLS client
configurations.

Signed-off-by: Lucas Servén Marín <lserven@gmail.com>
  • Loading branch information
squat committed Oct 18, 2019
1 parent 48a8fb6 commit 9d11e4c
Show file tree
Hide file tree
Showing 7 changed files with 125 additions and 73 deletions.
67 changes: 61 additions & 6 deletions cmd/thanos/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,18 +246,30 @@ func registerMetrics(mux *http.ServeMux, g prometheus.Gatherer) {
mux.Handle("/metrics", promhttp.HandlerFor(g, promhttp.HandlerOpts{}))
}

func defaultGRPCServerOpts(logger log.Logger, cert, key, clientCA string) ([]grpc.ServerOption, error) {
func defaultGRPCTLSServerOpts(logger log.Logger, cert, key, clientCA string) ([]grpc.ServerOption, error) {
opts := []grpc.ServerOption{}
tlsCfg, err := defaultTLSServerOpts(log.With(logger, "protocol", "gRPC"), cert, key, clientCA)
if err != nil {
return opts, err
}
if tlsCfg != nil {
opts = append(opts, grpc.Creds(credentials.NewTLS(tlsCfg)))
}
return opts, nil
}

func defaultTLSServerOpts(logger log.Logger, cert, key, clientCA string) (*tls.Config, error) {
if key == "" && cert == "" {
if clientCA != "" {
return nil, errors.New("when a client CA is used a server key and certificate must also be provided")
}

level.Info(logger).Log("msg", "disabled TLS, key and cert must be set to enable")
return opts, nil
return nil, nil
}

level.Info(logger).Log("msg", "enabling server side TLS")

if key == "" || cert == "" {
return nil, errors.New("both server key and certificate must be provided")
}
Expand All @@ -271,8 +283,6 @@ func defaultGRPCServerOpts(logger log.Logger, cert, key, clientCA string) ([]grp
return nil, errors.Wrap(err, "server credentials")
}

level.Info(logger).Log("msg", "enabled gRPC server side TLS")

tlsCfg.Certificates = []tls.Certificate{tlsCert}

if clientCA != "" {
Expand All @@ -288,10 +298,55 @@ func defaultGRPCServerOpts(logger log.Logger, cert, key, clientCA string) ([]grp
tlsCfg.ClientCAs = certPool
tlsCfg.ClientAuth = tls.RequireAndVerifyClientCert

level.Info(logger).Log("msg", "gRPC server TLS client verification enabled")
level.Info(logger).Log("msg", "server TLS client verification enabled")
}

return append(opts, grpc.Creds(credentials.NewTLS(tlsCfg))), nil
return tlsCfg, nil
}

func defaultTLSClientOpts(logger log.Logger, cert, key, caCert, serverName string) (*tls.Config, error) {
var certPool *x509.CertPool
if caCert != "" {
caPEM, err := ioutil.ReadFile(caCert)
if err != nil {
return nil, errors.Wrap(err, "reading client CA")
}

certPool = x509.NewCertPool()
if !certPool.AppendCertsFromPEM(caPEM) {
return nil, errors.Wrap(err, "building client CA")
}
level.Info(logger).Log("msg", "TLS client using provided certificate pool")
} else {
var err error
certPool, err = x509.SystemCertPool()
if err != nil {
return nil, errors.Wrap(err, "reading system certificate pool")
}
level.Info(logger).Log("msg", "TLS client using system certificate pool")
}

tlsCfg := &tls.Config{
RootCAs: certPool,
}

if serverName != "" {
tlsCfg.ServerName = serverName
}

if (key != "") != (cert != "") {
return nil, errors.New("both client key and certificate must be provided")
}

if cert != "" {
cert, err := tls.LoadX509KeyPair(cert, key)
if err != nil {
return nil, errors.Wrap(err, "client credentials")
}
tlsCfg.Certificates = []tls.Certificate{cert}
level.Info(logger).Log("msg", "TLS client authentication enabled")
}
return tlsCfg, nil
}

func newStoreGRPCServer(logger log.Logger, reg prometheus.Registerer, tracer opentracing.Tracer, srv storepb.StoreServer, opts []grpc.ServerOption) *grpc.Server {
Expand Down
54 changes: 7 additions & 47 deletions cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,7 @@ package main

import (
"context"
"crypto/tls"
"crypto/x509"
"fmt"
"io/ioutil"
"math"
"net"
"net/http"
Expand Down Expand Up @@ -164,7 +161,7 @@ func registerQuery(m map[string]setupFunc, app *kingpin.Application) {
}
}

func storeClientGRPCOpts(logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, secure bool, cert, key, caCert string, serverName string) ([]grpc.DialOption, error) {
func storeClientGRPCOpts(logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, secure bool, cert, key, caCert, serverName string) ([]grpc.DialOption, error) {
grpcMets := grpc_prometheus.NewClientMetrics()
grpcMets.EnableClientHandlingTimeHistogram(
grpc_prometheus.WithHistogramBuckets([]float64{
Expand Down Expand Up @@ -199,50 +196,13 @@ func storeClientGRPCOpts(logger log.Logger, reg *prometheus.Registry, tracer ope
return append(dialOpts, grpc.WithInsecure()), nil
}

level.Info(logger).Log("msg", "Enabling client to server TLS")
level.Info(logger).Log("msg", "enabling client to server TLS")

var certPool *x509.CertPool

if caCert != "" {
caPEM, err := ioutil.ReadFile(caCert)
if err != nil {
return nil, errors.Wrap(err, "reading client CA")
}

certPool = x509.NewCertPool()
if !certPool.AppendCertsFromPEM(caPEM) {
return nil, errors.Wrap(err, "building client CA")
}
level.Info(logger).Log("msg", "TLS Client using provided certificate pool")
} else {
var err error
certPool, err = x509.SystemCertPool()
if err != nil {
return nil, errors.Wrap(err, "reading system certificate pool")
}
level.Info(logger).Log("msg", "TLS Client using system certificate pool")
}

tlsCfg := &tls.Config{
RootCAs: certPool,
}

if serverName != "" {
tlsCfg.ServerName = serverName
}

if cert != "" {
cert, err := tls.LoadX509KeyPair(cert, key)
if err != nil {
return nil, errors.Wrap(err, "client credentials")
}
tlsCfg.Certificates = []tls.Certificate{cert}
level.Info(logger).Log("msg", "TLS Client authentication enabled")
tlsCfg, err := defaultTLSClientOpts(logger, cert, key, caCert, serverName)
if err != nil {
return nil, err
}

creds := credentials.NewTLS(tlsCfg)

return append(dialOpts, grpc.WithTransportCredentials(creds)), nil
return append(dialOpts, grpc.WithTransportCredentials(credentials.NewTLS(tlsCfg))), nil
}

// runQuery starts a server that exposes PromQL Query API. It is responsible for querying configured
Expand Down Expand Up @@ -428,7 +388,7 @@ func runQuery(
}
logger := log.With(logger, "component", component.Query.String())

opts, err := defaultGRPCServerOpts(logger, srvCert, srvKey, srvClientCA)
opts, err := defaultGRPCTLSServerOpts(logger, srvCert, srvKey, srvClientCA)
if err != nil {
return errors.Wrap(err, "build gRPC server")
}
Expand Down
57 changes: 44 additions & 13 deletions cmd/thanos/receive.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,18 @@ func registerReceive(m map[string]setupFunc, app *kingpin.Application) {
comp := component.Receive
cmd := app.Command(comp.String(), "Accept Prometheus remote write API requests and write to local tsdb (EXPERIMENTAL, this may change drastically without notice)")

grpcBindAddr, cert, key, clientCA := regGRPCFlags(cmd)
httpBindAddr := regHTTPAddrFlag(cmd)
grpcBindAddr, grpcCert, grpcKey, grpcClientCA := regGRPCFlags(cmd)

remoteWriteAddress := cmd.Flag("remote-write.address", "Address to listen on for remote write requests.").
rwAddress := cmd.Flag("remote-write.address", "Address to listen on for remote write requests.").
Default("0.0.0.0:19291").String()
rwServerCert := cmd.Flag("remote-write.server-tls-cert", "TLS Certificate for HTTP server, leave blank to disable TLS").Default("").String()
rwServerKey := cmd.Flag("remote-write.server-tls-key", "TLS Key for the HTTP server, leave blank to disable TLS").Default("").String()
rwServerClientCA := cmd.Flag("remote-write.server-tls-client-ca", "TLS CA to verify clients against. If no client CA is specified, there is no client verification on server side. (tls.NoClientCert)").Default("").String()
rwClientCert := cmd.Flag("remote-write.client-tls-cert", "TLS Certificates to use to identify this client to the server").Default("").String()
rwClientKey := cmd.Flag("remote-write.client-tls-key", "TLS Key for the client's certificate").Default("").String()
rwClientServerCA := cmd.Flag("remote-write.client-tls-ca", "TLS CA Certificates to use to verify servers").Default("").String()
rwClientServerName := cmd.Flag("remote-write.client-server-name", "Server name to verify the hostname on the returned gRPC certificates. See https://tools.ietf.org/html/rfc4366#section-3.1").Default("").String()

dataDir := cmd.Flag("tsdb.path", "Data directory of TSDB.").
Default("./data").String()
Expand Down Expand Up @@ -87,7 +94,7 @@ func registerReceive(m map[string]setupFunc, app *kingpin.Application) {
if hostname == "" || err != nil {
return errors.New("--receive.local-endpoint is empty and host could not be determined.")
}
parts := strings.Split(*remoteWriteAddress, ":")
parts := strings.Split(*rwAddress, ":")
port := parts[len(parts)-1]
*local = fmt.Sprintf("http://%s:%s/api/v1/receive", hostname, port)
}
Expand All @@ -98,11 +105,18 @@ func registerReceive(m map[string]setupFunc, app *kingpin.Application) {
reg,
tracer,
*grpcBindAddr,
*cert,
*key,
*clientCA,
*grpcCert,
*grpcKey,
*grpcClientCA,
*httpBindAddr,
*remoteWriteAddress,
*rwAddress,
*rwServerCert,
*rwServerKey,
*rwServerClientCA,
*rwClientCert,
*rwClientKey,
*rwClientServerCA,
*rwClientServerName,
*dataDir,
objStoreConfig,
lset,
Expand All @@ -124,11 +138,18 @@ func runReceive(
reg *prometheus.Registry,
tracer opentracing.Tracer,
grpcBindAddr string,
cert string,
key string,
clientCA string,
grpcCert string,
grpcKey string,
grpcClientCA string,
httpBindAddr string,
remoteWriteAddress string,
rwAddress string,
rwServerCert string,
rwServerKey string,
rwServerClientCA string,
rwClientCert string,
rwClientKey string,
rwClientServerCA string,
rwClientServerName string,
dataDir string,
objStoreConfig *extflag.PathOrContent,
lset labels.Labels,
Expand All @@ -153,14 +174,24 @@ func runReceive(
}

localStorage := &tsdb.ReadyStorage{}
rwTLSConfig, err := defaultTLSServerOpts(log.With(logger, "protocol", "HTTP"), rwServerCert, rwServerKey, rwServerClientCA)
if err != nil {
return err
}
rwTLSClientConfig, err := defaultTLSClientOpts(logger, rwClientCert, rwClientKey, rwClientServerCA, rwClientServerName)
if err != nil {
return err
}
webHandler := receive.NewHandler(log.With(logger, "component", "receive-handler"), &receive.Options{
ListenAddress: remoteWriteAddress,
ListenAddress: rwAddress,
Registry: reg,
Endpoint: endpoint,
TenantHeader: tenantHeader,
ReplicaHeader: replicaHeader,
ReplicationFactor: replicationFactor,
Tracer: tracer,
TLSConfig: rwTLSConfig,
TLSClientConfig: rwTLSClientConfig,
})

statusProber := prober.NewProber(comp, logger, prometheus.WrapRegistererWithPrefix("thanos_", reg))
Expand Down Expand Up @@ -317,7 +348,7 @@ func runReceive(
startGRPC := make(chan struct{})
g.Add(func() error {
defer close(startGRPC)
opts, err := defaultGRPCServerOpts(logger, cert, key, clientCA)
opts, err := defaultGRPCTLSServerOpts(logger, grpcCert, grpcKey, grpcClientCA)
if err != nil {
return errors.Wrap(err, "setup gRPC server")
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/thanos/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -506,7 +506,7 @@ func runRule(

store := store.NewTSDBStore(logger, reg, db, component.Rule, lset)

opts, err := defaultGRPCServerOpts(logger, cert, key, clientCA)
opts, err := defaultGRPCTLSServerOpts(logger, cert, key, clientCA)
if err != nil {
return errors.Wrap(err, "setup gRPC options")
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/thanos/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ func runSidecar(
return errors.Wrap(err, "create Prometheus store")
}

opts, err := defaultGRPCServerOpts(logger, cert, key, clientCA)
opts, err := defaultGRPCTLSServerOpts(logger, cert, key, clientCA)
if err != nil {
return errors.Wrap(err, "setup gRPC server")
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/thanos/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ func runStore(
return errors.Wrap(err, "listen API address")
}

opts, err := defaultGRPCServerOpts(logger, cert, key, clientCA)
opts, err := defaultGRPCTLSServerOpts(logger, cert, key, clientCA)
if err != nil {
return errors.Wrap(err, "grpc server options")
}
Expand Down
14 changes: 10 additions & 4 deletions pkg/receive/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package receive
import (
"bytes"
"context"
"crypto/tls"
"fmt"
"io/ioutil"
stdlog "log"
Expand Down Expand Up @@ -49,6 +50,8 @@ type Options struct {
ReplicaHeader string
ReplicationFactor uint64
Tracer opentracing.Tracer
TLSConfig *tls.Config
TLSClientConfig *tls.Config
}

// Handler serves a Prometheus remote write receiving HTTP endpoint.
Expand All @@ -72,9 +75,11 @@ func NewHandler(logger log.Logger, o *Options) *Handler {
logger = log.NewNopLogger()
}

client := &http.Client{}
transport := http.DefaultTransport.(*http.Transport)
transport.TLSClientConfig = o.TLSClientConfig
client := &http.Client{Transport: transport}
if o.Tracer != nil {
client.Transport = tracing.HTTPTripperware(logger, http.DefaultTransport)
client.Transport = tracing.HTTPTripperware(logger, client.Transport)
}

h := &Handler{
Expand Down Expand Up @@ -186,8 +191,9 @@ func (h *Handler) Run() error {
errlog := stdlog.New(log.NewStdlibAdapter(level.Error(h.logger)), "", 0)

httpSrv := &http.Server{
Handler: nethttp.Middleware(opentracing.GlobalTracer(), mux, operationName),
ErrorLog: errlog,
Handler: nethttp.Middleware(opentracing.GlobalTracer(), mux, operationName),
ErrorLog: errlog,
TLSConfig: h.options.TLSConfig,
}

return httpSrv.Serve(h.listener)
Expand Down

0 comments on commit 9d11e4c

Please sign in to comment.