forked from grafana/loki
/
composite_store_entry.go
168 lines (135 loc) · 5.31 KB
/
composite_store_entry.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
package stores
import (
"context"
"fmt"
"time"
"github.com/go-kit/log/level"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/grafana/loki/pkg/storage/chunk"
"github.com/grafana/loki/pkg/storage/chunk/fetcher"
"github.com/grafana/loki/pkg/storage/errors"
"github.com/grafana/loki/pkg/storage/stores/index/stats"
"github.com/grafana/loki/pkg/storage/stores/series"
util_log "github.com/grafana/loki/pkg/util/log"
"github.com/grafana/loki/pkg/util/spanlogger"
"github.com/grafana/loki/pkg/util/validation"
)
var _ Store = &compositeStore{}
type StoreLimits interface {
MaxChunksPerQueryFromStore(userID string) int
MaxQueryLength(userID string) time.Duration
}
type ChunkWriter interface {
Put(ctx context.Context, chunks []chunk.Chunk) error
PutOne(ctx context.Context, from, through model.Time, chunk chunk.Chunk) error
}
type compositeStoreEntry struct {
start model.Time
Store
}
type storeEntry struct {
limits StoreLimits
stop func()
fetcher *fetcher.Fetcher
index series.IndexStore
ChunkWriter
}
func (c *storeEntry) GetChunkRefs(ctx context.Context, userID string, from, through model.Time, allMatchers ...*labels.Matcher) ([][]chunk.Chunk, []*fetcher.Fetcher, error) {
if ctx.Err() != nil {
return nil, nil, ctx.Err()
}
log, ctx := spanlogger.New(ctx, "GetChunkRefs")
defer log.Span.Finish()
shortcut, err := c.validateQueryTimeRange(ctx, userID, &from, &through)
level.Debug(log).Log(
"shortcut", shortcut,
"from", from.Time(),
"through", through.Time(),
"err", err,
)
if err != nil {
return nil, nil, err
} else if shortcut {
return nil, nil, nil
}
refs, err := c.index.GetChunkRefs(ctx, userID, from, through, allMatchers...)
chunks := make([]chunk.Chunk, len(refs))
for i, ref := range refs {
chunks[i] = chunk.Chunk{
ChunkRef: ref,
}
}
return [][]chunk.Chunk{chunks}, []*fetcher.Fetcher{c.fetcher}, err
}
func (c *storeEntry) GetSeries(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([]labels.Labels, error) {
return c.index.GetSeries(ctx, userID, from, through, matchers...)
}
func (c *storeEntry) SetChunkFilterer(chunkFilter chunk.RequestChunkFilterer) {
c.index.SetChunkFilterer(chunkFilter)
}
// LabelNamesForMetricName retrieves all label names for a metric name.
func (c *storeEntry) LabelNamesForMetricName(ctx context.Context, userID string, from, through model.Time, metricName string) ([]string, error) {
log, ctx := spanlogger.New(ctx, "SeriesStore.LabelNamesForMetricName")
defer log.Span.Finish()
shortcut, err := c.validateQueryTimeRange(ctx, userID, &from, &through)
if err != nil {
return nil, err
} else if shortcut {
return nil, nil
}
level.Debug(log).Log("metric", metricName)
return c.index.LabelNamesForMetricName(ctx, userID, from, through, metricName)
}
func (c *storeEntry) LabelValuesForMetricName(ctx context.Context, userID string, from, through model.Time, metricName string, labelName string, matchers ...*labels.Matcher) ([]string, error) {
log, ctx := spanlogger.New(ctx, "SeriesStore.LabelValuesForMetricName")
defer log.Span.Finish()
shortcut, err := c.validateQueryTimeRange(ctx, userID, &from, &through)
if err != nil {
return nil, err
} else if shortcut {
return nil, nil
}
return c.index.LabelValuesForMetricName(ctx, userID, from, through, metricName, labelName, matchers...)
}
func (c *storeEntry) Stats(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) (*stats.Stats, error) {
log, ctx := spanlogger.New(ctx, "SeriesStore.Stats")
defer log.Span.Finish()
shortcut, err := c.validateQueryTimeRange(ctx, userID, &from, &through)
if err != nil {
return nil, err
} else if shortcut {
return nil, nil
}
return c.index.Stats(ctx, userID, from, through, matchers...)
}
func (c *storeEntry) validateQueryTimeRange(ctx context.Context, userID string, from *model.Time, through *model.Time) (bool, error) {
//nolint:ineffassign,staticcheck //Leaving ctx even though we don't currently use it, we want to make it available for when we might need it and hopefully will ensure us using the correct context at that time
if *through < *from {
return false, errors.QueryError(fmt.Sprintf("invalid query, through < from (%s < %s)", through, from))
}
maxQueryLength := c.limits.MaxQueryLength(userID)
if maxQueryLength > 0 && (*through).Sub(*from) > maxQueryLength {
return false, errors.QueryError(fmt.Sprintf(validation.ErrQueryTooLong, (*through).Sub(*from), maxQueryLength))
}
now := model.Now()
if from.After(now) {
// time-span start is in future ... regard as legal
level.Info(util_log.WithContext(ctx, util_log.Logger)).Log("msg", "whole timerange in future, yield empty resultset", "through", through, "from", from, "now", now)
return true, nil
}
if through.After(now.Add(5 * time.Minute)) {
// time-span end is in future ... regard as legal
level.Info(util_log.WithContext(ctx, util_log.Logger)).Log("msg", "adjusting end timerange from future to now", "old_through", through, "new_through", now)
*through = now // Avoid processing future part - otherwise some schemas could fail with eg non-existent table gripes
}
return false, nil
}
func (c *storeEntry) GetChunkFetcher(tm model.Time) *fetcher.Fetcher {
return c.fetcher
}
func (c *storeEntry) Stop() {
if c.stop != nil {
c.stop()
}
}