Skip to content

Commit

Permalink
compact/main: verify compacted and downsampled blocks (#194)
Browse files Browse the repository at this point in the history
* compact/main: verify compacted and downsampled blocks

This runs the verification procedure whenever we encounter a block that
is not valid. The compactor will block indefinitely and the downsample
run will terminate with an error.

* compact: guard halt with debug flag
  • Loading branch information
fabxc committed Mar 1, 2018
1 parent ed6099d commit b77877f
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 43 deletions.
89 changes: 48 additions & 41 deletions cmd/thanos/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ import (
func registerCompact(m map[string]setupFunc, app *kingpin.Application, name string) {
cmd := app.Command(name, "continously compacts blocks in an object store bucket")

haltOnError := cmd.Flag("debug.halt-on-error", "halt the process if a critical compaction error is detected").
Hidden().Bool()

httpAddr := cmd.Flag("http-address", "listen host:port for HTTP endpoints").
Default(defaultHTTPAddr).String()

Expand All @@ -36,26 +39,35 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application, name stri
gcsBucket := cmd.Flag("gcs.bucket", "Google Cloud Storage bucket name for stored blocks.").
PlaceHolder("<bucket>").String()

s3Bucket := cmd.Flag("s3.bucket", "S3-Compatible API bucket name for stored blocks.").
PlaceHolder("<bucket>").Envar("S3_BUCKET").String()
var s3config s3.Config

cmd.Flag("s3.bucket", "S3-Compatible API bucket name for stored blocks.").
PlaceHolder("<bucket>").Envar("S3_BUCKET").StringVar(&s3config.Bucket)

s3Endpoint := cmd.Flag("s3.endpoint", "S3-Compatible API endpoint for stored blocks.").
PlaceHolder("<api-url>").Envar("S3_ENDPOINT").String()
cmd.Flag("s3.endpoint", "S3-Compatible API endpoint for stored blocks.").
PlaceHolder("<api-url>").Envar("S3_ENDPOINT").StringVar(&s3config.Endpoint)

s3AccessKey := cmd.Flag("s3.access-key", "Access key for an S3-Compatible API.").
PlaceHolder("<key>").Envar("S3_ACCESS_KEY").String()
cmd.Flag("s3.access-key", "Access key for an S3-Compatible API.").
PlaceHolder("<key>").Envar("S3_ACCESS_KEY").StringVar(&s3config.AccessKey)

s3SecretKey := cmd.Flag("s3.secret-key", "Secret key for an S3-Compatible API.").
PlaceHolder("<key>").Envar("S3_SECRET_KEY").String()
cmd.Flag("s3.secret-key", "Secret key for an S3-Compatible API.").
PlaceHolder("<key>").Envar("S3_SECRET_KEY").StringVar(&s3config.SecretKey)

s3Insecure := cmd.Flag("s3.insecure", "Whether to use an insecure connection with an S3-Compatible API.").
Default("false").Envar("S3_INSECURE").Bool()
cmd.Flag("s3.insecure", "Whether to use an insecure connection with an S3-Compatible API.").
Default("false").Envar("S3_INSECURE").BoolVar(&s3config.Insecure)

syncDelay := cmd.Flag("sync-delay", "minimum age of blocks before they are being processed.").
Default("2h").Duration()

m[name] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer) error {
return runCompact(g, logger, reg, *httpAddr, *dataDir, *gcsBucket, *s3Bucket, *s3Endpoint, *s3AccessKey, *s3SecretKey, *s3Insecure, *syncDelay)
return runCompact(g, logger, reg,
*httpAddr,
*dataDir,
*gcsBucket,
&s3config,
*syncDelay,
*haltOnError,
)
}
}

Expand All @@ -66,25 +78,21 @@ func runCompact(
httpAddr string,
dataDir string,
gcsBucket string,
s3Bucket string,
s3Endpoint string,
s3AccessKey string,
s3SecretKey string,
s3Insecure bool,
s3Config *s3.Config,
syncDelay time.Duration,
haltOnError bool,
) error {
var (
bkt objstore.Bucket
bucket string
)

s3Config := &s3.Config{
Bucket: s3Bucket,
Endpoint: s3Endpoint,
AccessKey: s3AccessKey,
SecretKey: s3SecretKey,
Insecure: s3Insecure,
}
halted := prometheus.NewGauge(prometheus.GaugeOpts{
Name: "thanos_compactor_halted",
Help: "Set to 1 if the compactor halted due to an unexpected error",
})
halted.Set(0)
reg.MustRegister(halted)

if gcsBucket != "" {
gcsClient, err := storage.NewClient(context.Background())
Expand Down Expand Up @@ -112,24 +120,6 @@ func runCompact(
return err
}
// Start cycle of syncing blocks from the bucket and garbage collecting the bucket.
{
ctx, cancel := context.WithCancel(context.Background())

g.Add(func() error {
return runutil.Repeat(5*time.Minute, ctx.Done(), func() error {
if err := sy.SyncMetas(ctx); err != nil {
level.Error(logger).Log("msg", "sync failed", "err", err)
}
if err := sy.GarbageCollect(ctx); err != nil {
level.Error(logger).Log("msg", "garbage collection failed", "err", err)
}
return nil
})
}, func(error) {
cancel()
})
}
// Check grouped blocks and run compaction over them.
{
// Instantiate the compactor with different time slices. Timestamps in TSDB
// are in milliseconds.
Expand All @@ -147,9 +137,26 @@ func runCompact(

g.Add(func() error {
return runutil.Repeat(5*time.Minute, ctx.Done(), func() error {
if err := sy.SyncMetas(ctx); err != nil {
level.Error(logger).Log("msg", "sync failed", "err", err)
}
if err := sy.GarbageCollect(ctx); err != nil {
level.Error(logger).Log("msg", "garbage collection failed", "err", err)
}
for _, g := range sy.Groups() {
if _, err := g.Compact(ctx, comp); err != nil {
level.Error(logger).Log("msg", "compaction failed", "err", err)
// The HaltError type signals that we hit a critical bug and should block
// for investigation.
if compact.IsHaltError(err) {
if haltOnError {
level.Error(logger).Log("msg", "critical error detected; halting")
halted.Set(1)
select {}
} else {
return errors.Wrap(err, "critical error detected")
}
}
}
}
return nil
Expand Down
13 changes: 11 additions & 2 deletions cmd/thanos/downsample.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,10 @@ func processDownsampling(ctx context.Context, logger log.Logger, bkt objstore.Bu
}
level.Info(logger).Log("msg", "downloaded block", "id", m.ULID, "duration", time.Since(begin))

if err := block.VerifyIndex(filepath.Join(bdir, "index")); err != nil {
return errors.Wrap(err, "input block index not valid")
}

begin = time.Now()

var pool chunkenc.Pool
Expand All @@ -247,13 +251,18 @@ func processDownsampling(ctx context.Context, logger log.Logger, bkt objstore.Bu
if err != nil {
return errors.Wrapf(err, "downsample block %s to window %d", m.ULID, resolution)
}
resdir := filepath.Join(dir, id.String())

level.Info(logger).Log("msg", "downsampled block",
"from", m.ULID, "to", id, "duration", time.Since(begin))

if err := block.VerifyIndex(filepath.Join(resdir, "index")); err != nil {
return errors.Wrap(err, "output block index not valid")
}

begin = time.Now()

err = objstore.UploadDir(ctx, bkt, filepath.Join(dir, id.String()), id.String())
err = objstore.UploadDir(ctx, bkt, resdir, id.String())
if err != nil {
return errors.Wrapf(err, "upload downsampled block %s", id)
}
Expand All @@ -263,7 +272,7 @@ func processDownsampling(ctx context.Context, logger log.Logger, bkt objstore.Bu
begin = time.Now()

os.RemoveAll(bdir)
os.RemoveAll(filepath.Join(dir, id.String()))
os.RemoveAll(resdir)

return nil
}
46 changes: 46 additions & 0 deletions pkg/compact/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -533,8 +533,28 @@ func (cg *Group) Compact(ctx context.Context, comp tsdb.Compactor) (ulid.ULID, e
cg.metrics.compactionDuration.Observe(time.Since(begin).Seconds())

return id, err
}

// HaltError is a type wrapper for errors that should halt any further progress on compactions.
type HaltError struct {
	err error
}

// halt wraps err in a HaltError so callers checking IsHaltError treat it as critical.
func halt(err error) HaltError {
	return HaltError{err: err}
}

// Error implements the error interface by delegating to the wrapped error.
func (e HaltError) Error() string {
	return e.err.Error()
}

// IsHaltError returns true if the base error is a HaltError.
func IsHaltError(err error) bool {
	// Accept both value and pointer forms of HaltError after unwrapping.
	switch errors.Cause(err).(type) {
	case HaltError, *HaltError:
		return true
	}
	return false
}

func (cg *Group) compact(ctx context.Context, comp tsdb.Compactor) (id ulid.ULID, err error) {
// Planning a compaction works purely based on the meta.json files in our group's dir.
cg.mtx.Lock()
Expand All @@ -547,6 +567,27 @@ func (cg *Group) compact(ctx context.Context, comp tsdb.Compactor) (id ulid.ULID
if len(plan) == 0 {
return id, nil
}
// Due to #183 we verify that none of the blocks in the plan have overlapping sources.
// This is one potential source of how we could end up with duplicated chunks.
uniqueSources := map[ulid.ULID]struct{}{}

for _, pdir := range plan {
meta, err := block.ReadMetaFile(pdir)
if err != nil {
return id, errors.Wrapf(err, "read meta from %s", pdir)
}
for _, s := range meta.Compaction.Sources {
if _, ok := uniqueSources[s]; ok {
return id, halt(errors.Errorf("overlapping sources detected for plan %v", plan))
}
uniqueSources[s] = struct{}{}
}
// Ensure all input blocks are valid.
if err := block.VerifyIndex(filepath.Join(pdir, "index")); err != nil {
return id, errors.Wrapf(halt(err), "invalid plan block %s", pdir)
}
}

// Once we have a plan we need to download the actual data. We don't touch
// the main directory but use an intermediate one instead.
wdir := filepath.Join(cg.dir, "tmp")
Expand Down Expand Up @@ -594,6 +635,11 @@ func (cg *Group) compact(ctx context.Context, comp tsdb.Compactor) (id ulid.ULID
return id, errors.Wrap(err, "write new meta")
}

// Ensure the output block is valid.
if err := block.VerifyIndex(filepath.Join(bdir, "index")); err != nil {
return id, errors.Wrapf(halt(err), "invalid result block %s", bdir)
}

begin = time.Now()

if err := objstore.UploadDir(ctx, cg.bkt, bdir, id.String()); err != nil {
Expand Down

0 comments on commit b77877f

Please sign in to comment.