Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add OTEL tracer to metricsstore reader component #4595

Merged
merged 13 commits into from Jul 29, 2023
7 changes: 5 additions & 2 deletions pkg/jtracer/jtracer.go
Expand Up @@ -61,7 +61,7 @@ func New(serviceName string) (*JTracer, error) {
}

func NoOp() *JTracer {
return &JTracer{OT: opentracing.NoopTracer{}, OTEL: trace.NewNoopTracerProvider()}
return &JTracer{OT: opentracing.NoopTracer{}, OTEL: trace.NewNoopTracerProvider(), closer: nil}
}

// initOTEL initializes OTEL Tracer
Expand Down Expand Up @@ -107,5 +107,8 @@ func otelExporter(ctx context.Context) (sdktrace.SpanExporter, error) {

// Shutdown the tracerProvider to clean up resources
func (jt *JTracer) Close(ctx context.Context) error {
return jt.closer(ctx)
if jt.closer != nil {
return jt.closer(ctx)
}
return nil
}
6 changes: 5 additions & 1 deletion plugin/metrics/prometheus/factory.go
Expand Up @@ -18,6 +18,8 @@ import (
"flag"

"github.com/spf13/viper"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/plugin"
Expand All @@ -31,11 +33,13 @@ var _ plugin.Configurable = (*Factory)(nil)
type Factory struct {
options *Options
logger *zap.Logger
tracer trace.TracerProvider
}

// NewFactory creates a new Factory.
func NewFactory() *Factory {
return &Factory{
tracer: otel.GetTracerProvider(),
afzal442 marked this conversation as resolved.
Show resolved Hide resolved
options: NewOptions("prometheus"),
}
}
Expand All @@ -60,5 +64,5 @@ func (f *Factory) Initialize(logger *zap.Logger) error {

// CreateMetricsReader implements storage.MetricsFactory.
func (f *Factory) CreateMetricsReader() (metricsstore.Reader, error) {
return prometheusstore.NewMetricsReader(f.logger, f.options.Primary.Configuration)
return prometheusstore.NewMetricsReader(f.options.Primary.Configuration, f.logger, f.tracer)
}
33 changes: 18 additions & 15 deletions plugin/metrics/prometheus/metricsstore/reader.go
Expand Up @@ -26,11 +26,11 @@ import (
"time"
"unicode"

"github.com/opentracing/opentracing-go"
ottag "github.com/opentracing/opentracing-go/ext"
otlog "github.com/opentracing/opentracing-go/log"
"github.com/prometheus/client_golang/api"
promapi "github.com/prometheus/client_golang/api/prometheus/v1"
"go.opentelemetry.io/otel/attribute"
semconv "go.opentelemetry.io/otel/semconv/v1.20.0"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/pkg/bearertoken"
Expand All @@ -49,6 +49,7 @@ type (
MetricsReader struct {
client promapi.API
logger *zap.Logger
tracer trace.Tracer

metricsTranslator dbmodel.Translator
latencyMetricName string
Expand All @@ -73,7 +74,7 @@ type (
)

// NewMetricsReader returns a new MetricsReader.
func NewMetricsReader(logger *zap.Logger, cfg config.Configuration) (*MetricsReader, error) {
func NewMetricsReader(cfg config.Configuration, logger *zap.Logger, tracer trace.TracerProvider) (*MetricsReader, error) {
logger.Info("Creating metrics reader", zap.Any("configuration", cfg))

roundTripper, err := getHTTPRoundTripper(&cfg, logger)
Expand All @@ -96,6 +97,7 @@ func NewMetricsReader(logger *zap.Logger, cfg config.Configuration) (*MetricsRea
mr := &MetricsReader{
client: promapi.NewAPI(client),
logger: logger,
tracer: tracer.Tracer("prom-metrics-reader"),

metricsTranslator: dbmodel.New(operationLabel),
callsMetricName: buildFullCallsMetricName(cfg),
Expand Down Expand Up @@ -224,8 +226,8 @@ func (m MetricsReader) executeQuery(ctx context.Context, p metricsQueryParams) (
}
promQuery := m.buildPromQuery(p)

span, ctx := startSpanForQuery(ctx, p.metricName, promQuery)
defer span.Finish()
ctx, span := startSpanForQuery(ctx, p.metricName, promQuery, m.tracer)
defer span.End()

queryRange := promapi.Range{
Start: p.EndTime.Add(-1 * *p.Lookback),
afzal442 marked this conversation as resolved.
Show resolved Hide resolved
Expand Down Expand Up @@ -287,17 +289,18 @@ func promqlDurationString(d *time.Duration) string {
return string(b)
}

func startSpanForQuery(ctx context.Context, metricName, query string) (opentracing.Span, context.Context) {
span, ctx := opentracing.StartSpanFromContext(ctx, metricName)
ottag.DBStatement.Set(span, query)
ottag.DBType.Set(span, "prometheus")
ottag.Component.Set(span, "promql")
return span, ctx
func startSpanForQuery(ctx context.Context, metricName, query string, tp trace.Tracer) (context.Context, trace.Span) {
ctx, span := tp.Start(ctx, metricName)
span.SetAttributes(
attribute.Key(semconv.DBStatementKey).String(query),
attribute.Key(semconv.DBSystemKey).String("prometheus"),
attribute.Key("component").String("promql"),
)
return ctx, span
}

func logErrorToSpan(span opentracing.Span, err error) {
ottag.Error.Set(span, true)
span.LogFields(otlog.Error(err))
func logErrorToSpan(span trace.Span, err error) {
span.RecordError(err, trace.WithAttributes(semconv.OTelStatusCodeError))
afzal442 marked this conversation as resolved.
Show resolved Hide resolved
}

func getHTTPRoundTripper(c *config.Configuration, logger *zap.Logger) (rt http.RoundTripper, err error) {
Expand Down
80 changes: 62 additions & 18 deletions plugin/metrics/prometheus/metricsstore/reader_test.go
Expand Up @@ -29,6 +29,9 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/sdk/trace/tracetest"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/pkg/bearertoken"
Expand Down Expand Up @@ -61,22 +64,38 @@ var defaultConfig = config.Configuration{
LatencyUnit: "ms",
}

func tracerProvider(t *testing.T) (trace.TracerProvider, *tracetest.InMemoryExporter, func()) {
exporter := tracetest.NewInMemoryExporter()
tp := sdktrace.NewTracerProvider(
sdktrace.WithSampler(sdktrace.AlwaysSample()),
sdktrace.WithSyncer(exporter),
)
closer := func() {
assert.NoError(t, tp.Shutdown(context.Background()))
}
return tp, exporter, closer
}

func TestNewMetricsReaderValidAddress(t *testing.T) {
logger := zap.NewNop()
reader, err := NewMetricsReader(logger, config.Configuration{
tracer, _, closer := tracerProvider(t)
reader, err := NewMetricsReader(config.Configuration{
ServerURL: "http://localhost:1234",
ConnectTimeout: defaultTimeout,
})
}, logger, tracer)
defer closer()
require.NoError(t, err)
assert.NotNil(t, reader)
}

func TestNewMetricsReaderInvalidAddress(t *testing.T) {
logger := zap.NewNop()
reader, err := NewMetricsReader(logger, config.Configuration{
tracer, _, closer := tracerProvider(t)
reader, err := NewMetricsReader(config.Configuration{
ServerURL: "\n",
ConnectTimeout: defaultTimeout,
})
}, logger, tracer)
defer closer()
require.Error(t, err)
assert.Contains(t, err.Error(), "failed to initialize prometheus client")
assert.Nil(t, reader)
Expand All @@ -85,14 +104,16 @@ func TestNewMetricsReaderInvalidAddress(t *testing.T) {
func TestGetMinStepDuration(t *testing.T) {
params := metricsstore.MinStepDurationQueryParameters{}
logger := zap.NewNop()
tracer, _, closer := tracerProvider(t)
listener, err := net.Listen("tcp", "localhost:")
require.NoError(t, err)
assert.NotNil(t, listener)

reader, err := NewMetricsReader(logger, config.Configuration{
reader, err := NewMetricsReader(config.Configuration{
ServerURL: "http://" + listener.Addr().String(),
ConnectTimeout: defaultTimeout,
})
}, logger, tracer)
afzal442 marked this conversation as resolved.
Show resolved Hide resolved
defer closer()
require.NoError(t, err)

minStep, err := reader.GetMinStepDuration(context.Background(), &params)
Expand Down Expand Up @@ -121,16 +142,19 @@ func TestMetricsServerError(t *testing.T) {
defer mockPrometheus.Close()

logger := zap.NewNop()
tracer, exp, closer := tracerProvider(t)
address := mockPrometheus.Listener.Addr().String()
reader, err := NewMetricsReader(logger, config.Configuration{
reader, err := NewMetricsReader(config.Configuration{
ServerURL: "http://" + address,
ConnectTimeout: defaultTimeout,
})
}, logger, tracer)
require.NoError(t, err)

m, err := reader.GetCallRates(context.Background(), &params)
assert.NotEmpty(t, exp.GetSpans(), "Expected spans are recorded")
assert.Len(t, exp.GetSpans(), 1, "HTTP request was traced and span reported")
afzal442 marked this conversation as resolved.
Show resolved Hide resolved
defer closer()
afzal442 marked this conversation as resolved.
Show resolved Hide resolved
assert.NotNil(t, m)

require.Error(t, err)
assert.Contains(t, err.Error(), "failed executing metrics query")
}
Expand Down Expand Up @@ -221,14 +245,18 @@ func TestGetLatencies(t *testing.T) {
BaseQueryParameters: buildTestBaseQueryParametersFrom(tc),
Quantile: 0.95,
}
tracer, exp, closer := tracerProvider(t)
cfg := defaultConfig
if tc.updateConfig != nil {
cfg = tc.updateConfig(cfg)
}
reader, mockPrometheus := prepareMetricsReaderAndServer(t, cfg, tc.wantPromQlQuery, nil)
reader, mockPrometheus := prepareMetricsReaderAndServer(t, cfg, tc.wantPromQlQuery, nil, tracer)
defer mockPrometheus.Close()

m, err := reader.GetLatencies(context.Background(), &params)
assert.NotEmpty(t, exp.GetSpans(), "Spans recorded during the test.")
assert.Len(t, exp.GetSpans(), 1, "HTTP request was traced and span reported")
defer closer()
require.NoError(t, err)
afzal442 marked this conversation as resolved.
Show resolved Hide resolved
assertMetrics(t, m, tc.wantLabels, tc.wantName, tc.wantDescription)
})
Expand Down Expand Up @@ -318,14 +346,18 @@ func TestGetCallRates(t *testing.T) {
params := metricsstore.CallRateQueryParameters{
BaseQueryParameters: buildTestBaseQueryParametersFrom(tc),
}
tracer, exp, closer := tracerProvider(t)
cfg := defaultConfig
if tc.updateConfig != nil {
cfg = tc.updateConfig(cfg)
}
reader, mockPrometheus := prepareMetricsReaderAndServer(t, cfg, tc.wantPromQlQuery, nil)
reader, mockPrometheus := prepareMetricsReaderAndServer(t, cfg, tc.wantPromQlQuery, nil, tracer)
defer mockPrometheus.Close()

m, err := reader.GetCallRates(context.Background(), &params)
assert.NotEmpty(t, exp.GetSpans(), "Spans recorded during the test.")
afzal442 marked this conversation as resolved.
Show resolved Hide resolved
assert.Len(t, exp.GetSpans(), 1, "HTTP request was traced and span reported")
defer closer()
afzal442 marked this conversation as resolved.
Show resolved Hide resolved
require.NoError(t, err)
assertMetrics(t, m, tc.wantLabels, tc.wantName, tc.wantDescription)
})
Expand Down Expand Up @@ -440,14 +472,18 @@ func TestGetErrorRates(t *testing.T) {
params := metricsstore.ErrorRateQueryParameters{
BaseQueryParameters: buildTestBaseQueryParametersFrom(tc),
}
tracer, exp, closer := tracerProvider(t)
cfg := defaultConfig
if tc.updateConfig != nil {
cfg = tc.updateConfig(cfg)
}
reader, mockPrometheus := prepareMetricsReaderAndServer(t, cfg, tc.wantPromQlQuery, nil)
reader, mockPrometheus := prepareMetricsReaderAndServer(t, cfg, tc.wantPromQlQuery, nil, tracer)
defer mockPrometheus.Close()

m, err := reader.GetErrorRates(context.Background(), &params)
assert.NotEmpty(t, exp.GetSpans(), "Spans recorded during the test.")
assert.Len(t, exp.GetSpans(), 1, "HTTP request was traced and span reported")
defer closer()
require.NoError(t, err)
assertMetrics(t, m, tc.wantLabels, tc.wantName, tc.wantDescription)
})
Expand All @@ -460,23 +496,29 @@ func TestInvalidLatencyUnit(t *testing.T) {
t.Errorf("Expected a panic due to invalid latency unit")
}
}()
tracer, _, closer := tracerProvider(t)
cfg := config.Configuration{
SupportSpanmetricsConnector: true,
NormalizeDuration: true,
LatencyUnit: "something invalid",
}
_, _ = NewMetricsReader(zap.NewNop(), cfg)
_, _ = NewMetricsReader(cfg, zap.NewNop(), tracer)
defer closer()
}

func TestWarningResponse(t *testing.T) {
params := metricsstore.ErrorRateQueryParameters{
BaseQueryParameters: buildTestBaseQueryParametersFrom(metricsTestCase{serviceNames: []string{"foo"}}),
}
reader, mockPrometheus := prepareMetricsReaderAndServer(t, config.Configuration{}, "", []string{"warning0", "warning1"})
tracer, exp, closer := tracerProvider(t)
reader, mockPrometheus := prepareMetricsReaderAndServer(t, config.Configuration{}, "", []string{"warning0", "warning1"}, tracer)
defer mockPrometheus.Close()

m, err := reader.GetErrorRates(context.Background(), &params)
defer closer()
require.NoError(t, err)
assert.NotEmpty(t, exp.GetSpans(), "Spans recorded during the test.")
assert.Len(t, exp.GetSpans(), 1, "HTTP request was traced and span reported")
assert.NotNil(t, m)
}

Expand Down Expand Up @@ -571,14 +613,16 @@ func TestGetRoundTripperTokenError(t *testing.T) {

func TestInvalidCertFile(t *testing.T) {
logger := zap.NewNop()
reader, err := NewMetricsReader(logger, config.Configuration{
tracer, _, closer := tracerProvider(t)
reader, err := NewMetricsReader(config.Configuration{
ServerURL: "https://localhost:1234",
ConnectTimeout: defaultTimeout,
TLS: tlscfg.Options{
Enabled: true,
CAPath: "foo",
},
})
}, logger, tracer)
defer closer()
require.Error(t, err)
assert.Nil(t, reader)
}
Expand Down Expand Up @@ -635,7 +679,7 @@ func buildTestBaseQueryParametersFrom(tc metricsTestCase) metricsstore.BaseQuery
}
}

func prepareMetricsReaderAndServer(t *testing.T, config config.Configuration, wantPromQlQuery string, wantWarnings []string) (metricsstore.Reader, *httptest.Server) {
func prepareMetricsReaderAndServer(t *testing.T, config config.Configuration, wantPromQlQuery string, wantWarnings []string, tracer trace.TracerProvider) (metricsstore.Reader, *httptest.Server) {
mockPrometheus := startMockPrometheusServer(t, wantPromQlQuery, wantWarnings)

logger := zap.NewNop()
Expand All @@ -644,7 +688,7 @@ func prepareMetricsReaderAndServer(t *testing.T, config config.Configuration, wa
config.ServerURL = "http://" + address
config.ConnectTimeout = defaultTimeout

reader, err := NewMetricsReader(logger, config)
reader, err := NewMetricsReader(config, logger, tracer)
require.NoError(t, err)

return reader, mockPrometheus
Expand Down