diff --git a/CHANGELOG.md b/CHANGELOG.md index 6496d6b18d..f9a2179035 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -24,6 +24,13 @@ We use *breaking* word for marking changes that are not backward compatible (rel - [#2667](https://github.com/thanos-io/thanos/pull/2667) Store: removed support to the legacy `index.cache.json`. The hidden flag `--store.disable-index-header` was removed. - [#2667](https://github.com/thanos-io/thanos/pull/2667) Compact: the deprecated flag `--index.generate-missing-cache-file` and the metric `thanos_compact_generated_index_total` were removed. - [2603](https://github.com/thanos-io/thanos/pull/2603) Store/Querier: Significantly optimize cases where StoreAPIs or blocks returns exact overlapping chunks (e.g Store GW and sidecar or brute force Store Gateway HA). +- [#2671](https://github.com/thanos-io/thanos/pull/2671) *breaking* Tools: bucket replicate flag `--resolution` is now in Go duration format. +- [#2671](https://github.com/thanos-io/thanos/pull/2671) Tools: bucket replicate now replicates by default all blocks. + + +### Added + +- [#2671](https://github.com/thanos-io/thanos/pull/2671) Tools: bucket replicate now allows passing repeated `--compaction` and `--resolution` flags. ## [v0.13.0](https://github.com/thanos-io/thanos/releases) - IN PROGRESS diff --git a/cmd/thanos/tools_bucket.go b/cmd/thanos/tools_bucket.go index bfb48c7a9a..9d7d27de4d 100644 --- a/cmd/thanos/tools_bucket.go +++ b/cmd/thanos/tools_bucket.go @@ -414,19 +414,17 @@ func registerBucketWeb(m map[string]setupFunc, root *kingpin.CmdClause, name str // Provide a list of resolution, can not use Enum directly, since string does not implement int64 function. func listResLevel() []string { return []string{ - strconv.FormatInt(downsample.ResLevel0, 10), - strconv.FormatInt(downsample.ResLevel1, 10), - strconv.FormatInt(downsample.ResLevel2, 10)} + time.Duration(downsample.ResLevel0).String(), + time.Duration(downsample.ResLevel1).String(), + time.Duration(downsample.ResLevel2).String()} } func registerBucketReplicate(m map[string]setupFunc, root *kingpin.CmdClause, name string, objStoreConfig *extflag.PathOrContent) { cmd := root.Command("replicate", fmt.Sprintf("Replicate data from one object storage to another. NOTE: Currently it works only with Thanos blocks (%v has to have Thanos metadata).", block.MetaFilename)) httpBindAddr, httpGracePeriod := regHTTPFlags(cmd) toObjStoreConfig := regCommonObjStoreFlags(cmd, "-to", false, "The object storage which replicate data to.") - // TODO(bwplotka): Allow to replicate many resolution levels. - resolution := cmd.Flag("resolution", "Only blocks with this resolution will be replicated.").Default(strconv.FormatInt(downsample.ResLevel0, 10)).HintAction(listResLevel).Int64() - // TODO(bwplotka): Allow to replicate many compaction levels. - compaction := cmd.Flag("compaction", "Only blocks with this compaction level will be replicated.").Default("1").Int() + resolutions := cmd.Flag("resolution", "Only blocks with these resolutions will be replicated. Repeated flag.").Default("0s", "5m", "1h").HintAction(listResLevel).DurationList() + compactions := cmd.Flag("compaction", "Only blocks with these compaction levels will be replicated. Repeated flag.").Default("1", "2", "3", "4").Ints() matcherStrs := cmd.Flag("matcher", "Only blocks whose external labels exactly match this matcher will be replicated.").PlaceHolder("key=\"value\"").Strings() singleRun := cmd.Flag("single-run", "Run replication only one time, then exit.").Default("false").Bool() @@ -436,6 +434,11 @@ func registerBucketReplicate(m map[string]setupFunc, root *kingpin.CmdClause, na return errors.Wrap(err, "parse block label matchers") } + var resolutionLevels []compact.ResolutionLevel + for _, lvl := range *resolutions { + resolutionLevels = append(resolutionLevels, compact.ResolutionLevel(lvl.Milliseconds())) + } + return replicate.RunReplicate( g, logger, @@ -444,8 +447,8 @@ func registerBucketReplicate(m map[string]setupFunc, root *kingpin.CmdClause, na *httpBindAddr, time.Duration(*httpGracePeriod), matchers, - compact.ResolutionLevel(*resolution), - *compaction, + resolutionLevels, + *compactions, objStoreConfig, toObjStoreConfig, *singleRun, diff --git a/docs/components/tools.md b/docs/components/tools.md index feb5ace116..a5ca3c853c 100644 --- a/docs/components/tools.md +++ b/docs/components/tools.md @@ -451,10 +451,10 @@ Flags: format details: https://thanos.io/storage.md/#configuration The object storage which replicate data to. - --resolution=0 Only blocks with this resolution will be - replicated. - --compaction=1 Only blocks with this compaction level will be - replicated. + --resolution=0s... ... Only blocks with these resolutions will be + replicated. Repeated flag. + --compaction=1... ... Only blocks with these compaction levels will + be replicated. Repeated flag. --matcher=key="value" ... Only blocks whose external labels exactly match this matcher will be replicated. --single-run Run replication only one time, then exit. diff --git a/pkg/replicate/replicator.go b/pkg/replicate/replicator.go index 82455be46d..7f3b519fba 100644 --- a/pkg/replicate/replicator.go +++ b/pkg/replicate/replicator.go @@ -69,8 +69,8 @@ func RunReplicate( httpBindAddr string, httpGracePeriod time.Duration, labelSelector labels.Selector, - resolution compact.ResolutionLevel, - compaction int, + resolutions []compact.ResolutionLevel, + compactions []int, fromObjStoreConfig *extflag.PathOrContent, toObjStoreConfig *extflag.PathOrContent, singleRun bool, @@ -159,8 +159,8 @@ func RunReplicate( blockFilter := NewBlockFilter( logger, labelSelector, - resolution, - compaction, + resolutions, + compactions, ).Filter metrics := newReplicationMetrics(reg) ctx, cancel := context.WithCancel(context.Background()) diff --git a/pkg/replicate/scheme.go b/pkg/replicate/scheme.go index 5402dd9a78..2a97c0a5e8 100644 --- a/pkg/replicate/scheme.go +++ b/pkg/replicate/scheme.go @@ -6,6 +6,7 @@ package replicate import ( "bytes" "context" + "fmt" "io" "io/ioutil" "path" @@ -27,24 +28,32 @@ import ( // BlockFilter is block filter that filters out compacted and unselected blocks. type BlockFilter struct { - logger log.Logger - labelSelector labels.Selector - resolutionLevel compact.ResolutionLevel - compactionLevel int + logger log.Logger + labelSelector labels.Selector + resolutionLevels map[compact.ResolutionLevel]struct{} + compactionLevels map[int]struct{} } // NewBlockFilter returns block filter. func NewBlockFilter( logger log.Logger, labelSelector labels.Selector, - resolutionLevel compact.ResolutionLevel, - compactionLevel int, + resolutionLevels []compact.ResolutionLevel, + compactionLevels []int, ) *BlockFilter { + allowedResolutions := make(map[compact.ResolutionLevel]struct{}) + for _, resolutionLevel := range resolutionLevels { + allowedResolutions[resolutionLevel] = struct{}{} + } + allowedCompactions := make(map[int]struct{}) + for _, compactionLevel := range compactionLevels { + allowedCompactions[compactionLevel] = struct{}{} + } return &BlockFilter{ - labelSelector: labelSelector, - logger: logger, - resolutionLevel: resolutionLevel, - compactionLevel: compactionLevel, + labelSelector: labelSelector, + logger: logger, + resolutionLevels: allowedResolutions, + compactionLevels: allowedCompactions, } } @@ -77,20 +86,14 @@ func (bf *BlockFilter) Filter(b *metadata.Meta) bool { } gotResolution := compact.ResolutionLevel(b.Thanos.Downsample.Resolution) - expectedResolution := bf.resolutionLevel - - resolutionMatch := gotResolution == expectedResolution - if !resolutionMatch { - level.Debug(bf.logger).Log("msg", "filtering block", "reason", "resolutions don't match", "got_resolution", gotResolution, "expected_resolution", expectedResolution) + if _, ok := bf.resolutionLevels[gotResolution]; !ok { + level.Info(bf.logger).Log("msg", "filtering block", "reason", "resolution doesn't match allowed resolutions", "got_resolution", gotResolution, "allowed_resolutions", fmt.Sprintf("%v", bf.resolutionLevels)) return false } gotCompactionLevel := b.BlockMeta.Compaction.Level - expectedCompactionLevel := bf.compactionLevel - - compactionMatch := gotCompactionLevel == expectedCompactionLevel - if !compactionMatch { - level.Debug(bf.logger).Log("msg", "filtering block", "reason", "compaction levels don't match", "got_compaction_level", gotCompactionLevel, "expected_compaction_level", expectedCompactionLevel) + if _, ok := bf.compactionLevels[gotCompactionLevel]; !ok { + level.Info(bf.logger).Log("msg", "filtering block", "reason", "compaction level doesn't match allowed levels", "got_compaction_level", gotCompactionLevel, "allowed_compaction_levels", fmt.Sprintf("%v", bf.compactionLevels)) return false } @@ -211,31 +214,23 @@ func (rs *replicationScheme) execute(ctx context.Context) error { return nil } - level.Debug(rs.logger).Log("msg", "adding block to available blocks", "block_uuid", id.String()) - - availableBlocks = append(availableBlocks, meta) + if rs.blockFilter(meta) { + level.Info(rs.logger).Log("msg", "adding block to be replicated", "block_uuid", id.String()) + availableBlocks = append(availableBlocks, meta) + } return nil }); err != nil { return errors.Wrap(err, "iterate over origin bucket") } - candidateBlocks := []*metadata.Meta{} - - for _, b := range availableBlocks { - if rs.blockFilter(b) { - level.Debug(rs.logger).Log("msg", "adding block to candidate blocks", "block_uuid", b.BlockMeta.ULID.String()) - candidateBlocks = append(candidateBlocks, b) - } - } - // In order to prevent races in compactions by the target environment, we // need to replicate oldest start timestamp first. - sort.Slice(candidateBlocks, func(i, j int) bool { - return candidateBlocks[i].BlockMeta.MinTime < candidateBlocks[j].BlockMeta.MinTime + sort.Slice(availableBlocks, func(i, j int) bool { + return availableBlocks[i].BlockMeta.MinTime < availableBlocks[j].BlockMeta.MinTime }) - for _, b := range candidateBlocks { + for _, b := range availableBlocks { if err := rs.ensureBlockIsReplicated(ctx, b.BlockMeta.ULID); err != nil { return errors.Wrapf(err, "ensure block %v is replicated", b.BlockMeta.ULID.String()) } diff --git a/pkg/replicate/scheme_test.go b/pkg/replicate/scheme_test.go index fc9e66943d..e8686f90ef 100644 --- a/pkg/replicate/scheme_test.go +++ b/pkg/replicate/scheme_test.go @@ -311,7 +311,7 @@ func TestReplicationSchemeAll(t *testing.T) { selector = c.selector } - filter := NewBlockFilter(logger, selector, compact.ResolutionLevelRaw, 1).Filter + filter := NewBlockFilter(logger, selector, []compact.ResolutionLevel{compact.ResolutionLevelRaw}, []int{1}).Filter fetcher, err := block.NewMetaFetcher(logger, 32, objstore.WithNoopInstr(originBucket), "", nil, nil, nil) testutil.Ok(t, err)