Skip to content

Commit

Permalink
compact: guard halt with debug flag
Browse files Browse the repository at this point in the history
  • Loading branch information
fabxc committed Feb 27, 2018
1 parent 209dfdb commit c84beb3
Showing 1 changed file with 32 additions and 28 deletions.
60 changes: 32 additions & 28 deletions cmd/thanos/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ import (
func registerCompact(m map[string]setupFunc, app *kingpin.Application, name string) {
cmd := app.Command(name, "continously compacts blocks in an object store bucket")

haltOnError := cmd.Flag("debug.halt-on-error", "halt the process if a critical compaction error is detected").
Hidden().Bool()

httpAddr := cmd.Flag("http-address", "listen host:port for HTTP endpoints").
Default(defaultHTTPAddr).String()

Expand All @@ -36,26 +39,35 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application, name stri
gcsBucket := cmd.Flag("gcs.bucket", "Google Cloud Storage bucket name for stored blocks.").
PlaceHolder("<bucket>").Required().String()

s3Bucket := cmd.Flag("s3.bucket", "S3-Compatible API bucket name for stored blocks.").
PlaceHolder("<bucket>").Envar("S3_BUCKET").String()
var s3config s3.Config

cmd.Flag("s3.bucket", "S3-Compatible API bucket name for stored blocks.").
PlaceHolder("<bucket>").Envar("S3_BUCKET").StringVar(&s3config.Bucket)

s3Endpoint := cmd.Flag("s3.endpoint", "S3-Compatible API endpoint for stored blocks.").
PlaceHolder("<api-url>").Envar("S3_ENDPOINT").String()
cmd.Flag("s3.endpoint", "S3-Compatible API endpoint for stored blocks.").
PlaceHolder("<api-url>").Envar("S3_ENDPOINT").StringVar(&s3config.Endpoint)

s3AccessKey := cmd.Flag("s3.access-key", "Access key for an S3-Compatible API.").
PlaceHolder("<key>").Envar("S3_ACCESS_KEY").String()
cmd.Flag("s3.access-key", "Access key for an S3-Compatible API.").
PlaceHolder("<key>").Envar("S3_ACCESS_KEY").StringVar(&s3config.AccessKey)

s3SecretKey := cmd.Flag("s3.secret-key", "Secret key for an S3-Compatible API.").
PlaceHolder("<key>").Envar("S3_SECRET_KEY").String()
cmd.Flag("s3.secret-key", "Secret key for an S3-Compatible API.").
PlaceHolder("<key>").Envar("S3_SECRET_KEY").StringVar(&s3config.SecretKey)

s3Insecure := cmd.Flag("s3.insecure", "Whether to use an insecure connection with an S3-Compatible API.").
Default("false").Envar("S3_INSECURE").Bool()
cmd.Flag("s3.insecure", "Whether to use an insecure connection with an S3-Compatible API.").
Default("false").Envar("S3_INSECURE").BoolVar(&s3config.Insecure)

syncDelay := cmd.Flag("sync-delay", "minimum age of blocks before they are being processed.").
Default("2h").Duration()

m[name] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer) error {
return runCompact(g, logger, reg, *httpAddr, *dataDir, *gcsBucket, *s3Bucket, *s3Endpoint, *s3AccessKey, *s3SecretKey, *s3Insecure, *syncDelay)
return runCompact(g, logger, reg,
*httpAddr,
*dataDir,
*gcsBucket,
&s3config,
*syncDelay,
*haltOnError,
)
}
}

Expand All @@ -66,12 +78,9 @@ func runCompact(
httpAddr string,
dataDir string,
gcsBucket string,
s3Bucket string,
s3Endpoint string,
s3AccessKey string,
s3SecretKey string,
s3Insecure bool,
s3Config *s3.Config,
syncDelay time.Duration,
haltOnError bool,
) error {
var (
bkt objstore.Bucket
Expand All @@ -83,17 +92,8 @@ func runCompact(
Help: "Set to 1 if the compactor halted due to an unexpected error",
})
halted.Set(0)

reg.MustRegister(halted)

s3Config := &s3.Config{
Bucket: s3Bucket,
Endpoint: s3Endpoint,
AccessKey: s3AccessKey,
SecretKey: s3SecretKey,
Insecure: s3Insecure,
}

if gcsBucket != "" {
gcsClient, err := storage.NewClient(context.Background())
if err != nil {
Expand Down Expand Up @@ -149,9 +149,13 @@ func runCompact(
// The HaltError type signals that we hit a critical bug and should block
// for investigation.
if compact.IsHaltError(err) {
level.Error(logger).Log("msg", "critical error detected; halting")
halted.Set(1)
select {}
if haltOnError {
level.Error(logger).Log("msg", "critical error detected; halting")
halted.Set(1)
select {}
} else {
return errors.Wrap(err, "critical error detected")
}
}
}
}
Expand Down

0 comments on commit c84beb3

Please sign in to comment.