Skip to content

Commit

Permalink
Replicate: Add flag to mark blocks for deletion after replication
Browse files Browse the repository at this point in the history
Signed-off-by: Thomas Sidereal <53402621+Hatry1337@users.noreply.github.com>
  • Loading branch information
Thomas Sidereal authored and Hatry1337 committed May 16, 2024
1 parent 9707a4f commit 0080008
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 5 deletions.
6 changes: 5 additions & 1 deletion cmd/thanos/tools_bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ type bucketReplicateConfig struct {
compactions []int
matcherStrs string
singleRun bool
markAfter bool
}

type bucketDownsampleConfig struct {
Expand Down Expand Up @@ -229,6 +230,8 @@ func (tbc *bucketReplicateConfig) registerBucketReplicateFlag(cmd extkingpin.Fla

cmd.Flag("single-run", "Run replication only one time, then exit.").Default("false").BoolVar(&tbc.singleRun)

cmd.Flag("mark-after", "Mark replicated blocks for deletion in source bucket.").Default("false").BoolVar(&tbc.markAfter)

return tbc
}

Expand Down Expand Up @@ -782,10 +785,11 @@ func registerBucketReplicate(app extkingpin.AppClause, objStoreConfig *extflag.P
objStoreConfig,
toObjStoreConfig,
tbc.singleRun,
tbc.markAfter,
minTime,
maxTime,
blockIDs,
*ignoreMarkedForDeletion,
*ignoreMarkedForDeletion || tbc.markAfter,
)
})
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/replicate/replicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ func RunReplicate(
fromObjStoreConfig *extflag.PathOrContent,
toObjStoreConfig *extflag.PathOrContent,
singleRun bool,
markAfter bool,
minTime, maxTime *thanosmodel.TimeOrDurationValue,
blockIDs []ulid.ULID,
ignoreMarkedForDeletion bool,
Expand Down Expand Up @@ -188,7 +189,7 @@ func RunReplicate(
logger := log.With(logger, "replication-run-id", runID.String())
level.Info(logger).Log("msg", "running replication attempt")

if err := newReplicationScheme(logger, metrics, blockFilter, fetcher, fromBkt, toBkt, reg).execute(ctx); err != nil {
if err := newReplicationScheme(logger, metrics, blockFilter, fetcher, fromBkt, toBkt, reg, markAfter).execute(ctx); err != nil {
return errors.Wrap(err, "replication execute")
}

Expand Down
47 changes: 44 additions & 3 deletions pkg/replicate/scheme.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"github.com/thanos-io/objstore"

"github.com/thanos-io/thanos/pkg/block"
thanosblock "github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/compact"
Expand Down Expand Up @@ -109,7 +110,7 @@ type blockFilterFunc func(b *metadata.Meta) bool

// TODO: Add filters field.
type replicationScheme struct {
fromBkt objstore.InstrumentedBucketReader
fromBkt objstore.InstrumentedBucket
toBkt objstore.Bucket

blockFilter blockFilterFunc
Expand All @@ -119,6 +120,8 @@ type replicationScheme struct {
metrics *replicationMetrics

reg prometheus.Registerer

markAfter bool
}

type replicationMetrics struct {
Expand Down Expand Up @@ -150,9 +153,10 @@ func newReplicationScheme(
metrics *replicationMetrics,
blockFilter blockFilterFunc,
fetcher thanosblock.MetadataFetcher,
from objstore.InstrumentedBucketReader,
from objstore.InstrumentedBucket,
to objstore.Bucket,
reg prometheus.Registerer,
markAfter bool,
) *replicationScheme {
if logger == nil {
logger = log.NewNopLogger()
Expand All @@ -166,6 +170,7 @@ func newReplicationScheme(
toBkt: to,
metrics: metrics,
reg: reg,
markAfter: markAfter,
}
}

Expand Down Expand Up @@ -246,7 +251,26 @@ func (rs *replicationScheme) ensureBlockIsReplicated(ctx context.Context, id uli
// If the origin meta file content and target meta file content is
// equal, we know we have already successfully replicated
// previously.
level.Debug(rs.logger).Log("msg", "skipping block as already replicated", "block_uuid", blockID)
if rs.markAfter {
deletionMarkFile := path.Join(id.String(), metadata.DeletionMarkFilename)
deletionMarkExists, err := rs.fromBkt.Exists(ctx, deletionMarkFile)
if err != nil {
return errors.Wrapf(err, "check exists %s in bucket", deletionMarkFile)
}

if !deletionMarkExists {
level.Debug(rs.logger).Log("msg", "marking block for deletion as already replicated", "block_uuid", blockID)
if err := block.MarkForDeletion(ctx, rs.logger, rs.fromBkt, id, "marked for deletion by thanos bucket replicate", promauto.With(nil).NewCounter(prometheus.CounterOpts{})); err != nil {
return errors.Wrapf(err, "mark %v for deletion", id)
}
} else {
level.Debug(rs.logger).Log("msg", "block already marked for deletion as already replicated", "block_uuid", blockID)
}

} else {
level.Debug(rs.logger).Log("msg", "skipping block as already replicated", "block_uuid", blockID)
}

rs.metrics.blocksAlreadyReplicated.Inc()

return nil
Expand Down Expand Up @@ -274,6 +298,23 @@ func (rs *replicationScheme) ensureBlockIsReplicated(ctx context.Context, id uli
return errors.Wrap(err, "upload meta file")
}

if rs.markAfter {
deletionMarkFile := path.Join(id.String(), metadata.DeletionMarkFilename)
deletionMarkExists, err := rs.fromBkt.Exists(ctx, deletionMarkFile)
if err != nil {
return errors.Wrapf(err, "check exists %s in bucket", deletionMarkFile)
}

if !deletionMarkExists {
level.Debug(rs.logger).Log("msg", "marking block for deletion", "block_uuid", blockID)
if err := block.MarkForDeletion(ctx, rs.logger, rs.fromBkt, id, "marked for deletion by thanos bucket replicate", promauto.With(nil).NewCounter(prometheus.CounterOpts{})); err != nil {
return errors.Wrapf(err, "mark %v for deletion", id)
}
} else {
level.Debug(rs.logger).Log("msg", "block already marked for deletion", "block_uuid", blockID)
}
}

rs.metrics.blocksReplicated.Inc()

return nil
Expand Down

0 comments on commit 0080008

Please sign in to comment.