Skip to content

Commit

Permalink
*: Add initial remote-write-receive component (#811)
Browse files Browse the repository at this point in the history
  • Loading branch information
brancz authored and bwplotka committed Mar 12, 2019
1 parent 432785e commit b74717a
Show file tree
Hide file tree
Showing 19 changed files with 1,261 additions and 152 deletions.
26 changes: 19 additions & 7 deletions cmd/thanos/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,25 +17,37 @@ import (
"gopkg.in/alecthomas/kingpin.v2"
)

func regCommonServerFlags(cmd *kingpin.CmdClause) (
func regGRPCFlags(cmd *kingpin.CmdClause) (
grpcBindAddr *string,
httpBindAddr *string,
grpcTLSSrvCert *string,
grpcTLSSrvKey *string,
grpcTLSSrvClientCA *string,
peerFunc func(log.Logger, *prometheus.Registry, bool, string, bool) (cluster.Peer, error)) {

) {
grpcBindAddr = cmd.Flag("grpc-address", "Listen ip:port address for gRPC endpoints (StoreAPI). Make sure this address is routable from other components if you use gossip, 'grpc-advertise-address' is empty and you require cross-node connection.").
Default("0.0.0.0:10901").String()

grpcAdvertiseAddr := cmd.Flag("grpc-advertise-address", "Explicit (external) host:port address to advertise for gRPC StoreAPI in gossip cluster. If empty, 'grpc-address' will be used.").
String()

grpcTLSSrvCert = cmd.Flag("grpc-server-tls-cert", "TLS Certificate for gRPC server, leave blank to disable TLS").Default("").String()
grpcTLSSrvKey = cmd.Flag("grpc-server-tls-key", "TLS Key for the gRPC server, leave blank to disable TLS").Default("").String()
grpcTLSSrvClientCA = cmd.Flag("grpc-server-tls-client-ca", "TLS CA to verify clients against. If no client CA is specified, there is no client verification on server side. (tls.NoClientCert)").Default("").String()

return grpcBindAddr,
grpcTLSSrvCert,
grpcTLSSrvKey,
grpcTLSSrvClientCA
}

func regCommonServerFlags(cmd *kingpin.CmdClause) (
grpcBindAddr *string,
httpBindAddr *string,
grpcTLSSrvCert *string,
grpcTLSSrvKey *string,
grpcTLSSrvClientCA *string,
peerFunc func(log.Logger, *prometheus.Registry, bool, string, bool) (cluster.Peer, error)) {

httpBindAddr = regHTTPAddrFlag(cmd)
grpcBindAddr, grpcTLSSrvCert, grpcTLSSrvKey, grpcTLSSrvClientCA = regGRPCFlags(cmd)
grpcAdvertiseAddr := cmd.Flag("grpc-advertise-address", "Explicit (external) host:port address to advertise for gRPC StoreAPI in gossip cluster. If empty, 'grpc-address' will be used.").
String()

clusterBindAddr := cmd.Flag("cluster.address", "Listen ip:port address for gossip cluster.").
Default("0.0.0.0:10900").String()
Expand Down
1 change: 1 addition & 0 deletions cmd/thanos/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ func main() {
registerCompact(cmds, app, "compact")
registerBucket(cmds, app, "bucket")
registerDownsample(cmds, app, "downsample")
registerReceive(cmds, app, "receive")

cmd, err := app.Parse(os.Args[1:])
if err != nil {
Expand Down
231 changes: 231 additions & 0 deletions cmd/thanos/receive.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,231 @@
package main

import (
"context"
"fmt"
"net"
"sync"
"time"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/improbable-eng/thanos/pkg/component"
"github.com/improbable-eng/thanos/pkg/receive"
"github.com/improbable-eng/thanos/pkg/runutil"
"github.com/improbable-eng/thanos/pkg/store"
"github.com/improbable-eng/thanos/pkg/store/storepb"
"github.com/oklog/run"
opentracing "github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/storage/tsdb"
"google.golang.org/grpc"
kingpin "gopkg.in/alecthomas/kingpin.v2"
)

func registerReceive(m map[string]setupFunc, app *kingpin.Application, name string) {
cmd := app.Command(name, "Accept Prometheus remote write API requests and write to local tsdb (EXPERIMENTAL, this may change drastically without notice)")

grpcBindAddr, cert, key, clientCA := regGRPCFlags(cmd)
httpMetricsBindAddr := regHTTPAddrFlag(cmd)

remoteWriteAddress := cmd.Flag("remote-write.address", "Address to listen on for remote write requests.").
Default("0.0.0.0:19291").String()

dataDir := cmd.Flag("tsdb.path", "Data directory of TSDB.").
Default("./data").String()

m[name] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ bool) error {
return runReceive(
g,
logger,
reg,
tracer,
*grpcBindAddr,
*cert,
*key,
*clientCA,
*httpMetricsBindAddr,
*remoteWriteAddress,
*dataDir,
)
}
}

func runReceive(
g *run.Group,
logger log.Logger,
reg *prometheus.Registry,
tracer opentracing.Tracer,
grpcBindAddr string,
cert string,
key string,
clientCA string,
httpMetricsBindAddr string,
remoteWriteAddress string,
dataDir string,
) error {
logger = log.With(logger, "component", "receive")
level.Warn(logger).Log("msg", "setting up receive; the Thanos receive component is EXPERIMENTAL, it may break significantly without notice")

tsdbCfg := &tsdb.Options{
Retention: model.Duration(time.Hour * 24 * 15),
NoLockfile: true,
MinBlockDuration: model.Duration(time.Hour * 2),
MaxBlockDuration: model.Duration(time.Hour * 2),
}

localStorage := &tsdb.ReadyStorage{}
receiver := receive.NewWriter(log.With(logger, "component", "receive-writer"), localStorage)
webHandler := receive.NewHandler(log.With(logger, "component", "receive-handler"), &receive.Options{
Receiver: receiver,
ListenAddress: remoteWriteAddress,
Registry: reg,
ReadyStorage: localStorage,
})

// Start all components while we wait for TSDB to open but only load
// initial config and mark ourselves as ready after it completed.
dbOpen := make(chan struct{})

// sync.Once is used to make sure we can close the channel at different execution stages(SIGTERM or when the config is loaded).
type closeOnce struct {
C chan struct{}
once sync.Once
Close func()
}
// Wait until the server is ready to handle reloading.
reloadReady := &closeOnce{
C: make(chan struct{}),
}
reloadReady.Close = func() {
reloadReady.once.Do(func() {
close(reloadReady.C)
})
}

level.Debug(logger).Log("msg", "setting up endpoint readiness")
{
// Initial configuration loading.
cancel := make(chan struct{})
g.Add(
func() error {
select {
case <-dbOpen:
break
case <-cancel:
reloadReady.Close()
return nil
}

reloadReady.Close()

webHandler.Ready()
level.Info(logger).Log("msg", "server is ready to receive web requests.")
<-cancel
return nil
},
func(err error) {
close(cancel)
},
)
}

level.Debug(logger).Log("msg", "setting up tsdb")
{
// TSDB.
cancel := make(chan struct{})
g.Add(
func() error {
level.Info(logger).Log("msg", "starting TSDB ...")
db, err := tsdb.Open(
dataDir,
log.With(logger, "component", "tsdb"),
reg,
tsdbCfg,
)
if err != nil {
return fmt.Errorf("opening storage failed: %s", err)
}
level.Info(logger).Log("msg", "tsdb started")

startTimeMargin := int64(2 * time.Duration(tsdbCfg.MinBlockDuration).Seconds() * 1000)
localStorage.Set(db, startTimeMargin)
close(dbOpen)
<-cancel
return nil
},
func(err error) {
if err := localStorage.Close(); err != nil {
level.Error(logger).Log("msg", "error stopping storage", "err", err)
}
close(cancel)
},
)
}

level.Debug(logger).Log("msg", "setting up metric http listen-group")
if err := metricHTTPListenGroup(g, logger, reg, httpMetricsBindAddr); err != nil {
return err
}

level.Debug(logger).Log("msg", "setting up grpc server")
{
var (
s *grpc.Server
l net.Listener
err error
)
g.Add(func() error {
select {
case <-dbOpen:
break
}

l, err = net.Listen("tcp", grpcBindAddr)
if err != nil {
return errors.Wrap(err, "listen API address")
}

db := localStorage.Get()
tsdbStore := store.NewTSDBStore(log.With(logger, "component", "thanos-tsdb-store"), reg, db, component.Receive, nil)

opts, err := defaultGRPCServerOpts(logger, reg, tracer, cert, key, clientCA)
if err != nil {
return errors.Wrap(err, "setup gRPC server")
}
s = grpc.NewServer(opts...)
storepb.RegisterStoreServer(s, tsdbStore)

level.Info(logger).Log("msg", "listening for StoreAPI gRPC", "address", grpcBindAddr)
return errors.Wrap(s.Serve(l), "serve gRPC")
}, func(error) {
if s != nil {
s.Stop()
}
if l != nil {
runutil.CloseWithLogOnErr(logger, l, "store gRPC listener")
}
})
}

level.Debug(logger).Log("msg", "setting up receive http handler")
{
ctx, cancel := context.WithCancel(context.Background())
g.Add(
func() error {
if err := webHandler.Run(ctx); err != nil {
return fmt.Errorf("error starting web server: %s", err)
}
return nil
},
func(err error) {
cancel()
},
)
}
level.Info(logger).Log("msg", "starting receiver")

return nil
}
12 changes: 6 additions & 6 deletions docs/components/query.md
Original file line number Diff line number Diff line change
Expand Up @@ -148,16 +148,14 @@ Flags:
If 0 no trace will be sent periodically, unless
forced by baggage item. See
`pkg/tracing/tracing.go` for details.
--http-address="0.0.0.0:10902"
Listen host:port for HTTP endpoints.
--grpc-address="0.0.0.0:10901"
Listen ip:port address for gRPC endpoints
(StoreAPI). Make sure this address is routable
from other components if you use gossip,
'grpc-advertise-address' is empty and you
require cross-node connection.
--grpc-advertise-address=GRPC-ADVERTISE-ADDRESS
Explicit (external) host:port address to
advertise for gRPC StoreAPI in gossip cluster.
If empty, 'grpc-address' will be used.
--grpc-server-tls-cert="" TLS Certificate for gRPC server, leave blank to
disable TLS
--grpc-server-tls-key="" TLS Key for the gRPC server, leave blank to
Expand All @@ -166,8 +164,10 @@ Flags:
TLS CA to verify clients against. If no client
CA is specified, there is no client
verification on server side. (tls.NoClientCert)
--http-address="0.0.0.0:10902"
Listen host:port for HTTP endpoints.
--grpc-advertise-address=GRPC-ADVERTISE-ADDRESS
Explicit (external) host:port address to
advertise for gRPC StoreAPI in gossip cluster.
If empty, 'grpc-address' will be used.
--cluster.address="0.0.0.0:10900"
Listen ip:port address for gossip cluster.
--cluster.advertise-address=CLUSTER.ADVERTISE-ADDRESS
Expand Down
12 changes: 6 additions & 6 deletions docs/components/rule.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,16 +55,14 @@ Flags:
If 0 no trace will be sent periodically, unless
forced by baggage item. See
`pkg/tracing/tracing.go` for details.
--http-address="0.0.0.0:10902"
Listen host:port for HTTP endpoints.
--grpc-address="0.0.0.0:10901"
Listen ip:port address for gRPC endpoints
(StoreAPI). Make sure this address is routable
from other components if you use gossip,
'grpc-advertise-address' is empty and you
require cross-node connection.
--grpc-advertise-address=GRPC-ADVERTISE-ADDRESS
Explicit (external) host:port address to
advertise for gRPC StoreAPI in gossip cluster.
If empty, 'grpc-address' will be used.
--grpc-server-tls-cert="" TLS Certificate for gRPC server, leave blank to
disable TLS
--grpc-server-tls-key="" TLS Key for the gRPC server, leave blank to
Expand All @@ -73,8 +71,10 @@ Flags:
TLS CA to verify clients against. If no client
CA is specified, there is no client
verification on server side. (tls.NoClientCert)
--http-address="0.0.0.0:10902"
Listen host:port for HTTP endpoints.
--grpc-advertise-address=GRPC-ADVERTISE-ADDRESS
Explicit (external) host:port address to
advertise for gRPC StoreAPI in gossip cluster.
If empty, 'grpc-address' will be used.
--cluster.address="0.0.0.0:10900"
Listen ip:port address for gossip cluster.
--cluster.advertise-address=CLUSTER.ADVERTISE-ADDRESS
Expand Down
12 changes: 6 additions & 6 deletions docs/components/sidecar.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,16 +59,14 @@ Flags:
If 0 no trace will be sent periodically, unless
forced by baggage item. See
`pkg/tracing/tracing.go` for details.
--http-address="0.0.0.0:10902"
Listen host:port for HTTP endpoints.
--grpc-address="0.0.0.0:10901"
Listen ip:port address for gRPC endpoints
(StoreAPI). Make sure this address is routable
from other components if you use gossip,
'grpc-advertise-address' is empty and you
require cross-node connection.
--grpc-advertise-address=GRPC-ADVERTISE-ADDRESS
Explicit (external) host:port address to
advertise for gRPC StoreAPI in gossip cluster.
If empty, 'grpc-address' will be used.
--grpc-server-tls-cert="" TLS Certificate for gRPC server, leave blank to
disable TLS
--grpc-server-tls-key="" TLS Key for the gRPC server, leave blank to
Expand All @@ -77,8 +75,10 @@ Flags:
TLS CA to verify clients against. If no client
CA is specified, there is no client
verification on server side. (tls.NoClientCert)
--http-address="0.0.0.0:10902"
Listen host:port for HTTP endpoints.
--grpc-advertise-address=GRPC-ADVERTISE-ADDRESS
Explicit (external) host:port address to
advertise for gRPC StoreAPI in gossip cluster.
If empty, 'grpc-address' will be used.
--cluster.address="0.0.0.0:10900"
Listen ip:port address for gossip cluster.
--cluster.advertise-address=CLUSTER.ADVERTISE-ADDRESS
Expand Down

0 comments on commit b74717a

Please sign in to comment.