From f5b7c6bdd1e0d0fe022891d04fb5971a2f59c76f Mon Sep 17 00:00:00 2001 From: Martin Chodur Date: Wed, 27 May 2020 17:24:33 +0200 Subject: [PATCH 1/8] feat tools: bucket replicate allows to filter multiple compaction and resolution lvls Signed-off-by: Martin Chodur --- CHANGELOG.md | 3 +++ cmd/thanos/tools_bucket.go | 15 ++++++++----- docs/components/tools.md | 6 ++--- pkg/objstore/swift/swift.go | 2 +- pkg/replicate/replicator.go | 8 +++---- pkg/replicate/scheme.go | 44 ++++++++++++++++++++++--------------- 6 files changed, 46 insertions(+), 32 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6496d6b18d..9fd5aaf5fe 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -25,6 +25,9 @@ We use *breaking* word for marking changes that are not backward compatible (rel - [#2667](https://github.com/thanos-io/thanos/pull/2667) Compact: the deprecated flag `--index.generate-missing-cache-file` and the metric `thanos_compact_generated_index_total` were removed. - [2603](https://github.com/thanos-io/thanos/pull/2603) Store/Querier: Significantly optimize cases where StoreAPIs or blocks returns exact overlapping chunks (e.g Store GW and sidecar or brute force Store Gateway HA). +### Added +- [#TBA](https://github.com/thanos-io/thanos/pull/TBA) Tools: bucket replicate now allows passing repeated `--compaction` and `--resolution` flags. + ## [v0.13.0](https://github.com/thanos-io/thanos/releases) - IN PROGRESS ### Fixed diff --git a/cmd/thanos/tools_bucket.go b/cmd/thanos/tools_bucket.go index bfb48c7a9a..c3945cac47 100644 --- a/cmd/thanos/tools_bucket.go +++ b/cmd/thanos/tools_bucket.go @@ -423,13 +423,16 @@ func registerBucketReplicate(m map[string]setupFunc, root *kingpin.CmdClause, na cmd := root.Command("replicate", fmt.Sprintf("Replicate data from one object storage to another. NOTE: Currently it works only with Thanos blocks (%v has to have Thanos metadata).", block.MetaFilename)) httpBindAddr, httpGracePeriod := regHTTPFlags(cmd) toObjStoreConfig := regCommonObjStoreFlags(cmd, "-to", false, "The object storage which replicate data to.") - // TODO(bwplotka): Allow to replicate many resolution levels. - resolution := cmd.Flag("resolution", "Only blocks with this resolution will be replicated.").Default(strconv.FormatInt(downsample.ResLevel0, 10)).HintAction(listResLevel).Int64() - // TODO(bwplotka): Allow to replicate many compaction levels. - compaction := cmd.Flag("compaction", "Only blocks with this compaction level will be replicated.").Default("1").Int() + resolutions := cmd.Flag("resolution", "Only blocks with this resolution will be replicated. (Resolution in ms)").Default(strconv.FormatInt(downsample.ResLevel0, 10)).HintAction(listResLevel).Int64List() + compactions := cmd.Flag("compaction", "Only blocks with this compaction level will be replicated.").Default("1").Ints() matcherStrs := cmd.Flag("matcher", "Only blocks whose external labels exactly match this matcher will be replicated.").PlaceHolder("key=\"value\"").Strings() singleRun := cmd.Flag("single-run", "Run replication only one time, then exit.").Default("false").Bool() + var resolutionLevels []compact.ResolutionLevel + for _, lvl := range *resolutions { + resolutionLevels = append(resolutionLevels, compact.ResolutionLevel(lvl)) + } + m[name+" replicate"] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ <-chan struct{}, _ bool) error { matchers, err := replicate.ParseFlagMatchers(*matcherStrs) if err != nil { @@ -444,8 +447,8 @@ func registerBucketReplicate(m map[string]setupFunc, root *kingpin.CmdClause, na *httpBindAddr, time.Duration(*httpGracePeriod), matchers, - compact.ResolutionLevel(*resolution), - *compaction, + resolutionLevels, + *compactions, objStoreConfig, toObjStoreConfig, *singleRun, diff --git a/docs/components/tools.md b/docs/components/tools.md index feb5ace116..e225730c50 100644 --- a/docs/components/tools.md +++ b/docs/components/tools.md @@ -451,9 +451,9 @@ Flags: format details: https://thanos.io/storage.md/#configuration The object storage which replicate data to. - --resolution=0 Only blocks with this resolution will be - replicated. - --compaction=1 Only blocks with this compaction level will be + --resolution=0 ... Only blocks with this resolution will be + replicated. (Resolution in ms) + --compaction=1 ... Only blocks with this compaction level will be replicated. --matcher=key="value" ... Only blocks whose external labels exactly match this matcher will be replicated. diff --git a/pkg/objstore/swift/swift.go b/pkg/objstore/swift/swift.go index c11715ce4f..f309150c50 100644 --- a/pkg/objstore/swift/swift.go +++ b/pkg/objstore/swift/swift.go @@ -251,7 +251,7 @@ func configFromEnv() SwiftConfig { ProjectName: os.Getenv("OS_PROJECT_NAME"), UserDomainID: os.Getenv("OS_USER_DOMAIN_ID"), UserDomainName: os.Getenv("OS_USER_DOMAIN_NAME"), - ProjectDomainID: os.Getenv("OS_PROJET_DOMAIN_ID"), + ProjectDomainID: os.Getenv("OS_PROJECT_DOMAIN_ID"), ProjectDomainName: os.Getenv("OS_PROJECT_DOMAIN_NAME"), } diff --git a/pkg/replicate/replicator.go b/pkg/replicate/replicator.go index 82455be46d..7f3b519fba 100644 --- a/pkg/replicate/replicator.go +++ b/pkg/replicate/replicator.go @@ -69,8 +69,8 @@ func RunReplicate( httpBindAddr string, httpGracePeriod time.Duration, labelSelector labels.Selector, - resolution compact.ResolutionLevel, - compaction int, + resolutions []compact.ResolutionLevel, + compactions []int, fromObjStoreConfig *extflag.PathOrContent, toObjStoreConfig *extflag.PathOrContent, singleRun bool, @@ -159,8 +159,8 @@ func RunReplicate( blockFilter := NewBlockFilter( logger, labelSelector, - resolution, - compaction, + resolutions, + compactions, ).Filter metrics := newReplicationMetrics(reg) ctx, cancel := context.WithCancel(context.Background()) diff --git a/pkg/replicate/scheme.go b/pkg/replicate/scheme.go index 5402dd9a78..d1695e08e3 100644 --- a/pkg/replicate/scheme.go +++ b/pkg/replicate/scheme.go @@ -27,24 +27,24 @@ import ( // BlockFilter is block filter that filters out compacted and unselected blocks. type BlockFilter struct { - logger log.Logger - labelSelector labels.Selector - resolutionLevel compact.ResolutionLevel - compactionLevel int + logger log.Logger + labelSelector labels.Selector + resolutionLevels []compact.ResolutionLevel + compactionLevels []int } // NewBlockFilter returns block filter. func NewBlockFilter( logger log.Logger, labelSelector labels.Selector, - resolutionLevel compact.ResolutionLevel, - compactionLevel int, + resolutionLevels []compact.ResolutionLevel, + compactionLevels []int, ) *BlockFilter { return &BlockFilter{ - labelSelector: labelSelector, - logger: logger, - resolutionLevel: resolutionLevel, - compactionLevel: compactionLevel, + labelSelector: labelSelector, + logger: logger, + resolutionLevels: resolutionLevels, + compactionLevels: compactionLevels, } } @@ -77,20 +77,28 @@ func (bf *BlockFilter) Filter(b *metadata.Meta) bool { } gotResolution := compact.ResolutionLevel(b.Thanos.Downsample.Resolution) - expectedResolution := bf.resolutionLevel - - resolutionMatch := gotResolution == expectedResolution + resolutionMatch := false + for _, allowedResolution := range bf.resolutionLevels { + if gotResolution == allowedResolution { + resolutionMatch = true + break + } + } if !resolutionMatch { - level.Debug(bf.logger).Log("msg", "filtering block", "reason", "resolutions don't match", "got_resolution", gotResolution, "expected_resolution", expectedResolution) + level.Debug(bf.logger).Log("msg", "filtering block", "reason", "resolution doesn't match allowed resolutions", "got_resolution", gotResolution, "allowed_resolutions", bf.resolutionLevels) return false } gotCompactionLevel := b.BlockMeta.Compaction.Level - expectedCompactionLevel := bf.compactionLevel - - compactionMatch := gotCompactionLevel == expectedCompactionLevel + compactionMatch := false + for _, allowedCompactionLevel := range bf.compactionLevels { + if gotCompactionLevel == allowedCompactionLevel { + compactionMatch = true + break + } + } if !compactionMatch { - level.Debug(bf.logger).Log("msg", "filtering block", "reason", "compaction levels don't match", "got_compaction_level", gotCompactionLevel, "expected_compaction_level", expectedCompactionLevel) + level.Debug(bf.logger).Log("msg", "filtering block", "reason", "compaction level doesn't match allowed levels", "got_compaction_level", gotCompactionLevel, "allowed_compaction_levels", bf.compactionLevels) return false } From 71cad331ec40b26973b3e22a01603a7d51596214 Mon Sep 17 00:00:00 2001 From: Martin Chodur Date: Thu, 28 May 2020 09:20:20 +0200 Subject: [PATCH 2/8] chore: update changelog Signed-off-by: Martin Chodur --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9fd5aaf5fe..56f1a61db2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -26,7 +26,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel - [2603](https://github.com/thanos-io/thanos/pull/2603) Store/Querier: Significantly optimize cases where StoreAPIs or blocks returns exact overlapping chunks (e.g Store GW and sidecar or brute force Store Gateway HA). ### Added -- [#TBA](https://github.com/thanos-io/thanos/pull/TBA) Tools: bucket replicate now allows passing repeated `--compaction` and `--resolution` flags. +- [#2671](https://github.com/thanos-io/thanos/pull/2671) Tools: bucket replicate now allows passing repeated `--compaction` and `--resolution` flags. ## [v0.13.0](https://github.com/thanos-io/thanos/releases) - IN PROGRESS From 6871581b47c4b320aa14e3c1e501a5d198737df5 Mon Sep 17 00:00:00 2001 From: Martin Chodur Date: Thu, 28 May 2020 09:30:11 +0200 Subject: [PATCH 3/8] Update cmd/thanos/tools_bucket.go Co-authored-by: Bartlomiej Plotka Signed-off-by: Martin Chodur --- cmd/thanos/tools_bucket.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/thanos/tools_bucket.go b/cmd/thanos/tools_bucket.go index c3945cac47..d3492dc5f2 100644 --- a/cmd/thanos/tools_bucket.go +++ b/cmd/thanos/tools_bucket.go @@ -424,7 +424,7 @@ func registerBucketReplicate(m map[string]setupFunc, root *kingpin.CmdClause, na httpBindAddr, httpGracePeriod := regHTTPFlags(cmd) toObjStoreConfig := regCommonObjStoreFlags(cmd, "-to", false, "The object storage which replicate data to.") resolutions := cmd.Flag("resolution", "Only blocks with this resolution will be replicated. (Resolution in ms)").Default(strconv.FormatInt(downsample.ResLevel0, 10)).HintAction(listResLevel).Int64List() - compactions := cmd.Flag("compaction", "Only blocks with this compaction level will be replicated.").Default("1").Ints() + compactions := cmd.Flag("compaction", "Only blocks with those compaction levels will be replicated. Repeated flag.").Default("1").Ints() matcherStrs := cmd.Flag("matcher", "Only blocks whose external labels exactly match this matcher will be replicated.").PlaceHolder("key=\"value\"").Strings() singleRun := cmd.Flag("single-run", "Run replication only one time, then exit.").Default("false").Bool() From f3f8cafec00cd0785a1c1d71939fdb05c8221554 Mon Sep 17 00:00:00 2001 From: Martin Chodur Date: Thu, 28 May 2020 09:33:00 +0200 Subject: [PATCH 4/8] Update cmd/thanos/tools_bucket.go Co-authored-by: Bartlomiej Plotka Signed-off-by: Martin Chodur --- cmd/thanos/tools_bucket.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/thanos/tools_bucket.go b/cmd/thanos/tools_bucket.go index d3492dc5f2..1dbacdae7e 100644 --- a/cmd/thanos/tools_bucket.go +++ b/cmd/thanos/tools_bucket.go @@ -423,7 +423,7 @@ func registerBucketReplicate(m map[string]setupFunc, root *kingpin.CmdClause, na cmd := root.Command("replicate", fmt.Sprintf("Replicate data from one object storage to another. NOTE: Currently it works only with Thanos blocks (%v has to have Thanos metadata).", block.MetaFilename)) httpBindAddr, httpGracePeriod := regHTTPFlags(cmd) toObjStoreConfig := regCommonObjStoreFlags(cmd, "-to", false, "The object storage which replicate data to.") - resolutions := cmd.Flag("resolution", "Only blocks with this resolution will be replicated. (Resolution in ms)").Default(strconv.FormatInt(downsample.ResLevel0, 10)).HintAction(listResLevel).Int64List() + resolutions := cmd.Flag("resolution", "Only blocks with those resolutions will be replicated. (Resolution in ms). Repeated flag.").Default(strconv.FormatInt(downsample.ResLevel0, 10)).HintAction(listResLevel).Int64List() compactions := cmd.Flag("compaction", "Only blocks with those compaction levels will be replicated. Repeated flag.").Default("1").Ints() matcherStrs := cmd.Flag("matcher", "Only blocks whose external labels exactly match this matcher will be replicated.").PlaceHolder("key=\"value\"").Strings() singleRun := cmd.Flag("single-run", "Run replication only one time, then exit.").Default("false").Bool() From 6630a22e9042d6bd0ba3a9293b922075b9e0556f Mon Sep 17 00:00:00 2001 From: Martin Chodur Date: Thu, 28 May 2020 09:36:21 +0200 Subject: [PATCH 5/8] fix: revert of unrelated swift ENV variable fix Signed-off-by: Martin Chodur --- pkg/objstore/swift/swift.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/objstore/swift/swift.go b/pkg/objstore/swift/swift.go index f309150c50..c11715ce4f 100644 --- a/pkg/objstore/swift/swift.go +++ b/pkg/objstore/swift/swift.go @@ -251,7 +251,7 @@ func configFromEnv() SwiftConfig { ProjectName: os.Getenv("OS_PROJECT_NAME"), UserDomainID: os.Getenv("OS_USER_DOMAIN_ID"), UserDomainName: os.Getenv("OS_USER_DOMAIN_NAME"), - ProjectDomainID: os.Getenv("OS_PROJECT_DOMAIN_ID"), + ProjectDomainID: os.Getenv("OS_PROJET_DOMAIN_ID"), ProjectDomainName: os.Getenv("OS_PROJECT_DOMAIN_NAME"), } From 679a23459d0e9556168e2c206ea2827a89d9f769 Mon Sep 17 00:00:00 2001 From: Martin Chodur Date: Thu, 28 May 2020 10:26:30 +0200 Subject: [PATCH 6/8] refactor tools: changed check of valid retention and compaction lvls to use map Signed-off-by: Martin Chodur --- pkg/replicate/scheme.go | 34 ++++++++++++++-------------------- pkg/replicate/scheme_test.go | 2 +- 2 files changed, 15 insertions(+), 21 deletions(-) diff --git a/pkg/replicate/scheme.go b/pkg/replicate/scheme.go index d1695e08e3..bae7fa1f6e 100644 --- a/pkg/replicate/scheme.go +++ b/pkg/replicate/scheme.go @@ -29,8 +29,8 @@ import ( type BlockFilter struct { logger log.Logger labelSelector labels.Selector - resolutionLevels []compact.ResolutionLevel - compactionLevels []int + resolutionLevels map[compact.ResolutionLevel]struct{} + compactionLevels map[int]struct{} } // NewBlockFilter returns block filter. @@ -40,11 +40,19 @@ func NewBlockFilter( resolutionLevels []compact.ResolutionLevel, compactionLevels []int, ) *BlockFilter { + allowedResolutions := make(map[compact.ResolutionLevel]struct{}) + for _, resolutionLevel := range resolutionLevels { + allowedResolutions[resolutionLevel] = struct{}{} + } + allowedCompactions := make(map[int]struct{}) + for _, compactionLevel := range compactionLevels { + allowedCompactions[compactionLevel] = struct{}{} + } return &BlockFilter{ labelSelector: labelSelector, logger: logger, - resolutionLevels: resolutionLevels, - compactionLevels: compactionLevels, + resolutionLevels: allowedResolutions, + compactionLevels: allowedCompactions, } } @@ -77,27 +85,13 @@ func (bf *BlockFilter) Filter(b *metadata.Meta) bool { } gotResolution := compact.ResolutionLevel(b.Thanos.Downsample.Resolution) - resolutionMatch := false - for _, allowedResolution := range bf.resolutionLevels { - if gotResolution == allowedResolution { - resolutionMatch = true - break - } - } - if !resolutionMatch { + if _, ok := bf.resolutionLevels[gotResolution]; !ok { level.Debug(bf.logger).Log("msg", "filtering block", "reason", "resolution doesn't match allowed resolutions", "got_resolution", gotResolution, "allowed_resolutions", bf.resolutionLevels) return false } gotCompactionLevel := b.BlockMeta.Compaction.Level - compactionMatch := false - for _, allowedCompactionLevel := range bf.compactionLevels { - if gotCompactionLevel == allowedCompactionLevel { - compactionMatch = true - break - } - } - if !compactionMatch { + if _, ok := bf.compactionLevels[gotCompactionLevel]; !ok { level.Debug(bf.logger).Log("msg", "filtering block", "reason", "compaction level doesn't match allowed levels", "got_compaction_level", gotCompactionLevel, "allowed_compaction_levels", bf.compactionLevels) return false } diff --git a/pkg/replicate/scheme_test.go b/pkg/replicate/scheme_test.go index fc9e66943d..e8686f90ef 100644 --- a/pkg/replicate/scheme_test.go +++ b/pkg/replicate/scheme_test.go @@ -311,7 +311,7 @@ func TestReplicationSchemeAll(t *testing.T) { selector = c.selector } - filter := NewBlockFilter(logger, selector, compact.ResolutionLevelRaw, 1).Filter + filter := NewBlockFilter(logger, selector, []compact.ResolutionLevel{compact.ResolutionLevelRaw}, []int{1}).Filter fetcher, err := block.NewMetaFetcher(logger, 32, objstore.WithNoopInstr(originBucket), "", nil, nil, nil) testutil.Ok(t, err) From 55689a76009a517cdd64f1b1c142073fbc1a69dd Mon Sep 17 00:00:00 2001 From: Martin Chodur Date: Thu, 28 May 2020 10:37:26 +0200 Subject: [PATCH 7/8] fix: update docs Signed-off-by: Martin Chodur --- docs/components/tools.md | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/docs/components/tools.md b/docs/components/tools.md index e225730c50..0a06702e53 100644 --- a/docs/components/tools.md +++ b/docs/components/tools.md @@ -451,10 +451,10 @@ Flags: format details: https://thanos.io/storage.md/#configuration The object storage which replicate data to. - --resolution=0 ... Only blocks with this resolution will be - replicated. (Resolution in ms) - --compaction=1 ... Only blocks with this compaction level will be - replicated. + --resolution=0 ... Only blocks with those resolutions will be + replicated. (Resolution in ms). Repeated flag. + --compaction=1 ... Only blocks with those compaction levels will + be replicated. Repeated flag. --matcher=key="value" ... Only blocks whose external labels exactly match this matcher will be replicated. --single-run Run replication only one time, then exit. From 1f4ce5e65c32dbf613b3b98bd7d6bc1a56b0af48 Mon Sep 17 00:00:00 2001 From: Martin Chodur Date: Thu, 28 May 2020 14:36:50 +0200 Subject: [PATCH 8/8] feat tools: bucket replicate update flag defaults and input format of resolution Signed-off-by: Martin Chodur --- CHANGELOG.md | 4 ++++ cmd/thanos/tools_bucket.go | 20 ++++++++++---------- docs/components/tools.md | 6 +++--- pkg/replicate/scheme.go | 27 ++++++++++----------------- 4 files changed, 27 insertions(+), 30 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 56f1a61db2..f9a2179035 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -24,8 +24,12 @@ We use *breaking* word for marking changes that are not backward compatible (rel - [#2667](https://github.com/thanos-io/thanos/pull/2667) Store: removed support to the legacy `index.cache.json`. The hidden flag `--store.disable-index-header` was removed. - [#2667](https://github.com/thanos-io/thanos/pull/2667) Compact: the deprecated flag `--index.generate-missing-cache-file` and the metric `thanos_compact_generated_index_total` were removed. - [2603](https://github.com/thanos-io/thanos/pull/2603) Store/Querier: Significantly optimize cases where StoreAPIs or blocks returns exact overlapping chunks (e.g Store GW and sidecar or brute force Store Gateway HA). +- [#2671](https://github.com/thanos-io/thanos/pull/2671) *breaking* Tools: bucket replicate flag `--resolution` is now in Go duration format. +- [#2671](https://github.com/thanos-io/thanos/pull/2671) Tools: bucket replicate now replicates by default all blocks. + ### Added + - [#2671](https://github.com/thanos-io/thanos/pull/2671) Tools: bucket replicate now allows passing repeated `--compaction` and `--resolution` flags. ## [v0.13.0](https://github.com/thanos-io/thanos/releases) - IN PROGRESS diff --git a/cmd/thanos/tools_bucket.go b/cmd/thanos/tools_bucket.go index 1dbacdae7e..9d7d27de4d 100644 --- a/cmd/thanos/tools_bucket.go +++ b/cmd/thanos/tools_bucket.go @@ -414,31 +414,31 @@ func registerBucketWeb(m map[string]setupFunc, root *kingpin.CmdClause, name str // Provide a list of resolution, can not use Enum directly, since string does not implement int64 function. func listResLevel() []string { return []string{ - strconv.FormatInt(downsample.ResLevel0, 10), - strconv.FormatInt(downsample.ResLevel1, 10), - strconv.FormatInt(downsample.ResLevel2, 10)} + time.Duration(downsample.ResLevel0).String(), + time.Duration(downsample.ResLevel1).String(), + time.Duration(downsample.ResLevel2).String()} } func registerBucketReplicate(m map[string]setupFunc, root *kingpin.CmdClause, name string, objStoreConfig *extflag.PathOrContent) { cmd := root.Command("replicate", fmt.Sprintf("Replicate data from one object storage to another. NOTE: Currently it works only with Thanos blocks (%v has to have Thanos metadata).", block.MetaFilename)) httpBindAddr, httpGracePeriod := regHTTPFlags(cmd) toObjStoreConfig := regCommonObjStoreFlags(cmd, "-to", false, "The object storage which replicate data to.") - resolutions := cmd.Flag("resolution", "Only blocks with those resolutions will be replicated. (Resolution in ms). Repeated flag.").Default(strconv.FormatInt(downsample.ResLevel0, 10)).HintAction(listResLevel).Int64List() - compactions := cmd.Flag("compaction", "Only blocks with those compaction levels will be replicated. Repeated flag.").Default("1").Ints() + resolutions := cmd.Flag("resolution", "Only blocks with these resolutions will be replicated. Repeated flag.").Default("0s", "5m", "1h").HintAction(listResLevel).DurationList() + compactions := cmd.Flag("compaction", "Only blocks with these compaction levels will be replicated. Repeated flag.").Default("1", "2", "3", "4").Ints() matcherStrs := cmd.Flag("matcher", "Only blocks whose external labels exactly match this matcher will be replicated.").PlaceHolder("key=\"value\"").Strings() singleRun := cmd.Flag("single-run", "Run replication only one time, then exit.").Default("false").Bool() - var resolutionLevels []compact.ResolutionLevel - for _, lvl := range *resolutions { - resolutionLevels = append(resolutionLevels, compact.ResolutionLevel(lvl)) - } - m[name+" replicate"] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ <-chan struct{}, _ bool) error { matchers, err := replicate.ParseFlagMatchers(*matcherStrs) if err != nil { return errors.Wrap(err, "parse block label matchers") } + var resolutionLevels []compact.ResolutionLevel + for _, lvl := range *resolutions { + resolutionLevels = append(resolutionLevels, compact.ResolutionLevel(lvl.Milliseconds())) + } + return replicate.RunReplicate( g, logger, diff --git a/docs/components/tools.md b/docs/components/tools.md index 0a06702e53..a5ca3c853c 100644 --- a/docs/components/tools.md +++ b/docs/components/tools.md @@ -451,9 +451,9 @@ Flags: format details: https://thanos.io/storage.md/#configuration The object storage which replicate data to. - --resolution=0 ... Only blocks with those resolutions will be - replicated. (Resolution in ms). Repeated flag. - --compaction=1 ... Only blocks with those compaction levels will + --resolution=0s... ... Only blocks with these resolutions will be + replicated. Repeated flag. + --compaction=1... ... Only blocks with these compaction levels will be replicated. Repeated flag. --matcher=key="value" ... Only blocks whose external labels exactly match this matcher will be replicated. diff --git a/pkg/replicate/scheme.go b/pkg/replicate/scheme.go index bae7fa1f6e..2a97c0a5e8 100644 --- a/pkg/replicate/scheme.go +++ b/pkg/replicate/scheme.go @@ -6,6 +6,7 @@ package replicate import ( "bytes" "context" + "fmt" "io" "io/ioutil" "path" @@ -86,13 +87,13 @@ func (bf *BlockFilter) Filter(b *metadata.Meta) bool { gotResolution := compact.ResolutionLevel(b.Thanos.Downsample.Resolution) if _, ok := bf.resolutionLevels[gotResolution]; !ok { - level.Debug(bf.logger).Log("msg", "filtering block", "reason", "resolution doesn't match allowed resolutions", "got_resolution", gotResolution, "allowed_resolutions", bf.resolutionLevels) + level.Info(bf.logger).Log("msg", "filtering block", "reason", "resolution doesn't match allowed resolutions", "got_resolution", gotResolution, "allowed_resolutions", fmt.Sprintf("%v", bf.resolutionLevels)) return false } gotCompactionLevel := b.BlockMeta.Compaction.Level if _, ok := bf.compactionLevels[gotCompactionLevel]; !ok { - level.Debug(bf.logger).Log("msg", "filtering block", "reason", "compaction level doesn't match allowed levels", "got_compaction_level", gotCompactionLevel, "allowed_compaction_levels", bf.compactionLevels) + level.Info(bf.logger).Log("msg", "filtering block", "reason", "compaction level doesn't match allowed levels", "got_compaction_level", gotCompactionLevel, "allowed_compaction_levels", fmt.Sprintf("%v", bf.compactionLevels)) return false } @@ -213,31 +214,23 @@ func (rs *replicationScheme) execute(ctx context.Context) error { return nil } - level.Debug(rs.logger).Log("msg", "adding block to available blocks", "block_uuid", id.String()) - - availableBlocks = append(availableBlocks, meta) + if rs.blockFilter(meta) { + level.Info(rs.logger).Log("msg", "adding block to be replicated", "block_uuid", id.String()) + availableBlocks = append(availableBlocks, meta) + } return nil }); err != nil { return errors.Wrap(err, "iterate over origin bucket") } - candidateBlocks := []*metadata.Meta{} - - for _, b := range availableBlocks { - if rs.blockFilter(b) { - level.Debug(rs.logger).Log("msg", "adding block to candidate blocks", "block_uuid", b.BlockMeta.ULID.String()) - candidateBlocks = append(candidateBlocks, b) - } - } - // In order to prevent races in compactions by the target environment, we // need to replicate oldest start timestamp first. - sort.Slice(candidateBlocks, func(i, j int) bool { - return candidateBlocks[i].BlockMeta.MinTime < candidateBlocks[j].BlockMeta.MinTime + sort.Slice(availableBlocks, func(i, j int) bool { + return availableBlocks[i].BlockMeta.MinTime < availableBlocks[j].BlockMeta.MinTime }) - for _, b := range candidateBlocks { + for _, b := range availableBlocks { if err := rs.ensureBlockIsReplicated(ctx, b.BlockMeta.ULID); err != nil { return errors.Wrapf(err, "ensure block %v is replicated", b.BlockMeta.ULID.String()) }