Allow using different listing strategies
Signed-off-by: Filip Petkovski <filip.petkovsky@gmail.com>
fpetkovski committed Feb 13, 2024
1 parent e78d867 commit 7cc4098
Showing 14 changed files with 131 additions and 53 deletions.
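In short: the hard-wired block.NewBaseBlockIDsFetcher is replaced by a BlockIDsLister interface with two implementations, NewRecursiveLister and NewDefaultLister, so each command can choose its block-listing strategy when building a fetcher. A minimal call-site sketch, assuming only the constructor signatures visible in this diff (logger, insBkt, and reg prepared as in the Thanos commands):

	// Pick a listing strategy; both implementations satisfy block.BlockIDsLister.
	lister := block.NewDefaultLister(logger, insBkt) // top-level Iter plus Exists probes for meta.json
	// lister := block.NewRecursiveLister(logger, insBkt) // single recursive Iter over the bucket

	// Any lister can be injected into a meta fetcher.
	metaFetcher, err := block.NewMetaFetcher(logger, block.FetcherConcurrency,
		insBkt, lister, "", extprom.WrapRegistererWithPrefix("thanos_", reg), nil)
	if err != nil {
		return errors.Wrap(err, "create meta fetcher")
	}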
2 changes: 1 addition & 1 deletion cmd/thanos/compact.go
@@ -239,7 +239,7 @@ func runCompact(
consistencyDelayMetaFilter := block.NewConsistencyDelayMetaFilter(logger, conf.consistencyDelay, extprom.WrapRegistererWithPrefix("thanos_", reg))
timePartitionMetaFilter := block.NewTimePartitionMetaFilter(conf.filterConf.MinTime, conf.filterConf.MaxTime)

-	baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, insBkt)
+	baseBlockIDsFetcher := block.NewDefaultLister(logger, insBkt)
baseMetaFetcher, err := block.NewBaseFetcher(logger, conf.blockMetaFetchConcurrency, insBkt, baseBlockIDsFetcher, conf.dataDir, extprom.WrapRegistererWithPrefix("thanos_", reg))
if err != nil {
return errors.Wrap(err, "create meta fetcher")
2 changes: 1 addition & 1 deletion cmd/thanos/downsample.go
@@ -90,7 +90,7 @@ func RunDownsample(
insBkt := objstoretracing.WrapWithTraces(objstore.WrapWithMetrics(bkt, extprom.WrapRegistererWithPrefix("thanos_", reg), bkt.Name()))

// While fetching blocks, filter out blocks that were marked for no downsample.
-	baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, insBkt)
+	baseBlockIDsFetcher := block.NewDefaultLister(logger, insBkt)
metaFetcher, err := block.NewMetaFetcher(logger, block.FetcherConcurrency, insBkt, baseBlockIDsFetcher, "", extprom.WrapRegistererWithPrefix("thanos_", reg), []block.MetadataFilter{
block.NewDeduplicateFilter(block.FetcherConcurrency),
downsample.NewGatherNoDownsampleMarkFilter(logger, insBkt, block.FetcherConcurrency),
5 changes: 3 additions & 2 deletions cmd/thanos/main_test.go
@@ -21,6 +21,7 @@ import (
"github.com/thanos-io/objstore"

"github.com/efficientgo/core/testutil"
+
"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/compact/downsample"
@@ -157,7 +158,7 @@ func TestRegression4960_Deadlock(t *testing.T) {

metrics := newDownsampleMetrics(prometheus.NewRegistry())
testutil.Equals(t, 0.0, promtest.ToFloat64(metrics.downsamples.WithLabelValues(meta.Thanos.GroupKey())))
-	baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, bkt)
+	baseBlockIDsFetcher := block.NewDefaultLister(logger, bkt)
metaFetcher, err := block.NewMetaFetcher(nil, block.FetcherConcurrency, bkt, baseBlockIDsFetcher, "", nil, nil)
testutil.Ok(t, err)

@@ -197,7 +198,7 @@ func TestCleanupDownsampleCacheFolder(t *testing.T) {

metrics := newDownsampleMetrics(prometheus.NewRegistry())
testutil.Equals(t, 0.0, promtest.ToFloat64(metrics.downsamples.WithLabelValues(meta.Thanos.GroupKey())))
-	baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, bkt)
+	baseBlockIDsFetcher := block.NewDefaultLister(logger, bkt)
metaFetcher, err := block.NewMetaFetcher(nil, block.FetcherConcurrency, bkt, baseBlockIDsFetcher, "", nil, nil)
testutil.Ok(t, err)

2 changes: 1 addition & 1 deletion cmd/thanos/store.go
@@ -346,7 +346,7 @@ func runStore(
}

ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, insBkt, time.Duration(conf.ignoreDeletionMarksDelay), conf.blockMetaFetchConcurrency)
-	baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, insBkt)
+	baseBlockIDsFetcher := block.NewDefaultLister(logger, insBkt)
metaFetcher, err := block.NewMetaFetcher(logger, conf.blockMetaFetchConcurrency, insBkt, baseBlockIDsFetcher, dataDir, extprom.WrapRegistererWithPrefix("thanos_", reg),
[]block.MetadataFilter{
block.NewTimePartitionMetaFilter(conf.filterConf.MinTime, conf.filterConf.MaxTime),
12 changes: 6 additions & 6 deletions cmd/thanos/tools_bucket.go
@@ -365,7 +365,7 @@ func registerBucketVerify(app extkingpin.AppClause, objStoreConfig *extflag.Path

// We ignore any block that has the deletion marker file.
filters := []block.MetadataFilter{block.NewIgnoreDeletionMarkFilter(logger, insBkt, 0, block.FetcherConcurrency)}
-	baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, insBkt)
+	baseBlockIDsFetcher := block.NewDefaultLister(logger, insBkt)
fetcher, err := block.NewMetaFetcher(logger, block.FetcherConcurrency, insBkt, baseBlockIDsFetcher, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg), filters)
if err != nil {
return err
@@ -423,7 +423,7 @@ func registerBucketLs(app extkingpin.AppClause, objStoreConfig *extflag.PathOrCo
ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, insBkt, 0, block.FetcherConcurrency)
filters = append(filters, ignoreDeletionMarkFilter)
}
-	baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, insBkt)
+	baseBlockIDsFetcher := block.NewDefaultLister(logger, insBkt)
fetcher, err := block.NewMetaFetcher(logger, block.FetcherConcurrency, insBkt, baseBlockIDsFetcher, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg), filters)
if err != nil {
return err
@@ -525,7 +525,7 @@ func registerBucketInspect(app extkingpin.AppClause, objStoreConfig *extflag.Pat
}
insBkt := objstoretracing.WrapWithTraces(objstore.WrapWithMetrics(bkt, extprom.WrapRegistererWithPrefix("thanos_", reg), bkt.Name()))

-	baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, insBkt)
+	baseBlockIDsFetcher := block.NewDefaultLister(logger, insBkt)
fetcher, err := block.NewMetaFetcher(logger, block.FetcherConcurrency, insBkt, baseBlockIDsFetcher, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg), nil)
if err != nil {
return err
@@ -669,7 +669,7 @@ func registerBucketWeb(app extkingpin.AppClause, objStoreConfig *extflag.PathOrC
return err
}
// TODO(bwplotka): Allow Bucket UI to visualize the state of block as well.
-	baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, insBkt)
+	baseBlockIDsFetcher := block.NewDefaultLister(logger, insBkt)
fetcher, err := block.NewMetaFetcher(logger, block.FetcherConcurrency, insBkt, baseBlockIDsFetcher, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg),
[]block.MetadataFilter{
block.NewTimePartitionMetaFilter(filterConf.MinTime, filterConf.MaxTime),
@@ -848,7 +848,7 @@ func registerBucketCleanup(app extkingpin.AppClause, objStoreConfig *extflag.Pat

var sy *compact.Syncer
{
-	baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, insBkt)
+	baseBlockIDsFetcher := block.NewDefaultLister(logger, insBkt)
baseMetaFetcher, err := block.NewBaseFetcher(logger, tbc.blockSyncConcurrency, insBkt, baseBlockIDsFetcher, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg))
if err != nil {
return errors.Wrap(err, "create meta fetcher")
@@ -1391,7 +1391,7 @@ func registerBucketRetention(app extkingpin.AppClause, objStoreConfig *extflag.P

var sy *compact.Syncer
{
-	baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, insBkt)
+	baseBlockIDsFetcher := block.NewDefaultLister(logger, insBkt)
baseMetaFetcher, err := block.NewBaseFetcher(logger, tbc.blockSyncConcurrency, insBkt, baseBlockIDsFetcher, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg))
if err != nil {
return errors.Wrap(err, "create meta fetcher")
120 changes: 96 additions & 24 deletions pkg/block/fetcher.go
@@ -170,26 +170,27 @@ func DefaultModifiedLabelValues() [][]string {
}
}

-// Fetcher interface to retieve blockId information from a bucket.
-type BlockIDsFetcher interface {
-	// GetActiveBlocksIDs returning it via channel (streaming) and response.
+// BlockIDsLister lists block IDs from a bucket.
+type BlockIDsLister interface {
+	// GetActiveAndPartialBlockIDs returns the active block IDs via the channel (streaming) and the partial blocks in the response.
	// Active blocks are blocks which contain meta.json, while partial blocks are blocks without meta.json
	GetActiveAndPartialBlockIDs(ctx context.Context, ch chan<- ulid.ULID) (partialBlocks map[ulid.ULID]bool, err error)
}

-type BaseBlockIDsFetcher struct {
+// RecursiveLister lists block IDs by iterating the object storage bucket recursively.
+type RecursiveLister struct {
logger log.Logger
bkt objstore.InstrumentedBucketReader
}

-func NewBaseBlockIDsFetcher(logger log.Logger, bkt objstore.InstrumentedBucketReader) *BaseBlockIDsFetcher {
-	return &BaseBlockIDsFetcher{
+func NewRecursiveLister(logger log.Logger, bkt objstore.InstrumentedBucketReader) *RecursiveLister {
+	return &RecursiveLister{
logger: logger,
bkt: bkt,
}
}

-func (f *BaseBlockIDsFetcher) GetActiveAndPartialBlockIDs(ctx context.Context, ch chan<- ulid.ULID) (partialBlocks map[ulid.ULID]bool, err error) {
+func (f *RecursiveLister) GetActiveAndPartialBlockIDs(ctx context.Context, ch chan<- ulid.ULID) (partialBlocks map[ulid.ULID]bool, err error) {
partialBlocks = make(map[ulid.ULID]bool)
err = f.bkt.Iter(ctx, "", func(name string) error {
parts := strings.Split(name, "/")
@@ -216,6 +217,77 @@ func (f *BaseBlockIDsFetcher) GetActiveAndPartialBlockIDs(ctx context.Context, c
return partialBlocks, err
}

+// DefaultLister lists block IDs by doing a top-level iteration of the bucket
+// and using an Exists call to detect partial blocks.
+type DefaultLister struct {
+	logger log.Logger
+	bkt    objstore.InstrumentedBucketReader
+}
+
+func NewDefaultLister(logger log.Logger, bkt objstore.InstrumentedBucketReader) *DefaultLister {
+	return &DefaultLister{
+		logger: logger,
+		bkt:    bkt,
+	}
+}
+
+func (f *DefaultLister) GetActiveAndPartialBlockIDs(ctx context.Context, ch chan<- ulid.ULID) (partialBlocks map[ulid.ULID]bool, err error) {
+	const concurrency = 64
+
+	partialBlocks = make(map[ulid.ULID]bool)
+	var (
+		metaChan = make(chan ulid.ULID, concurrency)
+		eg, gCtx = errgroup.WithContext(ctx)
+		mu       sync.Mutex
+	)
+	// Start the workers before producing, so a full channel cannot deadlock the iteration.
+	// Each worker probes meta.json with an Exists (HEAD) call to classify the block.
+	for i := 0; i < concurrency; i++ {
+		eg.Go(func() error {
+			for uid := range metaChan {
+				// TODO(bwplotka): If that causes problems (obj store rate limits), add longer ttl to cached items.
+				// For 1y and 100 block sources this generates ~1.5-3k HEAD RPM. AWS handles 330k RPM per prefix.
+				// TODO(bwplotka): Consider filtering by consistency delay here (can't do until compactor healthyOverride work).
+				metaFile := path.Join(uid.String(), MetaFilename)
+				ok, err := f.bkt.Exists(gCtx, metaFile)
+				if err != nil {
+					return errors.Wrapf(err, "meta.json file exists: %v", uid)
+				}
+				if !ok {
+					// A missing meta.json marks the block as partial; record it and keep going.
+					mu.Lock()
+					partialBlocks[uid] = true
+					mu.Unlock()
+					continue
+				}
+				ch <- uid
+			}
+			return nil
+		})
+	}
+
+	// A top-level (non-recursive) iteration yields one entry per block directory.
+	if err = f.bkt.Iter(ctx, "", func(name string) error {
+		id, ok := IsBlockDir(name)
+		if !ok {
+			return nil
+		}
+		select {
+		case <-gCtx.Done():
+			return gCtx.Err()
+		case metaChan <- id:
+		}
+		return nil
+	}); err != nil {
+		return nil, err
+	}
+	close(metaChan)
+
+	if err := eg.Wait(); err != nil {
+		return nil, err
+	}
+	return partialBlocks, nil
+}

type MetadataFetcher interface {
Fetch(ctx context.Context) (metas map[ulid.ULID]*metadata.Meta, partial map[ulid.ULID]error, err error)
UpdateOnChange(func([]metadata.Meta, error))
@@ -234,10 +306,10 @@ type MetadataFilter interface {
// BaseFetcher is a struct that synchronizes filtered metadata of all block in the object storage with the local state.
// Go-routine safe.
type BaseFetcher struct {
-	logger          log.Logger
-	concurrency     int
-	bkt             objstore.InstrumentedBucketReader
-	blockIDsFetcher BlockIDsFetcher
+	logger         log.Logger
+	concurrency    int
+	bkt            objstore.InstrumentedBucketReader
+	blockIDsLister BlockIDsLister

// Optional local directory to cache meta.json files.
cacheDir string
@@ -249,12 +321,12 @@ type BaseFetcher struct {
}

// NewBaseFetcher constructs BaseFetcher.
-func NewBaseFetcher(logger log.Logger, concurrency int, bkt objstore.InstrumentedBucketReader, blockIDsFetcher BlockIDsFetcher, dir string, reg prometheus.Registerer) (*BaseFetcher, error) {
+func NewBaseFetcher(logger log.Logger, concurrency int, bkt objstore.InstrumentedBucketReader, blockIDsFetcher BlockIDsLister, dir string, reg prometheus.Registerer) (*BaseFetcher, error) {
return NewBaseFetcherWithMetrics(logger, concurrency, bkt, blockIDsFetcher, dir, NewBaseFetcherMetrics(reg))
}

// NewBaseFetcherWithMetrics constructs BaseFetcher.
-func NewBaseFetcherWithMetrics(logger log.Logger, concurrency int, bkt objstore.InstrumentedBucketReader, blockIDsFetcher BlockIDsFetcher, dir string, metrics *BaseFetcherMetrics) (*BaseFetcher, error) {
+func NewBaseFetcherWithMetrics(logger log.Logger, concurrency int, bkt objstore.InstrumentedBucketReader, blockIDsLister BlockIDsLister, dir string, metrics *BaseFetcherMetrics) (*BaseFetcher, error) {
if logger == nil {
logger = log.NewNopLogger()
}
@@ -268,24 +340,24 @@ func NewBaseFetcherWithMetrics(logger log.Logger, concurrency int, bkt objstore.
}

return &BaseFetcher{
-		logger:          log.With(logger, "component", "block.BaseFetcher"),
-		concurrency:     concurrency,
-		bkt:             bkt,
-		blockIDsFetcher: blockIDsFetcher,
-		cacheDir:        cacheDir,
-		cached:          map[ulid.ULID]*metadata.Meta{},
-		syncs:           metrics.Syncs,
+		logger:         log.With(logger, "component", "block.BaseFetcher"),
+		concurrency:    concurrency,
+		bkt:            bkt,
+		blockIDsLister: blockIDsLister,
+		cacheDir:       cacheDir,
+		cached:         map[ulid.ULID]*metadata.Meta{},
+		syncs:          metrics.Syncs,
}, nil
}

// NewRawMetaFetcher returns basic meta fetcher without proper handling for eventual consistent backends or partial uploads.
// NOTE: Not suitable to use in production.
-func NewRawMetaFetcher(logger log.Logger, bkt objstore.InstrumentedBucketReader, blockIDsFetcher BlockIDsFetcher) (*MetaFetcher, error) {
+func NewRawMetaFetcher(logger log.Logger, bkt objstore.InstrumentedBucketReader, blockIDsFetcher BlockIDsLister) (*MetaFetcher, error) {
return NewMetaFetcher(logger, 1, bkt, blockIDsFetcher, "", nil, nil)
}

// NewMetaFetcher returns meta fetcher.
-func NewMetaFetcher(logger log.Logger, concurrency int, bkt objstore.InstrumentedBucketReader, blockIDsFetcher BlockIDsFetcher, dir string, reg prometheus.Registerer, filters []MetadataFilter) (*MetaFetcher, error) {
+func NewMetaFetcher(logger log.Logger, concurrency int, bkt objstore.InstrumentedBucketReader, blockIDsFetcher BlockIDsLister, dir string, reg prometheus.Registerer, filters []MetadataFilter) (*MetaFetcher, error) {
b, err := NewBaseFetcher(logger, concurrency, bkt, blockIDsFetcher, dir, reg)
if err != nil {
return nil, err
@@ -294,7 +366,7 @@ func NewMetaFetcher(logger log.Logger, concurrency int, bkt objstore.Instrumente
}

// NewMetaFetcherWithMetrics returns meta fetcher.
-func NewMetaFetcherWithMetrics(logger log.Logger, concurrency int, bkt objstore.InstrumentedBucketReader, blockIDsFetcher BlockIDsFetcher, dir string, baseFetcherMetrics *BaseFetcherMetrics, fetcherMetrics *FetcherMetrics, filters []MetadataFilter) (*MetaFetcher, error) {
+func NewMetaFetcherWithMetrics(logger log.Logger, concurrency int, bkt objstore.InstrumentedBucketReader, blockIDsFetcher BlockIDsLister, dir string, baseFetcherMetrics *BaseFetcherMetrics, fetcherMetrics *FetcherMetrics, filters []MetadataFilter) (*MetaFetcher, error) {
b, err := NewBaseFetcherWithMetrics(logger, concurrency, bkt, blockIDsFetcher, dir, baseFetcherMetrics)
if err != nil {
return nil, err
@@ -445,7 +517,7 @@ func (f *BaseFetcher) fetchMetadata(ctx context.Context) (interface{}, error) {
// Workers scheduled, distribute blocks.
eg.Go(func() error {
defer close(ch)
-		partialBlocks, err = f.blockIDsFetcher.GetActiveAndPartialBlockIDs(ctx, ch)
+		partialBlocks, err = f.blockIDsLister.GetActiveAndPartialBlockIDs(ctx, ch)
return err
})

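Since BaseFetcher now depends only on the BlockIDsLister interface, a caller could also supply its own listing strategy. A hypothetical sketch (prefixLister is not part of this commit; it assumes only the interface and the IsBlockDir helper shown above):

	// prefixLister lists only blocks under a fixed bucket prefix.
	// It treats every discovered block as active; a real implementation
	// would also probe meta.json to detect partial blocks.
	type prefixLister struct {
		bkt    objstore.InstrumentedBucketReader
		prefix string
	}

	func (l *prefixLister) GetActiveAndPartialBlockIDs(ctx context.Context, ch chan<- ulid.ULID) (map[ulid.ULID]bool, error) {
		partial := make(map[ulid.ULID]bool)
		err := l.bkt.Iter(ctx, l.prefix, func(name string) error {
			id, ok := block.IsBlockDir(name)
			if !ok {
				return nil
			}
			select {
			case <-ctx.Done():
				return ctx.Err()
			case ch <- id:
			}
			return nil
		})
		return partial, err
	}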
3 changes: 2 additions & 1 deletion pkg/block/fetcher_test.go
@@ -27,6 +27,7 @@ import (
"github.com/thanos-io/objstore/objtesting"

"github.com/efficientgo/core/testutil"
+
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/extprom"
"github.com/thanos-io/thanos/pkg/model"
@@ -73,7 +74,7 @@ func TestMetaFetcher_Fetch(t *testing.T) {
r := prometheus.NewRegistry()
noopLogger := log.NewNopLogger()
insBkt := objstore.WithNoopInstr(bkt)
-	baseBlockIDsFetcher := NewBaseBlockIDsFetcher(noopLogger, insBkt)
+	baseBlockIDsFetcher := NewDefaultLister(noopLogger, insBkt)
baseFetcher, err := NewBaseFetcher(noopLogger, 20, insBkt, baseBlockIDsFetcher, dir, r)
testutil.Ok(t, err)

3 changes: 2 additions & 1 deletion pkg/compact/clean_test.go
@@ -19,6 +19,7 @@ import (
"github.com/thanos-io/objstore"

"github.com/efficientgo/core/testutil"
+
"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/metadata"
)
@@ -30,7 +31,7 @@ func TestBestEffortCleanAbortedPartialUploads(t *testing.T) {
bkt := objstore.WithNoopInstr(objstore.NewInMemBucket())
logger := log.NewNopLogger()

-	baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, bkt)
+	baseBlockIDsFetcher := block.NewDefaultLister(logger, bkt)
metaFetcher, err := block.NewMetaFetcher(nil, 32, bkt, baseBlockIDsFetcher, "", nil, nil)
testutil.Ok(t, err)

7 changes: 4 additions & 3 deletions pkg/compact/compact_e2e_test.go
@@ -27,6 +27,7 @@ import (
"github.com/thanos-io/objstore/objtesting"

"github.com/efficientgo/core/testutil"
+
"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/dedup"
@@ -95,7 +96,7 @@ func TestSyncer_GarbageCollect_e2e(t *testing.T) {

duplicateBlocksFilter := block.NewDeduplicateFilter(fetcherConcurrency)
insBkt := objstore.WithNoopInstr(bkt)
-	baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(nil, insBkt)
+	baseBlockIDsFetcher := block.NewDefaultLister(nil, insBkt)
metaFetcher, err := block.NewMetaFetcher(nil, 32, insBkt, baseBlockIDsFetcher, "", nil, []block.MetadataFilter{
duplicateBlocksFilter,
})
@@ -197,7 +198,7 @@ func testGroupCompactE2e(t *testing.T, mergeFunc storage.VerticalChunkSeriesMerg
duplicateBlocksFilter := block.NewDeduplicateFilter(fetcherConcurrency)
noCompactMarkerFilter := NewGatherNoCompactionMarkFilter(logger, objstore.WithNoopInstr(bkt), 2)
insBkt := objstore.WithNoopInstr(bkt)
-	baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, insBkt)
+	baseBlockIDsFetcher := block.NewDefaultLister(logger, insBkt)
metaFetcher, err := block.NewMetaFetcher(nil, 32, insBkt, baseBlockIDsFetcher, "", nil, []block.MetadataFilter{
ignoreDeletionMarkFilter,
duplicateBlocksFilter,
@@ -509,7 +510,7 @@ func TestGarbageCollectDoesntCreateEmptyBlocksWithDeletionMarksOnly(t *testing.T

duplicateBlocksFilter := block.NewDeduplicateFilter(fetcherConcurrency)
insBkt := objstore.WithNoopInstr(bkt)
-	baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, insBkt)
+	baseBlockIDsFetcher := block.NewDefaultLister(logger, insBkt)
metaFetcher, err := block.NewMetaFetcher(nil, 32, insBkt, baseBlockIDsFetcher, "", nil, []block.MetadataFilter{
ignoreDeletionMarkFilter,
duplicateBlocksFilter,
3 changes: 2 additions & 1 deletion pkg/compact/retention_test.go
@@ -21,6 +21,7 @@ import (
"github.com/thanos-io/objstore"

"github.com/efficientgo/core/testutil"
+
"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/compact"
@@ -245,7 +246,7 @@ func TestApplyRetentionPolicyByResolution(t *testing.T) {
uploadMockBlock(t, bkt, b.id, b.minTime, b.maxTime, int64(b.resolution))
}

-	baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, bkt)
+	baseBlockIDsFetcher := block.NewDefaultLister(logger, bkt)
metaFetcher, err := block.NewMetaFetcher(logger, 32, bkt, baseBlockIDsFetcher, "", nil, nil)
testutil.Ok(t, err)

2 changes: 1 addition & 1 deletion pkg/replicate/replicator.go
@@ -244,7 +244,7 @@ func newMetaFetcher(
if ignoreMarkedForDeletion {
filters = append(filters, thanosblock.NewIgnoreDeletionMarkFilter(logger, fromBkt, 0, concurrency))
}
-	baseBlockIDsFetcher := thanosblock.NewBaseBlockIDsFetcher(logger, fromBkt)
+	baseBlockIDsFetcher := thanosblock.NewDefaultLister(logger, fromBkt)
return thanosblock.NewMetaFetcher(
logger,
concurrency,
