receive: use grpc to forward remote write requests
Signed-off-by: Brett Jones <blockloop@users.noreply.github.com>
blockloop committed Jan 8, 2020
1 parent 43331ac commit e1153af
Showing 5 changed files with 123 additions and 84 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -13,6 +13,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel

- [#1969](https://github.com/thanos-io/thanos/pull/1969) Sidecar: allow setting http connection pool size via flags
- [#1967](https://github.com/thanos-io/thanos/issues/1967) Receive: Allow local TSDB compaction
- [#1970](https://github.com/thanos-io/thanos/issues/1970) Receive: Use gRPC for forwarding requests between peers

### Fixed

53 changes: 4 additions & 49 deletions cmd/thanos/query.go
@@ -10,8 +10,6 @@ import (

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/oklog/run"
opentracing "github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
@@ -21,9 +19,12 @@ import (
"github.com/prometheus/prometheus/discovery/targetgroup"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/promql"
kingpin "gopkg.in/alecthomas/kingpin.v2"

"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/discovery/cache"
"github.com/thanos-io/thanos/pkg/discovery/dns"
"github.com/thanos-io/thanos/pkg/extgrpc"
"github.com/thanos-io/thanos/pkg/extprom"
extpromhttp "github.com/thanos-io/thanos/pkg/extprom/http"
"github.com/thanos-io/thanos/pkg/prober"
@@ -34,11 +35,7 @@ import (
httpserver "github.com/thanos-io/thanos/pkg/server/http"
"github.com/thanos-io/thanos/pkg/store"
"github.com/thanos-io/thanos/pkg/tls"
"github.com/thanos-io/thanos/pkg/tracing"
"github.com/thanos-io/thanos/pkg/ui"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
kingpin "gopkg.in/alecthomas/kingpin.v2"
)

// registerQuery registers a query command.
@@ -165,48 +162,6 @@ func registerQuery(m map[string]setupFunc, app *kingpin.Application) {
}
}

func storeClientGRPCOpts(logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, secure bool, cert, key, caCert, serverName string) ([]grpc.DialOption, error) {
grpcMets := grpc_prometheus.NewClientMetrics()
grpcMets.EnableClientHandlingTimeHistogram(
grpc_prometheus.WithHistogramBuckets([]float64{0.001, 0.01, 0.1, 0.3, 0.6, 1, 3, 6, 9, 20, 30, 60, 90, 120}),
)
dialOpts := []grpc.DialOption{
// We want to make sure that we can receive huge gRPC messages from storeAPI.
// On TCP level we can be fine, but the gRPC overhead for huge messages could be significant.
// Current limit is ~2GB.
// TODO(bplotka): Split sent chunks on store node per max 4MB chunks if needed.
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(math.MaxInt32)),
grpc.WithUnaryInterceptor(
grpc_middleware.ChainUnaryClient(
grpcMets.UnaryClientInterceptor(),
tracing.UnaryClientInterceptor(tracer),
),
),
grpc.WithStreamInterceptor(
grpc_middleware.ChainStreamClient(
grpcMets.StreamClientInterceptor(),
tracing.StreamClientInterceptor(tracer),
),
),
}

if reg != nil {
reg.MustRegister(grpcMets)
}

if !secure {
return append(dialOpts, grpc.WithInsecure()), nil
}

level.Info(logger).Log("msg", "enabling client to server TLS")

tlsCfg, err := tls.NewClientConfig(logger, cert, key, caCert, serverName)
if err != nil {
return nil, err
}
return append(dialOpts, grpc.WithTransportCredentials(credentials.NewTLS(tlsCfg))), nil
}

// runQuery starts a server that exposes PromQL Query API. It is responsible for querying configured
// store nodes, merging and duplicating the data to satisfy user query.
func runQuery(
@@ -251,7 +206,7 @@ func runQuery(
})
reg.MustRegister(duplicatedStores)

dialOpts, err := storeClientGRPCOpts(logger, reg, tracer, secure, cert, key, caCert, serverName)
dialOpts, err := extgrpc.StoreClientGRPCOpts(logger, reg, tracer, secure, cert, key, caCert, serverName)
if err != nil {
return errors.Wrap(err, "building gRPC client")
}
7 changes: 7 additions & 0 deletions cmd/thanos/receive.go
@@ -20,6 +20,7 @@ import (
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/extflag"
"github.com/thanos-io/thanos/pkg/extgrpc"
"github.com/thanos-io/thanos/pkg/objstore/client"
"github.com/thanos-io/thanos/pkg/prober"
"github.com/thanos-io/thanos/pkg/receive"
@@ -190,6 +191,11 @@ func runReceive(
if err != nil {
return err
}
dialOpts, err := extgrpc.StoreClientGRPCOpts(logger, reg, tracer, rwServerCert != "", rwClientCert, rwClientKey, rwClientServerCA, rwClientServerName)
if err != nil {
return err
}

webHandler := receive.NewHandler(log.With(logger, "component", "receive-handler"), &receive.Options{
ListenAddress: rwAddress,
Registry: reg,
@@ -200,6 +206,7 @@
Tracer: tracer,
TLSConfig: rwTLSConfig,
TLSClientConfig: rwTLSClientConfig,
DialOpts: dialOpts,
})

statusProber := prober.New(comp, logger, prometheus.WrapRegistererWithPrefix("thanos_", reg))
59 changes: 59 additions & 0 deletions pkg/extgrpc/client.go
@@ -0,0 +1,59 @@
package extgrpc

import (
"math"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
opentracing "github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus"
"github.com/thanos-io/thanos/pkg/tls"
"github.com/thanos-io/thanos/pkg/tracing"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
)

// StoreClientGRPCOpts creates gRPC dial options for connecting to a store client.
func StoreClientGRPCOpts(logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, secure bool, cert, key, caCert, serverName string) ([]grpc.DialOption, error) {
grpcMets := grpc_prometheus.NewClientMetrics()
grpcMets.EnableClientHandlingTimeHistogram(
grpc_prometheus.WithHistogramBuckets([]float64{0.001, 0.01, 0.1, 0.3, 0.6, 1, 3, 6, 9, 20, 30, 60, 90, 120}),
)
dialOpts := []grpc.DialOption{
// We want to make sure that we can receive huge gRPC messages from storeAPI.
// On TCP level we can be fine, but the gRPC overhead for huge messages could be significant.
// Current limit is ~2GB.
// TODO(bplotka): Split sent chunks on store node per max 4MB chunks if needed.
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(math.MaxInt32)),
grpc.WithUnaryInterceptor(
grpc_middleware.ChainUnaryClient(
grpcMets.UnaryClientInterceptor(),
tracing.UnaryClientInterceptor(tracer),
),
),
grpc.WithStreamInterceptor(
grpc_middleware.ChainStreamClient(
grpcMets.StreamClientInterceptor(),
tracing.StreamClientInterceptor(tracer),
),
),
}

if reg != nil {
reg.MustRegister(grpcMets)
}

if !secure {
return append(dialOpts, grpc.WithInsecure()), nil
}

level.Info(logger).Log("msg", "enabling client to server TLS")

tlsCfg, err := tls.NewClientConfig(logger, cert, key, caCert, serverName)
if err != nil {
return nil, err
}
return append(dialOpts, grpc.WithTransportCredentials(credentials.NewTLS(tlsCfg))), nil
}
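
For context, a minimal sketch of how these shared dial options can be consumed; the nop logger, no-op tracer, empty TLS fields, and peer address below are illustrative assumptions, not part of this commit:

package main

import (
	"context"

	"github.com/go-kit/kit/log"
	opentracing "github.com/opentracing/opentracing-go"
	"github.com/prometheus/client_golang/prometheus"
	"google.golang.org/grpc"

	"github.com/thanos-io/thanos/pkg/extgrpc"
	"github.com/thanos-io/thanos/pkg/store/storepb"
)

func main() {
	logger := log.NewNopLogger()
	reg := prometheus.NewRegistry()
	tracer := opentracing.NoopTracer{}

	// Build the shared client dial options; secure=false, so the TLS fields stay empty.
	dialOpts, err := extgrpc.StoreClientGRPCOpts(logger, reg, tracer, false, "", "", "", "")
	if err != nil {
		panic(err)
	}

	// Dial a peer (address is hypothetical) and build a writeable store client,
	// mirroring what peerGroup.Get does in pkg/receive/handler.go.
	conn, err := grpc.DialContext(context.Background(), "127.0.0.1:10907", dialOpts...)
	if err != nil {
		panic(err)
	}
	defer conn.Close()

	_ = storepb.NewWriteableStoreClient(conn)
}
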
87 changes: 52 additions & 35 deletions pkg/receive/handler.go
@@ -1,7 +1,6 @@
package receive

import (
"bytes"
"context"
"crypto/tls"
"fmt"
@@ -24,11 +23,13 @@ import (
"github.com/prometheus/prometheus/prompb"
"github.com/prometheus/prometheus/storage"
terrors "github.com/prometheus/prometheus/tsdb/errors"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

extpromhttp "github.com/thanos-io/thanos/pkg/extprom/http"
"github.com/thanos-io/thanos/pkg/runutil"
"github.com/thanos-io/thanos/pkg/store/storepb"
"github.com/thanos-io/thanos/pkg/tracing"
)

@@ -47,18 +48,18 @@ type Options struct {
Writer *Writer
ListenAddress string
Registry prometheus.Registerer
Endpoint string
TenantHeader string
ReplicaHeader string
Endpoint string
ReplicationFactor uint64
Tracer opentracing.Tracer
TLSConfig *tls.Config
TLSClientConfig *tls.Config
DialOpts []grpc.DialOption
}

// Handler serves a Prometheus remote write receiving HTTP endpoint.
type Handler struct {
client *http.Client
logger log.Logger
writer *Writer
router *route.Router
@@ -67,6 +68,7 @@ type Handler struct {

mtx sync.RWMutex
hashring Hashring
peers *peerGroup

// Metrics.
forwardRequestsTotal *prometheus.CounterVec
@@ -77,19 +79,12 @@ func NewHandler(logger log.Logger, o *Options) *Handler {
logger = log.NewNopLogger()
}

transport := http.DefaultTransport.(*http.Transport)
transport.TLSClientConfig = o.TLSClientConfig
client := &http.Client{Transport: transport}
if o.Tracer != nil {
client.Transport = tracing.HTTPTripperware(logger, client.Transport)
}

h := &Handler{
client: client,
logger: logger,
writer: o.Writer,
router: route.New(),
options: o,
peers: newPeerGroup(o.DialOpts...),
forwardRequestsTotal: prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "thanos_receive_forward_requests_total",
@@ -355,20 +350,7 @@ func (h *Handler) parallelizeRequests(ctx context.Context, tenant string, replic
}
// Make a request to the specified endpoint.
go func(endpoint string) {
buf, err := proto.Marshal(wreqs[endpoint])
if err != nil {
level.Error(h.logger).Log("msg", "marshaling proto", "err", err, "endpoint", endpoint)
ec <- err
return
}
req, err := http.NewRequest("POST", endpoint, bytes.NewBuffer(snappy.Encode(nil, buf)))
if err != nil {
level.Error(h.logger).Log("msg", "creating request", "err", err, "endpoint", endpoint)
ec <- err
return
}
req.Header.Add(h.options.TenantHeader, tenant)
req.Header.Add(h.options.ReplicaHeader, strconv.FormatUint(replicas[endpoint].n, 10))
var err error

// Increment the counters as necessary now that
// the requests will go out.
@@ -380,25 +362,27 @@
h.forwardRequestsTotal.WithLabelValues("success").Inc()
}()

cl, err := h.peers.Get(ctx, endpoint)
if err != nil {
level.Error(h.logger).Log("msg", "failed to get peer connection to forward request", "err", err, "endpoint", endpoint)
ec <- err
return
}

// Create a span to track the request made to another receive node.
span, ctx := tracing.StartSpan(ctx, "thanos_receive_forward")
defer span.Finish()

// Actually make the request against the endpoint
// we determined should handle these time series.
var res *http.Response
res, err = h.client.Do(req.WithContext(ctx))
_, err = cl.RemoteWrite(ctx, &storepb.WriteRequest{
Timeseries: wreqs[endpoint].Timeseries,
Tenant: tenant,
Replica: int64(replicas[endpoint].n),
})
if err != nil {
level.Error(h.logger).Log("msg", "forwarding request", "err", err, "endpoint", endpoint)
ec <- err
return
}
if res.StatusCode != http.StatusOK {
err = errors.New(strconv.Itoa(res.StatusCode))
level.Error(h.logger).Log("msg", "forwarding returned non-200 status", "err", err, "endpoint", endpoint)
ec <- err
return
}
ec <- nil
}(endpoint)
}
@@ -489,3 +473,36 @@ func isConflict(err error) bool {
err.Error() == strconv.Itoa(http.StatusConflict) ||
status.Code(err) == codes.FailedPrecondition
}

func newPeerGroup(dialOpts ...grpc.DialOption) *peerGroup {
return &peerGroup{
dialOpts: dialOpts,
cache: map[string]storepb.WriteableStoreClient{},
m: sync.Mutex{},
}
}

type peerGroup struct {
dialOpts []grpc.DialOption
cache map[string]storepb.WriteableStoreClient
m sync.Mutex
}

func (p *peerGroup) Get(ctx context.Context, addr string) (storepb.WriteableStoreClient, error) {
p.m.Lock()
defer p.m.Unlock()

c, ok := p.cache[addr]
if ok {
return c, nil
}

conn, err := grpc.DialContext(ctx, addr, p.dialOpts...)
if err != nil {
return nil, errors.Wrap(err, "failed to dial peer")
}

client := storepb.NewWriteableStoreClient(conn)
p.cache[addr] = client
return client, nil
}
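
To show how the pieces above fit together, a hypothetical helper (the name forwardOnce and its signature are not part of this commit) that does per endpoint roughly what parallelizeRequests now does, minus the metrics and tracing:

package receive

import (
	"context"

	"github.com/prometheus/prometheus/prompb"

	"github.com/thanos-io/thanos/pkg/store/storepb"
)

// forwardOnce is an illustrative helper (not part of this commit): it forwards one
// tenant's write request to a single peer over gRPC, reusing a cached connection
// from the peer group instead of issuing an HTTP POST.
func forwardOnce(ctx context.Context, peers *peerGroup, endpoint, tenant string, replica uint64, wreq *prompb.WriteRequest) error {
	cl, err := peers.Get(ctx, endpoint)
	if err != nil {
		return err
	}
	_, err = cl.RemoteWrite(ctx, &storepb.WriteRequest{
		Timeseries: wreq.Timeseries,
		Tenant:     tenant,
		Replica:    int64(replica),
	})
	return err
}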
