Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement stateless mode for Thanos Ruler #4250

Closed
wants to merge 18 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 52 additions & 17 deletions cmd/thanos/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,12 @@ import (
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/common/model"
"github.com/prometheus/common/route"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/pkg/relabel"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/rules"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/util/strutil"
"github.com/thanos-io/thanos/pkg/errutil"
Expand All @@ -50,6 +52,7 @@ import (
"github.com/thanos-io/thanos/pkg/promclient"
"github.com/thanos-io/thanos/pkg/query"
thanosrules "github.com/thanos-io/thanos/pkg/rules"
"github.com/thanos-io/thanos/pkg/rules/remotewrite"
"github.com/thanos-io/thanos/pkg/runutil"
grpcserver "github.com/thanos-io/thanos/pkg/server/grpc"
httpserver "github.com/thanos-io/thanos/pkg/server/http"
Expand All @@ -75,6 +78,8 @@ type ruleConfig struct {
alertQueryURL *url.URL
alertRelabelConfigYAML []byte

rwConfig *extflag.PathOrContent

resendDelay time.Duration
evalInterval time.Duration
ruleFiles []string
Expand Down Expand Up @@ -117,6 +122,8 @@ func registerRule(app *extkingpin.App) {
cmd.Flag("eval-interval", "The default evaluation interval to use.").
Default("30s").DurationVar(&conf.evalInterval)

conf.rwConfig = extflag.RegisterPathOrContent(cmd, "remote-write.config", "YAML config for the remote-write server where samples should be sent to (see https://prometheus.io/docs/prometheus/latest/configuration/configuration/#remote_write). This automatically enables stateless mode for ruler and no series will be stored in the ruler's TSDB. If an empty config (or file) is provided, the flag is ignored and ruler is run with its own TSDB.", extflag.WithEnvSubstitution())

reqLogDecision := cmd.Flag("log.request.decision", "Deprecation Warning - This flag would be soon deprecated, and replaced with `request.logging-config`. Request Logging for logging the start and end of requests. By default this flag is disabled. LogFinishCall: Logs the finish call of the requests. LogStartAndFinishCall: Logs the start and finish call of the requests. NoLogCall: Disable request logging.").Default("").Enum("NoLogCall", "LogFinishCall", "LogStartAndFinishCall", "")

conf.objStoreConfig = extkingpin.RegisterCommonObjStoreFlags(cmd, "", false)
Expand Down Expand Up @@ -319,25 +326,52 @@ func runRule(
// Discover and resolve query addresses.
addDiscoveryGroups(g, queryClient, conf.query.dnsSDInterval)
}
var (
appendable storage.Appendable
queryable storage.Queryable
db *tsdb.DB
)

db, err := tsdb.Open(conf.dataDir, log.With(logger, "component", "tsdb"), reg, tsdbOpts, nil)
rwCfgYAML, err := conf.rwConfig.Content()
if err != nil {
return errors.Wrap(err, "open TSDB")
return err
}

level.Debug(logger).Log("msg", "removing storage lock file if any")
if err := removeLockfileIfAny(logger, conf.dataDir); err != nil {
return errors.Wrap(err, "remove storage lock files")
}
if len(rwCfgYAML) > 0 {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@squat @bwplotka

In this case though, an empty --remote-write.config (or --remote-write.config-file)will fall back to using the TSDB-backed ruler. This is because len(rwCfgYAML) == 0 either means:

  • The flag was provided but it's an empty file or empty string OR
  • The flag was not provided at all.

I couldn't think of a reliable way to distinguish between them. I've also documented this behavior in the flag help, but I don't know if it is something we want. An alternative would be to provide a separate flag for enabling stateless mode (e.g --remote.write or --remote-write.enabled).

var rwCfg config.RemoteWriteConfig
rwCfg, err = remotewrite.LoadRemoteWriteConfig(rwCfgYAML)
if err != nil {
return err
}
walDir := filepath.Join(conf.dataDir, rwCfg.Name)
remoteStore, err := remotewrite.NewFanoutStorage(logger, reg, walDir, &rwCfg)
if err != nil {
return errors.Wrap(err, "set up remote-write store for ruler")
}
appendable = remoteStore
queryable = remoteStore
} else {
db, err = tsdb.Open(conf.dataDir, log.With(logger, "component", "tsdb"), reg, tsdbOpts, nil)
if err != nil {
return errors.Wrap(err, "open TSDB")
}

{
done := make(chan struct{})
g.Add(func() error {
<-done
return db.Close()
}, func(error) {
close(done)
})
level.Debug(logger).Log("msg", "removing storage lock file if any")
if err := removeLockfileIfAny(logger, conf.dataDir); err != nil {
return errors.Wrap(err, "remove storage lock files")
}

{
done := make(chan struct{})
g.Add(func() error {
<-done
return db.Close()
}, func(error) {
close(done)
})
}
appendable = db
queryable = db
}

// Build the Alertmanager clients.
Expand Down Expand Up @@ -435,9 +469,9 @@ func runRule(
rules.ManagerOptions{
NotifyFunc: notifyFunc,
Logger: logger,
Appendable: db,
Appendable: appendable,
ExternalURL: nil,
Queryable: db,
Queryable: queryable,
ResendDelay: conf.resendDelay,
},
queryFuncCreator(logger, queryClients, metrics.duplicatedQuery, metrics.ruleEvalWarnings, conf.query.httpMethod),
Expand Down Expand Up @@ -522,7 +556,7 @@ func runRule(
)

// Start gRPC server.
{
if db != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I missed it last time. Even if it is stateless mode, we still need to start the gRPC server because we need the prober and the gRPC rules feature, we can just remove the StoreServer in this case.

Example code:

	// Start gRPC server.
	tlsCfg, err := tls.NewServerConfig(log.With(logger, "protocol", "gRPC"), conf.grpc.tlsSrvCert, conf.grpc.tlsSrvKey, conf.grpc.tlsSrvClientCA)
	if err != nil {
		return errors.Wrap(err, "setup gRPC server")
	}

	options := []grpcserver.Option{
		grpcserver.WithServer(thanosrules.RegisterRulesServer(ruleMgr)),
		grpcserver.WithListen(conf.grpc.bindAddress),
		grpcserver.WithGracePeriod(time.Duration(conf.grpc.gracePeriod)),
		grpcserver.WithTLSConfig(tlsCfg),
	}
	if db != nil {
		tsdbStore := store.NewTSDBStore(logger, db, component.Rule, conf.lset)
		options = append(options, grpcserver.WithServer(store.RegisterStoreServer(tsdbStore)))
	}
	// TODO: Add rules API implementation when ready.
	s := grpcserver.New(logger, reg, tracer, grpcLogOpts, tagOpts, comp, grpcProbe, options...)

	g.Add(func() error {
		statusProber.Ready()
		return s.ListenAndServe()
	}, func(err error) {
		statusProber.NotReady(err)
		s.Shutdown(err)
	})

tsdbStore := store.NewTSDBStore(logger, db, component.Rule, conf.lset)

tlsCfg, err := tls.NewServerConfig(log.With(logger, "protocol", "gRPC"), conf.grpc.tlsSrvCert, conf.grpc.tlsSrvKey, conf.grpc.tlsSrvClientCA)
Expand All @@ -547,6 +581,7 @@ func runRule(
s.Shutdown(err)
})
}

// Start UI & metrics HTTP server.
{
router := route.New()
Expand Down
20 changes: 20 additions & 0 deletions docs/components/rule.md
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,26 @@ Flags:
(repeatable).
--query.sd-interval=5m Refresh interval to re-read file SD files.
(used as a fallback)
--remote-write.config=<content>
Alternative to 'remote-write.config-file' flag
(mutually exclusive). Content of YAML config
for the remote-write server where samples
should be sent to (see
https://prometheus.io/docs/prometheus/latest/configuration/configuration/#remote_write).
This automatically enables stateless mode for
ruler and no series will be stored in the
ruler's TSDB. If an empty config (or file) is
provided, the flag is ignored and ruler is run
with its own TSDB.
--remote-write.config-file=<file-path>
Path to YAML config for the remote-write server
where samples should be sent to (see
https://prometheus.io/docs/prometheus/latest/configuration/configuration/#remote_write).
This automatically enables stateless mode for
ruler and no series will be stored in the
ruler's TSDB. If an empty config (or file) is
provided, the flag is ignored and ruler is run
with its own TSDB.
--request.logging-config=<content>
Alternative to 'request.logging-config-file'
flag (mutually exclusive). Content of YAML file
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ require (
github.com/prometheus/common v0.29.0
github.com/prometheus/exporter-toolkit v0.6.0
github.com/prometheus/prometheus v1.8.2-0.20210720123808-b1ed4a0a663d
github.com/stretchr/testify v1.7.0
github.com/tencentyun/cos-go-sdk-v5 v0.7.31
github.com/uber/jaeger-client-go v2.29.1+incompatible
github.com/uber/jaeger-lib v2.4.1+incompatible
Expand Down
43 changes: 43 additions & 0 deletions pkg/rules/remotewrite/remotewrite.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package remotewrite

import (
"time"

"github.com/pkg/errors"

"github.com/go-kit/kit/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/storage/remote"
"gopkg.in/yaml.v2"
)

// LoadRemoteWriteConfig prepares a RemoteWriteConfig instance from a given YAML config.
func LoadRemoteWriteConfig(configYAML []byte) (config.RemoteWriteConfig, error) {
var cfg config.RemoteWriteConfig
if err := yaml.Unmarshal(configYAML, &cfg); err != nil {
return cfg, err
}
return cfg, nil
}

// NewFanoutStorage creates a storage that fans-out to both the WAL and a configured remote storage.
// The remote storage tails the WAL and sends the metrics it reads using Prometheus' remote_write.
func NewFanoutStorage(logger log.Logger, reg prometheus.Registerer, walDir string, rwConfig *config.RemoteWriteConfig) (storage.Storage, error) {
walStore, err := NewStorage(logger, reg, walDir)
if err != nil {
return nil, err
}
remoteStore := remote.NewStorage(logger, reg, walStore.StartTime, walStore.Directory(), 1*time.Minute, nil)
yeya24 marked this conversation as resolved.
Show resolved Hide resolved
if err := remoteStore.ApplyConfig(&config.Config{
GlobalConfig: config.DefaultGlobalConfig,
RemoteWriteConfigs: []*config.RemoteWriteConfig{rwConfig},
}); err != nil {
return nil, errors.Wrap(err, "applying config to remote storage")
}
return storage.NewFanout(logger, walStore, remoteStore), nil
}
Loading