Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add OTEL tracer to metricsstore reader component #4595

Merged
merged 13 commits into from
Jul 29, 2023
7 changes: 5 additions & 2 deletions pkg/jtracer/jtracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func New(serviceName string) (*JTracer, error) {
}

func NoOp() *JTracer {
return &JTracer{OT: opentracing.NoopTracer{}, OTEL: trace.NewNoopTracerProvider()}
return &JTracer{OT: opentracing.NoopTracer{}, OTEL: trace.NewNoopTracerProvider(), closer: nil}
}

// initOTEL initializes OTEL Tracer
Expand Down Expand Up @@ -107,5 +107,8 @@ func otelExporter(ctx context.Context) (sdktrace.SpanExporter, error) {

// Close shuts down the tracerProvider to clean up resources.
func (jt *JTracer) Close(ctx context.Context) error {
return jt.closer(ctx)
if jt.closer != nil {
return jt.closer(ctx)
}
return nil
}
6 changes: 5 additions & 1 deletion plugin/metrics/prometheus/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"flag"

"github.com/spf13/viper"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/plugin"
Expand All @@ -31,11 +33,13 @@ var _ plugin.Configurable = (*Factory)(nil)
type Factory struct {
// options holds the factory configuration; NewFactory populates it via NewOptions("prometheus").
options *Options
// logger is forwarded to the metrics reader built by CreateMetricsReader.
logger *zap.Logger
// tracer is the OTEL TracerProvider handed to the metrics reader; NewFactory sets it from otel.GetTracerProvider().
tracer trace.TracerProvider
}

// NewFactory creates a new Factory.
func NewFactory() *Factory {
return &Factory{
tracer: otel.GetTracerProvider(),
afzal442 marked this conversation as resolved.
Show resolved Hide resolved
options: NewOptions("prometheus"),
}
}
Expand All @@ -60,5 +64,5 @@ func (f *Factory) Initialize(logger *zap.Logger) error {

// CreateMetricsReader implements storage.MetricsFactory.
func (f *Factory) CreateMetricsReader() (metricsstore.Reader, error) {
return prometheusstore.NewMetricsReader(f.logger, f.options.Primary.Configuration)
return prometheusstore.NewMetricsReader(f.options.Primary.Configuration, f.logger, f.tracer)
}
38 changes: 22 additions & 16 deletions plugin/metrics/prometheus/metricsstore/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,12 @@ import (
"time"
"unicode"

"github.com/opentracing/opentracing-go"
ottag "github.com/opentracing/opentracing-go/ext"
otlog "github.com/opentracing/opentracing-go/log"
"github.com/prometheus/client_golang/api"
promapi "github.com/prometheus/client_golang/api/prometheus/v1"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
semconv "go.opentelemetry.io/otel/semconv/v1.20.0"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/pkg/bearertoken"
Expand All @@ -49,6 +50,7 @@ type (
MetricsReader struct {
client promapi.API
logger *zap.Logger
tracer trace.Tracer

metricsTranslator dbmodel.Translator
latencyMetricName string
Expand All @@ -73,7 +75,7 @@ type (
)

// NewMetricsReader returns a new MetricsReader.
func NewMetricsReader(logger *zap.Logger, cfg config.Configuration) (*MetricsReader, error) {
func NewMetricsReader(cfg config.Configuration, logger *zap.Logger, tracer trace.TracerProvider) (*MetricsReader, error) {
logger.Info("Creating metrics reader", zap.Any("configuration", cfg))

roundTripper, err := getHTTPRoundTripper(&cfg, logger)
Expand All @@ -96,6 +98,7 @@ func NewMetricsReader(logger *zap.Logger, cfg config.Configuration) (*MetricsRea
mr := &MetricsReader{
client: promapi.NewAPI(client),
logger: logger,
tracer: tracer.Tracer("prom-metrics-reader"),

metricsTranslator: dbmodel.New(operationLabel),
callsMetricName: buildFullCallsMetricName(cfg),
Expand Down Expand Up @@ -224,8 +227,8 @@ func (m MetricsReader) executeQuery(ctx context.Context, p metricsQueryParams) (
}
promQuery := m.buildPromQuery(p)

span, ctx := startSpanForQuery(ctx, p.metricName, promQuery)
defer span.Finish()
ctx, span := startSpanForQuery(ctx, p.metricName, promQuery, m.tracer)
defer span.End()

queryRange := promapi.Range{
Start: p.EndTime.Add(-1 * *p.Lookback),
afzal442 marked this conversation as resolved.
Show resolved Hide resolved
Expand All @@ -235,8 +238,9 @@ func (m MetricsReader) executeQuery(ctx context.Context, p metricsQueryParams) (

mv, warnings, err := m.client.QueryRange(ctx, promQuery, queryRange)
if err != nil {
err = fmt.Errorf("failed executing metrics query: %w", err)
logErrorToSpan(span, err)
return &metrics.MetricFamily{}, fmt.Errorf("failed executing metrics query: %w", err)
return &metrics.MetricFamily{}, err
}
if len(warnings) > 0 {
m.logger.Warn("Warnings detected on Prometheus query", zap.Any("warnings", warnings), zap.String("query", promQuery), zap.Any("range", queryRange))
Expand Down Expand Up @@ -287,17 +291,19 @@ func promqlDurationString(d *time.Duration) string {
return string(b)
}

func startSpanForQuery(ctx context.Context, metricName, query string) (opentracing.Span, context.Context) {
span, ctx := opentracing.StartSpanFromContext(ctx, metricName)
ottag.DBStatement.Set(span, query)
ottag.DBType.Set(span, "prometheus")
ottag.Component.Set(span, "promql")
return span, ctx
func startSpanForQuery(ctx context.Context, metricName, query string, tp trace.Tracer) (context.Context, trace.Span) {
ctx, span := tp.Start(ctx, metricName)
span.SetAttributes(
attribute.Key(semconv.DBStatementKey).String(query),
attribute.Key(semconv.DBSystemKey).String("prometheus"),
attribute.Key("component").String("promql"),
)
return ctx, span
}

func logErrorToSpan(span opentracing.Span, err error) {
ottag.Error.Set(span, true)
span.LogFields(otlog.Error(err))
func logErrorToSpan(span trace.Span, err error) {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
}

func getHTTPRoundTripper(c *config.Configuration, logger *zap.Logger) (rt http.RoundTripper, err error) {
Expand Down
78 changes: 59 additions & 19 deletions plugin/metrics/prometheus/metricsstore/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/codes"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/sdk/trace/tracetest"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/pkg/bearertoken"
Expand Down Expand Up @@ -61,22 +65,38 @@ var defaultConfig = config.Configuration{
LatencyUnit: "ms",
}

// tracerProvider is a test fixture that wires an SDK TracerProvider to an
// in-memory span exporter. The provider is configured with the AlwaysSample
// sampler and the exporter is registered through WithSyncer.
//
// It returns the provider, the exporter (so tests can inspect the spans that
// were reported), and a closer that shuts the provider down and asserts that
// the shutdown produced no error.
func tracerProvider(t *testing.T) (trace.TracerProvider, *tracetest.InMemoryExporter, func()) {
	spanSink := tracetest.NewInMemoryExporter()

	provider := sdktrace.NewTracerProvider(
		sdktrace.WithSampler(sdktrace.AlwaysSample()),
		sdktrace.WithSyncer(spanSink),
	)

	// Shutdown failures are reported against the calling test.
	shutdown := func() {
		shutdownErr := provider.Shutdown(context.Background())
		assert.NoError(t, shutdownErr)
	}

	return provider, spanSink, shutdown
}

func TestNewMetricsReaderValidAddress(t *testing.T) {
logger := zap.NewNop()
reader, err := NewMetricsReader(logger, config.Configuration{
tracer, _, closer := tracerProvider(t)
defer closer()
reader, err := NewMetricsReader(config.Configuration{
ServerURL: "http://localhost:1234",
ConnectTimeout: defaultTimeout,
})
}, logger, tracer)
require.NoError(t, err)
assert.NotNil(t, reader)
}

func TestNewMetricsReaderInvalidAddress(t *testing.T) {
logger := zap.NewNop()
reader, err := NewMetricsReader(logger, config.Configuration{
tracer, _, closer := tracerProvider(t)
defer closer()
reader, err := NewMetricsReader(config.Configuration{
ServerURL: "\n",
ConnectTimeout: defaultTimeout,
})
}, logger, tracer)
require.Error(t, err)
assert.Contains(t, err.Error(), "failed to initialize prometheus client")
assert.Nil(t, reader)
Expand All @@ -85,14 +105,16 @@ func TestNewMetricsReaderInvalidAddress(t *testing.T) {
func TestGetMinStepDuration(t *testing.T) {
params := metricsstore.MinStepDurationQueryParameters{}
logger := zap.NewNop()
tracer, _, closer := tracerProvider(t)
defer closer()
listener, err := net.Listen("tcp", "localhost:")
require.NoError(t, err)
assert.NotNil(t, listener)

reader, err := NewMetricsReader(logger, config.Configuration{
reader, err := NewMetricsReader(config.Configuration{
ServerURL: "http://" + listener.Addr().String(),
ConnectTimeout: defaultTimeout,
})
}, logger, tracer)
afzal442 marked this conversation as resolved.
Show resolved Hide resolved
require.NoError(t, err)

minStep, err := reader.GetMinStepDuration(context.Background(), &params)
Expand Down Expand Up @@ -121,18 +143,20 @@ func TestMetricsServerError(t *testing.T) {
defer mockPrometheus.Close()

logger := zap.NewNop()
tracer, exp, closer := tracerProvider(t)
defer closer()
address := mockPrometheus.Listener.Addr().String()
reader, err := NewMetricsReader(logger, config.Configuration{
reader, err := NewMetricsReader(config.Configuration{
ServerURL: "http://" + address,
ConnectTimeout: defaultTimeout,
})
}, logger, tracer)
require.NoError(t, err)

m, err := reader.GetCallRates(context.Background(), &params)
assert.NotNil(t, m)

require.Error(t, err)
assert.Contains(t, err.Error(), "failed executing metrics query")
require.Len(t, exp.GetSpans(), 1, "HTTP request was traced and span reported")
assert.Equal(t, codes.Error, exp.GetSpans()[0].Status.Code)
}

func TestGetLatencies(t *testing.T) {
Expand Down Expand Up @@ -221,16 +245,19 @@ func TestGetLatencies(t *testing.T) {
BaseQueryParameters: buildTestBaseQueryParametersFrom(tc),
Quantile: 0.95,
}
tracer, exp, closer := tracerProvider(t)
defer closer()
cfg := defaultConfig
if tc.updateConfig != nil {
cfg = tc.updateConfig(cfg)
}
reader, mockPrometheus := prepareMetricsReaderAndServer(t, cfg, tc.wantPromQlQuery, nil)
reader, mockPrometheus := prepareMetricsReaderAndServer(t, cfg, tc.wantPromQlQuery, nil, tracer)
defer mockPrometheus.Close()

m, err := reader.GetLatencies(context.Background(), &params)
require.NoError(t, err)
afzal442 marked this conversation as resolved.
Show resolved Hide resolved
assertMetrics(t, m, tc.wantLabels, tc.wantName, tc.wantDescription)
assert.Len(t, exp.GetSpans(), 1, "HTTP request was traced and span reported")
})
}
}
Expand Down Expand Up @@ -318,16 +345,19 @@ func TestGetCallRates(t *testing.T) {
params := metricsstore.CallRateQueryParameters{
BaseQueryParameters: buildTestBaseQueryParametersFrom(tc),
}
tracer, exp, closer := tracerProvider(t)
defer closer()
cfg := defaultConfig
if tc.updateConfig != nil {
cfg = tc.updateConfig(cfg)
}
reader, mockPrometheus := prepareMetricsReaderAndServer(t, cfg, tc.wantPromQlQuery, nil)
reader, mockPrometheus := prepareMetricsReaderAndServer(t, cfg, tc.wantPromQlQuery, nil, tracer)
defer mockPrometheus.Close()

m, err := reader.GetCallRates(context.Background(), &params)
require.NoError(t, err)
assertMetrics(t, m, tc.wantLabels, tc.wantName, tc.wantDescription)
assert.Len(t, exp.GetSpans(), 1, "HTTP request was traced and span reported")
})
}
}
Expand Down Expand Up @@ -440,16 +470,19 @@ func TestGetErrorRates(t *testing.T) {
params := metricsstore.ErrorRateQueryParameters{
BaseQueryParameters: buildTestBaseQueryParametersFrom(tc),
}
tracer, exp, closer := tracerProvider(t)
defer closer()
cfg := defaultConfig
if tc.updateConfig != nil {
cfg = tc.updateConfig(cfg)
}
reader, mockPrometheus := prepareMetricsReaderAndServer(t, cfg, tc.wantPromQlQuery, nil)
reader, mockPrometheus := prepareMetricsReaderAndServer(t, cfg, tc.wantPromQlQuery, nil, tracer)
defer mockPrometheus.Close()

m, err := reader.GetErrorRates(context.Background(), &params)
require.NoError(t, err)
assertMetrics(t, m, tc.wantLabels, tc.wantName, tc.wantDescription)
assert.Len(t, exp.GetSpans(), 1, "HTTP request was traced and span reported")
})
}
}
Expand All @@ -460,24 +493,29 @@ func TestInvalidLatencyUnit(t *testing.T) {
t.Errorf("Expected a panic due to invalid latency unit")
}
}()
tracer, _, closer := tracerProvider(t)
defer closer()
cfg := config.Configuration{
SupportSpanmetricsConnector: true,
NormalizeDuration: true,
LatencyUnit: "something invalid",
}
_, _ = NewMetricsReader(zap.NewNop(), cfg)
_, _ = NewMetricsReader(cfg, zap.NewNop(), tracer)
}

func TestWarningResponse(t *testing.T) {
params := metricsstore.ErrorRateQueryParameters{
BaseQueryParameters: buildTestBaseQueryParametersFrom(metricsTestCase{serviceNames: []string{"foo"}}),
}
reader, mockPrometheus := prepareMetricsReaderAndServer(t, config.Configuration{}, "", []string{"warning0", "warning1"})
tracer, exp, closer := tracerProvider(t)
defer closer()
reader, mockPrometheus := prepareMetricsReaderAndServer(t, config.Configuration{}, "", []string{"warning0", "warning1"}, tracer)
defer mockPrometheus.Close()

m, err := reader.GetErrorRates(context.Background(), &params)
require.NoError(t, err)
assert.NotNil(t, m)
assert.Len(t, exp.GetSpans(), 1, "HTTP request was traced and span reported")
}

func TestGetRoundTripperTLSConfig(t *testing.T) {
Expand Down Expand Up @@ -571,14 +609,16 @@ func TestGetRoundTripperTokenError(t *testing.T) {

func TestInvalidCertFile(t *testing.T) {
logger := zap.NewNop()
reader, err := NewMetricsReader(logger, config.Configuration{
tracer, _, closer := tracerProvider(t)
defer closer()
reader, err := NewMetricsReader(config.Configuration{
ServerURL: "https://localhost:1234",
ConnectTimeout: defaultTimeout,
TLS: tlscfg.Options{
Enabled: true,
CAPath: "foo",
},
})
}, logger, tracer)
require.Error(t, err)
assert.Nil(t, reader)
}
Expand Down Expand Up @@ -635,7 +675,7 @@ func buildTestBaseQueryParametersFrom(tc metricsTestCase) metricsstore.BaseQuery
}
}

func prepareMetricsReaderAndServer(t *testing.T, config config.Configuration, wantPromQlQuery string, wantWarnings []string) (metricsstore.Reader, *httptest.Server) {
func prepareMetricsReaderAndServer(t *testing.T, config config.Configuration, wantPromQlQuery string, wantWarnings []string, tracer trace.TracerProvider) (metricsstore.Reader, *httptest.Server) {
mockPrometheus := startMockPrometheusServer(t, wantPromQlQuery, wantWarnings)

logger := zap.NewNop()
Expand All @@ -644,7 +684,7 @@ func prepareMetricsReaderAndServer(t *testing.T, config config.Configuration, wa
config.ServerURL = "http://" + address
config.ConnectTimeout = defaultTimeout

reader, err := NewMetricsReader(logger, config)
reader, err := NewMetricsReader(config, logger, tracer)
require.NoError(t, err)

return reader, mockPrometheus
Expand Down