package prestostore

import (
	"context"
	"fmt"
	"sort"
	"time"

	prom "github.com/prometheus/client_golang/api/prometheus/v1"
	"github.com/prometheus/common/model"
	"github.com/sirupsen/logrus"
	"k8s.io/apimachinery/pkg/util/clock"
)
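
// PrometheusImportResults holds the results of a single import: the time
// ranges that were successfully processed and the metrics collected from them.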
type PrometheusImportResults struct {
	ProcessedTimeRanges []prom.Range
	Metrics             []*PrometheusMetric
}

// ImportFromTimeRange executes a promQL query over the interval between
// startTime and endTime, performing multiple Prometheus query_range queries
// of chunkSize each. It returns the time ranges queried and any error
// encountered. It stops after the first error; consult ProcessedTimeRanges
// to determine how many chunks were queried before the failure.
//
// If the number of chunks exceeds cfg.MaxTimeRanges, the time ranges beyond
// that count are skipped.
func ImportFromTimeRange(logger logrus.FieldLogger, clock clock.Clock, promConn prom.API, prometheusMetricsStorer PrometheusMetricsStorer, metricsCollectors ImporterMetricsCollectors, ctx context.Context, startTime, endTime time.Time, cfg Config) (PrometheusImportResults, error) {
	metricsCollectors.ImportsRunningGauge.Inc()
	queryRangeDuration := endTime.Sub(startTime)
	if cfg.MaxQueryRangeDuration != 0 && queryRangeDuration > cfg.MaxQueryRangeDuration {
		newEndTime := startTime.Add(cfg.MaxQueryRangeDuration)
		logger.Warnf("time range %s to %s exceeds PrometheusImporter MaxQueryRangeDuration %s, newEndTime: %s", startTime, endTime, cfg.MaxQueryRangeDuration, newEndTime)
		endTime = newEndTime
	}

	importStart := clock.Now()
	metricsCollectors.TotalImportsCounter.Inc()
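
	// record the import duration and mark this import as no longer running
	// once the function returns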
	defer func() {
		metricsCollectors.ImportsRunningGauge.Dec()
		importDuration := clock.Since(importStart)
		metricsCollectors.ImportDurationHistogram.Observe(importDuration.Seconds())
		logger.Debugf("took %s to run import", importDuration)
	}()

	timeRanges := getTimeRangesChunked(startTime, endTime, cfg.ChunkSize, cfg.StepSize, cfg.MaxTimeRanges)

	var importResults PrometheusImportResults
	if len(timeRanges) == 0 {
		logger.Debugf("no time ranges to query yet for table %s", cfg.PrestoTableName)
		return importResults, nil
	}

	metricsCount := 0
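
	// the chunked time ranges may be truncated or clipped relative to the
	// requested window, so recompute the effective start and end from the
	// ranges actually produced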
	startTime = timeRanges[0].Start.UTC()
	endTime = timeRanges[len(timeRanges)-1].End.UTC()
	logger = logger.WithFields(logrus.Fields{
		"startTime": startTime,
		"endTime":   endTime,
	})
	logger.Debugf("querying for data between %s and %s (chunks: %d)", startTime, endTime, len(timeRanges))
	for _, timeRange := range timeRanges {
		// check for cancellation
		select {
		case <-ctx.Done():
			return importResults, ctx.Err()
		default:
			// continue processing if context isn't cancelled
		}

		promQueryBegin := timeRange.Start.UTC()
		promQueryEnd := timeRange.End.UTC()
		promLogger := logger.WithFields(logrus.Fields{
			"promQueryBegin": promQueryBegin,
			"promQueryEnd":   promQueryEnd,
		})

		promLogger.Debugf("querying Prometheus using range %s to %s", timeRange.Start, timeRange.End)
		queryStart := clock.Now()
		pVal, err := promConn.QueryRange(ctx, cfg.PrometheusQuery, timeRange)
		queryDuration := clock.Since(queryStart)
		metricsCollectors.PrometheusQueryDurationHistogram.Observe(queryDuration.Seconds())
		metricsCollectors.TotalPrometheusQueriesCounter.Inc()
		if err != nil {
			metricsCollectors.FailedImportsCounter.Inc()
			metricsCollectors.FailedPrometheusQueriesCounter.Inc()
			return importResults, fmt.Errorf("failed to perform Prometheus query: %v", err)
		}

		matrix, ok := pVal.(model.Matrix)
		if !ok {
			return importResults, fmt.Errorf("expected a matrix in response to query, got a %v", pVal.Type())
		}

		metrics := promMatrixToPrometheusMetrics(timeRange, matrix)
		numMetrics := len(metrics)
		metricsCollectors.MetricsScrapedCounter.Add(float64(numMetrics))

		// check for cancellation
		select {
		case <-ctx.Done():
			return importResults, ctx.Err()
		default:
			// continue processing if context isn't cancelled
		}

		if numMetrics != 0 {
			// ensure the metrics are sorted by timestamp before storing them
			sort.Slice(metrics, func(i, j int) bool {
				return metrics[i].Timestamp.Before(metrics[j].Timestamp)
			})
			metricsBegin := metrics[0].Timestamp
			metricsEnd := metrics[numMetrics-1].Timestamp
			logger := promLogger.WithFields(logrus.Fields{
				"metricsBegin": metricsBegin,
				"metricsEnd":   metricsEnd,
			})
			logger.Debugf("got %d metrics for time range %s to %s, storing them in Presto table %s", numMetrics, promQueryBegin, promQueryEnd, cfg.PrestoTableName)
			metricsCollectors.TotalPrestoStoresCounter.Inc()
			prestoStoreBegin := clock.Now()
			err := prometheusMetricsStorer.StorePrometheusMetrics(ctx, cfg.PrestoTableName, metrics)
			prestoStoreDuration := clock.Since(prestoStoreBegin)
			metricsCollectors.PrestoStoreDurationHistogram.Observe(prestoStoreDuration.Seconds())
			if err != nil {
				metricsCollectors.FailedImportsCounter.Inc()
				metricsCollectors.FailedPrestoStoresCounter.Inc()
				return importResults, fmt.Errorf("failed to store Prometheus metrics into table %s for the range %v to %v: %v",
					cfg.PrestoTableName, promQueryBegin, promQueryEnd, err)
			}
			logger.Debugf("stored %d metrics for time range %s to %s into Presto table %s (took %s)", numMetrics, promQueryBegin, promQueryEnd, cfg.PrestoTableName, prestoStoreDuration)

			importResults.Metrics = append(importResults.Metrics, metrics...)
			metricsCollectors.MetricsImportedCounter.Add(float64(numMetrics))
			metricsCount += numMetrics
		}
		importResults.ProcessedTimeRanges = append(importResults.ProcessedTimeRanges, timeRange)
	}

	if len(importResults.ProcessedTimeRanges) != 0 {
		begin := importResults.ProcessedTimeRanges[0].Start.UTC()
		end := importResults.ProcessedTimeRanges[len(importResults.ProcessedTimeRanges)-1].End.UTC()
		logger.Infof("stored a total of %d metrics for data between %s and %s into %s", metricsCount, begin, end, cfg.PrestoTableName)
	} else {
		logger.Infof("no time ranges processed for %s", cfg.PrestoTableName)
	}
	return importResults, nil
}
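
// getTimeRangesChunked splits the window between beginTime and endTime into
// consecutive prom.Ranges of exactly chunkSize with the given stepSize,
// returning at most maxTimeRanges ranges (no limit when maxTimeRanges <= 0).
// Each chunk starts one stepSize after the previous chunk's end so the
// boundary timestamp is not queried twice, and a trailing partial chunk is
// dropped. For example, assuming second-aligned inputs, a chunkSize of 5m and
// a stepSize of 1m over the window [t, t+11m] yields [t, t+5m] and
// [t+6m, t+11m].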
func getTimeRangesChunked(beginTime, endTime time.Time, chunkSize, stepSize time.Duration, maxTimeRanges int64) []prom.Range {
	chunkStart := truncateToSecond(beginTime)
	chunkEnd := truncateToSecond(chunkStart.Add(chunkSize))

	// don't set a limit if maxTimeRanges is zero or negative
	disableMax := maxTimeRanges <= 0

	var timeRanges []prom.Range
	for i := int64(0); disableMax || (i < maxTimeRanges); i++ {
		// do not collect data past endTime
		if chunkEnd.After(endTime) {
			break
		}
		// only collect chunks that span a full chunkSize
		if chunkEnd.Sub(chunkStart) < chunkSize {
			break
		}
		timeRanges = append(timeRanges, prom.Range{
			Start: chunkStart.UTC(),
			End:   chunkEnd.UTC(),
			Step:  stepSize,
		})
		if chunkEnd.Equal(truncateToSecond(endTime)) {
			break
		}
		// add the step size to the previous chunk's end so this range doesn't
		// re-query the previous range's end timestamp
		chunkStart = truncateToSecond(chunkEnd.Add(stepSize))
		// add chunkSize to the new start time to get the full chunk; if this
		// chunkEnd is past endTime, the chunk is skipped on the next iteration
		chunkEnd = truncateToSecond(chunkStart.Add(chunkSize))
	}
	return timeRanges
}
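
// truncateToSecond rounds t down to the nearest whole second.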
func truncateToSecond(t time.Time) time.Time {
	return t.Truncate(time.Second)
}