Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: Add initial remote-write-receive component #811

Merged
merged 1 commit into from
Mar 12, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
26 changes: 19 additions & 7 deletions cmd/thanos/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,25 +17,37 @@ import (
"gopkg.in/alecthomas/kingpin.v2"
)

func regCommonServerFlags(cmd *kingpin.CmdClause) (
func regGRPCFlags(cmd *kingpin.CmdClause) (
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hahah I was starring at monitor 1m and thinking why you removed Gossip for everyone in this PR... then seen that actually you added this function.... Github troll =D

grpcBindAddr *string,
httpBindAddr *string,
grpcTLSSrvCert *string,
grpcTLSSrvKey *string,
grpcTLSSrvClientCA *string,
peerFunc func(log.Logger, *prometheus.Registry, bool, string, bool) (cluster.Peer, error)) {

) {
grpcBindAddr = cmd.Flag("grpc-address", "Listen ip:port address for gRPC endpoints (StoreAPI). Make sure this address is routable from other components if you use gossip, 'grpc-advertise-address' is empty and you require cross-node connection.").
Default("0.0.0.0:10901").String()

grpcAdvertiseAddr := cmd.Flag("grpc-advertise-address", "Explicit (external) host:port address to advertise for gRPC StoreAPI in gossip cluster. If empty, 'grpc-address' will be used.").
String()

grpcTLSSrvCert = cmd.Flag("grpc-server-tls-cert", "TLS Certificate for gRPC server, leave blank to disable TLS").Default("").String()
grpcTLSSrvKey = cmd.Flag("grpc-server-tls-key", "TLS Key for the gRPC server, leave blank to disable TLS").Default("").String()
grpcTLSSrvClientCA = cmd.Flag("grpc-server-tls-client-ca", "TLS CA to verify clients against. If no client CA is specified, there is no client verification on server side. (tls.NoClientCert)").Default("").String()

return grpcBindAddr,
grpcTLSSrvCert,
grpcTLSSrvKey,
grpcTLSSrvClientCA
}

func regCommonServerFlags(cmd *kingpin.CmdClause) (
grpcBindAddr *string,
httpBindAddr *string,
grpcTLSSrvCert *string,
grpcTLSSrvKey *string,
grpcTLSSrvClientCA *string,
peerFunc func(log.Logger, *prometheus.Registry, bool, string, bool) (cluster.Peer, error)) {

httpBindAddr = regHTTPAddrFlag(cmd)
grpcBindAddr, grpcTLSSrvCert, grpcTLSSrvKey, grpcTLSSrvClientCA = regGRPCFlags(cmd)
grpcAdvertiseAddr := cmd.Flag("grpc-advertise-address", "Explicit (external) host:port address to advertise for gRPC StoreAPI in gossip cluster. If empty, 'grpc-address' will be used.").
String()

clusterBindAddr := cmd.Flag("cluster.address", "Listen ip:port address for gossip cluster.").
Default("0.0.0.0:10900").String()
Expand Down
1 change: 1 addition & 0 deletions cmd/thanos/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ func main() {
registerCompact(cmds, app, "compact")
registerBucket(cmds, app, "bucket")
registerDownsample(cmds, app, "downsample")
registerReceive(cmds, app, "receive")

cmd, err := app.Parse(os.Args[1:])
if err != nil {
Expand Down
231 changes: 231 additions & 0 deletions cmd/thanos/receive.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,231 @@
package main

import (
"context"
"fmt"
"net"
"sync"
"time"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/improbable-eng/thanos/pkg/component"
"github.com/improbable-eng/thanos/pkg/receive"
"github.com/improbable-eng/thanos/pkg/runutil"
"github.com/improbable-eng/thanos/pkg/store"
"github.com/improbable-eng/thanos/pkg/store/storepb"
"github.com/oklog/run"
opentracing "github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/storage/tsdb"
"google.golang.org/grpc"
kingpin "gopkg.in/alecthomas/kingpin.v2"
)

func registerReceive(m map[string]setupFunc, app *kingpin.Application, name string) {
cmd := app.Command(name, "Accept Prometheus remote write API requests and write to local tsdb (EXPERIMENTAL, this may change drastically without notice)")

grpcBindAddr, cert, key, clientCA := regGRPCFlags(cmd)
httpMetricsBindAddr := regHTTPAddrFlag(cmd)

remoteWriteAddress := cmd.Flag("remote-write.address", "Address to listen on for remote write requests.").
Default("0.0.0.0:19291").String()

dataDir := cmd.Flag("tsdb.path", "Data directory of TSDB.").
Default("./data").String()

m[name] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ bool) error {
return runReceive(
g,
logger,
reg,
tracer,
*grpcBindAddr,
*cert,
*key,
*clientCA,
*httpMetricsBindAddr,
*remoteWriteAddress,
*dataDir,
)
}
}

func runReceive(
g *run.Group,
logger log.Logger,
reg *prometheus.Registry,
tracer opentracing.Tracer,
grpcBindAddr string,
cert string,
key string,
clientCA string,
httpMetricsBindAddr string,
remoteWriteAddress string,
dataDir string,
) error {
logger = log.With(logger, "component", "receive")
level.Warn(logger).Log("msg", "setting up receive; the Thanos receive component is EXPERIMENTAL, it may break significantly without notice")

tsdbCfg := &tsdb.Options{
Retention: model.Duration(time.Hour * 24 * 15),
NoLockfile: true,
MinBlockDuration: model.Duration(time.Hour * 2),
MaxBlockDuration: model.Duration(time.Hour * 2),
}

localStorage := &tsdb.ReadyStorage{}
receiver := receive.NewWriter(log.With(logger, "component", "receive-writer"), localStorage)
webHandler := receive.NewHandler(log.With(logger, "component", "receive-handler"), &receive.Options{
Receiver: receiver,
ListenAddress: remoteWriteAddress,
Registry: reg,
ReadyStorage: localStorage,
})

// Start all components while we wait for TSDB to open but only load
// initial config and mark ourselves as ready after it completed.
dbOpen := make(chan struct{})

// sync.Once is used to make sure we can close the channel at different execution stages(SIGTERM or when the config is loaded).
type closeOnce struct {
C chan struct{}
once sync.Once
Close func()
}
// Wait until the server is ready to handle reloading.
reloadReady := &closeOnce{
C: make(chan struct{}),
}
reloadReady.Close = func() {
reloadReady.once.Do(func() {
close(reloadReady.C)
})
}

level.Debug(logger).Log("msg", "setting up endpoint readiness")
{
// Initial configuration loading.
cancel := make(chan struct{})
g.Add(
func() error {
select {
case <-dbOpen:
break
case <-cancel:
reloadReady.Close()
return nil
}

reloadReady.Close()

webHandler.Ready()
level.Info(logger).Log("msg", "server is ready to receive web requests.")
<-cancel
return nil
},
func(err error) {
close(cancel)
},
)
}

level.Debug(logger).Log("msg", "setting up tsdb")
{
// TSDB.
cancel := make(chan struct{})
g.Add(
func() error {
level.Info(logger).Log("msg", "starting TSDB ...")
db, err := tsdb.Open(
dataDir,
log.With(logger, "component", "tsdb"),
reg,
tsdbCfg,
)
if err != nil {
return fmt.Errorf("opening storage failed: %s", err)
}
level.Info(logger).Log("msg", "tsdb started")

startTimeMargin := int64(2 * time.Duration(tsdbCfg.MinBlockDuration).Seconds() * 1000)
localStorage.Set(db, startTimeMargin)
close(dbOpen)
<-cancel
return nil
},
func(err error) {
if err := localStorage.Close(); err != nil {
level.Error(logger).Log("msg", "error stopping storage", "err", err)
}
close(cancel)
},
)
}

level.Debug(logger).Log("msg", "setting up metric http listen-group")
if err := metricHTTPListenGroup(g, logger, reg, httpMetricsBindAddr); err != nil {
return err
}

level.Debug(logger).Log("msg", "setting up grpc server")
{
var (
s *grpc.Server
l net.Listener
err error
)
g.Add(func() error {
select {
case <-dbOpen:
break
}

l, err = net.Listen("tcp", grpcBindAddr)
if err != nil {
return errors.Wrap(err, "listen API address")
}

db := localStorage.Get()
tsdbStore := store.NewTSDBStore(log.With(logger, "component", "thanos-tsdb-store"), reg, db, component.Receive, nil)

opts, err := defaultGRPCServerOpts(logger, reg, tracer, cert, key, clientCA)
if err != nil {
return errors.Wrap(err, "setup gRPC server")
}
s = grpc.NewServer(opts...)
storepb.RegisterStoreServer(s, tsdbStore)

level.Info(logger).Log("msg", "listening for StoreAPI gRPC", "address", grpcBindAddr)
return errors.Wrap(s.Serve(l), "serve gRPC")
}, func(error) {
if s != nil {
s.Stop()
}
if l != nil {
runutil.CloseWithLogOnErr(logger, l, "store gRPC listener")
}
})
}

level.Debug(logger).Log("msg", "setting up receive http handler")
{
ctx, cancel := context.WithCancel(context.Background())
g.Add(
func() error {
if err := webHandler.Run(ctx); err != nil {
return fmt.Errorf("error starting web server: %s", err)
}
return nil
},
func(err error) {
cancel()
},
)
}
level.Info(logger).Log("msg", "starting receiver")

return nil
}
12 changes: 6 additions & 6 deletions docs/components/query.md
Original file line number Diff line number Diff line change
Expand Up @@ -148,16 +148,14 @@ Flags:
If 0 no trace will be sent periodically, unless
forced by baggage item. See
`pkg/tracing/tracing.go` for details.
--http-address="0.0.0.0:10902"
Listen host:port for HTTP endpoints.
--grpc-address="0.0.0.0:10901"
Listen ip:port address for gRPC endpoints
(StoreAPI). Make sure this address is routable
from other components if you use gossip,
'grpc-advertise-address' is empty and you
require cross-node connection.
--grpc-advertise-address=GRPC-ADVERTISE-ADDRESS
Explicit (external) host:port address to
advertise for gRPC StoreAPI in gossip cluster.
If empty, 'grpc-address' will be used.
--grpc-server-tls-cert="" TLS Certificate for gRPC server, leave blank to
disable TLS
--grpc-server-tls-key="" TLS Key for the gRPC server, leave blank to
Expand All @@ -166,8 +164,10 @@ Flags:
TLS CA to verify clients against. If no client
CA is specified, there is no client
verification on server side. (tls.NoClientCert)
--http-address="0.0.0.0:10902"
Listen host:port for HTTP endpoints.
--grpc-advertise-address=GRPC-ADVERTISE-ADDRESS
Explicit (external) host:port address to
advertise for gRPC StoreAPI in gossip cluster.
If empty, 'grpc-address' will be used.
--cluster.address="0.0.0.0:10900"
Listen ip:port address for gossip cluster.
--cluster.advertise-address=CLUSTER.ADVERTISE-ADDRESS
Expand Down
12 changes: 6 additions & 6 deletions docs/components/rule.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,16 +55,14 @@ Flags:
If 0 no trace will be sent periodically, unless
forced by baggage item. See
`pkg/tracing/tracing.go` for details.
--http-address="0.0.0.0:10902"
Listen host:port for HTTP endpoints.
--grpc-address="0.0.0.0:10901"
Listen ip:port address for gRPC endpoints
(StoreAPI). Make sure this address is routable
from other components if you use gossip,
'grpc-advertise-address' is empty and you
require cross-node connection.
--grpc-advertise-address=GRPC-ADVERTISE-ADDRESS
Explicit (external) host:port address to
advertise for gRPC StoreAPI in gossip cluster.
If empty, 'grpc-address' will be used.
--grpc-server-tls-cert="" TLS Certificate for gRPC server, leave blank to
disable TLS
--grpc-server-tls-key="" TLS Key for the gRPC server, leave blank to
Expand All @@ -73,8 +71,10 @@ Flags:
TLS CA to verify clients against. If no client
CA is specified, there is no client
verification on server side. (tls.NoClientCert)
--http-address="0.0.0.0:10902"
Listen host:port for HTTP endpoints.
--grpc-advertise-address=GRPC-ADVERTISE-ADDRESS
Explicit (external) host:port address to
advertise for gRPC StoreAPI in gossip cluster.
If empty, 'grpc-address' will be used.
--cluster.address="0.0.0.0:10900"
Listen ip:port address for gossip cluster.
--cluster.advertise-address=CLUSTER.ADVERTISE-ADDRESS
Expand Down
12 changes: 6 additions & 6 deletions docs/components/sidecar.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,16 +59,14 @@ Flags:
If 0 no trace will be sent periodically, unless
forced by baggage item. See
`pkg/tracing/tracing.go` for details.
--http-address="0.0.0.0:10902"
Listen host:port for HTTP endpoints.
--grpc-address="0.0.0.0:10901"
Listen ip:port address for gRPC endpoints
(StoreAPI). Make sure this address is routable
from other components if you use gossip,
'grpc-advertise-address' is empty and you
require cross-node connection.
--grpc-advertise-address=GRPC-ADVERTISE-ADDRESS
Explicit (external) host:port address to
advertise for gRPC StoreAPI in gossip cluster.
If empty, 'grpc-address' will be used.
--grpc-server-tls-cert="" TLS Certificate for gRPC server, leave blank to
disable TLS
--grpc-server-tls-key="" TLS Key for the gRPC server, leave blank to
Expand All @@ -77,8 +75,10 @@ Flags:
TLS CA to verify clients against. If no client
CA is specified, there is no client
verification on server side. (tls.NoClientCert)
--http-address="0.0.0.0:10902"
Listen host:port for HTTP endpoints.
--grpc-advertise-address=GRPC-ADVERTISE-ADDRESS
Explicit (external) host:port address to
advertise for gRPC StoreAPI in gossip cluster.
If empty, 'grpc-address' will be used.
--cluster.address="0.0.0.0:10900"
Listen ip:port address for gossip cluster.
--cluster.advertise-address=CLUSTER.ADVERTISE-ADDRESS
Expand Down