Skip to content

Commit

Permalink
compactor: Add ReplicaLabelRemover as MetaFetcher filter to enable of…
Browse files Browse the repository at this point in the history
…fline vertical compaction/deduplication for replicated data (#2250)

* Create ReplicaLabelsFilter to allow for offline deduplication

Signed-off-by: Matthias Loibl <mail@matthiasloibl.com>

* Start adding a e2e test for offline-deduplication with Thanos compact

Signed-off-by: Matthias Loibl <mail@matthiasloibl.com>

* Address issues that have discovered after review

Signed-off-by: Kemal Akkoyun <kakkoyun@gmail.com>

* Fix e2e test service issue

Signed-off-by: Kemal Akkoyun <kakkoyun@gmail.com>

* Improve fetcher unit tests

Signed-off-by: Kemal Akkoyun <kakkoyun@gmail.com>

* Add simple compactor e2e tests with replica remover

Signed-off-by: Kemal Akkoyun <kakkoyun@gmail.com>

* Remove unnecessary interface

Signed-off-by: Kemal Akkoyun <kakkoyun@gmail.com>

* Address review issues

Signed-off-by: Kemal Akkoyun <kakkoyun@gmail.com>

* Add more test cases

Signed-off-by: Kemal Akkoyun <kakkoyun@gmail.com>

* Improve and stabilize e2e tests

Signed-off-by: Kemal Akkoyun <kakkoyun@gmail.com>

* Address review issues

Signed-off-by: Kemal Akkoyun <kakkoyun@gmail.com>

* Increase ruler sd refresh interval

Signed-off-by: Kemal Akkoyun <kakkoyun@gmail.com>

* Address review issues

Signed-off-by: Kemal Akkoyun <kakkoyun@gmail.com>

* Separate filters and modifiers

Signed-off-by: Kemal Akkoyun <kakkoyun@gmail.com>

Co-authored-by: Matthias Loibl <mail@matthiasloibl.com>
  • Loading branch information
kakkoyun and metalmatze committed Mar 24, 2020
1 parent d89a497 commit 94b5882
Show file tree
Hide file tree
Showing 20 changed files with 911 additions and 121 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/e2e.yaml
Expand Up @@ -25,4 +25,4 @@ jobs:
key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }}

- name: Run e2e docker-based tests.
run: make test-e2e
run: make test-e2e-ci
6 changes: 3 additions & 3 deletions CHANGELOG.md
Expand Up @@ -21,12 +21,12 @@ We use *breaking* word for marking changes that are not backward compatible (rel
### Added

- [#2265](https://github.com/thanos-io/thanos/pull/2265) Compactor: Add `--wait-interval` to specify compaction wait interval between consecutive compact runs when `--wait` enabled.
- [#2250](https://github.com/thanos-io/thanos/pull/2250) Compactor: Enable vertical compaction for offline deduplication (Experimental). Uses `--deduplication.replica-label` flag to specify the replica label to deduplicate on (Hidden). Please note that this uses a NAIVE algorithm for merging (no smart replica deduplication, just chaining samples together). This works well for deduplication of blocks with **precisely the same samples** like produced by Receiver replication. We plan to add a smarter algorithm in the following weeks.

### Changed

- [#2136](https://github.com/thanos-io/thanos/pull/2136) store, compact, bucket: schedule block deletion by adding deletion-mark.json. This adds a consistent way for multiple readers and writers to access object storage.
Since there are no consistency guarantees provided by some Object Storage providers, this PR adds a consistent lock-free way of dealing with Object Storage irrespective of the choice of object storage. In order to achieve this co-ordination, blocks are not deleted directly. Instead, blocks are marked for deletion by uploading `deletion-mark.json` file for the block that was chosen to be deleted. This file contains unix time of when the block was marked for deletion.

- [#2136](https://github.com/thanos-io/thanos/pull/2136) *breaking* store, compact, bucket: schedule block deletion by adding deletion-mark.json. This adds a consistent way for multiple readers and writers to access object storage.
Since there are no consistency guarantees provided by some Object Storage providers, this PR adds a consistent lock-free way of dealing with Object Storage irrespective of the choice of object storage. In order to achieve this co-ordination, blocks are not deleted directly. Instead, blocks are marked for deletion by uploading `deletion-mark.json` file for the block that was chosen to be deleted. This file contains unix time of when the block was marked for deletion. If you want to keep existing behavior, you should add `--delete-delay=0s` as a flag.
- [#2090](https://github.com/thanos-io/thanos/issues/2090) *breaking* Downsample command: the `downsample` command has moved as the `thanos bucket` sub-command, and cannot be called via `thanos downsample` any more.
- [#2294](https://github.com/thanos-io/thanos/pull/2294) store: optimizations for fetching postings. Queries using `=~".*"` matchers or negation matchers (`!=...` or `!~...`) benefit the most.

Expand Down
17 changes: 16 additions & 1 deletion Makefile
Expand Up @@ -245,8 +245,23 @@ test-local:
.PHONY: test-e2e
test-e2e: ## Runs all Thanos e2e docker-based e2e tests from test/e2e. Required access to docker daemon.
test-e2e: docker
@echo ">> cleaning docker environment."
@docker system prune -f --volumes
@echo ">> cleaning e2e test garbage."
@rm -rf ./test/e2e/e2e_integration_test*
@echo ">> running /test/e2e tests."
@go test -v ./test/e2e/...
@go test -failfast -timeout 5m -v ./test/e2e/...

.PHONY: test-e2e-ci
test-e2e-ci: ## Runs all Thanos e2e docker-based e2e tests from test/e2e, using limited resources. Required access to docker daemon.
test-e2e-ci: docker
@echo ">> cleaning docker environment."
@docker system prune -f --volumes
@echo ">> cleaning e2e test garbage."
@rm -rf ./test/e2e/e2e_integration_test*
@echo ">> running /test/e2e tests."
@go clean -testcache
@go test -failfast -parallel 1 -timeout 5m -v ./test/e2e/...

.PHONY: install-deps
install-deps: ## Installs dependencies for integration tests. It installs supported versions of Prometheus and alertmanager to test against in integration tests.
Expand Down
8 changes: 4 additions & 4 deletions cmd/thanos/bucket.go
Expand Up @@ -140,7 +140,7 @@ func registerBucketVerify(m map[string]setupFunc, root *kingpin.CmdClause, name
issues = append(issues, issueFn)
}

fetcher, err := block.NewMetaFetcher(logger, fetcherConcurrency, bkt, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg))
fetcher, err := block.NewMetaFetcher(logger, fetcherConcurrency, bkt, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg), nil)
if err != nil {
return err
}
Expand Down Expand Up @@ -189,7 +189,7 @@ func registerBucketLs(m map[string]setupFunc, root *kingpin.CmdClause, name stri
return err
}

fetcher, err := block.NewMetaFetcher(logger, fetcherConcurrency, bkt, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg))
fetcher, err := block.NewMetaFetcher(logger, fetcherConcurrency, bkt, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg), nil)
if err != nil {
return err
}
Expand Down Expand Up @@ -289,7 +289,7 @@ func registerBucketInspect(m map[string]setupFunc, root *kingpin.CmdClause, name
return err
}

fetcher, err := block.NewMetaFetcher(logger, fetcherConcurrency, bkt, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg))
fetcher, err := block.NewMetaFetcher(logger, fetcherConcurrency, bkt, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg), nil)
if err != nil {
return err
}
Expand Down Expand Up @@ -458,7 +458,7 @@ func refresh(ctx context.Context, logger log.Logger, bucketUI *ui.Bucket, durati
return errors.Wrap(err, "bucket client")
}

fetcher, err := block.NewMetaFetcher(logger, fetcherConcurrency, bkt, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg))
fetcher, err := block.NewMetaFetcher(logger, fetcherConcurrency, bkt, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg), nil)
if err != nil {
return err
}
Expand Down
29 changes: 23 additions & 6 deletions cmd/thanos/compact.go
Expand Up @@ -133,6 +133,12 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application) {
"or compactor is ignoring the deletion because it's compacting the block at the same time.").
Default("48h"))

dedupReplicaLabels := cmd.Flag("deduplication.replica-label", "Label to treat as a replica indicator of blocks that can be deduplicated (repeated flag). This will merge multiple replica blocks into one. This process is irreversible."+
"Experimental. When it is set true, this will given labels from blocks so that vertical compaction could merge blocks."+
"Please note that this uses a NAIVE algorithm for merging (no smart replica deduplication, just chaining samples together)."+
"This works well for deduplication of blocks with **precisely the same samples** like produced by Receiver replication.").
Hidden().Strings()

selectorRelabelConf := regSelectorRelabelFlags(cmd)

m[component.Compact.String()] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ <-chan struct{}, _ bool) error {
Expand All @@ -157,6 +163,7 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application) {
*maxCompactionLevel,
*blockSyncConcurrency,
*compactionConcurrency,
*dedupReplicaLabels,
selectorRelabelConf,
*waitInterval,
)
Expand All @@ -183,6 +190,7 @@ func runCompact(
maxCompactionLevel int,
blockSyncConcurrency int,
concurrency int,
dedupReplicaLabels []string,
selectorRelabelConf *extflag.PathOrContent,
waitInterval time.Duration,
) error {
Expand Down Expand Up @@ -278,17 +286,26 @@ func runCompact(
ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, bkt, time.Duration(deleteDelay.Seconds()/2)*time.Second)
duplicateBlocksFilter := block.NewDeduplicateFilter()
prometheusRegisterer := extprom.WrapRegistererWithPrefix("thanos_", reg)
metaFetcher, err := block.NewMetaFetcher(logger, 32, bkt, "", prometheusRegisterer,
block.NewLabelShardedMetaFilter(relabelConfig).Filter,
block.NewConsistencyDelayMetaFilter(logger, consistencyDelay, prometheusRegisterer).Filter,
ignoreDeletionMarkFilter.Filter,
duplicateBlocksFilter.Filter,

metaFetcher, err := block.NewMetaFetcher(logger, 32, bkt, "", prometheusRegisterer, []block.MetadataFilter{
block.NewLabelShardedMetaFilter(relabelConfig),
block.NewConsistencyDelayMetaFilter(logger, consistencyDelay, prometheusRegisterer),
ignoreDeletionMarkFilter,
duplicateBlocksFilter,
},
block.NewReplicaLabelRemover(logger, dedupReplicaLabels),
)
if err != nil {
return errors.Wrap(err, "create meta fetcher")
}

sy, err := compact.NewSyncer(logger, reg, bkt, metaFetcher, duplicateBlocksFilter, ignoreDeletionMarkFilter, blocksMarkedForDeletion, blockSyncConcurrency, acceptMalformedIndex, false)
enableVerticalCompaction := false
if len(dedupReplicaLabels) > 0 {
enableVerticalCompaction = true
level.Info(logger).Log("msg", "deduplication.replica-label specified, vertical compaction is enabled", "dedupReplicaLabels", strings.Join(dedupReplicaLabels, ","))
}

sy, err := compact.NewSyncer(logger, reg, bkt, metaFetcher, duplicateBlocksFilter, ignoreDeletionMarkFilter, blocksMarkedForDeletion, blockSyncConcurrency, acceptMalformedIndex, enableVerticalCompaction)
if err != nil {
return errors.Wrap(err, "create syncer")
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/thanos/downsample.go
Expand Up @@ -72,7 +72,7 @@ func RunDownsample(
return err
}

metaFetcher, err := block.NewMetaFetcher(logger, 32, bkt, "", extprom.WrapRegistererWithPrefix("thanos_", reg))
metaFetcher, err := block.NewMetaFetcher(logger, 32, bkt, "", extprom.WrapRegistererWithPrefix("thanos_", reg), nil)
if err != nil {
return errors.Wrap(err, "create meta fetcher")
}
Expand Down
4 changes: 2 additions & 2 deletions cmd/thanos/main_test.go
Expand Up @@ -76,7 +76,7 @@ func TestCleanupIndexCacheFolder(t *testing.T) {
Name: metricIndexGenerateName,
Help: metricIndexGenerateHelp,
})
metaFetcher, err := block.NewMetaFetcher(nil, 32, bkt, "", nil)
metaFetcher, err := block.NewMetaFetcher(nil, 32, bkt, "", nil, nil)
testutil.Ok(t, err)

testutil.Ok(t, genMissingIndexCacheFiles(ctx, logger, reg, bkt, metaFetcher, dir))
Expand Down Expand Up @@ -116,7 +116,7 @@ func TestCleanupDownsampleCacheFolder(t *testing.T) {

metrics := newDownsampleMetrics(prometheus.NewRegistry())
testutil.Equals(t, 0.0, promtest.ToFloat64(metrics.downsamples.WithLabelValues(compact.GroupKey(meta.Thanos))))
metaFetcher, err := block.NewMetaFetcher(nil, 32, bkt, "", nil)
metaFetcher, err := block.NewMetaFetcher(nil, 32, bkt, "", nil, nil)
testutil.Ok(t, err)

testutil.Ok(t, downsampleBucket(ctx, logger, metrics, bkt, metaFetcher, dir))
Expand Down
14 changes: 7 additions & 7 deletions cmd/thanos/store.go
Expand Up @@ -236,13 +236,13 @@ func runStore(

ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, bkt, ignoreDeletionMarksDelay)
prometheusRegisterer := extprom.WrapRegistererWithPrefix("thanos_", reg)
metaFetcher, err := block.NewMetaFetcher(logger, fetcherConcurrency, bkt, dataDir, prometheusRegisterer,
block.NewTimePartitionMetaFilter(filterConf.MinTime, filterConf.MaxTime).Filter,
block.NewLabelShardedMetaFilter(relabelConfig).Filter,
block.NewConsistencyDelayMetaFilter(logger, consistencyDelay, prometheusRegisterer).Filter,
ignoreDeletionMarkFilter.Filter,
block.NewDeduplicateFilter().Filter,
)
metaFetcher, err := block.NewMetaFetcher(logger, fetcherConcurrency, bkt, dataDir, prometheusRegisterer, []block.MetadataFilter{
block.NewTimePartitionMetaFilter(filterConf.MinTime, filterConf.MaxTime),
block.NewLabelShardedMetaFilter(relabelConfig),
block.NewConsistencyDelayMetaFilter(logger, consistencyDelay, prometheusRegisterer),
ignoreDeletionMarkFilter,
block.NewDeduplicateFilter(),
})
if err != nil {
return errors.Wrap(err, "meta fetcher")
}
Expand Down

0 comments on commit 94b5882

Please sign in to comment.