Skip to content

Commit

Permalink
compact: hook nodownsamplemarkfilter into filters chain
Browse files Browse the repository at this point in the history
We have a NoDownsampleMarkFilter that we were not using before in the
compactor for some reason. Hook it into the filters chain if
downsampling is enabled and then trim matching ULIDs from the
downsampling process. Add a test to cover this scenario.

Fixes #6179.

Signed-off-by: Giedrius Statkevičius <giedrius.statkevicius@vinted.com>
  • Loading branch information
GiedriusS committed Nov 14, 2023
1 parent 2320e49 commit 8bc55b8
Show file tree
Hide file tree
Showing 4 changed files with 97 additions and 16 deletions.
61 changes: 48 additions & 13 deletions cmd/thanos/compact.go
Expand Up @@ -234,6 +234,7 @@ func runCompact(
ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, insBkt, deleteDelay/2, conf.blockMetaFetchConcurrency)
duplicateBlocksFilter := block.NewDeduplicateFilter(conf.blockMetaFetchConcurrency)
noCompactMarkerFilter := compact.NewGatherNoCompactionMarkFilter(logger, insBkt, conf.blockMetaFetchConcurrency)
noDownsampleMarkerFilter := downsample.NewGatherNoDownsampleMarkFilter(logger, insBkt, conf.blockMetaFetchConcurrency)
labelShardedMetaFilter := block.NewLabelShardedMetaFilter(relabelConfig)
consistencyDelayMetaFilter := block.NewConsistencyDelayMetaFilter(logger, conf.consistencyDelay, extprom.WrapRegistererWithPrefix("thanos_", reg))
timePartitionMetaFilter := block.NewTimePartitionMetaFilter(conf.filterConf.MinTime, conf.filterConf.MaxTime)
Expand All @@ -260,18 +261,21 @@ func runCompact(
sy *compact.Syncer
)
{
filters := []block.MetadataFilter{
timePartitionMetaFilter,
labelShardedMetaFilter,
consistencyDelayMetaFilter,
ignoreDeletionMarkFilter,
block.NewReplicaLabelRemover(logger, conf.dedupReplicaLabels),
duplicateBlocksFilter,
noCompactMarkerFilter,
}
if !conf.disableDownsampling {
filters = append(filters, noDownsampleMarkerFilter)
}
// Make sure all compactor meta syncs are done through Syncer.SyncMeta for readability.
cf := baseMetaFetcher.NewMetaFetcher(
extprom.WrapRegistererWithPrefix("thanos_", reg), []block.MetadataFilter{
timePartitionMetaFilter,
labelShardedMetaFilter,
consistencyDelayMetaFilter,
ignoreDeletionMarkFilter,
block.NewReplicaLabelRemover(logger, conf.dedupReplicaLabels),
duplicateBlocksFilter,
noCompactMarkerFilter,
},
)
extprom.WrapRegistererWithPrefix("thanos_", reg), filters)
cf.UpdateOnChange(func(blocks []metadata.Meta, err error) {
api.SetLoaded(blocks, err)
})
Expand Down Expand Up @@ -436,22 +440,53 @@ func runCompact(
return errors.Wrap(err, "sync before first pass of downsampling")
}

for _, meta := range sy.Metas() {
filteredMetas := sy.Metas()
noDownsampleBlocks := noDownsampleMarkerFilter.NoDownsampleMarkedBlocks()
for ul := range noDownsampleBlocks {
delete(filteredMetas, ul)
}

for _, meta := range filteredMetas {
groupKey := meta.Thanos.GroupKey()
downsampleMetrics.downsamples.WithLabelValues(groupKey)
downsampleMetrics.downsampleFailures.WithLabelValues(groupKey)
}
if err := downsampleBucket(ctx, logger, downsampleMetrics, insBkt, sy.Metas(), downsamplingDir, conf.downsampleConcurrency, conf.blockFilesConcurrency, metadata.HashFunc(conf.hashFunc), conf.acceptMalformedIndex); err != nil {

if err := downsampleBucket(
ctx,
logger,
downsampleMetrics,
insBkt,
filteredMetas,
downsamplingDir,
conf.downsampleConcurrency,
conf.blockFilesConcurrency,
metadata.HashFunc(conf.hashFunc),
conf.acceptMalformedIndex,
); err != nil {
return errors.Wrap(err, "first pass of downsampling failed")
}

level.Info(logger).Log("msg", "start second pass of downsampling")
if err := sy.SyncMetas(ctx); err != nil {
return errors.Wrap(err, "sync before second pass of downsampling")
}
if err := downsampleBucket(ctx, logger, downsampleMetrics, insBkt, sy.Metas(), downsamplingDir, conf.downsampleConcurrency, conf.blockFilesConcurrency, metadata.HashFunc(conf.hashFunc), conf.acceptMalformedIndex); err != nil {

if err := downsampleBucket(
ctx,
logger,
downsampleMetrics,
insBkt,
filteredMetas,
downsamplingDir,
conf.downsampleConcurrency,
conf.blockFilesConcurrency,
metadata.HashFunc(conf.hashFunc),
conf.acceptMalformedIndex,
); err != nil {
return errors.Wrap(err, "second pass of downsampling failed")
}

level.Info(logger).Log("msg", "downsampling iterations done")
} else {
level.Info(logger).Log("msg", "downsampling was explicitly disabled")
Expand Down
2 changes: 1 addition & 1 deletion cmd/thanos/downsample.go
Expand Up @@ -92,7 +92,7 @@ func RunDownsample(
// While fetching blocks, filter out blocks that were marked for no downsample.
metaFetcher, err := block.NewMetaFetcher(logger, block.FetcherConcurrency, insBkt, "", extprom.WrapRegistererWithPrefix("thanos_", reg), []block.MetadataFilter{
block.NewDeduplicateFilter(block.FetcherConcurrency),
downsample.NewGatherNoDownsampleMarkFilter(logger, insBkt),
downsample.NewGatherNoDownsampleMarkFilter(logger, insBkt, block.FetcherConcurrency),
})
if err != nil {
return errors.Wrap(err, "create meta fetcher")
Expand Down
4 changes: 2 additions & 2 deletions pkg/compact/downsample/downsample.go
Expand Up @@ -808,11 +808,11 @@ type GatherNoDownsampleMarkFilter struct {
}

// NewGatherNoDownsampleMarkFilter creates GatherNoDownsampleMarkFilter.
func NewGatherNoDownsampleMarkFilter(logger log.Logger, bkt objstore.InstrumentedBucketReader) *GatherNoDownsampleMarkFilter {
func NewGatherNoDownsampleMarkFilter(logger log.Logger, bkt objstore.InstrumentedBucketReader, concurrency int) *GatherNoDownsampleMarkFilter {
return &GatherNoDownsampleMarkFilter{
logger: logger,
bkt: bkt,
concurrency: 1,
concurrency: concurrency,
}
}

Expand Down
46 changes: 46 additions & 0 deletions test/e2e/compact_test.go
Expand Up @@ -863,3 +863,49 @@ func ensureGETStatusCode(t testing.TB, code int, url string) {
testutil.Ok(t, err)
testutil.Equals(t, code, r.StatusCode)
}

// TestCompactorDownsampleIgnoresMarked verifies that a block carrying a
// no-downsample mark is excluded from the compactor's downsampling pass:
// after the compactor has run, the downsample counter must never rise above zero.
func TestCompactorDownsampleIgnoresMarked(t *testing.T) {
	now, err := time.Parse(time.RFC3339, "2020-03-24T08:00:00Z")
	testutil.Ok(t, err)

	ctx := context.Background()
	lg := log.NewLogfmtLogger(os.Stderr)

	env, err := e2e.NewDockerEnvironment("downsample-mrkd")
	testutil.Ok(t, err)
	t.Cleanup(e2ethanos.CleanScenario(t, env))

	tmpDir := filepath.Join(env.SharedDir(), "tmp")
	testutil.Ok(t, os.MkdirAll(tmpDir, os.ModePerm))

	const bucket = "compact-test"
	minio := e2edb.NewMinio(env, "minio", bucket, e2edb.WithMinioTLS())
	testutil.Ok(t, e2e.StartAndWaitReady(minio))

	s3Cfg := e2ethanos.NewS3Config(bucket, minio.Endpoint("http"), minio.Dir())
	bucketClient, err := s3.NewBucketWithConfig(lg, s3Cfg, "test")
	testutil.Ok(t, err)

	// A raw block that would normally be eligible for downsampling.
	blk := blockDesc{
		series: []labels.Labels{
			labels.FromStrings("z", "1", "b", "2"),
			labels.FromStrings("z", "1", "b", "5"),
		},
		extLset: labels.FromStrings("case", "block-about-to-be-downsampled"),
		mint:    timestamp.FromTime(now),
		maxt:    timestamp.FromTime(now.Add(10 * 24 * time.Hour)),
	}

	// Age the block just past the consistency delay so the fetcher picks it up.
	delay := 30 * time.Minute

	rawID, err := blk.Create(ctx, tmpDir, delay, metadata.NoneFunc, 1200)
	testutil.Ok(t, err)
	testutil.Ok(t, objstore.UploadDir(ctx, lg, bucketClient, path.Join(tmpDir, rawID.String()), rawID.String()))
	// Mark the uploaded block so the compactor must leave it out of downsampling.
	testutil.Ok(t, block.MarkForNoDownsample(ctx, lg, bucketClient, rawID, metadata.ManualNoDownsampleReason, "why not", promauto.With(nil).NewCounter(prometheus.CounterOpts{})))

	compactor := e2ethanos.NewCompactorBuilder(env, "working").Init(client.BucketConfig{
		Type:   client.S3,
		Config: e2ethanos.NewS3Config(bucket, minio.InternalEndpoint("http"), minio.Dir()),
	}, nil)
	testutil.Ok(t, e2e.StartAndWaitReady(compactor))

	// The marked block must not be downsampled, so the metric must stay at zero.
	testutil.NotOk(t, compactor.WaitSumMetricsWithOptions(e2emon.Greater(0), []string{"thanos_compact_downsample_total"}, e2emon.WaitMissingMetrics()))
}

0 comments on commit 8bc55b8

Please sign in to comment.