From 287562ced1e79250e75fab8d4979d17f5fe1097e Mon Sep 17 00:00:00 2001 From: Afzal Ansari Date: Sun, 30 Jul 2023 00:45:29 +0530 Subject: [PATCH] Replace metricsstore/reader tracing instrumentation with OpenTelemetry (#4595) ## Which problem is this PR solving? * Part of #3381 * This PR adds OTEL Provider to metrics reader component ## Short description of the changes - Replaces OT tracer with OTEL tracer to support jtracer pkg --------- Signed-off-by: Afzal Ansari --- pkg/jtracer/jtracer.go | 7 +- plugin/metrics/prometheus/factory.go | 6 +- .../metrics/prometheus/metricsstore/reader.go | 38 +++++---- .../prometheus/metricsstore/reader_test.go | 78 ++++++++++++++----- 4 files changed, 91 insertions(+), 38 deletions(-) diff --git a/pkg/jtracer/jtracer.go b/pkg/jtracer/jtracer.go index 32e9c56c241..7a95441451c 100644 --- a/pkg/jtracer/jtracer.go +++ b/pkg/jtracer/jtracer.go @@ -61,7 +61,7 @@ func New(serviceName string) (*JTracer, error) { } func NoOp() *JTracer { - return &JTracer{OT: opentracing.NoopTracer{}, OTEL: trace.NewNoopTracerProvider()} + return &JTracer{OT: opentracing.NoopTracer{}, OTEL: trace.NewNoopTracerProvider(), closer: nil} } // initOTEL initializes OTEL Tracer @@ -107,5 +107,8 @@ func otelExporter(ctx context.Context) (sdktrace.SpanExporter, error) { // Shutdown the tracerProvider to clean up resources func (jt *JTracer) Close(ctx context.Context) error { - return jt.closer(ctx) + if jt.closer != nil { + return jt.closer(ctx) + } + return nil } diff --git a/plugin/metrics/prometheus/factory.go b/plugin/metrics/prometheus/factory.go index a29416d397a..60452061d67 100644 --- a/plugin/metrics/prometheus/factory.go +++ b/plugin/metrics/prometheus/factory.go @@ -18,6 +18,8 @@ import ( "flag" "github.com/spf13/viper" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/trace" "go.uber.org/zap" "github.com/jaegertracing/jaeger/plugin" @@ -31,11 +33,13 @@ var _ plugin.Configurable = (*Factory)(nil) type Factory struct { options *Options logger 
*zap.Logger + tracer trace.TracerProvider } // NewFactory creates a new Factory. func NewFactory() *Factory { return &Factory{ + tracer: otel.GetTracerProvider(), options: NewOptions("prometheus"), } } @@ -60,5 +64,5 @@ func (f *Factory) Initialize(logger *zap.Logger) error { // CreateMetricsReader implements storage.MetricsFactory. func (f *Factory) CreateMetricsReader() (metricsstore.Reader, error) { - return prometheusstore.NewMetricsReader(f.logger, f.options.Primary.Configuration) + return prometheusstore.NewMetricsReader(f.options.Primary.Configuration, f.logger, f.tracer) } diff --git a/plugin/metrics/prometheus/metricsstore/reader.go b/plugin/metrics/prometheus/metricsstore/reader.go index 0cf6852183d..3e3a218d6af 100644 --- a/plugin/metrics/prometheus/metricsstore/reader.go +++ b/plugin/metrics/prometheus/metricsstore/reader.go @@ -26,11 +26,12 @@ import ( "time" "unicode" - "github.com/opentracing/opentracing-go" - ottag "github.com/opentracing/opentracing-go/ext" - otlog "github.com/opentracing/opentracing-go/log" "github.com/prometheus/client_golang/api" promapi "github.com/prometheus/client_golang/api/prometheus/v1" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + semconv "go.opentelemetry.io/otel/semconv/v1.20.0" + "go.opentelemetry.io/otel/trace" "go.uber.org/zap" "github.com/jaegertracing/jaeger/pkg/bearertoken" @@ -49,6 +50,7 @@ type ( MetricsReader struct { client promapi.API logger *zap.Logger + tracer trace.Tracer metricsTranslator dbmodel.Translator latencyMetricName string @@ -73,7 +75,7 @@ type ( ) // NewMetricsReader returns a new MetricsReader. 
-func NewMetricsReader(logger *zap.Logger, cfg config.Configuration) (*MetricsReader, error) { +func NewMetricsReader(cfg config.Configuration, logger *zap.Logger, tracer trace.TracerProvider) (*MetricsReader, error) { logger.Info("Creating metrics reader", zap.Any("configuration", cfg)) roundTripper, err := getHTTPRoundTripper(&cfg, logger) @@ -96,6 +98,7 @@ func NewMetricsReader(logger *zap.Logger, cfg config.Configuration) (*MetricsRea mr := &MetricsReader{ client: promapi.NewAPI(client), logger: logger, + tracer: tracer.Tracer("prom-metrics-reader"), metricsTranslator: dbmodel.New(operationLabel), callsMetricName: buildFullCallsMetricName(cfg), @@ -224,8 +227,8 @@ func (m MetricsReader) executeQuery(ctx context.Context, p metricsQueryParams) ( } promQuery := m.buildPromQuery(p) - span, ctx := startSpanForQuery(ctx, p.metricName, promQuery) - defer span.Finish() + ctx, span := startSpanForQuery(ctx, p.metricName, promQuery, m.tracer) + defer span.End() queryRange := promapi.Range{ Start: p.EndTime.Add(-1 * *p.Lookback), @@ -235,8 +238,9 @@ func (m MetricsReader) executeQuery(ctx context.Context, p metricsQueryParams) ( mv, warnings, err := m.client.QueryRange(ctx, promQuery, queryRange) if err != nil { + err = fmt.Errorf("failed executing metrics query: %w", err) logErrorToSpan(span, err) - return &metrics.MetricFamily{}, fmt.Errorf("failed executing metrics query: %w", err) + return &metrics.MetricFamily{}, err } if len(warnings) > 0 { m.logger.Warn("Warnings detected on Prometheus query", zap.Any("warnings", warnings), zap.String("query", promQuery), zap.Any("range", queryRange)) @@ -287,17 +291,19 @@ func promqlDurationString(d *time.Duration) string { return string(b) } -func startSpanForQuery(ctx context.Context, metricName, query string) (opentracing.Span, context.Context) { - span, ctx := opentracing.StartSpanFromContext(ctx, metricName) - ottag.DBStatement.Set(span, query) - ottag.DBType.Set(span, "prometheus") - ottag.Component.Set(span, "promql") - 
return span, ctx +func startSpanForQuery(ctx context.Context, metricName, query string, tp trace.Tracer) (context.Context, trace.Span) { + ctx, span := tp.Start(ctx, metricName) + span.SetAttributes( + attribute.Key(semconv.DBStatementKey).String(query), + attribute.Key(semconv.DBSystemKey).String("prometheus"), + attribute.Key("component").String("promql"), + ) + return ctx, span } -func logErrorToSpan(span opentracing.Span, err error) { - ottag.Error.Set(span, true) - span.LogFields(otlog.Error(err)) +func logErrorToSpan(span trace.Span, err error) { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) } func getHTTPRoundTripper(c *config.Configuration, logger *zap.Logger) (rt http.RoundTripper, err error) { diff --git a/plugin/metrics/prometheus/metricsstore/reader_test.go b/plugin/metrics/prometheus/metricsstore/reader_test.go index cf027f8d6a0..09fb85ec3df 100644 --- a/plugin/metrics/prometheus/metricsstore/reader_test.go +++ b/plugin/metrics/prometheus/metricsstore/reader_test.go @@ -29,6 +29,10 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/codes" + sdktrace "go.opentelemetry.io/otel/sdk/trace" + "go.opentelemetry.io/otel/sdk/trace/tracetest" + "go.opentelemetry.io/otel/trace" "go.uber.org/zap" "github.com/jaegertracing/jaeger/pkg/bearertoken" @@ -61,22 +65,38 @@ var defaultConfig = config.Configuration{ LatencyUnit: "ms", } +func tracerProvider(t *testing.T) (trace.TracerProvider, *tracetest.InMemoryExporter, func()) { + exporter := tracetest.NewInMemoryExporter() + tp := sdktrace.NewTracerProvider( + sdktrace.WithSampler(sdktrace.AlwaysSample()), + sdktrace.WithSyncer(exporter), + ) + closer := func() { + assert.NoError(t, tp.Shutdown(context.Background())) + } + return tp, exporter, closer +} + func TestNewMetricsReaderValidAddress(t *testing.T) { logger := zap.NewNop() - reader, err := NewMetricsReader(logger, config.Configuration{ + tracer, _, closer := tracerProvider(t) + 
defer closer() + reader, err := NewMetricsReader(config.Configuration{ ServerURL: "http://localhost:1234", ConnectTimeout: defaultTimeout, - }) + }, logger, tracer) require.NoError(t, err) assert.NotNil(t, reader) } func TestNewMetricsReaderInvalidAddress(t *testing.T) { logger := zap.NewNop() - reader, err := NewMetricsReader(logger, config.Configuration{ + tracer, _, closer := tracerProvider(t) + defer closer() + reader, err := NewMetricsReader(config.Configuration{ ServerURL: "\n", ConnectTimeout: defaultTimeout, - }) + }, logger, tracer) require.Error(t, err) assert.Contains(t, err.Error(), "failed to initialize prometheus client") assert.Nil(t, reader) @@ -85,14 +105,16 @@ func TestNewMetricsReaderInvalidAddress(t *testing.T) { func TestGetMinStepDuration(t *testing.T) { params := metricsstore.MinStepDurationQueryParameters{} logger := zap.NewNop() + tracer, _, closer := tracerProvider(t) + defer closer() listener, err := net.Listen("tcp", "localhost:") require.NoError(t, err) assert.NotNil(t, listener) - reader, err := NewMetricsReader(logger, config.Configuration{ + reader, err := NewMetricsReader(config.Configuration{ ServerURL: "http://" + listener.Addr().String(), ConnectTimeout: defaultTimeout, - }) + }, logger, tracer) require.NoError(t, err) minStep, err := reader.GetMinStepDuration(context.Background(), &params) @@ -121,18 +143,20 @@ func TestMetricsServerError(t *testing.T) { defer mockPrometheus.Close() logger := zap.NewNop() + tracer, exp, closer := tracerProvider(t) + defer closer() address := mockPrometheus.Listener.Addr().String() - reader, err := NewMetricsReader(logger, config.Configuration{ + reader, err := NewMetricsReader(config.Configuration{ ServerURL: "http://" + address, ConnectTimeout: defaultTimeout, - }) + }, logger, tracer) require.NoError(t, err) - m, err := reader.GetCallRates(context.Background(), &params) assert.NotNil(t, m) - require.Error(t, err) assert.Contains(t, err.Error(), "failed executing metrics query") + require.Len(t, 
exp.GetSpans(), 1, "HTTP request was traced and span reported") + assert.Equal(t, codes.Error, exp.GetSpans()[0].Status.Code) } func TestGetLatencies(t *testing.T) { @@ -221,16 +245,19 @@ func TestGetLatencies(t *testing.T) { BaseQueryParameters: buildTestBaseQueryParametersFrom(tc), Quantile: 0.95, } + tracer, exp, closer := tracerProvider(t) + defer closer() cfg := defaultConfig if tc.updateConfig != nil { cfg = tc.updateConfig(cfg) } - reader, mockPrometheus := prepareMetricsReaderAndServer(t, cfg, tc.wantPromQlQuery, nil) + reader, mockPrometheus := prepareMetricsReaderAndServer(t, cfg, tc.wantPromQlQuery, nil, tracer) defer mockPrometheus.Close() m, err := reader.GetLatencies(context.Background(), &params) require.NoError(t, err) assertMetrics(t, m, tc.wantLabels, tc.wantName, tc.wantDescription) + assert.Len(t, exp.GetSpans(), 1, "HTTP request was traced and span reported") }) } } @@ -318,16 +345,19 @@ func TestGetCallRates(t *testing.T) { params := metricsstore.CallRateQueryParameters{ BaseQueryParameters: buildTestBaseQueryParametersFrom(tc), } + tracer, exp, closer := tracerProvider(t) + defer closer() cfg := defaultConfig if tc.updateConfig != nil { cfg = tc.updateConfig(cfg) } - reader, mockPrometheus := prepareMetricsReaderAndServer(t, cfg, tc.wantPromQlQuery, nil) + reader, mockPrometheus := prepareMetricsReaderAndServer(t, cfg, tc.wantPromQlQuery, nil, tracer) defer mockPrometheus.Close() m, err := reader.GetCallRates(context.Background(), &params) require.NoError(t, err) assertMetrics(t, m, tc.wantLabels, tc.wantName, tc.wantDescription) + assert.Len(t, exp.GetSpans(), 1, "HTTP request was traced and span reported") }) } } @@ -440,16 +470,19 @@ func TestGetErrorRates(t *testing.T) { params := metricsstore.ErrorRateQueryParameters{ BaseQueryParameters: buildTestBaseQueryParametersFrom(tc), } + tracer, exp, closer := tracerProvider(t) + defer closer() cfg := defaultConfig if tc.updateConfig != nil { cfg = tc.updateConfig(cfg) } - reader, mockPrometheus := 
prepareMetricsReaderAndServer(t, cfg, tc.wantPromQlQuery, nil) + reader, mockPrometheus := prepareMetricsReaderAndServer(t, cfg, tc.wantPromQlQuery, nil, tracer) defer mockPrometheus.Close() m, err := reader.GetErrorRates(context.Background(), &params) require.NoError(t, err) assertMetrics(t, m, tc.wantLabels, tc.wantName, tc.wantDescription) + assert.Len(t, exp.GetSpans(), 1, "HTTP request was traced and span reported") }) } } @@ -460,24 +493,29 @@ func TestInvalidLatencyUnit(t *testing.T) { t.Errorf("Expected a panic due to invalid latency unit") } }() + tracer, _, closer := tracerProvider(t) + defer closer() cfg := config.Configuration{ SupportSpanmetricsConnector: true, NormalizeDuration: true, LatencyUnit: "something invalid", } - _, _ = NewMetricsReader(zap.NewNop(), cfg) + _, _ = NewMetricsReader(cfg, zap.NewNop(), tracer) } func TestWarningResponse(t *testing.T) { params := metricsstore.ErrorRateQueryParameters{ BaseQueryParameters: buildTestBaseQueryParametersFrom(metricsTestCase{serviceNames: []string{"foo"}}), } - reader, mockPrometheus := prepareMetricsReaderAndServer(t, config.Configuration{}, "", []string{"warning0", "warning1"}) + tracer, exp, closer := tracerProvider(t) + defer closer() + reader, mockPrometheus := prepareMetricsReaderAndServer(t, config.Configuration{}, "", []string{"warning0", "warning1"}, tracer) defer mockPrometheus.Close() m, err := reader.GetErrorRates(context.Background(), &params) require.NoError(t, err) assert.NotNil(t, m) + assert.Len(t, exp.GetSpans(), 1, "HTTP request was traced and span reported") } func TestGetRoundTripperTLSConfig(t *testing.T) { @@ -571,14 +609,16 @@ func TestGetRoundTripperTokenError(t *testing.T) { func TestInvalidCertFile(t *testing.T) { logger := zap.NewNop() - reader, err := NewMetricsReader(logger, config.Configuration{ + tracer, _, closer := tracerProvider(t) + defer closer() + reader, err := NewMetricsReader(config.Configuration{ ServerURL: "https://localhost:1234", ConnectTimeout: defaultTimeout, 
TLS: tlscfg.Options{ Enabled: true, CAPath: "foo", }, - }) + }, logger, tracer) require.Error(t, err) assert.Nil(t, reader) } @@ -635,7 +675,7 @@ func buildTestBaseQueryParametersFrom(tc metricsTestCase) metricsstore.BaseQuery } } -func prepareMetricsReaderAndServer(t *testing.T, config config.Configuration, wantPromQlQuery string, wantWarnings []string) (metricsstore.Reader, *httptest.Server) { +func prepareMetricsReaderAndServer(t *testing.T, config config.Configuration, wantPromQlQuery string, wantWarnings []string, tracer trace.TracerProvider) (metricsstore.Reader, *httptest.Server) { mockPrometheus := startMockPrometheusServer(t, wantPromQlQuery, wantWarnings) logger := zap.NewNop() @@ -644,7 +684,7 @@ func prepareMetricsReaderAndServer(t *testing.T, config config.Configuration, wa config.ServerURL = "http://" + address config.ConnectTimeout = defaultTimeout - reader, err := NewMetricsReader(logger, config) + reader, err := NewMetricsReader(config, logger, tracer) require.NoError(t, err) return reader, mockPrometheus