Compact: update "accept malformed index" flag to apply to downsampling (#5690)

* Compact: update "accept malformed index" flag to apply to downsampling

Signed-off-by: Marshall Lang <mt.lang72@gmail.com>

* Compact: Add entry to changelog

Signed-off-by: Marshall Lang <mt.lang72@gmail.com>

Signed-off-by: Marshall Lang <mt.lang72@gmail.com>
mtlang committed Sep 15, 2022
1 parent 0d8562b commit 9237dd9
Showing 4 changed files with 13 additions and 10 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -22,6 +22,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
### Changed

- [#5255](https://github.com/thanos-io/thanos/pull/5296) Query: Use k-way merging for the proxying logic. The proxying sub-system now uses much less resources (~25-80% less CPU usage, ~30-50% less RAM usage according to our benchmarks). Reduces query duration by a few percent on queries with lots of series.
+- [#5690](https://github.com/thanos-io/thanos/pull/5690) Compact: update `--debug.accept-malformed-index` flag to apply to downsampling. Previously the flag only applied to compaction, and fatal errors would still occur when downsampling was attempted.

### Removed

6 changes: 3 additions & 3 deletions cmd/thanos/compact.go
@@ -438,15 +438,15 @@ func runCompact(
downsampleMetrics.downsamples.WithLabelValues(groupKey)
downsampleMetrics.downsampleFailures.WithLabelValues(groupKey)
}
-if err := downsampleBucket(ctx, logger, downsampleMetrics, bkt, sy.Metas(), downsamplingDir, conf.downsampleConcurrency, metadata.HashFunc(conf.hashFunc)); err != nil {
+if err := downsampleBucket(ctx, logger, downsampleMetrics, bkt, sy.Metas(), downsamplingDir, conf.downsampleConcurrency, metadata.HashFunc(conf.hashFunc), conf.acceptMalformedIndex); err != nil {
return errors.Wrap(err, "first pass of downsampling failed")
}

level.Info(logger).Log("msg", "start second pass of downsampling")
if err := sy.SyncMetas(ctx); err != nil {
return errors.Wrap(err, "sync before second pass of downsampling")
}
-if err := downsampleBucket(ctx, logger, downsampleMetrics, bkt, sy.Metas(), downsamplingDir, conf.downsampleConcurrency, metadata.HashFunc(conf.hashFunc)); err != nil {
+if err := downsampleBucket(ctx, logger, downsampleMetrics, bkt, sy.Metas(), downsamplingDir, conf.downsampleConcurrency, metadata.HashFunc(conf.hashFunc), conf.acceptMalformedIndex); err != nil {
return errors.Wrap(err, "second pass of downsampling failed")
}
level.Info(logger).Log("msg", "downsampling iterations done")
@@ -657,7 +657,7 @@ func (cc *compactConfig) registerFlag(cmd extkingpin.FlagClause) {
cmd.Flag("debug.halt-on-error", "Halt the process if a critical compaction error is detected.").
Hidden().Default("true").BoolVar(&cc.haltOnError)
cmd.Flag("debug.accept-malformed-index",
"Compaction index verification will ignore out of order label names.").
"Compaction and downsampling index verification will ignore out of order label names.").
Hidden().Default("false").BoolVar(&cc.acceptMalformedIndex)
cmd.Flag("debug.max-compaction-level", fmt.Sprintf("Maximum compaction level, default is %d: %s", compactions.maxLevel(), compactions.String())).
Hidden().Default(strconv.Itoa(compactions.maxLevel())).IntVar(&cc.maxCompactionLevel)
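With this change, `runCompact` forwards `conf.acceptMalformedIndex` into both downsampling passes, so the pre-existing hidden flag now also governs index verification during downsampling. The following is a minimal, self-contained sketch of that wiring; it uses plain `kingpin.v2` rather than Thanos's `extkingpin` wrapper, and `runDownsamplingPass` is a hypothetical stand-in for `downsampleBucket`, not the real function.

```go
package main

import (
	"fmt"
	"os"

	kingpin "gopkg.in/alecthomas/kingpin.v2"
)

// runDownsamplingPass is a hypothetical stand-in for Thanos's downsampleBucket;
// it only demonstrates that the flag value arrives as the final boolean argument.
func runDownsamplingPass(pass int, acceptMalformedIndex bool) error {
	fmt.Printf("downsampling pass %d: acceptMalformedIndex=%v\n", pass, acceptMalformedIndex)
	return nil
}

func main() {
	app := kingpin.New("compact-sketch", "Sketch of wiring --debug.accept-malformed-index into downsampling.")

	var acceptMalformedIndex bool
	// Mirrors the hidden flag registered in compact.go, with the updated help
	// text covering both compaction and downsampling.
	app.Flag("debug.accept-malformed-index",
		"Compaction and downsampling index verification will ignore out of order label names.").
		Hidden().Default("false").BoolVar(&acceptMalformedIndex)

	kingpin.MustParse(app.Parse(os.Args[1:]))

	// Both downsampling passes receive the same flag value, just as runCompact
	// now forwards conf.acceptMalformedIndex to both calls to downsampleBucket.
	for pass := 1; pass <= 2; pass++ {
		if err := runDownsamplingPass(pass, acceptMalformedIndex); err != nil {
			fmt.Fprintln(os.Stderr, err)
			os.Exit(1)
		}
	}
}
```

Running the sketch with `--debug.accept-malformed-index` would flip the boolean that both passes receive, mirroring how the compactor now behaves.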
12 changes: 7 additions & 5 deletions cmd/thanos/downsample.go
@@ -126,7 +126,7 @@ func RunDownsample(
metrics.downsamples.WithLabelValues(groupKey)
metrics.downsampleFailures.WithLabelValues(groupKey)
}
-if err := downsampleBucket(ctx, logger, metrics, bkt, metas, dataDir, downsampleConcurrency, hashFunc); err != nil {
+if err := downsampleBucket(ctx, logger, metrics, bkt, metas, dataDir, downsampleConcurrency, hashFunc, false); err != nil {
return errors.Wrap(err, "downsampling failed")
}

@@ -135,7 +135,7 @@ func RunDownsample(
if err != nil {
return errors.Wrap(err, "sync before second pass of downsampling")
}
-if err := downsampleBucket(ctx, logger, metrics, bkt, metas, dataDir, downsampleConcurrency, hashFunc); err != nil {
+if err := downsampleBucket(ctx, logger, metrics, bkt, metas, dataDir, downsampleConcurrency, hashFunc, false); err != nil {
return errors.Wrap(err, "downsampling failed")
}
return nil
@@ -175,6 +175,7 @@ func downsampleBucket(
dir string,
downsampleConcurrency int,
hashFunc metadata.HashFunc,
+acceptMalformedIndex bool,
) (rerr error) {
if err := os.MkdirAll(dir, 0750); err != nil {
return errors.Wrap(err, "create dir")
@@ -252,7 +253,7 @@
resolution = downsample.ResLevel2
errMsg = "downsampling to 60 min"
}
-if err := processDownsampling(workerCtx, logger, bkt, m, dir, resolution, hashFunc, metrics); err != nil {
+if err := processDownsampling(workerCtx, logger, bkt, m, dir, resolution, hashFunc, metrics, acceptMalformedIndex); err != nil {
metrics.downsampleFailures.WithLabelValues(m.Thanos.GroupKey()).Inc()
errCh <- errors.Wrap(err, errMsg)

@@ -341,6 +342,7 @@ func processDownsampling(
resolution int64,
hashFunc metadata.HashFunc,
metrics *DownsampleMetrics,
+acceptMalformedIndex bool,
) error {
begin := time.Now()
bdir := filepath.Join(dir, m.ULID.String())
@@ -351,7 +353,7 @@
}
level.Info(logger).Log("msg", "downloaded block", "id", m.ULID, "duration", time.Since(begin), "duration_ms", time.Since(begin).Milliseconds())

-if err := block.VerifyIndex(logger, filepath.Join(bdir, block.IndexFilename), m.MinTime, m.MaxTime); err != nil {
+if err := block.VerifyIndex(logger, filepath.Join(bdir, block.IndexFilename), m.MinTime, m.MaxTime); err != nil && !acceptMalformedIndex {
return errors.Wrap(err, "input block index not valid")
}

@@ -381,7 +383,7 @@
"from", m.ULID, "to", id, "duration", downsampleDuration, "duration_ms", downsampleDuration.Milliseconds())
metrics.downsampleDuration.WithLabelValues(m.Thanos.GroupKey()).Observe(downsampleDuration.Seconds())

-if err := block.VerifyIndex(logger, filepath.Join(resdir, block.IndexFilename), m.MinTime, m.MaxTime); err != nil {
+if err := block.VerifyIndex(logger, filepath.Join(resdir, block.IndexFilename), m.MinTime, m.MaxTime); err != nil && !acceptMalformedIndex {
return errors.Wrap(err, "output block index not valid")
}

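The behavioral core of the change is the relaxed condition around `block.VerifyIndex` in `processDownsampling`: a verification error is fatal only while `acceptMalformedIndex` is false. Below is a small, self-contained sketch of that gate. `verifyIndex` and `errOutOfOrderLabels` are hypothetical stand-ins for `block.VerifyIndex` and its error, and the warning log is purely illustrative; the actual code simply skips returning the error.

```go
package main

import (
	"errors"
	"fmt"
	"log"
)

// errOutOfOrderLabels is a hypothetical stand-in for the kind of error
// block.VerifyIndex reports for a malformed index.
var errOutOfOrderLabels = errors.New("out-of-order label names")

// verifyIndex is a stub for block.VerifyIndex; here it always reports a
// malformed index so the gate below has something to tolerate.
func verifyIndex(indexPath string) error {
	return fmt.Errorf("verify %s: %w", indexPath, errOutOfOrderLabels)
}

// checkIndex mirrors the condition now used in processDownsampling:
// `err != nil && !acceptMalformedIndex`. With the flag set, the error is
// tolerated and downsampling can proceed.
func checkIndex(indexPath string, acceptMalformedIndex bool) error {
	if err := verifyIndex(indexPath); err != nil && !acceptMalformedIndex {
		return fmt.Errorf("input block index not valid: %w", err)
	} else if err != nil {
		log.Printf("ignoring malformed index (accept-malformed-index enabled): %v", err)
	}
	return nil
}

func main() {
	// Without the flag, a malformed index is fatal...
	if err := checkIndex("block-01/index", false); err != nil {
		log.Printf("downsampling aborted: %v", err)
	}
	// ...with the flag, it is logged here and downsampling continues.
	if err := checkIndex("block-01/index", true); err != nil {
		log.Printf("unexpected: %v", err)
	}
}
```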
4 changes: 2 additions & 2 deletions cmd/thanos/main_test.go
@@ -157,7 +157,7 @@ func TestRegression4960_Deadlock(t *testing.T) {

metas, _, err := metaFetcher.Fetch(ctx)
testutil.Ok(t, err)
-err = downsampleBucket(ctx, logger, metrics, bkt, metas, dir, 1, metadata.NoneFunc)
+err = downsampleBucket(ctx, logger, metrics, bkt, metas, dir, 1, metadata.NoneFunc, false)
testutil.NotOk(t, err)

testutil.Assert(t, strings.Contains(err.Error(), "some random error has occurred"))
@@ -196,7 +196,7 @@ func TestCleanupDownsampleCacheFolder(t *testing.T) {

metas, _, err := metaFetcher.Fetch(ctx)
testutil.Ok(t, err)
-testutil.Ok(t, downsampleBucket(ctx, logger, metrics, bkt, metas, dir, 1, metadata.NoneFunc))
+testutil.Ok(t, downsampleBucket(ctx, logger, metrics, bkt, metas, dir, 1, metadata.NoneFunc, false))
testutil.Equals(t, 1.0, promtest.ToFloat64(metrics.downsamples.WithLabelValues(meta.Thanos.GroupKey())))

_, err = os.Stat(dir)
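The updated tests only exercise the default behavior, passing `false` for the new parameter. A self-contained test sketch of the tolerance condition itself could look like the following; `verifyGate` and `errMalformed` are hypothetical helpers for illustration and are not part of the Thanos test suite.

```go
package main

import (
	"errors"
	"testing"
)

// errMalformed stands in for the error block.VerifyIndex would return for
// out-of-order label names; purely illustrative.
var errMalformed = errors.New("out-of-order label names")

// verifyGate reproduces the condition added to processDownsampling: a
// verification error is fatal only when acceptMalformedIndex is false.
func verifyGate(verifyErr error, acceptMalformedIndex bool) error {
	if verifyErr != nil && !acceptMalformedIndex {
		return verifyErr
	}
	return nil
}

func TestAcceptMalformedIndexGate(t *testing.T) {
	for _, tc := range []struct {
		name                 string
		verifyErr            error
		acceptMalformedIndex bool
		wantErr              bool
	}{
		{"malformed index rejected by default", errMalformed, false, true},
		{"malformed index tolerated with flag", errMalformed, true, false},
		{"valid index always passes", nil, false, false},
	} {
		t.Run(tc.name, func(t *testing.T) {
			err := verifyGate(tc.verifyErr, tc.acceptMalformedIndex)
			if gotErr := err != nil; gotErr != tc.wantErr {
				t.Fatalf("got err=%v, want error=%v", err, tc.wantErr)
			}
		})
	}
}
```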
