diff --git a/plugin/metrics/prometheus/metricsstore/dbmodel/to_domain.go b/plugin/metrics/prometheus/metricsstore/dbmodel/to_domain.go new file mode 100644 index 00000000000..76504fa4b08 --- /dev/null +++ b/plugin/metrics/prometheus/metricsstore/dbmodel/to_domain.go @@ -0,0 +1,93 @@ +// Copyright (c) 2021 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package dbmodel + +import ( + "fmt" + + "github.com/gogo/protobuf/types" + "github.com/prometheus/common/model" + + "github.com/jaegertracing/jaeger/proto-gen/api_v2/metrics" +) + +// ToDomainMetricsFamily converts Prometheus' representation of metrics query results to Jaeger's. +func ToDomainMetricsFamily(name, description string, mv model.Value) (*metrics.MetricFamily, error) { + if mv.Type() != model.ValMatrix { + return &metrics.MetricFamily{}, fmt.Errorf("unexpected metrics ValueType: %s", mv.Type()) + } + return &metrics.MetricFamily{ + Name: name, + Type: metrics.MetricType_GAUGE, + Help: description, + Metrics: toDomainMetrics(mv.(model.Matrix)), + }, nil +} + +// toDomainMetrics converts Prometheus' representation of metrics to Jaeger's. +func toDomainMetrics(matrix model.Matrix) []*metrics.Metric { + ms := make([]*metrics.Metric, matrix.Len()) + for i, ss := range matrix { + ms[i] = &metrics.Metric{ + Labels: toDomainLabels(ss.Metric), + MetricPoints: toDomainMetricPoints(ss.Values), + } + } + return ms +} + +// toDomainLabels converts Prometheus' representation of metric labels to Jaeger's. +func toDomainLabels(promLabels model.Metric) []*metrics.Label { + labels := make([]*metrics.Label, len(promLabels)) + j := 0 + for k, v := range promLabels { + labels[j] = &metrics.Label{Name: string(k), Value: string(v)} + j++ + } + return labels +} + +// toDomainMetricPoints convert's Prometheus' representation of metrics data points to Jaeger's. +func toDomainMetricPoints(promDps []model.SamplePair) []*metrics.MetricPoint { + domainMps := make([]*metrics.MetricPoint, len(promDps)) + for i, promDp := range promDps { + mp := &metrics.MetricPoint{ + Timestamp: toDomainTimestamp(promDp.Timestamp), + Value: toDomainMetricPointValue(promDp.Value), + } + domainMps[i] = mp + } + return domainMps +} + +// toDomainTimestamp converts Prometheus' representation of timestamps to Jaeger's. +func toDomainTimestamp(timeMs model.Time) *types.Timestamp { + return &types.Timestamp{ + Seconds: int64(timeMs / 1000), + Nanos: int32((timeMs % 1000) * 1_000_000), + } +} + +// toDomainMetricPointValue converts Prometheus' representation of a double gauge value to Jaeger's. +// The gauge metric type is used because latency, call and error rates metrics do not consist of monotonically +// increasing values; rather, they are a series of any positive floating number which can fluctuate in any +// direction over time. 
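+//
+// Illustrative example (mirroring toDomainTimestamp above and toDomainMetricPointValue below):
+// the Prometheus sample pair {Timestamp: 1620351786123 /* ms */, Value: 0.86} becomes a
+// MetricPoint with Timestamp{Seconds: 1620351786, Nanos: 123000000} and a double gauge
+// value of 0.86.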
+func toDomainMetricPointValue(promVal model.SampleValue) *metrics.MetricPoint_GaugeValue { + return &metrics.MetricPoint_GaugeValue{ + GaugeValue: &metrics.GaugeValue{ + Value: &metrics.GaugeValue_DoubleValue{DoubleValue: float64(promVal)}, + }, + } +} diff --git a/plugin/metrics/prometheus/metricsstore/dbmodel/to_domain_test.go b/plugin/metrics/prometheus/metricsstore/dbmodel/to_domain_test.go new file mode 100644 index 00000000000..9b3af3a5031 --- /dev/null +++ b/plugin/metrics/prometheus/metricsstore/dbmodel/to_domain_test.go @@ -0,0 +1,69 @@ +// Copyright (c) 2021 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package dbmodel + +import ( + "testing" + "time" + + "github.com/gogo/protobuf/types" + "github.com/prometheus/common/model" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/jaegertracing/jaeger/proto-gen/api_v2/metrics" +) + +func TestToDomainMetricsFamily(t *testing.T) { + promMetrics := model.Matrix{} + nowSec := time.Now().Unix() + promMetrics = append(promMetrics, &model.SampleStream{ + Metric: map[model.LabelName]model.LabelValue{"label_key": "label_value"}, + Values: []model.SamplePair{ + {Timestamp: model.Time(nowSec * 1000), Value: 1234}, + }, + }) + mf, err := ToDomainMetricsFamily("the_metric_name", "the_metric_description", promMetrics) + require.NoError(t, err) + + assert.NotEmpty(t, mf) + + assert.Equal(t, "the_metric_name", mf.Name) + assert.Equal(t, "the_metric_description", mf.Help) + assert.Equal(t, metrics.MetricType_GAUGE, mf.Type) + + assert.Len(t, mf.Metrics, 1) + assert.Equal(t, []*metrics.Label{{Name: "label_key", Value: "label_value"}}, mf.Metrics[0].Labels) + + wantMpValue := &metrics.MetricPoint_GaugeValue{ + GaugeValue: &metrics.GaugeValue{ + Value: &metrics.GaugeValue_DoubleValue{ + DoubleValue: 1234, + }, + }, + } + assert.Equal(t, []*metrics.MetricPoint{{Timestamp: &types.Timestamp{Seconds: nowSec}, Value: wantMpValue}}, mf.Metrics[0].MetricPoints) +} + +func TestUnexpectedMetricsFamilyType(t *testing.T) { + promMetrics := model.Vector{} + mf, err := ToDomainMetricsFamily("the_metric_name", "the_metric_description", promMetrics) + + assert.NotNil(t, mf) + assert.Empty(t, mf) + + require.Error(t, err) + assert.EqualError(t, err, "unexpected metrics ValueType: vector") +} diff --git a/plugin/metrics/prometheus/metricsstore/reader.go b/plugin/metrics/prometheus/metricsstore/reader.go index d38837b2eed..99f5cde4c58 100644 --- a/plugin/metrics/prometheus/metricsstore/reader.go +++ b/plugin/metrics/prometheus/metricsstore/reader.go @@ -16,23 +16,60 @@ package metricsstore import ( "context" + "fmt" "net" "net/http" + "strings" "time" + "unicode" + "github.com/opentracing/opentracing-go" + ottag "github.com/opentracing/opentracing-go/ext" + otlog "github.com/opentracing/opentracing-go/log" "github.com/prometheus/client_golang/api" promapi "github.com/prometheus/client_golang/api/prometheus/v1" "go.uber.org/zap" + 
"github.com/jaegertracing/jaeger/plugin/metrics/prometheus/metricsstore/dbmodel" "github.com/jaegertracing/jaeger/proto-gen/api_v2/metrics" "github.com/jaegertracing/jaeger/storage/metricsstore" ) -// MetricsReader is a Prometheus metrics reader. -type MetricsReader struct { - client promapi.API - logger *zap.Logger -} +const ( + minStep = time.Millisecond + + latenciesMetricName = "service_latencies" + latenciesMetricDesc = "%.2fth quantile latency, grouped by service" + + callsMetricName = "service_call_rate" + callsMetricDesc = "calls/sec, grouped by service" + + errorsMetricName = "service_error_rate" + errorsMetricDesc = "error rate, computed as a fraction of errors/sec over calls/sec, grouped by service" +) + +type ( + // MetricsReader is a Prometheus metrics reader. + MetricsReader struct { + client promapi.API + logger *zap.Logger + } + + promQueryParams struct { + groupBy string + spanKindFilter string + serviceFilter string + rate string + } + + metricsQueryParams struct { + metricsstore.BaseQueryParameters + groupByHistBucket bool + metricName string + metricDesc string + buildPromQuery func(p promQueryParams) string + } +) // NewMetricsReader returns a new MetricsReader. func NewMetricsReader(logger *zap.Logger, hostPort string, connectTimeout time.Duration) (*MetricsReader, error) { @@ -52,7 +89,7 @@ func NewMetricsReader(logger *zap.Logger, hostPort string, connectTimeout time.D RoundTripper: roundTripper, }) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to initialize prometheus client: %w", err) } mr := &MetricsReader{ client: promapi.NewAPI(client), @@ -63,24 +100,150 @@ func NewMetricsReader(logger *zap.Logger, hostPort string, connectTimeout time.D } // GetLatencies gets the latency metrics for the given set of latency query parameters. -func (m *MetricsReader) GetLatencies(ctx context.Context, params *metricsstore.LatenciesQueryParameters) (*metrics.MetricFamily, error) { - // TODO: Implement me - return &metrics.MetricFamily{}, nil +func (m *MetricsReader) GetLatencies(ctx context.Context, requestParams *metricsstore.LatenciesQueryParameters) (*metrics.MetricFamily, error) { + metricsParams := metricsQueryParams{ + BaseQueryParameters: requestParams.BaseQueryParameters, + groupByHistBucket: true, + metricName: latenciesMetricName, + metricDesc: fmt.Sprintf(latenciesMetricDesc, requestParams.Quantile), + buildPromQuery: func(p promQueryParams) string { + return fmt.Sprintf( + // Note: p.spanKindFilter can be ""; trailing commas are okay within a timeseries selection. + `histogram_quantile(%.2f, sum(latency_bucket{service_name =~ "%s", %s}) by (%s))`, + requestParams.Quantile, + p.serviceFilter, + p.spanKindFilter, + p.groupBy, + ) + }, + } + return m.executeQuery(ctx, metricsParams) } // GetCallRates gets the call rate metrics for the given set of call rate query parameters. -func (m *MetricsReader) GetCallRates(ctx context.Context, params *metricsstore.CallRateQueryParameters) (*metrics.MetricFamily, error) { - // TODO: Implement me - return &metrics.MetricFamily{}, nil +func (m *MetricsReader) GetCallRates(ctx context.Context, requestParams *metricsstore.CallRateQueryParameters) (*metrics.MetricFamily, error) { + metricsParams := metricsQueryParams{ + BaseQueryParameters: requestParams.BaseQueryParameters, + metricName: callsMetricName, + metricDesc: callsMetricDesc, + buildPromQuery: func(p promQueryParams) string { + return fmt.Sprintf( + // Note: p.spanKindFilter can be ""; trailing commas are okay within a timeseries selection. 
+ `sum(rate(calls_total{service_name =~ "%s", %s}[%s])) by (%s)`, + p.serviceFilter, + p.spanKindFilter, + p.rate, + p.groupBy, + ) + }, + } + return m.executeQuery(ctx, metricsParams) } // GetErrorRates gets the error rate metrics for the given set of error rate query parameters. -func (m *MetricsReader) GetErrorRates(ctx context.Context, params *metricsstore.ErrorRateQueryParameters) (*metrics.MetricFamily, error) { - // TODO: Implement me - return &metrics.MetricFamily{}, nil +func (m *MetricsReader) GetErrorRates(ctx context.Context, requestParams *metricsstore.ErrorRateQueryParameters) (*metrics.MetricFamily, error) { + metricsParams := metricsQueryParams{ + BaseQueryParameters: requestParams.BaseQueryParameters, + metricName: errorsMetricName, + metricDesc: errorsMetricDesc, + buildPromQuery: func(p promQueryParams) string { + return fmt.Sprintf( + // Note: p.spanKindFilter can be ""; trailing commas are okay within a timeseries selection. + `sum(rate(calls_total{service_name =~ "%s", status_code = "STATUS_CODE_ERROR", %s}[%s])) by (%s) / sum(rate(calls_total{service_name =~ "%s", %s}[%s])) by (%s)`, + p.serviceFilter, p.spanKindFilter, p.rate, p.groupBy, + p.serviceFilter, p.spanKindFilter, p.rate, p.groupBy, + ) + }, + } + return m.executeQuery(ctx, metricsParams) } // GetMinStepDuration gets the minimum step duration (the smallest possible duration between two data points in a time series) supported. func (m *MetricsReader) GetMinStepDuration(_ context.Context, _ *metricsstore.MinStepDurationQueryParameters) (time.Duration, error) { - return time.Millisecond, nil + return minStep, nil +} + +// executeQuery executes a query against a Prometheus-compliant metrics backend. +func (m *MetricsReader) executeQuery(ctx context.Context, p metricsQueryParams) (*metrics.MetricFamily, error) { + if p.GroupByOperation { + p.metricName = strings.Replace(p.metricName, "service", "service_operation", 1) + p.metricDesc += " & operation" + } + promQuery := buildPromQuery(p) + + span, ctx := startSpanForQuery(ctx, p.metricName, promQuery) + defer span.Finish() + + queryRange := promapi.Range{ + Start: p.EndTime.Add(-1 * *p.Lookback), + End: *p.EndTime, + Step: *p.Step, + } + + m.logger.Debug("Executing Prometheus query", zap.String("query", promQuery), zap.Any("range", queryRange)) + + mv, warnings, err := m.client.QueryRange(ctx, promQuery, queryRange) + if err != nil { + logErrorToSpan(span, err) + return &metrics.MetricFamily{}, fmt.Errorf("failed executing metrics query: %w", err) + } + if len(warnings) > 0 { + m.logger.Warn("Warnings detected on Prometheus query", zap.Any("warnings", warnings)) + } + + m.logger.Debug("Prometheus query results", zap.String("results", mv.String())) + return dbmodel.ToDomainMetricsFamily( + p.metricName, + p.metricDesc, + mv, + ) +} + +func buildPromQuery(metricsParams metricsQueryParams) string { + groupBy := []string{"service_name"} + if metricsParams.GroupByOperation { + groupBy = append(groupBy, "operation") + } + if metricsParams.groupByHistBucket { + // Group by the bucket value ("le" => "less than or equal to"). 
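+		// histogram_quantile() requires the "le" label to survive the sum() aggregation,
+		// e.g. (illustrative, cf. reader_test.go):
+		//   histogram_quantile(0.95, sum(latency_bucket{service_name =~ "emailservice", span_kind =~ "SPAN_KIND_SERVER"}) by (service_name,le))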
+ groupBy = append(groupBy, "le") + } + + spanKindFilter := "" + if len(metricsParams.SpanKinds) > 0 { + spanKindFilter = fmt.Sprintf(`span_kind =~ "%s"`, strings.Join(metricsParams.SpanKinds, "|")) + } + promParams := promQueryParams{ + serviceFilter: strings.Join(metricsParams.ServiceNames, "|"), + spanKindFilter: spanKindFilter, + rate: promqlDurationString(metricsParams.RatePer), + groupBy: strings.Join(groupBy, ","), + } + return metricsParams.buildPromQuery(promParams) +} + +// promqlDurationString formats the duration string to be promQL-compliant. +// PromQL only accepts "single-unit" durations like "30s", "1m", "1h"; not "1h5s" or "1m0s". +func promqlDurationString(d *time.Duration) string { + var b []byte + for _, c := range d.String() { + b = append(b, byte(c)) + if unicode.IsLetter(c) { + break + } + } + return string(b) +} + +func startSpanForQuery(ctx context.Context, metricName, query string) (opentracing.Span, context.Context) { + span, ctx := opentracing.StartSpanFromContext(ctx, metricName) + ottag.DBStatement.Set(span, query) + ottag.DBType.Set(span, "prometheus") + ottag.Component.Set(span, "promql") + return span, ctx +} + +func logErrorToSpan(span opentracing.Span, err error) { + ottag.Error.Set(span, true) + span.LogFields(otlog.Error(err)) } diff --git a/plugin/metrics/prometheus/metricsstore/reader_test.go b/plugin/metrics/prometheus/metricsstore/reader_test.go index 728a71328a6..16c38132c11 100644 --- a/plugin/metrics/prometheus/metricsstore/reader_test.go +++ b/plugin/metrics/prometheus/metricsstore/reader_test.go @@ -16,76 +16,369 @@ package metricsstore import ( "context" + "fmt" + "io/ioutil" + "net" + "net/http" + "net/http/httptest" + "net/url" + "os" + "strings" "testing" "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "go.uber.org/zap" + "github.com/jaegertracing/jaeger/proto-gen/api_v2/metrics" "github.com/jaegertracing/jaeger/storage/metricsstore" ) +type ( + metricsTestCase struct { + name string + serviceNames []string + spanKinds []string + groupByOperation bool + wantName string + wantDescription string + wantLabels map[string]string + wantPromQlQuery string + } +) + const defaultTimeout = 30 * time.Second func TestNewMetricsReaderValidAddress(t *testing.T) { logger := zap.NewNop() reader, err := NewMetricsReader(logger, "localhost:1234", defaultTimeout) - assert.NoError(t, err) + require.NoError(t, err) assert.NotNil(t, reader) } func TestNewMetricsReaderInvalidAddress(t *testing.T) { logger := zap.NewNop() reader, err := NewMetricsReader(logger, "\n", defaultTimeout) - const wantErrMsg = `parse "http://\n": net/url: invalid control character in URL` - assert.EqualError(t, err, wantErrMsg) + require.Error(t, err) + assert.Contains(t, err.Error(), "failed to initialize prometheus client") assert.Nil(t, reader) } func TestGetMinStepDuration(t *testing.T) { params := metricsstore.MinStepDurationQueryParameters{} logger := zap.NewNop() + listener, err := net.Listen("tcp", "localhost:") + require.NoError(t, err) + assert.NotNil(t, listener) - reader, err := NewMetricsReader(logger, "localhost:1234", defaultTimeout) - assert.NoError(t, err) + reader, err := NewMetricsReader(logger, listener.Addr().String(), defaultTimeout) + require.NoError(t, err) minStep, err := reader.GetMinStepDuration(context.Background(), ¶ms) - assert.NoError(t, err) + require.NoError(t, err) assert.Equal(t, time.Millisecond, minStep) } -func TestGetLatencies(t *testing.T) { - params := metricsstore.LatenciesQueryParameters{} +func 
TestMetricsServerError(t *testing.T) { + endTime := time.Now() + lookback := time.Minute + step := time.Millisecond + ratePer := 10 * time.Minute + + params := metricsstore.CallRateQueryParameters{ + BaseQueryParameters: metricsstore.BaseQueryParameters{ + EndTime: &endTime, + Lookback: &lookback, + Step: &step, + RatePer: &ratePer, + }, + } + + mockPrometheus := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + http.Error(w, "internal server error", http.StatusInternalServerError) + })) + defer mockPrometheus.Close() + logger := zap.NewNop() + address := mockPrometheus.Listener.Addr().String() + reader, err := NewMetricsReader(logger, address, defaultTimeout) + require.NoError(t, err) - reader, err := NewMetricsReader(logger, "localhost:1234", defaultTimeout) - assert.NoError(t, err) + m, err := reader.GetCallRates(context.Background(), ¶ms) + assert.NotNil(t, m) - m, err := reader.GetLatencies(context.Background(), ¶ms) - assert.NoError(t, err) - assert.Empty(t, m) + require.Error(t, err) + assert.Contains(t, err.Error(), "failed executing metrics query") } -func TestGetCallRates(t *testing.T) { - params := metricsstore.CallRateQueryParameters{} - logger := zap.NewNop() +func TestGetLatencies(t *testing.T) { + for _, tc := range []metricsTestCase{ + { + name: "group by service should be reflected in name/description and query group-by", + serviceNames: []string{"emailservice"}, + spanKinds: []string{"SPAN_KIND_SERVER"}, + groupByOperation: false, + wantName: "service_latencies", + wantDescription: "0.95th quantile latency, grouped by service", + wantLabels: map[string]string{ + "service_name": "emailservice", + }, + wantPromQlQuery: `histogram_quantile(0.95, sum(latency_bucket{service_name =~ "emailservice", ` + + `span_kind =~ "SPAN_KIND_SERVER"}) by (service_name,le))`, + }, + { + name: "group by service and operation should be reflected in name/description and query group-by", + serviceNames: []string{"emailservice"}, + spanKinds: []string{"SPAN_KIND_SERVER"}, + groupByOperation: true, + wantName: "service_operation_latencies", + wantDescription: "0.95th quantile latency, grouped by service & operation", + wantLabels: map[string]string{ + "operation": "/OrderResult", + "service_name": "emailservice", + }, + wantPromQlQuery: `histogram_quantile(0.95, sum(latency_bucket{service_name =~ "emailservice", ` + + `span_kind =~ "SPAN_KIND_SERVER"}) by (service_name,operation,le))`, + }, + { + name: "two services and span kinds result in regex 'or' symbol in query", + serviceNames: []string{"frontend", "emailservice"}, + spanKinds: []string{"SPAN_KIND_SERVER", "SPAN_KIND_CLIENT"}, + groupByOperation: false, + wantName: "service_latencies", + wantDescription: "0.95th quantile latency, grouped by service", + wantLabels: map[string]string{ + "service_name": "emailservice", + }, + wantPromQlQuery: `histogram_quantile(0.95, sum(latency_bucket{service_name =~ "frontend|emailservice", ` + + `span_kind =~ "SPAN_KIND_SERVER|SPAN_KIND_CLIENT"}) by (service_name,le))`, + }, + } { + t.Run(tc.name, func(t *testing.T) { + params := metricsstore.LatenciesQueryParameters{ + BaseQueryParameters: buildTestBaseQueryParametersFrom(tc), + Quantile: 0.95, + } + reader, mockPrometheus := prepareMetricsReaderAndServer(t, tc.wantPromQlQuery, nil) + defer mockPrometheus.Close() - reader, err := NewMetricsReader(logger, "localhost:1234", defaultTimeout) - assert.NoError(t, err) + m, err := reader.GetLatencies(context.Background(), ¶ms) + require.NoError(t, err) + assertMetrics(t, m, 
tc.wantLabels, tc.wantName, tc.wantDescription) + }) + } +} - m, err := reader.GetCallRates(context.Background(), ¶ms) - assert.NoError(t, err) - assert.Empty(t, m) +func TestGetCallRates(t *testing.T) { + for _, tc := range []metricsTestCase{ + { + name: "group by service only should be reflected in name/description and query group-by", + serviceNames: []string{"emailservice"}, + spanKinds: []string{"SPAN_KIND_SERVER"}, + groupByOperation: false, + wantName: "service_call_rate", + wantDescription: "calls/sec, grouped by service", + wantLabels: map[string]string{ + "service_name": "emailservice", + }, + wantPromQlQuery: `sum(rate(calls_total{service_name =~ "emailservice", ` + + `span_kind =~ "SPAN_KIND_SERVER"}[10m])) by (service_name)`, + }, + { + name: "group by service and operation should be reflected in name/description and query group-by", + serviceNames: []string{"emailservice"}, + spanKinds: []string{"SPAN_KIND_SERVER"}, + groupByOperation: true, + wantName: "service_operation_call_rate", + wantDescription: "calls/sec, grouped by service & operation", + wantLabels: map[string]string{ + "operation": "/OrderResult", + "service_name": "emailservice", + }, + wantPromQlQuery: `sum(rate(calls_total{service_name =~ "emailservice", ` + + `span_kind =~ "SPAN_KIND_SERVER"}[10m])) by (service_name,operation)`, + }, + { + name: "two services and span kinds result in regex 'or' symbol in query", + serviceNames: []string{"frontend", "emailservice"}, + spanKinds: []string{"SPAN_KIND_SERVER", "SPAN_KIND_CLIENT"}, + groupByOperation: false, + wantName: "service_call_rate", + wantDescription: "calls/sec, grouped by service", + wantLabels: map[string]string{ + "service_name": "emailservice", + }, + wantPromQlQuery: `sum(rate(calls_total{service_name =~ "frontend|emailservice", ` + + `span_kind =~ "SPAN_KIND_SERVER|SPAN_KIND_CLIENT"}[10m])) by (service_name)`, + }, + } { + t.Run(tc.name, func(t *testing.T) { + params := metricsstore.CallRateQueryParameters{ + BaseQueryParameters: buildTestBaseQueryParametersFrom(tc), + } + reader, mockPrometheus := prepareMetricsReaderAndServer(t, tc.wantPromQlQuery, nil) + defer mockPrometheus.Close() + + m, err := reader.GetCallRates(context.Background(), ¶ms) + require.NoError(t, err) + assertMetrics(t, m, tc.wantLabels, tc.wantName, tc.wantDescription) + }) + } } func TestGetErrorRates(t *testing.T) { - params := metricsstore.ErrorRateQueryParameters{} - logger := zap.NewNop() + for _, tc := range []metricsTestCase{ + { + name: "group by service only should be reflected in name/description and query group-by", + serviceNames: []string{"emailservice"}, + spanKinds: []string{"SPAN_KIND_SERVER"}, + groupByOperation: false, + wantName: "service_error_rate", + wantDescription: "error rate, computed as a fraction of errors/sec over calls/sec, grouped by service", + wantLabels: map[string]string{ + "service_name": "emailservice", + }, + wantPromQlQuery: `sum(rate(calls_total{service_name =~ "emailservice", status_code = "STATUS_CODE_ERROR", ` + + `span_kind =~ "SPAN_KIND_SERVER"}[10m])) by (service_name) / ` + + `sum(rate(calls_total{service_name =~ "emailservice", span_kind =~ "SPAN_KIND_SERVER"}[10m])) by (service_name)`, + }, + { + name: "group by service and operation should be reflected in name/description and query group-by", + serviceNames: []string{"emailservice"}, + spanKinds: []string{"SPAN_KIND_SERVER"}, + groupByOperation: true, + wantName: "service_operation_error_rate", + wantDescription: "error rate, computed as a fraction of errors/sec over calls/sec, 
grouped by service & operation", + wantLabels: map[string]string{ + "operation": "/OrderResult", + "service_name": "emailservice", + }, + wantPromQlQuery: `sum(rate(calls_total{service_name =~ "emailservice", status_code = "STATUS_CODE_ERROR", ` + + `span_kind =~ "SPAN_KIND_SERVER"}[10m])) by (service_name,operation) / ` + + `sum(rate(calls_total{service_name =~ "emailservice", span_kind =~ "SPAN_KIND_SERVER"}[10m])) by (service_name,operation)`, + }, + { + name: "two services and span kinds result in regex 'or' symbol in query", + serviceNames: []string{"frontend", "emailservice"}, + spanKinds: []string{"SPAN_KIND_SERVER", "SPAN_KIND_CLIENT"}, + groupByOperation: false, + wantName: "service_error_rate", + wantDescription: "error rate, computed as a fraction of errors/sec over calls/sec, grouped by service", + wantLabels: map[string]string{ + "service_name": "emailservice", + }, + wantPromQlQuery: `sum(rate(calls_total{service_name =~ "frontend|emailservice", status_code = "STATUS_CODE_ERROR", ` + + `span_kind =~ "SPAN_KIND_SERVER|SPAN_KIND_CLIENT"}[10m])) by (service_name) / ` + + `sum(rate(calls_total{service_name =~ "frontend|emailservice", span_kind =~ "SPAN_KIND_SERVER|SPAN_KIND_CLIENT"}[10m])) by (service_name)`, + }, + } { + t.Run(tc.name, func(t *testing.T) { + params := metricsstore.ErrorRateQueryParameters{ + BaseQueryParameters: buildTestBaseQueryParametersFrom(tc), + } + reader, mockPrometheus := prepareMetricsReaderAndServer(t, tc.wantPromQlQuery, nil) + defer mockPrometheus.Close() - reader, err := NewMetricsReader(logger, "localhost:1234", defaultTimeout) - assert.NoError(t, err) + m, err := reader.GetErrorRates(context.Background(), ¶ms) + require.NoError(t, err) + assertMetrics(t, m, tc.wantLabels, tc.wantName, tc.wantDescription) + }) + } +} + +func TestWarningResponse(t *testing.T) { + params := metricsstore.ErrorRateQueryParameters{ + BaseQueryParameters: buildTestBaseQueryParametersFrom(metricsTestCase{serviceNames: []string{"foo"}}), + } + reader, mockPrometheus := prepareMetricsReaderAndServer(t, "", []string{"warning0", "warning1"}) + defer mockPrometheus.Close() m, err := reader.GetErrorRates(context.Background(), ¶ms) - assert.NoError(t, err) - assert.Empty(t, m) + require.NoError(t, err) + assert.NotNil(t, m) +} + +func startMockPrometheusServer(t *testing.T, wantPromQlQuery string, wantWarnings []string) *httptest.Server { + return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if len(wantWarnings) > 0 { + sendResponse(t, w, "testdata/warning_response.json") + return + } + + body, _ := ioutil.ReadAll(r.Body) + defer r.Body.Close() + + u, err := url.Parse("http://" + r.Host + r.RequestURI + "?" 
+ string(body)) + require.NoError(t, err) + + q := u.Query() + promQuery := q.Get("query") + assert.Equal(t, wantPromQlQuery, promQuery) + + mockResponsePayloadFile := "testdata/service_datapoint_response.json" + if strings.Contains(promQuery, "by (service_name,operation") { + mockResponsePayloadFile = "testdata/service_operation_datapoint_response.json" + } + sendResponse(t, w, mockResponsePayloadFile) + })) +} + +func sendResponse(t *testing.T, w http.ResponseWriter, responseFile string) error { + bytes, err := os.ReadFile(responseFile) + if err != nil { + return err + } + + _, err = fmt.Fprintln(w, string(bytes)) + return err +} + +func buildTestBaseQueryParametersFrom(tc metricsTestCase) metricsstore.BaseQueryParameters { + endTime := time.Now() + lookback := time.Minute + step := time.Millisecond + ratePer := 10 * time.Minute + + return metricsstore.BaseQueryParameters{ + ServiceNames: tc.serviceNames, + GroupByOperation: tc.groupByOperation, + EndTime: &endTime, + Lookback: &lookback, + Step: &step, + RatePer: &ratePer, + SpanKinds: tc.spanKinds, + } +} + +func prepareMetricsReaderAndServer(t *testing.T, wantPromQlQuery string, wantWarnings []string) (metricsstore.Reader, *httptest.Server) { + mockPrometheus := startMockPrometheusServer(t, wantPromQlQuery, wantWarnings) + + logger := zap.NewNop() + address := mockPrometheus.Listener.Addr().String() + reader, err := NewMetricsReader(logger, address, defaultTimeout) + require.NoError(t, err) + return reader, mockPrometheus +} + +func assertMetrics(t *testing.T, gotMetrics *metrics.MetricFamily, wantLabels map[string]string, wantName, wantDescription string) { + assert.Len(t, gotMetrics.Metrics, 1) + assert.Equal(t, wantName, gotMetrics.Name) + assert.Equal(t, wantDescription, gotMetrics.Help) + mps := gotMetrics.Metrics[0].MetricPoints + assert.Len(t, mps, 1) + + // There is no guaranteed order of labels, so we need to take the approach of using a map of expected values. 
+ labels := gotMetrics.Metrics[0].Labels + assert.Equal(t, len(wantLabels), len(labels)) + for _, l := range labels { + assert.Contains(t, wantLabels, l.Name) + assert.Equal(t, wantLabels[l.Name], l.Value) + delete(wantLabels, l.Name) + } + assert.Empty(t, wantLabels) + assert.Equal(t, int64(1620351786), mps[0].Timestamp.GetSeconds()) + + actualVal := mps[0].Value.(*metrics.MetricPoint_GaugeValue).GaugeValue.Value.(*metrics.GaugeValue_DoubleValue).DoubleValue + assert.Equal(t, float64(9223372036854), actualVal) } diff --git a/plugin/metrics/prometheus/metricsstore/testdata/service_datapoint_response.json b/plugin/metrics/prometheus/metricsstore/testdata/service_datapoint_response.json new file mode 100644 index 00000000000..e0a046d5c85 --- /dev/null +++ b/plugin/metrics/prometheus/metricsstore/testdata/service_datapoint_response.json @@ -0,0 +1,19 @@ +{ + "status": "success", + "data": { + "resultType": "matrix", + "result": [ + { + "metric": { + "service_name": "emailservice" + }, + "values": [ + [ + 1620351786, + "9223372036854" + ] + ] + } + ] + } +} \ No newline at end of file diff --git a/plugin/metrics/prometheus/metricsstore/testdata/service_operation_datapoint_response.json b/plugin/metrics/prometheus/metricsstore/testdata/service_operation_datapoint_response.json new file mode 100644 index 00000000000..bfe13afc8fd --- /dev/null +++ b/plugin/metrics/prometheus/metricsstore/testdata/service_operation_datapoint_response.json @@ -0,0 +1,20 @@ +{ + "status": "success", + "data": { + "resultType": "matrix", + "result": [ + { + "metric": { + "operation": "/OrderResult", + "service_name": "emailservice" + }, + "values": [ + [ + 1620351786, + "9223372036854" + ] + ] + } + ] + } +} \ No newline at end of file diff --git a/plugin/metrics/prometheus/metricsstore/testdata/warning_response.json b/plugin/metrics/prometheus/metricsstore/testdata/warning_response.json new file mode 100644 index 00000000000..46f19a9b857 --- /dev/null +++ b/plugin/metrics/prometheus/metricsstore/testdata/warning_response.json @@ -0,0 +1,8 @@ +{ + "status": "warning", + "warnings": ["warning0", "warning1"], + "data": { + "resultType": "matrix", + "result": [] + } +} \ No newline at end of file
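Usage sketch (illustrative, not part of the patch): the snippet below wires up the new Prometheus metrics reader and issues a call-rate query, using only the constructor and parameter types introduced above. The backend address localhost:9090, the "frontend" service name, and the lookback/step/rate values are placeholder assumptions; the prometheusstore import alias exists only to avoid clashing with the storage/metricsstore package.

package main

import (
	"context"
	"fmt"
	"time"

	"go.uber.org/zap"

	prometheusstore "github.com/jaegertracing/jaeger/plugin/metrics/prometheus/metricsstore"
	"github.com/jaegertracing/jaeger/storage/metricsstore"
)

func main() {
	// Host:port of a Prometheus-compatible backend (placeholder address).
	reader, err := prometheusstore.NewMetricsReader(zap.NewNop(), "localhost:9090", 30*time.Second)
	if err != nil {
		panic(err)
	}

	// BaseQueryParameters fields are pointers, mirroring the tests above.
	endTime := time.Now()
	lookback := time.Hour
	step := time.Minute
	ratePer := 10 * time.Minute

	// Query call rates for one (placeholder) service, grouped by operation.
	mf, err := reader.GetCallRates(context.Background(), &metricsstore.CallRateQueryParameters{
		BaseQueryParameters: metricsstore.BaseQueryParameters{
			ServiceNames:     []string{"frontend"},
			GroupByOperation: true,
			EndTime:          &endTime,
			Lookback:         &lookback,
			Step:             &step,
			RatePer:          &ratePer,
			SpanKinds:        []string{"SPAN_KIND_SERVER"},
		},
	})
	if err != nil {
		panic(err)
	}
	fmt.Printf("%s: %d series\n", mf.Name, len(mf.Metrics))
}

The returned MetricFamily is the result of dbmodel.ToDomainMetricsFamily converting the Prometheus matrix response, as exercised by the tests in this diff.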