/
json_reader.go
344 lines (290 loc) · 8.78 KB
/
json_reader.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
package indexheader
import (
"context"
"encoding/json"
"hash/crc32"
"io/ioutil"
"os"
"path/filepath"
"sort"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/oklog/ulid"
"github.com/pkg/errors"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/tsdb/encoding"
"github.com/prometheus/prometheus/tsdb/fileutil"
"github.com/prometheus/prometheus/tsdb/index"
"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/objstore"
"github.com/thanos-io/thanos/pkg/runutil"
)
const (
	// JSONVersion1 is an enumeration of index-cache.json versions supported by Thanos.
	JSONVersion1 = iota + 1
)
var (
	// jsonUnmarshalError is a sentinel error wrapped into errors returned when
	// an index-cache.json file cannot be parsed. Callers compare against it
	// via errors.Cause to distinguish a corrupt cache (rebuildable) from other
	// failures.
	jsonUnmarshalError = errors.New("unmarshal index cache")
)
// postingsRange locates the postings list of the label pair (Name, Value)
// as a byte range within the original index file.
type postingsRange struct {
	Name, Value string
	Start, End  int64
}
// indexCache is the JSON schema persisted to index-cache.json. It carries the
// first lookup stages of a TSDB index: the symbol table, the values per label
// name, and the byte ranges of the postings lists.
type indexCache struct {
	Version      int                 // TSDB index format version.
	CacheVersion int                 // version of this JSON schema (see JSONVersion1).
	Symbols      map[uint32]string   // symbol reference/offset -> symbol string.
	LabelValues  map[string][]string // label name -> its values.
	Postings     []postingsRange     // byte ranges of postings lists.
}
// realByteSlice adapts a plain in-memory byte slice to the index.ByteSlice
// interface.
type realByteSlice []byte

// Len returns the total number of bytes held by the slice.
func (s realByteSlice) Len() int {
	return len(s)
}

// Range returns the raw bytes in the interval [start, end).
func (s realByteSlice) Range(start, end int) []byte {
	return s[start:end]
}

// Sub returns the interval [start, end) as an index.ByteSlice.
func (s realByteSlice) Sub(start, end int) index.ByteSlice {
	return s[start:end]
}
// The table gets initialized with sync.Once but may still cause a race
// with any other use of the crc32 package anywhere. Thus we initialize it
// before.
var castagnoliTable *crc32.Table
func init() {
castagnoliTable = crc32.MakeTable(crc32.Castagnoli)
}
// readSymbols reads the symbol table fully into memory and allocates proper strings for them.
// Strings backed by the mmap'd memory would cause memory faults if applications keep using them
// after the reader is closed.
//
// For index format V2 the symbols are returned in the slice (indexed by
// sequential symbol reference); for V1 they are returned in the map, keyed by
// the symbol's byte offset within the index file. Only one of the two results
// is populated. Any decoding failure is reported via the wrapped error.
func readSymbols(bs index.ByteSlice, version int, off int) ([]string, map[uint32]string, error) {
	// Offset 0 means the index has no symbol table.
	if off == 0 {
		return nil, nil, nil
	}
	d := encoding.NewDecbufAt(bs, off, castagnoliTable)
	var (
		origLen = d.Len()
		cnt     = d.Be32int()    // number of symbols in the table.
		basePos = uint32(off) + 4 // start of symbol data, past the 4-byte length field.
		// nextPos is the byte offset of the next symbol; V1 keys symbols by offset.
		nextPos     = basePos + uint32(origLen-d.Len())
		symbolSlice []string
		symbols     = map[uint32]string{}
	)
	if version == index.FormatV2 {
		symbolSlice = make([]string, 0, cnt)
	}
	for d.Err() == nil && d.Len() > 0 && cnt > 0 {
		s := d.UvarintStr()
		if version == index.FormatV2 {
			symbolSlice = append(symbolSlice, s)
		} else {
			symbols[nextPos] = s
			// Advance to the offset of the following symbol, derived from how
			// many bytes the decoder has consumed so far.
			nextPos = basePos + uint32(origLen-d.Len())
		}
		cnt--
	}
	return symbolSlice, symbols, errors.Wrap(d.Err(), "read symbols")
}
// getSymbolTable extracts the full symbol table of an index file into a single
// map keyed by symbol reference (format V2) or byte offset (format V1).
func getSymbolTable(b index.ByteSlice) (map[uint32]string, error) {
	// The version byte sits right after the 4-byte magic number. Guard the
	// read so a truncated file yields an error instead of a slice panic.
	if b.Len() < 5 {
		return nil, errors.Errorf("index file too short: %d bytes", b.Len())
	}
	version := int(b.Range(4, 5)[0])
	if version != 1 && version != 2 {
		return nil, errors.Errorf("unknown index file version %d", version)
	}
	toc, err := index.NewTOCFromByteSlice(b)
	if err != nil {
		return nil, errors.Wrap(err, "read TOC")
	}
	symbolsV2, symbolsV1, err := readSymbols(b, version, int(toc.Symbols))
	if err != nil {
		return nil, errors.Wrap(err, "read symbols")
	}
	// Merge both representations; only one of them is non-empty.
	symbolsTable := make(map[uint32]string, len(symbolsV1)+len(symbolsV2))
	for o, s := range symbolsV1 {
		symbolsTable[o] = s
	}
	for o, s := range symbolsV2 {
		// V2 symbols are keyed by their position in the slice.
		symbolsTable[uint32(o)] = s
	}
	return symbolsTable, nil
}
// WriteJSON writes a cache file containing the first lookup stages
// for an index file.
//
// It mmaps the index at indexFn, extracts the symbol table, all label value
// lists and the postings ranges, and JSON-encodes them into the file fn.
func WriteJSON(logger log.Logger, indexFn string, fn string) error {
	indexFile, err := fileutil.OpenMmapFile(indexFn)
	if err != nil {
		return errors.Wrapf(err, "open mmap index file %s", indexFn)
	}
	defer runutil.CloseWithLogOnErr(logger, indexFile, "close index cache mmap file from %s", indexFn)
	b := realByteSlice(indexFile.Bytes())
	indexr, err := index.NewReader(b)
	if err != nil {
		return errors.Wrap(err, "open index reader")
	}
	defer runutil.CloseWithLogOnErr(logger, indexr, "load index cache reader")
	// We assume reader verified index already.
	symbols, err := getSymbolTable(b)
	if err != nil {
		return err
	}
	f, err := os.Create(fn)
	if err != nil {
		return errors.Wrap(err, "create index cache file")
	}
	defer runutil.CloseWithLogOnErr(logger, f, "index cache writer")
	v := indexCache{
		Version:      indexr.Version(),
		CacheVersion: JSONVersion1,
		Symbols:      symbols,
		LabelValues:  map[string][]string{},
	}
	// Extract label value indices.
	lnames, err := indexr.LabelNames()
	if err != nil {
		return errors.Wrap(err, "read label indices")
	}
	for _, ln := range lnames {
		vals, err := indexr.LabelValues(ln)
		if err != nil {
			return errors.Wrap(err, "get label values")
		}
		v.LabelValues[ln] = vals
	}
	// Extract postings ranges.
	pranges, err := indexr.PostingsRanges()
	if err != nil {
		return errors.Wrap(err, "read postings ranges")
	}
	for l, rng := range pranges {
		v.Postings = append(v.Postings, postingsRange{
			Name:  l.Name,
			Value: l.Value,
			Start: rng.Start,
			End:   rng.End,
		})
	}
	// Stream the JSON straight into the file rather than buffering it in memory.
	if err := json.NewEncoder(f).Encode(&v); err != nil {
		return errors.Wrap(err, "encode file")
	}
	return nil
}
// JSONReader is a reader based on index-cache.json files.
type JSONReader struct {
	indexVersion int                          // format version of the original TSDB index.
	symbols      []string                     // symbol strings, indexed by symbol reference.
	lvals        map[string][]string          // label name -> its values.
	postings     map[labels.Label]index.Range // label pair -> postings byte range.
}
// NewJSONReader loads the index-cache.json for block id, trying in order:
//  1. an existing cache file on local disk under dir;
//  2. a pre-built cache file downloaded from the bucket;
//  3. building the cache locally from the block's index file (downloaded
//     temporarily and removed afterwards).
// A cache file that exists but fails to unmarshal is treated like a missing
// one and rebuilt instead of returned as an error.
func NewJSONReader(ctx context.Context, logger log.Logger, bkt objstore.BucketReader, dir string, id ulid.ULID) (*JSONReader, error) {
	cachefn := filepath.Join(dir, id.String(), block.IndexCacheFilename)
	jr, err := newJSONReaderFromFile(logger, cachefn)
	if err == nil {
		return jr, err
	}
	// Only a missing or corrupt (unmarshal failure) cache can be recovered from.
	if !os.IsNotExist(errors.Cause(err)) && errors.Cause(err) != jsonUnmarshalError {
		return nil, errors.Wrap(err, "read index cache")
	}
	// Try to download index cache file from object store.
	if err = objstore.DownloadFile(ctx, logger, bkt, filepath.Join(id.String(), block.IndexCacheFilename), cachefn); err == nil {
		return newJSONReaderFromFile(logger, cachefn)
	}
	if !bkt.IsObjNotFoundErr(errors.Cause(err)) && errors.Cause(err) != jsonUnmarshalError {
		return nil, errors.Wrap(err, "download index cache file")
	}
	// No cache exists on disk yet, build it from the downloaded index and retry.
	fn := filepath.Join(dir, id.String(), block.IndexFilename)
	if err := objstore.DownloadFile(ctx, logger, bkt, filepath.Join(id.String(), block.IndexFilename), fn); err != nil {
		return nil, errors.Wrap(err, "download index file")
	}
	// The index file is only needed to build the cache; remove it afterwards.
	defer func() {
		if rerr := os.Remove(fn); rerr != nil {
			level.Error(logger).Log("msg", "failed to remove temp index file", "path", fn, "err", rerr)
		}
	}()
	if err := WriteJSON(logger, fn, cachefn); err != nil {
		return nil, errors.Wrap(err, "write index cache")
	}
	return newJSONReaderFromFile(logger, cachefn)
}
// newJSONReaderFromFile reads an index-cache.json file from disk into a
// JSONReader. Unmarshal failures are reported wrapped around
// jsonUnmarshalError so callers can detect a corrupt cache via errors.Cause
// and rebuild it. The logger parameter is kept for signature compatibility.
func newJSONReaderFromFile(logger log.Logger, fn string) (*JSONReader, error) {
	// Read the file in a single call. The file was previously also opened
	// with os.Open just to be closed again — that handle was never read, so
	// the redundant open/close is dropped. A missing file still surfaces as
	// an os.IsNotExist-able *PathError via errors.Cause.
	data, err := ioutil.ReadFile(fn)
	if err != nil {
		return nil, errors.Wrap(err, "read file")
	}
	var v indexCache
	if err = json.Unmarshal(data, &v); err != nil {
		return nil, errors.Wrap(jsonUnmarshalError, err.Error())
	}
	// Find the largest symbol reference so the symbols slice can be allocated
	// at its exact final length.
	var maxSymbolID uint32
	for o := range v.Symbols {
		if o > maxSymbolID {
			maxSymbolID = o
		}
	}
	jr := &JSONReader{
		indexVersion: v.Version,
		lvals:        make(map[string][]string, len(v.LabelValues)),
		postings:     make(map[labels.Label]index.Range, len(v.Postings)),
		symbols:      make([]string, maxSymbolID+1),
	}
	// Most strings we encounter are duplicates. Dedup string objects that we keep
	// around after the function returns to reduce total memory usage.
	// NOTE(fabxc): it could even make sense to deduplicate globally.
	strs := map[string]string{}
	getStr := func(s string) string {
		if cs, ok := strs[s]; ok {
			return cs
		}
		strs[s] = s
		return s
	}
	for o, s := range v.Symbols {
		jr.symbols[o] = getStr(s)
	}
	for ln, vals := range v.LabelValues {
		for i := range vals {
			vals[i] = getStr(vals[i])
		}
		jr.lvals[getStr(ln)] = vals
	}
	for _, e := range v.Postings {
		l := labels.Label{
			Name:  getStr(e.Name),
			Value: getStr(e.Value),
		}
		jr.postings[l] = index.Range{Start: e.Start, End: e.End}
	}
	return jr, nil
}
// IndexVersion returns the format version of the TSDB index this cache was
// built from.
func (r *JSONReader) IndexVersion() int {
	return r.indexVersion
}
// LookupSymbol resolves a symbol reference o to its string. An out-of-range
// reference yields an error; a reference that is in range but was absent from
// the cache yields the empty string.
func (r *JSONReader) LookupSymbol(o uint32) (string, error) {
	if int(o) >= len(r.symbols) {
		return "", errors.Errorf("bucketIndexReader: unknown symbol offset %d", o)
	}
	return r.symbols[o], nil
}
// PostingsOffset returns the byte range of the postings list for the given
// label pair; the zero Range is returned for unknown pairs.
func (r *JSONReader) PostingsOffset(name, value string) index.Range {
	key := labels.Label{Name: name, Value: value}
	return r.postings[key]
}
// LabelValues returns label values for single name. The result is a fresh
// copy, so callers may mutate it without affecting the reader's state; an
// unknown name yields an empty (non-nil) slice.
func (r *JSONReader) LabelValues(name string) []string {
	vals := r.lvals[name]
	out := make([]string, len(vals))
	copy(out, vals)
	return out
}
// LabelNames returns a list of label names, sorted lexicographically.
func (r *JSONReader) LabelNames() []string {
	names := make([]string, 0, len(r.lvals))
	for name := range r.lvals {
		names = append(names, name)
	}
	sort.Strings(names)
	return names
}