diff --git a/CHANGELOG.md b/CHANGELOG.md index 85450122a95..0aeeb4eae12 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re ### Added - [#4453](https://github.com/thanos-io/thanos/pull/4453) Tools: Add flag `--selector.relabel-config-file` / `--selector.relabel-config` / `--max-time` / `--min-time` to filter served blocks. - [#4482](https://github.com/thanos-io/thanos/pull/4482) COS: Add http_config for cos object store client. +- [#4469](https://github.com/thanos-io/thanos/pull/4469) Compact: Add flag `compact.skip-block-with-out-of-order-chunks` to skip blocks with out-of-order chunks during compaction instead of halting. ### Fixed diff --git a/cmd/thanos/compact.go b/cmd/thanos/compact.go index af3fe781293..97317502dfc 100644 --- a/cmd/thanos/compact.go +++ b/cmd/thanos/compact.go @@ -147,8 +147,8 @@ func newCompactMetrics(reg *prometheus.Registry, deleteDelay time.Duration) *com m.blocksMarked = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ Name: "thanos_compact_blocks_marked_total", Help: "Total number of blocks marked in compactor.", - }, []string{"marker"}) - m.blocksMarked.WithLabelValues(metadata.NoCompactMarkFilename) - m.blocksMarked.WithLabelValues(metadata.DeletionMarkFilename) + }, []string{"marker", "reason"}) + m.blocksMarked.WithLabelValues(metadata.NoCompactMarkFilename, metadata.OutOfOrderChunksNoCompactReason) + m.blocksMarked.WithLabelValues(metadata.DeletionMarkFilename, "") m.garbageCollectedBlocks = promauto.With(reg).NewCounter(prometheus.CounterOpts{ @@ -349,13 +349,15 @@ func runCompact( reg, - compactMetrics.blocksMarked.WithLabelValues(metadata.DeletionMarkFilename), + compactMetrics.blocksMarked.WithLabelValues(metadata.DeletionMarkFilename, ""), compactMetrics.garbageCollectedBlocks, + compactMetrics.blocksMarked.WithLabelValues(metadata.NoCompactMarkFilename, metadata.OutOfOrderChunksNoCompactReason), metadata.HashFunc(conf.hashFunc), + conf.skipBlockWithOutOfOrderChunks, ) planner := compact.WithLargeTotalIndexSizeFilter( compact.NewPlanner(logger, levels, noCompactMarkerFilter), bkt, int64(conf.maxBlockIndexSize), - compactMetrics.blocksMarked.WithLabelValues(metadata.DeletionMarkFilename), + compactMetrics.blocksMarked.WithLabelValues(metadata.NoCompactMarkFilename, metadata.IndexSizeExceedingNoCompactReason), ) blocksCleaner := compact.NewBlocksCleaner(logger, bkt, ignoreDeletionMarkFilter, deleteDelay, compactMetrics.blocksCleaned, compactMetrics.blockCleanupFailures) compactor, err := compact.NewBucketCompactor( @@ -585,6 +587,7 @@ type compactConfig struct { hashFunc string enableVerticalCompaction bool dedupFunc string + skipBlockWithOutOfOrderChunks bool } func (cc *compactConfig) registerFlag(cmd extkingpin.FlagClause) { @@ -668,6 +671,9 @@ func (cc *compactConfig) registerFlag(cmd extkingpin.FlagClause) { "Default is due to https://github.com/thanos-io/thanos/issues/1424, but it's overall recommended to keeps block size to some reasonable size."). Hidden().Default("64GB").BytesVar(&cc.maxBlockIndexSize) + cmd.Flag("compact.skip-block-with-out-of-order-chunks", "When set to true, mark blocks containing index with out-of-order chunks for no compact instead of halting the compaction"). + Hidden().Default("false").BoolVar(&cc.skipBlockWithOutOfOrderChunks) + + cmd.Flag("hash-func", "Specify which hash function to use when calculating the hashes of produced files. If no function has been specified, it does not happen. This permits avoiding downloading some files twice albeit at some performance cost. Possible values are: \"\", \"SHA256\".").
Default("").EnumVar(&cc.hashFunc, "SHA256", "") diff --git a/docs/components/compact.md b/docs/components/compact.md index 176ea4950c2..fee9cae6975 100644 --- a/docs/components/compact.md +++ b/docs/components/compact.md @@ -301,6 +301,10 @@ Flags: happen at the end of an iteration. --compact.concurrency=1 Number of goroutines to use when compacting groups. + --compact.skip-block-with-out-of-order-chunks=false + Mark blocks containing index with out-of-order + chunks for no compact instead of halting the + compaction. --consistency-delay=30m Minimum age of fresh (non-compacted) blocks before they are being processed. Malformed blocks older than the maximum of diff --git a/pkg/block/index.go b/pkg/block/index.go index 2b6ece295e5..851dfa9d98a 100644 --- a/pkg/block/index.go +++ b/pkg/block/index.go @@ -111,12 +111,9 @@ func (i HealthStats) Issue347OutsideChunksErr() error { return nil } -// CriticalErr returns error if stats indicates critical block issue, that might solved only by manual repair procedure. -func (i HealthStats) CriticalErr() error { - var errMsg []string - - if i.OutOfOrderSeries > 0 { - errMsg = append(errMsg, fmt.Sprintf( +func (i HealthStats) OutOfOrderChunksErr() error { + if i.OutOfOrderChunks > 0 { + return errors.New(fmt.Sprintf( "%d/%d series have an average of %.3f out-of-order chunks: "+ "%.3f of these are exact duplicates (in terms of data and time range)", i.OutOfOrderSeries, @@ -125,6 +122,12 @@ func (i HealthStats) CriticalErr() error { float64(i.DuplicatedChunks)/float64(i.OutOfOrderChunks), )) } + return nil +} + +// CriticalErr returns error if stats indicates critical block issue, that might solved only by manual repair procedure. +func (i HealthStats) CriticalErr() error { + var errMsg []string n := i.OutsideChunks - (i.CompleteOutsideChunks + i.Issue347OutsideChunks) if n > 0 { @@ -158,6 +161,10 @@ func (i HealthStats) AnyErr() error { errMsg = append(errMsg, err.Error()) } + if err := i.OutOfOrderChunksErr(); err != nil { + errMsg = append(errMsg, err.Error()) + } + if len(errMsg) > 0 { return errors.New(strings.Join(errMsg, ", ")) } diff --git a/pkg/block/index_test.go b/pkg/block/index_test.go index 6db6554f444..044c0478740 100644 --- a/pkg/block/index_test.go +++ b/pkg/block/index_test.go @@ -6,6 +6,7 @@ package block import ( "context" "io/ioutil" + "math" "os" "path/filepath" "testing" @@ -83,5 +84,18 @@ func TestRewrite(t *testing.T) { testutil.Ok(t, ir2.Series(p.At(), &lset, &chks)) testutil.Equals(t, 1, len(chks)) } +} + +func TestGatherIndexHealthStatsReturnsOutOfOrderChunksErr(t *testing.T) { + blockDir, err := ioutil.TempDir("", "test-ooo-index") + testutil.Ok(t, err) + err = testutil.PutOutOfOrderIndex(blockDir, 0, math.MaxInt64) + testutil.Ok(t, err) + + stats, err := GatherIndexHealthStats(log.NewLogfmtLogger(os.Stderr), blockDir+"/"+IndexFilename, 0, math.MaxInt64) + + testutil.Ok(t, err) + testutil.Equals(t, 1, stats.OutOfOrderChunks) + testutil.NotOk(t, stats.OutOfOrderChunksErr()) } diff --git a/pkg/block/metadata/markers.go b/pkg/block/metadata/markers.go index b3c8b9d1f05..f2d40bd0452 100644 --- a/pkg/block/metadata/markers.go +++ b/pkg/block/metadata/markers.go @@ -67,6 +67,8 @@ const ( // IndexSizeExceedingNoCompactReason is a reason of index being too big (for example exceeding 64GB limit: https://github.com/thanos-io/thanos/issues/1424) // This reason can be ignored when vertical block sharding will be implemented. 
IndexSizeExceedingNoCompactReason = "index-size-exceeding" + // OutOfOrderChunksNoCompactReason is a reason to skip compaction of a block whose index contains out-of-order chunks, so that compaction is not blocked. + OutOfOrderChunksNoCompactReason = "block-index-out-of-order-chunk" ) // NoCompactMark marker stores reason of block being excluded from compaction if needed. diff --git a/pkg/compact/compact.go b/pkg/compact/compact.go index a0f62fbc619..e8823b4653b 100644 --- a/pkg/compact/compact.go +++ b/pkg/compact/compact.go @@ -229,18 +229,20 @@ func defaultGroupKey(res int64, lbls labels.Labels) string { // DefaultGrouper is the Thanos built-in grouper. It groups blocks based on downsample // resolution and block's labels. type DefaultGrouper struct { - bkt objstore.Bucket - logger log.Logger - acceptMalformedIndex bool - enableVerticalCompaction bool - compactions *prometheus.CounterVec - compactionRunsStarted *prometheus.CounterVec - compactionRunsCompleted *prometheus.CounterVec - compactionFailures *prometheus.CounterVec - verticalCompactions *prometheus.CounterVec - garbageCollectedBlocks prometheus.Counter - blocksMarkedForDeletion prometheus.Counter - hashFunc metadata.HashFunc + bkt objstore.Bucket + logger log.Logger + acceptMalformedIndex bool + enableVerticalCompaction bool + compactions *prometheus.CounterVec + compactionRunsStarted *prometheus.CounterVec + compactionRunsCompleted *prometheus.CounterVec + compactionFailures *prometheus.CounterVec + verticalCompactions *prometheus.CounterVec + garbageCollectedBlocks prometheus.Counter + blocksMarkedForDeletion prometheus.Counter + blocksMarkedForNoCompact prometheus.Counter + hashFunc metadata.HashFunc + skipChunksWithOutOfOrderBlocks bool } // NewDefaultGrouper makes a new DefaultGrouper. @@ -252,7 +254,9 @@ func NewDefaultGrouper( reg prometheus.Registerer, blocksMarkedForDeletion prometheus.Counter, garbageCollectedBlocks prometheus.Counter, + blocksMarkedForNoCompact prometheus.Counter, hashFunc metadata.HashFunc, + skipChunksWithOutOfOrderBlocks bool, ) *DefaultGrouper { return &DefaultGrouper{ bkt: bkt, @@ -279,9 +283,11 @@ func NewDefaultGrouper( Name: "thanos_compact_group_vertical_compactions_total", Help: "Total number of group compaction attempts that resulted in a new block based on overlapping blocks.", }, []string{"group"}), - garbageCollectedBlocks: garbageCollectedBlocks, - blocksMarkedForDeletion: blocksMarkedForDeletion, - hashFunc: hashFunc, + blocksMarkedForNoCompact: blocksMarkedForNoCompact, + garbageCollectedBlocks: garbageCollectedBlocks, + blocksMarkedForDeletion: blocksMarkedForDeletion, + hashFunc: hashFunc, + skipChunksWithOutOfOrderBlocks: skipChunksWithOutOfOrderBlocks, } } @@ -309,7 +315,9 @@ func (g *DefaultGrouper) Groups(blocks map[ulid.ULID]*metadata.Meta) (res []*Gro g.verticalCompactions.WithLabelValues(groupKey), g.garbageCollectedBlocks, g.blocksMarkedForDeletion, + g.blocksMarkedForNoCompact, g.hashFunc, + g.skipChunksWithOutOfOrderBlocks, ) if err != nil { return nil, errors.Wrap(err, "create compaction group") @@ -330,23 +338,25 @@ func (g *DefaultGrouper) Groups(blocks map[ulid.ULID]*metadata.Meta) (res []*Gro // Group captures a set of blocks that have the same origin labels and downsampling resolution. // Those blocks generally contain the same series and can thus efficiently be compacted.
type Group struct { - logger log.Logger - bkt objstore.Bucket - key string - labels labels.Labels - resolution int64 - mtx sync.Mutex - metasByMinTime []*metadata.Meta - acceptMalformedIndex bool - enableVerticalCompaction bool - compactions prometheus.Counter - compactionRunsStarted prometheus.Counter - compactionRunsCompleted prometheus.Counter - compactionFailures prometheus.Counter - verticalCompactions prometheus.Counter - groupGarbageCollectedBlocks prometheus.Counter - blocksMarkedForDeletion prometheus.Counter - hashFunc metadata.HashFunc + logger log.Logger + bkt objstore.Bucket + key string + labels labels.Labels + resolution int64 + mtx sync.Mutex + metasByMinTime []*metadata.Meta + acceptMalformedIndex bool + enableVerticalCompaction bool + compactions prometheus.Counter + compactionRunsStarted prometheus.Counter + compactionRunsCompleted prometheus.Counter + compactionFailures prometheus.Counter + verticalCompactions prometheus.Counter + groupGarbageCollectedBlocks prometheus.Counter + blocksMarkedForDeletion prometheus.Counter + blocksMarkedForNoCompact prometheus.Counter + hashFunc metadata.HashFunc + skipChunksWithOutofOrderBlocks bool } // NewGroup returns a new compaction group. @@ -365,27 +375,31 @@ func NewGroup( verticalCompactions prometheus.Counter, groupGarbageCollectedBlocks prometheus.Counter, blocksMarkedForDeletion prometheus.Counter, + blocksMarkedForNoCompact prometheus.Counter, hashFunc metadata.HashFunc, + skipChunksWithOutOfOrderChunks bool, ) (*Group, error) { if logger == nil { logger = log.NewNopLogger() } g := &Group{ - logger: logger, - bkt: bkt, - key: key, - labels: lset, - resolution: resolution, - acceptMalformedIndex: acceptMalformedIndex, - enableVerticalCompaction: enableVerticalCompaction, - compactions: compactions, - compactionRunsStarted: compactionRunsStarted, - compactionRunsCompleted: compactionRunsCompleted, - compactionFailures: compactionFailures, - verticalCompactions: verticalCompactions, - groupGarbageCollectedBlocks: groupGarbageCollectedBlocks, - blocksMarkedForDeletion: blocksMarkedForDeletion, - hashFunc: hashFunc, + logger: logger, + bkt: bkt, + key: key, + labels: lset, + resolution: resolution, + acceptMalformedIndex: acceptMalformedIndex, + enableVerticalCompaction: enableVerticalCompaction, + compactions: compactions, + compactionRunsStarted: compactionRunsStarted, + compactionRunsCompleted: compactionRunsCompleted, + compactionFailures: compactionFailures, + verticalCompactions: verticalCompactions, + groupGarbageCollectedBlocks: groupGarbageCollectedBlocks, + blocksMarkedForDeletion: blocksMarkedForDeletion, + blocksMarkedForNoCompact: blocksMarkedForNoCompact, + hashFunc: hashFunc, + skipChunksWithOutofOrderBlocks: skipChunksWithOutOfOrderChunks, } return g, nil } @@ -541,6 +555,27 @@ func IsIssue347Error(err error) bool { return ok } +// OutOfOrderChunksError is a type wrapper for an out-of-order chunks error found when validating a block index. +type OutOfOrderChunksError struct { + err error + + id ulid.ULID +} + +func (e OutOfOrderChunksError) Error() string { + return e.err.Error() +} + +func outOfOrderChunkError(err error, brokenBlock ulid.ULID) OutOfOrderChunksError { + return OutOfOrderChunksError{err: err, id: brokenBlock} +} + +// IsOutOfOrderChunkError returns true if the base error is an OutOfOrderChunksError. +func IsOutOfOrderChunkError(err error) bool { + _, ok := errors.Cause(err).(OutOfOrderChunksError) + return ok +} + // HaltError is a type wrapper for errors that should halt any further progress on compactions.
type HaltError struct { err error @@ -749,6 +784,10 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp return false, ulid.ULID{}, halt(errors.Wrapf(err, "block with not healthy index found %s; Compaction level %v; Labels: %v", bdir, meta.Compaction.Level, meta.Thanos.Labels)) } + if err := stats.OutOfOrderChunksErr(); cg.skipChunksWithOutofOrderBlocks && err != nil { + return false, ulid.ULID{}, outOfOrderChunkError(errors.Wrapf(err, "blocks with out-of-order chunks are dropped from compaction: %s", bdir), meta.ULID) + } + if err := stats.Issue347OutsideChunksErr(); err != nil { return false, ulid.ULID{}, issue347Error(errors.Wrapf(err, "invalid, but reparable block %s", bdir), meta.ULID) } @@ -939,6 +978,21 @@ func (c *BucketCompactor) Compact(ctx context.Context) (rerr error) { continue } } + // If the block has out-of-order chunks, mark it for no compaction and continue. + if IsOutOfOrderChunkError(err) { + if err := block.MarkForNoCompact( + ctx, + c.logger, + c.bkt, + err.(OutOfOrderChunksError).id, + metadata.OutOfOrderChunksNoCompactReason, + "OutOfOrderChunk: marking block with out-of-order series/chunks as no compact to unblock compaction", g.blocksMarkedForNoCompact); err == nil { + mtx.Lock() + finishedAllGroups = false + mtx.Unlock() + continue + } + } errChan <- errors.Wrapf(err, "group %s", g.Key()) return } diff --git a/pkg/compact/compact_e2e_test.go b/pkg/compact/compact_e2e_test.go index cc690188cc8..bfd0cb9fe51 100644 --- a/pkg/compact/compact_e2e_test.go +++ b/pkg/compact/compact_e2e_test.go @@ -102,6 +102,7 @@ func TestSyncer_GarbageCollect_e2e(t *testing.T) { blocksMarkedForDeletion := promauto.With(nil).NewCounter(prometheus.CounterOpts{}) garbageCollectedBlocks := promauto.With(nil).NewCounter(prometheus.CounterOpts{}) + blockMarkedForNoCompact := promauto.With(nil).NewCounter(prometheus.CounterOpts{}) ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(nil, nil, 48*time.Hour, fetcherConcurrency) sy, err := NewMetaSyncer(nil, nil, bkt, metaFetcher, duplicateBlocksFilter, ignoreDeletionMarkFilter, blocksMarkedForDeletion, garbageCollectedBlocks, 1) testutil.Ok(t, err) @@ -138,7 +139,7 @@ func TestSyncer_GarbageCollect_e2e(t *testing.T) { testutil.Ok(t, sy.GarbageCollect(ctx)) // Only the level 3 block, the last source block in both resolutions should be left.
- grouper := NewDefaultGrouper(nil, bkt, false, false, nil, blocksMarkedForDeletion, garbageCollectedBlocks, metadata.NoneFunc) + grouper := NewDefaultGrouper(nil, bkt, false, false, nil, blocksMarkedForDeletion, garbageCollectedBlocks, blockMarkedForNoCompact, metadata.NoneFunc, true) groups, err := grouper.Groups(sy.Metas()) testutil.Ok(t, err) @@ -195,13 +196,16 @@ func testGroupCompactE2e(t *testing.T, mergeFunc storage.VerticalChunkSeriesMerg ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, objstore.WithNoopInstr(bkt), 48*time.Hour, fetcherConcurrency) duplicateBlocksFilter := block.NewDeduplicateFilter() + noCompactMarkerFilter := NewGatherNoCompactionMarkFilter(logger, objstore.WithNoopInstr(bkt), 2) metaFetcher, err := block.NewMetaFetcher(nil, 32, objstore.WithNoopInstr(bkt), "", nil, []block.MetadataFilter{ ignoreDeletionMarkFilter, duplicateBlocksFilter, + noCompactMarkerFilter, }, nil) testutil.Ok(t, err) blocksMarkedForDeletion := promauto.With(nil).NewCounter(prometheus.CounterOpts{}) + blocksMarkedForNoCompact := promauto.With(nil).NewCounter(prometheus.CounterOpts{}) garbageCollectedBlocks := promauto.With(nil).NewCounter(prometheus.CounterOpts{}) sy, err := NewMetaSyncer(nil, nil, bkt, metaFetcher, duplicateBlocksFilter, ignoreDeletionMarkFilter, blocksMarkedForDeletion, garbageCollectedBlocks, 5) testutil.Ok(t, err) @@ -209,9 +213,8 @@ func testGroupCompactE2e(t *testing.T, mergeFunc storage.VerticalChunkSeriesMerg comp, err := tsdb.NewLeveledCompactor(ctx, reg, logger, []int64{1000, 3000}, nil, mergeFunc) testutil.Ok(t, err) - planner := NewTSDBBasedPlanner(logger, []int64{1000, 3000}) - - grouper := NewDefaultGrouper(logger, bkt, false, false, reg, blocksMarkedForDeletion, garbageCollectedBlocks, metadata.NoneFunc) + planner := NewPlanner(logger, []int64{1000, 3000}, noCompactMarkerFilter) + grouper := NewDefaultGrouper(logger, bkt, false, false, reg, blocksMarkedForDeletion, garbageCollectedBlocks, blocksMarkedForNoCompact, metadata.NoneFunc, true) bComp, err := NewBucketCompactor(logger, sy, grouper, planner, comp, dir, bkt, 2) testutil.Ok(t, err) @@ -220,6 +223,7 @@ func testGroupCompactE2e(t *testing.T, mergeFunc storage.VerticalChunkSeriesMerg testutil.Equals(t, 0.0, promtest.ToFloat64(sy.metrics.garbageCollectedBlocks)) testutil.Equals(t, 0.0, promtest.ToFloat64(sy.metrics.blocksMarkedForDeletion)) testutil.Equals(t, 0.0, promtest.ToFloat64(sy.metrics.garbageCollectionFailures)) + testutil.Equals(t, 0.0, promtest.ToFloat64(grouper.blocksMarkedForNoCompact)) testutil.Equals(t, 0, MetricCount(grouper.compactions)) testutil.Equals(t, 0, MetricCount(grouper.compactionRunsStarted)) testutil.Equals(t, 0, MetricCount(grouper.compactionRunsCompleted)) @@ -233,7 +237,7 @@ func testGroupCompactE2e(t *testing.T, mergeFunc storage.VerticalChunkSeriesMerg extLabels2 := labels.Labels{{Name: "e1", Value: "1"}} metas := createAndUpload(t, bkt, []blockgenSpec{ { - numSamples: 100, mint: 0, maxt: 1000, extLset: extLabels, res: 124, + numSamples: 100, mint: 500, maxt: 1000, extLset: extLabels, res: 124, series: []labels.Labels{ {{Name: "a", Value: "1"}}, {{Name: "a", Value: "2"}, {Name: "b", Value: "2"}}, @@ -303,11 +307,22 @@ func testGroupCompactE2e(t *testing.T, mergeFunc storage.VerticalChunkSeriesMerg {{Name: "a", Value: "7"}}, }, }, + }, []blockgenSpec{ + { + numSamples: 100, mint: 0, maxt: 499, extLset: extLabels, res: 124, + series: []labels.Labels{ + {{Name: "a", Value: "1"}}, + {{Name: "a", Value: "2"}, {Name: "b", Value: "2"}}, + {{Name: "a", Value:
"3"}}, + {{Name: "a", Value: "4"}}, + }, + }, }) testutil.Ok(t, bComp.Compact(ctx)) testutil.Equals(t, 5.0, promtest.ToFloat64(sy.metrics.garbageCollectedBlocks)) testutil.Equals(t, 5.0, promtest.ToFloat64(sy.metrics.blocksMarkedForDeletion)) + testutil.Equals(t, 1.0, promtest.ToFloat64(grouper.blocksMarkedForNoCompact)) testutil.Equals(t, 0.0, promtest.ToFloat64(sy.metrics.garbageCollectionFailures)) testutil.Equals(t, 4, MetricCount(grouper.compactions)) testutil.Equals(t, 1.0, promtest.ToFloat64(grouper.compactions.WithLabelValues(DefaultGroupKey(metas[0].Thanos)))) @@ -315,19 +330,19 @@ func testGroupCompactE2e(t *testing.T, mergeFunc storage.VerticalChunkSeriesMerg testutil.Equals(t, 0.0, promtest.ToFloat64(grouper.compactions.WithLabelValues(DefaultGroupKey(metas[4].Thanos)))) testutil.Equals(t, 0.0, promtest.ToFloat64(grouper.compactions.WithLabelValues(DefaultGroupKey(metas[5].Thanos)))) testutil.Equals(t, 4, MetricCount(grouper.compactionRunsStarted)) - testutil.Equals(t, 2.0, promtest.ToFloat64(grouper.compactionRunsStarted.WithLabelValues(DefaultGroupKey(metas[0].Thanos)))) - testutil.Equals(t, 2.0, promtest.ToFloat64(grouper.compactionRunsStarted.WithLabelValues(DefaultGroupKey(metas[7].Thanos)))) + testutil.Equals(t, 3.0, promtest.ToFloat64(grouper.compactionRunsStarted.WithLabelValues(DefaultGroupKey(metas[0].Thanos)))) + testutil.Equals(t, 3.0, promtest.ToFloat64(grouper.compactionRunsStarted.WithLabelValues(DefaultGroupKey(metas[7].Thanos)))) // TODO(bwplotka): Looks like we do some unnecessary loops. Not a major problem but investigate. - testutil.Equals(t, 2.0, promtest.ToFloat64(grouper.compactionRunsStarted.WithLabelValues(DefaultGroupKey(metas[4].Thanos)))) - testutil.Equals(t, 2.0, promtest.ToFloat64(grouper.compactionRunsStarted.WithLabelValues(DefaultGroupKey(metas[5].Thanos)))) + testutil.Equals(t, 3.0, promtest.ToFloat64(grouper.compactionRunsStarted.WithLabelValues(DefaultGroupKey(metas[4].Thanos)))) + testutil.Equals(t, 3.0, promtest.ToFloat64(grouper.compactionRunsStarted.WithLabelValues(DefaultGroupKey(metas[5].Thanos)))) testutil.Equals(t, 4, MetricCount(grouper.compactionRunsCompleted)) testutil.Equals(t, 2.0, promtest.ToFloat64(grouper.compactionRunsCompleted.WithLabelValues(DefaultGroupKey(metas[0].Thanos)))) - testutil.Equals(t, 2.0, promtest.ToFloat64(grouper.compactionRunsCompleted.WithLabelValues(DefaultGroupKey(metas[7].Thanos)))) + testutil.Equals(t, 3.0, promtest.ToFloat64(grouper.compactionRunsCompleted.WithLabelValues(DefaultGroupKey(metas[7].Thanos)))) // TODO(bwplotka): Looks like we do some unnecessary loops. Not a major problem but investigate. 
- testutil.Equals(t, 2.0, promtest.ToFloat64(grouper.compactionRunsCompleted.WithLabelValues(DefaultGroupKey(metas[4].Thanos)))) - testutil.Equals(t, 2.0, promtest.ToFloat64(grouper.compactionRunsCompleted.WithLabelValues(DefaultGroupKey(metas[5].Thanos)))) + testutil.Equals(t, 3.0, promtest.ToFloat64(grouper.compactionRunsCompleted.WithLabelValues(DefaultGroupKey(metas[4].Thanos)))) + testutil.Equals(t, 3.0, promtest.ToFloat64(grouper.compactionRunsCompleted.WithLabelValues(DefaultGroupKey(metas[5].Thanos)))) testutil.Equals(t, 4, MetricCount(grouper.compactionFailures)) - testutil.Equals(t, 0.0, promtest.ToFloat64(grouper.compactionFailures.WithLabelValues(DefaultGroupKey(metas[0].Thanos)))) + testutil.Equals(t, 1.0, promtest.ToFloat64(grouper.compactionFailures.WithLabelValues(DefaultGroupKey(metas[0].Thanos)))) testutil.Equals(t, 0.0, promtest.ToFloat64(grouper.compactionFailures.WithLabelValues(DefaultGroupKey(metas[7].Thanos)))) testutil.Equals(t, 0.0, promtest.ToFloat64(grouper.compactionFailures.WithLabelValues(DefaultGroupKey(metas[4].Thanos)))) testutil.Equals(t, 0.0, promtest.ToFloat64(grouper.compactionFailures.WithLabelValues(DefaultGroupKey(metas[5].Thanos)))) @@ -342,6 +357,7 @@ func testGroupCompactE2e(t *testing.T, mergeFunc storage.VerticalChunkSeriesMerg metas[4].ULID: false, metas[5].ULID: false, metas[8].ULID: false, + metas[9].ULID: false, } others := map[string]metadata.Meta{} testutil.Ok(t, bkt.Iter(ctx, "", func(n string) error { @@ -374,7 +390,7 @@ func testGroupCompactE2e(t *testing.T, mergeFunc storage.VerticalChunkSeriesMerg meta, ok := others[defaultGroupKey(124, extLabels)] testutil.Assert(t, ok, "meta not found") - testutil.Equals(t, int64(0), meta.MinTime) + testutil.Equals(t, int64(500), meta.MinTime) testutil.Equals(t, int64(3000), meta.MaxTime) testutil.Equals(t, uint64(6), meta.Stats.NumSeries) testutil.Equals(t, uint64(2*4*100), meta.Stats.NumSamples) // Only 2 times 4*100 because one block was empty. 
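Editorial note on the assertion changes above: the group that receives the injected out-of-order block now records one extra compaction run that fails, and exactly one block marked for no compaction, after which the remaining groups keep compacting. A minimal, self-contained sketch of that decision follows; the function and variable names here are hypothetical and only illustrate the behaviour implemented in `pkg/compact/compact.go` above.

```go
package main

import "fmt"

// decideOnUnhealthyIndex sketches the behaviour introduced by
// --compact.skip-block-with-out-of-order-chunks: when the index health check
// reports out-of-order chunks, the compactor either halts (previous behaviour)
// or writes a no-compact marker (reason "block-index-out-of-order-chunk") for
// the offending block and carries on with the other groups.
func decideOnUnhealthyIndex(skipFlag, hasOutOfOrderChunks bool) (halt, markNoCompact bool) {
	if !hasOutOfOrderChunks {
		return false, false // healthy index, compact as usual
	}
	if skipFlag {
		return false, true // mark the block for no compaction and continue
	}
	return true, false // previous behaviour: halt compaction for manual repair
}

func main() {
	halt, mark := decideOnUnhealthyIndex(true, true)
	fmt.Printf("halt=%v markNoCompact=%v\n", halt, mark) // halt=false markNoCompact=true
}
```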
@@ -413,7 +429,7 @@ type blockgenSpec struct { res int64 } -func createAndUpload(t testing.TB, bkt objstore.Bucket, blocks []blockgenSpec) (metas []*metadata.Meta) { +func createAndUpload(t testing.TB, bkt objstore.Bucket, blocks []blockgenSpec, blocksWithOutOfOrderChunks []blockgenSpec) (metas []*metadata.Meta) { prepareDir, err := ioutil.TempDir("", "test-compact-prepare") testutil.Ok(t, err) defer func() { testutil.Ok(t, os.RemoveAll(prepareDir)) }() @@ -422,23 +438,35 @@ func createAndUpload(t testing.TB, bkt objstore.Bucket, blocks []blockgenSpec) ( defer cancel() for _, b := range blocks { - var id ulid.ULID - var err error - if b.numSamples == 0 { - id, err = e2eutil.CreateEmptyBlock(prepareDir, b.mint, b.maxt, b.extLset, b.res) - } else { - id, err = e2eutil.CreateBlock(ctx, prepareDir, b.series, b.numSamples, b.mint, b.maxt, b.extLset, b.res, metadata.NoneFunc) - } - testutil.Ok(t, err) + id, meta := createBlock(t, ctx, prepareDir, b) + metas = append(metas, meta) + testutil.Ok(t, block.Upload(ctx, log.NewNopLogger(), bkt, filepath.Join(prepareDir, id.String()), metadata.NoneFunc)) + } + for _, b := range blocksWithOutOfOrderChunks { + id, meta := createBlock(t, ctx, prepareDir, b) - meta, err := metadata.ReadFromDir(filepath.Join(prepareDir, id.String())) + err := testutil.PutOutOfOrderIndex(filepath.Join(prepareDir, id.String()), b.mint, b.maxt) testutil.Ok(t, err) - metas = append(metas, meta) + metas = append(metas, meta) testutil.Ok(t, block.Upload(ctx, log.NewNopLogger(), bkt, filepath.Join(prepareDir, id.String()), metadata.NoneFunc)) } + return metas } +func createBlock(t testing.TB, ctx context.Context, prepareDir string, b blockgenSpec) (id ulid.ULID, meta *metadata.Meta) { + var err error + if b.numSamples == 0 { + id, err = e2eutil.CreateEmptyBlock(prepareDir, b.mint, b.maxt, b.extLset, b.res) + } else { + id, err = e2eutil.CreateBlock(ctx, prepareDir, b.series, b.numSamples, b.mint, b.maxt, b.extLset, b.res, metadata.NoneFunc) + } + testutil.Ok(t, err) + + meta, err = metadata.ReadFromDir(filepath.Join(prepareDir, id.String())) + testutil.Ok(t, err) + return +} // Regression test for #2459 issue. func TestGarbageCollectDoesntCreateEmptyBlocksWithDeletionMarksOnly(t *testing.T) { diff --git a/pkg/query/querier.go b/pkg/query/querier.go index b1aa95bee2f..30202d5b5e4 100644 --- a/pkg/query/querier.go +++ b/pkg/query/querier.go @@ -207,7 +207,7 @@ func (q *querier) Select(_ bool, hints *storage.SelectHints, ms ...*labels.Match } // The querier has a context but it gets canceled, as soon as query evaluation is completed, by the engine. - // We want to prevent this from happening for the async storea API calls we make while preserving tracing context. + // We want to prevent this from happening for the async store API calls we make while preserving tracing context. 
ctx := tracing.CopyTraceContext(context.Background(), q.ctx) ctx, cancel := context.WithTimeout(ctx, q.selectTimeout) span, ctx := tracing.StartSpan(ctx, "querier_select", opentracing.Tags{ diff --git a/pkg/testutil/testutil.go b/pkg/testutil/testutil.go index 7f0e0a89110..84d9481800d 100644 --- a/pkg/testutil/testutil.go +++ b/pkg/testutil/testutil.go @@ -4,13 +4,21 @@ package testutil import ( + "context" "fmt" + "math/rand" "path/filepath" "reflect" "runtime" "runtime/debug" + "sort" "testing" + "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/tsdb/chunkenc" + "github.com/prometheus/prometheus/tsdb/chunks" + "github.com/prometheus/prometheus/tsdb/index" + + "github.com/davecgh/go-spew/spew" "github.com/pkg/errors" "github.com/pmezard/go-difflib/difflib" @@ -203,3 +211,106 @@ func FaultOrPanicToErr(f func()) (err error) { return err } + +var indexFilename = "index" + +type indexWriterSeries struct { + labels labels.Labels + chunks []chunks.Meta // series file offset of chunks +} + +type indexWriterSeriesSlice []*indexWriterSeries + +// PutOutOfOrderIndex updates the index in blockDir with an index containing an out-of-order chunk. +// Copied from https://github.com/prometheus/prometheus/blob/b1ed4a0a663d0c62526312311c7529471abbc565/tsdb/index/index_test.go#L346 +func PutOutOfOrderIndex(blockDir string, minTime int64, maxTime int64) error { + + if minTime >= maxTime || minTime+4 >= maxTime { + return fmt.Errorf("minTime must be at least 4 less than maxTime to not create overlapping chunks") + } + + lbls := []labels.Labels{ + []labels.Label{ + {Name: "lbl1", Value: "1"}, + }, + } + + // Sort labels as the index writer expects series in sorted order. + sort.Sort(labels.Slice(lbls)) + + symbols := map[string]struct{}{} + for _, lset := range lbls { + for _, l := range lset { + symbols[l.Name] = struct{}{} + symbols[l.Value] = struct{}{} + } + } + + var input indexWriterSeriesSlice + + // Generate ChunkMetas for every label set. + for _, lset := range lbls { + var metas []chunks.Meta + // Only need two chunks that are out-of-order. + chk1 := chunks.Meta{ + MinTime: maxTime - 2, + MaxTime: maxTime - 1, + Ref: rand.Uint64(), + Chunk: chunkenc.NewXORChunk(), + } + metas = append(metas, chk1) + chk2 := chunks.Meta{ + MinTime: minTime + 1, + MaxTime: minTime + 2, + Ref: rand.Uint64(), + Chunk: chunkenc.NewXORChunk(), + } + metas = append(metas, chk2) + + input = append(input, &indexWriterSeries{ + labels: lset, + chunks: metas, + }) + } + + iw, err := index.NewWriter(context.Background(), filepath.Join(blockDir, indexFilename)) + if err != nil { + return err + } + + syms := []string{} + for s := range symbols { + syms = append(syms, s) + } + sort.Strings(syms) + for _, s := range syms { + if err := iw.AddSymbol(s); err != nil { + return err + } + } + + // Population procedure as done by compaction. + var ( + postings = index.NewMemPostings() + values = map[string]map[string]struct{}{} + ) + + for i, s := range input { + if err := iw.AddSeries(uint64(i), s.labels, s.chunks...); err != nil { + return err + } + + for _, l := range s.labels { + valset, ok := values[l.Name] + if !ok { + valset = map[string]struct{}{} + values[l.Name] = valset + } + valset[l.Value] = struct{}{} + } + postings.Add(uint64(i), s.labels) + } + + return iw.Close() +}
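For reference, a usage sketch of the new `PutOutOfOrderIndex` helper together with the health check it is meant to exercise, mirroring the unit test added in `pkg/block/index_test.go`. The external test package name, temp-dir handling, and the go-kit logger import path are assumptions for illustration, not part of the diff.

```go
package block_test

import (
	"io/ioutil"
	"math"
	"os"
	"path/filepath"
	"testing"

	"github.com/go-kit/kit/log"

	"github.com/thanos-io/thanos/pkg/block"
	"github.com/thanos-io/thanos/pkg/testutil"
)

// TestOutOfOrderIndexIsSurfaced writes a block index whose only series has two
// chunks in the wrong order, then checks that GatherIndexHealthStats reports it
// through the new OutOfOrderChunksErr accessor.
func TestOutOfOrderIndexIsSurfaced(t *testing.T) {
	dir, err := ioutil.TempDir("", "ooo-index-example")
	testutil.Ok(t, err)
	defer func() { testutil.Ok(t, os.RemoveAll(dir)) }()

	// PutOutOfOrderIndex requires maxTime to be at least 4 above minTime.
	testutil.Ok(t, testutil.PutOutOfOrderIndex(dir, 0, math.MaxInt64))

	stats, err := block.GatherIndexHealthStats(log.NewLogfmtLogger(os.Stderr), filepath.Join(dir, block.IndexFilename), 0, math.MaxInt64)
	testutil.Ok(t, err)

	testutil.Equals(t, 1, stats.OutOfOrderChunks)  // one out-of-order chunk detected
	testutil.NotOk(t, stats.OutOfOrderChunksErr()) // surfaced as its own, skippable error
}
```

With the compactor started with `--compact.skip-block-with-out-of-order-chunks`, a block failing this check is marked with the `block-index-out-of-order-chunk` no-compact reason and counted in `thanos_compact_blocks_marked_total` instead of halting the whole compaction loop.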