diff --git a/CHANGELOG.md b/CHANGELOG.md index 05b3e5f400d..134714a9c69 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,6 +20,7 @@ Compactor now properly handles partial block uploads for all operation like rete * Removed `thanos_compact_sync_meta_*` metrics. Use `thanos_blocks_meta_*` metrics instead. * Added `thanos_consistency_delay_seconds` and `thanos_compactor_aborted_partial_uploads_deletion_attempts_total` metrics. + - [#1936](https://github.com/thanos-io/thanos/pull/1936) Store: Improved synchronization of meta JSON files. Store now properly handles corrupted disk cache. Added meta.json sync metrics. - [#1856](https://github.com/thanos-io/thanos/pull/1856) Receive: close DBReadOnly after flushing to fix a memory leak. - [#1882](https://github.com/thanos-io/thanos/pull/1882) Receive: upload to object storage as 'receive' rather than 'sidecar'. @@ -35,6 +36,14 @@ Compactor now properly handles partial block uploads for all operation like rete - [#1881](https://github.com/thanos-io/thanos/pull/1881) Store Gateway: memcached support for index cache. See [documentation](docs/components/store.md/#index-cache) for further information. - [#1904](https://github.com/thanos-io/thanos/pull/1904) Add a skip-chunks option in Store Series API to improve the response time of `/api/v1/series` endpoint. +### Changed + +- [#1947](https://github.com/thanos-io/thanos/pull/1947) Upgraded Prometheus dependencies to v2.15.2. This includes: + + * Compactor: Significant reduction of memory footprint for compaction and downsampling process. + * Querier: Accepting spaces between time range and square bracket. e.g `[ 5m]` + * Querier: Improved PromQL parser performance. + ## [v0.9.0](https://github.com/thanos-io/thanos/releases/tag/v0.9.0) - 2019.12.03 ### Added diff --git a/go.mod b/go.mod index bae87de5120..aa141722b1e 100644 --- a/go.mod +++ b/go.mod @@ -16,7 +16,6 @@ require ( github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d github.com/aliyun/aliyun-oss-go-sdk v2.0.4+incompatible github.com/armon/go-metrics v0.3.0 - github.com/aws/aws-sdk-go v1.25.35 // indirect github.com/baiyubin/aliyun-sts-go-sdk v0.0.0-20180326062324-cfa1a18b161f // indirect github.com/bradfitz/gomemcache v0.0.0-20190913173617-a41fca850d0b github.com/cespare/xxhash v1.1.0 @@ -71,7 +70,7 @@ require ( github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4 github.com/prometheus/common v0.7.0 github.com/prometheus/procfs v0.0.6 // indirect - github.com/prometheus/prometheus v1.8.2-0.20191126064551-80ba03c67da1 // Prometheus master v2.14.0 + github.com/prometheus/prometheus v1.8.2-0.20200107122003-4708915ac6ef // master ~ v2.15.2 github.com/samuel/go-zookeeper v0.0.0-20190923202752-2cc03de413da // indirect github.com/satori/go.uuid v1.2.0 // indirect github.com/smartystreets/assertions v1.0.1 // indirect diff --git a/go.sum b/go.sum index 724168b4067..640a65ba006 100644 --- a/go.sum +++ b/go.sum @@ -83,9 +83,8 @@ github.com/armon/go-radix v1.0.0 h1:F4z6KzEeeQIMeLFa97iZU6vupzoecKdU5TX24SNppXI= github.com/armon/go-radix v1.0.0/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8= github.com/asaskevich/govalidator v0.0.0-20180720115003-f9ffefc3facf/go.mod h1:lB+ZfQJz7igIIfQNfa7Ml4HSf2uFQQRzpGGRXenZAgY= github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a/go.mod h1:lB+ZfQJz7igIIfQNfa7Ml4HSf2uFQQRzpGGRXenZAgY= -github.com/aws/aws-sdk-go v1.23.12/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo= -github.com/aws/aws-sdk-go v1.25.35 h1:fe2tJnqty/v/50pyngKdNk/NP8PFphYDA1Z7N3EiiiE= -github.com/aws/aws-sdk-go v1.25.35/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo= +github.com/aws/aws-sdk-go v1.25.48 h1:J82DYDGZHOKHdhx6hD24Tm30c2C3GchYGfN0mf9iKUk= +github.com/aws/aws-sdk-go v1.25.48/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo= github.com/baiyubin/aliyun-sts-go-sdk v0.0.0-20180326062324-cfa1a18b161f h1:ZNv7On9kyUzm7fvRZumSyy/IUiSC7AzL0I1jKKtwooA= github.com/baiyubin/aliyun-sts-go-sdk v0.0.0-20180326062324-cfa1a18b161f/go.mod h1:AuiFmCCPBSrqvVMvuqFuk0qogytodnVFVSN5CeJB8Gc= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= @@ -447,8 +446,8 @@ github.com/prometheus/procfs v0.0.5/go.mod h1:4A/X28fw3Fc593LaREMrKMqOKvUAntwMDa github.com/prometheus/procfs v0.0.6 h1:0qbH+Yqu/cj1ViVLvEWCP6qMQ4efWUj6bQqOEA0V0U4= github.com/prometheus/procfs v0.0.6/go.mod h1:7Qr8sr6344vo1JqZ6HhLceV9o3AJ1Ff+GxbHq6oeK9A= github.com/prometheus/prometheus v0.0.0-20180315085919-58e2a31db8de/go.mod h1:oAIUtOny2rjMX0OWN5vPR5/q/twIROJvdqnQKDdil/s= -github.com/prometheus/prometheus v1.8.2-0.20191126064551-80ba03c67da1 h1:5ee1ewJCJYB7Bp314qaPcRNFaAPsdHN6BFzBC1wMVbQ= -github.com/prometheus/prometheus v1.8.2-0.20191126064551-80ba03c67da1/go.mod h1:PVTKYlgELGIDbIKIyWRzD4WKjnavPynGOFLSuDpvOwU= +github.com/prometheus/prometheus v1.8.2-0.20200107122003-4708915ac6ef h1:pYYKXo/zGx25kyViw+Gdbxd0ItIg+vkVKpwgWUEyIc4= +github.com/prometheus/prometheus v1.8.2-0.20200107122003-4708915ac6ef/go.mod h1:7U90zPoLkWjEIQcy/rweQla82OCTUzxVHE51G3OhJbI= github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= diff --git a/pkg/block/index.go b/pkg/block/index.go index a86abb80450..0df14ded4dd 100644 --- a/pkg/block/index.go +++ b/pkg/block/index.go @@ -1,6 +1,7 @@ package block import ( + "context" "fmt" "hash/crc32" "math/rand" @@ -282,7 +283,7 @@ func Repair(logger log.Logger, dir string, id ulid.ULID, source metadata.SourceT } defer runutil.CloseWithErrCapture(&err, chunkw, "repair chunk writer") - indexw, err := index.NewWriter(filepath.Join(resdir, IndexFilename)) + indexw, err := index.NewWriter(context.TODO(), filepath.Join(resdir, IndexFilename)) if err != nil { return resid, errors.Wrap(err, "open index writer") } @@ -406,17 +407,19 @@ func rewrite( meta *metadata.Meta, ignoreChkFns []ignoreFnType, ) error { - symbols, err := indexr.Symbols() - if err != nil { - return err + symbols := indexr.Symbols() + for symbols.Next() { + if err := indexw.AddSymbol(symbols.At()); err != nil { + return errors.Wrap(err, "add symbol") + } } - if err := indexw.AddSymbols(symbols); err != nil { - return err + if symbols.Err() != nil { + return errors.Wrap(symbols.Err(), "next symbol") } all, err := indexr.Postings(index.AllPostingsKey()) if err != nil { - return err + return errors.Wrap(err, "postings") } all = indexr.SortedPostings(all) @@ -434,7 +437,7 @@ func rewrite( id := all.At() if err := indexr.Series(id, &lset, &chks); err != nil { - return err + return errors.Wrap(err, "series") } // Make sure labels are in sorted order. sort.Sort(lset) @@ -442,7 +445,7 @@ func rewrite( for i, c := range chks { chks[i].Chunk, err = chunkr.Chunk(c.Ref) if err != nil { - return err + return errors.Wrap(err, "chunk read") } } @@ -513,24 +516,6 @@ func rewrite( i++ lastSet = s.lset } - - s := make([]string, 0, 256) - for n, v := range values { - s = s[:0] - - for x := range v { - s = append(s, x) - } - if err := indexw.WriteLabelIndex([]string{n}, s); err != nil { - return errors.Wrap(err, "write label index") - } - } - - for _, l := range postings.SortedKeys() { - if err := indexw.WritePostings(l.Name, l.Value, postings.Get(l.Name, l.Value)); err != nil { - return errors.Wrap(err, "write postings") - } - } return nil } diff --git a/pkg/block/index_test.go b/pkg/block/index_test.go new file mode 100644 index 00000000000..257005c7e21 --- /dev/null +++ b/pkg/block/index_test.go @@ -0,0 +1,83 @@ +package block + +import ( + "context" + "io/ioutil" + "os" + "path/filepath" + "testing" + + "github.com/go-kit/kit/log" + "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/tsdb" + "github.com/prometheus/prometheus/tsdb/chunks" + "github.com/prometheus/prometheus/tsdb/index" + "github.com/thanos-io/thanos/pkg/block/metadata" + "github.com/thanos-io/thanos/pkg/testutil" +) + +func TestRewrite(t *testing.T) { + ctx := context.Background() + + tmpDir, err := ioutil.TempDir("", "test-indexheader") + testutil.Ok(t, err) + defer func() { testutil.Ok(t, os.RemoveAll(tmpDir)) }() + + b, err := testutil.CreateBlock(ctx, tmpDir, []labels.Labels{ + {{Name: "a", Value: "1"}}, + {{Name: "a", Value: "2"}}, + {{Name: "a", Value: "3"}}, + {{Name: "a", Value: "4"}}, + {{Name: "a", Value: "1"}, {Name: "b", Value: "1"}}, + }, 150, 0, 1000, nil, 124) + testutil.Ok(t, err) + + ir, err := index.NewFileReader(filepath.Join(tmpDir, b.String(), IndexFilename)) + testutil.Ok(t, err) + + defer func() { testutil.Ok(t, ir.Close()) }() + + cr, err := chunks.NewDirReader(filepath.Join(tmpDir, b.String(), "chunks"), nil) + testutil.Ok(t, err) + + defer func() { testutil.Ok(t, cr.Close()) }() + + m := &metadata.Meta{ + BlockMeta: tsdb.BlockMeta{ULID: ULID(1)}, + Thanos: metadata.Thanos{}, + } + + testutil.Ok(t, os.MkdirAll(filepath.Join(tmpDir, m.ULID.String()), os.ModePerm)) + iw, err := index.NewWriter(ctx, filepath.Join(tmpDir, m.ULID.String(), IndexFilename)) + testutil.Ok(t, err) + defer iw.Close() + + cw, err := chunks.NewWriter(filepath.Join(tmpDir, m.ULID.String())) + testutil.Ok(t, err) + + defer cw.Close() + + testutil.Ok(t, rewrite(log.NewNopLogger(), ir, cr, iw, cw, m, []ignoreFnType{func(mint, maxt int64, prev *chunks.Meta, curr *chunks.Meta) (bool, error) { + return curr.MaxTime == 696, nil + }})) + + testutil.Ok(t, iw.Close()) + testutil.Ok(t, cw.Close()) + + ir2, err := index.NewFileReader(filepath.Join(tmpDir, m.ULID.String(), IndexFilename)) + testutil.Ok(t, err) + + defer func() { testutil.Ok(t, ir2.Close()) }() + + all, err := ir2.Postings(index.AllPostingsKey()) + testutil.Ok(t, err) + + for p := ir2.SortedPostings(all); p.Next(); { + var lset labels.Labels + var chks []chunks.Meta + + testutil.Ok(t, ir2.Series(p.At(), &lset, &chks)) + testutil.Equals(t, 1, len(chks)) + } + +} diff --git a/pkg/block/indexheader/json_reader.go b/pkg/block/indexheader/json_reader.go index dd798588747..ee756d35ce4 100644 --- a/pkg/block/indexheader/json_reader.go +++ b/pkg/block/indexheader/json_reader.go @@ -3,6 +3,7 @@ package indexheader import ( "context" "encoding/json" + "hash/crc32" "io/ioutil" "os" "path/filepath" @@ -13,6 +14,7 @@ import ( "github.com/oklog/ulid" "github.com/pkg/errors" "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/tsdb/encoding" "github.com/prometheus/prometheus/tsdb/fileutil" "github.com/prometheus/prometheus/tsdb/index" "github.com/thanos-io/thanos/pkg/block" @@ -56,6 +58,50 @@ func (b realByteSlice) Sub(start, end int) index.ByteSlice { return b[start:end] } +// The table gets initialized with sync.Once but may still cause a race +// with any other use of the crc32 package anywhere. Thus we initialize it +// before. +var castagnoliTable *crc32.Table + +func init() { + castagnoliTable = crc32.MakeTable(crc32.Castagnoli) +} + +// ReadSymbols reads the symbol table fully into memory and allocates proper strings for them. +// Strings backed by the mmap'd memory would cause memory faults if applications keep using them +// after the reader is closed. +func readSymbols(bs index.ByteSlice, version int, off int) ([]string, map[uint32]string, error) { + if off == 0 { + return nil, nil, nil + } + d := encoding.NewDecbufAt(bs, off, castagnoliTable) + + var ( + origLen = d.Len() + cnt = d.Be32int() + basePos = uint32(off) + 4 + nextPos = basePos + uint32(origLen-d.Len()) + symbolSlice []string + symbols = map[uint32]string{} + ) + if version == index.FormatV2 { + symbolSlice = make([]string, 0, cnt) + } + + for d.Err() == nil && d.Len() > 0 && cnt > 0 { + s := d.UvarintStr() + + if version == index.FormatV2 { + symbolSlice = append(symbolSlice, s) + } else { + symbols[nextPos] = s + nextPos = basePos + uint32(origLen-d.Len()) + } + cnt-- + } + return symbolSlice, symbols, errors.Wrap(d.Err(), "read symbols") +} + func getSymbolTable(b index.ByteSlice) (map[uint32]string, error) { version := int(b.Range(4, 5)[0]) @@ -68,7 +114,7 @@ func getSymbolTable(b index.ByteSlice) (map[uint32]string, error) { return nil, errors.Wrap(err, "read TOC") } - symbolsV2, symbolsV1, err := index.ReadSymbols(b, version, int(toc.Symbols)) + symbolsV2, symbolsV1, err := readSymbols(b, version, int(toc.Symbols)) if err != nil { return nil, errors.Wrap(err, "read symbols") } @@ -120,32 +166,15 @@ func WriteJSON(logger log.Logger, indexFn string, fn string) error { } // Extract label value indices. - lnames, err := indexr.LabelIndices() + lnames, err := indexr.LabelNames() if err != nil { return errors.Wrap(err, "read label indices") } - for _, lns := range lnames { - if len(lns) != 1 { - continue - } - ln := lns[0] - - tpls, err := indexr.LabelValues(ln) + for _, ln := range lnames { + vals, err := indexr.LabelValues(ln) if err != nil { return errors.Wrap(err, "get label values") } - vals := make([]string, 0, tpls.Len()) - - for i := 0; i < tpls.Len(); i++ { - v, err := tpls.At(i) - if err != nil { - return errors.Wrap(err, "get label value") - } - if len(v) != 1 { - return errors.Errorf("unexpected tuple length %d", len(v)) - } - vals = append(vals, v[0]) - } v.LabelValues[ln] = vals } diff --git a/pkg/compact/compact_e2e_test.go b/pkg/compact/compact_e2e_test.go index 9a7411b2e95..926175dc689 100644 --- a/pkg/compact/compact_e2e_test.go +++ b/pkg/compact/compact_e2e_test.go @@ -192,7 +192,7 @@ func TestGroup_Compact_e2e(t *testing.T) { numSamples: 100, mint: 0, maxt: 1000, extLset: extLabels, res: 124, series: []labels.Labels{ {{Name: "a", Value: "1"}}, - {{Name: "a", Value: "2"}, {Name: "a", Value: "2"}}, + {{Name: "a", Value: "2"}, {Name: "b", Value: "2"}}, {{Name: "a", Value: "3"}}, {{Name: "a", Value: "4"}}, }, @@ -247,7 +247,7 @@ func TestGroup_Compact_e2e(t *testing.T) { numSamples: 100, mint: 0, maxt: 1000, extLset: extLabels2, res: 124, series: []labels.Labels{ {{Name: "a", Value: "1"}}, - {{Name: "a", Value: "2"}, {Name: "a", Value: "2"}}, + {{Name: "a", Value: "2"}, {Name: "b", Value: "2"}}, {{Name: "a", Value: "3"}}, {{Name: "a", Value: "4"}}, }, @@ -371,7 +371,7 @@ func createAndUpload(t testing.TB, bkt objstore.Bucket, blocks []blockgenSpec) ( testutil.Ok(t, err) defer func() { testutil.Ok(t, os.RemoveAll(prepareDir)) }() - ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) defer cancel() for _, b := range blocks { @@ -407,7 +407,7 @@ func createEmptyBlock(dir string, mint int64, maxt int64, extLset labels.Labels, return ulid.ULID{}, errors.Wrap(err, "close index") } - w, err := index.NewWriter(path.Join(dir, uid.String(), "index")) + w, err := index.NewWriter(context.Background(), path.Join(dir, uid.String(), "index")) if err != nil { return ulid.ULID{}, errors.Wrap(err, "new index") } diff --git a/pkg/compact/downsample/downsample_test.go b/pkg/compact/downsample/downsample_test.go index 3e0de643517..9b13acf6528 100644 --- a/pkg/compact/downsample/downsample_test.go +++ b/pkg/compact/downsample/downsample_test.go @@ -568,11 +568,11 @@ func (b *memBlock) Meta() tsdb.BlockMeta { return tsdb.BlockMeta{} } -func (b *memBlock) Postings(name, val string) (index.Postings, error) { +func (b *memBlock) Postings(name string, val ...string) (index.Postings, error) { allName, allVal := index.AllPostingsKey() - if name != allName || val != allVal { - return nil, errors.New("unsupported call to Postings()") + if name != allName || val[0] != allVal { + return nil, errors.New("unexpected call to Postings() that is not AllVall") } sort.Slice(b.postings, func(i, j int) bool { return labels.Compare(b.series[b.postings[i]].lset, b.series[b.postings[j]].lset) < 0 @@ -593,15 +593,20 @@ func (b *memBlock) Series(id uint64, lset *labels.Labels, chks *[]chunks.Meta) e } func (b *memBlock) Chunk(id uint64) (chunkenc.Chunk, error) { - if id >= uint64(b.numberOfChunks) { + if id >= b.numberOfChunks { return nil, errors.Wrapf(tsdb.ErrNotFound, "chunk with ID %d does not exist", id) } return b.chunks[id], nil } -func (b *memBlock) Symbols() (map[string]struct{}, error) { - return b.symbols, nil +func (b *memBlock) Symbols() index.StringIter { + res := make([]string, 0, len(b.symbols)) + for s := range b.symbols { + res = append(res, s) + } + sort.Strings(res) + return index.NewStringListIter(res) } func (b *memBlock) SortedPostings(p index.Postings) index.Postings { diff --git a/pkg/compact/downsample/streamed_block_writer.go b/pkg/compact/downsample/streamed_block_writer.go index f5b5b841841..bde6397869d 100644 --- a/pkg/compact/downsample/streamed_block_writer.go +++ b/pkg/compact/downsample/streamed_block_writer.go @@ -1,6 +1,7 @@ package downsample import ( + "context" "io" "path/filepath" @@ -19,32 +20,6 @@ import ( "github.com/thanos-io/thanos/pkg/runutil" ) -type labelValues map[string]struct{} - -func (lv labelValues) add(value string) { - lv[value] = struct{}{} -} - -func (lv labelValues) get(set *[]string) { - for value := range lv { - *set = append(*set, value) - } -} - -type labelsValues map[string]labelValues - -func (lv labelsValues) add(labelSet labels.Labels) { - for _, label := range labelSet { - values, ok := lv[label.Name] - if !ok { - // Add new label. - values = labelValues{} - lv[label.Name] = values - } - values.add(label.Value) - } -} - // streamedBlockWriter writes downsampled blocks to a new data block. Implemented to save memory consumption // by writing chunks data right into the files, omitting keeping them in-memory. Index and meta data should be // sealed afterwards, when there aren't more series to process. @@ -62,9 +37,7 @@ type streamedBlockWriter struct { indexReader tsdb.IndexReader closers []io.Closer - labelsValues labelsValues // labelsValues list of used label sets: name -> []values. - memPostings *index.MemPostings // memPostings contains references from label name:value -> postings. - postings uint64 // postings is a current posting position. + seriesRefs uint64 // postings is a current posting position. } // NewStreamedBlockWriter returns streamedBlockWriter instance, it's not concurrency safe. @@ -100,32 +73,30 @@ func NewStreamedBlockWriter( } closers = append(closers, chunkWriter) - indexWriter, err := index.NewWriter(filepath.Join(blockDir, block.IndexFilename)) + indexWriter, err := index.NewWriter(context.TODO(), filepath.Join(blockDir, block.IndexFilename)) if err != nil { return nil, errors.Wrap(err, "open index writer in streamedBlockWriter") } closers = append(closers, indexWriter) - symbols, err := indexReader.Symbols() - if err != nil { - return nil, errors.Wrap(err, "read symbols") + symbols := indexReader.Symbols() + for symbols.Next() { + if err = indexWriter.AddSymbol(symbols.At()); err != nil { + return nil, errors.Wrap(err, "add symbols") + } } - - err = indexWriter.AddSymbols(symbols) - if err != nil { - return nil, errors.Wrap(err, "add symbols") + if err := symbols.Err(); err != nil { + return nil, errors.Wrap(err, "read symbols") } return &streamedBlockWriter{ - logger: logger, - blockDir: blockDir, - indexReader: indexReader, - indexWriter: indexWriter, - chunkWriter: chunkWriter, - meta: originMeta, - closers: closers, - labelsValues: make(labelsValues, 1024), - memPostings: index.NewUnorderedMemPostings(), + logger: logger, + blockDir: blockDir, + indexReader: indexReader, + indexWriter: indexWriter, + chunkWriter: chunkWriter, + meta: originMeta, + closers: closers, }, nil } @@ -146,14 +117,12 @@ func (w *streamedBlockWriter) WriteSeries(lset labels.Labels, chunks []chunks.Me return errors.Wrap(err, "add chunks") } - if err := w.indexWriter.AddSeries(w.postings, lset, chunks...); err != nil { + if err := w.indexWriter.AddSeries(w.seriesRefs, lset, chunks...); err != nil { w.ignoreFinalize = true return errors.Wrap(err, "add series") } - w.labelsValues.add(lset) - w.memPostings.Add(w.postings, lset) - w.postings++ + w.seriesRefs++ w.totalChunks += uint64(len(chunks)) for i := range chunks { @@ -183,14 +152,6 @@ func (w *streamedBlockWriter) Close() error { // Finalize saves prepared index and metadata to corresponding files. - if err := w.writeLabelSets(); err != nil { - return errors.Wrap(err, "write label sets") - } - - if err := w.writeMemPostings(); err != nil { - return errors.Wrap(err, "write mem postings") - } - for _, cl := range w.closers { merr.Add(cl.Close()) } @@ -243,37 +204,13 @@ func (w *streamedBlockWriter) syncDir() (err error) { return nil } -// writeLabelSets fills the index writer with label sets. -func (w *streamedBlockWriter) writeLabelSets() error { - s := make([]string, 0, 256) - for n, v := range w.labelsValues { - s = s[:0] - v.get(&s) - if err := w.indexWriter.WriteLabelIndex([]string{n}, s); err != nil { - return errors.Wrap(err, "write label index") - } - } - return nil -} - -// writeMemPostings fills the index writer with mem postings. -func (w *streamedBlockWriter) writeMemPostings() error { - w.memPostings.EnsureOrder() - for _, l := range w.memPostings.SortedKeys() { - if err := w.indexWriter.WritePostings(l.Name, l.Value, w.memPostings.Get(l.Name, l.Value)); err != nil { - return errors.Wrap(err, "write postings") - } - } - return nil -} - // writeMetaFile writes meta file. func (w *streamedBlockWriter) writeMetaFile() error { w.meta.Version = metadata.MetaVersion1 w.meta.Thanos.Source = metadata.CompactorSource w.meta.Stats.NumChunks = w.totalChunks w.meta.Stats.NumSamples = w.totalSamples - w.meta.Stats.NumSeries = w.postings + w.meta.Stats.NumSeries = w.seriesRefs return metadata.Write(w.logger, w.blockDir, &w.meta) }