Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Segment files #3261

Merged
merged 9 commits into from Oct 2, 2020
2 changes: 2 additions & 0 deletions CHANGELOG.md
Expand Up @@ -11,6 +11,8 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re

## Unreleased

- [#3261](https://github.com/thanos-io/thanos/pull/3261) Thanos Store: Use segment files specified in meta.json file, if present. If not present, Store does the LIST operation as before.

## [v0.16.0](https://github.com/thanos-io/thanos/releases) - Release in progress

### Fixed
Expand Down
18 changes: 18 additions & 0 deletions pkg/block/block.go
Expand Up @@ -222,3 +222,21 @@ func IsBlockDir(path string) (id ulid.ULID, ok bool) {
id, err := ulid.Parse(filepath.Base(path))
return id, err == nil
}

// GetSegmentFiles returns list of segment files for given block. Paths are relative to the chunks directory.
// In case of errors, nil is returned.
func GetSegmentFiles(blockDir string) []string {
chunksDir := filepath.Join(blockDir, ChunksDirname)

files, err := ioutil.ReadDir(chunksDir)
if err != nil {
return nil
}

// ReadDir returns files in sorted order already.
var result []string
for _, f := range files {
result = append(result, f.Name())
}
return result
}
1 change: 1 addition & 0 deletions pkg/block/index.go
Expand Up @@ -302,6 +302,7 @@ func Repair(logger log.Logger, dir string, id ulid.ULID, source metadata.SourceT
if err := rewrite(logger, indexr, chunkr, indexw, chunkw, &resmeta, ignoreChkFns); err != nil {
return resid, errors.Wrap(err, "rewrite block")
}
resmeta.Thanos.SegmentFiles = GetSegmentFiles(resdir)
if err := metadata.Write(logger, resdir, &resmeta); err != nil {
return resid, err
}
Expand Down
3 changes: 3 additions & 0 deletions pkg/block/metadata/meta.go
Expand Up @@ -59,6 +59,9 @@ type Thanos struct {

// Source is a real upload source of the block.
Source SourceType `json:"source"`

// List of segment files (in chunks directory), in sorted order. Optional.
SegmentFiles []string `json:"segment_files,omitempty"`
}

type ThanosDownsample struct {
Expand Down
7 changes: 4 additions & 3 deletions pkg/compact/compact.go
Expand Up @@ -782,9 +782,10 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) (
index := filepath.Join(bdir, block.IndexFilename)

newMeta, err := metadata.InjectThanos(cg.logger, bdir, metadata.Thanos{
Labels: cg.labels.Map(),
Downsample: metadata.ThanosDownsample{Resolution: cg.resolution},
Source: metadata.CompactorSource,
Labels: cg.labels.Map(),
Downsample: metadata.ThanosDownsample{Resolution: cg.resolution},
Source: metadata.CompactorSource,
SegmentFiles: block.GetSegmentFiles(bdir),
}, nil)
if err != nil {
return false, ulid.ULID{}, errors.Wrapf(err, "failed to finalize the block %s", bdir)
Expand Down
2 changes: 2 additions & 0 deletions pkg/compact/compact_e2e_test.go
Expand Up @@ -368,6 +368,7 @@ func TestGroup_Compact_e2e(t *testing.T) {
// Check thanos meta.
testutil.Assert(t, labels.Equal(extLabels, labels.FromMap(meta.Thanos.Labels)), "ext labels does not match")
testutil.Equals(t, int64(124), meta.Thanos.Downsample.Resolution)
testutil.Assert(t, len(meta.Thanos.SegmentFiles) > 0, "compacted blocks have segment files set")
}
{
meta, ok := others[defaultGroupKey(124, extLabels2)]
Expand All @@ -383,6 +384,7 @@ func TestGroup_Compact_e2e(t *testing.T) {
// Check thanos meta.
testutil.Assert(t, labels.Equal(extLabels2, labels.FromMap(meta.Thanos.Labels)), "ext labels does not match")
testutil.Equals(t, int64(124), meta.Thanos.Downsample.Resolution)
testutil.Assert(t, len(meta.Thanos.SegmentFiles) > 0, "compacted blocks have segment files set")
}
})
}
Expand Down
1 change: 1 addition & 0 deletions pkg/compact/downsample/streamed_block_writer.go
Expand Up @@ -203,6 +203,7 @@ func (w *streamedBlockWriter) syncDir() (err error) {
func (w *streamedBlockWriter) writeMetaFile() error {
w.meta.Version = metadata.MetaVersion1
w.meta.Thanos.Source = metadata.CompactorSource
w.meta.Thanos.SegmentFiles = block.GetSegmentFiles(w.blockDir)
w.meta.Stats.NumChunks = w.totalChunks
w.meta.Stats.NumSamples = w.totalSamples
w.meta.Stats.NumSeries = w.seriesRefs
Expand Down
1 change: 1 addition & 0 deletions pkg/objstore/objstore.go
Expand Up @@ -67,6 +67,7 @@ type InstrumentedBucket interface {
type BucketReader interface {
// Iter calls f for each entry in the given directory (not recursive.). The argument to f is the full
// object name including the prefix of the inspected directory.
// Entries are passed to function in sorted order.
Iter(ctx context.Context, dir string, f func(string) error) error

// Get returns a reader for the given object name.
Expand Down
1 change: 1 addition & 0 deletions pkg/shipper/shipper.go
Expand Up @@ -358,6 +358,7 @@ func (s *Shipper) upload(ctx context.Context, meta *metadata.Meta) error {
meta.Thanos.Labels = lset.Map()
}
meta.Thanos.Source = s.source
meta.Thanos.SegmentFiles = block.GetSegmentFiles(updir)
if err := metadata.Write(s.logger, updir, meta); err != nil {
return errors.Wrap(err, "write meta file")
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/shipper/shipper_e2e_test.go
Expand Up @@ -141,6 +141,7 @@ func TestShipper_SyncBlocks_e2e(t *testing.T) {

// The external labels must be attached to the meta file on upload.
meta.Thanos.Labels = extLset.Map()
meta.Thanos.SegmentFiles = []string{"0001", "0002"}

var buf bytes.Buffer
enc := json.NewEncoder(&buf)
Expand Down Expand Up @@ -293,6 +294,7 @@ func TestShipper_SyncBlocksWithMigrating_e2e(t *testing.T) {

// The external labels must be attached to the meta file on upload.
meta.Thanos.Labels = extLset.Map()
meta.Thanos.SegmentFiles = []string{"0001", "0002"}

var buf bytes.Buffer
enc := json.NewEncoder(&buf)
Expand Down
49 changes: 49 additions & 0 deletions pkg/shipper/shipper_test.go
Expand Up @@ -4,18 +4,24 @@
package shipper

import (
"context"
"io/ioutil"
"math"
"math/rand"
"os"
"path"
"path/filepath"
"sort"
"testing"

"github.com/go-kit/kit/log"
"github.com/oklog/ulid"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/tsdb"

"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/objstore"
"github.com/thanos-io/thanos/pkg/testutil"
)

Expand Down Expand Up @@ -167,3 +173,46 @@ func BenchmarkIterBlockMetas(b *testing.B) {
_, err = shipper.blockMetasFromOldest()
testutil.Ok(b, err)
}

func TestShipperAddsSegmentFiles(t *testing.T) {
dir, err := ioutil.TempDir("", "shipper-test")
testutil.Ok(t, err)
defer func() {
testutil.Ok(t, os.RemoveAll(dir))
}()

inmemory := objstore.NewInMemBucket()

lbls := []labels.Label{{Name: "test", Value: "test"}}
s := New(nil, nil, dir, inmemory, func() labels.Labels { return lbls }, metadata.TestSource, false, false)

id := ulid.MustNew(1, nil)
blockDir := path.Join(dir, id.String())
chunksDir := path.Join(blockDir, block.ChunksDirname)
testutil.Ok(t, os.MkdirAll(chunksDir, os.ModePerm))

// Prepare minimal "block" for shipper (meta.json, index, one segment file).
testutil.Ok(t, metadata.Write(log.NewNopLogger(), path.Join(dir, id.String()), &metadata.Meta{
BlockMeta: tsdb.BlockMeta{
ULID: id,
MaxTime: 2000,
MinTime: 1000,
Version: 1,
Stats: tsdb.BlockStats{
NumSamples: 1000, // Not really, but shipper needs nonzero value.
},
},
}))
testutil.Ok(t, ioutil.WriteFile(filepath.Join(blockDir, "index"), []byte("index file"), 0666))
segmentFile := "00001"
testutil.Ok(t, ioutil.WriteFile(filepath.Join(chunksDir, segmentFile), []byte("hello world"), 0666))

uploaded, err := s.Sync(context.Background())
testutil.Ok(t, err)
testutil.Equals(t, 1, uploaded)

meta, err := block.DownloadMeta(context.Background(), log.NewNopLogger(), inmemory, id)
testutil.Ok(t, err)

testutil.Equals(t, []string{segmentFile}, meta.Thanos.SegmentFiles)
}
17 changes: 16 additions & 1 deletion pkg/store/bucket.go
Expand Up @@ -1330,7 +1330,17 @@ func newBucketBlock(
})
sort.Sort(b.relabelLabels)

// Get object handles for all chunk files.
// Get object handles for all chunk files (segment files) from meta.json, if available.
if len(meta.Thanos.SegmentFiles) > 0 {
b.chunkObjs = make([]string, 0, len(meta.Thanos.SegmentFiles))

for _, sf := range meta.Thanos.SegmentFiles {
b.chunkObjs = append(b.chunkObjs, path.Join(meta.ULID.String(), block.ChunksDirname, sf))
}
return b, nil
}

// Get object handles for all chunk files from storage.
if err = bkt.Iter(ctx, path.Join(meta.ULID.String(), block.ChunksDirname), func(n string) error {
b.chunkObjs = append(b.chunkObjs, n)
return nil
Expand Down Expand Up @@ -1367,6 +1377,11 @@ func (b *bucketBlock) readChunkRange(ctx context.Context, seq int, off, length i
if err != nil {
return nil, errors.Wrap(err, "allocate chunk bytes")
}

if seq < 0 || seq >= len(b.chunkObjs) {
return nil, errors.Errorf("unknown segment file for index %d", seq)
}

buf := bytes.NewBuffer(*c)

r, err := b.bkt.GetRange(ctx, b.chunkObjs[seq], off, length)
Expand Down
3 changes: 2 additions & 1 deletion pkg/testutil/e2eutil/prometheus.go
Expand Up @@ -473,7 +473,8 @@ func createBlock(
return id, errors.Errorf("nothing to write, asked for %d samples", numSamples)
}

if _, err = metadata.InjectThanos(log.NewNopLogger(), filepath.Join(dir, id.String()), metadata.Thanos{
blockDir := filepath.Join(dir, id.String())
if _, err = metadata.InjectThanos(log.NewNopLogger(), blockDir, metadata.Thanos{
Labels: extLset.Map(),
Downsample: metadata.ThanosDownsample{Resolution: resolution},
Source: metadata.TestSource,
Expand Down