Skip to content

Commit

Permalink
compact: Added index size limiting planner detecting output index size over 64GB.
Browse files Browse the repository at this point in the history

Fixes: #1424

Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>
  • Loading branch information
bwplotka committed Nov 4, 2020
1 parent 8cf0af5 commit 99f0ee5
Show file tree
Hide file tree
Showing 5 changed files with 403 additions and 2 deletions.
32 changes: 32 additions & 0 deletions pkg/block/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,3 +284,35 @@ func gatherFileStats(blockDir string) (res []metadata.File, _ error) {
// TODO(bwplotka): Add optional files like tombstones?
return res, err
}

// MarkForNoCompact creates a file which marks block to be not compacted.
func MarkForNoCompact(ctx context.Context, logger log.Logger, bkt objstore.Bucket, id ulid.ULID, reason metadata.NoCompactReason, noCompactDetails string, markedForNoCompact prometheus.Counter) error {
m := path.Join(id.String(), metadata.NoCompactMarkFilename)
noCompactMarkExists, err := bkt.Exists(ctx, m)
if err != nil {
return errors.Wrapf(err, "check exists %s in bucket", m)
}
if noCompactMarkExists {
level.Warn(logger).Log("msg", "requested to mark for no compaction, but file already exists; this should not happen; investigate", "err", errors.Errorf("file %s already exists in bucket", m))
return nil
}

noCompactMark, err := json.Marshal(metadata.NoCompactMark{
ID: id,
Version: metadata.NoCompactMarkVersion1,

Time: time.Now().Unix(),
Reason: reason,
Details: noCompactDetails,
})
if err != nil {
return errors.Wrap(err, "json encode no compact mark")
}

if err := bkt.Upload(ctx, m, bytes.NewBuffer(noCompactMark)); err != nil {
return errors.Wrapf(err, "upload file %s to bucket", m)
}
markedForNoCompact.Inc()
level.Info(logger).Log("msg", "block has been marked for no compaction", "block", id)
return nil
}
56 changes: 56 additions & 0 deletions pkg/block/block_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,3 +311,59 @@ func TestMarkForDeletion(t *testing.T) {
})
}
}

// TestMarkForNoCompact exercises MarkForNoCompact both on a fresh block and on
// a block that already carries a no-compact mark (no metric increment expected).
func TestMarkForNoCompact(t *testing.T) {
	defer testutil.TolerantVerifyLeak(t)
	ctx := context.Background()

	tmpDir, err := ioutil.TempDir("", "test-block-mark-for-no-compact")
	testutil.Ok(t, err)
	defer func() { testutil.Ok(t, os.RemoveAll(tmpDir)) }()

	for _, tc := range []struct {
		name      string
		preUpload func(t testing.TB, id ulid.ULID, bkt objstore.Bucket)

		blocksMarked int
	}{
		{
			name:         "block marked",
			preUpload:    func(t testing.TB, id ulid.ULID, bkt objstore.Bucket) {},
			blocksMarked: 1,
		},
		{
			name: "block with no-compact mark already, expected log and no metric increment",
			preUpload: func(t testing.TB, id ulid.ULID, bkt objstore.Bucket) {
				// Pre-place a mark so the call under test hits the exists path.
				data, err := json.Marshal(metadata.NoCompactMark{
					ID:      id,
					Time:    time.Now().Unix(),
					Version: metadata.NoCompactMarkVersion1,
				})
				testutil.Ok(t, err)
				testutil.Ok(t, bkt.Upload(ctx, path.Join(id.String(), metadata.NoCompactMarkFilename), bytes.NewReader(data)))
			},
			blocksMarked: 0,
		},
	} {
		t.Run(tc.name, func(t *testing.T) {
			bkt := objstore.NewInMemBucket()

			// A small real block gives the mark a concrete target ULID.
			series := []labels.Labels{
				{{Name: "a", Value: "1"}},
				{{Name: "a", Value: "2"}},
				{{Name: "a", Value: "3"}},
				{{Name: "a", Value: "4"}},
				{{Name: "b", Value: "1"}},
			}
			id, err := e2eutil.CreateBlock(ctx, tmpDir, series, 100, 0, 1000, labels.Labels{{Name: "ext1", Value: "val1"}}, 124)
			testutil.Ok(t, err)

			tc.preUpload(t, id, bkt)

			testutil.Ok(t, Upload(ctx, log.NewNopLogger(), bkt, path.Join(tmpDir, id.String())))

			counter := promauto.With(nil).NewCounter(prometheus.CounterOpts{})
			testutil.Ok(t, MarkForNoCompact(ctx, log.NewNopLogger(), bkt, id, metadata.ManualNoCompactReason, "", counter))
			testutil.Equals(t, float64(tc.blocksMarked), promtest.ToFloat64(counter))
		})
	}
}
2 changes: 2 additions & 0 deletions pkg/block/metadata/markers.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ type NoCompactMark struct {
// Version of the file.
Version int `json:"version"`

// Time is a unix timestamp of when the block was marked for no compact.
Time int64 `json:"time"`
Reason NoCompactReason `json:"reason"`
// Details is a human readable string giving details of reason.
Details string `json:"details"`
Expand Down
91 changes: 90 additions & 1 deletion pkg/compact/planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,17 @@ package compact

import (
"context"
"fmt"
"math"
"path/filepath"

"github.com/go-kit/kit/log"
"github.com/oklog/ulid"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/objstore"
)

type tsdbBasedPlanner struct {
Expand Down Expand Up @@ -36,7 +43,10 @@ func NewPlanner(logger log.Logger, ranges []int64, noCompBlocks *GatherNoCompact

// TODO(bwplotka): Consider smarter algorithm, this prefers smaller iterative compactions vs big single one: https://github.com/thanos-io/thanos/issues/3405
// Plan implements the Planner interface. It fetches the current set of blocks
// excluded from compaction and delegates the actual grouping to plan.
func (p *tsdbBasedPlanner) Plan(_ context.Context, metasByMinTime []*metadata.Meta) ([]*metadata.Meta, error) {
	// NOTE: p.noCompBlocksFunc() is passed directly; a separate local would be
	// unused and therefore a compile error in Go.
	return p.plan(p.noCompBlocksFunc(), metasByMinTime)
}

func (p *tsdbBasedPlanner) plan(noCompactMarked map[ulid.ULID]*metadata.NoCompactMark, metasByMinTime []*metadata.Meta) ([]*metadata.Meta, error) {
notExcludedMetasByMinTime := make([]*metadata.Meta, 0, len(metasByMinTime))
for _, meta := range metasByMinTime {
if _, excluded := noCompactMarked[meta.ULID]; excluded {
Expand Down Expand Up @@ -203,3 +213,82 @@ func splitByRange(metasByMinTime []*metadata.Meta, tr int64) [][]*metadata.Meta

return splitDirs
}

// largeTotalIndexSizeFilter wraps a tsdbBasedPlanner and post-checks each plan
// against totalMaxIndexSizeBytes, marking the block with the biggest index for
// no-compaction when the estimated combined index size would exceed the limit.
type largeTotalIndexSizeFilter struct {
	*tsdbBasedPlanner

	// bkt is used to look up index file sizes when the block meta lacks them.
	bkt objstore.Bucket
	// markedForNoCompact counts blocks this filter excluded from compaction.
	markedForNoCompact prometheus.Counter
	// totalMaxIndexSizeBytes is the rough upper bound for a plan's summed index sizes.
	totalMaxIndexSizeBytes int64
}

// Compile-time assertion that largeTotalIndexSizeFilter implements Planner.
var _ Planner = &largeTotalIndexSizeFilter{}

// WithLargeTotalIndexSizeFilter wraps Planner with largeTotalIndexSizeFilter that checks the given plans and estimates total index size.
// When found, it marks block for no compaction by placing no-compact.json and updating cache.
// NOTE: The estimation is very rough as it assumes extreme cases of indexes sharing no bytes, thus summing all source index sizes.
// Adjust limit accordingly reducing to some % of actual limit you want to give.
// TODO(bwplotka): This is short term fix for https://github.com/thanos-io/thanos/issues/1424, replace with vertical block sharding https://github.com/thanos-io/thanos/pull/3390.
func WithLargeTotalIndexSizeFilter(with *tsdbBasedPlanner, bkt objstore.Bucket, totalMaxIndexSizeBytes int64, markedForNoCompact prometheus.Counter) *largeTotalIndexSizeFilter {
	return &largeTotalIndexSizeFilter{
		tsdbBasedPlanner:       with,
		bkt:                    bkt,
		totalMaxIndexSizeBytes: totalMaxIndexSizeBytes,
		markedForNoCompact:     markedForNoCompact,
	}
}

// Plan implements the Planner interface. It repeatedly asks the wrapped
// tsdbBasedPlanner for a plan and, whenever the summed index sizes of the
// planned blocks reach totalMaxIndexSizeBytes, marks the block with the
// largest index for no-compaction and replans without it, until a plan fits
// under the limit (or is empty).
func (t *largeTotalIndexSizeFilter) Plan(ctx context.Context, metasByMinTime []*metadata.Meta) ([]*metadata.Meta, error) {
	// Copy the no-compact set so blocks excluded during this planning round do
	// not mutate the shared map returned by noCompBlocksFunc.
	noCompactMarked := t.noCompBlocksFunc()
	copiedNoCompactMarked := make(map[ulid.ULID]*metadata.NoCompactMark, len(noCompactMarked))
	for k, v := range noCompactMarked {
		copiedNoCompactMarked[k] = v
	}

PlanLoop:
	for {
		plan, err := t.plan(copiedNoCompactMarked, metasByMinTime)
		if err != nil {
			return nil, err
		}
		// Track the running total and the position of the largest index seen so
		// far, so the single most expensive block can be excluded on overflow.
		var totalIndexBytes, maxIndexSize int64 = 0, math.MinInt64
		var biggestIndex int
		for i, p := range plan {
			indexSize := int64(-1)
			// Prefer the index size recorded in the block's meta file stats.
			for _, f := range p.Thanos.Files {
				if f.RelPath == block.IndexFilename {
					indexSize = f.SizeBytes
				}
			}
			if indexSize <= 0 {
				// Get size from bkt instead.
				// NOTE(review): filepath.Join is OS-dependent; bucket object keys
				// are slash-separated (MarkForNoCompact uses path.Join) — confirm
				// this behaves correctly on Windows builds.
				attr, err := t.bkt.Attributes(ctx, filepath.Join(p.ULID.String(), block.IndexFilename))
				if err != nil {
					return nil, errors.Wrapf(err, "get attr of %v", filepath.Join(p.ULID.String(), block.IndexFilename))
				}
				indexSize = attr.Size
			}

			if maxIndexSize < indexSize {
				maxIndexSize = indexSize
				biggestIndex = i
			}
			totalIndexBytes += indexSize
			if totalIndexBytes >= t.totalMaxIndexSizeBytes {
				// Marking blocks for no compact to limit size.
				// TODO(bwplotka): Make sure to reset cache once this is done: https://github.com/thanos-io/thanos/issues/3408
				if err := block.MarkForNoCompact(
					ctx,
					t.logger,
					t.bkt,
					plan[biggestIndex].ULID,
					metadata.IndexSizeExceedingNoCompactReason,
					fmt.Sprintf("largeTotalIndexSizeFilter: Total compacted block's index size could exceed: %v with this block. See https://github.com/thanos-io/thanos/issues/1424", t.totalMaxIndexSizeBytes),
					t.markedForNoCompact,
				); err != nil {
					return nil, errors.Wrapf(err, "mark %v for no compaction", plan[biggestIndex].ULID.String())
				}
				// Make sure wrapped planner exclude this block.
				copiedNoCompactMarked[plan[biggestIndex].ULID] = &metadata.NoCompactMark{ID: plan[biggestIndex].ULID, Version: metadata.NoCompactMarkVersion1}
				continue PlanLoop
			}
		}
		// Planned blocks should not exceed limit.
		return plan, nil
	}
}

0 comments on commit 99f0ee5

Please sign in to comment.