feat tools: bucket replicate allows to filter multiple compaction and… #2671

Merged
8 commits merged on Jun 8, 2020
7 changes: 7 additions & 0 deletions CHANGELOG.md
@@ -24,6 +24,13 @@ We use *breaking* word for marking changes that are not backward compatible (rel
- [#2667](https://github.com/thanos-io/thanos/pull/2667) Store: removed support to the legacy `index.cache.json`. The hidden flag `--store.disable-index-header` was removed.
- [#2667](https://github.com/thanos-io/thanos/pull/2667) Compact: the deprecated flag `--index.generate-missing-cache-file` and the metric `thanos_compact_generated_index_total` were removed.
- [#2603](https://github.com/thanos-io/thanos/pull/2603) Store/Querier: Significantly optimize cases where StoreAPIs or blocks return exact overlapping chunks (e.g. Store GW and sidecar or brute force Store Gateway HA).
- [#2671](https://github.com/thanos-io/thanos/pull/2671) *breaking* Tools: bucket replicate flag `--resolution` is now in Go duration format.
- [#2671](https://github.com/thanos-io/thanos/pull/2671) Tools: bucket replicate now replicates all blocks by default.


### Added

- [#2671](https://github.com/thanos-io/thanos/pull/2671) Tools: bucket replicate now allows passing repeated `--compaction` and `--resolution` flags.

## [v0.13.0](https://github.com/thanos-io/thanos/releases) - IN PROGRESS

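As an illustration of the *breaking* flag change above (an editorial example, not text from the PR): assuming `compact.ResolutionLevel` values are millisecond counts, as the `lvl.Milliseconds()` conversion in the command diff below suggests, a resolution that was previously selected with the raw millisecond value, e.g. `--resolution=300000`, would now be written as a Go duration, e.g. `--resolution=5m`, and the flag can be repeated to select several resolutions at once.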
21 changes: 12 additions & 9 deletions cmd/thanos/tools_bucket.go
@@ -414,19 +414,17 @@ func registerBucketWeb(m map[string]setupFunc, root *kingpin.CmdClause, name str
// Provide a list of resolution, can not use Enum directly, since string does not implement int64 function.
func listResLevel() []string {
return []string{
strconv.FormatInt(downsample.ResLevel0, 10),
strconv.FormatInt(downsample.ResLevel1, 10),
strconv.FormatInt(downsample.ResLevel2, 10)}
time.Duration(downsample.ResLevel0).String(),
time.Duration(downsample.ResLevel1).String(),
time.Duration(downsample.ResLevel2).String()}
}

func registerBucketReplicate(m map[string]setupFunc, root *kingpin.CmdClause, name string, objStoreConfig *extflag.PathOrContent) {
cmd := root.Command("replicate", fmt.Sprintf("Replicate data from one object storage to another. NOTE: Currently it works only with Thanos blocks (%v has to have Thanos metadata).", block.MetaFilename))
httpBindAddr, httpGracePeriod := regHTTPFlags(cmd)
toObjStoreConfig := regCommonObjStoreFlags(cmd, "-to", false, "The object storage which replicate data to.")
// TODO(bwplotka): Allow to replicate many resolution levels.
resolution := cmd.Flag("resolution", "Only blocks with this resolution will be replicated.").Default(strconv.FormatInt(downsample.ResLevel0, 10)).HintAction(listResLevel).Int64()
// TODO(bwplotka): Allow to replicate many compaction levels.
compaction := cmd.Flag("compaction", "Only blocks with this compaction level will be replicated.").Default("1").Int()
resolutions := cmd.Flag("resolution", "Only blocks with these resolutions will be replicated. Repeated flag.").Default("0s", "5m", "1h").HintAction(listResLevel).DurationList()
compactions := cmd.Flag("compaction", "Only blocks with these compaction levels will be replicated. Repeated flag.").Default("1", "2", "3", "4").Ints()
Member Author:


@bwplotka I eventually took a stab and changed the defaults and the format of the --resolution flag to Go durations.
But if you are against it, I'll be happy to roll it back; it just felt a bit more user-friendly.

Member:


hm, I think that's ok (:

matcherStrs := cmd.Flag("matcher", "Only blocks whose external labels exactly match this matcher will be replicated.").PlaceHolder("key=\"value\"").Strings()
singleRun := cmd.Flag("single-run", "Run replication only one time, then exit.").Default("false").Bool()

@@ -436,6 +434,11 @@ func registerBucketReplicate(m map[string]setupFunc, root *kingpin.CmdClause, na
return errors.Wrap(err, "parse block label matchers")
}

var resolutionLevels []compact.ResolutionLevel
for _, lvl := range *resolutions {
resolutionLevels = append(resolutionLevels, compact.ResolutionLevel(lvl.Milliseconds()))
}

return replicate.RunReplicate(
g,
logger,
@@ -444,8 +447,8 @@ func registerBucketReplicate(m map[string]setupFunc, root *kingpin.CmdClause, na
*httpBindAddr,
time.Duration(*httpGracePeriod),
matchers,
compact.ResolutionLevel(*resolution),
*compaction,
resolutionLevels,
*compactions,
objStoreConfig,
toObjStoreConfig,
*singleRun,
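A minimal, standalone sketch (editorial, not part of the PR) of how the new duration defaults map onto `compact.ResolutionLevel` values, assuming resolution levels are millisecond counts as the `lvl.Milliseconds()` conversion above implies:

```go
package main

import (
	"fmt"
	"time"
)

// resolutionLevel stands in for compact.ResolutionLevel for illustration:
// an int64 count of milliseconds.
type resolutionLevel int64

func main() {
	// The new flag defaults, parsed by kingpin as Go durations.
	defaults := []time.Duration{0, 5 * time.Minute, time.Hour}

	for _, d := range defaults {
		// The same conversion the command performs before handing the
		// levels to the block filter.
		fmt.Printf("--resolution=%s -> resolutionLevel(%d)\n", d, resolutionLevel(d.Milliseconds()))
	}
	// Output:
	//   --resolution=0s -> resolutionLevel(0)
	//   --resolution=5m0s -> resolutionLevel(300000)
	//   --resolution=1h0m0s -> resolutionLevel(3600000)
}
```

The values 0, 300000 and 3600000 milliseconds correspond to the raw, 5-minute and 1-hour downsampling levels.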
8 changes: 4 additions & 4 deletions docs/components/tools.md
@@ -451,10 +451,10 @@ Flags:
format details:
https://thanos.io/storage.md/#configuration The
object storage which replicate data to.
--resolution=0 Only blocks with this resolution will be
replicated.
--compaction=1 Only blocks with this compaction level will be
replicated.
--resolution=0s... ... Only blocks with these resolutions will be
replicated. Repeated flag.
--compaction=1... ... Only blocks with these compaction levels will
be replicated. Repeated flag.
--matcher=key="value" ... Only blocks whose external labels exactly match
this matcher will be replicated.
--single-run Run replication only one time, then exit.
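As a usage illustration (editorial, not from the PR): with the repeated flags, replicating only raw and 5-minute blocks at compaction levels 1 and 2 might look like `--resolution=0s --resolution=5m --compaction=1 --compaction=2`, while omitting both flags keeps the new defaults of all three resolutions and compaction levels 1 through 4.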
8 changes: 4 additions & 4 deletions pkg/replicate/replicator.go
@@ -69,8 +69,8 @@ func RunReplicate(
httpBindAddr string,
httpGracePeriod time.Duration,
labelSelector labels.Selector,
resolution compact.ResolutionLevel,
compaction int,
resolutions []compact.ResolutionLevel,
compactions []int,
fromObjStoreConfig *extflag.PathOrContent,
toObjStoreConfig *extflag.PathOrContent,
singleRun bool,
@@ -159,8 +159,8 @@ func RunReplicate(
blockFilter := NewBlockFilter(
logger,
labelSelector,
resolution,
compaction,
resolutions,
compactions,
).Filter
metrics := newReplicationMetrics(reg)
ctx, cancel := context.WithCancel(context.Background())
65 changes: 30 additions & 35 deletions pkg/replicate/scheme.go
@@ -6,6 +6,7 @@ package replicate
import (
"bytes"
"context"
"fmt"
"io"
"io/ioutil"
"path"
@@ -27,24 +28,32 @@ import (

// BlockFilter is block filter that filters out compacted and unselected blocks.
type BlockFilter struct {
logger log.Logger
labelSelector labels.Selector
resolutionLevel compact.ResolutionLevel
compactionLevel int
logger log.Logger
labelSelector labels.Selector
resolutionLevels map[compact.ResolutionLevel]struct{}
compactionLevels map[int]struct{}
}

// NewBlockFilter returns block filter.
func NewBlockFilter(
logger log.Logger,
labelSelector labels.Selector,
resolutionLevel compact.ResolutionLevel,
compactionLevel int,
resolutionLevels []compact.ResolutionLevel,
compactionLevels []int,
) *BlockFilter {
allowedResolutions := make(map[compact.ResolutionLevel]struct{})
for _, resolutionLevel := range resolutionLevels {
allowedResolutions[resolutionLevel] = struct{}{}
}
allowedCompactions := make(map[int]struct{})
for _, compactionLevel := range compactionLevels {
allowedCompactions[compactionLevel] = struct{}{}
}
return &BlockFilter{
labelSelector: labelSelector,
logger: logger,
resolutionLevel: resolutionLevel,
compactionLevel: compactionLevel,
labelSelector: labelSelector,
logger: logger,
resolutionLevels: allowedResolutions,
compactionLevels: allowedCompactions,
}
}

@@ -77,20 +86,14 @@ func (bf *BlockFilter) Filter(b *metadata.Meta) bool {
}

gotResolution := compact.ResolutionLevel(b.Thanos.Downsample.Resolution)
expectedResolution := bf.resolutionLevel

resolutionMatch := gotResolution == expectedResolution
if !resolutionMatch {
level.Debug(bf.logger).Log("msg", "filtering block", "reason", "resolutions don't match", "got_resolution", gotResolution, "expected_resolution", expectedResolution)
if _, ok := bf.resolutionLevels[gotResolution]; !ok {
level.Info(bf.logger).Log("msg", "filtering block", "reason", "resolution doesn't match allowed resolutions", "got_resolution", gotResolution, "allowed_resolutions", fmt.Sprintf("%v", bf.resolutionLevels))
return false
}

gotCompactionLevel := b.BlockMeta.Compaction.Level
expectedCompactionLevel := bf.compactionLevel

compactionMatch := gotCompactionLevel == expectedCompactionLevel
if !compactionMatch {
level.Debug(bf.logger).Log("msg", "filtering block", "reason", "compaction levels don't match", "got_compaction_level", gotCompactionLevel, "expected_compaction_level", expectedCompactionLevel)
if _, ok := bf.compactionLevels[gotCompactionLevel]; !ok {
level.Info(bf.logger).Log("msg", "filtering block", "reason", "compaction level doesn't match allowed levels", "got_compaction_level", gotCompactionLevel, "allowed_compaction_levels", fmt.Sprintf("%v", bf.compactionLevels))
return false
}
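A simplified, standalone sketch (illustrative names, not the PR's code) of the allow-list check that `Filter` now performs: a block passes only if both its resolution and its compaction level appear in the configured sets.

```go
// allowed reports whether a block with the given resolution (in milliseconds)
// and compaction level passes both allow-lists.
func allowed(
	resolution int64, compaction int,
	allowedResolutions map[int64]struct{},
	allowedCompactions map[int]struct{},
) bool {
	if _, ok := allowedResolutions[resolution]; !ok {
		return false // resolution not selected for replication
	}
	if _, ok := allowedCompactions[compaction]; !ok {
		return false // compaction level not selected for replication
	}
	return true
}
```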

@@ -211,31 +214,23 @@ func (rs *replicationScheme) execute(ctx context.Context) error {
return nil
}

level.Debug(rs.logger).Log("msg", "adding block to available blocks", "block_uuid", id.String())

availableBlocks = append(availableBlocks, meta)
if rs.blockFilter(meta) {
level.Info(rs.logger).Log("msg", "adding block to be replicated", "block_uuid", id.String())
availableBlocks = append(availableBlocks, meta)
}

return nil
}); err != nil {
return errors.Wrap(err, "iterate over origin bucket")
}

candidateBlocks := []*metadata.Meta{}

for _, b := range availableBlocks {
if rs.blockFilter(b) {
level.Debug(rs.logger).Log("msg", "adding block to candidate blocks", "block_uuid", b.BlockMeta.ULID.String())
candidateBlocks = append(candidateBlocks, b)
}
}

// In order to prevent races in compactions by the target environment, we
// need to replicate oldest start timestamp first.
sort.Slice(candidateBlocks, func(i, j int) bool {
return candidateBlocks[i].BlockMeta.MinTime < candidateBlocks[j].BlockMeta.MinTime
sort.Slice(availableBlocks, func(i, j int) bool {
return availableBlocks[i].BlockMeta.MinTime < availableBlocks[j].BlockMeta.MinTime
})

for _, b := range candidateBlocks {
for _, b := range availableBlocks {
if err := rs.ensureBlockIsReplicated(ctx, b.BlockMeta.ULID); err != nil {
return errors.Wrapf(err, "ensure block %v is replicated", b.BlockMeta.ULID.String())
}
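The reordering above drops the separate `candidateBlocks` slice because filtering now happens while iterating the origin bucket; only the oldest-first sort remains. A minimal sketch (hypothetical `blockMeta` type, editorial example) of that ordering, which keeps the target environment's compactor from racing replication:

```go
package main

import (
	"fmt"
	"sort"
)

// blockMeta is a stand-in for metadata.Meta; only MinTime matters here.
type blockMeta struct {
	ULID    string
	MinTime int64 // start of the block's time range, in milliseconds
}

func main() {
	blocks := []blockMeta{
		{ULID: "newer", MinTime: 2000},
		{ULID: "older", MinTime: 1000},
	}

	// Replicate the block with the earliest start timestamp first.
	sort.Slice(blocks, func(i, j int) bool {
		return blocks[i].MinTime < blocks[j].MinTime
	})

	fmt.Println(blocks) // [{older 1000} {newer 2000}]
}
```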
2 changes: 1 addition & 1 deletion pkg/replicate/scheme_test.go
@@ -311,7 +311,7 @@ func TestReplicationSchemeAll(t *testing.T) {
selector = c.selector
}

filter := NewBlockFilter(logger, selector, compact.ResolutionLevelRaw, 1).Filter
filter := NewBlockFilter(logger, selector, []compact.ResolutionLevel{compact.ResolutionLevelRaw}, []int{1}).Filter
fetcher, err := block.NewMetaFetcher(logger, 32, objstore.WithNoopInstr(originBucket), "", nil, nil, nil)
testutil.Ok(t, err)

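A hedged usage sketch (adapted from the test line above, editorial rather than PR code) of constructing the filter with several resolutions and compaction levels; the `compact.ResolutionLevel(downsample.ResLevel1)` conversion assumes resolution levels carry the same millisecond values used elsewhere in the diff:

```go
// Accept raw and 5-minute blocks at compaction levels 1 and 2 only
// (logger and selector are assumed to exist, as in the test above).
filter := NewBlockFilter(
	logger,
	selector,
	[]compact.ResolutionLevel{
		compact.ResolutionLevelRaw,
		compact.ResolutionLevel(downsample.ResLevel1),
	},
	[]int{1, 2},
).Filter
```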