Fixed compactor tests; Moved to full e2e compact test; Cleaned metrics. (#1666)

* Fixed compactor tests; Moved to full e2e compact test; Cleaned metrics.

Signed-off-by: Bartek Plotka <bwplotka@gmail.com>

* Removed block after each compaction group run.

Fixes: #1499

Signed-off-by: Bartek Plotka <bwplotka@gmail.com>

* Moved to label hash for dir names for compactor groups (see the sketch after the commit message).

Fixes: #1661

Signed-off-by: Bartek Plotka <bwplotka@gmail.com>

* Addressed comments.

Signed-off-by: Bartek Plotka <bwplotka@gmail.com>

* Addressed comments, rebased.

Signed-off-by: Bartek Plotka <bwplotka@gmail.com>
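The commit bullet about label-hashed directory names (fixing #1661) can be illustrated in isolation. The helper below is hypothetical — the real Thanos change lives in the compact package and may hash the group's external labels differently — but it shows the general technique the bullet refers to: hash the sorted label pairs instead of embedding raw label values in the directory path.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sort"
)

// groupDir is a hypothetical helper: it derives a filesystem-safe
// compaction-group directory name from the group's resolution and external
// labels by hashing the sorted label pairs, rather than building the path
// from raw label values.
func groupDir(resolution int64, extLabels map[string]string) string {
	keys := make([]string, 0, len(extLabels))
	for k := range extLabels {
		keys = append(keys, k)
	}
	sort.Strings(keys)

	h := fnv.New64a()
	for _, k := range keys {
		// Separate keys and values with NUL bytes so different label sets
		// cannot collide by simple concatenation.
		_, _ = h.Write([]byte(k))
		_, _ = h.Write([]byte{0})
		_, _ = h.Write([]byte(extLabels[k]))
		_, _ = h.Write([]byte{0})
	}
	return fmt.Sprintf("compact/%d@%d", resolution, h.Sum64())
}

func main() {
	// Prints a stable directory name of the form "compact/0@<hash>".
	fmt.Println(groupDir(0, map[string]string{"cluster": "eu-1", "replica": "a"}))
}
```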
bwplotka committed Oct 21, 2019
1 parent 2f88fca commit 06bd4ee
Showing 9 changed files with 424 additions and 208 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
@@ -19,6 +19,11 @@ We use *breaking* word for marking changes that are not backward compatible (rel

- [#1656](https://github.com/thanos-io/thanos/pull/1656) Thanos Store now starts metric and status probe HTTP server earlier in its start-up sequence. `/-/healthy` endpoint now starts to respond with success earlier. `/metrics` endpoint starts serving metrics earlier as well. Make sure to point your readiness probes to the `/-/ready` endpoint rather than `/metrics`.

### Changed

- [#1666](https://github.com/thanos-io/thanos/pull/1666) `thanos_compact_group_compactions_total` now counts block compactions, i.e. operations that resulted in a compacted block. The old behaviour is exposed by the new metrics `thanos_compact_group_compaction_runs_started_total` and `thanos_compact_group_compaction_runs_completed_total`, which count compaction runs overall.

## [v0.8.1](https://github.com/thanos-io/thanos/releases/tag/v0.8.1) - 2019.10.14

### Fixed
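For illustration, here is a minimal sketch of the metric split described in the changelog entry above. The counter names come from that entry; the `group` label, help strings, and increment points are assumptions made for the sketch, not the exact Thanos wiring.

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

// groupMetrics holds the three counters named in the changelog entry.
type groupMetrics struct {
	compactions   *prometheus.CounterVec // incremented only when a run produced a new block
	runsStarted   *prometheus.CounterVec // incremented for every attempted run
	runsCompleted *prometheus.CounterVec // incremented for every run that finished without error
}

func newGroupMetrics(reg prometheus.Registerer) *groupMetrics {
	m := &groupMetrics{
		compactions: prometheus.NewCounterVec(prometheus.CounterOpts{
			Name: "thanos_compact_group_compactions_total",
			Help: "Total number of group compaction runs that resulted in a new block.",
		}, []string{"group"}),
		runsStarted: prometheus.NewCounterVec(prometheus.CounterOpts{
			Name: "thanos_compact_group_compaction_runs_started_total",
			Help: "Total number of group compaction run attempts.",
		}, []string{"group"}),
		runsCompleted: prometheus.NewCounterVec(prometheus.CounterOpts{
			Name: "thanos_compact_group_compaction_runs_completed_total",
			Help: "Total number of group compaction runs that completed.",
		}, []string{"group"}),
	}
	reg.MustRegister(m.compactions, m.runsStarted, m.runsCompleted)
	return m
}

// compactGroup shows the assumed increment points: every run bumps the run
// counters, but compactions is bumped only when a block was actually produced.
func compactGroup(m *groupMetrics, group string, run func() (producedBlock bool, err error)) error {
	m.runsStarted.WithLabelValues(group).Inc()
	produced, err := run()
	if err != nil {
		return err
	}
	m.runsCompleted.WithLabelValues(group).Inc()
	if produced {
		m.compactions.WithLabelValues(group).Inc()
	}
	return nil
}

func main() {
	m := newGroupMetrics(prometheus.NewRegistry())
	_ = compactGroup(m, "0@example-group", func() (bool, error) { return true, nil })
	fmt.Println("ran one compaction for group 0@example-group")
}
```

The point of the split is that `thanos_compact_group_compactions_total` now means "a block was produced", while the run counters still expose how often the compactor attempted and completed work.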
8 changes: 4 additions & 4 deletions cmd/thanos/compact.go
@@ -344,15 +344,15 @@ func runCompact(
}

const (
MetricIndexGenerateName = "thanos_compact_generated_index_total"
MetricIndexGenerateHelp = "Total number of generated indexes."
metricIndexGenerateName = "thanos_compact_generated_index_total"
metricIndexGenerateHelp = "Total number of generated indexes."
)

// genMissingIndexCacheFiles scans over all blocks, generates missing index cache files and uploads them to object storage.
func genMissingIndexCacheFiles(ctx context.Context, logger log.Logger, reg *prometheus.Registry, bkt objstore.Bucket, dir string) error {
genIndex := prometheus.NewCounter(prometheus.CounterOpts{
Name: MetricIndexGenerateName,
Help: MetricIndexGenerateHelp,
Name: metricIndexGenerateName,
Help: metricIndexGenerateHelp,
})
reg.MustRegister(genIndex)

8 changes: 4 additions & 4 deletions cmd/thanos/downsample.go
@@ -214,10 +214,10 @@ func downsampleBucket(
continue
}
if err := processDownsampling(ctx, logger, bkt, m, dir, downsample.ResLevel1); err != nil {
metrics.downsampleFailures.WithLabelValues(compact.GroupKey(*m)).Inc()
metrics.downsampleFailures.WithLabelValues(compact.GroupKey(m.Thanos)).Inc()
return errors.Wrap(err, "downsampling to 5 min")
}
metrics.downsamples.WithLabelValues(compact.GroupKey(*m)).Inc()
metrics.downsamples.WithLabelValues(compact.GroupKey(m.Thanos)).Inc()

case downsample.ResLevel1:
missing := false
@@ -237,10 +237,10 @@
continue
}
if err := processDownsampling(ctx, logger, bkt, m, dir, downsample.ResLevel2); err != nil {
metrics.downsampleFailures.WithLabelValues(compact.GroupKey(*m))
metrics.downsampleFailures.WithLabelValues(compact.GroupKey(m.Thanos))
return errors.Wrap(err, "downsampling to 60 min")
}
metrics.downsamples.WithLabelValues(compact.GroupKey(*m))
metrics.downsamples.WithLabelValues(compact.GroupKey(m.Thanos))
}
}
return nil
128 changes: 60 additions & 68 deletions cmd/thanos/main_test.go
@@ -5,6 +5,9 @@ import (
"io/ioutil"
"os"
"path"
"path/filepath"

"github.com/thanos-io/thanos/pkg/block/metadata"

"testing"
"time"
@@ -13,116 +16,105 @@ import (
"github.com/oklog/ulid"
"github.com/prometheus/client_golang/prometheus"
promtest "github.com/prometheus/client_golang/prometheus/testutil"
"github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/tsdb/labels"
"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/compact"
"github.com/thanos-io/thanos/pkg/compact/downsample"
"github.com/thanos-io/thanos/pkg/objstore"
"github.com/thanos-io/thanos/pkg/objstore/inmem"
"github.com/thanos-io/thanos/pkg/testutil"
)

func TestCleanupCompactCacheFolder(t *testing.T) {
ctx, logger, dir, _, bkt, actReg := bootstrap(t)
defer func() { testutil.Ok(t, os.RemoveAll(dir)) }()

sy, err := compact.NewSyncer(logger, actReg, bkt, 0*time.Second, 1, false, nil)
testutil.Ok(t, err)

expReg := prometheus.NewRegistry()
syncExp := prometheus.NewCounter(prometheus.CounterOpts{
Name: compact.MetricSyncMetaName,
Help: compact.MetricSyncMetaHelp,
})
expReg.MustRegister(syncExp)

testutil.GatherAndCompare(t, expReg, actReg, compact.MetricSyncMetaName)

comp, err := tsdb.NewLeveledCompactor(ctx, nil, logger, []int64{1}, nil)
testutil.Ok(t, err)

bComp, err := compact.NewBucketCompactor(logger, sy, comp, dir, bkt, 1)
func TestCleanupIndexCacheFolder(t *testing.T) {
logger := log.NewLogfmtLogger(os.Stderr)
dir, err := ioutil.TempDir("", "test-compact-cleanup")
testutil.Ok(t, err)
defer func() { testutil.Ok(t, os.RemoveAll(dir)) }()

// Even with a single uploaded block the bucket compactor needs to
// download the meta file to plan the compaction groups.
testutil.Ok(t, bComp.Compact(ctx))
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

syncExp.Inc()
bkt := inmem.NewBucket()

testutil.GatherAndCompare(t, expReg, actReg, compact.MetricSyncMetaName)
// Upload one compaction lvl = 2 block, one compaction lvl = 1.
// We generate index cache files only for lvl > 1 blocks.
{
id, err := testutil.CreateBlock(
ctx,
dir,
[]labels.Labels{{{Name: "a", Value: "1"}}},
1, 0, downsample.DownsampleRange0+1, // Pass the minimum DownsampleRange0 check.
labels.Labels{{Name: "e1", Value: "1"}},
downsample.ResLevel0)
testutil.Ok(t, err)

_, err = os.Stat(dir)
testutil.Assert(t, os.IsNotExist(err), "index cache dir shouldn't exist at the end of execution")
meta, err := metadata.Read(filepath.Join(dir, id.String()))
testutil.Ok(t, err)

}
meta.Compaction.Level = 2

func TestCleanupIndexCacheFolder(t *testing.T) {
ctx, logger, dir, _, bkt, actReg := bootstrap(t)
defer func() { testutil.Ok(t, os.RemoveAll(dir)) }()
testutil.Ok(t, metadata.Write(logger, filepath.Join(dir, id.String()), meta))
testutil.Ok(t, block.Upload(ctx, logger, bkt, path.Join(dir, id.String())))
}
{
id, err := testutil.CreateBlock(
ctx,
dir,
[]labels.Labels{{{Name: "a", Value: "1"}}},
1, 0, downsample.DownsampleRange0+1, // Pass the minimum DownsampleRange0 check.
labels.Labels{{Name: "e1", Value: "1"}},
downsample.ResLevel0)
testutil.Ok(t, err)
testutil.Ok(t, block.Upload(ctx, logger, bkt, path.Join(dir, id.String())))
}

reg := prometheus.NewRegistry()
expReg := prometheus.NewRegistry()
genIndexExp := prometheus.NewCounter(prometheus.CounterOpts{
Name: MetricIndexGenerateName,
Help: MetricIndexGenerateHelp,
Name: metricIndexGenerateName,
Help: metricIndexGenerateHelp,
})
expReg.MustRegister(genIndexExp)

testutil.GatherAndCompare(t, expReg, actReg, compact.MetricSyncMetaName)

testutil.Ok(t, genMissingIndexCacheFiles(ctx, logger, actReg, bkt, dir))
testutil.Ok(t, genMissingIndexCacheFiles(ctx, logger, reg, bkt, dir))

genIndexExp.Inc()
testutil.GatherAndCompare(t, expReg, actReg, compact.MetricSyncMetaName)

_, err := os.Stat(dir)
testutil.Assert(t, os.IsNotExist(err), "index cache dir shouldn't exist at the end of execution")
}

func TestCleanupDownsampleCacheFolder(t *testing.T) {
ctx, logger, dir, blckID, bkt, reg := bootstrap(t)
defer func() { testutil.Ok(t, os.RemoveAll(dir)) }()

meta, err := block.DownloadMeta(ctx, logger, bkt, blckID)
testutil.Ok(t, err)

metrics := newDownsampleMetrics(reg)
testutil.Equals(t, 0.0, promtest.ToFloat64(metrics.downsamples.WithLabelValues(compact.GroupKey(meta))))
testutil.Ok(t, downsampleBucket(ctx, logger, metrics, bkt, dir))
testutil.Equals(t, 1.0, promtest.ToFloat64(metrics.downsamples.WithLabelValues(compact.GroupKey(meta))))
testutil.GatherAndCompare(t, expReg, reg, metricIndexGenerateName)

_, err = os.Stat(dir)
testutil.Assert(t, os.IsNotExist(err), "index cache dir shouldn't exist at the end of execution")
}

func bootstrap(t *testing.T) (context.Context, log.Logger, string, ulid.ULID, objstore.Bucket, *prometheus.Registry) {
func TestCleanupDownsampleCacheFolder(t *testing.T) {
logger := log.NewLogfmtLogger(os.Stderr)
dir, err := ioutil.TempDir("", "test-compact-cleanup")
testutil.Ok(t, err)
defer func() { testutil.Ok(t, os.RemoveAll(dir)) }()

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

bkt := inmem.NewBucket()
var blckID ulid.ULID

// Create and upload a single block to the bucket.
// The compaction will download the meta file of
// this block to plan the compaction groups.
var id ulid.ULID
{
blckID, err = testutil.CreateBlock(
id, err = testutil.CreateBlock(
ctx,
dir,
[]labels.Labels{
{{Name: "a", Value: "1"}},
},
[]labels.Labels{{{Name: "a", Value: "1"}}},
1, 0, downsample.DownsampleRange0+1, // Pass the minimum DownsampleRange0 check.
labels.Labels{{Name: "e1", Value: "1"}},
downsample.ResLevel0)
testutil.Ok(t, err)
testutil.Ok(t, block.Upload(ctx, logger, bkt, path.Join(dir, blckID.String())))
testutil.Ok(t, block.Upload(ctx, logger, bkt, path.Join(dir, id.String())))
}

return ctx, logger, dir, blckID, bkt, prometheus.NewRegistry()
meta, err := block.DownloadMeta(ctx, logger, bkt, id)
testutil.Ok(t, err)

metrics := newDownsampleMetrics(prometheus.NewRegistry())
testutil.Equals(t, 0.0, promtest.ToFloat64(metrics.downsamples.WithLabelValues(compact.GroupKey(meta.Thanos))))
testutil.Ok(t, downsampleBucket(ctx, logger, metrics, bkt, dir))
testutil.Equals(t, 1.0, promtest.ToFloat64(metrics.downsamples.WithLabelValues(compact.GroupKey(meta.Thanos))))

_, err = os.Stat(dir)
testutil.Assert(t, os.IsNotExist(err), "index cache dir shouldn't exist at the end of execution")
}
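The rewritten tests above rely on `client_golang`'s testutil helpers to assert counter values against a dedicated registry. Below is a minimal, standalone sketch of that pattern; the metric name is made up for illustration and is not part of the Thanos code.

```go
package main

import (
	"testing"

	"github.com/prometheus/client_golang/prometheus"
	promtest "github.com/prometheus/client_golang/prometheus/testutil"
)

// TestCounterPattern shows the assertion style used in main_test.go: register
// a counter on a fresh registry, run the code under test, then check the
// observed value with promtest.ToFloat64.
func TestCounterPattern(t *testing.T) {
	reg := prometheus.NewRegistry()
	c := prometheus.NewCounter(prometheus.CounterOpts{
		Name: "example_operations_total", // hypothetical metric name
		Help: "Total number of example operations.",
	})
	reg.MustRegister(c)

	if got := promtest.ToFloat64(c); got != 0 {
		t.Fatalf("expected 0 before running, got %v", got)
	}

	c.Inc() // stands in for the code under test incrementing the metric

	if got := promtest.ToFloat64(c); got != 1 {
		t.Fatalf("expected 1 after running, got %v", got)
	}
}
```

The diff's `testutil.GatherAndCompare(t, expReg, reg, ...)` calls follow the same idea, comparing a hand-built expected registry against the registry actually used by the code under test.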
