diff --git a/CHANGELOG.md b/CHANGELOG.md
index 78b9e7d864..9ea3180673 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -39,7 +39,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
 - [#7123](https://github.com/thanos-io/thanos/pull/7123) Rule: Change default Alertmanager API version to v2.
 - [#7223](https://github.com/thanos-io/thanos/pull/7223) Automatic detection of memory limits and configure GOMEMLIMIT to match.
-- [#7282](https://github.com/thanos-io/thanos/pull/7282) Fetcher: mark metas with incomplete files as corrupted.
+- [#7282](https://github.com/thanos-io/thanos/pull/7282) Fetcher: mark metas with incomplete files as partial if configured.
+- [#7282](https://github.com/thanos-io/thanos/pull/7282) Compactor: add flag to disable cleanup of partial uploads.

 ### Removed
diff --git a/cmd/thanos/compact.go b/cmd/thanos/compact.go
index 1b408f6d6b..9b111c67d8 100644
--- a/cmd/thanos/compact.go
+++ b/cmd/thanos/compact.go
@@ -419,7 +419,7 @@ func runCompact(
 	var cleanMtx sync.Mutex
 	// TODO(GiedriusS): we could also apply retention policies here but the logic would be a bit more complex.
 	cleanPartialMarked := func() error {
-		if !conf.cleanupPartialUploads {
+		if conf.disableCleanupPartialUploads {
 			level.Info(logger).Log("msg", "cleanup of partial uploads is disabled, skipping")
 			return nil
 		}
@@ -728,7 +728,7 @@ type compactConfig struct {
 	progressCalculateInterval time.Duration
 	filterConf                *store.FilterConfig
 	disableAdminOperations    bool
-	cleanupPartialUploads     bool
+	disableCleanupPartialUploads bool
 }

 func (cc *compactConfig) registerFlag(cmd extkingpin.FlagClause) {
@@ -842,6 +842,5 @@ func (cc *compactConfig) registerFlag(cmd extkingpin.FlagClause) {
 	cmd.Flag("bucket-web-label", "External block label to use as group title in the bucket web UI").StringVar(&cc.label)
 	cmd.Flag("disable-admin-operations", "Disable UI/API admin operations like marking blocks for deletion and no compaction.").Default("false").BoolVar(&cc.disableAdminOperations)
-	cmd.Flag("compact.cleanup-partial-uploads", "Enable cleanup of partial uploads").
-		Hidden().Default("true").BoolVar(&cc.cleanupPartialUploads)
+	cmd.Flag("compact.disable-cleanup-partial-uploads", "Disable cleanup of partial uploads.").Default("false").BoolVar(&cc.disableCleanupPartialUploads)
 }
diff --git a/pkg/block/fetcher.go b/pkg/block/fetcher.go
index 2265df56fa..e26f6d0bdf 100644
--- a/pkg/block/fetcher.go
+++ b/pkg/block/fetcher.go
@@ -78,10 +78,10 @@ const (
 	FailedMeta = "failed"

 	// Synced label values.
-	labelExcludedMeta = "label-excluded"
-	timeExcludedMeta  = "time-excluded"
-	tooFreshMeta      = "too-fresh"
-	duplicateMeta     = "duplicate"
+	LabelExcludedMeta = "label-excluded"
+	TimeExcludedMeta  = "time-excluded"
+	TooFreshMeta      = "too-fresh"
+	DuplicateMeta     = "duplicate"
 	// Blocks that are marked for deletion can be loaded as well. This is done to make sure that we load blocks that are meant to be deleted,
 	// but don't have a replacement block yet.
 	MarkedForDeletionMeta = "marked-for-deletion"
@@ -93,7 +93,10 @@ const (
 	MarkedForNoDownsampleMeta = "marked-for-no-downsample"

 	// Modified label values.
-	replicaRemovedMeta = "replica-label-removed"
+	ReplicaRemovedMeta = "replica-label-removed"
+
+	// Mysterious incomplete block: its meta.json was uploaded, indicating the upload finished just fine, yet the file list shows the block is incomplete.
+	MetaHasIncompleteFiles = "meta-has-incomplete-files"
 )

 func NewBaseFetcherMetrics(reg prometheus.Registerer) *BaseFetcherMetrics {
@@ -155,19 +158,20 @@ func DefaultSyncedStateLabelValues() [][]string {
 		{CorruptedMeta},
 		{NoMeta},
 		{LoadedMeta},
-		{tooFreshMeta},
+		{TooFreshMeta},
 		{FailedMeta},
-		{labelExcludedMeta},
-		{timeExcludedMeta},
-		{duplicateMeta},
+		{LabelExcludedMeta},
+		{TimeExcludedMeta},
+		{DuplicateMeta},
 		{MarkedForDeletionMeta},
 		{MarkedForNoCompactionMeta},
+		{MetaHasIncompleteFiles},
 	}
 }

 func DefaultModifiedLabelValues() [][]string {
 	return [][]string{
-		{replicaRemovedMeta},
+		{ReplicaRemovedMeta},
 	}
 }

@@ -175,7 +179,7 @@ func DefaultModifiedLabelValues() [][]string {
 type Lister interface {
 	// GetActiveAndPartialBlockIDs GetActiveBlocksIDs returning it via channel (streaming) and response.
 	// Active blocks are blocks which contain meta.json, while partial blocks are blocks without meta.json
-	GetActiveAndPartialBlockIDs(ctx context.Context, ch chan<- ulid.ULID) (partialBlocks map[ulid.ULID]bool, err error)
+	GetActiveAndPartialBlockIDs(ctx context.Context, ch chan<- ulid.ULID) (partialBlocks map[ulid.ULID]error, err error)
 }

 // RecursiveLister lists block IDs by recursively iterating through a bucket.
@@ -191,8 +195,8 @@ func NewRecursiveLister(logger log.Logger, bkt objstore.InstrumentedBucketReader
 	}
 }

-func (f *RecursiveLister) GetActiveAndPartialBlockIDs(ctx context.Context, ch chan<- ulid.ULID) (partialBlocks map[ulid.ULID]bool, err error) {
-	partialBlocks = make(map[ulid.ULID]bool)
+func (f *RecursiveLister) GetActiveAndPartialBlockIDs(ctx context.Context, ch chan<- ulid.ULID) (partialBlocks map[ulid.ULID]error, err error) {
+	partialBlocks = make(map[ulid.ULID]error)
 	err = f.bkt.Iter(ctx, "", func(name string) error {
 		parts := strings.Split(name, "/")
 		dir, file := parts[0], parts[len(parts)-1]
@@ -201,12 +205,12 @@ func (f *RecursiveLister) GetActiveAndPartialBlockIDs(ctx context.Context, ch ch
 			return nil
 		}
 		if _, ok := partialBlocks[id]; !ok {
-			partialBlocks[id] = true
+			partialBlocks[id] = errors.Wrapf(ErrorSyncMetaNotFound, "block id: %s", id)
 		}
 		if !IsBlockMetaFile(file) {
 			return nil
 		}
-		partialBlocks[id] = false
+		partialBlocks[id] = nil

 		select {
 		case <-ctx.Done():
@@ -218,6 +222,93 @@ func (f *RecursiveLister) GetActiveAndPartialBlockIDs(ctx context.Context, ch ch
 	return partialBlocks, err
 }

+// RecursiveBlockValidatingLister lists block IDs by recursively iterating through a bucket and validates that each block's set of files is complete.
+type RecursiveBlockValidatingLister struct {
+	logger log.Logger
+	bkt    objstore.InstrumentedBucketReader
+}
+
+func NewRecursiveBlockValidatingLister(logger log.Logger, bkt objstore.InstrumentedBucketReader) *RecursiveBlockValidatingLister {
+	return &RecursiveBlockValidatingLister{
+		logger: logger,
+		bkt:    bkt,
+	}
+}
+
+func (f *RecursiveBlockValidatingLister) GetActiveAndPartialBlockIDs(ctx context.Context, ch chan<- ulid.ULID) (partialBlocks map[ulid.ULID]error, err error) {
+	filesPerBlock := make(map[ulid.ULID][]string)
+	err = f.bkt.Iter(ctx, "", func(name string) error {
+		parts := strings.Split(name, "/")
+		id, ok := IsBlockDir(parts[0])
+		if !ok {
+			return nil
+		}
+		filesPerBlock[id] = append(filesPerBlock[id], strings.TrimPrefix(name, parts[0]+"/"))
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		default:
+		}
+		return nil
+	}, objstore.WithRecursiveIter)
+
+	partialBlocks = make(map[ulid.ULID]error)
+	for id, files := range filesPerBlock {
+		if checkErr := checkForIncompleteFiles(files); checkErr != nil {
+			partialBlocks[id] = checkErr
+		} else {
+			select {
+			case <-ctx.Done():
+				return nil, ctx.Err()
+			case ch <- id:
+			}
+		}
+	}
+	return partialBlocks, err
+}
+
+func checkForIncompleteFiles(files []string) error {
+	var (
+		numChunkFiles     int
+		highestChunkFile  int
+		hasIndex, hasMeta bool
+	)
+
+	for _, f := range files {
+		if f == "index" {
+			hasIndex = true
+		}
+		if f == "meta.json" {
+			hasMeta = true
+		}
+		dir, name := path.Split(f)
+		if dir == "chunks/" {
+			numChunkFiles++
+			idx, err := strconv.Atoi(name)
+			if err != nil {
+				return errors.Wrap(err, "unexpected chunk file name")
+			}
+			if idx > highestChunkFile {
+				highestChunkFile = idx
+			}
+		}
+	}
+
+	if !hasMeta {
+		return ErrorSyncMetaNotFound
+	}
+	if !hasIndex {
+		return errors.Wrap(ErrorSyncMetaIncomplete, "no index file in meta")
+	}
+	if numChunkFiles == 0 {
+		return errors.Wrap(ErrorSyncMetaIncomplete, "no chunk files in meta")
+	}
+	if numChunkFiles != highestChunkFile {
+		return errors.Wrap(ErrorSyncMetaIncomplete, "incomplete chunk files in meta")
+	}
+	return nil
+}
+
 // ConcurrentLister lists block IDs by doing a top level iteration of the bucket
 // followed by one Exists call for each discovered block to detect partial blocks.
 type ConcurrentLister struct {
@@ -232,10 +323,10 @@ func NewConcurrentLister(logger log.Logger, bkt objstore.InstrumentedBucketReade
 	}
 }

-func (f *ConcurrentLister) GetActiveAndPartialBlockIDs(ctx context.Context, ch chan<- ulid.ULID) (partialBlocks map[ulid.ULID]bool, err error) {
+func (f *ConcurrentLister) GetActiveAndPartialBlockIDs(ctx context.Context, ch chan<- ulid.ULID) (partialBlocks map[ulid.ULID]error, err error) {
 	const concurrency = 64

-	partialBlocks = make(map[ulid.ULID]bool)
+	partialBlocks = make(map[ulid.ULID]error)
 	var (
 		metaChan = make(chan ulid.ULID, concurrency)
 		eg, gCtx = errgroup.WithContext(ctx)
@@ -250,11 +341,11 @@ func (f *ConcurrentLister) GetActiveAndPartialBlockIDs(ctx context.Context, ch c
 			metaFile := path.Join(uid.String(), MetaFilename)
 			ok, err := f.bkt.Exists(gCtx, metaFile)
 			if err != nil {
-				return errors.Wrapf(err, "meta.json file exists: %v", uid)
+				return errors.Wrapf(err, "meta.json file exists call: %v", uid)
 			}
 			if !ok {
 				mu.Lock()
-				partialBlocks[uid] = true
+				partialBlocks[uid] = errors.Wrapf(ErrorSyncMetaNotFound, "block id: %s", uid)
 				mu.Unlock()
 				continue
 			}
@@ -383,8 +474,9 @@ func (f *BaseFetcher) NewMetaFetcherWithMetrics(fetcherMetrics *FetcherMetrics,
 }

 var (
-	ErrorSyncMetaNotFound  = errors.New("meta.json not found")
-	ErrorSyncMetaCorrupted = errors.New("meta.json corrupted")
+	ErrorSyncMetaNotFound   = errors.New("meta.json not found")
+	ErrorSyncMetaCorrupted  = errors.New("meta.json corrupted")
+	ErrorSyncMetaIncomplete = errors.New("meta.json incomplete")
 )

 // loadMeta returns metadata from object storage or error.
@@ -435,10 +527,6 @@ func (f *BaseFetcher) loadMeta(ctx context.Context, id ulid.ULID) (*metadata.Met
 		return nil, errors.Wrapf(ErrorSyncMetaCorrupted, "meta.json %v unmarshal: %v", metaFile, err)
 	}

-	if err := sanityCheckFilesForMeta(m.Thanos.Files); err != nil {
-		return nil, errors.Wrapf(ErrorSyncMetaCorrupted, "meta.json %v not sane: %v", metaFile, err)
-	}
-
 	if m.Version != metadata.TSDBVersion1 {
 		return nil, errors.Errorf("unexpected meta file: %s version: %d", metaFile, m.Version)
 	}
@@ -456,55 +544,15 @@ func (f *BaseFetcher) loadMeta(ctx context.Context, id ulid.ULID) (*metadata.Met
 	return m, nil
 }

-func sanityCheckFilesForMeta(files []metadata.File) error {
-	var (
-		numChunkFiles    int
-		highestChunkFile int
-		hasIndex         bool
-	)
-
-	// Old metas might not have the Thanos.Files field yet, we dont want to mess with them
-	if len(files) == 0 {
-		return nil
-	}
-
-	for _, f := range files {
-		if f.RelPath == "index" {
-			hasIndex = true
-		}
-		dir, name := path.Split(f.RelPath)
-		if dir == "chunks/" {
-			numChunkFiles++
-			idx, err := strconv.Atoi(name)
-			if err != nil {
-				return errors.Wrap(err, "unexpected chunk file name")
-			}
-			if idx > highestChunkFile {
-				highestChunkFile = idx
-			}
-		}
-	}
-
-	if !hasIndex {
-		return errors.New("no index file in meta")
-	}
-	if numChunkFiles == 0 {
-		return errors.New("no chunk files in meta")
-	}
-	if numChunkFiles != highestChunkFile {
-		return errors.New("incomplete chunk files in meta")
-	}
-	return nil
-}
-
 type response struct {
 	metas   map[ulid.ULID]*metadata.Meta
 	partial map[ulid.ULID]error
 	// If metaErr > 0 it means incomplete view, so some metas, failed to be loaded.
 	metaErrs errutil.MultiError

-	noMetas        float64
-	corruptedMetas float64
+	noMetas         float64
+	corruptedMetas  float64
+	incompleteMetas float64
 }

 func (f *BaseFetcher) fetchMetadata(ctx context.Context) (interface{}, error) {
@@ -555,7 +603,7 @@ func (f *BaseFetcher) fetchMetadata(ctx context.Context) (interface{}, error) {
 		})
 	}

-	var partialBlocks map[ulid.ULID]bool
+	var partialBlocks map[ulid.ULID]error
 	var err error
 	// Workers scheduled, distribute blocks.
 	eg.Go(func() error {
@@ -569,10 +617,15 @@ func (f *BaseFetcher) fetchMetadata(ctx context.Context) (interface{}, error) {
 	}

 	mtx.Lock()
-	for blockULID, isPartial := range partialBlocks {
-		if isPartial {
-			resp.partial[blockULID] = errors.Errorf("block %s has no meta file", blockULID)
-			resp.noMetas++
+	for blockULID, err := range partialBlocks {
+		if err != nil {
+			switch errors.Cause(err) {
+			case ErrorSyncMetaNotFound:
+				resp.noMetas++
+			case ErrorSyncMetaIncomplete:
+				resp.incompleteMetas++
+			}
+			resp.partial[blockULID] = err
 		}
 	}
 	mtx.Unlock()
@@ -654,6 +707,7 @@ func (f *BaseFetcher) fetch(ctx context.Context, metrics *FetcherMetrics, filter
 	metrics.Synced.WithLabelValues(FailedMeta).Set(float64(len(resp.metaErrs)))
 	metrics.Synced.WithLabelValues(NoMeta).Set(resp.noMetas)
 	metrics.Synced.WithLabelValues(CorruptedMeta).Set(resp.corruptedMetas)
+	metrics.Synced.WithLabelValues(MetaHasIncompleteFiles).Set(resp.incompleteMetas)

 	for _, filter := range filters {
 		// NOTE: filter can update synced metric accordingly to the reason of the exclude.
@@ -731,7 +785,7 @@ func (f *TimePartitionMetaFilter) Filter(_ context.Context, metas map[ulid.ULID]
 		if m.MaxTime >= f.minTime.PrometheusTimestamp() && m.MinTime <= f.maxTime.PrometheusTimestamp() {
 			continue
 		}
-		synced.WithLabelValues(timeExcludedMeta).Inc()
+		synced.WithLabelValues(TimeExcludedMeta).Inc()
 		delete(metas, id)
 	}
 	return nil
@@ -765,7 +819,7 @@ func (f *LabelShardedMetaFilter) Filter(_ context.Context, metas map[ulid.ULID]*
 	}

 	if processedLabels, _ := relabel.Process(b.Labels(), f.relabelConfig...); processedLabels.IsEmpty() {
-		synced.WithLabelValues(labelExcludedMeta).Inc()
+		synced.WithLabelValues(LabelExcludedMeta).Inc()
 		delete(metas, id)
 	}
 }
@@ -861,7 +915,7 @@ childLoop:
 			if metas[duplicate] != nil {
 				f.duplicateIDs = append(f.duplicateIDs, duplicate)
 			}
-			synced.WithLabelValues(duplicateMeta).Inc()
+			synced.WithLabelValues(DuplicateMeta).Inc()
 			delete(metas, duplicate)
 		}
 		f.mu.Unlock()
@@ -919,7 +973,7 @@ func (r *ReplicaLabelRemover) Filter(_ context.Context, metas map[ulid.ULID]*met
 			if _, exists := l[replicaLabel]; exists {
 				delete(l, replicaLabel)
 				countReplicaLabelRemoved[replicaLabel] += 1
-				modified.WithLabelValues(replicaRemovedMeta).Inc()
+				modified.WithLabelValues(ReplicaRemovedMeta).Inc()
 			}
 		}
 		if len(l) == 0 {
@@ -980,7 +1034,7 @@ func (f *ConsistencyDelayMetaFilter) Filter(_ context.Context, metas map[ulid.UL
 			meta.Thanos.Source != metadata.CompactorRepairSource {

 			level.Debug(f.logger).Log("msg", "block is too fresh for now", "block", id)
-			synced.WithLabelValues(tooFreshMeta).Inc()
+			synced.WithLabelValues(TooFreshMeta).Inc()
 			delete(metas, id)
 		}
 	}
diff --git a/pkg/block/fetcher_test.go b/pkg/block/fetcher_test.go
index 9d4af0fb84..7c74587826 100644
--- a/pkg/block/fetcher_test.go
+++ b/pkg/block/fetcher_test.go
@@ -64,6 +64,9 @@ func ULIDs(is ...int) []ulid.ULID {
 }

 func TestMetaFetcher_Fetch(t *testing.T) {
+	var (
+		emptyBuf bytes.Buffer
+	)
 	objtesting.ForeachStore(t, func(t *testing.T, bkt objstore.Bucket) {
 		ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
 		defer cancel()
@@ -74,8 +77,8 @@ func TestMetaFetcher_Fetch(t *testing.T) {
 		r := prometheus.NewRegistry()
 		noopLogger := log.NewNopLogger()
 		insBkt := objstore.WithNoopInstr(bkt)
-		baseBlockIDsFetcher := NewConcurrentLister(noopLogger, insBkt)
-		baseFetcher, err := NewBaseFetcher(noopLogger, 20, insBkt, baseBlockIDsFetcher, dir, r)
+		lister := NewRecursiveBlockValidatingLister(noopLogger, insBkt)
+		baseFetcher, err := NewBaseFetcher(noopLogger, 20, insBkt, lister, dir, r)
 		testutil.Ok(t, err)

 		fetcher := baseFetcher.NewMetaFetcher(r, []MetadataFilter{
@@ -83,14 +86,15 @@ func TestMetaFetcher_Fetch(t *testing.T) {
 		}, nil)

 		for i, tcase := range []struct {
-			name                  string
-			do                    func()
-			filterULID            ulid.ULID
-			expectedMetas         []ulid.ULID
-			expectedCorruptedMeta []ulid.ULID
-			expectedNoMeta        []ulid.ULID
-			expectedFiltered      int
-			expectedMetaErr       error
+			name                   string
+			do                     func()
+			filterULID             ulid.ULID
+			expectedMetas          []ulid.ULID
+			expectedCorruptedMeta  []ulid.ULID
+			expectedIncompleteMeta []ulid.ULID
+			expectedNoMeta         []ulid.ULID
+			expectedFiltered       int
+			expectedMetaErr        error
 		}{
 			{
 				name: "empty bucket",
@@ -111,14 +115,20 @@ func TestMetaFetcher_Fetch(t *testing.T) {
 					var buf bytes.Buffer
 					testutil.Ok(t, json.NewEncoder(&buf).Encode(&meta))
 					testutil.Ok(t, bkt.Upload(ctx, path.Join(meta.ULID.String(), metadata.MetaFilename), &buf))
+					testutil.Ok(t, bkt.Upload(ctx, path.Join(meta.ULID.String(), "index"), &emptyBuf))
+					testutil.Ok(t, bkt.Upload(ctx, path.Join(meta.ULID.String(), "chunks", "000001"), &emptyBuf))

 					meta.ULID = ULID(2)
 					testutil.Ok(t, json.NewEncoder(&buf).Encode(&meta))
 					testutil.Ok(t, bkt.Upload(ctx, path.Join(meta.ULID.String(), metadata.MetaFilename), &buf))
+					testutil.Ok(t, bkt.Upload(ctx, path.Join(meta.ULID.String(), "index"), &emptyBuf))
+					testutil.Ok(t, bkt.Upload(ctx, path.Join(meta.ULID.String(), "chunks", "000001"), &emptyBuf))

 					meta.ULID = ULID(3)
 					testutil.Ok(t, json.NewEncoder(&buf).Encode(&meta))
 					testutil.Ok(t, bkt.Upload(ctx, path.Join(meta.ULID.String(), metadata.MetaFilename), &buf))
+					testutil.Ok(t, bkt.Upload(ctx, path.Join(meta.ULID.String(), "index"), &emptyBuf))
+					testutil.Ok(t, bkt.Upload(ctx, path.Join(meta.ULID.String(), "chunks", "000001"), &emptyBuf))
 				},

 				expectedMetas: ULIDs(1, 2, 3),
@@ -175,7 +185,10 @@ func TestMetaFetcher_Fetch(t *testing.T) {
 			{
 				name: "corrupted meta.json",
 				do: func() {
-					testutil.Ok(t, bkt.Upload(ctx, path.Join(ULID(5).String(), MetaFilename), bytes.NewBuffer([]byte("{ not a json"))))
+					ulid := ULID(5)
+					testutil.Ok(t, bkt.Upload(ctx, path.Join(ulid.String(), MetaFilename), bytes.NewBuffer([]byte("{ not a json"))))
+					testutil.Ok(t, bkt.Upload(ctx, path.Join(ulid.String(), "index"), &emptyBuf))
+					testutil.Ok(t, bkt.Upload(ctx, path.Join(ulid.String(), "chunks", "000001"), &emptyBuf))
 				},

 				expectedMetas: ULIDs(1, 2, 3),
@@ -195,6 +208,8 @@ func TestMetaFetcher_Fetch(t *testing.T) {
 					var buf bytes.Buffer
 					testutil.Ok(t, json.NewEncoder(&buf).Encode(&meta))
 					testutil.Ok(t, bkt.Upload(ctx, path.Join(meta.ULID.String(), metadata.MetaFilename), &buf))
+					testutil.Ok(t, bkt.Upload(ctx, path.Join(meta.ULID.String(), "index"), &emptyBuf))
+					testutil.Ok(t, bkt.Upload(ctx, path.Join(meta.ULID.String(), "chunks", "000001"), &emptyBuf))
 				},

 				expectedMetas: ULIDs(1, 3, 6),
@@ -231,6 +246,8 @@ func TestMetaFetcher_Fetch(t *testing.T) {
 					var buf bytes.Buffer
 					testutil.Ok(t, json.NewEncoder(&buf).Encode(&meta))
 					testutil.Ok(t, bkt.Upload(ctx, path.Join(meta.ULID.String(), metadata.MetaFilename), &buf))
+					testutil.Ok(t, bkt.Upload(ctx, path.Join(meta.ULID.String(), "index"), &emptyBuf))
+					testutil.Ok(t, bkt.Upload(ctx, path.Join(meta.ULID.String(), "chunks", "000001"), &emptyBuf))
 				},

 				expectedMetas: ULIDs(1, 3, 6),
@@ -253,12 +270,16 @@ func TestMetaFetcher_Fetch(t *testing.T) {
 					var buf bytes.Buffer
 					testutil.Ok(t, json.NewEncoder(&buf).Encode(&meta))
 					testutil.Ok(t, bkt.Upload(ctx, path.Join(meta.ULID.String(), metadata.MetaFilename), &buf))
+					testutil.Ok(t, bkt.Upload(ctx, path.Join(meta.ULID.String(), "index"), &emptyBuf))
+					testutil.Ok(t, bkt.Upload(ctx, path.Join(meta.ULID.String(), "chunks", "000001"), &emptyBuf))
+					testutil.Ok(t, bkt.Upload(ctx, path.Join(meta.ULID.String(), "chunks", "000005"), &emptyBuf))
 				},

-				expectedMetas:         ULIDs(1, 3, 6),
-				expectedCorruptedMeta: ULIDs(5, 8),
-				expectedNoMeta:        ULIDs(4),
-				expectedMetaErr:       errors.New("incomplete view: unexpected meta file: 00000000070000000000000000/meta.json version: 20"),
+				expectedMetas:          ULIDs(1, 3, 6),
+				expectedCorruptedMeta:  ULIDs(5),
+				expectedNoMeta:         ULIDs(4),
+				expectedIncompleteMeta: ULIDs(8),
+				expectedMetaErr:        errors.New("incomplete view: unexpected meta file: 00000000070000000000000000/meta.json version: 20"),
 			},
 			{
 				name: "error: no index",
@@ -271,12 +292,14 @@ func TestMetaFetcher_Fetch(t *testing.T) {
 					var buf bytes.Buffer
 					testutil.Ok(t, json.NewEncoder(&buf).Encode(&meta))
 					testutil.Ok(t, bkt.Upload(ctx, path.Join(meta.ULID.String(), metadata.MetaFilename), &buf))
+					testutil.Ok(t, bkt.Upload(ctx, path.Join(meta.ULID.String(), "chunks", "000001"), &emptyBuf))
 				},

-				expectedMetas:         ULIDs(1, 3, 6),
-				expectedCorruptedMeta: ULIDs(5, 8, 9),
-				expectedNoMeta:        ULIDs(4),
-				expectedMetaErr:       errors.New("incomplete view: unexpected meta file: 00000000070000000000000000/meta.json version: 20"),
+				expectedMetas:          ULIDs(1, 3, 6),
+				expectedCorruptedMeta:  ULIDs(5),
+				expectedIncompleteMeta: ULIDs(8, 9),
+				expectedNoMeta:         ULIDs(4),
+				expectedMetaErr:        errors.New("incomplete view: unexpected meta file: 00000000070000000000000000/meta.json version: 20"),
 			},
 			{
 				name: "error: no chunks",
@@ -289,12 +312,14 @@ func TestMetaFetcher_Fetch(t *testing.T) {
 					var buf bytes.Buffer
 					testutil.Ok(t, json.NewEncoder(&buf).Encode(&meta))
 					testutil.Ok(t, bkt.Upload(ctx, path.Join(meta.ULID.String(), metadata.MetaFilename), &buf))
+					testutil.Ok(t, bkt.Upload(ctx, path.Join(meta.ULID.String(), "index"), &emptyBuf))
 				},

-				expectedMetas:         ULIDs(1, 3, 6),
-				expectedCorruptedMeta: ULIDs(5, 8, 9, 10),
-				expectedNoMeta:        ULIDs(4),
-				expectedMetaErr:       errors.New("incomplete view: unexpected meta file: 00000000070000000000000000/meta.json version: 20"),
+				expectedMetas:          ULIDs(1, 3, 6),
+				expectedCorruptedMeta:  ULIDs(5),
+				expectedNoMeta:         ULIDs(4),
+				expectedIncompleteMeta: ULIDs(8, 9, 10),
+				expectedMetaErr:        errors.New("incomplete view: unexpected meta file: 00000000070000000000000000/meta.json version: 20"),
 			},
 		} {
 			if ok := t.Run(tcase.name, func(t *testing.T) {
@@ -332,6 +357,7 @@ func TestMetaFetcher_Fetch(t *testing.T) {
 				})
 				expected := append([]ulid.ULID{}, tcase.expectedCorruptedMeta...)
 				expected = append(expected, tcase.expectedNoMeta...)
+				expected = append(expected, tcase.expectedIncompleteMeta...)
 				sort.Slice(expected, func(i, j int) bool {
 					return expected[i].Compare(expected[j]) >= 0
 				})
@@ -346,11 +372,12 @@ func TestMetaFetcher_Fetch(t *testing.T) {
 				testutil.Equals(t, float64(i+1), promtest.ToFloat64(fetcher.metrics.Syncs))
 				testutil.Equals(t, float64(len(tcase.expectedMetas)), promtest.ToFloat64(fetcher.metrics.Synced.WithLabelValues(LoadedMeta)))
 				testutil.Equals(t, float64(len(tcase.expectedNoMeta)), promtest.ToFloat64(fetcher.metrics.Synced.WithLabelValues(NoMeta)))
+				testutil.Equals(t, float64(len(tcase.expectedIncompleteMeta)), promtest.ToFloat64(fetcher.metrics.Synced.WithLabelValues(MetaHasIncompleteFiles)))
 				testutil.Equals(t, float64(tcase.expectedFiltered), promtest.ToFloat64(fetcher.metrics.Synced.WithLabelValues("filtered")))
-				testutil.Equals(t, 0.0, promtest.ToFloat64(fetcher.metrics.Synced.WithLabelValues(labelExcludedMeta)))
-				testutil.Equals(t, 0.0, promtest.ToFloat64(fetcher.metrics.Synced.WithLabelValues(timeExcludedMeta)))
+				testutil.Equals(t, 0.0, promtest.ToFloat64(fetcher.metrics.Synced.WithLabelValues(LabelExcludedMeta)))
+				testutil.Equals(t, 0.0, promtest.ToFloat64(fetcher.metrics.Synced.WithLabelValues(TimeExcludedMeta)))
 				testutil.Equals(t, float64(expectedFailures), promtest.ToFloat64(fetcher.metrics.Synced.WithLabelValues(FailedMeta)))
-				testutil.Equals(t, 0.0, promtest.ToFloat64(fetcher.metrics.Synced.WithLabelValues(tooFreshMeta)))
+				testutil.Equals(t, 0.0, promtest.ToFloat64(fetcher.metrics.Synced.WithLabelValues(TooFreshMeta)))
 			}); !ok {
 				return
 			}
@@ -418,7 +445,7 @@ func TestLabelShardedMetaFilter_Filter_Basic(t *testing.T) {
 	m := newTestFetcherMetrics()
 	testutil.Ok(t, f.Filter(ctx, input, m.Synced, nil))

-	testutil.Equals(t, 3.0, promtest.ToFloat64(m.Synced.WithLabelValues(labelExcludedMeta)))
+	testutil.Equals(t, 3.0, promtest.ToFloat64(m.Synced.WithLabelValues(LabelExcludedMeta)))
 	testutil.Equals(t, expected, input)
 }

@@ -517,7 +544,7 @@ func TestLabelShardedMetaFilter_Filter_Hashmod(t *testing.T) {
 			testutil.Ok(t, f.Filter(ctx, input, m.Synced, nil))
 			testutil.Equals(t, expected, input)
-			testutil.Equals(t, float64(deleted), promtest.ToFloat64(m.Synced.WithLabelValues(labelExcludedMeta)))
+			testutil.Equals(t, float64(deleted), promtest.ToFloat64(m.Synced.WithLabelValues(LabelExcludedMeta)))
 		})

@@ -580,7 +607,7 @@ func TestTimePartitionMetaFilter_Filter(t *testing.T) {
 	m := newTestFetcherMetrics()
 	testutil.Ok(t, f.Filter(ctx, input, m.Synced, nil))

-	testutil.Equals(t, 2.0, promtest.ToFloat64(m.Synced.WithLabelValues(timeExcludedMeta)))
+	testutil.Equals(t, 2.0, promtest.ToFloat64(m.Synced.WithLabelValues(TimeExcludedMeta)))
 	testutil.Equals(t, expected, input)
 }

@@ -931,7 +958,7 @@ func TestDeduplicateFilter_Filter(t *testing.T) {
 			}
 			testutil.Ok(t, f.Filter(ctx, metas, m.Synced, nil))
 			compareSliceWithMapKeys(t, metas, tcase.expected)
-			testutil.Equals(t, float64(inputLen-len(tcase.expected)), promtest.ToFloat64(m.Synced.WithLabelValues(duplicateMeta)))
+			testutil.Equals(t, float64(inputLen-len(tcase.expected)), promtest.ToFloat64(m.Synced.WithLabelValues(DuplicateMeta)))
 		}); !ok {
 			return
 		}
@@ -1000,7 +1027,7 @@ func TestReplicaLabelRemover_Modify(t *testing.T) {
 		m := newTestFetcherMetrics()
 		testutil.Ok(t, tcase.replicaLabelRemover.Filter(ctx, tcase.input, nil, m.Modified))

-		testutil.Equals(t, tcase.modified, promtest.ToFloat64(m.Modified.WithLabelValues(replicaRemovedMeta)))
+		testutil.Equals(t, tcase.modified, promtest.ToFloat64(m.Modified.WithLabelValues(ReplicaRemovedMeta)))
 		testutil.Equals(t, tcase.expected, tcase.input)
 	}
 }
@@ -1105,7 +1132,7 @@ func TestConsistencyDelayMetaFilter_Filter_0(t *testing.T) {
 		testutil.Equals(t, map[string]float64{"consistency_delay_seconds{}": 0.0}, extprom.CurrentGaugeValuesFor(t, reg, "consistency_delay_seconds"))

 		testutil.Ok(t, f.Filter(ctx, input, m.Synced, nil))
-		testutil.Equals(t, 0.0, promtest.ToFloat64(m.Synced.WithLabelValues(tooFreshMeta)))
+		testutil.Equals(t, 0.0, promtest.ToFloat64(m.Synced.WithLabelValues(TooFreshMeta)))
 		testutil.Equals(t, expected, input)
 	})

@@ -1130,7 +1157,7 @@ func TestConsistencyDelayMetaFilter_Filter_0(t *testing.T) {
 		testutil.Equals(t, map[string]float64{"consistency_delay_seconds{}": (30 * time.Minute).Seconds()}, extprom.CurrentGaugeValuesFor(t, reg, "consistency_delay_seconds"))

 		testutil.Ok(t, f.Filter(ctx, input, m.Synced, nil))
-		testutil.Equals(t, float64(len(u.created)-len(expected)), promtest.ToFloat64(m.Synced.WithLabelValues(tooFreshMeta)))
+		testutil.Equals(t, float64(len(u.created)-len(expected)), promtest.ToFloat64(m.Synced.WithLabelValues(TooFreshMeta)))
 		testutil.Equals(t, expected, input)
 	})
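
---

For reviewers, a minimal sketch (not part of the patch) of how the new pieces compose: it wires `RecursiveBlockValidatingLister` into a meta fetcher the same way the updated test does. The helper names are hypothetical, and passing `""` as the cache directory is assumed to disable local meta caching, as in the tests.

```go
package main

import (
	"context"
	"fmt"

	"github.com/go-kit/log"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/thanos-io/objstore"
	"github.com/thanos-io/thanos/pkg/block"
)

// buildFetcher wires the validating lister into a meta fetcher, mirroring the
// test setup above. With this lister, a block whose meta.json exists but whose
// index or chunk files are missing surfaces as partial (counted under the
// "meta-has-incomplete-files" synced label) instead of as corrupted.
func buildFetcher(bkt objstore.Bucket) (*block.MetaFetcher, error) {
	logger := log.NewNopLogger()
	reg := prometheus.NewRegistry()
	insBkt := objstore.WithNoopInstr(bkt)

	lister := block.NewRecursiveBlockValidatingLister(logger, insBkt)
	// "" as cache dir: assumed to skip on-disk meta caching, as in the tests.
	baseFetcher, err := block.NewBaseFetcher(logger, 20, insBkt, lister, "", reg)
	if err != nil {
		return nil, err
	}
	return baseFetcher.NewMetaFetcher(reg, nil, nil), nil
}

func demo(ctx context.Context, bkt objstore.Bucket) error {
	fetcher, err := buildFetcher(bkt)
	if err != nil {
		return err
	}
	metas, partial, err := fetcher.Fetch(ctx)
	if err != nil {
		return err
	}
	// partial now carries a reason per block: ErrorSyncMetaNotFound for blocks
	// without meta.json, ErrorSyncMetaIncomplete for blocks missing files.
	fmt.Printf("loaded %d blocks, %d partial\n", len(metas), len(partial))
	return nil
}
```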