Replace metricsstore/reader tracing instrumentation with OpenTelemetry (#4595)

## Which problem is this PR solving?
* Part of #3381
* This PR adds an OTEL TracerProvider to the metrics reader component

## Short description of the changes
- Replaces the OpenTracing (OT) tracer with the OpenTelemetry (OTEL) tracer so the reader can be wired through the jtracer package (see the wiring sketch after the changed-files summary below)

---------

Signed-off-by: Afzal Ansari <afzal442@gmail.com>
afzal442 committed Jul 29, 2023
1 parent b744534 commit 287562c
Showing 4 changed files with 91 additions and 38 deletions.
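Before the per-file diff, here is a minimal sketch of the wiring this commit moves to: callers hand the reader an OpenTelemetry trace.TracerProvider instead of relying on an OpenTracing span in the request context. This is illustrative only; the config import path and the config values are assumptions, not part of the diff.

```go
// Illustrative sketch only, not part of this commit. The config import path
// is assumed from the repository layout; ServerURL/ConnectTimeout mirror the
// fields used in the tests below.
package main

import (
	"log"
	"time"

	"go.opentelemetry.io/otel"
	"go.uber.org/zap"

	promcfg "github.com/jaegertracing/jaeger/pkg/prometheus/config" // assumed path
	promstore "github.com/jaegertracing/jaeger/plugin/metrics/prometheus/metricsstore"
)

func main() {
	cfg := promcfg.Configuration{
		ServerURL:      "http://localhost:9090",
		ConnectTimeout: 30 * time.Second,
	}
	// New signature: configuration, then logger, then an OTEL TracerProvider
	// (the Prometheus factory defaults to the global provider).
	reader, err := promstore.NewMetricsReader(cfg, zap.NewNop(), otel.GetTracerProvider())
	if err != nil {
		log.Fatal(err)
	}
	_ = reader
}
```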
7 changes: 5 additions & 2 deletions pkg/jtracer/jtracer.go
@@ -61,7 +61,7 @@ func New(serviceName string) (*JTracer, error) {
}

func NoOp() *JTracer {
return &JTracer{OT: opentracing.NoopTracer{}, OTEL: trace.NewNoopTracerProvider()}
return &JTracer{OT: opentracing.NoopTracer{}, OTEL: trace.NewNoopTracerProvider(), closer: nil}
}

// initOTEL initializes OTEL Tracer
@@ -107,5 +107,8 @@ func otelExporter(ctx context.Context) (sdktrace.SpanExporter, error) {

// Shutdown the tracerProvider to clean up resources
func (jt *JTracer) Close(ctx context.Context) error {
return jt.closer(ctx)
if jt.closer != nil {
return jt.closer(ctx)
}
return nil
}
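The nil check in Close matters because NoOp() now sets closer explicitly to nil. A hedged sketch of the resulting behavior, with the import path taken from the file path in this diff:

```go
import (
	"context"

	"github.com/jaegertracing/jaeger/pkg/jtracer"
)

func closeNoOpTracer() error {
	jt := jtracer.NoOp()
	// With the guard above, Close is a safe no-op for the no-op tracer
	// instead of calling a nil closer function.
	return jt.Close(context.Background())
}
```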
6 changes: 5 additions & 1 deletion plugin/metrics/prometheus/factory.go
@@ -18,6 +18,8 @@ import (
"flag"

"github.com/spf13/viper"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/plugin"
@@ -31,11 +33,13 @@ var _ plugin.Configurable = (*Factory)(nil)
type Factory struct {
options *Options
logger *zap.Logger
tracer trace.TracerProvider
}

// NewFactory creates a new Factory.
func NewFactory() *Factory {
return &Factory{
tracer: otel.GetTracerProvider(),
options: NewOptions("prometheus"),
}
}
@@ -60,5 +64,5 @@ func (f *Factory) Initialize(logger *zap.Logger) error {

// CreateMetricsReader implements storage.MetricsFactory.
func (f *Factory) CreateMetricsReader() (metricsstore.Reader, error) {
return prometheusstore.NewMetricsReader(f.logger, f.options.Primary.Configuration)
return prometheusstore.NewMetricsReader(f.options.Primary.Configuration, f.logger, f.tracer)
}
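Because NewFactory captures otel.GetTracerProvider() at construction time, whatever provider is registered globally at that point flows into the reader. A rough usage sketch, assuming the factory's default options carry a usable Prometheus server URL; errors are returned rather than asserted:

```go
import (
	"go.opentelemetry.io/otel"
	sdktrace "go.opentelemetry.io/otel/sdk/trace"
	"go.uber.org/zap"

	"github.com/jaegertracing/jaeger/plugin/metrics/prometheus"
)

func buildMetricsReader() error {
	// Install an SDK provider first; NewFactory snapshots the global provider.
	otel.SetTracerProvider(sdktrace.NewTracerProvider())

	f := prometheus.NewFactory()
	if err := f.Initialize(zap.NewNop()); err != nil {
		return err
	}
	_, err := f.CreateMetricsReader()
	return err
}
```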
38 changes: 22 additions & 16 deletions plugin/metrics/prometheus/metricsstore/reader.go
@@ -26,11 +26,12 @@ import (
"time"
"unicode"

"github.com/opentracing/opentracing-go"
ottag "github.com/opentracing/opentracing-go/ext"
otlog "github.com/opentracing/opentracing-go/log"
"github.com/prometheus/client_golang/api"
promapi "github.com/prometheus/client_golang/api/prometheus/v1"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
semconv "go.opentelemetry.io/otel/semconv/v1.20.0"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/pkg/bearertoken"
@@ -49,6 +50,7 @@ type (
MetricsReader struct {
client promapi.API
logger *zap.Logger
tracer trace.Tracer

metricsTranslator dbmodel.Translator
latencyMetricName string
@@ -73,7 +75,7 @@ type (
)

// NewMetricsReader returns a new MetricsReader.
func NewMetricsReader(logger *zap.Logger, cfg config.Configuration) (*MetricsReader, error) {
func NewMetricsReader(cfg config.Configuration, logger *zap.Logger, tracer trace.TracerProvider) (*MetricsReader, error) {
logger.Info("Creating metrics reader", zap.Any("configuration", cfg))

roundTripper, err := getHTTPRoundTripper(&cfg, logger)
@@ -96,6 +98,7 @@ func NewMetricsReader(logger *zap.Logger, cfg config.Configuration) (*MetricsReader, error) {
mr := &MetricsReader{
client: promapi.NewAPI(client),
logger: logger,
tracer: tracer.Tracer("prom-metrics-reader"),

metricsTranslator: dbmodel.New(operationLabel),
callsMetricName: buildFullCallsMetricName(cfg),
@@ -224,8 +227,8 @@ func (m MetricsReader) executeQuery(ctx context.Context, p metricsQueryParams) (
}
promQuery := m.buildPromQuery(p)

span, ctx := startSpanForQuery(ctx, p.metricName, promQuery)
defer span.Finish()
ctx, span := startSpanForQuery(ctx, p.metricName, promQuery, m.tracer)
defer span.End()

queryRange := promapi.Range{
Start: p.EndTime.Add(-1 * *p.Lookback),
@@ -235,8 +238,9 @@

mv, warnings, err := m.client.QueryRange(ctx, promQuery, queryRange)
if err != nil {
err = fmt.Errorf("failed executing metrics query: %w", err)
logErrorToSpan(span, err)
return &metrics.MetricFamily{}, fmt.Errorf("failed executing metrics query: %w", err)
return &metrics.MetricFamily{}, err
}
if len(warnings) > 0 {
m.logger.Warn("Warnings detected on Prometheus query", zap.Any("warnings", warnings), zap.String("query", promQuery), zap.Any("range", queryRange))
@@ -287,17 +291,19 @@ func promqlDurationString(d *time.Duration) string {
return string(b)
}

func startSpanForQuery(ctx context.Context, metricName, query string) (opentracing.Span, context.Context) {
span, ctx := opentracing.StartSpanFromContext(ctx, metricName)
ottag.DBStatement.Set(span, query)
ottag.DBType.Set(span, "prometheus")
ottag.Component.Set(span, "promql")
return span, ctx
func startSpanForQuery(ctx context.Context, metricName, query string, tp trace.Tracer) (context.Context, trace.Span) {
ctx, span := tp.Start(ctx, metricName)
span.SetAttributes(
attribute.Key(semconv.DBStatementKey).String(query),
attribute.Key(semconv.DBSystemKey).String("prometheus"),
attribute.Key("component").String("promql"),
)
return ctx, span
}

func logErrorToSpan(span opentracing.Span, err error) {
ottag.Error.Set(span, true)
span.LogFields(otlog.Error(err))
func logErrorToSpan(span trace.Span, err error) {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
}

func getHTTPRoundTripper(c *config.Configuration, logger *zap.Logger) (rt http.RoundTripper, err error) {
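The OpenTracing tags and error logs above map onto the OTEL API roughly one-for-one. A standalone sketch of the same pattern, using the semconv keys from this diff; the helper name and signature are hypothetical:

```go
import (
	"context"

	"go.opentelemetry.io/otel/codes"
	semconv "go.opentelemetry.io/otel/semconv/v1.20.0"
	"go.opentelemetry.io/otel/trace"
)

// traceQuery mirrors startSpanForQuery/logErrorToSpan from the diff: start a
// span from the injected Tracer, attach the PromQL statement as db.*
// attributes, and record failures with an Error status.
func traceQuery(ctx context.Context, tracer trace.Tracer, query string, run func(context.Context) error) error {
	ctx, span := tracer.Start(ctx, "metrics-query")
	defer span.End()

	span.SetAttributes(
		semconv.DBStatementKey.String(query),
		semconv.DBSystemKey.String("prometheus"),
	)
	if err := run(ctx); err != nil {
		span.RecordError(err)
		span.SetStatus(codes.Error, err.Error())
		return err
	}
	return nil
}
```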
78 changes: 59 additions & 19 deletions plugin/metrics/prometheus/metricsstore/reader_test.go
@@ -29,6 +29,10 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/codes"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/sdk/trace/tracetest"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/pkg/bearertoken"
@@ -61,22 +65,38 @@ var defaultConfig = config.Configuration{
LatencyUnit: "ms",
}

func tracerProvider(t *testing.T) (trace.TracerProvider, *tracetest.InMemoryExporter, func()) {
exporter := tracetest.NewInMemoryExporter()
tp := sdktrace.NewTracerProvider(
sdktrace.WithSampler(sdktrace.AlwaysSample()),
sdktrace.WithSyncer(exporter),
)
closer := func() {
assert.NoError(t, tp.Shutdown(context.Background()))
}
return tp, exporter, closer
}

func TestNewMetricsReaderValidAddress(t *testing.T) {
logger := zap.NewNop()
reader, err := NewMetricsReader(logger, config.Configuration{
tracer, _, closer := tracerProvider(t)
defer closer()
reader, err := NewMetricsReader(config.Configuration{
ServerURL: "http://localhost:1234",
ConnectTimeout: defaultTimeout,
})
}, logger, tracer)
require.NoError(t, err)
assert.NotNil(t, reader)
}

func TestNewMetricsReaderInvalidAddress(t *testing.T) {
logger := zap.NewNop()
reader, err := NewMetricsReader(logger, config.Configuration{
tracer, _, closer := tracerProvider(t)
defer closer()
reader, err := NewMetricsReader(config.Configuration{
ServerURL: "\n",
ConnectTimeout: defaultTimeout,
})
}, logger, tracer)
require.Error(t, err)
assert.Contains(t, err.Error(), "failed to initialize prometheus client")
assert.Nil(t, reader)
@@ -85,14 +105,16 @@ func TestNewMetricsReaderInvalidAddress(t *testing.T) {
func TestGetMinStepDuration(t *testing.T) {
params := metricsstore.MinStepDurationQueryParameters{}
logger := zap.NewNop()
tracer, _, closer := tracerProvider(t)
defer closer()
listener, err := net.Listen("tcp", "localhost:")
require.NoError(t, err)
assert.NotNil(t, listener)

reader, err := NewMetricsReader(logger, config.Configuration{
reader, err := NewMetricsReader(config.Configuration{
ServerURL: "http://" + listener.Addr().String(),
ConnectTimeout: defaultTimeout,
})
}, logger, tracer)
require.NoError(t, err)

minStep, err := reader.GetMinStepDuration(context.Background(), &params)
@@ -121,18 +143,20 @@ func TestMetricsServerError(t *testing.T) {
defer mockPrometheus.Close()

logger := zap.NewNop()
tracer, exp, closer := tracerProvider(t)
defer closer()
address := mockPrometheus.Listener.Addr().String()
reader, err := NewMetricsReader(logger, config.Configuration{
reader, err := NewMetricsReader(config.Configuration{
ServerURL: "http://" + address,
ConnectTimeout: defaultTimeout,
})
}, logger, tracer)
require.NoError(t, err)

m, err := reader.GetCallRates(context.Background(), &params)
assert.NotNil(t, m)

require.Error(t, err)
assert.Contains(t, err.Error(), "failed executing metrics query")
require.Len(t, exp.GetSpans(), 1, "HTTP request was traced and span reported")
assert.Equal(t, codes.Error, exp.GetSpans()[0].Status.Code)
}

func TestGetLatencies(t *testing.T) {
@@ -221,16 +245,19 @@ func TestGetLatencies(t *testing.T) {
BaseQueryParameters: buildTestBaseQueryParametersFrom(tc),
Quantile: 0.95,
}
tracer, exp, closer := tracerProvider(t)
defer closer()
cfg := defaultConfig
if tc.updateConfig != nil {
cfg = tc.updateConfig(cfg)
}
reader, mockPrometheus := prepareMetricsReaderAndServer(t, cfg, tc.wantPromQlQuery, nil)
reader, mockPrometheus := prepareMetricsReaderAndServer(t, cfg, tc.wantPromQlQuery, nil, tracer)
defer mockPrometheus.Close()

m, err := reader.GetLatencies(context.Background(), &params)
require.NoError(t, err)
assertMetrics(t, m, tc.wantLabels, tc.wantName, tc.wantDescription)
assert.Len(t, exp.GetSpans(), 1, "HTTP request was traced and span reported")
})
}
}
@@ -318,16 +345,19 @@ func TestGetCallRates(t *testing.T) {
params := metricsstore.CallRateQueryParameters{
BaseQueryParameters: buildTestBaseQueryParametersFrom(tc),
}
tracer, exp, closer := tracerProvider(t)
defer closer()
cfg := defaultConfig
if tc.updateConfig != nil {
cfg = tc.updateConfig(cfg)
}
reader, mockPrometheus := prepareMetricsReaderAndServer(t, cfg, tc.wantPromQlQuery, nil)
reader, mockPrometheus := prepareMetricsReaderAndServer(t, cfg, tc.wantPromQlQuery, nil, tracer)
defer mockPrometheus.Close()

m, err := reader.GetCallRates(context.Background(), &params)
require.NoError(t, err)
assertMetrics(t, m, tc.wantLabels, tc.wantName, tc.wantDescription)
assert.Len(t, exp.GetSpans(), 1, "HTTP request was traced and span reported")
})
}
}
@@ -440,16 +470,19 @@ func TestGetErrorRates(t *testing.T) {
params := metricsstore.ErrorRateQueryParameters{
BaseQueryParameters: buildTestBaseQueryParametersFrom(tc),
}
tracer, exp, closer := tracerProvider(t)
defer closer()
cfg := defaultConfig
if tc.updateConfig != nil {
cfg = tc.updateConfig(cfg)
}
reader, mockPrometheus := prepareMetricsReaderAndServer(t, cfg, tc.wantPromQlQuery, nil)
reader, mockPrometheus := prepareMetricsReaderAndServer(t, cfg, tc.wantPromQlQuery, nil, tracer)
defer mockPrometheus.Close()

m, err := reader.GetErrorRates(context.Background(), &params)
require.NoError(t, err)
assertMetrics(t, m, tc.wantLabels, tc.wantName, tc.wantDescription)
assert.Len(t, exp.GetSpans(), 1, "HTTP request was traced and span reported")
})
}
}
@@ -460,24 +493,29 @@ func TestInvalidLatencyUnit(t *testing.T) {
t.Errorf("Expected a panic due to invalid latency unit")
}
}()
tracer, _, closer := tracerProvider(t)
defer closer()
cfg := config.Configuration{
SupportSpanmetricsConnector: true,
NormalizeDuration: true,
LatencyUnit: "something invalid",
}
_, _ = NewMetricsReader(zap.NewNop(), cfg)
_, _ = NewMetricsReader(cfg, zap.NewNop(), tracer)
}

func TestWarningResponse(t *testing.T) {
params := metricsstore.ErrorRateQueryParameters{
BaseQueryParameters: buildTestBaseQueryParametersFrom(metricsTestCase{serviceNames: []string{"foo"}}),
}
reader, mockPrometheus := prepareMetricsReaderAndServer(t, config.Configuration{}, "", []string{"warning0", "warning1"})
tracer, exp, closer := tracerProvider(t)
defer closer()
reader, mockPrometheus := prepareMetricsReaderAndServer(t, config.Configuration{}, "", []string{"warning0", "warning1"}, tracer)
defer mockPrometheus.Close()

m, err := reader.GetErrorRates(context.Background(), &params)
require.NoError(t, err)
assert.NotNil(t, m)
assert.Len(t, exp.GetSpans(), 1, "HTTP request was traced and span reported")
}

func TestGetRoundTripperTLSConfig(t *testing.T) {
Expand Down Expand Up @@ -571,14 +609,16 @@ func TestGetRoundTripperTokenError(t *testing.T) {

func TestInvalidCertFile(t *testing.T) {
logger := zap.NewNop()
reader, err := NewMetricsReader(logger, config.Configuration{
tracer, _, closer := tracerProvider(t)
defer closer()
reader, err := NewMetricsReader(config.Configuration{
ServerURL: "https://localhost:1234",
ConnectTimeout: defaultTimeout,
TLS: tlscfg.Options{
Enabled: true,
CAPath: "foo",
},
})
}, logger, tracer)
require.Error(t, err)
assert.Nil(t, reader)
}
@@ -635,7 +675,7 @@ func buildTestBaseQueryParametersFrom(tc metricsTestCase) metricsstore.BaseQueryParameters {
}
}

func prepareMetricsReaderAndServer(t *testing.T, config config.Configuration, wantPromQlQuery string, wantWarnings []string) (metricsstore.Reader, *httptest.Server) {
func prepareMetricsReaderAndServer(t *testing.T, config config.Configuration, wantPromQlQuery string, wantWarnings []string, tracer trace.TracerProvider) (metricsstore.Reader, *httptest.Server) {
mockPrometheus := startMockPrometheusServer(t, wantPromQlQuery, wantWarnings)

logger := zap.NewNop()
@@ -644,7 +684,7 @@ func prepareMetricsReaderAndServer(t *testing.T, config config.Configuration, wa
config.ServerURL = "http://" + address
config.ConnectTimeout = defaultTimeout

reader, err := NewMetricsReader(logger, config)
reader, err := NewMetricsReader(config, logger, tracer)
require.NoError(t, err)

return reader, mockPrometheus
