From 8d3ea521598cf8d37b6079eabe4e562939a3c66d Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Tue, 6 Jul 2021 12:00:37 -0700 Subject: [PATCH] Add bucket tool retention command (#4406) * add bucket tool retention command Signed-off-by: ben.ye * update changelog Signed-off-by: ben.ye * add sync meta Signed-off-by: ben.ye --- CHANGELOG.md | 1 + cmd/thanos/tools_bucket.go | 116 +++++++++++++++++++++++++++++++++++++ docs/components/tools.md | 8 +++ 3 files changed, 125 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8f9ce2ddec..972e5322e1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,6 +20,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re - [#4292](https://github.com/thanos-io/thanos/pull/4292) Receive: Enable exemplars ingestion and querying. - [#4392](https://github.com/thanos-io/thanos/pull/4392) Tools: Added `--delete-blocks` to bucket rewrite tool to mark the original blocks for deletion after rewriting is done. - [#3970](https://github.com/thanos-io/thanos/pull/3970) Azure: Adds more configuration options for Azure blob storage. This allows for pipeline and reader specific configuration. Implements HTTP transport configuration options. These options allows for more fine-grained control on timeouts and retries. Implements MSI authentication as second method of authentication via a service principal token. +- [#4406](https://github.com/thanos-io/thanos/pull/4406) Tools: Add retention command for applying retention policy on the bucket. 
### Fixed diff --git a/cmd/thanos/tools_bucket.go b/cmd/thanos/tools_bucket.go index cad437f2b9..bc34a6a80a 100644 --- a/cmd/thanos/tools_bucket.go +++ b/cmd/thanos/tools_bucket.go @@ -27,6 +27,7 @@ import ( "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" + prommodel "github.com/prometheus/common/model" "github.com/prometheus/common/route" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/pkg/relabel" @@ -85,6 +86,7 @@ func registerBucket(app extkingpin.AppClause) { registerBucketCleanup(cmd, objStoreConfig) registerBucketMarkBlock(cmd, objStoreConfig) registerBucketRewrite(cmd, objStoreConfig) + registerBucketRetention(cmd, objStoreConfig) } func registerBucketVerify(app extkingpin.AppClause, objStoreConfig *extflag.PathOrContent) { @@ -988,3 +990,117 @@ func registerBucketRewrite(app extkingpin.AppClause, objStoreConfig *extflag.Pat return nil }) } + +func registerBucketRetention(app extkingpin.AppClause, objStoreConfig *extflag.PathOrContent) { + var ( + retentionRaw, retentionFiveMin, retentionOneHr prommodel.Duration + ) + + cmd := app.Command("retention", "Retention applies retention policies on the given bucket. Please make sure no compactor is running on the same bucket at the same time.") + deleteDelay := cmd.Flag("delete-delay", "Time before a block marked for deletion is deleted from bucket.").Default("48h").Duration() + consistencyDelay := cmd.Flag("consistency-delay", fmt.Sprintf("Minimum age of fresh (non-compacted) blocks before they are being processed. Malformed blocks older than the maximum of consistency-delay and %v will be removed.", compact.PartialUploadThresholdAge)). + Default("30m").Duration() + blockSyncConcurrency := cmd.Flag("block-sync-concurrency", "Number of goroutines to use when syncing block metadata from object storage."). 
+ Default("20").Int() + selectorRelabelConf := extkingpin.RegisterSelectorRelabelFlags(cmd) + cmd.Flag("retention.resolution-raw", + "How long to retain raw samples in bucket. Setting this to 0d will retain samples of this resolution forever"). + Default("0d").SetValue(&retentionRaw) + cmd.Flag("retention.resolution-5m", "How long to retain samples of resolution 1 (5 minutes) in bucket. Setting this to 0d will retain samples of this resolution forever"). + Default("0d").SetValue(&retentionFiveMin) + cmd.Flag("retention.resolution-1h", "How long to retain samples of resolution 2 (1 hour) in bucket. Setting this to 0d will retain samples of this resolution forever"). + Default("0d").SetValue(&retentionOneHr) + cmd.Setup(func(g *run.Group, logger log.Logger, reg *prometheus.Registry, _ opentracing.Tracer, _ <-chan struct{}, _ bool) error { + retentionByResolution := map[compact.ResolutionLevel]time.Duration{ + compact.ResolutionLevelRaw: time.Duration(retentionRaw), + compact.ResolutionLevel5m: time.Duration(retentionFiveMin), + compact.ResolutionLevel1h: time.Duration(retentionOneHr), + } + + if retentionByResolution[compact.ResolutionLevelRaw].Seconds() != 0 { + level.Info(logger).Log("msg", "retention policy of raw samples is enabled", "duration", retentionByResolution[compact.ResolutionLevelRaw]) + } + if retentionByResolution[compact.ResolutionLevel5m].Seconds() != 0 { + level.Info(logger).Log("msg", "retention policy of 5 min aggregated samples is enabled", "duration", retentionByResolution[compact.ResolutionLevel5m]) + } + if retentionByResolution[compact.ResolutionLevel1h].Seconds() != 0 { + level.Info(logger).Log("msg", "retention policy of 1 hour aggregated samples is enabled", "duration", retentionByResolution[compact.ResolutionLevel1h]) + } + + confContentYaml, err := objStoreConfig.Content() + if err != nil { + return err + } + + relabelContentYaml, err := selectorRelabelConf.Content() + if err != nil { + return errors.Wrap(err, "get content of relabel 
configuration") + } + + relabelConfig, err := block.ParseRelabelConfig(relabelContentYaml, block.SelectorSupportedRelabelActions) + if err != nil { + return err + } + + bkt, err := client.NewBucket(logger, confContentYaml, reg, component.Rewrite.String()) + if err != nil { + return err + } + + // Dummy actor to immediately kill the group after the run function returns. + g.Add(func() error { return nil }, func(error) {}) + + defer runutil.CloseWithLogOnErr(logger, bkt, "bucket client") + + // While fetching blocks, we filter out blocks that were marked for deletion by using IgnoreDeletionMarkFilter. + // The delay of deleteDelay/2 is added to ensure we fetch blocks that are meant to be deleted but do not have a replacement yet. + // This is to make sure compactor will not accidentally perform compactions with gap instead. + ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, bkt, *deleteDelay/2, block.FetcherConcurrency) + duplicateBlocksFilter := block.NewDeduplicateFilter() + stubCounter := promauto.With(nil).NewCounter(prometheus.CounterOpts{}) + + var sy *compact.Syncer + { + baseMetaFetcher, err := block.NewBaseFetcher(logger, block.FetcherConcurrency, bkt, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg)) + if err != nil { + return errors.Wrap(err, "create meta fetcher") + } + cf := baseMetaFetcher.NewMetaFetcher( + extprom.WrapRegistererWithPrefix(extpromPrefix, reg), []block.MetadataFilter{ + block.NewLabelShardedMetaFilter(relabelConfig), + block.NewConsistencyDelayMetaFilter(logger, *consistencyDelay, extprom.WrapRegistererWithPrefix(extpromPrefix, reg)), + duplicateBlocksFilter, + ignoreDeletionMarkFilter, + }, []block.MetadataModifier{block.NewReplicaLabelRemover(logger, make([]string, 0))}, + ) + sy, err = compact.NewMetaSyncer( + logger, + reg, + bkt, + cf, + duplicateBlocksFilter, + ignoreDeletionMarkFilter, + stubCounter, + stubCounter, + *blockSyncConcurrency) + if err != nil { + return errors.Wrap(err, "create syncer") + 
} + } + + ctx := context.Background() + level.Info(logger).Log("msg", "syncing blocks metadata") + if err := sy.SyncMetas(ctx); err != nil { + return errors.Wrap(err, "sync blocks") + } + + level.Info(logger).Log("msg", "synced blocks done") + + level.Warn(logger).Log("msg", "GLOBAL COMPACTOR SHOULD __NOT__ BE RUNNING ON THE SAME BUCKET") + + if err := compact.ApplyRetentionPolicyByResolution(ctx, logger, bkt, sy.Metas(), retentionByResolution, stubCounter); err != nil { + return errors.Wrap(err, "retention failed") + } + return nil + }) +} diff --git a/docs/components/tools.md b/docs/components/tools.md index 86bd401f89..a6dc7925ee 100644 --- a/docs/components/tools.md +++ b/docs/components/tools.md @@ -80,6 +80,10 @@ Subcommands: *IRREVERSIBLE* after certain time (delete delay), so do backup your blocks first. + tools bucket retention [<flags>] + Retention applies retention policies on the given bucket. Please make sure + no compactor is running on the same bucket at the same time. + tools rules-check --rules=RULES Check if the rule files are valid or not. @@ -184,6 +188,10 @@ Subcommands: *IRREVERSIBLE* after certain time (delete delay), so do backup your blocks first. + tools bucket retention [<flags>] + Retention applies retention policies on the given bucket. Please make sure + no compactor is running on the same bucket at the same time. + ```