From 00800083cb37dad4c1c83b4acc323add0f6f07f9 Mon Sep 17 00:00:00 2001 From: Thomas Sidereal Date: Thu, 16 May 2024 18:24:23 +0300 Subject: [PATCH] Replicate: Add flag to mark blocks for deletion after replication Signed-off-by: Thomas Sidereal <53402621+Hatry1337@users.noreply.github.com> --- cmd/thanos/tools_bucket.go | 6 ++++- pkg/replicate/replicator.go | 3 ++- pkg/replicate/scheme.go | 47 ++++++++++++++++++++++++++++++++++--- 3 files changed, 51 insertions(+), 5 deletions(-) diff --git a/cmd/thanos/tools_bucket.go b/cmd/thanos/tools_bucket.go index 326e4b09eb..c5b8c86e83 100644 --- a/cmd/thanos/tools_bucket.go +++ b/cmd/thanos/tools_bucket.go @@ -131,6 +131,7 @@ type bucketReplicateConfig struct { compactions []int matcherStrs string singleRun bool + markAfter bool } type bucketDownsampleConfig struct { @@ -229,6 +230,8 @@ func (tbc *bucketReplicateConfig) registerBucketReplicateFlag(cmd extkingpin.Fla cmd.Flag("single-run", "Run replication only one time, then exit.").Default("false").BoolVar(&tbc.singleRun) + cmd.Flag("mark-after", "Mark replicated blocks for deletion in source bucket.").Default("false").BoolVar(&tbc.markAfter) + return tbc } @@ -782,10 +785,11 @@ func registerBucketReplicate(app extkingpin.AppClause, objStoreConfig *extflag.P objStoreConfig, toObjStoreConfig, tbc.singleRun, + tbc.markAfter, minTime, maxTime, blockIDs, - *ignoreMarkedForDeletion, + *ignoreMarkedForDeletion || tbc.markAfter, ) }) } diff --git a/pkg/replicate/replicator.go b/pkg/replicate/replicator.go index 668d64afce..d25dc6b80f 100644 --- a/pkg/replicate/replicator.go +++ b/pkg/replicate/replicator.go @@ -73,6 +73,7 @@ func RunReplicate( fromObjStoreConfig *extflag.PathOrContent, toObjStoreConfig *extflag.PathOrContent, singleRun bool, + markAfter bool, minTime, maxTime *thanosmodel.TimeOrDurationValue, blockIDs []ulid.ULID, ignoreMarkedForDeletion bool, @@ -188,7 +189,7 @@ func RunReplicate( logger := log.With(logger, "replication-run-id", runID.String()) level.Info(logger).Log("msg", "running replication attempt") - if err := newReplicationScheme(logger, metrics, blockFilter, fetcher, fromBkt, toBkt, reg).execute(ctx); err != nil { + if err := newReplicationScheme(logger, metrics, blockFilter, fetcher, fromBkt, toBkt, reg, markAfter).execute(ctx); err != nil { return errors.Wrap(err, "replication execute") } diff --git a/pkg/replicate/scheme.go b/pkg/replicate/scheme.go index 31249fbf77..fa75ce8fe7 100644 --- a/pkg/replicate/scheme.go +++ b/pkg/replicate/scheme.go @@ -21,6 +21,7 @@ import ( "github.com/thanos-io/objstore" + "github.com/thanos-io/thanos/pkg/block" thanosblock "github.com/thanos-io/thanos/pkg/block" "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/compact" @@ -109,7 +110,7 @@ type blockFilterFunc func(b *metadata.Meta) bool // TODO: Add filters field. type replicationScheme struct { - fromBkt objstore.InstrumentedBucketReader + fromBkt objstore.InstrumentedBucket toBkt objstore.Bucket blockFilter blockFilterFunc @@ -119,6 +120,8 @@ type replicationScheme struct { metrics *replicationMetrics reg prometheus.Registerer + + markAfter bool } type replicationMetrics struct { @@ -150,9 +153,10 @@ func newReplicationScheme( metrics *replicationMetrics, blockFilter blockFilterFunc, fetcher thanosblock.MetadataFetcher, - from objstore.InstrumentedBucketReader, + from objstore.InstrumentedBucket, to objstore.Bucket, reg prometheus.Registerer, + markAfter bool, ) *replicationScheme { if logger == nil { logger = log.NewNopLogger() @@ -166,6 +170,7 @@ func newReplicationScheme( toBkt: to, metrics: metrics, reg: reg, + markAfter: markAfter, } } @@ -246,7 +251,26 @@ func (rs *replicationScheme) ensureBlockIsReplicated(ctx context.Context, id uli // If the origin meta file content and target meta file content is // equal, we know we have already successfully replicated // previously. - level.Debug(rs.logger).Log("msg", "skipping block as already replicated", "block_uuid", blockID) + if rs.markAfter { + deletionMarkFile := path.Join(id.String(), metadata.DeletionMarkFilename) + deletionMarkExists, err := rs.fromBkt.Exists(ctx, deletionMarkFile) + if err != nil { + return errors.Wrapf(err, "check exists %s in bucket", deletionMarkFile) + } + + if !deletionMarkExists { + level.Debug(rs.logger).Log("msg", "marking block for deletion as already replicated", "block_uuid", blockID) + if err := block.MarkForDeletion(ctx, rs.logger, rs.fromBkt, id, "marked for deletion by thanos bucket replicate", promauto.With(nil).NewCounter(prometheus.CounterOpts{})); err != nil { + return errors.Wrapf(err, "mark %v for deletion", id) + } + } else { + level.Debug(rs.logger).Log("msg", "block already marked for deletion as already replicated", "block_uuid", blockID) + } + + } else { + level.Debug(rs.logger).Log("msg", "skipping block as already replicated", "block_uuid", blockID) + } + rs.metrics.blocksAlreadyReplicated.Inc() return nil @@ -274,6 +298,23 @@ func (rs *replicationScheme) ensureBlockIsReplicated(ctx context.Context, id uli return errors.Wrap(err, "upload meta file") } + if rs.markAfter { + deletionMarkFile := path.Join(id.String(), metadata.DeletionMarkFilename) + deletionMarkExists, err := rs.fromBkt.Exists(ctx, deletionMarkFile) + if err != nil { + return errors.Wrapf(err, "check exists %s in bucket", deletionMarkFile) + } + + if !deletionMarkExists { + level.Debug(rs.logger).Log("msg", "marking block for deletion", "block_uuid", blockID) + if err := block.MarkForDeletion(ctx, rs.logger, rs.fromBkt, id, "marked for deletion by thanos bucket replicate", promauto.With(nil).NewCounter(prometheus.CounterOpts{})); err != nil { + return errors.Wrapf(err, "mark %v for deletion", id) + } + } else { + level.Debug(rs.logger).Log("msg", "block already marked for deletion", "block_uuid", blockID) + } + } + rs.metrics.blocksReplicated.Inc() return nil