Skip to content

Commit

Permalink
Segment files (#3261)
Browse files Browse the repository at this point in the history
* Added SegmentFiles field to Thanos metadata.

Filled by components that write meta.json (shipper, compactor,
repair, ...)

Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>

* Use segment files if present in meta.json file.
Don't panic if segment index is incorrect.
Added comment to Iter about sorted results.

Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>

* Add comment about not using empty segment files field.

Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>

* CHANGELOG.md entry

Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>

* Removed injection of SegmentFiles in e2eutil/prometheus because calling block.GetSegmentFiles from there creates an import cycle.

Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>

* Fix compilation error.

Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>

* Add SegmentFiles to shipped meta.

Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>

* Add SegmentFiles to shipped meta.

Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>

* Review feedback.

Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>
  • Loading branch information
pstibrany committed Oct 2, 2020
1 parent e4941a5 commit 358e526
Show file tree
Hide file tree
Showing 13 changed files with 102 additions and 5 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Expand Up @@ -11,6 +11,8 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re

## Unreleased

- [#3261](https://github.com/thanos-io/thanos/pull/3261) Thanos Store: Use segment files specified in meta.json file, if present. If not present, Store does the LIST operation as before.

## [v0.16.0](https://github.com/thanos-io/thanos/releases) - Release in progress

### Fixed
Expand Down
18 changes: 18 additions & 0 deletions pkg/block/block.go
Expand Up @@ -222,3 +222,21 @@ func IsBlockDir(path string) (id ulid.ULID, ok bool) {
id, err := ulid.Parse(filepath.Base(path))
return id, err == nil
}

// GetSegmentFiles returns the list of segment files for the given block, in sorted order.
// Paths are relative to the block's chunks directory. Subdirectories found in the chunks
// directory are skipped, because only regular entries can be segment files.
// In case of errors (e.g. the chunks directory cannot be read), nil is returned.
func GetSegmentFiles(blockDir string) []string {
	chunksDir := filepath.Join(blockDir, ChunksDirname)

	files, err := ioutil.ReadDir(chunksDir)
	if err != nil {
		// Best effort: the segment files list is optional, so report "unknown" as nil.
		return nil
	}

	// ReadDir returns entries sorted by filename already, so the result stays sorted.
	// The slice is intentionally left nil when there is nothing to report.
	var result []string
	for _, f := range files {
		if f.IsDir() {
			// A directory is never a segment file; listing it would shift segment indexes.
			continue
		}
		result = append(result, f.Name())
	}
	return result
}
1 change: 1 addition & 0 deletions pkg/block/index.go
Expand Up @@ -302,6 +302,7 @@ func Repair(logger log.Logger, dir string, id ulid.ULID, source metadata.SourceT
if err := rewrite(logger, indexr, chunkr, indexw, chunkw, &resmeta, ignoreChkFns); err != nil {
return resid, errors.Wrap(err, "rewrite block")
}
resmeta.Thanos.SegmentFiles = GetSegmentFiles(resdir)
if err := metadata.Write(logger, resdir, &resmeta); err != nil {
return resid, err
}
Expand Down
3 changes: 3 additions & 0 deletions pkg/block/metadata/meta.go
Expand Up @@ -59,6 +59,9 @@ type Thanos struct {

// Source is a real upload source of the block.
Source SourceType `json:"source"`

// List of segment files (in chunks directory), in sorted order. Optional.
SegmentFiles []string `json:"segment_files,omitempty"`
}

type ThanosDownsample struct {
Expand Down
7 changes: 4 additions & 3 deletions pkg/compact/compact.go
Expand Up @@ -782,9 +782,10 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) (
index := filepath.Join(bdir, block.IndexFilename)

newMeta, err := metadata.InjectThanos(cg.logger, bdir, metadata.Thanos{
Labels: cg.labels.Map(),
Downsample: metadata.ThanosDownsample{Resolution: cg.resolution},
Source: metadata.CompactorSource,
Labels: cg.labels.Map(),
Downsample: metadata.ThanosDownsample{Resolution: cg.resolution},
Source: metadata.CompactorSource,
SegmentFiles: block.GetSegmentFiles(bdir),
}, nil)
if err != nil {
return false, ulid.ULID{}, errors.Wrapf(err, "failed to finalize the block %s", bdir)
Expand Down
2 changes: 2 additions & 0 deletions pkg/compact/compact_e2e_test.go
Expand Up @@ -368,6 +368,7 @@ func TestGroup_Compact_e2e(t *testing.T) {
// Check thanos meta.
testutil.Assert(t, labels.Equal(extLabels, labels.FromMap(meta.Thanos.Labels)), "ext labels does not match")
testutil.Equals(t, int64(124), meta.Thanos.Downsample.Resolution)
testutil.Assert(t, len(meta.Thanos.SegmentFiles) > 0, "compacted blocks have segment files set")
}
{
meta, ok := others[defaultGroupKey(124, extLabels2)]
Expand All @@ -383,6 +384,7 @@ func TestGroup_Compact_e2e(t *testing.T) {
// Check thanos meta.
testutil.Assert(t, labels.Equal(extLabels2, labels.FromMap(meta.Thanos.Labels)), "ext labels does not match")
testutil.Equals(t, int64(124), meta.Thanos.Downsample.Resolution)
testutil.Assert(t, len(meta.Thanos.SegmentFiles) > 0, "compacted blocks have segment files set")
}
})
}
Expand Down
1 change: 1 addition & 0 deletions pkg/compact/downsample/streamed_block_writer.go
Expand Up @@ -203,6 +203,7 @@ func (w *streamedBlockWriter) syncDir() (err error) {
func (w *streamedBlockWriter) writeMetaFile() error {
w.meta.Version = metadata.MetaVersion1
w.meta.Thanos.Source = metadata.CompactorSource
w.meta.Thanos.SegmentFiles = block.GetSegmentFiles(w.blockDir)
w.meta.Stats.NumChunks = w.totalChunks
w.meta.Stats.NumSamples = w.totalSamples
w.meta.Stats.NumSeries = w.seriesRefs
Expand Down
1 change: 1 addition & 0 deletions pkg/objstore/objstore.go
Expand Up @@ -67,6 +67,7 @@ type InstrumentedBucket interface {
type BucketReader interface {
// Iter calls f for each entry in the given directory (not recursive.). The argument to f is the full
// object name including the prefix of the inspected directory.
// Entries are passed to function in sorted order.
Iter(ctx context.Context, dir string, f func(string) error) error

// Get returns a reader for the given object name.
Expand Down
1 change: 1 addition & 0 deletions pkg/shipper/shipper.go
Expand Up @@ -358,6 +358,7 @@ func (s *Shipper) upload(ctx context.Context, meta *metadata.Meta) error {
meta.Thanos.Labels = lset.Map()
}
meta.Thanos.Source = s.source
meta.Thanos.SegmentFiles = block.GetSegmentFiles(updir)
if err := metadata.Write(s.logger, updir, meta); err != nil {
return errors.Wrap(err, "write meta file")
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/shipper/shipper_e2e_test.go
Expand Up @@ -141,6 +141,7 @@ func TestShipper_SyncBlocks_e2e(t *testing.T) {

// The external labels must be attached to the meta file on upload.
meta.Thanos.Labels = extLset.Map()
meta.Thanos.SegmentFiles = []string{"0001", "0002"}

var buf bytes.Buffer
enc := json.NewEncoder(&buf)
Expand Down Expand Up @@ -293,6 +294,7 @@ func TestShipper_SyncBlocksWithMigrating_e2e(t *testing.T) {

// The external labels must be attached to the meta file on upload.
meta.Thanos.Labels = extLset.Map()
meta.Thanos.SegmentFiles = []string{"0001", "0002"}

var buf bytes.Buffer
enc := json.NewEncoder(&buf)
Expand Down
49 changes: 49 additions & 0 deletions pkg/shipper/shipper_test.go
Expand Up @@ -4,18 +4,24 @@
package shipper

import (
"context"
"io/ioutil"
"math"
"math/rand"
"os"
"path"
"path/filepath"
"sort"
"testing"

"github.com/go-kit/kit/log"
"github.com/oklog/ulid"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/tsdb"

"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/objstore"
"github.com/thanos-io/thanos/pkg/testutil"
)

Expand Down Expand Up @@ -167,3 +173,46 @@ func BenchmarkIterBlockMetas(b *testing.B) {
_, err = shipper.blockMetasFromOldest()
testutil.Ok(b, err)
}

// TestShipperAddsSegmentFiles checks that a block synced by the shipper ends up in the
// bucket with a meta.json whose Thanos.SegmentFiles lists the block's segment file.
func TestShipperAddsSegmentFiles(t *testing.T) {
	const segmentFile = "00001"

	tmpDir, err := ioutil.TempDir("", "shipper-test")
	testutil.Ok(t, err)
	defer func() {
		testutil.Ok(t, os.RemoveAll(tmpDir))
	}()

	bkt := objstore.NewInMemBucket()
	extLabels := []labels.Label{{Name: "test", Value: "test"}}
	shp := New(nil, nil, tmpDir, bkt, func() labels.Labels { return extLabels }, metadata.TestSource, false, false)

	// Lay out the block directory together with its chunks subdirectory.
	blockID := ulid.MustNew(1, nil)
	blockDir := path.Join(tmpDir, blockID.String())
	chunksDir := path.Join(blockDir, block.ChunksDirname)
	testutil.Ok(t, os.MkdirAll(chunksDir, os.ModePerm))

	// Write a minimal "block" for the shipper: meta.json, index and a single segment file.
	blockMeta := &metadata.Meta{
		BlockMeta: tsdb.BlockMeta{
			ULID:    blockID,
			MaxTime: 2000,
			MinTime: 1000,
			Version: 1,
			Stats: tsdb.BlockStats{
				// Not the real sample count, but the shipper needs a nonzero value.
				NumSamples: 1000,
			},
		},
	}
	testutil.Ok(t, metadata.Write(log.NewNopLogger(), blockDir, blockMeta))
	testutil.Ok(t, ioutil.WriteFile(filepath.Join(blockDir, "index"), []byte("index file"), 0666))
	testutil.Ok(t, ioutil.WriteFile(filepath.Join(chunksDir, segmentFile), []byte("hello world"), 0666))

	// Exactly one block is expected to be uploaded.
	numUploaded, err := shp.Sync(context.Background())
	testutil.Ok(t, err)
	testutil.Equals(t, 1, numUploaded)

	// The meta.json stored in the bucket must list the segment file written above.
	uploadedMeta, err := block.DownloadMeta(context.Background(), log.NewNopLogger(), bkt, blockID)
	testutil.Ok(t, err)

	testutil.Equals(t, []string{segmentFile}, uploadedMeta.Thanos.SegmentFiles)
}
17 changes: 16 additions & 1 deletion pkg/store/bucket.go
Expand Up @@ -1330,7 +1330,17 @@ func newBucketBlock(
})
sort.Sort(b.relabelLabels)

// Get object handles for all chunk files.
// Get object handles for all chunk files (segment files) from meta.json, if available.
if len(meta.Thanos.SegmentFiles) > 0 {
b.chunkObjs = make([]string, 0, len(meta.Thanos.SegmentFiles))

for _, sf := range meta.Thanos.SegmentFiles {
b.chunkObjs = append(b.chunkObjs, path.Join(meta.ULID.String(), block.ChunksDirname, sf))
}
return b, nil
}

// Get object handles for all chunk files from storage.
if err = bkt.Iter(ctx, path.Join(meta.ULID.String(), block.ChunksDirname), func(n string) error {
b.chunkObjs = append(b.chunkObjs, n)
return nil
Expand Down Expand Up @@ -1367,6 +1377,11 @@ func (b *bucketBlock) readChunkRange(ctx context.Context, seq int, off, length i
if err != nil {
return nil, errors.Wrap(err, "allocate chunk bytes")
}

if seq < 0 || seq >= len(b.chunkObjs) {
return nil, errors.Errorf("unknown segment file for index %d", seq)
}

buf := bytes.NewBuffer(*c)

r, err := b.bkt.GetRange(ctx, b.chunkObjs[seq], off, length)
Expand Down
3 changes: 2 additions & 1 deletion pkg/testutil/e2eutil/prometheus.go
Expand Up @@ -473,7 +473,8 @@ func createBlock(
return id, errors.Errorf("nothing to write, asked for %d samples", numSamples)
}

if _, err = metadata.InjectThanos(log.NewNopLogger(), filepath.Join(dir, id.String()), metadata.Thanos{
blockDir := filepath.Join(dir, id.String())
if _, err = metadata.InjectThanos(log.NewNopLogger(), blockDir, metadata.Thanos{
Labels: extLset.Map(),
Downsample: metadata.ThanosDownsample{Resolution: resolution},
Source: metadata.TestSource,
Expand Down

0 comments on commit 358e526

Please sign in to comment.