-
Notifications
You must be signed in to change notification settings - Fork 90
/
importer.go
194 lines (170 loc) · 7.2 KB
/
importer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
package prestostore
import (
"context"
"sync"
"time"
prom "github.com/prometheus/client_golang/api/prometheus/v1"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/util/clock"
)
// ImporterMetricsCollectors bundles the Prometheus instrumentation a
// PrometheusImporter uses to report on its own activity. Counters track
// totals/failures, Observers record durations, and the gauge tracks
// concurrently running imports.
type ImporterMetricsCollectors struct {
// Overall import runs: total attempts, failures, and run duration.
TotalImportsCounter prometheus.Counter
FailedImportsCounter prometheus.Counter
ImportDurationHistogram prometheus.Observer
// Queries issued against Prometheus: totals, failures, and latency.
TotalPrometheusQueriesCounter prometheus.Counter
FailedPrometheusQueriesCounter prometheus.Counter
PrometheusQueryDurationHistogram prometheus.Observer
// Writes into Presto: totals, failures, and latency.
TotalPrestoStoresCounter prometheus.Counter
FailedPrestoStoresCounter prometheus.Counter
PrestoStoreDurationHistogram prometheus.Observer
// Volume counters: samples scraped from Prometheus and rows imported
// into Presto.
MetricsScrapedCounter prometheus.Counter
MetricsImportedCounter prometheus.Counter
// Number of imports currently in progress.
ImportsRunningGauge prometheus.Gauge
}
// PrometheusImporter imports Prometheus metrics into Presto tables
type PrometheusImporter struct {
// logger carries the component/tableName/chunkSize/stepSize fields set
// in NewPrometheusImporter and refreshed by UpdateConfig.
logger logrus.FieldLogger
// promConn is the Prometheus API client metrics are queried from.
promConn prom.API
// prometheusMetricsRepo persists scraped metrics and resolves the last
// stored timestamp for a table.
prometheusMetricsRepo PrometheusMetricsRepo
// clock supplies the current time (injectable for testing).
clock clock.Clock
// cfg holds the current import configuration; guarded by importLock.
cfg Config
// importLock ensures only one import is running at a time, protecting the
// lastTimestamp and metrics fields
importLock sync.Mutex
// lastTimestamp is the lastTimestamp stored for this PrometheusImporter
lastTimestamp *time.Time
// metricsCollectors contains metrics instrumentation types.
metricsCollectors ImporterMetricsCollectors
}
// Config controls what a PrometheusImporter queries and how it chunks and
// bounds each import run.
type Config struct {
// PrometheusQuery is the PromQL query whose results are imported.
PrometheusQuery string
// PrestoTableName is the destination table for imported metrics.
PrestoTableName string
// ChunkSize splits a time range into per-query chunks — presumably
// consumed by ImportFromTimeRange; not used directly in this file.
ChunkSize time.Duration
// StepSize is the query resolution step; it is also the increment added
// to the last stored timestamp so re-imports don't duplicate the last row.
StepSize time.Duration
// MaxTimeRanges bounds the number of time ranges per run — assumed to be
// enforced by ImportFromTimeRange; TODO confirm against that function.
MaxTimeRanges int64
// MaxQueryRangeDuration caps the total span (start to end) a single
// ImportFromLastTimestamp run may cover.
MaxQueryRangeDuration time.Duration
// ImportFromTime, when set, is the start time used for the very first
// import of an empty table.
ImportFromTime *time.Time
// MaxBackfillImportDuration is how far back to backfill on an empty
// table when ImportFromTime is unset.
MaxBackfillImportDuration time.Duration
}
// NewPrometheusImporter constructs a PrometheusImporter that reads metrics
// from promConn and stores them via prometheusMetricsRepo, decorating the
// supplied logger with the component name and the table/chunk/step settings
// from cfg.
func NewPrometheusImporter(logger logrus.FieldLogger, promConn prom.API, prometheusMetricsRepo PrometheusMetricsRepo, clock clock.Clock, cfg Config, collectors ImporterMetricsCollectors) *PrometheusImporter {
	importLogger := logger.WithFields(logrus.Fields{
		"component": "PrometheusImporter",
		"tableName": cfg.PrestoTableName,
		"chunkSize": cfg.ChunkSize,
		"stepSize":  cfg.StepSize,
	})

	importer := &PrometheusImporter{
		logger:                importLogger,
		promConn:              promConn,
		prometheusMetricsRepo: prometheusMetricsRepo,
		clock:                 clock,
		cfg:                   cfg,
		metricsCollectors:     collectors,
	}
	return importer
}
// UpdateConfig replaces the importer's configuration and refreshes the
// logger fields derived from it. It takes importLock, so it is serialized
// against any in-flight ImportFromLastTimestamp call.
func (importer *PrometheusImporter) UpdateConfig(cfg Config) {
	importer.importLock.Lock()
	defer importer.importLock.Unlock()

	importer.cfg = cfg
	importer.logger = importer.logger.WithFields(logrus.Fields{
		"tableName": cfg.PrestoTableName,
		"chunkSize": cfg.ChunkSize,
		"stepSize":  cfg.StepSize,
	})
}
// ImportFromLastTimestamp executes a Presto query from the last time range it
// queried and stores the results in a Presto table.
// The importer will track the last time series it retrieved and will query
// the next time range starting from where it left off if paused or stopped.
// For more details on how querying Prometheus is done, see the package
// pkg/promquery.
func (importer *PrometheusImporter) ImportFromLastTimestamp(ctx context.Context) (*PrometheusImportResults, error) {
	importer.importLock.Lock()
	importer.logger.Debugf("PrometheusImporter ImportFromLastTimestamp started")
	defer importer.logger.Debugf("PrometheusImporter ImportFromLastTimestamp finished")
	defer importer.importLock.Unlock()

	endTime := importer.clock.Now().UTC()
	cfg := importer.cfg

	// if importer.lastTimestamp is nil then it's because we haven't run
	// before, we have been restarted (error, or not) and do not know the
	// last time we collected and need to re-query Presto to figure out
	// the last timestamp
	if importer.lastTimestamp == nil {
		var err error
		importer.logger.Debugf("lastTimestamp for table %s: isn't known, querying for timestamp", cfg.PrestoTableName)
		importer.lastTimestamp, err = importer.prometheusMetricsRepo.GetLastTimestampForTable(cfg.PrestoTableName)
		if err != nil {
			importer.logger.WithError(err).Errorf("unable to get last timestamp for table %s", cfg.PrestoTableName)
			return nil, err
		}
	}

	var startTime time.Time
	// if lastTimestamp is still nil, but we didn't error, then there is no
	// last timestamp and this is the first collection; if not, the query
	// above found a timestamp in the table
	if importer.lastTimestamp != nil {
		importer.logger.Debugf("lastTimestamp for table %s: %s", cfg.PrestoTableName, importer.lastTimestamp.String())
		// We don't want to duplicate the importer.lastTimestamp metric so add
		// the step size so that we start at the next interval no longer in
		// our range.
		startTime = importer.lastTimestamp.Add(cfg.StepSize)
	} else {
		// check if we're supposed to start from a specific
		// time, and if not backfill a default amount
		if cfg.ImportFromTime != nil {
			importer.logger.Debugf("importFromTimestamp for table %s: %s", cfg.PrestoTableName, cfg.ImportFromTime.String())
			startTime = *cfg.ImportFromTime
		} else {
			importer.logger.Debugf("no lastTimestamp or importFromTime for table %s: backfilling %s", cfg.PrestoTableName, cfg.MaxBackfillImportDuration)
			startTime = endTime.Add(-cfg.MaxBackfillImportDuration)
		}
		importer.logger.Infof("no data in table %s: backfilling from %s until %s", cfg.PrestoTableName, startTime, endTime)
	}

	if startTime.After(endTime) {
		importer.logger.Infof("import for table %s too early, skipping import", cfg.PrestoTableName)
		return &PrometheusImportResults{}, nil
	}

	// If the startTime is too far back, we should limit this run to
	// cfg.MaxQueryRangeDuration so that if we're stopped for an
	// extended amount of time, this function won't return a slice with too
	// many time ranges.
	// BUG FIX: this previously computed startTime.Sub(endTime), which is
	// always <= 0 here (startTime <= endTime after the guard above), so the
	// cap never took effect.
	totalChunkDuration := endTime.Sub(startTime)
	if totalChunkDuration >= cfg.MaxQueryRangeDuration {
		endTime = startTime.Add(cfg.MaxQueryRangeDuration)
	}

	importResults, err := ImportFromTimeRange(importer.logger, importer.clock, importer.promConn, importer.prometheusMetricsRepo, importer.metricsCollectors, ctx, startTime, endTime, cfg)
	if err != nil {
		importer.logger.WithFields(logrus.Fields{"startTime": startTime, "endTime": endTime}).WithError(err).Error("error collecting metrics")
		// at this point we cannot be sure what is in Presto and what
		// isn't, so reset our importer.lastTimestamp
		importer.lastTimestamp = nil
		return &importResults, err
	}

	// Remember the end of the last processed range so the next run resumes
	// from where this one left off.
	if len(importResults.ProcessedTimeRanges) != 0 {
		lastTS := importResults.ProcessedTimeRanges[len(importResults.ProcessedTimeRanges)-1].End
		importer.lastTimestamp = &lastTS
	}
	return &importResults, nil
}
// promMatrixToPrometheusMetrics flattens a Prometheus query_range result
// matrix into PrometheusMetric rows for storage. Each sample stream's label
// set is converted once and shared by all rows produced from that stream;
// timestamps are normalized to UTC and each row carries the query step size.
// Returns nil when the matrix contains no samples, matching the previous
// behavior.
func promMatrixToPrometheusMetrics(timeRange prom.Range, matrix model.Matrix) []*PrometheusMetric {
	// Pre-size the result: the total row count is the sum of samples across
	// all streams, which avoids repeated slice growth during append.
	totalSamples := 0
	for _, sampleStream := range matrix {
		totalSamples += len(sampleStream.Values)
	}
	if totalSamples == 0 {
		return nil
	}
	metrics := make([]*PrometheusMetric, 0, totalSamples)
	for _, sampleStream := range matrix {
		labels := make(map[string]string, len(sampleStream.Metric))
		for k, v := range sampleStream.Metric {
			labels[string(k)] = string(v)
		}
		for _, value := range sampleStream.Values {
			metrics = append(metrics, &PrometheusMetric{
				Labels:    labels,
				Amount:    float64(value.Value),
				StepSize:  timeRange.Step,
				Timestamp: value.Timestamp.Time().UTC(),
			})
		}
	}
	return metrics
}