compact/main: verify compacted and downsampled blocks #194

Merged: 2 commits, Mar 1, 2018
89 changes: 48 additions & 41 deletions cmd/thanos/compact.go
@@ -27,6 +27,9 @@ import (
func registerCompact(m map[string]setupFunc, app *kingpin.Application, name string) {
cmd := app.Command(name, "continuously compacts blocks in an object store bucket")

haltOnError := cmd.Flag("debug.halt-on-error", "halt the process if a critical compaction error is detected").
Hidden().Bool()

httpAddr := cmd.Flag("http-address", "listen host:port for HTTP endpoints").
Default(defaultHTTPAddr).String()

@@ -36,26 +39,35 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application, name string)
gcsBucket := cmd.Flag("gcs.bucket", "Google Cloud Storage bucket name for stored blocks.").
PlaceHolder("<bucket>").Required().String()

s3Bucket := cmd.Flag("s3.bucket", "S3-Compatible API bucket name for stored blocks.").
PlaceHolder("<bucket>").Envar("S3_BUCKET").String()
var s3config s3.Config

cmd.Flag("s3.bucket", "S3-Compatible API bucket name for stored blocks.").
PlaceHolder("<bucket>").Envar("S3_BUCKET").StringVar(&s3config.Bucket)

s3Endpoint := cmd.Flag("s3.endpoint", "S3-Compatible API endpoint for stored blocks.").
PlaceHolder("<api-url>").Envar("S3_ENDPOINT").String()
cmd.Flag("s3.endpoint", "S3-Compatible API endpoint for stored blocks.").
PlaceHolder("<api-url>").Envar("S3_ENDPOINT").StringVar(&s3config.Endpoint)

s3AccessKey := cmd.Flag("s3.access-key", "Access key for an S3-Compatible API.").
PlaceHolder("<key>").Envar("S3_ACCESS_KEY").String()
cmd.Flag("s3.access-key", "Access key for an S3-Compatible API.").
PlaceHolder("<key>").Envar("S3_ACCESS_KEY").StringVar(&s3config.AccessKey)

s3SecretKey := cmd.Flag("s3.secret-key", "Secret key for an S3-Compatible API.").
PlaceHolder("<key>").Envar("S3_SECRET_KEY").String()
cmd.Flag("s3.secret-key", "Secret key for an S3-Compatible API.").
PlaceHolder("<key>").Envar("S3_SECRET_KEY").StringVar(&s3config.SecretKey)

s3Insecure := cmd.Flag("s3.insecure", "Whether to use an insecure connection with an S3-Compatible API.").
Default("false").Envar("S3_INSECURE").Bool()
cmd.Flag("s3.insecure", "Whether to use an insecure connection with an S3-Compatible API.").
Default("false").Envar("S3_INSECURE").BoolVar(&s3config.Insecure)

syncDelay := cmd.Flag("sync-delay", "minimum age of blocks before they are being processed.").
Default("2h").Duration()

m[name] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer) error {
return runCompact(g, logger, reg, *httpAddr, *dataDir, *gcsBucket, *s3Bucket, *s3Endpoint, *s3AccessKey, *s3SecretKey, *s3Insecure, *syncDelay)
return runCompact(g, logger, reg,
*httpAddr,
*dataDir,
*gcsBucket,
&s3config,
*syncDelay,
*haltOnError,
)
}
}

@@ -66,25 +78,21 @@ func runCompact(
httpAddr string,
dataDir string,
gcsBucket string,
s3Bucket string,
s3Endpoint string,
s3AccessKey string,
s3SecretKey string,
s3Insecure bool,
s3Config *s3.Config,
syncDelay time.Duration,
haltOnError bool,
) error {
var (
bkt objstore.Bucket
bucket string
)

s3Config := &s3.Config{
Bucket: s3Bucket,
Endpoint: s3Endpoint,
AccessKey: s3AccessKey,
SecretKey: s3SecretKey,
Insecure: s3Insecure,
}
halted := prometheus.NewGauge(prometheus.GaugeOpts{
Name: "thanos_compactor_halted",
Help: "Set to 1 if the compactor halted due to an unexpected error",
})
halted.Set(0)
reg.MustRegister(halted)

if gcsBucket != "" {
gcsClient, err := storage.NewClient(context.Background())
@@ -112,24 +120,6 @@ func runCompact(
return err
}
// Start cycle of syncing blocks from the bucket and garbage collecting the bucket.
{
ctx, cancel := context.WithCancel(context.Background())

g.Add(func() error {
return runutil.Repeat(5*time.Minute, ctx.Done(), func() error {
if err := sy.SyncMetas(ctx); err != nil {
level.Error(logger).Log("msg", "sync failed", "err", err)
}
if err := sy.GarbageCollect(ctx); err != nil {
level.Error(logger).Log("msg", "garbage collection failed", "err", err)
}
return nil
})
}, func(error) {
cancel()
})
}
// Check grouped blocks and run compaction over them.
{
// Instantiate the compactor with different time slices. Timestamps in TSDB
// are in milliseconds.
@@ -147,9 +137,26 @@ func runCompact(

g.Add(func() error {
return runutil.Repeat(5*time.Minute, ctx.Done(), func() error {
if err := sy.SyncMetas(ctx); err != nil {
level.Error(logger).Log("msg", "sync failed", "err", err)
}
if err := sy.GarbageCollect(ctx); err != nil {
level.Error(logger).Log("msg", "garbage collection failed", "err", err)
}
for _, g := range sy.Groups() {
if _, err := g.Compact(ctx, comp); err != nil {
level.Error(logger).Log("msg", "compaction failed", "err", err)
// The HaltError type signals that we hit a critical bug and should block
// for investigation.
if compact.IsHaltError(err) {
Member commented:

I totally see the reason here, but I feel like this needs to be automated a little bit. I can see cases where people start the compactor without having (or remembering to set up) an alert for a halted compactor, and from an orchestration point of view it will still look healthy.

I'm not sure what the best way to handle this is. Maybe make halting an option (configured by a flag), or just return the error and fail everything? What do you think?

Collaborator (Author) commented:

From a debugging point of view this is the best course of action, I think. Remember that it should™ not happen to begin with. But if it does and we just exit with an error, we'll get rescheduled and debugging gets harder.

A less severe alternative would be marking ourselves as unhealthy, but in that case k8s will also just kill us eventually and reschedule.

I get that it's a bit of a dirty approach, but we do log an error and we set a corresponding metric. Once we are fully confident that the issue is resolved, we can maybe just exit?

Member commented:

Sure, I understand, but let's hide it behind some debug flag then.

(A minimal sketch of the resulting halt-on-error behavior follows this file's diff.)
if haltOnError {
level.Error(logger).Log("msg", "critical error detected; halting")
halted.Set(1)
select {}
} else {
return errors.Wrap(err, "critical error detected")
}
}
}
}
return nil
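The review thread above settled on gating the halt behavior behind the hidden --debug.halt-on-error flag and surfacing it through the thanos_compactor_halted gauge. The following is a minimal, self-contained sketch of that pattern rather than the Thanos code itself; doCompaction is a made-up stand-in for the compaction cycle, and a real binary would also serve the registry over HTTP so the gauge can be scraped.

```go
package main

import (
	"errors"
	"log"

	"github.com/prometheus/client_golang/prometheus"
)

// doCompaction stands in for the real compaction cycle; here it always fails
// with a simulated critical error.
func doCompaction() error {
	return errors.New("simulated critical compaction error")
}

func main() {
	// Gauge mirroring the one registered in runCompact.
	halted := prometheus.NewGauge(prometheus.GaugeOpts{
		Name: "thanos_compactor_halted",
		Help: "Set to 1 if the compactor halted due to an unexpected error",
	})
	prometheus.MustRegister(halted)

	haltOnError := true // corresponds to the hidden --debug.halt-on-error flag

	if err := doCompaction(); err != nil {
		if haltOnError {
			// Keep the process alive but idle so the on-disk state can be
			// inspected; select{} blocks forever without busy-waiting.
			log.Printf("critical error detected; halting: %v", err)
			halted.Set(1)
			select {}
		}
		// Without the flag, fail fast and let the orchestrator restart us.
		log.Fatalf("critical error detected: %v", err)
	}
}
```

With this shape, an alert on thanos_compactor_halted == 1 is what pages an operator, which is the gap the reviewer pointed out: a halted but still-running process otherwise looks healthy to the orchestrator.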
13 changes: 11 additions & 2 deletions cmd/thanos/downsample.go
@@ -229,6 +229,10 @@ func processDownsampling(ctx context.Context, logger log.Logger, bkt objstore.Bucket
}
level.Info(logger).Log("msg", "downloaded block", "id", m.ULID, "duration", time.Since(begin))

if err := block.VerifyIndex(filepath.Join(bdir, "index")); err != nil {
return errors.Wrap(err, "input block index not valid")
}

begin = time.Now()

var pool chunkenc.Pool
@@ -247,13 +251,18 @@ func processDownsampling(ctx context.Context, logger log.Logger, bkt objstore.Bucket
if err != nil {
return errors.Wrapf(err, "downsample block %s to window %d", m.ULID, resolution)
}
resdir := filepath.Join(dir, id.String())

level.Info(logger).Log("msg", "downsampled block",
"from", m.ULID, "to", id, "duration", time.Since(begin))

if err := block.VerifyIndex(filepath.Join(resdir, "index")); err != nil {
return errors.Wrap(err, "output block index not valid")
}

begin = time.Now()

err = objstore.UploadDir(ctx, bkt, filepath.Join(dir, id.String()), id.String())
err = objstore.UploadDir(ctx, bkt, resdir, id.String())
if err != nil {
return errors.Wrapf(err, "upload downsampled block %s", id)
}
@@ -263,7 +272,7 @@ func processDownsampling(ctx context.Context, logger log.Logger, bkt objstore.Bucket
begin = time.Now()

os.RemoveAll(bdir)
os.RemoveAll(filepath.Join(dir, id.String()))
os.RemoveAll(resdir)

return nil
}
46 changes: 46 additions & 0 deletions pkg/compact/compact.go
@@ -533,8 +533,28 @@ func (cg *Group) Compact(ctx context.Context, comp tsdb.Compactor) (ulid.ULID, error)
cg.metrics.compactionDuration.Observe(time.Since(begin).Seconds())

return id, err
}

// HaltError is a type wrapper for errors that should halt any further progress on compactions.
type HaltError struct {
err error
}

func halt(err error) HaltError {
return HaltError{err: err}
}

func (e HaltError) Error() string {
return e.err.Error()
}

// IsHaltError returns true if the base error is a HaltError.
func IsHaltError(err error) bool {
_, ok1 := errors.Cause(err).(HaltError)
_, ok2 := errors.Cause(err).(*HaltError)
return ok1 || ok2
}

func (cg *Group) compact(ctx context.Context, comp tsdb.Compactor) (id ulid.ULID, err error) {
// Planning a compaction works purely based on the meta.json files in our group's dir.
cg.mtx.Lock()
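Halt errors do not reach the caller bare: later in this file they are wrapped again, for example errors.Wrapf(halt(err), "invalid plan block %s", pdir). IsHaltError still detects them because errors.Cause from github.com/pkg/errors unwraps the Wrap/Wrapf layers until it reaches a value without a Cause method, which is the HaltError itself. A small standalone sketch of that behavior, using the same HaltError shape as above:

```go
package main

import (
	"fmt"

	"github.com/pkg/errors"
)

// HaltError mirrors the type added in pkg/compact: a wrapper marking errors
// that should stop further compaction progress.
type HaltError struct{ err error }

func halt(err error) HaltError    { return HaltError{err: err} }
func (e HaltError) Error() string { return e.err.Error() }

// IsHaltError unwraps pkg/errors wrapping and checks for HaltError by value
// or by pointer.
func IsHaltError(err error) bool {
	_, ok1 := errors.Cause(err).(HaltError)
	_, ok2 := errors.Cause(err).(*HaltError)
	return ok1 || ok2
}

func main() {
	base := errors.New("index contains out-of-order chunks")
	wrapped := errors.Wrapf(halt(base), "invalid plan block %s", "some-block-dir")

	// errors.Cause stops at HaltError because HaltError has no Cause method,
	// so the halt marker survives the extra Wrapf layer.
	fmt.Println(IsHaltError(wrapped)) // true
	fmt.Println(IsHaltError(base))    // false: plain errors do not halt
}
```

Checking for both HaltError and *HaltError keeps the detection working whether the error was stored by value or behind a pointer.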
@@ -547,6 +567,27 @@ func (cg *Group) compact(ctx context.Context, comp tsdb.Compactor) (id ulid.ULID, err error)
if len(plan) == 0 {
return id, nil
}
// Due to #183 we verify that none of the blocks in the plan have overlapping sources.
// This is one potential source of how we could end up with duplicated chunks.
uniqueSources := map[ulid.ULID]struct{}{}

for _, pdir := range plan {
meta, err := block.ReadMetaFile(pdir)
if err != nil {
return id, errors.Wrapf(err, "read meta from %s", pdir)
}
for _, s := range meta.Compaction.Sources {
if _, ok := uniqueSources[s]; ok {
return id, halt(errors.Errorf("overlapping sources detected for plan %v", plan))
}
uniqueSources[s] = struct{}{}
}
// Ensure all input blocks are valid.
if err := block.VerifyIndex(filepath.Join(pdir, "index")); err != nil {
return id, errors.Wrapf(halt(err), "invalid plan block %s", pdir)
Member commented:

This section seems to produce this error: #246. Let's revisit this code and investigate. It looks like the index is not actually broken; the index file is just no longer there. @fabxc

I will have time for it tomorrow.
}
}

// Once we have a plan we need to download the actual data. We don't touch
// the main directory but use an intermediate one instead.
wdir := filepath.Join(cg.dir, "tmp")
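The overlapping-sources guard above boils down to a set-membership check over every source ULID in the plan. The sketch below isolates that check; hasOverlappingSources and its inputs are made up for illustration, whereas the real code reads the sources from each block's meta.json via block.ReadMetaFile.

```go
package main

import (
	"fmt"
	"math/rand"
	"time"

	"github.com/oklog/ulid"
)

// hasOverlappingSources reports whether any source ULID appears in more than
// one block of a compaction plan, which would lead to duplicated chunks in
// the compacted output (see #183).
func hasOverlappingSources(planSources [][]ulid.ULID) bool {
	seen := map[ulid.ULID]struct{}{}
	for _, sources := range planSources {
		for _, s := range sources {
			if _, ok := seen[s]; ok {
				return true
			}
			seen[s] = struct{}{}
		}
	}
	return false
}

func main() {
	entropy := rand.New(rand.NewSource(42))
	a := ulid.MustNew(ulid.Timestamp(time.Now()), entropy)
	b := ulid.MustNew(ulid.Timestamp(time.Now()), entropy)

	fmt.Println(hasOverlappingSources([][]ulid.ULID{{a}, {b}}))    // false
	fmt.Println(hasOverlappingSources([][]ulid.ULID{{a}, {a, b}})) // true
}
```

A repeated source means two blocks in the plan already share input data, so compacting them together would duplicate chunks; the code above halts instead of producing a corrupt output block.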
@@ -594,6 +635,11 @@ func (cg *Group) compact(ctx context.Context, comp tsdb.Compactor) (id ulid.ULID, err error)
return id, errors.Wrap(err, "write new meta")
}

// Ensure the output block is valid.
if err := block.VerifyIndex(filepath.Join(bdir, "index")); err != nil {
return id, errors.Wrapf(halt(err), "invalid result block %s", bdir)
}

begin = time.Now()

if err := objstore.UploadDir(ctx, cg.bkt, bdir, id.String()); err != nil {