Skip to content

Commit

Permalink
compact+store+tools: Filter for duplicate block data only within a compaction group and in parallel (#4963)

Browse files Browse the repository at this point in the history

* Move group key function to the metadata package. Deduplicate within compaction groups and in parallel.

Signed-off-by: Nathan Berkley <nberkley@tripadvisor.com>

* cleanup after rebase

Signed-off-by: Nathan Berkley <nberkley@tripadvisor.com>

* formatting

Signed-off-by: Nathan Berkley <nberkley@tripadvisor.com>

* Reduce mutex operations in DeduplicateFilter

Signed-off-by: Nathan Berkley <nberkley@tripadvisor.com>

* fix tests after last update from main

Signed-off-by: Nathan Berkley <nberkley@tripadvisor.com>

* formatting

Signed-off-by: Nathan Berkley <nberkley@tripadvisor.com>

* Move fetcher modifiers before filters so we can compute correct compactions groups during deduplication

Signed-off-by: Nathan Berkley <nberkley@tripadvisor.com>

* Merge the notion of filters and modifiers so we can strip replica labels before deduplication.

Signed-off-by: Nathan Berkley <nberkley@tripadvisor.com>

* formatting

Signed-off-by: Nathan Berkley <nberkley@tripadvisor.com>

* fix tests

Signed-off-by: Nathan Berkley <nberkley@tripadvisor.com>

* remove duplicate filter list entry

Signed-off-by: Nathan Berkley <nberkley@tripadvisor.com>

* Fix new calls to NewStoreGW

Signed-off-by: Nathan Berkley <nberkley@tripadvisor.com>

* add CHANGELOG

Signed-off-by: Nathan Berkley <nberkley@tripadvisor.com>

* Apply suggestions from code review

Co-authored-by: Bartlomiej Plotka <bwplotka@gmail.com>
Signed-off-by: Nathan Berkley <nberkley@tripadvisor.com>

* comment suggested in review

Signed-off-by: Nathan Berkley <nberkley@tripadvisor.com>

Co-authored-by: Bartlomiej Plotka <bwplotka@gmail.com>
  • Loading branch information
nberkley and bwplotka committed Jan 28, 2022
1 parent 299cf45 commit 78e923d
Show file tree
Hide file tree
Showing 25 changed files with 171 additions and 239 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re

### Added

- [#4963](https://github.com/thanos-io/thanos/pull/4963) Compactor, Store, Tools: Loading block metadata now only filters out duplicates within a source (or compaction group if replica labels are configured), and does so in parallel over sources.
- [#5089](https://github.com/thanos-io/thanos/pull/5089) S3: Create an empty map in the case SSE-KMS is used and no KMSEncryptionContext is passed.
- [#4970](https://github.com/thanos-io/thanos/pull/4970) Added a new flag `exclude-delete` to `tools bucket ls`, which excludes blocks marked for deletion.
- [#4903](https://github.com/thanos-io/thanos/pull/4903) Compactor: Added tracing support for compaction.
Expand Down
9 changes: 5 additions & 4 deletions cmd/thanos/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ func runCompact(
// The delay of deleteDelay/2 is added to ensure we fetch blocks that are meant to be deleted but do not have a replacement yet.
// This is to make sure compactor will not accidentally perform compactions with gap instead.
ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, bkt, deleteDelay/2, conf.blockMetaFetchConcurrency)
duplicateBlocksFilter := block.NewDeduplicateFilter()
duplicateBlocksFilter := block.NewDeduplicateFilter(conf.blockMetaFetchConcurrency)
noCompactMarkerFilter := compact.NewGatherNoCompactionMarkFilter(logger, bkt, conf.blockMetaFetchConcurrency)
labelShardedMetaFilter := block.NewLabelShardedMetaFilter(relabelConfig)
consistencyDelayMetaFilter := block.NewConsistencyDelayMetaFilter(logger, conf.consistencyDelay, extprom.WrapRegistererWithPrefix("thanos_", reg))
Expand Down Expand Up @@ -271,9 +271,10 @@ func runCompact(
labelShardedMetaFilter,
consistencyDelayMetaFilter,
ignoreDeletionMarkFilter,
block.NewReplicaLabelRemover(logger, conf.dedupReplicaLabels),
duplicateBlocksFilter,
noCompactMarkerFilter,
}, []block.MetadataModifier{block.NewReplicaLabelRemover(logger, conf.dedupReplicaLabels)},
},
)
cf.UpdateOnChange(func(blocks []metadata.Meta, err error) {
compactorView.Set(blocks, err)
Expand Down Expand Up @@ -440,7 +441,7 @@ func runCompact(
}

for _, meta := range sy.Metas() {
groupKey := compact.DefaultGroupKey(meta.Thanos)
groupKey := meta.Thanos.GroupKey()
downsampleMetrics.downsamples.WithLabelValues(groupKey)
downsampleMetrics.downsampleFailures.WithLabelValues(groupKey)
}
Expand Down Expand Up @@ -532,7 +533,7 @@ func runCompact(

// Separate fetcher for global view.
// TODO(bwplotka): Allow Bucket UI to visualize the state of the block as well.
f := baseMetaFetcher.NewMetaFetcher(extprom.WrapRegistererWithPrefix("thanos_bucket_ui", reg), nil, nil, "component", "globalBucketUI")
f := baseMetaFetcher.NewMetaFetcher(extprom.WrapRegistererWithPrefix("thanos_bucket_ui", reg), nil, "component", "globalBucketUI")
f.UpdateOnChange(func(blocks []metadata.Meta, err error) {
global.Set(blocks, err)
api.SetGlobal(blocks, err)
Expand Down
14 changes: 7 additions & 7 deletions cmd/thanos/downsample.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (

"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/compact"
"github.com/thanos-io/thanos/pkg/compact/downsample"
"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/errutil"
Expand Down Expand Up @@ -86,8 +85,8 @@ func RunDownsample(
}

metaFetcher, err := block.NewMetaFetcher(logger, block.FetcherConcurrency, bkt, "", extprom.WrapRegistererWithPrefix("thanos_", reg), []block.MetadataFilter{
block.NewDeduplicateFilter(),
}, nil)
block.NewDeduplicateFilter(block.FetcherConcurrency),
})
if err != nil {
return errors.Wrap(err, "create meta fetcher")
}
Expand Down Expand Up @@ -121,7 +120,7 @@ func RunDownsample(
}

for _, meta := range metas {
groupKey := compact.DefaultGroupKey(meta.Thanos)
groupKey := meta.Thanos.GroupKey()
metrics.downsamples.WithLabelValues(groupKey)
metrics.downsampleFailures.WithLabelValues(groupKey)
}
Expand Down Expand Up @@ -252,10 +251,11 @@ func downsampleBucket(
errMsg = "downsampling to 60 min"
}
if err := processDownsampling(workerCtx, logger, bkt, m, dir, resolution, hashFunc, metrics); err != nil {
metrics.downsampleFailures.WithLabelValues(compact.DefaultGroupKey(m.Thanos)).Inc()
metrics.downsampleFailures.WithLabelValues(m.Thanos.GroupKey()).Inc()
errCh <- errors.Wrap(err, errMsg)

}
metrics.downsamples.WithLabelValues(compact.DefaultGroupKey(m.Thanos)).Inc()
metrics.downsamples.WithLabelValues(m.Thanos.GroupKey()).Inc()
}
}()
}
Expand Down Expand Up @@ -377,7 +377,7 @@ func processDownsampling(
downsampleDuration := time.Since(begin)
level.Info(logger).Log("msg", "downsampled block",
"from", m.ULID, "to", id, "duration", downsampleDuration, "duration_ms", downsampleDuration.Milliseconds())
metrics.downsampleDuration.WithLabelValues(compact.DefaultGroupKey(m.Thanos)).Observe(downsampleDuration.Seconds())
metrics.downsampleDuration.WithLabelValues(m.Thanos.GroupKey()).Observe(downsampleDuration.Seconds())

if err := block.VerifyIndex(logger, filepath.Join(resdir, block.IndexFilename), m.MinTime, m.MaxTime); err != nil {
return errors.Wrap(err, "output block index not valid")
Expand Down
11 changes: 5 additions & 6 deletions cmd/thanos/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (

"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/compact"
"github.com/thanos-io/thanos/pkg/compact/downsample"
"github.com/thanos-io/thanos/pkg/objstore"
"github.com/thanos-io/thanos/pkg/testutil"
Expand Down Expand Up @@ -154,8 +153,8 @@ func TestRegression4960_Deadlock(t *testing.T) {
testutil.Ok(t, err)

metrics := newDownsampleMetrics(prometheus.NewRegistry())
testutil.Equals(t, 0.0, promtest.ToFloat64(metrics.downsamples.WithLabelValues(compact.DefaultGroupKey(meta.Thanos))))
metaFetcher, err := block.NewMetaFetcher(nil, block.FetcherConcurrency, bkt, "", nil, nil, nil)
testutil.Equals(t, 0.0, promtest.ToFloat64(metrics.downsamples.WithLabelValues(meta.Thanos.GroupKey())))
metaFetcher, err := block.NewMetaFetcher(nil, block.FetcherConcurrency, bkt, "", nil, nil)
testutil.Ok(t, err)

metas, _, err := metaFetcher.Fetch(ctx)
Expand Down Expand Up @@ -194,14 +193,14 @@ func TestCleanupDownsampleCacheFolder(t *testing.T) {
testutil.Ok(t, err)

metrics := newDownsampleMetrics(prometheus.NewRegistry())
testutil.Equals(t, 0.0, promtest.ToFloat64(metrics.downsamples.WithLabelValues(compact.DefaultGroupKey(meta.Thanos))))
metaFetcher, err := block.NewMetaFetcher(nil, block.FetcherConcurrency, bkt, "", nil, nil, nil)
testutil.Equals(t, 0.0, promtest.ToFloat64(metrics.downsamples.WithLabelValues(meta.Thanos.GroupKey())))
metaFetcher, err := block.NewMetaFetcher(nil, block.FetcherConcurrency, bkt, "", nil, nil)
testutil.Ok(t, err)

metas, _, err := metaFetcher.Fetch(ctx)
testutil.Ok(t, err)
testutil.Ok(t, downsampleBucket(ctx, logger, metrics, bkt, metas, dir, 1, metadata.NoneFunc))
testutil.Equals(t, 1.0, promtest.ToFloat64(metrics.downsamples.WithLabelValues(compact.DefaultGroupKey(meta.Thanos))))
testutil.Equals(t, 1.0, promtest.ToFloat64(metrics.downsamples.WithLabelValues(meta.Thanos.GroupKey())))

_, err = os.Stat(dir)
testutil.Assert(t, os.IsNotExist(err), "index cache dir should not exist at the end of execution")
Expand Down
4 changes: 2 additions & 2 deletions cmd/thanos/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,8 +304,8 @@ func runStore(
block.NewLabelShardedMetaFilter(relabelConfig),
block.NewConsistencyDelayMetaFilter(logger, time.Duration(conf.consistencyDelay), extprom.WrapRegistererWithPrefix("thanos_", reg)),
ignoreDeletionMarkFilter,
block.NewDeduplicateFilter(),
}, nil)
block.NewDeduplicateFilter(conf.blockMetaFetchConcurrency),
})
if err != nil {
return errors.Wrap(err, "meta fetcher")
}
Expand Down
18 changes: 9 additions & 9 deletions cmd/thanos/tools_bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ func registerBucketVerify(app extkingpin.AppClause, objStoreConfig *extflag.Path
return err
}

fetcher, err := block.NewMetaFetcher(logger, block.FetcherConcurrency, bkt, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg), nil, nil)
fetcher, err := block.NewMetaFetcher(logger, block.FetcherConcurrency, bkt, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg), nil)
if err != nil {
return err
}
Expand Down Expand Up @@ -386,7 +386,7 @@ func registerBucketLs(app extkingpin.AppClause, objStoreConfig *extflag.PathOrCo
ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, bkt, 0, block.FetcherConcurrency)
filters = append(filters, ignoreDeletionMarkFilter)
}
fetcher, err := block.NewMetaFetcher(logger, block.FetcherConcurrency, bkt, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg), filters, nil)
fetcher, err := block.NewMetaFetcher(logger, block.FetcherConcurrency, bkt, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg), filters)
if err != nil {
return err
}
Expand Down Expand Up @@ -486,7 +486,7 @@ func registerBucketInspect(app extkingpin.AppClause, objStoreConfig *extflag.Pat
return err
}

fetcher, err := block.NewMetaFetcher(logger, block.FetcherConcurrency, bkt, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg), nil, nil)
fetcher, err := block.NewMetaFetcher(logger, block.FetcherConcurrency, bkt, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg), nil)
if err != nil {
return err
}
Expand Down Expand Up @@ -632,8 +632,8 @@ func registerBucketWeb(app extkingpin.AppClause, objStoreConfig *extflag.PathOrC
[]block.MetadataFilter{
block.NewTimePartitionMetaFilter(filterConf.MinTime, filterConf.MaxTime),
block.NewLabelShardedMetaFilter(relabelConfig),
block.NewDeduplicateFilter(),
}, nil)
block.NewDeduplicateFilter(block.FetcherConcurrency),
})
if err != nil {
return err
}
Expand Down Expand Up @@ -786,7 +786,7 @@ func registerBucketCleanup(app extkingpin.AppClause, objStoreConfig *extflag.Pat
// The delay of deleteDelay/2 is added to ensure we fetch blocks that are meant to be deleted but do not have a replacement yet.
// This is to make sure compactor will not accidentally perform compactions with gap instead.
ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, bkt, tbc.deleteDelay/2, block.FetcherConcurrency)
duplicateBlocksFilter := block.NewDeduplicateFilter()
duplicateBlocksFilter := block.NewDeduplicateFilter(block.FetcherConcurrency)
blocksCleaner := compact.NewBlocksCleaner(logger, bkt, ignoreDeletionMarkFilter, tbc.deleteDelay, stubCounter, stubCounter)

ctx := context.Background()
Expand All @@ -803,7 +803,7 @@ func registerBucketCleanup(app extkingpin.AppClause, objStoreConfig *extflag.Pat
block.NewConsistencyDelayMetaFilter(logger, tbc.consistencyDelay, extprom.WrapRegistererWithPrefix(extpromPrefix, reg)),
ignoreDeletionMarkFilter,
duplicateBlocksFilter,
}, []block.MetadataModifier{block.NewReplicaLabelRemover(logger, make([]string, 0))},
},
)
sy, err = compact.NewMetaSyncer(
logger,
Expand Down Expand Up @@ -1312,7 +1312,7 @@ func registerBucketRetention(app extkingpin.AppClause, objStoreConfig *extflag.P
// The delay of deleteDelay/2 is added to ensure we fetch blocks that are meant to be deleted but do not have a replacement yet.
// This is to make sure compactor will not accidentally perform compactions with gap instead.
ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, bkt, tbc.deleteDelay/2, block.FetcherConcurrency)
duplicateBlocksFilter := block.NewDeduplicateFilter()
duplicateBlocksFilter := block.NewDeduplicateFilter(block.FetcherConcurrency)
stubCounter := promauto.With(nil).NewCounter(prometheus.CounterOpts{})

var sy *compact.Syncer
Expand All @@ -1327,7 +1327,7 @@ func registerBucketRetention(app extkingpin.AppClause, objStoreConfig *extflag.P
block.NewConsistencyDelayMetaFilter(logger, tbc.consistencyDelay, extprom.WrapRegistererWithPrefix(extpromPrefix, reg)),
duplicateBlocksFilter,
ignoreDeletionMarkFilter,
}, []block.MetadataModifier{block.NewReplicaLabelRemover(logger, make([]string, 0))},
},
)
sy, err = compact.NewMetaSyncer(
logger,
Expand Down

0 comments on commit 78e923d

Please sign in to comment.