From e730d58eddcd639851e9f12a1abb96a4cbeef614 Mon Sep 17 00:00:00 2001 From: Michael Hoffmann Date: Tue, 16 Apr 2024 08:45:51 +0200 Subject: [PATCH] block: sanity check metas for common corruptions Santiy check metas that are returned from fetcher for common issues. We currently return metas that will lead to crashes in compactor and store, this PR marks them as corrupted so that they will get cleaned up and are visible in metrics. Signed-off-by: Michael Hoffmann --- CHANGELOG.md | 1 + pkg/block/fetcher.go | 48 +++++++++++++++++++++- pkg/block/fetcher_test.go | 65 +++++++++++++++++++++++++++++- pkg/shipper/shipper.go | 1 - pkg/testutil/e2eutil/prometheus.go | 30 +++++++------- 5 files changed, 127 insertions(+), 18 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c9fac6460aa..5d9b423ccdc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -39,6 +39,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re - [#7123](https://github.com/thanos-io/thanos/pull/7123) Rule: Change default Alertmanager API version to v2. - [#7223](https://github.com/thanos-io/thanos/pull/7223) Automatic detection of memory limits and configure GOMEMLIMIT to match. +- [#728s](https://github.com/thanos-io/thanos/pull/7282) Fetcher: mark metas with incomplete files as corrupted. ### Removed diff --git a/pkg/block/fetcher.go b/pkg/block/fetcher.go index 4911b4748b7..2265df56fa5 100644 --- a/pkg/block/fetcher.go +++ b/pkg/block/fetcher.go @@ -11,6 +11,7 @@ import ( "path" "path/filepath" "sort" + "strconv" "strings" "sync" "time" @@ -24,10 +25,10 @@ import ( "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/relabel" - "github.com/thanos-io/objstore" "golang.org/x/sync/errgroup" "gopkg.in/yaml.v2" + "github.com/thanos-io/objstore" "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/errutil" "github.com/thanos-io/thanos/pkg/extprom" @@ -434,6 +435,10 @@ func (f *BaseFetcher) loadMeta(ctx context.Context, id ulid.ULID) (*metadata.Met return nil, errors.Wrapf(ErrorSyncMetaCorrupted, "meta.json %v unmarshal: %v", metaFile, err) } + if err := sanityCheckFilesForMeta(m.Thanos.Files); err != nil { + return nil, errors.Wrapf(ErrorSyncMetaCorrupted, "meta.json %v not sane: %v", metaFile, err) + } + if m.Version != metadata.TSDBVersion1 { return nil, errors.Errorf("unexpected meta file: %s version: %d", metaFile, m.Version) } @@ -451,6 +456,47 @@ func (f *BaseFetcher) loadMeta(ctx context.Context, id ulid.ULID) (*metadata.Met return m, nil } +func sanityCheckFilesForMeta(files []metadata.File) error { + var ( + numChunkFiles int + highestChunkFile int + hasIndex bool + ) + + // Old metas might not have the Thanos.Files field yet, we dont want to mess with them + if len(files) == 0 { + return nil + } + + for _, f := range files { + if f.RelPath == "index" { + hasIndex = true + } + dir, name := path.Split(f.RelPath) + if dir == "chunks/" { + numChunkFiles++ + idx, err := strconv.Atoi(name) + if err != nil { + return errors.Wrap(err, "unexpected chunk file name") + } + if idx > highestChunkFile { + highestChunkFile = idx + } + } + } + + if !hasIndex { + return errors.New("no index file in meta") + } + if numChunkFiles == 0 { + return errors.New("no chunk files in meta") + } + if numChunkFiles != highestChunkFile { + return errors.New("incomplete chunk files in meta") + } + return nil +} + type response struct { metas map[ulid.ULID]*metadata.Meta partial map[ulid.ULID]error diff --git a/pkg/block/fetcher_test.go b/pkg/block/fetcher_test.go index 5e24e265382..9d4af0fb840 100644 --- a/pkg/block/fetcher_test.go +++ b/pkg/block/fetcher_test.go @@ -23,11 +23,11 @@ import ( "github.com/prometheus/client_golang/prometheus" promtest "github.com/prometheus/client_golang/prometheus/testutil" "github.com/prometheus/prometheus/tsdb" - "github.com/thanos-io/objstore" - "github.com/thanos-io/objstore/objtesting" "github.com/efficientgo/core/testutil" + "github.com/thanos-io/objstore" + "github.com/thanos-io/objstore/objtesting" "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/extprom" "github.com/thanos-io/thanos/pkg/model" @@ -106,6 +106,7 @@ func TestMetaFetcher_Fetch(t *testing.T) { var meta metadata.Meta meta.Version = 1 meta.ULID = ULID(1) + meta.Thanos.Files = append(meta.Thanos.Files, metadata.File{RelPath: "index"}, metadata.File{RelPath: "chunks/000001"}) var buf bytes.Buffer testutil.Ok(t, json.NewEncoder(&buf).Encode(&meta)) @@ -189,6 +190,7 @@ func TestMetaFetcher_Fetch(t *testing.T) { var meta metadata.Meta meta.Version = 1 meta.ULID = ULID(6) + meta.Thanos.Files = append(meta.Thanos.Files, metadata.File{RelPath: "index"}, metadata.File{RelPath: "chunks/000001"}) var buf bytes.Buffer testutil.Ok(t, json.NewEncoder(&buf).Encode(&meta)) @@ -224,6 +226,7 @@ func TestMetaFetcher_Fetch(t *testing.T) { var meta metadata.Meta meta.Version = 20 meta.ULID = ULID(7) + meta.Thanos.Files = append(meta.Thanos.Files, metadata.File{RelPath: "index"}, metadata.File{RelPath: "chunks/000001"}) var buf bytes.Buffer testutil.Ok(t, json.NewEncoder(&buf).Encode(&meta)) @@ -235,6 +238,64 @@ func TestMetaFetcher_Fetch(t *testing.T) { expectedNoMeta: ULIDs(4), expectedMetaErr: errors.New("incomplete view: unexpected meta file: 00000000070000000000000000/meta.json version: 20"), }, + { + name: "error: incomplete chunks", + do: func() { + var meta metadata.Meta + meta.Version = 1 + meta.ULID = ULID(8) + meta.Thanos.Files = append(meta.Thanos.Files, + metadata.File{RelPath: "index"}, + metadata.File{RelPath: "chunks/000001"}, + metadata.File{RelPath: "chunks/000005"}, + ) + + var buf bytes.Buffer + testutil.Ok(t, json.NewEncoder(&buf).Encode(&meta)) + testutil.Ok(t, bkt.Upload(ctx, path.Join(meta.ULID.String(), metadata.MetaFilename), &buf)) + }, + + expectedMetas: ULIDs(1, 3, 6), + expectedCorruptedMeta: ULIDs(5, 8), + expectedNoMeta: ULIDs(4), + expectedMetaErr: errors.New("incomplete view: unexpected meta file: 00000000070000000000000000/meta.json version: 20"), + }, + { + name: "error: no index", + do: func() { + var meta metadata.Meta + meta.Version = 1 + meta.ULID = ULID(9) + meta.Thanos.Files = append(meta.Thanos.Files, metadata.File{RelPath: "chunks/000001"}) + + var buf bytes.Buffer + testutil.Ok(t, json.NewEncoder(&buf).Encode(&meta)) + testutil.Ok(t, bkt.Upload(ctx, path.Join(meta.ULID.String(), metadata.MetaFilename), &buf)) + }, + + expectedMetas: ULIDs(1, 3, 6), + expectedCorruptedMeta: ULIDs(5, 8, 9), + expectedNoMeta: ULIDs(4), + expectedMetaErr: errors.New("incomplete view: unexpected meta file: 00000000070000000000000000/meta.json version: 20"), + }, + { + name: "error: no chunks", + do: func() { + var meta metadata.Meta + meta.Version = 1 + meta.ULID = ULID(10) + meta.Thanos.Files = append(meta.Thanos.Files, metadata.File{RelPath: "index"}) + + var buf bytes.Buffer + testutil.Ok(t, json.NewEncoder(&buf).Encode(&meta)) + testutil.Ok(t, bkt.Upload(ctx, path.Join(meta.ULID.String(), metadata.MetaFilename), &buf)) + }, + + expectedMetas: ULIDs(1, 3, 6), + expectedCorruptedMeta: ULIDs(5, 8, 9, 10), + expectedNoMeta: ULIDs(4), + expectedMetaErr: errors.New("incomplete view: unexpected meta file: 00000000070000000000000000/meta.json version: 20"), + }, } { if ok := t.Run(tcase.name, func(t *testing.T) { tcase.do() diff --git a/pkg/shipper/shipper.go b/pkg/shipper/shipper.go index 6b7be6cd12d..2d13ca7d811 100644 --- a/pkg/shipper/shipper.go +++ b/pkg/shipper/shipper.go @@ -26,7 +26,6 @@ import ( "github.com/prometheus/prometheus/tsdb/fileutil" "github.com/thanos-io/objstore" - "github.com/thanos-io/thanos/pkg/block" "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/runutil" diff --git a/pkg/testutil/e2eutil/prometheus.go b/pkg/testutil/e2eutil/prometheus.go index 4faba4fafda..32934abff95 100644 --- a/pkg/testutil/e2eutil/prometheus.go +++ b/pkg/testutil/e2eutil/prometheus.go @@ -516,6 +516,7 @@ func createBlockWithDelay(ctx context.Context, dir string, series []labels.Label logger := log.NewNopLogger() m.ULID = id m.Compaction.Sources = []ulid.ULID{id} + if err := m.WriteToDir(logger, path.Join(dir, blockID.String())); err != nil { return ulid.ULID{}, errors.Wrap(err, "write meta.json file") } @@ -622,28 +623,29 @@ func createBlock( } files := []metadata.File{} - if hashFunc != metadata.NoneFunc { - paths := []string{} - if err := filepath.Walk(blockDir, func(path string, info os.FileInfo, err error) error { - if info.IsDir() { - return nil - } - paths = append(paths, path) + paths := []string{} + if err := filepath.Walk(blockDir, func(path string, info os.FileInfo, err error) error { + if info.IsDir() { return nil - }); err != nil { - return id, errors.Wrapf(err, "walking %s", dir) } + paths = append(paths, path) + return nil + }); err != nil { + return id, errors.Wrapf(err, "walking %s", dir) + } - for _, p := range paths { + for _, p := range paths { + f := metadata.File{ + RelPath: strings.TrimPrefix(p, blockDir+"/"), + } + if hashFunc != metadata.NoneFunc { pHash, err := metadata.CalculateHash(p, metadata.SHA256Func, log.NewNopLogger()) if err != nil { return id, errors.Wrapf(err, "calculating hash of %s", blockDir+p) } - files = append(files, metadata.File{ - RelPath: strings.TrimPrefix(p, blockDir+"/"), - Hash: &pHash, - }) + f.Hash = &pHash } + files = append(files, f) } if _, err = metadata.InjectThanos(logger, blockDir, metadata.Thanos{