forked from grafana/loki
/
single_file_index.go
135 lines (112 loc) · 3.31 KB
/
single_file_index.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
package tsdb
import (
"context"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/pao214/loki/v2/pkg/storage/tsdb/index"
)
// nolint
// TSDBIndex wraps an IndexReader and answers chunk-reference, series,
// label-name and label-value queries against it.
type TSDBIndex struct {
	// reader is the index every query method of TSDBIndex reads from.
	reader IndexReader
}
// NewTSDBIndex returns a TSDBIndex whose queries are served from reader.
func NewTSDBIndex(reader IndexReader) *TSDBIndex {
	idx := new(TSDBIndex)
	idx.reader = reader
	return idx
}
// Bounds returns the two bounds reported by the underlying reader, each
// converted to model.Time.
func (i *TSDBIndex) Bounds() (model.Time, model.Time) {
	lo, hi := i.reader.Bounds()
	start := model.Time(lo)
	end := model.Time(hi)
	return start, end
}
// forSeries calls fn once for each series in the postings that
// PostingsForMatchers yields for shard and matchers, passing the labels,
// fingerprint and chunk metas read from the index reader. When shard is
// non-nil, series whose fingerprint does not match it are skipped.
//
// fn must NOT capture its arguments. They're reused across series iterations
// and returned to a pool after completion.
//
// It returns the first error hit while building the postings, reading a
// series, or iterating the postings; otherwise nil.
func (i *TSDBIndex) forSeries(
	shard *index.ShardAnnotation,
	fn func(labels.Labels, model.Fingerprint, []index.ChunkMeta),
	matchers ...*labels.Matcher,
) error {
	p, err := PostingsForMatchers(i.reader, shard, matchers...)
	if err != nil {
		return err
	}

	var ls labels.Labels
	chks := chunkMetasPool.Get()
	// Arguments of a deferred call are evaluated at the defer statement, so
	// the Put is wrapped in a closure: reader.Series receives &chks and may
	// replace the slice, and the slice held at exit is the one to hand back.
	defer func() { chunkMetasPool.Put(chks) }()

	for p.Next() {
		hash, err := i.reader.Series(p.At(), &ls, &chks)
		if err != nil {
			return err
		}

		fp := model.Fingerprint(hash)
		// skip series that belong to different shards
		if shard != nil && !shard.Match(fp) {
			continue
		}

		fn(ls, fp, chks)
	}
	return p.Err()
}
// GetChunkRefs collects a ChunkRef for every chunk that overlaps the bounds
// built from from and through, across the series forSeries visits for shard
// and matchers.
//
// If res is nil a slice is taken from ChunkRefsPool; in either case res is
// truncated to length zero before results are appended. The context argument
// is unused. On failure it returns a nil slice and the error from forSeries.
func (i *TSDBIndex) GetChunkRefs(_ context.Context, userID string, from, through model.Time, res []ChunkRef, shard *index.ShardAnnotation, matchers ...*labels.Matcher) ([]ChunkRef, error) {
	window := newBounds(from, through)

	if res == nil {
		res = ChunkRefsPool.Get()
	}
	res = res[:0]

	// TODO(owen-d): use logarithmic approach
	collect := func(_ labels.Labels, fp model.Fingerprint, metas []index.ChunkMeta) {
		for _, meta := range metas {
			// only keep chunks that fall inside the range of this request
			if Overlap(window, meta) {
				ref := ChunkRef{
					User:        userID, // assumed to be the same, will be enforced by caller.
					Fingerprint: fp,
					Start:       meta.From(),
					End:         meta.Through(),
					Checksum:    meta.Checksum,
				}
				res = append(res, ref)
			}
		}
	}

	err := i.forSeries(shard, collect, matchers...)
	if err != nil {
		return nil, err
	}
	return res, nil
}
// Series collects one Series entry (a copy of the labels plus the
// fingerprint) for each series visited by forSeries that has at least one
// chunk overlapping the bounds built from from and through.
//
// If res is nil a slice is taken from SeriesPool; in either case res is
// truncated to length zero before results are appended. The context and user
// arguments are unused. On failure it returns a nil slice and the error from
// forSeries.
func (i *TSDBIndex) Series(_ context.Context, _ string, from, through model.Time, res []Series, shard *index.ShardAnnotation, matchers ...*labels.Matcher) ([]Series, error) {
	window := newBounds(from, through)

	if res == nil {
		res = SeriesPool.Get()
	}
	res = res[:0]

	// TODO(owen-d): use logarithmic approach
	collect := func(ls labels.Labels, fp model.Fingerprint, metas []index.ChunkMeta) {
		for _, meta := range metas {
			if !Overlap(window, meta) {
				continue
			}
			// One chunk in the desired range is enough: record the series
			// and stop scanning its remaining chunks. The labels are copied
			// because the callback arguments are reused between series.
			res = append(res, Series{
				Labels:      ls.Copy(),
				Fingerprint: fp,
			})
			return
		}
	}

	err := i.forSeries(shard, collect, matchers...)
	if err != nil {
		return nil, err
	}
	return res, nil
}
// LabelNames returns label names from the underlying reader. With matchers
// it delegates to labelNamesWithMatchers; without any it calls the reader's
// LabelNames directly. The context, user and time-range arguments are unused.
func (i *TSDBIndex) LabelNames(_ context.Context, _ string, _, _ model.Time, matchers ...*labels.Matcher) ([]string, error) {
	if len(matchers) > 0 {
		return labelNamesWithMatchers(i.reader, matchers...)
	}
	return i.reader.LabelNames()
}
// LabelValues returns the values of label name from the underlying reader.
// With matchers it delegates to labelValuesWithMatchers; without any it calls
// the reader's LabelValues directly. The context, user and time-range
// arguments are unused.
func (i *TSDBIndex) LabelValues(_ context.Context, _ string, _, _ model.Time, name string, matchers ...*labels.Matcher) ([]string, error) {
	if len(matchers) > 0 {
		return labelValuesWithMatchers(i.reader, name, matchers...)
	}
	return i.reader.LabelValues(name)
}