-
Notifications
You must be signed in to change notification settings - Fork 1.3k
/
otelprometheuschecker.go
202 lines (160 loc) · 7.9 KB
/
otelprometheuschecker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package componenttest // import "go.opentelemetry.io/collector/component/componenttest"
import (
"fmt"
"math"
"net/http"
"net/http/httptest"
io_prometheus_client "github.com/prometheus/client_model/go"
"github.com/prometheus/common/expfmt"
"go.opentelemetry.io/otel/attribute"
"go.uber.org/multierr"
"go.opentelemetry.io/collector/component"
)
// prometheusChecker is used to assert exported metrics from a prometheus handler.
// Each check scrapes the handler in-process and compares the parsed series
// against the expected name, type, attributes and value.
type prometheusChecker struct {
// otelHandler is served a GET /metrics request; its response body is parsed
// as Prometheus text format to obtain the metrics under test.
otelHandler http.Handler
}
// checkScraperMetrics verifies the scraper counters recorded for the given
// receiver/scraper pair: "scraper_scraped_metric_points" must equal
// scrapedMetricPoints and "scraper_errored_metric_points" must equal
// erroredMetricPoints. Both counters are always checked and any failures are
// combined into a single error.
func (pc *prometheusChecker) checkScraperMetrics(receiver component.ID, scraper component.ID, scrapedMetricPoints, erroredMetricPoints int64) error {
	attrs := attributesForScraperMetrics(receiver, scraper)
	scrapedErr := pc.checkCounter("scraper_scraped_metric_points", scrapedMetricPoints, attrs)
	erroredErr := pc.checkCounter("scraper_errored_metric_points", erroredMetricPoints, attrs)
	return multierr.Combine(scrapedErr, erroredErr)
}
// checkReceiverTraces verifies the span counters of a receiver for the given
// protocol by delegating to checkReceiver with the "spans" data type. accepted
// is the expected accepted count; dropped is the expected refused count.
func (pc *prometheusChecker) checkReceiverTraces(receiver component.ID, protocol string, accepted, dropped int64) error {
	const datatype = "spans"
	return pc.checkReceiver(receiver, datatype, protocol, accepted, dropped)
}
// checkReceiverLogs verifies the log record counters of a receiver for the
// given protocol by delegating to checkReceiver with the "log_records" data
// type. accepted is the expected accepted count; dropped is the expected
// refused count.
func (pc *prometheusChecker) checkReceiverLogs(receiver component.ID, protocol string, accepted, dropped int64) error {
	const datatype = "log_records"
	return pc.checkReceiver(receiver, datatype, protocol, accepted, dropped)
}
// checkReceiverMetrics verifies the metric point counters of a receiver for
// the given protocol by delegating to checkReceiver with the "metric_points"
// data type. accepted is the expected accepted count; dropped is the expected
// refused count.
func (pc *prometheusChecker) checkReceiverMetrics(receiver component.ID, protocol string, accepted, dropped int64) error {
	const datatype = "metric_points"
	return pc.checkReceiver(receiver, datatype, protocol, accepted, dropped)
}
// checkReceiver verifies the "receiver_accepted_<datatype>" and
// "receiver_refused_<datatype>" counters for the given receiver and protocol.
//
// Despite their names, acceptedMetricPoints and droppedMetricPoints apply to
// whichever datatype is passed; droppedMetricPoints is the expected value of
// the refused counter. Both counters are always checked and any failures are
// combined into a single error.
func (pc *prometheusChecker) checkReceiver(receiver component.ID, datatype, protocol string, acceptedMetricPoints, droppedMetricPoints int64) error {
	attrs := attributesForReceiverMetrics(receiver, protocol)
	acceptedName := fmt.Sprintf("receiver_accepted_%s", datatype)
	refusedName := fmt.Sprintf("receiver_refused_%s", datatype)
	return multierr.Combine(
		pc.checkCounter(acceptedName, acceptedMetricPoints, attrs),
		pc.checkCounter(refusedName, droppedMetricPoints, attrs),
	)
}
// checkProcessorTraces verifies the accepted, refused and dropped span
// counters of a processor by delegating to checkProcessor with the "spans"
// data type.
func (pc *prometheusChecker) checkProcessorTraces(processor component.ID, accepted, refused, dropped int64) error {
	const datatype = "spans"
	return pc.checkProcessor(processor, datatype, accepted, refused, dropped)
}
// checkProcessorMetrics verifies the accepted, refused and dropped metric
// point counters of a processor by delegating to checkProcessor with the
// "metric_points" data type.
func (pc *prometheusChecker) checkProcessorMetrics(processor component.ID, accepted, refused, dropped int64) error {
	const datatype = "metric_points"
	return pc.checkProcessor(processor, datatype, accepted, refused, dropped)
}
// checkProcessorLogs verifies the accepted, refused and dropped log record
// counters of a processor by delegating to checkProcessor with the
// "log_records" data type.
func (pc *prometheusChecker) checkProcessorLogs(processor component.ID, accepted, refused, dropped int64) error {
	const datatype = "log_records"
	return pc.checkProcessor(processor, datatype, accepted, refused, dropped)
}
// checkProcessor verifies the "processor_accepted_<datatype>",
// "processor_refused_<datatype>" and "processor_dropped_<datatype>" counters
// for the given processor against accepted, refused and dropped respectively.
// All three counters are always checked and any failures are combined into a
// single error.
func (pc *prometheusChecker) checkProcessor(processor component.ID, datatype string, accepted, refused, dropped int64) error {
	attrs := attributesForProcessorMetrics(processor)
	errs := []error{
		pc.checkCounter(fmt.Sprintf("processor_accepted_%s", datatype), accepted, attrs),
		pc.checkCounter(fmt.Sprintf("processor_refused_%s", datatype), refused, attrs),
		pc.checkCounter(fmt.Sprintf("processor_dropped_%s", datatype), dropped, attrs),
	}
	return multierr.Combine(errs...)
}
// checkExporterTraces verifies the sent and send-failed span counters of an
// exporter by delegating to checkExporter with the "spans" data type.
func (pc *prometheusChecker) checkExporterTraces(exporter component.ID, sent, sendFailed int64) error {
	const datatype = "spans"
	return pc.checkExporter(exporter, datatype, sent, sendFailed)
}
// checkExporterLogs verifies the sent and send-failed log record counters of
// an exporter by delegating to checkExporter with the "log_records" data type.
func (pc *prometheusChecker) checkExporterLogs(exporter component.ID, sent, sendFailed int64) error {
	const datatype = "log_records"
	return pc.checkExporter(exporter, datatype, sent, sendFailed)
}
// checkExporterMetrics verifies the sent and send-failed metric point counters
// of an exporter by delegating to checkExporter with the "metric_points" data
// type.
func (pc *prometheusChecker) checkExporterMetrics(exporter component.ID, sent, sendFailed int64) error {
	const datatype = "metric_points"
	return pc.checkExporter(exporter, datatype, sent, sendFailed)
}
// checkExporter verifies the "exporter_sent_<datatype>" counter for the given
// exporter against sent. The "exporter_send_failed_<datatype>" counter is
// compared against sendFailed only when sendFailed is greater than zero.
// Failures from the checks that ran are combined into a single error.
func (pc *prometheusChecker) checkExporter(exporter component.ID, datatype string, sent, sendFailed int64) error {
	attrs := attributesForExporterMetrics(exporter)
	sentErr := pc.checkCounter(fmt.Sprintf("exporter_sent_%s", datatype), sent, attrs)
	if sendFailed <= 0 {
		// NOTE(review): presumably skipped because the failure series may not
		// exist until a failure has been recorded — confirm.
		return sentErr
	}

	failedErr := pc.checkCounter(fmt.Sprintf("exporter_send_failed_%s", datatype), sendFailed, attrs)
	return multierr.Append(sentErr, failedErr)
}
// checkExporterEnqueueFailed verifies the "exporter_enqueue_failed_<datatype>"
// counter for the given exporter against enqueueFailed. When enqueueFailed is
// zero nothing is scraped and nil is returned.
func (pc *prometheusChecker) checkExporterEnqueueFailed(exporter component.ID, datatype string, enqueueFailed int64) error {
	if enqueueFailed == 0 {
		return nil
	}

	metricName := fmt.Sprintf("exporter_enqueue_failed_%s", datatype)
	return pc.checkCounter(metricName, enqueueFailed, attributesForExporterMetrics(exporter))
}
// checkExporterMetricGauge verifies that the gauge named metric, carrying the
// attributes of the given exporter, has the value val. The comparison allows
// an absolute difference of up to 0.0001. An error is returned when the gauge
// cannot be found or its value differs.
func (pc *prometheusChecker) checkExporterMetricGauge(exporter component.ID, metric string, val int64) error {
	series, err := pc.getMetric(metric, io_prometheus_client.MetricType_GAUGE, attributesForExporterMetrics(exporter))
	if err != nil {
		return err
	}

	want, got := float64(val), series.GetGauge().GetValue()
	if diff := math.Abs(got - want); diff > 0.0001 {
		return fmt.Errorf("values for metric '%s' did not match, expected '%f' got '%f'", metric, want, got)
	}
	return nil
}
// checkCounter verifies that the counter named expectedMetric, carrying
// exactly attrs, has the given value. The comparison allows an absolute
// difference of up to 0.0001. An error is returned when the counter cannot be
// found or its value differs.
func (pc *prometheusChecker) checkCounter(expectedMetric string, value int64, attrs []attribute.KeyValue) error {
	series, err := pc.getMetric(expectedMetric, io_prometheus_client.MetricType_COUNTER, attrs)
	if err != nil {
		return err
	}

	want, got := float64(value), series.GetCounter().GetValue()
	if diff := math.Abs(want - got); diff > 0.0001 {
		return fmt.Errorf("values for metric '%s' did not match, expected '%f' got '%f'", expectedMetric, want, got)
	}
	return nil
}
// getMetric returns the metric time series that matches the given name, type and set of attributes.
// It fetches data from the prometheus endpoint and parses it on every call; ideally OTel Go
// should provide a MeterRecorder of some kind.
//
// Labels of each scraped series are converted to string attributes and compared
// as a whole set, so a series only matches when its labels equal expectedAttrs.
//
// An error is returned when the scrape fails, when no metric family named
// expectedName exists, when the family has a different type than expectedType,
// or when no series in the family carries the expected attributes.
func (pc *prometheusChecker) getMetric(expectedName string, expectedType io_prometheus_client.MetricType, expectedAttrs []attribute.KeyValue) (*io_prometheus_client.Metric, error) {
	parsed, err := fetchPrometheusMetrics(pc.otelHandler)
	if err != nil {
		return nil, err
	}

	metricFamily, ok := parsed[expectedName]
	if !ok {
		return nil, fmt.Errorf("metric '%s' not found", expectedName)
	}

	// GetType is nil-safe, unlike dereferencing the optional Type field, and
	// comparing the enum values avoids a detour through their string forms.
	if actualType := metricFamily.GetType(); actualType != expectedType {
		return nil, fmt.Errorf("metric '%v' has type '%s' instead of '%s'", expectedName, actualType.String(), expectedType.String())
	}

	expectedSet := attribute.NewSet(expectedAttrs...)
	for _, metric := range metricFamily.GetMetric() {
		labels := metric.GetLabel()
		attrs := make([]attribute.KeyValue, 0, len(labels))
		for _, label := range labels {
			attrs = append(attrs, attribute.String(label.GetName(), label.GetValue()))
		}
		set := attribute.NewSet(attrs...)
		if expectedSet.Equals(&set) {
			return metric, nil
		}
	}

	return nil, fmt.Errorf("metric '%s' doesn't have a timeseries with the given attributes: %s", expectedName, expectedSet.Encoded(attribute.DefaultEncoder()))
}
// fetchPrometheusMetrics scrapes handler in-process with a GET /metrics
// request and parses the response body as Prometheus text format, returning
// the metric families keyed by name.
//
// A response with a status other than 200 is reported as an error instead of
// being handed to the parser, so a failing handler is not mistaken for
// malformed or missing metrics.
func fetchPrometheusMetrics(handler http.Handler) (map[string]*io_prometheus_client.MetricFamily, error) {
	req, err := http.NewRequest(http.MethodGet, "/metrics", nil)
	if err != nil {
		return nil, err
	}

	rr := httptest.NewRecorder()
	handler.ServeHTTP(rr, req)
	if rr.Code != http.StatusOK {
		return nil, fmt.Errorf("scraping metrics: unexpected HTTP status %d: %s", rr.Code, rr.Body.String())
	}

	var parser expfmt.TextParser
	return parser.TextToMetricFamilies(rr.Body)
}
// attributesForScraperMetrics returns the attributes expected on scraper
// metrics: the receiver ID string under receiverTag followed by the scraper ID
// string under scraperTag.
func attributesForScraperMetrics(receiver component.ID, scraper component.ID) []attribute.KeyValue {
	receiverAttr := attribute.String(receiverTag, receiver.String())
	scraperAttr := attribute.String(scraperTag, scraper.String())
	return []attribute.KeyValue{receiverAttr, scraperAttr}
}
// attributesForReceiverMetrics returns the attributes expected on receiver
// metrics: the receiver ID string under receiverTag followed by the transport
// under transportTag.
func attributesForReceiverMetrics(receiver component.ID, transport string) []attribute.KeyValue {
	receiverAttr := attribute.String(receiverTag, receiver.String())
	transportAttr := attribute.String(transportTag, transport)
	return []attribute.KeyValue{receiverAttr, transportAttr}
}
// attributesForProcessorMetrics returns the attributes expected on processor
// metrics: the processor ID string under processorTag.
func attributesForProcessorMetrics(processor component.ID) []attribute.KeyValue {
	processorAttr := attribute.String(processorTag, processor.String())
	return []attribute.KeyValue{processorAttr}
}
// attributesForExporterMetrics returns the attributes expected on exporter
// metrics: the exporter ID string under exporterTag.
func attributesForExporterMetrics(exporter component.ID) []attribute.KeyValue {
	exporterAttr := attribute.String(exporterTag, exporter.String())
	return []attribute.KeyValue{exporterAttr}
}