
cmd: compact: clean partial / marked blocks concurrently #3115

Merged
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -11,6 +11,10 @@ We use *breaking* word for marking changes that are not backward compatible (rel

## Unreleased

### Added

- [#3115](https://github.com/thanos-io/thanos/pull/3115) compact: now deletes partially uploaded blocks and blocks with deletion marks concurrently. It does this once at startup and then every `--compact.cleanup-interval` period, which defaults to 5 minutes.

## [v0.15.0](https://github.com/thanos-io/thanos/releases) - in release process.

:warning: **WARNING** :warning: Thanos Rule's `/api/v1/rules` endpoint no longer returns the old, deprecated `partial_response_strategy`. The old, deprecated value has been fixed to `WARN` for quite some time. _Please_ use `partialResponseStrategy`.
57 changes: 51 additions & 6 deletions cmd/thanos/compact.go
@@ -10,6 +10,7 @@ import (
"path"
"strconv"
"strings"
"sync"
"time"

"github.com/go-kit/kit/log"
@@ -295,6 +296,35 @@ func runCompact(
level.Info(logger).Log("msg", "retention policy of 1 hour aggregated samples is enabled", "duration", retentionByResolution[compact.ResolutionLevel1h])
}

var cleanMtx sync.Mutex
// TODO(GiedriusS): we could also apply retention policies here but the logic would be a bit more complex.
cleanPartialMarked := func() error {
cleanMtx.Lock()
defer cleanMtx.Unlock()

// No need to resync before partial uploads and delete marked blocks. Last sync should be valid.
compact.BestEffortCleanAbortedPartialUploads(ctx, logger, sy.Partial(), bkt, partialUploadDeleteAttempts, blocksCleaned, blockCleanupFailures)
if err := blocksCleaner.DeleteMarkedBlocks(ctx); err != nil {
return errors.Wrap(err, "error cleaning marked blocks")
[Review — Contributor] One nit, is it better to be `cleaning marked blocks`? `error` seems redundant.
[Review — Member] Yes, let's do this. Still valid comment 👍

}

if err := sy.SyncMetas(ctx); err != nil {
level.Error(logger).Log("msg", "failed to sync metas", "err", err)
}
return nil
}

// Do it once at the beginning to ensure that it runs at least once before
// the main loop.
if err := sy.SyncMetas(ctx); err != nil {
cancel()
return errors.Wrap(err, "syncing metas")
}
if err := cleanPartialMarked(); err != nil {
cancel()
return errors.Wrap(err, "cleaning partial and marked blocks")
}

compactMainFn := func() error {
if err := compactor.Compact(ctx); err != nil {
return errors.Wrap(err, "compaction")
@@ -339,12 +369,7 @@ func runCompact(
return errors.Wrap(err, "retention failed")
}

// No need to resync before partial uploads and delete marked blocks. Last sync should be valid.
compact.BestEffortCleanAbortedPartialUploads(ctx, logger, sy.Partial(), bkt, partialUploadDeleteAttempts, blocksCleaned, blockCleanupFailures)
if err := blocksCleaner.DeleteMarkedBlocks(ctx); err != nil {
return errors.Wrap(err, "error cleaning blocks")
}
return nil
return cleanPartialMarked()
}

g.Add(func() error {
@@ -416,6 +441,23 @@ func runCompact(

srv.Handle("/", r)

// Periodically remove partial blocks and blocks marked for deletion
// since one iteration potentially could take a long time.
if conf.cleanupBlocksInterval > 0 {
g.Add(func() error {
// Wait the whole period at the beginning because we've executed this on boot.
[Review — Member] So... why not just remove this and the boot-time execution? (: Same stuff, right?
[Review — Member Author (GiedriusS), Oct 16, 2020] EDIT: actually gave it a second thought. We need to explicitly run it at boot time to make sure that we don't have flaky tests, because those depend on a failure happening and then a cleanup. That is not guaranteed to happen if we do everything concurrently.
[Review — Member] It sounds wrong that we make the code more complex only because we don't want to change tests 🤔
[Review — Member Author] The tests would be much more complex and probably out of the scope of this PR. Actually, it's not just that: I think it's nice that we do this at least once. Imagine a case where someone doesn't use --wait and the whole Thanos Compact process ends before the clean-up has happened. Space usage would never go down in the remote object storage even though it could, and the user could be charged more as a result.

select {
case <-time.After(conf.cleanupBlocksInterval):
case <-ctx.Done():
}
return runutil.Repeat(conf.cleanupBlocksInterval, ctx.Done(), func() error {
return cleanPartialMarked()
})
}, func(error) {
cancel()
})
}

g.Add(func() error {
iterCtx, iterCancel := context.WithTimeout(ctx, conf.waitInterval)
_, _, _ = f.Fetch(iterCtx)
@@ -458,6 +500,7 @@ type compactConfig struct {
disableDownsampling bool
blockSyncConcurrency int
blockViewerSyncBlockInterval time.Duration
cleanupBlocksInterval time.Duration
compactionConcurrency int
deleteDelay model.Duration
dedupReplicaLabels []string
@@ -507,6 +550,8 @@ func (cc *compactConfig) registerFlag(cmd extkingpin.FlagClause) {
Default("20").IntVar(&cc.blockSyncConcurrency)
cmd.Flag("block-viewer.global.sync-block-interval", "Repeat interval for syncing the blocks between local and remote view for /global Block Viewer UI.").
Default("1m").DurationVar(&cc.blockViewerSyncBlockInterval)
cmd.Flag("compact.cleanup-interval", "How often we should clean up partially uploaded blocks and blocks with deletion mark in the background when --wait has been enabled. Setting it to \"0s\" disables it - the cleaning will only happen at the end of an iteration.").
Default("5m").DurationVar(&cc.cleanupBlocksInterval)

cmd.Flag("compact.concurrency", "Number of goroutines to use when compacting groups.").
Default("1").IntVar(&cc.compactionConcurrency)
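To summarize the mechanism this file change introduces — one cleanup at startup, then a periodic repeat every `--compact.cleanup-interval`, with a mutex so the periodic run and the main compaction iteration never clean concurrently — here is a minimal, self-contained sketch. It is illustrative only: it uses a plain `time.Ticker` and a stub cleanup function instead of Thanos' `runutil.Repeat` and the real `cleanPartialMarked` closure.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()

	var cleanMtx sync.Mutex
	cleanPartialMarked := func() error {
		// The mutex ensures the periodic cleanup and the main compaction
		// iteration never clean blocks at the same time.
		cleanMtx.Lock()
		defer cleanMtx.Unlock()
		fmt.Println("cleaning partial and marked blocks")
		return nil
	}

	// Run once at startup so cleanup happens at least once, even if the
	// process exits before the first interval elapses.
	if err := cleanPartialMarked(); err != nil {
		fmt.Println("initial cleanup failed:", err)
		return
	}

	interval := time.Second
	// Wait a full interval first, because the startup run already happened.
	select {
	case <-time.After(interval):
	case <-ctx.Done():
		return
	}

	// Repeat until the context is cancelled; the PR uses runutil.Repeat for this.
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		if err := cleanPartialMarked(); err != nil {
			fmt.Println("cleanup failed:", err)
			return
		}
		select {
		case <-ticker.C:
		case <-ctx.Done():
			return
		}
	}
}
```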
6 changes: 6 additions & 0 deletions docs/components/compact.md
@@ -147,6 +147,12 @@ Flags:
Repeat interval for syncing the blocks between
local and remote view for /global Block Viewer
UI.
--compact.cleanup-interval=5m
How often we should clean up partially uploaded
blocks and blocks with deletion mark in the
background when --wait has been enabled. Setting
it to "0s" disables it - the cleaning will only
happen at the end of an iteration.
--compact.concurrency=1 Number of goroutines to use when compacting
groups.
--delete-delay=48h Time before a block marked for deletion is
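As an illustrative usage note (the invocation below is an assumed typical setup, not taken from this PR): running `thanos compact --wait --objstore.config-file=bucket.yml --compact.cleanup-interval=10m` would run the background cleanup every 10 minutes in addition to the cleanup at the end of each iteration, while `--compact.cleanup-interval=0s` keeps the previous behaviour of cleaning only at the end of an iteration.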
18 changes: 12 additions & 6 deletions test/e2e/compact_test.go
@@ -480,7 +480,10 @@ func TestCompactWithStoreGateway(t *testing.T) {
{Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "5", "case": "full-replica-overlap-dedup-ready", "replica": "1"}},
}

t.Run("no replica label with overlaps should halt compactor", func(t *testing.T) {
// No replica label with overlaps should halt compactor. This test is sequential
// because we do not want two Thanos Compact instances deleting the same partially
// uploaded blocks and blocks with deletion marks.
{
c, err := e2ethanos.NewCompactor(s.SharedDir(), "expect-to-halt", svcConfig, nil)
testutil.Ok(t, err)
testutil.Ok(t, s.StartAndWaitReady(c))
@@ -499,22 +502,25 @@ func TestCompactWithStoreGateway(t *testing.T) {

// We expect no ops.
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compactor_iterations_total"))
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compactor_blocks_cleaned_total"))
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compactor_block_cleanup_failures_total"))
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compactor_blocks_marked_for_deletion_total"))
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compactor_aborted_partial_uploads_deletion_attempts_total"))
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compact_group_compactions_total"))
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compact_group_vertical_compactions_total"))
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(1), "thanos_compact_group_compactions_failures_total"))
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(3), "thanos_compact_group_compaction_runs_started_total"))
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(2), "thanos_compact_group_compaction_runs_completed_total"))

// However, the blocks have been cleaned because that happens concurrently.
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(2), "thanos_compactor_blocks_cleaned_total"))
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(2), "thanos_compactor_aborted_partial_uploads_deletion_attempts_total"))

// Ensure bucket UI.
ensureGETStatusCode(t, http.StatusOK, "http://"+path.Join(c.HTTPEndpoint(), "global"))
ensureGETStatusCode(t, http.StatusOK, "http://"+path.Join(c.HTTPEndpoint(), "loaded"))

testutil.Ok(t, s.Stop(c))
})
}

t.Run("dedup enabled; compactor should work as expected", func(t *testing.T) {
// We expect 2x 4-block compaction, 2-block vertical compaction, 2x 3-block compaction.
c, err := e2ethanos.NewCompactor(s.SharedDir(), "working", svcConfig, nil, "--deduplication.replica-label=replica", "--deduplication.replica-label=rule_replica")
@@ -524,10 +530,10 @@ func TestCompactWithStoreGateway(t *testing.T) {
// NOTE: We cannot assert on intermediate `thanos_blocks_meta_` metrics as those are gauge and change dynamically due to many
// compaction groups. Wait for at least first compaction iteration (next is in 5m).
testutil.Ok(t, c.WaitSumMetrics(e2e.Greater(0), "thanos_compactor_iterations_total"))
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(2), "thanos_compactor_blocks_cleaned_total"))
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compactor_blocks_cleaned_total"))
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compactor_block_cleanup_failures_total"))
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(2*4+2+2*3), "thanos_compactor_blocks_marked_for_deletion_total"))
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(2), "thanos_compactor_aborted_partial_uploads_deletion_attempts_total"))
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compactor_aborted_partial_uploads_deletion_attempts_total"))
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(5), "thanos_compact_group_compactions_total"))
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(3), "thanos_compact_group_vertical_compactions_total"))
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compact_group_compactions_failures_total"))