Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add OTEL tracer to metricsstore reader component #4595

Merged
merged 13 commits into from Jul 29, 2023
7 changes: 5 additions & 2 deletions pkg/jtracer/jtracer.go
Expand Up @@ -61,7 +61,7 @@ func New(serviceName string) (*JTracer, error) {
}

func NoOp() *JTracer {
return &JTracer{OT: opentracing.NoopTracer{}, OTEL: trace.NewNoopTracerProvider()}
return &JTracer{OT: opentracing.NoopTracer{}, OTEL: trace.NewNoopTracerProvider(), closer: nil}
}

// initOTEL initializes OTEL Tracer
Expand Down Expand Up @@ -107,5 +107,8 @@ func otelExporter(ctx context.Context) (sdktrace.SpanExporter, error) {

// Shutdown the tracerProvider to clean up resources
func (jt *JTracer) Close(ctx context.Context) error {
return jt.closer(ctx)
if jt.closer != nil {
return jt.closer(ctx)
}
return nil
}
6 changes: 5 additions & 1 deletion plugin/metrics/prometheus/factory.go
Expand Up @@ -18,6 +18,8 @@ import (
"flag"

"github.com/spf13/viper"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/plugin"
Expand All @@ -31,11 +33,13 @@ var _ plugin.Configurable = (*Factory)(nil)
type Factory struct {
options *Options
logger *zap.Logger
tracer trace.TracerProvider
}

// NewFactory creates a new Factory.
func NewFactory() *Factory {
return &Factory{
tracer: otel.GetTracerProvider(),
afzal442 marked this conversation as resolved.
Show resolved Hide resolved
options: NewOptions("prometheus"),
}
}
Expand All @@ -60,5 +64,5 @@ func (f *Factory) Initialize(logger *zap.Logger) error {

// CreateMetricsReader implements storage.MetricsFactory.
func (f *Factory) CreateMetricsReader() (metricsstore.Reader, error) {
return prometheusstore.NewMetricsReader(f.logger, f.options.Primary.Configuration)
return prometheusstore.NewMetricsReader(f.options.Primary.Configuration, f.logger, f.tracer)
}
33 changes: 18 additions & 15 deletions plugin/metrics/prometheus/metricsstore/reader.go
Expand Up @@ -26,11 +26,11 @@ import (
"time"
"unicode"

"github.com/opentracing/opentracing-go"
ottag "github.com/opentracing/opentracing-go/ext"
otlog "github.com/opentracing/opentracing-go/log"
"github.com/prometheus/client_golang/api"
promapi "github.com/prometheus/client_golang/api/prometheus/v1"
"go.opentelemetry.io/otel/attribute"
semconv "go.opentelemetry.io/otel/semconv/v1.20.0"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/pkg/bearertoken"
Expand All @@ -49,6 +49,7 @@ type (
MetricsReader struct {
client promapi.API
logger *zap.Logger
tracer trace.Tracer

metricsTranslator dbmodel.Translator
latencyMetricName string
Expand All @@ -73,7 +74,7 @@ type (
)

// NewMetricsReader returns a new MetricsReader.
func NewMetricsReader(logger *zap.Logger, cfg config.Configuration) (*MetricsReader, error) {
func NewMetricsReader(cfg config.Configuration, logger *zap.Logger, tracer trace.TracerProvider) (*MetricsReader, error) {
logger.Info("Creating metrics reader", zap.Any("configuration", cfg))

roundTripper, err := getHTTPRoundTripper(&cfg, logger)
Expand All @@ -96,6 +97,7 @@ func NewMetricsReader(logger *zap.Logger, cfg config.Configuration) (*MetricsRea
mr := &MetricsReader{
client: promapi.NewAPI(client),
logger: logger,
tracer: tracer.Tracer("prom-metrics-reader"),

metricsTranslator: dbmodel.New(operationLabel),
callsMetricName: buildFullCallsMetricName(cfg),
Expand Down Expand Up @@ -224,8 +226,8 @@ func (m MetricsReader) executeQuery(ctx context.Context, p metricsQueryParams) (
}
promQuery := m.buildPromQuery(p)

span, ctx := startSpanForQuery(ctx, p.metricName, promQuery)
defer span.Finish()
ctx, span := startSpanForQuery(ctx, p.metricName, promQuery, m.tracer)
defer span.End()

queryRange := promapi.Range{
Start: p.EndTime.Add(-1 * *p.Lookback),
afzal442 marked this conversation as resolved.
Show resolved Hide resolved
Expand Down Expand Up @@ -287,17 +289,18 @@ func promqlDurationString(d *time.Duration) string {
return string(b)
}

func startSpanForQuery(ctx context.Context, metricName, query string) (opentracing.Span, context.Context) {
span, ctx := opentracing.StartSpanFromContext(ctx, metricName)
ottag.DBStatement.Set(span, query)
ottag.DBType.Set(span, "prometheus")
ottag.Component.Set(span, "promql")
return span, ctx
func startSpanForQuery(ctx context.Context, metricName, query string, tp trace.Tracer) (context.Context, trace.Span) {
ctx, span := tp.Start(ctx, metricName)
span.SetAttributes(
attribute.Key(semconv.DBStatementKey).String(query),
attribute.Key(semconv.DBSystemKey).String("prometheus"),
attribute.Key("component").String("promql"),
)
return ctx, span
}

func logErrorToSpan(span opentracing.Span, err error) {
ottag.Error.Set(span, true)
span.LogFields(otlog.Error(err))
func logErrorToSpan(span trace.Span, err error) {
span.RecordError(err, trace.WithAttributes(semconv.OTelStatusCodeError))
afzal442 marked this conversation as resolved.
Show resolved Hide resolved
}

func getHTTPRoundTripper(c *config.Configuration, logger *zap.Logger) (rt http.RoundTripper, err error) {
Expand Down
85 changes: 67 additions & 18 deletions plugin/metrics/prometheus/metricsstore/reader_test.go
Expand Up @@ -29,6 +29,9 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/sdk/trace/tracetest"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/pkg/bearertoken"
Expand Down Expand Up @@ -61,43 +64,64 @@ var defaultConfig = config.Configuration{
LatencyUnit: "ms",
}

func tracerProvider(t *testing.T) (trace.TracerProvider, *tracetest.InMemoryExporter, func(ctx context.Context) error) {
exporter := tracetest.NewInMemoryExporter()
tp := sdktrace.NewTracerProvider(
sdktrace.WithSampler(sdktrace.AlwaysSample()),
sdktrace.WithSyncer(exporter),
)
closer := func(ctx context.Context) error {
afzal442 marked this conversation as resolved.
Show resolved Hide resolved
return tp.Shutdown(ctx)
}
return tp, exporter, closer
}

func TestNewMetricsReaderValidAddress(t *testing.T) {
logger := zap.NewNop()
reader, err := NewMetricsReader(logger, config.Configuration{
tracer, _, closer := tracerProvider(t)
reader, err := NewMetricsReader(config.Configuration{
ServerURL: "http://localhost:1234",
ConnectTimeout: defaultTimeout,
})
}, logger, tracer)
require.NoError(t, err)
assert.NotNil(t, reader)
err = closer(context.Background())
afzal442 marked this conversation as resolved.
Show resolved Hide resolved
require.NoError(t, err)
}

func TestNewMetricsReaderInvalidAddress(t *testing.T) {
logger := zap.NewNop()
reader, err := NewMetricsReader(logger, config.Configuration{
tracer, _, closer := tracerProvider(t)
reader, err := NewMetricsReader(config.Configuration{
ServerURL: "\n",
ConnectTimeout: defaultTimeout,
})
}, logger, tracer)
require.Error(t, err)
assert.Contains(t, err.Error(), "failed to initialize prometheus client")
assert.Nil(t, reader)
err = closer(context.Background())
require.NoError(t, err)
}

func TestGetMinStepDuration(t *testing.T) {
params := metricsstore.MinStepDurationQueryParameters{}
logger := zap.NewNop()
tracer, _, closer := tracerProvider(t)
listener, err := net.Listen("tcp", "localhost:")
require.NoError(t, err)
assert.NotNil(t, listener)

reader, err := NewMetricsReader(logger, config.Configuration{
reader, err := NewMetricsReader(config.Configuration{
ServerURL: "http://" + listener.Addr().String(),
ConnectTimeout: defaultTimeout,
})
}, logger, tracer)
afzal442 marked this conversation as resolved.
Show resolved Hide resolved
require.NoError(t, err)

minStep, err := reader.GetMinStepDuration(context.Background(), &params)
require.NoError(t, err)
assert.Equal(t, time.Millisecond, minStep)
err = closer(context.Background())
require.NoError(t, err)
}

func TestMetricsServerError(t *testing.T) {
Expand All @@ -121,18 +145,21 @@ func TestMetricsServerError(t *testing.T) {
defer mockPrometheus.Close()

logger := zap.NewNop()
tracer, exp, closer := tracerProvider(t)
address := mockPrometheus.Listener.Addr().String()
reader, err := NewMetricsReader(logger, config.Configuration{
reader, err := NewMetricsReader(config.Configuration{
ServerURL: "http://" + address,
ConnectTimeout: defaultTimeout,
})
}, logger, tracer)
require.NoError(t, err)

m, err := reader.GetCallRates(context.Background(), &params)
assert.NotNil(t, m)

assert.NotEmpty(t, exp.GetSpans(), "No spans recorded during the test.")
afzal442 marked this conversation as resolved.
Show resolved Hide resolved
require.Error(t, err)
assert.Contains(t, err.Error(), "failed executing metrics query")
err = closer(context.Background())
require.NoError(t, err)
}

func TestGetLatencies(t *testing.T) {
Expand Down Expand Up @@ -221,16 +248,20 @@ func TestGetLatencies(t *testing.T) {
BaseQueryParameters: buildTestBaseQueryParametersFrom(tc),
Quantile: 0.95,
}
tracer, exp, closer := tracerProvider(t)
cfg := defaultConfig
if tc.updateConfig != nil {
cfg = tc.updateConfig(cfg)
}
reader, mockPrometheus := prepareMetricsReaderAndServer(t, cfg, tc.wantPromQlQuery, nil)
reader, mockPrometheus := prepareMetricsReaderAndServer(t, cfg, tc.wantPromQlQuery, nil, tracer)
defer mockPrometheus.Close()

m, err := reader.GetLatencies(context.Background(), &params)
require.NoError(t, err)
afzal442 marked this conversation as resolved.
Show resolved Hide resolved
assert.NotEmpty(t, exp.GetSpans(), "No spans recorded during the test.")
assertMetrics(t, m, tc.wantLabels, tc.wantName, tc.wantDescription)
err = closer(context.Background())
require.NoError(t, err)
})
}
}
Expand Down Expand Up @@ -318,16 +349,20 @@ func TestGetCallRates(t *testing.T) {
params := metricsstore.CallRateQueryParameters{
BaseQueryParameters: buildTestBaseQueryParametersFrom(tc),
}
tracer, exp, closer := tracerProvider(t)
cfg := defaultConfig
if tc.updateConfig != nil {
cfg = tc.updateConfig(cfg)
}
reader, mockPrometheus := prepareMetricsReaderAndServer(t, cfg, tc.wantPromQlQuery, nil)
reader, mockPrometheus := prepareMetricsReaderAndServer(t, cfg, tc.wantPromQlQuery, nil, tracer)
defer mockPrometheus.Close()

m, err := reader.GetCallRates(context.Background(), &params)
require.NoError(t, err)
assert.NotEmpty(t, exp.GetSpans(), "No spans recorded during the test.")
assertMetrics(t, m, tc.wantLabels, tc.wantName, tc.wantDescription)
err = closer(context.Background())
require.NoError(t, err)
})
}
}
Expand Down Expand Up @@ -440,16 +475,20 @@ func TestGetErrorRates(t *testing.T) {
params := metricsstore.ErrorRateQueryParameters{
BaseQueryParameters: buildTestBaseQueryParametersFrom(tc),
}
tracer, exp, closer := tracerProvider(t)
cfg := defaultConfig
if tc.updateConfig != nil {
cfg = tc.updateConfig(cfg)
}
reader, mockPrometheus := prepareMetricsReaderAndServer(t, cfg, tc.wantPromQlQuery, nil)
reader, mockPrometheus := prepareMetricsReaderAndServer(t, cfg, tc.wantPromQlQuery, nil, tracer)
defer mockPrometheus.Close()

m, err := reader.GetErrorRates(context.Background(), &params)
require.NoError(t, err)
assert.NotEmpty(t, exp.GetSpans(), "No spans recorded during the test.")
assertMetrics(t, m, tc.wantLabels, tc.wantName, tc.wantDescription)
err = closer(context.Background())
require.NoError(t, err)
})
}
}
Expand All @@ -460,24 +499,31 @@ func TestInvalidLatencyUnit(t *testing.T) {
t.Errorf("Expected a panic due to invalid latency unit")
}
}()
tracer, _, closer := tracerProvider(t)
cfg := config.Configuration{
SupportSpanmetricsConnector: true,
NormalizeDuration: true,
LatencyUnit: "something invalid",
}
_, _ = NewMetricsReader(zap.NewNop(), cfg)
_, _ = NewMetricsReader(cfg, zap.NewNop(), tracer)
err := closer(context.Background())
require.NoError(t, err)
}

func TestWarningResponse(t *testing.T) {
params := metricsstore.ErrorRateQueryParameters{
BaseQueryParameters: buildTestBaseQueryParametersFrom(metricsTestCase{serviceNames: []string{"foo"}}),
}
reader, mockPrometheus := prepareMetricsReaderAndServer(t, config.Configuration{}, "", []string{"warning0", "warning1"})
tracer, exp, closer := tracerProvider(t)
reader, mockPrometheus := prepareMetricsReaderAndServer(t, config.Configuration{}, "", []string{"warning0", "warning1"}, tracer)
defer mockPrometheus.Close()

m, err := reader.GetErrorRates(context.Background(), &params)
require.NoError(t, err)
assert.NotEmpty(t, exp.GetSpans(), "No spans recorded during the test.")
assert.NotNil(t, m)
err = closer(context.Background())
require.NoError(t, err)
}

func TestGetRoundTripperTLSConfig(t *testing.T) {
Expand Down Expand Up @@ -571,16 +617,19 @@ func TestGetRoundTripperTokenError(t *testing.T) {

func TestInvalidCertFile(t *testing.T) {
logger := zap.NewNop()
reader, err := NewMetricsReader(logger, config.Configuration{
tracer, _, closer := tracerProvider(t)
reader, err := NewMetricsReader(config.Configuration{
ServerURL: "https://localhost:1234",
ConnectTimeout: defaultTimeout,
TLS: tlscfg.Options{
Enabled: true,
CAPath: "foo",
},
})
}, logger, tracer)
require.Error(t, err)
assert.Nil(t, reader)
err = closer(context.Background())
require.NoError(t, err)
}

func startMockPrometheusServer(t *testing.T, wantPromQlQuery string, wantWarnings []string) *httptest.Server {
Expand Down Expand Up @@ -635,7 +684,7 @@ func buildTestBaseQueryParametersFrom(tc metricsTestCase) metricsstore.BaseQuery
}
}

func prepareMetricsReaderAndServer(t *testing.T, config config.Configuration, wantPromQlQuery string, wantWarnings []string) (metricsstore.Reader, *httptest.Server) {
func prepareMetricsReaderAndServer(t *testing.T, config config.Configuration, wantPromQlQuery string, wantWarnings []string, tracer trace.TracerProvider) (metricsstore.Reader, *httptest.Server) {
mockPrometheus := startMockPrometheusServer(t, wantPromQlQuery, wantWarnings)

logger := zap.NewNop()
Expand All @@ -644,7 +693,7 @@ func prepareMetricsReaderAndServer(t *testing.T, config config.Configuration, wa
config.ServerURL = "http://" + address
config.ConnectTimeout = defaultTimeout

reader, err := NewMetricsReader(logger, config)
reader, err := NewMetricsReader(config, logger, tracer)
require.NoError(t, err)

return reader, mockPrometheus
Expand Down