Skip to content

Commit

Permalink
Implement metrics reader (#3004)
Browse files Browse the repository at this point in the history
Signed-off-by: Albert Teoh <albert.teoh@logz.io>
  • Loading branch information
albertteoh committed May 27, 2021
1 parent 23bbd63 commit c0fb781
Show file tree
Hide file tree
Showing 7 changed files with 708 additions and 43 deletions.
93 changes: 93 additions & 0 deletions plugin/metrics/prometheus/metricsstore/dbmodel/to_domain.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
// Copyright (c) 2021 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package dbmodel

import (
"fmt"

"github.com/gogo/protobuf/types"
"github.com/prometheus/common/model"

"github.com/jaegertracing/jaeger/proto-gen/api_v2/metrics"
)

// ToDomainMetricsFamily translates a Prometheus query result into Jaeger's MetricFamily.
//
// Only matrix results are accepted. For any other value type an empty (non-nil)
// MetricFamily is returned along with an error naming the unexpected type.
// On success the family is typed as a gauge, with name and description copied
// into its Name and Help fields respectively.
func ToDomainMetricsFamily(name, description string, mv model.Value) (*metrics.MetricFamily, error) {
	if valueType := mv.Type(); valueType != model.ValMatrix {
		return &metrics.MetricFamily{}, fmt.Errorf("unexpected metrics ValueType: %s", valueType)
	}

	// The type check above guards this assertion.
	matrix := mv.(model.Matrix)

	family := &metrics.MetricFamily{
		Name:    name,
		Help:    description,
		Type:    metrics.MetricType_GAUGE,
		Metrics: toDomainMetrics(matrix),
	}
	return family, nil
}

// toDomainMetrics maps each Prometheus sample stream of the matrix to a Jaeger Metric,
// keeping the streams in their original order. The result has exactly one entry per
// stream and is never nil.
func toDomainMetrics(matrix model.Matrix) []*metrics.Metric {
	domainMetrics := make([]*metrics.Metric, 0, len(matrix))
	for _, stream := range matrix {
		domainMetrics = append(domainMetrics, &metrics.Metric{
			Labels:       toDomainLabels(stream.Metric),
			MetricPoints: toDomainMetricPoints(stream.Values),
		})
	}
	return domainMetrics
}

// toDomainLabels turns a Prometheus label set into a slice of Jaeger labels, one per
// name/value pair. The label set is a map, so the order of the returned labels
// follows Go's map iteration order and is not stable between calls.
func toDomainLabels(promLabels model.Metric) []*metrics.Label {
	domainLabels := make([]*metrics.Label, 0, len(promLabels))
	for labelName, labelValue := range promLabels {
		domainLabels = append(domainLabels, &metrics.Label{
			Name:  string(labelName),
			Value: string(labelValue),
		})
	}
	return domainLabels
}

// toDomainMetricPoints converts Prometheus' representation of metrics data points to Jaeger's.
// Every sample pair yields one MetricPoint carrying the converted timestamp and gauge value;
// the input order is preserved and the result is never nil.
func toDomainMetricPoints(promDps []model.SamplePair) []*metrics.MetricPoint {
	points := make([]*metrics.MetricPoint, 0, len(promDps))
	for _, sample := range promDps {
		points = append(points, &metrics.MetricPoint{
			Timestamp: toDomainTimestamp(sample.Timestamp),
			Value:     toDomainMetricPointValue(sample.Value),
		})
	}
	return points
}

// toDomainTimestamp converts Prometheus' representation of timestamps (milliseconds
// relative to the Unix epoch) to Jaeger's protobuf Timestamp.
//
// A protobuf Timestamp requires Nanos to be non-negative even when Seconds is negative
// (the fraction always counts forward in time). Go's integer division and remainder
// truncate toward zero, so for pre-epoch instants the quotient is floored explicitly
// to keep the millisecond remainder within [0, 999].
func toDomainTimestamp(timeMs model.Time) *types.Timestamp {
	const (
		millisPerSecond = 1000
		nanosPerMilli   = 1_000_000
	)

	seconds := int64(timeMs) / millisPerSecond
	millis := int64(timeMs) % millisPerSecond
	if millis < 0 {
		// Borrow one second so the fractional part becomes non-negative.
		seconds--
		millis += millisPerSecond
	}

	return &types.Timestamp{
		Seconds: seconds,
		Nanos:   int32(millis * nanosPerMilli),
	}
}

// toDomainMetricPointValue wraps a Prometheus sample value as a Jaeger double gauge value.
// A gauge is used because latency, call rate and error rate metrics are not monotonically
// increasing; they are series of positive floating point numbers that may move in either
// direction over time.
func toDomainMetricPointValue(promVal model.SampleValue) *metrics.MetricPoint_GaugeValue {
	doubleValue := &metrics.GaugeValue_DoubleValue{DoubleValue: float64(promVal)}
	gauge := &metrics.GaugeValue{Value: doubleValue}
	return &metrics.MetricPoint_GaugeValue{GaugeValue: gauge}
}
69 changes: 69 additions & 0 deletions plugin/metrics/prometheus/metricsstore/dbmodel/to_domain_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// Copyright (c) 2021 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package dbmodel

import (
"testing"
"time"

"github.com/gogo/protobuf/types"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/jaegertracing/jaeger/proto-gen/api_v2/metrics"
)

// TestToDomainMetricsFamily checks that a matrix holding a single labelled stream with
// one sample is converted into a gauge MetricFamily with the expected name, help text,
// label and metric point.
func TestToDomainMetricsFamily(t *testing.T) {
	nowSec := time.Now().Unix()
	promMetrics := model.Matrix{
		&model.SampleStream{
			Metric: map[model.LabelName]model.LabelValue{"label_key": "label_value"},
			Values: []model.SamplePair{
				{Timestamp: model.Time(nowSec * 1000), Value: 1234},
			},
		},
	}

	mf, err := ToDomainMetricsFamily("the_metric_name", "the_metric_description", promMetrics)
	require.NoError(t, err)

	assert.NotEmpty(t, mf)

	assert.Equal(t, "the_metric_name", mf.Name)
	assert.Equal(t, "the_metric_description", mf.Help)
	assert.Equal(t, metrics.MetricType_GAUGE, mf.Type)

	assert.Len(t, mf.Metrics, 1)

	wantLabels := []*metrics.Label{{Name: "label_key", Value: "label_value"}}
	assert.Equal(t, wantLabels, mf.Metrics[0].Labels)

	// The sample timestamp is a whole number of seconds, so Nanos is expected to be zero.
	wantPoints := []*metrics.MetricPoint{
		{
			Timestamp: &types.Timestamp{Seconds: nowSec},
			Value: &metrics.MetricPoint_GaugeValue{
				GaugeValue: &metrics.GaugeValue{
					Value: &metrics.GaugeValue_DoubleValue{DoubleValue: 1234},
				},
			},
		},
	}
	assert.Equal(t, wantPoints, mf.Metrics[0].MetricPoints)
}

func TestUnexpectedMetricsFamilyType(t *testing.T) {
promMetrics := model.Vector{}
mf, err := ToDomainMetricsFamily("the_metric_name", "the_metric_description", promMetrics)

assert.NotNil(t, mf)
assert.Empty(t, mf)

require.Error(t, err)
assert.EqualError(t, err, "unexpected metrics ValueType: vector")
}
195 changes: 179 additions & 16 deletions plugin/metrics/prometheus/metricsstore/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,60 @@ package metricsstore

import (
"context"
"fmt"
"net"
"net/http"
"strings"
"time"
"unicode"

"github.com/opentracing/opentracing-go"
ottag "github.com/opentracing/opentracing-go/ext"
otlog "github.com/opentracing/opentracing-go/log"
"github.com/prometheus/client_golang/api"
promapi "github.com/prometheus/client_golang/api/prometheus/v1"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/plugin/metrics/prometheus/metricsstore/dbmodel"
"github.com/jaegertracing/jaeger/proto-gen/api_v2/metrics"
"github.com/jaegertracing/jaeger/storage/metricsstore"
)

// MetricsReader is a Prometheus metrics reader.
type MetricsReader struct {
// client is the Prometheus v1 API handle; NewMetricsReader builds it with promapi.NewAPI.
client promapi.API
// logger receives the reader's zap log entries.
logger *zap.Logger
}
const (
// minStep is the smallest supported step between two data points; it is the value
// returned by GetMinStepDuration.
minStep = time.Millisecond

// latenciesMetricName and latenciesMetricDesc name and describe the metric family
// returned by GetLatencies. The description is a format string whose %.2f verb is
// filled with the requested quantile.
latenciesMetricName = "service_latencies"
latenciesMetricDesc = "%.2fth quantile latency, grouped by service"

// callsMetricName and callsMetricDesc name and describe the metric family returned
// by GetCallRates.
callsMetricName = "service_call_rate"
callsMetricDesc = "calls/sec, grouped by service"

// errorsMetricName and errorsMetricDesc name and describe the metric family returned
// by GetErrorRates.
errorsMetricName = "service_error_rate"
errorsMetricDesc = "error rate, computed as a fraction of errors/sec over calls/sec, grouped by service"
)

type (
// MetricsReader is a Prometheus metrics reader.
MetricsReader struct {
// client is the Prometheus v1 API handle used to run range queries.
client promapi.API
// logger receives debug and warning entries about executed queries.
logger *zap.Logger
}

// promQueryParams holds the pre-rendered string fragments that are substituted
// into a PromQL query template; buildPromQuery populates it.
promQueryParams struct {
// groupBy is the comma-separated list of label names for the "by (...)" clause.
groupBy string
// spanKindFilter is either empty or a `span_kind =~ "..."` label matcher.
spanKindFilter string
// serviceFilter is the requested service names joined with "|".
serviceFilter string
// rate is the rate window rendered as a PromQL duration string.
rate string
}

// metricsQueryParams bundles the caller-supplied base query parameters with the
// per-metric settings needed to build and label a query.
metricsQueryParams struct {
metricsstore.BaseQueryParameters
// groupByHistBucket adds the histogram bucket label "le" to the group-by labels.
groupByHistBucket bool
// metricName and metricDesc become the Name and Help of the resulting metric family.
metricName string
metricDesc string
// buildPromQuery renders the final PromQL string from the prepared fragments.
buildPromQuery func(p promQueryParams) string
}
)

// NewMetricsReader returns a new MetricsReader.
func NewMetricsReader(logger *zap.Logger, hostPort string, connectTimeout time.Duration) (*MetricsReader, error) {
Expand All @@ -52,7 +89,7 @@ func NewMetricsReader(logger *zap.Logger, hostPort string, connectTimeout time.D
RoundTripper: roundTripper,
})
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to initialize prometheus client: %w", err)
}
mr := &MetricsReader{
client: promapi.NewAPI(client),
Expand All @@ -63,24 +100,150 @@ func NewMetricsReader(logger *zap.Logger, hostPort string, connectTimeout time.D
}

// GetLatencies gets the latency metrics for the given set of latency query parameters.
func (m *MetricsReader) GetLatencies(ctx context.Context, params *metricsstore.LatenciesQueryParameters) (*metrics.MetricFamily, error) {
// TODO: Implement me
return &metrics.MetricFamily{}, nil
// GetLatencies gets the latency metrics for the given set of latency query parameters.
// It issues a histogram_quantile query over latency_bucket for the requested quantile,
// grouped by the histogram bucket label in addition to the usual group-by labels.
func (m *MetricsReader) GetLatencies(ctx context.Context, requestParams *metricsstore.LatenciesQueryParameters) (*metrics.MetricFamily, error) {
	// Note: the span kind filter can be ""; trailing commas are okay within a timeseries selection.
	const queryFormat = `histogram_quantile(%.2f, sum(latency_bucket{service_name =~ "%s", %s}) by (%s))`

	quantile := requestParams.Quantile
	renderQuery := func(p promQueryParams) string {
		return fmt.Sprintf(queryFormat, quantile, p.serviceFilter, p.spanKindFilter, p.groupBy)
	}

	return m.executeQuery(ctx, metricsQueryParams{
		BaseQueryParameters: requestParams.BaseQueryParameters,
		groupByHistBucket:   true,
		metricName:          latenciesMetricName,
		metricDesc:          fmt.Sprintf(latenciesMetricDesc, quantile),
		buildPromQuery:      renderQuery,
	})
}

// GetCallRates gets the call rate metrics for the given set of call rate query parameters.
func (m *MetricsReader) GetCallRates(ctx context.Context, params *metricsstore.CallRateQueryParameters) (*metrics.MetricFamily, error) {
// TODO: Implement me
return &metrics.MetricFamily{}, nil
// GetCallRates gets the call rate metrics for the given set of call rate query parameters.
// It issues a sum(rate(calls_total{...}[window])) query grouped by the group-by labels.
func (m *MetricsReader) GetCallRates(ctx context.Context, requestParams *metricsstore.CallRateQueryParameters) (*metrics.MetricFamily, error) {
	// Note: the span kind filter can be ""; trailing commas are okay within a timeseries selection.
	const queryFormat = `sum(rate(calls_total{service_name =~ "%s", %s}[%s])) by (%s)`

	renderQuery := func(p promQueryParams) string {
		return fmt.Sprintf(queryFormat, p.serviceFilter, p.spanKindFilter, p.rate, p.groupBy)
	}

	return m.executeQuery(ctx, metricsQueryParams{
		BaseQueryParameters: requestParams.BaseQueryParameters,
		metricName:          callsMetricName,
		metricDesc:          callsMetricDesc,
		buildPromQuery:      renderQuery,
	})
}

// GetErrorRates gets the error rate metrics for the given set of error rate query parameters.
func (m *MetricsReader) GetErrorRates(ctx context.Context, params *metricsstore.ErrorRateQueryParameters) (*metrics.MetricFamily, error) {
// TODO: Implement me
return &metrics.MetricFamily{}, nil
// GetErrorRates gets the error rate metrics for the given set of error rate query parameters.
// The query divides the rate of calls_total samples whose status_code is STATUS_CODE_ERROR
// by the rate of all calls_total samples, using the same filters, window and grouping on
// both sides of the division.
func (m *MetricsReader) GetErrorRates(ctx context.Context, requestParams *metricsstore.ErrorRateQueryParameters) (*metrics.MetricFamily, error) {
	// Note: the span kind filter can be ""; trailing commas are okay within a timeseries selection.
	const queryFormat = `sum(rate(calls_total{service_name =~ "%s", status_code = "STATUS_CODE_ERROR", %s}[%s])) by (%s) / sum(rate(calls_total{service_name =~ "%s", %s}[%s])) by (%s)`

	renderQuery := func(p promQueryParams) string {
		return fmt.Sprintf(
			queryFormat,
			// numerator: errors/sec
			p.serviceFilter, p.spanKindFilter, p.rate, p.groupBy,
			// denominator: calls/sec
			p.serviceFilter, p.spanKindFilter, p.rate, p.groupBy,
		)
	}

	return m.executeQuery(ctx, metricsQueryParams{
		BaseQueryParameters: requestParams.BaseQueryParameters,
		metricName:          errorsMetricName,
		metricDesc:          errorsMetricDesc,
		buildPromQuery:      renderQuery,
	})
}

// GetMinStepDuration gets the minimum step duration (the smallest possible duration between two data points in a time series) supported.
// Both arguments are ignored: the fixed minStep constant is always returned with a nil error.
func (m *MetricsReader) GetMinStepDuration(_ context.Context, _ *metricsstore.MinStepDurationQueryParameters) (time.Duration, error) {
	return minStep, nil
}

// executeQuery executes a query against a Prometheus-compliant metrics backend.
//
// When grouping by operation is requested, the metric name and description are adjusted
// to say so. The query is run as a range query covering [EndTime-Lookback, EndTime] at
// the requested Step, inside a tracing span tagged with the query.
//
// It returns the converted metric family, or an empty (non-nil) MetricFamily and an error
// when a required time parameter is missing, the backend query fails, or the result is
// not a matrix. Backend warnings are logged and do not fail the call.
func (m *MetricsReader) executeQuery(ctx context.Context, p metricsQueryParams) (*metrics.MetricFamily, error) {
	// EndTime, Lookback, Step and RatePer are pointers that are dereferenced below (and
	// while building the query); report a missing one as an error instead of panicking.
	var missing string
	switch {
	case p.EndTime == nil:
		missing = "EndTime"
	case p.Lookback == nil:
		missing = "Lookback"
	case p.Step == nil:
		missing = "Step"
	case p.RatePer == nil:
		missing = "RatePer"
	}
	if missing != "" {
		return &metrics.MetricFamily{}, fmt.Errorf("missing required query parameter: %s", missing)
	}

	if p.GroupByOperation {
		p.metricName = strings.Replace(p.metricName, "service", "service_operation", 1)
		p.metricDesc += " & operation"
	}
	promQuery := buildPromQuery(p)

	span, ctx := startSpanForQuery(ctx, p.metricName, promQuery)
	defer span.Finish()

	queryRange := promapi.Range{
		Start: p.EndTime.Add(-1 * *p.Lookback),
		End:   *p.EndTime,
		Step:  *p.Step,
	}

	m.logger.Debug("Executing Prometheus query", zap.String("query", promQuery), zap.Any("range", queryRange))

	mv, warnings, err := m.client.QueryRange(ctx, promQuery, queryRange)
	if err != nil {
		logErrorToSpan(span, err)
		return &metrics.MetricFamily{}, fmt.Errorf("failed executing metrics query: %w", err)
	}
	if len(warnings) > 0 {
		m.logger.Warn("Warnings detected on Prometheus query", zap.Any("warnings", warnings))
	}

	// zap.Stringer defers the call to mv.String() until the entry is actually encoded,
	// so potentially large results are not rendered when debug logging is disabled.
	m.logger.Debug("Prometheus query results", zap.Stringer("results", mv))
	return dbmodel.ToDomainMetricsFamily(
		p.metricName,
		p.metricDesc,
		mv,
	)
}

// buildPromQuery prepares the string fragments shared by all metric queries (service
// filter, optional span kind filter, rate window and group-by labels) and passes them to
// the metric-specific buildPromQuery callback, returning the PromQL string it renders.
func buildPromQuery(metricsParams metricsQueryParams) string {
	// Results are always grouped by service; operation and histogram bucket are optional.
	groupByLabels := make([]string, 0, 3)
	groupByLabels = append(groupByLabels, "service_name")
	if metricsParams.GroupByOperation {
		groupByLabels = append(groupByLabels, "operation")
	}
	if metricsParams.groupByHistBucket {
		// Group by the bucket value ("le" => "less than or equal to").
		groupByLabels = append(groupByLabels, "le")
	}

	// The span kind matcher is left empty when no span kinds were requested.
	var spanKindFilter string
	if kinds := metricsParams.SpanKinds; len(kinds) > 0 {
		spanKindFilter = fmt.Sprintf(`span_kind =~ "%s"`, strings.Join(kinds, "|"))
	}

	fragments := promQueryParams{
		groupBy:        strings.Join(groupByLabels, ","),
		spanKindFilter: spanKindFilter,
		serviceFilter:  strings.Join(metricsParams.ServiceNames, "|"),
		rate:           promqlDurationString(metricsParams.RatePer),
	}
	return metricsParams.buildPromQuery(fragments)
}

// promqlDurationString formats the duration string to be promQL-compliant.
// PromQL only accepts "single-unit" durations like "30s", "1m", "1h"; not "1h5s" or "1m0s".
//
// The result is the leading number of d.String() together with its complete unit; any
// lower-order components are dropped (e.g. "1h0m5s" becomes "1h"). The whole unit is
// kept, so multi-letter units such as "ms" are not cut short to "m" (which PromQL would
// read as minutes), and the string is sliced rather than rebuilt byte-by-byte so that
// non-ASCII unit characters such as 'µ' remain valid UTF-8.
//
// d must not be nil.
func promqlDurationString(d *time.Duration) string {
	s := d.String()
	seenUnit := false
	for i, r := range s {
		switch {
		case unicode.IsLetter(r):
			seenUnit = true
		case seenUnit:
			// First non-letter after the unit: the next component starts here.
			return s[:i]
		}
	}
	// Single-component duration such as "30s" or "500ms".
	return s
}

// startSpanForQuery starts a span named after the metric as a child of any span found in
// ctx, tags it with the query statement, the database type "prometheus" and the component
// "promql", and returns the span together with the context that carries it.
func startSpanForQuery(ctx context.Context, metricName, query string) (opentracing.Span, context.Context) {
	querySpan, spanCtx := opentracing.StartSpanFromContext(ctx, metricName)
	querySpan.SetTag(string(ottag.DBStatement), query)
	querySpan.SetTag(string(ottag.DBType), "prometheus")
	querySpan.SetTag(string(ottag.Component), "promql")
	return querySpan, spanCtx
}

// logErrorToSpan flags the span as failed by setting the error tag to true and records
// err as a log field on the span.
func logErrorToSpan(span opentracing.Span, err error) {
	span.SetTag(string(ottag.Error), true)
	span.LogFields(otlog.Error(err))
}
Loading

0 comments on commit c0fb781

Please sign in to comment.