forked from grafana/loki
/
queries.go
157 lines (125 loc) · 4.02 KB
/
queries.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
package util
import (
"context"
"sync"
"github.com/go-kit/log/level"
"github.com/pao214/loki/v2/pkg/storage/chunk"
util_math "github.com/pao214/loki/v2/pkg/util/math"
"github.com/pao214/loki/v2/pkg/util/spanlogger"
)
const maxQueriesPerGoroutine = 100
// TableQuerier runs a set of index queries against a single table and streams
// result pages to the callback.
type TableQuerier interface {
	MultiQueries(ctx context.Context, queries []chunk.IndexQuery, callback chunk.QueryPagesCallback) error
}
// QueriesByTable groups and returns queries by tables.
func QueriesByTable(queries []chunk.IndexQuery) map[string][]chunk.IndexQuery {
	queriesByTable := make(map[string][]chunk.IndexQuery)
	for _, query := range queries {
		// Indexing a missing key yields a nil slice, and append to a nil
		// slice allocates it — no need to pre-initialize the entry.
		queriesByTable[query.TableName] = append(queriesByTable[query.TableName], query)
	}
	return queriesByTable
}
// DoParallelQueries runs queries against tableQuerier, fanning out into
// goroutines in chunks of maxQueriesPerGoroutine once the query count exceeds
// that threshold. Results flow through an IndexDeduper so that duplicate
// (hash, range) entries are sent to callback only once. Returns the last
// non-nil error seen from any chunk (earlier errors are overwritten).
//
// NOTE(review): callers appear to pass per-table query groups (see
// QueriesByTable), which is why logging queries[0].TableName below is taken
// as representative — confirm against call sites.
func DoParallelQueries(ctx context.Context, tableQuerier TableQuerier, queries []chunk.IndexQuery, callback chunk.QueryPagesCallback) error {
	if len(queries) == 0 {
		return nil
	}
	errs := make(chan error)
	id := NewIndexDeduper(callback)
	// Log a summary after all work has completed. By the time the defer runs
	// every goroutine has been drained below, so reading id.numEntriesSent
	// without the deduper's lock is race-free.
	defer func() {
		logger := spanlogger.FromContext(ctx)
		level.Debug(logger).Log("msg", "done processing index queries", "table-name", queries[0].TableName,
			"query-count", len(queries), "num-entries-sent", id.numEntriesSent)
	}()
	// Small batches run synchronously — no goroutine/channel overhead.
	if len(queries) <= maxQueriesPerGoroutine {
		return tableQuerier.MultiQueries(ctx, queries, id.Callback)
	}
	for i := 0; i < len(queries); i += maxQueriesPerGoroutine {
		q := queries[i:util_math.Min(i+maxQueriesPerGoroutine, len(queries))]
		// The chunk is passed as an argument so each goroutine gets its own
		// copy (avoids the pre-Go-1.22 loop-variable capture pitfall).
		go func(queries []chunk.IndexQuery) {
			errs <- tableQuerier.MultiQueries(ctx, queries, id.Callback)
		}(q)
	}
	// Receive exactly one result per spawned goroutine; the loop bounds
	// mirror the spawn loop above so the counts always match.
	var lastErr error
	for i := 0; i < len(queries); i += maxQueriesPerGoroutine {
		err := <-errs
		if err != nil {
			lastErr = err
		}
	}
	return lastErr
}
// IndexDeduper should always be used on table level not the whole query level because it just looks at range values which can be repeated across tables
// Cortex anyways dedupes entries across tables
type IndexDeduper struct {
	callback        chunk.QueryPagesCallback       // downstream callback that receives de-duplicated batches
	seenRangeValues map[string]map[string]struct{} // hashValue -> set of range values already forwarded
	numEntriesSent  int                            // count of unique entries passed downstream
	mtx             sync.RWMutex                   // guards seenRangeValues and numEntriesSent
}

// NewIndexDeduper returns an IndexDeduper that forwards only unique
// (hash, range) index entries to the given callback.
func NewIndexDeduper(callback chunk.QueryPagesCallback) *IndexDeduper {
	return &IndexDeduper{
		callback:        callback,
		seenRangeValues: map[string]map[string]struct{}{},
	}
}
// Callback adapts the deduper for use as a chunk.QueryPagesCallback: it wraps
// the incoming batch in a filteringBatch so entries already seen for this
// query's hash value are skipped before reaching the user callback.
func (i *IndexDeduper) Callback(query chunk.IndexQuery, batch chunk.ReadBatch) bool {
	filtered := &filteringBatch{
		query:     query,
		ReadBatch: batch,
		isSeen:    i.isSeen,
	}
	return i.callback(query, filtered)
}
// isSeen reports whether the (hashValue, rangeValue) pair has already been
// forwarded, recording it (and incrementing numEntriesSent) when it has not.
// It returns true for duplicates that should be filtered out.
//
// The method takes a read-locked fast path first, then upgrades to the write
// lock and re-checks, so when two goroutines race on the same pair exactly one
// of them records it and returns false.
func (i *IndexDeduper) isSeen(hashValue string, rangeValue []byte) bool {
	i.mtx.RLock()
	// index entries are never modified during query processing so it should be safe to reference a byte slice as a string.
	rangeValueStr := GetUnsafeString(rangeValue)
	if _, ok := i.seenRangeValues[hashValue][rangeValueStr]; ok {
		i.mtx.RUnlock()
		return true
	}
	i.mtx.RUnlock()
	i.mtx.Lock()
	defer i.mtx.Unlock()
	// re-check if another concurrent call added the values already, if so do not add it again and return true
	if _, ok := i.seenRangeValues[hashValue][rangeValueStr]; ok {
		return true
	}
	// add the hashValue first if missing
	if _, ok := i.seenRangeValues[hashValue]; !ok {
		i.seenRangeValues[hashValue] = map[string]struct{}{}
	}
	// add the rangeValue
	i.seenRangeValues[hashValue][rangeValueStr] = struct{}{}
	i.numEntriesSent++
	return false
}
// isSeen reports whether the (hashValue, rangeValue) pair was already emitted.
type isSeen func(hashValue string, rangeValue []byte) bool

// filteringBatch wraps a chunk.ReadBatch, filtering out entries that the
// isSeen predicate marks as duplicates when iterated.
type filteringBatch struct {
	query chunk.IndexQuery
	chunk.ReadBatch
	isSeen isSeen
}
// Iterator returns an iterator over the wrapped batch that transparently
// skips entries already recorded as seen.
func (f *filteringBatch) Iterator() chunk.ReadBatchIterator {
	inner := f.ReadBatch.Iterator()
	return &filteringBatchIter{
		query:             f.query,
		ReadBatchIterator: inner,
		isSeen:            f.isSeen,
	}
}
// filteringBatchIter wraps a chunk.ReadBatchIterator, advancing past entries
// that the isSeen predicate reports as duplicates.
type filteringBatchIter struct {
	query chunk.IndexQuery
	chunk.ReadBatchIterator
	isSeen isSeen
}
// Next advances to the next not-yet-seen entry, reporting whether one exists.
// Duplicate entries are consumed from the underlying iterator and skipped.
func (f *filteringBatchIter) Next() bool {
	for {
		if !f.ReadBatchIterator.Next() {
			// Underlying iterator exhausted.
			return false
		}
		seen := f.isSeen(f.query.HashValue, f.ReadBatchIterator.RangeValue())
		if !seen {
			return true
		}
	}
}