From 358e5263bcb956eea6aaf72c11927fcb09ee39f7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20=C5=A0tibran=C3=BD?= Date: Fri, 2 Oct 2020 10:23:41 +0200 Subject: [PATCH] Segment files (#3261) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Added SegmentFiles field to Thanos metadata. Filled by components that write meta.json (shipper, compactor, repair, ...) Signed-off-by: Peter Štibraný * Use segment files if present in meta.json file. Don't panic if segment index is incorrect. Added comment to Iter about sorted results. Signed-off-by: Peter Štibraný * Add comment about not using empty segment files field. Signed-off-by: Peter Štibraný * CHANGELOG.md entry Signed-off-by: Peter Štibraný * Removed injection of SegmentFiles in e2eutil/prometheus because it calling block.GetSegmentFiles creates an import cycle. Signed-off-by: Peter Štibraný * Fix compilation error. Signed-off-by: Peter Štibraný * Add SegmentFiles to shipped meta. Signed-off-by: Peter Štibraný * Add SegmentFiles to shipped meta. Signed-off-by: Peter Štibraný * Review feedback. Signed-off-by: Peter Štibraný --- CHANGELOG.md | 2 + pkg/block/block.go | 18 +++++++ pkg/block/index.go | 1 + pkg/block/metadata/meta.go | 3 ++ pkg/compact/compact.go | 7 +-- pkg/compact/compact_e2e_test.go | 2 + .../downsample/streamed_block_writer.go | 1 + pkg/objstore/objstore.go | 1 + pkg/shipper/shipper.go | 1 + pkg/shipper/shipper_e2e_test.go | 2 + pkg/shipper/shipper_test.go | 49 +++++++++++++++++++ pkg/store/bucket.go | 17 ++++++- pkg/testutil/e2eutil/prometheus.go | 3 +- 13 files changed, 102 insertions(+), 5 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 94abc0e0e6..b249dd62ab 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,8 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re ## Unreleased +- [#3261](https://github.com/thanos-io/thanos/pull/3261) Thanos Store: Use segment files specified in meta.json file, if present. If not present, Store does the LIST operation as before. + ## [v0.16.0](https://github.com/thanos-io/thanos/releases) - Release in progress ### Fixed diff --git a/pkg/block/block.go b/pkg/block/block.go index b2492fbf39..550ebc351f 100644 --- a/pkg/block/block.go +++ b/pkg/block/block.go @@ -222,3 +222,21 @@ func IsBlockDir(path string) (id ulid.ULID, ok bool) { id, err := ulid.Parse(filepath.Base(path)) return id, err == nil } + +// GetSegmentFiles returns list of segment files for given block. Paths are relative to the chunks directory. +// In case of errors, nil is returned. +func GetSegmentFiles(blockDir string) []string { + chunksDir := filepath.Join(blockDir, ChunksDirname) + + files, err := ioutil.ReadDir(chunksDir) + if err != nil { + return nil + } + + // ReadDir returns files in sorted order already. + var result []string + for _, f := range files { + result = append(result, f.Name()) + } + return result +} diff --git a/pkg/block/index.go b/pkg/block/index.go index 2551aacd9c..41ac09dad7 100644 --- a/pkg/block/index.go +++ b/pkg/block/index.go @@ -302,6 +302,7 @@ func Repair(logger log.Logger, dir string, id ulid.ULID, source metadata.SourceT if err := rewrite(logger, indexr, chunkr, indexw, chunkw, &resmeta, ignoreChkFns); err != nil { return resid, errors.Wrap(err, "rewrite block") } + resmeta.Thanos.SegmentFiles = GetSegmentFiles(resdir) if err := metadata.Write(logger, resdir, &resmeta); err != nil { return resid, err } diff --git a/pkg/block/metadata/meta.go b/pkg/block/metadata/meta.go index b57b57722f..c361b703e7 100644 --- a/pkg/block/metadata/meta.go +++ b/pkg/block/metadata/meta.go @@ -59,6 +59,9 @@ type Thanos struct { // Source is a real upload source of the block. Source SourceType `json:"source"` + + // List of segment files (in chunks directory), in sorted order. Optional. + SegmentFiles []string `json:"segment_files,omitempty"` } type ThanosDownsample struct { diff --git a/pkg/compact/compact.go b/pkg/compact/compact.go index 562a4ca336..74e971d39a 100644 --- a/pkg/compact/compact.go +++ b/pkg/compact/compact.go @@ -782,9 +782,10 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) ( index := filepath.Join(bdir, block.IndexFilename) newMeta, err := metadata.InjectThanos(cg.logger, bdir, metadata.Thanos{ - Labels: cg.labels.Map(), - Downsample: metadata.ThanosDownsample{Resolution: cg.resolution}, - Source: metadata.CompactorSource, + Labels: cg.labels.Map(), + Downsample: metadata.ThanosDownsample{Resolution: cg.resolution}, + Source: metadata.CompactorSource, + SegmentFiles: block.GetSegmentFiles(bdir), }, nil) if err != nil { return false, ulid.ULID{}, errors.Wrapf(err, "failed to finalize the block %s", bdir) diff --git a/pkg/compact/compact_e2e_test.go b/pkg/compact/compact_e2e_test.go index 8effd4f7bb..478d0364ff 100644 --- a/pkg/compact/compact_e2e_test.go +++ b/pkg/compact/compact_e2e_test.go @@ -368,6 +368,7 @@ func TestGroup_Compact_e2e(t *testing.T) { // Check thanos meta. testutil.Assert(t, labels.Equal(extLabels, labels.FromMap(meta.Thanos.Labels)), "ext labels does not match") testutil.Equals(t, int64(124), meta.Thanos.Downsample.Resolution) + testutil.Assert(t, len(meta.Thanos.SegmentFiles) > 0, "compacted blocks have segment files set") } { meta, ok := others[defaultGroupKey(124, extLabels2)] @@ -383,6 +384,7 @@ func TestGroup_Compact_e2e(t *testing.T) { // Check thanos meta. testutil.Assert(t, labels.Equal(extLabels2, labels.FromMap(meta.Thanos.Labels)), "ext labels does not match") testutil.Equals(t, int64(124), meta.Thanos.Downsample.Resolution) + testutil.Assert(t, len(meta.Thanos.SegmentFiles) > 0, "compacted blocks have segment files set") } }) } diff --git a/pkg/compact/downsample/streamed_block_writer.go b/pkg/compact/downsample/streamed_block_writer.go index 8ea34794bb..8a7a3f4f48 100644 --- a/pkg/compact/downsample/streamed_block_writer.go +++ b/pkg/compact/downsample/streamed_block_writer.go @@ -203,6 +203,7 @@ func (w *streamedBlockWriter) syncDir() (err error) { func (w *streamedBlockWriter) writeMetaFile() error { w.meta.Version = metadata.MetaVersion1 w.meta.Thanos.Source = metadata.CompactorSource + w.meta.Thanos.SegmentFiles = block.GetSegmentFiles(w.blockDir) w.meta.Stats.NumChunks = w.totalChunks w.meta.Stats.NumSamples = w.totalSamples w.meta.Stats.NumSeries = w.seriesRefs diff --git a/pkg/objstore/objstore.go b/pkg/objstore/objstore.go index 61db70b2f7..cdab0925a6 100644 --- a/pkg/objstore/objstore.go +++ b/pkg/objstore/objstore.go @@ -67,6 +67,7 @@ type InstrumentedBucket interface { type BucketReader interface { // Iter calls f for each entry in the given directory (not recursive.). The argument to f is the full // object name including the prefix of the inspected directory. + // Entries are passed to function in sorted order. Iter(ctx context.Context, dir string, f func(string) error) error // Get returns a reader for the given object name. diff --git a/pkg/shipper/shipper.go b/pkg/shipper/shipper.go index 30496e43c0..6804f0f2e4 100644 --- a/pkg/shipper/shipper.go +++ b/pkg/shipper/shipper.go @@ -358,6 +358,7 @@ func (s *Shipper) upload(ctx context.Context, meta *metadata.Meta) error { meta.Thanos.Labels = lset.Map() } meta.Thanos.Source = s.source + meta.Thanos.SegmentFiles = block.GetSegmentFiles(updir) if err := metadata.Write(s.logger, updir, meta); err != nil { return errors.Wrap(err, "write meta file") } diff --git a/pkg/shipper/shipper_e2e_test.go b/pkg/shipper/shipper_e2e_test.go index e0383e4432..6ce7620cc8 100644 --- a/pkg/shipper/shipper_e2e_test.go +++ b/pkg/shipper/shipper_e2e_test.go @@ -141,6 +141,7 @@ func TestShipper_SyncBlocks_e2e(t *testing.T) { // The external labels must be attached to the meta file on upload. meta.Thanos.Labels = extLset.Map() + meta.Thanos.SegmentFiles = []string{"0001", "0002"} var buf bytes.Buffer enc := json.NewEncoder(&buf) @@ -293,6 +294,7 @@ func TestShipper_SyncBlocksWithMigrating_e2e(t *testing.T) { // The external labels must be attached to the meta file on upload. meta.Thanos.Labels = extLset.Map() + meta.Thanos.SegmentFiles = []string{"0001", "0002"} var buf bytes.Buffer enc := json.NewEncoder(&buf) diff --git a/pkg/shipper/shipper_test.go b/pkg/shipper/shipper_test.go index 59c564df3c..b262dd0626 100644 --- a/pkg/shipper/shipper_test.go +++ b/pkg/shipper/shipper_test.go @@ -4,18 +4,24 @@ package shipper import ( + "context" "io/ioutil" "math" "math/rand" "os" "path" + "path/filepath" "sort" "testing" "github.com/go-kit/kit/log" "github.com/oklog/ulid" + "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/tsdb" + + "github.com/thanos-io/thanos/pkg/block" "github.com/thanos-io/thanos/pkg/block/metadata" + "github.com/thanos-io/thanos/pkg/objstore" "github.com/thanos-io/thanos/pkg/testutil" ) @@ -167,3 +173,46 @@ func BenchmarkIterBlockMetas(b *testing.B) { _, err = shipper.blockMetasFromOldest() testutil.Ok(b, err) } + +func TestShipperAddsSegmentFiles(t *testing.T) { + dir, err := ioutil.TempDir("", "shipper-test") + testutil.Ok(t, err) + defer func() { + testutil.Ok(t, os.RemoveAll(dir)) + }() + + inmemory := objstore.NewInMemBucket() + + lbls := []labels.Label{{Name: "test", Value: "test"}} + s := New(nil, nil, dir, inmemory, func() labels.Labels { return lbls }, metadata.TestSource, false, false) + + id := ulid.MustNew(1, nil) + blockDir := path.Join(dir, id.String()) + chunksDir := path.Join(blockDir, block.ChunksDirname) + testutil.Ok(t, os.MkdirAll(chunksDir, os.ModePerm)) + + // Prepare minimal "block" for shipper (meta.json, index, one segment file). + testutil.Ok(t, metadata.Write(log.NewNopLogger(), path.Join(dir, id.String()), &metadata.Meta{ + BlockMeta: tsdb.BlockMeta{ + ULID: id, + MaxTime: 2000, + MinTime: 1000, + Version: 1, + Stats: tsdb.BlockStats{ + NumSamples: 1000, // Not really, but shipper needs nonzero value. + }, + }, + })) + testutil.Ok(t, ioutil.WriteFile(filepath.Join(blockDir, "index"), []byte("index file"), 0666)) + segmentFile := "00001" + testutil.Ok(t, ioutil.WriteFile(filepath.Join(chunksDir, segmentFile), []byte("hello world"), 0666)) + + uploaded, err := s.Sync(context.Background()) + testutil.Ok(t, err) + testutil.Equals(t, 1, uploaded) + + meta, err := block.DownloadMeta(context.Background(), log.NewNopLogger(), inmemory, id) + testutil.Ok(t, err) + + testutil.Equals(t, []string{segmentFile}, meta.Thanos.SegmentFiles) +} diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index 61ca8fb1d9..188cbbe140 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -1330,7 +1330,17 @@ func newBucketBlock( }) sort.Sort(b.relabelLabels) - // Get object handles for all chunk files. + // Get object handles for all chunk files (segment files) from meta.json, if available. + if len(meta.Thanos.SegmentFiles) > 0 { + b.chunkObjs = make([]string, 0, len(meta.Thanos.SegmentFiles)) + + for _, sf := range meta.Thanos.SegmentFiles { + b.chunkObjs = append(b.chunkObjs, path.Join(meta.ULID.String(), block.ChunksDirname, sf)) + } + return b, nil + } + + // Get object handles for all chunk files from storage. if err = bkt.Iter(ctx, path.Join(meta.ULID.String(), block.ChunksDirname), func(n string) error { b.chunkObjs = append(b.chunkObjs, n) return nil @@ -1367,6 +1377,11 @@ func (b *bucketBlock) readChunkRange(ctx context.Context, seq int, off, length i if err != nil { return nil, errors.Wrap(err, "allocate chunk bytes") } + + if seq < 0 || seq >= len(b.chunkObjs) { + return nil, errors.Errorf("unknown segment file for index %d", seq) + } + buf := bytes.NewBuffer(*c) r, err := b.bkt.GetRange(ctx, b.chunkObjs[seq], off, length) diff --git a/pkg/testutil/e2eutil/prometheus.go b/pkg/testutil/e2eutil/prometheus.go index 447d65425f..046a55e656 100644 --- a/pkg/testutil/e2eutil/prometheus.go +++ b/pkg/testutil/e2eutil/prometheus.go @@ -473,7 +473,8 @@ func createBlock( return id, errors.Errorf("nothing to write, asked for %d samples", numSamples) } - if _, err = metadata.InjectThanos(log.NewNopLogger(), filepath.Join(dir, id.String()), metadata.Thanos{ + blockDir := filepath.Join(dir, id.String()) + if _, err = metadata.InjectThanos(log.NewNopLogger(), blockDir, metadata.Thanos{ Labels: extLset.Map(), Downsample: metadata.ThanosDownsample{Resolution: resolution}, Source: metadata.TestSource,