forked from grafana/loki
/
query_limiter.go
103 lines (85 loc) · 3.07 KB
/
query_limiter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
package limiter
import (
"context"
"fmt"
"sync"
"github.com/prometheus/common/model"
"go.uber.org/atomic"
"github.com/pao214/loki/v2/pkg/ingester/client"
"github.com/pao214/loki/v2/pkg/logproto"
)
// queryLimiterCtxKey is the private context-key type used to stash a
// *QueryLimiter on a request context (unexported to avoid collisions).
type queryLimiterCtxKey struct{}

var (
	ctxKey = &queryLimiterCtxKey{}

	// NOTE: these are fmt format strings (each takes the limit as its
	// single %d argument), not error values — use them with fmt.Errorf.
	ErrMaxSeriesHit           = "the query hit the max number of series limit (limit: %d series)"
	ErrMaxChunkBytesHit       = "the query hit the aggregated chunks size limit (limit: %d bytes)"
	ErrMaxChunksPerQueryLimit = "the query hit the max number of chunks limit (limit: %d chunks)"
)
// QueryLimiter tracks per-query resource consumption (unique series,
// aggregated chunk bytes, chunk count) and enforces the configured
// per-query limits. A zero limit means "unlimited" for that dimension.
// It is safe for concurrent use.
type QueryLimiter struct {
	// uniqueSeriesMx guards uniqueSeries; the counters below use
	// atomics instead and are not covered by this mutex.
	uniqueSeriesMx sync.Mutex
	uniqueSeries   map[model.Fingerprint]struct{}

	chunkBytesCount atomic.Int64
	chunkCount      atomic.Int64

	maxSeriesPerQuery     int
	maxChunkBytesPerQuery int
	maxChunksPerQuery     int
}
// NewQueryLimiter makes a new per-query limiter enforcing the given
// series, chunk-byte and chunk-count limits. Passing 0 for a limit
// disables enforcement of that dimension.
func NewQueryLimiter(maxSeriesPerQuery, maxChunkBytesPerQuery int, maxChunksPerQuery int) *QueryLimiter {
	// The mutex and atomic counters are usable as zero values; only the
	// set of seen fingerprints needs explicit initialization.
	return &QueryLimiter{
		uniqueSeries:          make(map[model.Fingerprint]struct{}),
		maxSeriesPerQuery:     maxSeriesPerQuery,
		maxChunkBytesPerQuery: maxChunkBytesPerQuery,
		maxChunksPerQuery:     maxChunksPerQuery,
	}
}
// AddQueryLimiterToContext returns a child of ctx carrying the given
// limiter, retrievable via QueryLimiterFromContextWithFallback.
func AddQueryLimiterToContext(ctx context.Context, limiter *QueryLimiter) context.Context {
	return context.WithValue(ctx, ctxKey, limiter)
}
// QueryLimiterFromContextWithFallback returns the QueryLimiter stored on
// ctx. If none is attached it returns a fresh limiter with every limit
// set to 0, i.e. a no-op limiter that never rejects anything.
func QueryLimiterFromContextWithFallback(ctx context.Context) *QueryLimiter {
	if limiter, ok := ctx.Value(ctxKey).(*QueryLimiter); ok {
		return limiter
	}
	// No limiter on the context: fall back to an unlimited one.
	return NewQueryLimiter(0, 0, 0)
}
// AddSeries adds the input series and returns an error if the unique
// series limit would be exceeded. Re-adding an already-counted series is
// always accepted and does not consume additional budget.
func (ql *QueryLimiter) AddSeries(seriesLabels []logproto.LabelAdapter) error {
	// If the max series is unlimited just return without managing the map.
	if ql.maxSeriesPerQuery == 0 {
		return nil
	}
	fingerprint := client.FastFingerprint(seriesLabels)

	ql.uniqueSeriesMx.Lock()
	defer ql.uniqueSeriesMx.Unlock()

	if _, seen := ql.uniqueSeries[fingerprint]; seen {
		// Already counted: no new budget consumed.
		return nil
	}
	// Check the limit BEFORE inserting so the map's memory stays bounded
	// at maxSeriesPerQuery entries even while callers keep adding series
	// after the limit has been hit.
	if len(ql.uniqueSeries) >= ql.maxSeriesPerQuery {
		return fmt.Errorf(ErrMaxSeriesHit, ql.maxSeriesPerQuery)
	}
	ql.uniqueSeries[fingerprint] = struct{}{}
	return nil
}
// AddChunkBytes adds the input chunk size in bytes to the running total
// and returns an error if the aggregated chunk-bytes limit is exceeded.
func (ql *QueryLimiter) AddChunkBytes(chunkSizeInBytes int) error {
	limit := int64(ql.maxChunkBytesPerQuery)
	// A zero limit disables chunk-bytes accounting entirely.
	if limit == 0 {
		return nil
	}
	total := ql.chunkBytesCount.Add(int64(chunkSizeInBytes))
	if total > limit {
		return fmt.Errorf(ErrMaxChunkBytesHit, ql.maxChunkBytesPerQuery)
	}
	return nil
}
// AddChunks adds the input number of chunks to the running count and
// returns an error if the max-chunks-per-query limit is exceeded.
func (ql *QueryLimiter) AddChunks(count int) error {
	if ql.maxChunksPerQuery == 0 {
		return nil
	}
	if ql.chunkCount.Add(int64(count)) > int64(ql.maxChunksPerQuery) {
		// Format directly with Errorf (the original wrapped Sprintf in
		// Errorf, which re-interprets any '%' in the rendered message
		// as a format verb), matching AddSeries/AddChunkBytes.
		return fmt.Errorf(ErrMaxChunksPerQueryLimit, ql.maxChunksPerQuery)
	}
	return nil
}