Skip to content

Commit

Permalink
compact: do not cleanup blocks on boot
Browse files Browse the repository at this point in the history
Do not clean up blocks on boot because in some very bad cases there could
be thousands of blocks ready to be deleted, and deleting them all makes Thanos
Compact exceed `initialDelaySeconds` on k8s.

Signed-off-by: Giedrius Statkevičius <giedriuswork@gmail.com>
  • Loading branch information
GiedriusS committed Dec 2, 2020
1 parent a57bbfb commit cf9ef37
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 25 deletions.
30 changes: 10 additions & 20 deletions cmd/thanos/compact.go
Expand Up @@ -113,6 +113,10 @@ func runCompact(
Name: "thanos_compact_iterations_total",
Help: "Total number of iterations that were executed successfully.",
})
cleanups := promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "thanos_compact_block_cleanup_loops_total",
Help: "Total number of concurrent cleanup loops of partially uploaded blocks and marked blocks that were executed successfully.",
})
partialUploadDeleteAttempts := promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "thanos_compact_aborted_partial_uploads_deletion_attempts_total",
Help: "Total number of started deletions of blocks that are assumed aborted and only partially uploaded.",
Expand Down Expand Up @@ -339,29 +343,20 @@ func runCompact(
cleanMtx.Lock()
defer cleanMtx.Unlock()

// No need to resync before partial uploads and delete marked blocks. Last sync should be valid.
if err := sy.SyncMetas(ctx); err != nil {
cancel()
return errors.Wrap(err, "syncing metas")
}

compact.BestEffortCleanAbortedPartialUploads(ctx, logger, sy.Partial(), bkt, partialUploadDeleteAttempts, blocksCleaned, blockCleanupFailures)
if err := blocksCleaner.DeleteMarkedBlocks(ctx); err != nil {
return errors.Wrap(err, "cleaning marked blocks")
}
cleanups.Inc()

if err := sy.SyncMetas(ctx); err != nil {
level.Error(logger).Log("msg", "failed to sync metas", "err", err)
}
return nil
}

// Do it once at the beginning to ensure that it runs at least once before
// the main loop.
if err := sy.SyncMetas(ctx); err != nil {
cancel()
return errors.Wrap(err, "syncing metas")
}
if err := cleanPartialMarked(); err != nil {
cancel()
return errors.Wrap(err, "cleaning partial and marked blocks")
}

compactMainFn := func() error {
if err := compactor.Compact(ctx); err != nil {
return errors.Wrap(err, "compaction")
Expand Down Expand Up @@ -481,11 +476,6 @@ func runCompact(
// since one iteration potentially could take a long time.
if conf.cleanupBlocksInterval > 0 {
g.Add(func() error {
// Wait the whole period at the beginning because we've executed this on boot.
select {
case <-time.After(conf.cleanupBlocksInterval):
case <-ctx.Done():
}
return runutil.Repeat(conf.cleanupBlocksInterval, ctx.Done(), cleanPartialMarked)
}, func(error) {
cancel()
Expand Down
12 changes: 7 additions & 5 deletions test/e2e/compact_test.go
Expand Up @@ -535,12 +535,14 @@ func TestCompactWithStoreGateway(t *testing.T) {
c, err := e2ethanos.NewCompactor(s.SharedDir(), "expect-to-halt", svcConfig, nil)
testutil.Ok(t, err)
testutil.Ok(t, s.StartAndWaitReady(c))
testutil.Ok(t, str.WaitSumMetrics(e2e.Equals(float64(len(rawBlockIDs)+7)), "thanos_blocks_meta_synced"))
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_blocks_meta_sync_failures_total"))
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_blocks_meta_modified"))

// Expect compactor halted.
// Expect compactor halted and for one cleanup iteration to happen.
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(1), "thanos_compact_halted"))
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(1), "thanos_compact_block_cleanup_loops_total"))

testutil.Ok(t, str.WaitSumMetrics(e2e.Equals(float64(len(rawBlockIDs)+5)), "thanos_blocks_meta_synced"))
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_blocks_meta_sync_failures_total"))
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_blocks_meta_modified"))

// The compact directory is still there.
dataDir := filepath.Join(s.SharedDir(), "data", "compact", "expect-to-halt")
Expand All @@ -559,8 +561,8 @@ func TestCompactWithStoreGateway(t *testing.T) {
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(2), "thanos_compact_group_compaction_runs_completed_total"))

// However, the blocks have been cleaned because that happens concurrently.
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(2), "thanos_compact_blocks_cleaned_total"))
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(2), "thanos_compact_aborted_partial_uploads_deletion_attempts_total"))
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(2), "thanos_compact_blocks_cleaned_total"))

// Ensure bucket UI.
ensureGETStatusCode(t, http.StatusOK, "http://"+path.Join(c.HTTPEndpoint(), "global"))
Expand Down

0 comments on commit cf9ef37

Please sign in to comment.