diff --git a/cmd/thanos/compact.go b/cmd/thanos/compact.go index af3fe781293..acec837eb84 100644 --- a/cmd/thanos/compact.go +++ b/cmd/thanos/compact.go @@ -147,8 +147,8 @@ func newCompactMetrics(reg *prometheus.Registry, deleteDelay time.Duration) *com m.blocksMarked = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ Name: "thanos_compact_blocks_marked_total", Help: "Total number of blocks marked in compactor.", - }, []string{"marker"}) - m.blocksMarked.WithLabelValues(metadata.NoCompactMarkFilename) + }, []string{"marker", "reason"}) + m.blocksMarked.WithLabelValues(metadata.NoCompactMarkFilename, metadata.OutOfOrderChunksNoCompactReason) - m.blocksMarked.WithLabelValues(metadata.DeletionMarkFilename) + m.blocksMarked.WithLabelValues(metadata.DeletionMarkFilename, "") m.garbageCollectedBlocks = promauto.With(reg).NewCounter(prometheus.CounterOpts{ @@ -349,13 +349,14 @@ func runCompact( reg, - compactMetrics.blocksMarked.WithLabelValues(metadata.DeletionMarkFilename), + compactMetrics.blocksMarked.WithLabelValues(metadata.DeletionMarkFilename, ""), compactMetrics.garbageCollectedBlocks, + compactMetrics.blocksMarked.WithLabelValues(metadata.NoCompactMarkFilename, metadata.OutOfOrderChunksNoCompactReason), metadata.HashFunc(conf.hashFunc), ) planner := compact.WithLargeTotalIndexSizeFilter( compact.NewPlanner(logger, levels, noCompactMarkerFilter), bkt, int64(conf.maxBlockIndexSize), - compactMetrics.blocksMarked.WithLabelValues(metadata.DeletionMarkFilename), + compactMetrics.blocksMarked.WithLabelValues(metadata.NoCompactMarkFilename, metadata.IndexSizeExceedingNoCompactReason), ) blocksCleaner := compact.NewBlocksCleaner(logger, bkt, ignoreDeletionMarkFilter, deleteDelay, compactMetrics.blocksCleaned, compactMetrics.blockCleanupFailures) compactor, err := compact.NewBucketCompactor( diff --git a/pkg/block/index.go b/pkg/block/index.go index 2b6ece295e5..851dfa9d98a 100644 --- a/pkg/block/index.go +++ b/pkg/block/index.go @@ -111,12 +111,9 @@ func (i HealthStats) Issue347OutsideChunksErr() error { return nil } -// CriticalErr returns error if stats indicates critical block issue, that might solved only by 
manual repair procedure. -func (i HealthStats) CriticalErr() error { - var errMsg []string - - if i.OutOfOrderSeries > 0 { - errMsg = append(errMsg, fmt.Sprintf( +func (i HealthStats) OutOfOrderChunksErr() error { + if i.OutOfOrderChunks > 0 { + return errors.New(fmt.Sprintf( "%d/%d series have an average of %.3f out-of-order chunks: "+ "%.3f of these are exact duplicates (in terms of data and time range)", i.OutOfOrderSeries, @@ -125,6 +122,12 @@ func (i HealthStats) CriticalErr() error { float64(i.DuplicatedChunks)/float64(i.OutOfOrderChunks), )) } + return nil +} + +// CriticalErr returns error if stats indicates critical block issue, that might solved only by manual repair procedure. +func (i HealthStats) CriticalErr() error { + var errMsg []string n := i.OutsideChunks - (i.CompleteOutsideChunks + i.Issue347OutsideChunks) if n > 0 { @@ -158,6 +161,10 @@ func (i HealthStats) AnyErr() error { errMsg = append(errMsg, err.Error()) } + if err := i.OutOfOrderChunksErr(); err != nil { + errMsg = append(errMsg, err.Error()) + } + if len(errMsg) > 0 { return errors.New(strings.Join(errMsg, ", ")) } diff --git a/pkg/block/index_test.go b/pkg/block/index_test.go index 6db6554f444..044c0478740 100644 --- a/pkg/block/index_test.go +++ b/pkg/block/index_test.go @@ -6,6 +6,7 @@ package block import ( "context" "io/ioutil" + "math" "os" "path/filepath" "testing" @@ -83,5 +84,18 @@ func TestRewrite(t *testing.T) { testutil.Ok(t, ir2.Series(p.At(), &lset, &chks)) testutil.Equals(t, 1, len(chks)) } +} + +func TestGatherIndexHealthStatsReturnsOutOfOrderChunksErr(t *testing.T) { + blockDir, err := ioutil.TempDir("", "test-ooo-index") + testutil.Ok(t, err) + err = testutil.PutOutOfOrderIndex(blockDir, 0, math.MaxInt64) + testutil.Ok(t, err) + + stats, err := GatherIndexHealthStats(log.NewLogfmtLogger(os.Stderr), blockDir+"/"+IndexFilename, 0, math.MaxInt64) + + testutil.Ok(t, err) + testutil.Equals(t, 1, stats.OutOfOrderChunks) + testutil.NotOk(t, stats.OutOfOrderChunksErr()) } diff 
--git a/pkg/block/metadata/markers.go b/pkg/block/metadata/markers.go index b3c8b9d1f05..f2d40bd0452 100644 --- a/pkg/block/metadata/markers.go +++ b/pkg/block/metadata/markers.go @@ -67,6 +67,8 @@ const ( // IndexSizeExceedingNoCompactReason is a reason of index being too big (for example exceeding 64GB limit: https://github.com/thanos-io/thanos/issues/1424) // This reason can be ignored when vertical block sharding will be implemented. IndexSizeExceedingNoCompactReason = "index-size-exceeding" + // OutOfOrderChunksNoCompactReason is a reason to skip compaction of a block whose index contains out-of-order chunks, so that compaction is not blocked. + OutOfOrderChunksNoCompactReason = "block-index-out-of-order-chunk" ) // NoCompactMark marker stores reason of block being excluded from compaction if needed. diff --git a/pkg/compact/compact.go b/pkg/compact/compact.go index a0f62fbc619..03f1cad9b38 100644 --- a/pkg/compact/compact.go +++ b/pkg/compact/compact.go @@ -240,6 +240,7 @@ type DefaultGrouper struct { verticalCompactions *prometheus.CounterVec garbageCollectedBlocks prometheus.Counter blocksMarkedForDeletion prometheus.Counter + blocksMarkedForNoCompact prometheus.Counter hashFunc metadata.HashFunc } @@ -252,6 +253,7 @@ func NewDefaultGrouper( reg prometheus.Registerer, blocksMarkedForDeletion prometheus.Counter, garbageCollectedBlocks prometheus.Counter, + blocksMarkedForNoCompact prometheus.Counter, hashFunc metadata.HashFunc, ) *DefaultGrouper { return &DefaultGrouper{ @@ -279,9 +281,10 @@ func NewDefaultGrouper( Name: "thanos_compact_group_vertical_compactions_total", Help: "Total number of group compaction attempts that resulted in a new block based on overlapping blocks.", }, []string{"group"}), - garbageCollectedBlocks: garbageCollectedBlocks, - blocksMarkedForDeletion: blocksMarkedForDeletion, - hashFunc: hashFunc, + blocksMarkedForNoCompact: blocksMarkedForNoCompact, + garbageCollectedBlocks: garbageCollectedBlocks, + blocksMarkedForDeletion: 
blocksMarkedForDeletion, + hashFunc: hashFunc, } } @@ -309,6 +312,7 @@ func (g *DefaultGrouper) Groups(blocks map[ulid.ULID]*metadata.Meta) (res []*Gro g.verticalCompactions.WithLabelValues(groupKey), g.garbageCollectedBlocks, g.blocksMarkedForDeletion, + g.blocksMarkedForNoCompact, g.hashFunc, ) if err != nil { @@ -346,6 +350,7 @@ type Group struct { verticalCompactions prometheus.Counter groupGarbageCollectedBlocks prometheus.Counter blocksMarkedForDeletion prometheus.Counter + blocksMarkedForNoCompact prometheus.Counter hashFunc metadata.HashFunc } @@ -365,6 +370,7 @@ func NewGroup( verticalCompactions prometheus.Counter, groupGarbageCollectedBlocks prometheus.Counter, blocksMarkedForDeletion prometheus.Counter, + blocksMarkedForNoCompact prometheus.Counter, hashFunc metadata.HashFunc, ) (*Group, error) { if logger == nil { @@ -385,6 +391,7 @@ func NewGroup( verticalCompactions: verticalCompactions, groupGarbageCollectedBlocks: groupGarbageCollectedBlocks, blocksMarkedForDeletion: blocksMarkedForDeletion, + blocksMarkedForNoCompact: blocksMarkedForNoCompact, hashFunc: hashFunc, } return g, nil @@ -541,6 +548,27 @@ func IsIssue347Error(err error) bool { return ok } +// OutOfOrderChunksError is a type wrapper for the out-of-order chunks error found while validating a block index. +type OutOfOrderChunksError struct { + err error + + id ulid.ULID +} + +func (e OutOfOrderChunksError) Error() string { + return e.err.Error() +} + +func outOfOrderChunkError(err error, brokenBlock ulid.ULID) OutOfOrderChunksError { + return OutOfOrderChunksError{err: err, id: brokenBlock} +} + +// IsOutOfOrderChunkError returns true if the base error is an OutOfOrderChunksError. +func IsOutOfOrderChunkError(err error) bool { + _, ok := errors.Cause(err).(OutOfOrderChunksError) + return ok +} + // HaltError is a type wrapper for errors that should halt any further progress on compactions. 
type HaltError struct { err error @@ -749,6 +777,10 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp return false, ulid.ULID{}, halt(errors.Wrapf(err, "block with not healthy index found %s; Compaction level %v; Labels: %v", bdir, meta.Compaction.Level, meta.Thanos.Labels)) } + if err := stats.OutOfOrderChunksErr(); err != nil { + return false, ulid.ULID{}, outOfOrderChunkError(errors.Wrapf(err, "blocks with out-of-order chunks should be dropped from compaction: %s", bdir), meta.ULID) + } + if err := stats.Issue347OutsideChunksErr(); err != nil { return false, ulid.ULID{}, issue347Error(errors.Wrapf(err, "invalid, but reparable block %s", bdir), meta.ULID) } @@ -939,6 +971,21 @@ func (c *BucketCompactor) Compact(ctx context.Context) (rerr error) { continue } } + // If the block has out-of-order chunks, mark it for no compaction and continue. + if IsOutOfOrderChunkError(err) { + if err := block.MarkForNoCompact( + ctx, + c.logger, + c.bkt, + errors.Cause(err).(OutOfOrderChunksError).id, + metadata.OutOfOrderChunksNoCompactReason, + "OutofOrderChunk: marking block with out-of-order series/chunks as no compact to unblock compaction", g.blocksMarkedForNoCompact); err == nil { + mtx.Lock() + finishedAllGroups = false + mtx.Unlock() + continue + } + } errChan <- errors.Wrapf(err, "group %s", g.Key()) return } diff --git a/pkg/compact/compact_e2e_test.go b/pkg/compact/compact_e2e_test.go index cc690188cc8..5c183ace21e 100644 --- a/pkg/compact/compact_e2e_test.go +++ b/pkg/compact/compact_e2e_test.go @@ -102,6 +102,7 @@ func TestSyncer_GarbageCollect_e2e(t *testing.T) { blocksMarkedForDeletion := promauto.With(nil).NewCounter(prometheus.CounterOpts{}) garbageCollectedBlocks := promauto.With(nil).NewCounter(prometheus.CounterOpts{}) + blockMarkedForNoCompact := promauto.With(nil).NewCounter(prometheus.CounterOpts{}) ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(nil, nil, 48*time.Hour, fetcherConcurrency) sy, err := NewMetaSyncer(nil, 
nil, bkt, metaFetcher, duplicateBlocksFilter, ignoreDeletionMarkFilter, blocksMarkedForDeletion, garbageCollectedBlocks, 1) testutil.Ok(t, err) @@ -138,7 +139,7 @@ func TestSyncer_GarbageCollect_e2e(t *testing.T) { testutil.Ok(t, sy.GarbageCollect(ctx)) // Only the level 3 block, the last source block in both resolutions should be left. - grouper := NewDefaultGrouper(nil, bkt, false, false, nil, blocksMarkedForDeletion, garbageCollectedBlocks, metadata.NoneFunc) + grouper := NewDefaultGrouper(nil, bkt, false, false, nil, blocksMarkedForDeletion, garbageCollectedBlocks, blockMarkedForNoCompact, metadata.NoneFunc) groups, err := grouper.Groups(sy.Metas()) testutil.Ok(t, err) @@ -195,13 +196,16 @@ func testGroupCompactE2e(t *testing.T, mergeFunc storage.VerticalChunkSeriesMerg ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, objstore.WithNoopInstr(bkt), 48*time.Hour, fetcherConcurrency) duplicateBlocksFilter := block.NewDeduplicateFilter() + noCompactMarkerFilter := NewGatherNoCompactionMarkFilter(logger, objstore.WithNoopInstr(bkt), 2) metaFetcher, err := block.NewMetaFetcher(nil, 32, objstore.WithNoopInstr(bkt), "", nil, []block.MetadataFilter{ ignoreDeletionMarkFilter, duplicateBlocksFilter, + noCompactMarkerFilter, }, nil) testutil.Ok(t, err) blocksMarkedForDeletion := promauto.With(nil).NewCounter(prometheus.CounterOpts{}) + blocksMaredForNoCompact := promauto.With(nil).NewCounter(prometheus.CounterOpts{}) garbageCollectedBlocks := promauto.With(nil).NewCounter(prometheus.CounterOpts{}) sy, err := NewMetaSyncer(nil, nil, bkt, metaFetcher, duplicateBlocksFilter, ignoreDeletionMarkFilter, blocksMarkedForDeletion, garbageCollectedBlocks, 5) testutil.Ok(t, err) @@ -209,9 +213,9 @@ func testGroupCompactE2e(t *testing.T, mergeFunc storage.VerticalChunkSeriesMerg comp, err := tsdb.NewLeveledCompactor(ctx, reg, logger, []int64{1000, 3000}, nil, mergeFunc) testutil.Ok(t, err) - planner := NewTSDBBasedPlanner(logger, []int64{1000, 3000}) + planner := 
NewPlanner(logger, []int64{1000, 3000}, noCompactMarkerFilter) - grouper := NewDefaultGrouper(logger, bkt, false, false, reg, blocksMarkedForDeletion, garbageCollectedBlocks, metadata.NoneFunc) + grouper := NewDefaultGrouper(logger, bkt, false, false, reg, blocksMarkedForDeletion, garbageCollectedBlocks, blocksMaredForNoCompact, metadata.NoneFunc) bComp, err := NewBucketCompactor(logger, sy, grouper, planner, comp, dir, bkt, 2) testutil.Ok(t, err) @@ -220,6 +224,7 @@ func testGroupCompactE2e(t *testing.T, mergeFunc storage.VerticalChunkSeriesMerg testutil.Equals(t, 0.0, promtest.ToFloat64(sy.metrics.garbageCollectedBlocks)) testutil.Equals(t, 0.0, promtest.ToFloat64(sy.metrics.blocksMarkedForDeletion)) testutil.Equals(t, 0.0, promtest.ToFloat64(sy.metrics.garbageCollectionFailures)) + testutil.Equals(t, 0.0, promtest.ToFloat64(grouper.blocksMarkedForNoCompact)) testutil.Equals(t, 0, MetricCount(grouper.compactions)) testutil.Equals(t, 0, MetricCount(grouper.compactionRunsStarted)) testutil.Equals(t, 0, MetricCount(grouper.compactionRunsCompleted)) @@ -233,7 +238,7 @@ func testGroupCompactE2e(t *testing.T, mergeFunc storage.VerticalChunkSeriesMerg extLabels2 := labels.Labels{{Name: "e1", Value: "1"}} metas := createAndUpload(t, bkt, []blockgenSpec{ { - numSamples: 100, mint: 0, maxt: 1000, extLset: extLabels, res: 124, + numSamples: 100, mint: 500, maxt: 1000, extLset: extLabels, res: 124, series: []labels.Labels{ {{Name: "a", Value: "1"}}, {{Name: "a", Value: "2"}, {Name: "b", Value: "2"}}, @@ -303,11 +308,22 @@ func testGroupCompactE2e(t *testing.T, mergeFunc storage.VerticalChunkSeriesMerg {{Name: "a", Value: "7"}}, }, }, + }, []blockgenSpec{ + { + numSamples: 100, mint: 0, maxt: 499, extLset: extLabels, res: 124, + series: []labels.Labels{ + {{Name: "a", Value: "1"}}, + {{Name: "a", Value: "2"}, {Name: "b", Value: "2"}}, + {{Name: "a", Value: "3"}}, + {{Name: "a", Value: "4"}}, + }, + }, }) testutil.Ok(t, bComp.Compact(ctx)) testutil.Equals(t, 5.0, 
promtest.ToFloat64(sy.metrics.garbageCollectedBlocks)) testutil.Equals(t, 5.0, promtest.ToFloat64(sy.metrics.blocksMarkedForDeletion)) + testutil.Equals(t, 1.0, promtest.ToFloat64(grouper.blocksMarkedForNoCompact)) testutil.Equals(t, 0.0, promtest.ToFloat64(sy.metrics.garbageCollectionFailures)) testutil.Equals(t, 4, MetricCount(grouper.compactions)) testutil.Equals(t, 1.0, promtest.ToFloat64(grouper.compactions.WithLabelValues(DefaultGroupKey(metas[0].Thanos)))) @@ -315,19 +331,19 @@ func testGroupCompactE2e(t *testing.T, mergeFunc storage.VerticalChunkSeriesMerg testutil.Equals(t, 0.0, promtest.ToFloat64(grouper.compactions.WithLabelValues(DefaultGroupKey(metas[4].Thanos)))) testutil.Equals(t, 0.0, promtest.ToFloat64(grouper.compactions.WithLabelValues(DefaultGroupKey(metas[5].Thanos)))) testutil.Equals(t, 4, MetricCount(grouper.compactionRunsStarted)) - testutil.Equals(t, 2.0, promtest.ToFloat64(grouper.compactionRunsStarted.WithLabelValues(DefaultGroupKey(metas[0].Thanos)))) - testutil.Equals(t, 2.0, promtest.ToFloat64(grouper.compactionRunsStarted.WithLabelValues(DefaultGroupKey(metas[7].Thanos)))) + testutil.Equals(t, 3.0, promtest.ToFloat64(grouper.compactionRunsStarted.WithLabelValues(DefaultGroupKey(metas[0].Thanos)))) + testutil.Equals(t, 3.0, promtest.ToFloat64(grouper.compactionRunsStarted.WithLabelValues(DefaultGroupKey(metas[7].Thanos)))) // TODO(bwplotka): Looks like we do some unnecessary loops. Not a major problem but investigate. 
- testutil.Equals(t, 2.0, promtest.ToFloat64(grouper.compactionRunsStarted.WithLabelValues(DefaultGroupKey(metas[4].Thanos)))) - testutil.Equals(t, 2.0, promtest.ToFloat64(grouper.compactionRunsStarted.WithLabelValues(DefaultGroupKey(metas[5].Thanos)))) + testutil.Equals(t, 3.0, promtest.ToFloat64(grouper.compactionRunsStarted.WithLabelValues(DefaultGroupKey(metas[4].Thanos)))) + testutil.Equals(t, 3.0, promtest.ToFloat64(grouper.compactionRunsStarted.WithLabelValues(DefaultGroupKey(metas[5].Thanos)))) testutil.Equals(t, 4, MetricCount(grouper.compactionRunsCompleted)) testutil.Equals(t, 2.0, promtest.ToFloat64(grouper.compactionRunsCompleted.WithLabelValues(DefaultGroupKey(metas[0].Thanos)))) - testutil.Equals(t, 2.0, promtest.ToFloat64(grouper.compactionRunsCompleted.WithLabelValues(DefaultGroupKey(metas[7].Thanos)))) + testutil.Equals(t, 3.0, promtest.ToFloat64(grouper.compactionRunsCompleted.WithLabelValues(DefaultGroupKey(metas[7].Thanos)))) // TODO(bwplotka): Looks like we do some unnecessary loops. Not a major problem but investigate. 
- testutil.Equals(t, 2.0, promtest.ToFloat64(grouper.compactionRunsCompleted.WithLabelValues(DefaultGroupKey(metas[4].Thanos)))) - testutil.Equals(t, 2.0, promtest.ToFloat64(grouper.compactionRunsCompleted.WithLabelValues(DefaultGroupKey(metas[5].Thanos)))) + testutil.Equals(t, 3.0, promtest.ToFloat64(grouper.compactionRunsCompleted.WithLabelValues(DefaultGroupKey(metas[4].Thanos)))) + testutil.Equals(t, 3.0, promtest.ToFloat64(grouper.compactionRunsCompleted.WithLabelValues(DefaultGroupKey(metas[5].Thanos)))) testutil.Equals(t, 4, MetricCount(grouper.compactionFailures)) - testutil.Equals(t, 0.0, promtest.ToFloat64(grouper.compactionFailures.WithLabelValues(DefaultGroupKey(metas[0].Thanos)))) + testutil.Equals(t, 1.0, promtest.ToFloat64(grouper.compactionFailures.WithLabelValues(DefaultGroupKey(metas[0].Thanos)))) testutil.Equals(t, 0.0, promtest.ToFloat64(grouper.compactionFailures.WithLabelValues(DefaultGroupKey(metas[7].Thanos)))) testutil.Equals(t, 0.0, promtest.ToFloat64(grouper.compactionFailures.WithLabelValues(DefaultGroupKey(metas[4].Thanos)))) testutil.Equals(t, 0.0, promtest.ToFloat64(grouper.compactionFailures.WithLabelValues(DefaultGroupKey(metas[5].Thanos)))) @@ -342,6 +358,7 @@ func testGroupCompactE2e(t *testing.T, mergeFunc storage.VerticalChunkSeriesMerg metas[4].ULID: false, metas[5].ULID: false, metas[8].ULID: false, + metas[9].ULID: false, } others := map[string]metadata.Meta{} testutil.Ok(t, bkt.Iter(ctx, "", func(n string) error { @@ -374,7 +391,7 @@ func testGroupCompactE2e(t *testing.T, mergeFunc storage.VerticalChunkSeriesMerg meta, ok := others[defaultGroupKey(124, extLabels)] testutil.Assert(t, ok, "meta not found") - testutil.Equals(t, int64(0), meta.MinTime) + testutil.Equals(t, int64(500), meta.MinTime) testutil.Equals(t, int64(3000), meta.MaxTime) testutil.Equals(t, uint64(6), meta.Stats.NumSeries) testutil.Equals(t, uint64(2*4*100), meta.Stats.NumSamples) // Only 2 times 4*100 because one block was empty. 
@@ -413,7 +430,7 @@ type blockgenSpec struct { res int64 } -func createAndUpload(t testing.TB, bkt objstore.Bucket, blocks []blockgenSpec) (metas []*metadata.Meta) { +func createAndUpload(t testing.TB, bkt objstore.Bucket, blocks []blockgenSpec, blocksWithOutOfOrderChunks []blockgenSpec) (metas []*metadata.Meta) { prepareDir, err := ioutil.TempDir("", "test-compact-prepare") testutil.Ok(t, err) defer func() { testutil.Ok(t, os.RemoveAll(prepareDir)) }() @@ -422,23 +439,35 @@ func createAndUpload(t testing.TB, bkt objstore.Bucket, blocks []blockgenSpec) ( defer cancel() for _, b := range blocks { - var id ulid.ULID - var err error - if b.numSamples == 0 { - id, err = e2eutil.CreateEmptyBlock(prepareDir, b.mint, b.maxt, b.extLset, b.res) - } else { - id, err = e2eutil.CreateBlock(ctx, prepareDir, b.series, b.numSamples, b.mint, b.maxt, b.extLset, b.res, metadata.NoneFunc) - } - testutil.Ok(t, err) + id, meta := createBlock(t, ctx, prepareDir, b) + metas = append(metas, meta) + testutil.Ok(t, block.Upload(ctx, log.NewNopLogger(), bkt, filepath.Join(prepareDir, id.String()), metadata.NoneFunc)) + } + for _, b := range blocksWithOutOfOrderChunks { + id, meta := createBlock(t, ctx, prepareDir, b) - meta, err := metadata.ReadFromDir(filepath.Join(prepareDir, id.String())) + err := testutil.PutOutOfOrderIndex(filepath.Join(prepareDir, id.String()), b.mint, b.maxt) testutil.Ok(t, err) - metas = append(metas, meta) + metas = append(metas, meta) testutil.Ok(t, block.Upload(ctx, log.NewNopLogger(), bkt, filepath.Join(prepareDir, id.String()), metadata.NoneFunc)) } + return metas } +func createBlock(t testing.TB, ctx context.Context, prepareDir string, b blockgenSpec) (id ulid.ULID, meta *metadata.Meta) { + var err error + if b.numSamples == 0 { + id, err = e2eutil.CreateEmptyBlock(prepareDir, b.mint, b.maxt, b.extLset, b.res) + } else { + id, err = e2eutil.CreateBlock(ctx, prepareDir, b.series, b.numSamples, b.mint, b.maxt, b.extLset, b.res, metadata.NoneFunc) + } + 
testutil.Ok(t, err) + + meta, err = metadata.ReadFromDir(filepath.Join(prepareDir, id.String())) + testutil.Ok(t, err) + return +} // Regression test for #2459 issue. func TestGarbageCollectDoesntCreateEmptyBlocksWithDeletionMarksOnly(t *testing.T) { diff --git a/pkg/testutil/testutil.go b/pkg/testutil/testutil.go index 7f0e0a89110..84d9481800d 100644 --- a/pkg/testutil/testutil.go +++ b/pkg/testutil/testutil.go @@ -4,13 +4,21 @@ package testutil import ( + "context" "fmt" + "math/rand" "path/filepath" "reflect" "runtime" "runtime/debug" + "sort" "testing" + "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/tsdb/chunkenc" + "github.com/prometheus/prometheus/tsdb/chunks" + "github.com/prometheus/prometheus/tsdb/index" + "github.com/davecgh/go-spew/spew" "github.com/pkg/errors" "github.com/pmezard/go-difflib/difflib" @@ -203,3 +211,106 @@ func FaultOrPanicToErr(f func()) (err error) { return err } + +var indexFilename = "index" + +type indexWriterSeries struct { + labels labels.Labels + chunks []chunks.Meta // series file offset of chunks +} + +type indexWriterSeriesSlice []*indexWriterSeries + +// PutOutOfOrderIndex updates the index in blockDir with an index containing an out-of-order chunk +// copied from https://github.com/prometheus/prometheus/blob/b1ed4a0a663d0c62526312311c7529471abbc565/tsdb/index/index_test.go#L346 +func PutOutOfOrderIndex(blockDir string, minTime int64, maxTime int64) error { + + if minTime >= maxTime || minTime+4 >= maxTime { + return fmt.Errorf("minTime must be at least 4 less than maxTime to not create overlapping chunks") + } + + lbls := []labels.Labels{ + []labels.Label{ + {Name: "lbl1", Value: "1"}, + }, + } + + // Sort labels as the index writer expects series in sorted order. 
+ sort.Sort(labels.Slice(lbls)) + + symbols := map[string]struct{}{} + for _, lset := range lbls { + for _, l := range lset { + symbols[l.Name] = struct{}{} + symbols[l.Value] = struct{}{} + } + } + + var input indexWriterSeriesSlice + + // Generate ChunkMetas for every label set. + for _, lset := range lbls { + var metas []chunks.Meta + // only need two chunks that are out-of-order + chk1 := chunks.Meta{ + MinTime: maxTime - 2, + MaxTime: maxTime - 1, + Ref: rand.Uint64(), + Chunk: chunkenc.NewXORChunk(), + } + metas = append(metas, chk1) + chk2 := chunks.Meta{ + MinTime: minTime + 1, + MaxTime: minTime + 2, + Ref: rand.Uint64(), + Chunk: chunkenc.NewXORChunk(), + } + metas = append(metas, chk2) + + input = append(input, &indexWriterSeries{ + labels: lset, + chunks: metas, + }) + } + + iw, err := index.NewWriter(context.Background(), filepath.Join(blockDir, indexFilename)) + if err != nil { + return err + } + + syms := []string{} + for s := range symbols { + syms = append(syms, s) + } + sort.Strings(syms) + for _, s := range syms { + if err := iw.AddSymbol(s); err != nil { + return err + } + } + + // Population procedure as done by compaction. + var ( + postings = index.NewMemPostings() + values = map[string]map[string]struct{}{} + ) + + for i, s := range input { + if err := iw.AddSeries(uint64(i), s.labels, s.chunks...); err != nil { + return err + } + + for _, l := range s.labels { + valset, ok := values[l.Name] + if !ok { + valset = map[string]struct{}{} + values[l.Name] = valset + } + valset[l.Value] = struct{}{} + } + postings.Add(uint64(i), s.labels) + } + + err = iw.Close() + return err +}