skip blocks with out-of-order chunks during compaction
Signed-off-by: Yang Hu <yhuz@amazon.com>
huyan0 committed Aug 11, 2021
1 parent ca75179 commit 6197376
Showing 9 changed files with 305 additions and 82 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -16,6 +16,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#4487](https://github.com/thanos-io/thanos/pull/4487) Query: Add memcached auto discovery support.
- [#4444](https://github.com/thanos-io/thanos/pull/4444) UI: Add search block UI.
- [#4509](https://github.com/thanos-io/thanos/pull/4509) Logging: Adds duration_ms in int64 to the logs.
- [#4469](https://github.com/thanos-io/thanos/pull/4482) Compact: Add flag `compact.skip-block-with-out-of-order-chunks` to skip blocks with out-of-order chunks during compaction instead of halting.

### Fixed

12 changes: 9 additions & 3 deletions cmd/thanos/compact.go
@@ -147,8 +147,8 @@ func newCompactMetrics(reg *prometheus.Registry, deleteDelay time.Duration) *com
m.blocksMarked = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "thanos_compact_blocks_marked_total",
Help: "Total number of blocks marked in compactor.",
}, []string{"marker", "reason"})
m.blocksMarked.WithLabelValues(metadata.NoCompactMarkFilename, metadata.OutOfOrderChunksNoCompactReason)
m.blocksMarked.WithLabelValues(metadata.DeletionMarkFilename, "")

m.garbageCollectedBlocks = promauto.With(reg).NewCounter(prometheus.CounterOpts{
@@ -349,13 +349,15 @@ func runCompact(
reg,
compactMetrics.blocksMarked.WithLabelValues(metadata.DeletionMarkFilename, ""),
compactMetrics.garbageCollectedBlocks,
compactMetrics.blocksMarked.WithLabelValues(metadata.NoCompactMarkFilename, metadata.OutOfOrderChunksNoCompactReason),
metadata.HashFunc(conf.hashFunc),
conf.skipBlockWithOutOfOrderChunks,
)
planner := compact.WithLargeTotalIndexSizeFilter(
compact.NewPlanner(logger, levels, noCompactMarkerFilter),
bkt,
int64(conf.maxBlockIndexSize),
compactMetrics.blocksMarked.WithLabelValues(metadata.NoCompactMarkFilename, metadata.IndexSizeExceedingNoCompactReason),
)
blocksCleaner := compact.NewBlocksCleaner(logger, bkt, ignoreDeletionMarkFilter, deleteDelay, compactMetrics.blocksCleaned, compactMetrics.blockCleanupFailures)
compactor, err := compact.NewBucketCompactor(
@@ -585,6 +587,7 @@ type compactConfig struct {
hashFunc string
enableVerticalCompaction bool
dedupFunc string
skipBlockWithOutOfOrderChunks bool
}

func (cc *compactConfig) registerFlag(cmd extkingpin.FlagClause) {
@@ -668,6 +671,9 @@ func (cc *compactConfig) registerFlag(cmd extkingpin.FlagClause) {
"Default is due to https://github.com/thanos-io/thanos/issues/1424, but it's overall recommended to keep block size to some reasonable size.").
Hidden().Default("64GB").BytesVar(&cc.maxBlockIndexSize)

cmd.Flag("compact.skip-block-with-out-of-order-chunks", "When set to true, mark blocks containing index with out-of-order chunks for no compact instead of halting the compaction").
Hidden().Default("false").BoolVar(&cc.skipBlockWithOutOfOrderChunks)

cmd.Flag("hash-func", "Specify which hash function to use when calculating the hashes of produced files. If no function has been specified, it does not happen. This permits avoiding downloading some files twice albeit at some performance cost. Possible values are: \"\", \"SHA256\".").
Default("").EnumVar(&cc.hashFunc, "SHA256", "")

19 changes: 13 additions & 6 deletions pkg/block/index.go
@@ -111,12 +111,9 @@ func (i HealthStats) Issue347OutsideChunksErr() error {
return nil
}

// OutOfOrderChunksErr returns an error if the stats indicate any out-of-order chunks.
func (i HealthStats) OutOfOrderChunksErr() error {
if i.OutOfOrderChunks > 0 {
return errors.New(fmt.Sprintf(
"%d/%d series have an average of %.3f out-of-order chunks: "+
"%.3f of these are exact duplicates (in terms of data and time range)",
i.OutOfOrderSeries,
@@ -125,6 +122,12 @@ func (i HealthStats) CriticalErr() error {
float64(i.DuplicatedChunks)/float64(i.OutOfOrderChunks),
))
}
return nil
}

// CriticalErr returns an error if the stats indicate a critical block issue that might be solved only by a manual repair procedure.
func (i HealthStats) CriticalErr() error {
var errMsg []string

n := i.OutsideChunks - (i.CompleteOutsideChunks + i.Issue347OutsideChunks)
if n > 0 {
@@ -158,6 +161,10 @@ func (i HealthStats) AnyErr() error {
errMsg = append(errMsg, err.Error())
}

if err := i.OutOfOrderChunksErr(); err != nil {
errMsg = append(errMsg, err.Error())
}

if len(errMsg) > 0 {
return errors.New(strings.Join(errMsg, ", "))
}
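
A minimal sketch (not part of this diff) of how a caller can use the new accessor: gather index health stats, then treat out-of-order chunks as a recoverable condition rather than a critical one. The package name, path handling, and logger choice are assumptions for illustration.

package example

import (
	"math"
	"os"

	"github.com/go-kit/kit/log"
	"github.com/thanos-io/thanos/pkg/block"
)

// checkIndex returns nil for a healthy index and the out-of-order chunks
// error otherwise; failures to read the index itself are returned as-is.
func checkIndex(indexPath string) error {
	logger := log.NewLogfmtLogger(os.Stderr)
	stats, err := block.GatherIndexHealthStats(logger, indexPath, 0, math.MaxInt64)
	if err != nil {
		return err // reading the index itself failed
	}
	// With compact.skip-block-with-out-of-order-chunks enabled, the compactor
	// reacts to this error by marking the block no-compact instead of halting.
	return stats.OutOfOrderChunksErr()
}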
14 changes: 14 additions & 0 deletions pkg/block/index_test.go
@@ -6,6 +6,7 @@ package block
import (
"context"
"io/ioutil"
"math"
"os"
"path/filepath"
"testing"
@@ -83,5 +84,18 @@ func TestRewrite(t *testing.T) {
testutil.Ok(t, ir2.Series(p.At(), &lset, &chks))
testutil.Equals(t, 1, len(chks))
}
}

func TestGatherIndexHealthStatsReturnsOutOfOrderChunksErr(t *testing.T) {
blockDir, err := ioutil.TempDir("", "test-ooo-index")
testutil.Ok(t, err)

err = testutil.PutOutOfOrderIndex(blockDir, 0, math.MaxInt64)
testutil.Ok(t, err)

stats, err := GatherIndexHealthStats(log.NewLogfmtLogger(os.Stderr), blockDir+"/"+IndexFilename, 0, math.MaxInt64)

testutil.Ok(t, err)
testutil.Equals(t, 1, stats.OutOfOrderChunks)
testutil.NotOk(t, stats.OutOfOrderChunksErr())
}
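
PutOutOfOrderIndex is a test helper added elsewhere in this commit (one of the 9 changed files, not shown on this page). A hypothetical sketch of what such a helper can look like, assuming the Prometheus tsdb index writer API of this era; the committed helper may differ:

package testutil

import (
	"context"
	"path/filepath"

	"github.com/prometheus/prometheus/pkg/labels"
	"github.com/prometheus/prometheus/tsdb/chunks"
	"github.com/prometheus/prometheus/tsdb/index"
)

// putOutOfOrderIndex (sketch) writes a minimal index whose single series
// lists two chunks in the wrong time order, so that GatherIndexHealthStats
// reports OutOfOrderChunks > 0.
func putOutOfOrderIndex(blockDir string, minTime, maxTime int64) error {
	iw, err := index.NewWriter(context.Background(), filepath.Join(blockDir, "index"))
	if err != nil {
		return err
	}
	// Symbols must be registered in sorted order before any series.
	if err := iw.AddSymbol("a"); err != nil {
		return err
	}
	if err := iw.AddSymbol("b"); err != nil {
		return err
	}
	// The second chunk starts before the first one ends: out of order.
	if err := iw.AddSeries(1, labels.FromStrings("a", "b"),
		chunks.Meta{Ref: 1, MinTime: minTime + 10, MaxTime: maxTime},
		chunks.Meta{Ref: 2, MinTime: minTime, MaxTime: minTime + 5},
	); err != nil {
		return err
	}
	return iw.Close()
}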
2 changes: 2 additions & 0 deletions pkg/block/metadata/markers.go
@@ -67,6 +67,8 @@ const (
// IndexSizeExceedingNoCompactReason is a reason of index being too big (for example exceeding 64GB limit: https://github.com/thanos-io/thanos/issues/1424)
// This reason can be ignored when vertical block sharding will be implemented.
IndexSizeExceedingNoCompactReason = "index-size-exceeding"
// OutOfOrderChunksNoCompactReason is a reason for marking a block no-compact when its index contains out-of-order chunks, so that compaction is not blocked.
OutOfOrderChunksNoCompactReason = "block-index-out-of-order-chunk"
)

// NoCompactMark marker stores reason of block being excluded from compaction if needed.
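
For reference, a hypothetical sketch of the marker a skipped block ends up with (field names and NoCompactMarkVersion1 are assumptions based on the NoCompactMark struct in this package, whose definition is collapsed above):

package example

import (
	"time"

	"github.com/oklog/ulid"
	"github.com/thanos-io/thanos/pkg/block/metadata"
)

// noCompactMarkFor (sketch) builds the no-compact-mark.json payload for a
// block skipped because its index contains out-of-order chunks.
func noCompactMarkFor(id ulid.ULID) metadata.NoCompactMark {
	return metadata.NoCompactMark{
		ID:            id,
		Version:       metadata.NoCompactMarkVersion1,
		NoCompactTime: time.Now().Unix(),
		Reason:        metadata.OutOfOrderChunksNoCompactReason, // "block-index-out-of-order-chunk"
		Details:       "block skipped during compaction due to out-of-order chunks",
	}
}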
148 changes: 101 additions & 47 deletions pkg/compact/compact.go
@@ -229,18 +229,20 @@ func defaultGroupKey(res int64, lbls labels.Labels) string {
// DefaultGrouper is the Thanos built-in grouper. It groups blocks based on downsample
// resolution and block's labels.
type DefaultGrouper struct {
bkt objstore.Bucket
logger log.Logger
acceptMalformedIndex bool
enableVerticalCompaction bool
compactions *prometheus.CounterVec
compactionRunsStarted *prometheus.CounterVec
compactionRunsCompleted *prometheus.CounterVec
compactionFailures *prometheus.CounterVec
verticalCompactions *prometheus.CounterVec
garbageCollectedBlocks prometheus.Counter
blocksMarkedForDeletion prometheus.Counter
blocksMarkedForNoCompact prometheus.Counter
hashFunc metadata.HashFunc
skipChunksWithOutOfOrderBlocks bool
}

// NewDefaultGrouper makes a new DefaultGrouper.
@@ -252,7 +254,9 @@ func NewDefaultGrouper(
reg prometheus.Registerer,
blocksMarkedForDeletion prometheus.Counter,
garbageCollectedBlocks prometheus.Counter,
blocksMarkedForNoCompact prometheus.Counter,
hashFunc metadata.HashFunc,
skipChunksWithOutOfOrderBlocks bool,
) *DefaultGrouper {
return &DefaultGrouper{
bkt: bkt,
@@ -279,9 +283,11 @@ func NewDefaultGrouper(
Name: "thanos_compact_group_vertical_compactions_total",
Help: "Total number of group compaction attempts that resulted in a new block based on overlapping blocks.",
}, []string{"group"}),
blocksMarkedForNoCompact: blocksMarkedForNoCompact,
garbageCollectedBlocks: garbageCollectedBlocks,
blocksMarkedForDeletion: blocksMarkedForDeletion,
hashFunc: hashFunc,
skipChunksWithOutOfOrderBlocks: skipChunksWithOutOfOrderBlocks,
}
}

@@ -309,7 +315,9 @@ func (g *DefaultGrouper) Groups(blocks map[ulid.ULID]*metadata.Meta) (res []*Gro
g.verticalCompactions.WithLabelValues(groupKey),
g.garbageCollectedBlocks,
g.blocksMarkedForDeletion,
g.blocksMarkedForNoCompact,
g.hashFunc,
g.skipChunksWithOutOfOrderBlocks,
)
if err != nil {
return nil, errors.Wrap(err, "create compaction group")
@@ -330,23 +338,25 @@ func (g *DefaultGrouper) Groups(blocks map[ulid.ULID]*metadata.Meta) (res []*Gro
// Group captures a set of blocks that have the same origin labels and downsampling resolution.
// Those blocks generally contain the same series and can thus efficiently be compacted.
type Group struct {
logger log.Logger
bkt objstore.Bucket
key string
labels labels.Labels
resolution int64
mtx sync.Mutex
metasByMinTime []*metadata.Meta
acceptMalformedIndex bool
enableVerticalCompaction bool
compactions prometheus.Counter
compactionRunsStarted prometheus.Counter
compactionRunsCompleted prometheus.Counter
compactionFailures prometheus.Counter
verticalCompactions prometheus.Counter
groupGarbageCollectedBlocks prometheus.Counter
blocksMarkedForDeletion prometheus.Counter
blocksMarkedForNoCompact prometheus.Counter
hashFunc metadata.HashFunc
skipChunksWithOutOfOrderBlocks bool
}

// NewGroup returns a new compaction group.
@@ -365,27 +375,31 @@ func NewGroup(
verticalCompactions prometheus.Counter,
groupGarbageCollectedBlocks prometheus.Counter,
blocksMarkedForDeletion prometheus.Counter,
blocksMarkedForNoCompact prometheus.Counter,
hashFunc metadata.HashFunc,
skipChunksWithOutOfOrderBlocks bool,
) (*Group, error) {
if logger == nil {
logger = log.NewNopLogger()
}
g := &Group{
logger: logger,
bkt: bkt,
key: key,
labels: lset,
resolution: resolution,
acceptMalformedIndex: acceptMalformedIndex,
enableVerticalCompaction: enableVerticalCompaction,
compactions: compactions,
compactionRunsStarted: compactionRunsStarted,
compactionRunsCompleted: compactionRunsCompleted,
compactionFailures: compactionFailures,
verticalCompactions: verticalCompactions,
groupGarbageCollectedBlocks: groupGarbageCollectedBlocks,
blocksMarkedForDeletion: blocksMarkedForDeletion,
blocksMarkedForNoCompact: blocksMarkedForNoCompact,
hashFunc: hashFunc,
skipChunksWithOutOfOrderBlocks: skipChunksWithOutOfOrderBlocks,
}
return g, nil
}
@@ -541,6 +555,27 @@ func IsIssue347Error(err error) bool {
return ok
}

// OutOfOrderChunksError is a type wrapper for an out-of-order chunks error found while validating a block index.
type OutOfOrderChunksError struct {
err error

id ulid.ULID
}

func (e OutOfOrderChunksError) Error() string {
return e.err.Error()
}

func outOfOrderChunkError(err error, brokenBlock ulid.ULID) OutOfOrderChunksError {
return OutOfOrderChunksError{err: err, id: brokenBlock}
}

// IsOutOfOrderChunkError returns true if the base error is an OutOfOrderChunksError.
func IsOutOfOrderChunkError(err error) bool {
_, ok := errors.Cause(err).(OutOfOrderChunksError)
return ok
}
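
Because errors.Cause unwinds to the innermost error without a Cause method, the sentinel survives the group-level Wrapf used later in this file. An illustrative test one might add in this package (not part of the commit; the message and group key are made up):

func TestIsOutOfOrderChunkError(t *testing.T) {
	var id ulid.ULID // a zero-value ULID is enough for the illustration
	err := outOfOrderChunkError(errors.New("2/10 series have out-of-order chunks"), id)
	wrapped := errors.Wrapf(err, "group %s", "0@17241709254077376921")
	testutil.Assert(t, IsOutOfOrderChunkError(wrapped), "sentinel should survive wrapping")
	testutil.Assert(t, !IsHaltError(wrapped), "out-of-order chunks alone should not halt the compactor")
}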

// HaltError is a type wrapper for errors that should halt any further progress on compactions.
type HaltError struct {
err error
@@ -749,6 +784,10 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp
return false, ulid.ULID{}, halt(errors.Wrapf(err, "block with not healthy index found %s; Compaction level %v; Labels: %v", bdir, meta.Compaction.Level, meta.Thanos.Labels))
}

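// Out-of-order chunks are checked before the Issue 347 repair path below: when
// compact.skip-block-with-out-of-order-chunks is set, the block is surfaced as
// an OutOfOrderChunksError and marked no-compact by the caller instead of
// halting the compactor.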
if err := stats.OutOfOrderChunksErr(); cg.skipChunksWithOutOfOrderBlocks && err != nil {
return false, ulid.ULID{}, outOfOrderChunkError(errors.Wrapf(err, "block with out-of-order chunks dropped from compaction: %s", bdir), meta.ULID)
}

if err := stats.Issue347OutsideChunksErr(); err != nil {
return false, ulid.ULID{}, issue347Error(errors.Wrapf(err, "invalid, but reparable block %s", bdir), meta.ULID)
}
@@ -939,6 +978,21 @@ func (c *BucketCompactor) Compact(ctx context.Context) (rerr error) {
continue
}
}
// If the block has out-of-order chunks, mark it for no compaction and continue.
if IsOutOfOrderChunkError(err) {
if err := block.MarkForNoCompact(
ctx,
c.logger,
c.bkt,
err.(OutOfOrderChunksError).id,
metadata.OutOfOrderChunksNoCompactReason,
"OutOfOrderChunk: marking block with out-of-order series/chunks as no-compact to unblock compaction", g.blocksMarkedForNoCompact); err == nil {
mtx.Lock()
finishedAllGroups = false
mtx.Unlock()
continue
}
}
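// If writing the no-compact marker itself failed, fall through so the
// original error is surfaced to errChan below.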
errChan <- errors.Wrapf(err, "group %s", g.Key())
return
}
