diff --git a/.chloggen/prw_failure_translate.yaml b/.chloggen/prw_failure_translate.yaml new file mode 100755 index 0000000000000..9188e8920b37e --- /dev/null +++ b/.chloggen/prw_failure_translate.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: prometheusremotewriteexporter + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Publish telemetry about translation of metrics from Otel to Prometheus. Don't drop all data points if some fail translation. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [29729] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/exporter/prometheusremotewriteexporter/exporter.go b/exporter/prometheusremotewriteexporter/exporter.go index 7833efac299df..51825e709c0ab 100644 --- a/exporter/prometheusremotewriteexporter/exporter.go +++ b/exporter/prometheusremotewriteexporter/exporter.go @@ -25,12 +25,35 @@ import ( "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/exporter" "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" "go.uber.org/multierr" + "go.uber.org/zap" + "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/prometheusremotewriteexporter/internal/metadata" prometheustranslator "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/prometheus" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/prometheusremotewrite" ) +type prwTelemetry interface { + recordTranslationFailure(ctx context.Context) + recordTranslatedTimeSeries(ctx context.Context, numTS int) +} + +type prwTelemetryOtel struct { + failedTranslations metric.Int64Counter + translatedTimeSeries metric.Int64Counter + otelAttrs []attribute.KeyValue +} + +func (p *prwTelemetryOtel) recordTranslationFailure(ctx context.Context) { + p.failedTranslations.Add(ctx, 1, metric.WithAttributes(p.otelAttrs...)) +} + +func (p *prwTelemetryOtel) recordTranslatedTimeSeries(ctx context.Context, numTS int) { + p.translatedTimeSeries.Add(ctx, int64(numTS), metric.WithAttributes(p.otelAttrs...)) +} + // prwExporter converts OTLP metrics to Prometheus remote write TimeSeries and sends them to a remote endpoint. type prwExporter struct { endpointURL *url.URL @@ -45,6 +68,31 @@ type prwExporter struct { retrySettings configretry.BackOffConfig wal *prweWAL exporterSettings prometheusremotewrite.Settings + telemetry prwTelemetry +} + +func newPRWTelemetry(set exporter.CreateSettings) (prwTelemetry, error) { + + meter := metadata.Meter(set.TelemetrySettings) + // TODO: create helper functions similar to the processor helper: BuildCustomMetricName + prefix := "exporter/" + metadata.Type.String() + "/" + failedTranslations, errFailedTranslation := meter.Int64Counter(prefix+"failed_translations", + metric.WithDescription("Number of translation operations that failed to translate metrics from Otel to Prometheus"), + metric.WithUnit("1"), + ) + + translatedTimeSeries, errTranslatedMetrics := meter.Int64Counter(prefix+"translated_time_series", + metric.WithDescription("Number of Prometheus time series that were translated from OTel metrics"), + metric.WithUnit("1"), + ) + + return &prwTelemetryOtel{ + failedTranslations: failedTranslations, + translatedTimeSeries: translatedTimeSeries, + otelAttrs: []attribute.KeyValue{ + attribute.String("exporter", set.ID.String()), + }, + }, errors.Join(errFailedTranslation, errTranslatedMetrics) } // newPRWExporter initializes a new prwExporter instance and sets fields accordingly. @@ -59,6 +107,11 @@ func newPRWExporter(cfg *Config, set exporter.CreateSettings) (*prwExporter, err return nil, errors.New("invalid endpoint") } + prwTelemetry, err := newPRWTelemetry(set) + if err != nil { + return nil, err + } + userAgentHeader := fmt.Sprintf("%s/%s", strings.ReplaceAll(strings.ToLower(set.BuildInfo.Description), " ", "-"), set.BuildInfo.Version) prwe := &prwExporter{ @@ -79,6 +132,7 @@ func newPRWExporter(cfg *Config, set exporter.CreateSettings) (*prwExporter, err AddMetricSuffixes: cfg.AddMetricSuffixes, SendMetadata: cfg.SendMetadata, }, + telemetry: prwTelemetry, } prwe.wal = newWAL(cfg.WAL, prwe.export) @@ -128,15 +182,19 @@ func (prwe *prwExporter) PushMetrics(ctx context.Context, md pmetric.Metrics) er tsMap, err := prometheusremotewrite.FromMetrics(md, prwe.exporterSettings) if err != nil { - err = consumererror.NewPermanent(err) + prwe.telemetry.recordTranslationFailure(ctx) + prwe.settings.Logger.Debug("failed to translate metrics, exporting remaining metrics", zap.Error(err), zap.Int("translated", len(tsMap))) } + prwe.telemetry.recordTranslatedTimeSeries(ctx, len(tsMap)) + var m []*prompb.MetricMetadata if prwe.exporterSettings.SendMetadata { m = prometheusremotewrite.OtelMetricsToMetadata(md, prwe.exporterSettings.AddMetricSuffixes) } + // Call export even if a conversion error, since there may be points that were successfully converted. - return multierr.Combine(err, prwe.handleExport(ctx, tsMap, m)) + return prwe.handleExport(ctx, tsMap, m) } } diff --git a/exporter/prometheusremotewriteexporter/exporter_test.go b/exporter/prometheusremotewriteexporter/exporter_test.go index ca146d84ff5a4..ef2f0f195c144 100644 --- a/exporter/prometheusremotewriteexporter/exporter_test.go +++ b/exporter/prometheusremotewriteexporter/exporter_test.go @@ -372,6 +372,19 @@ func runExportPipeline(ts *prompb.TimeSeries, endpoint *url.URL) error { return prwe.handleExport(context.Background(), testmap, nil) } +type mockPRWTelemetry struct { + failedTranslations int + translatedTimeSeries int +} + +func (m *mockPRWTelemetry) recordTranslationFailure(_ context.Context) { + m.failedTranslations++ +} + +func (m *mockPRWTelemetry) recordTranslatedTimeSeries(_ context.Context, numTs int) { + m.translatedTimeSeries += numTs +} + // Test_PushMetrics checks the number of TimeSeries received by server and the number of metrics dropped is the same as // expected func Test_PushMetrics(t *testing.T) { @@ -420,6 +433,11 @@ func Test_PushMetrics(t *testing.T) { emptySummaryBatch := getMetricsFromMetricList(invalidMetrics[emptySummary]) + // partial success (or partial failure) cases + + partialSuccess1 := getMetricsFromMetricList(validMetrics1[validSum], validMetrics2[validSum], + validMetrics1[validIntGauge], validMetrics2[validIntGauge], invalidMetrics[emptyGauge]) + // staleNaN cases staleNaNHistogramBatch := getMetricsFromMetricList(staleNaNMetrics[staleNaNHistogram]) staleNaNEmptyHistogramBatch := getMetricsFromMetricList(staleNaNMetrics[staleNaNEmptyHistogram]) @@ -457,20 +475,23 @@ func Test_PushMetrics(t *testing.T) { } tests := []struct { - name string - metrics pmetric.Metrics - reqTestFunc func(t *testing.T, r *http.Request, expected int, isStaleMarker bool) - expectedTimeSeries int - httpResponseCode int - returnErr bool - isStaleMarker bool - skipForWAL bool + name string + metrics pmetric.Metrics + reqTestFunc func(t *testing.T, r *http.Request, expected int, isStaleMarker bool) + expectedTimeSeries int + httpResponseCode int + returnErr bool + isStaleMarker bool + skipForWAL bool + expectedFailedTranslations int }{ { - name: "invalid_type_case", - metrics: invalidTypeBatch, - httpResponseCode: http.StatusAccepted, - returnErr: true, + name: "invalid_type_case", + metrics: invalidTypeBatch, + httpResponseCode: http.StatusAccepted, + reqTestFunc: checkFunc, + expectedTimeSeries: 1, // the resource target metric. + expectedFailedTranslations: 1, }, { name: "intSum_case", @@ -567,32 +588,40 @@ func Test_PushMetrics(t *testing.T) { skipForWAL: true, }, { - name: "emptyGauge_case", - metrics: emptyDoubleGaugeBatch, - reqTestFunc: checkFunc, - httpResponseCode: http.StatusAccepted, - returnErr: true, + name: "emptyGauge_case", + metrics: emptyDoubleGaugeBatch, + reqTestFunc: checkFunc, + httpResponseCode: http.StatusAccepted, + expectedFailedTranslations: 1, + }, + { + name: "emptyCumulativeSum_case", + metrics: emptyCumulativeSumBatch, + reqTestFunc: checkFunc, + httpResponseCode: http.StatusAccepted, + expectedFailedTranslations: 1, }, { - name: "emptyCumulativeSum_case", - metrics: emptyCumulativeSumBatch, - reqTestFunc: checkFunc, - httpResponseCode: http.StatusAccepted, - returnErr: true, + name: "emptyCumulativeHistogram_case", + metrics: emptyCumulativeHistogramBatch, + reqTestFunc: checkFunc, + httpResponseCode: http.StatusAccepted, + expectedFailedTranslations: 1, }, { - name: "emptyCumulativeHistogram_case", - metrics: emptyCumulativeHistogramBatch, - reqTestFunc: checkFunc, - httpResponseCode: http.StatusAccepted, - returnErr: true, + name: "emptySummary_case", + metrics: emptySummaryBatch, + reqTestFunc: checkFunc, + httpResponseCode: http.StatusAccepted, + expectedFailedTranslations: 1, }, { - name: "emptySummary_case", - metrics: emptySummaryBatch, - reqTestFunc: checkFunc, - httpResponseCode: http.StatusAccepted, - returnErr: true, + name: "partialSuccess_case", + metrics: partialSuccess1, + reqTestFunc: checkFunc, + httpResponseCode: http.StatusAccepted, + expectedTimeSeries: 4, + expectedFailedTranslations: 1, }, { name: "staleNaNIntGauge_case", @@ -668,6 +697,7 @@ func Test_PushMetrics(t *testing.T) { } t.Run(tt.name, func(t *testing.T) { t.Parallel() + mockTelemetry := &mockPRWTelemetry{} server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { if tt.reqTestFunc != nil { tt.reqTestFunc(t, r, tt.expectedTimeSeries, tt.isStaleMarker) @@ -716,7 +746,10 @@ func Test_PushMetrics(t *testing.T) { } set := exportertest.NewNopCreateSettings() set.BuildInfo = buildInfo + prwe, nErr := newPRWExporter(cfg, set) + prwe.telemetry = mockTelemetry + require.NoError(t, nErr) ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -729,6 +762,9 @@ func Test_PushMetrics(t *testing.T) { assert.Error(t, err) return } + + assert.Equal(t, tt.expectedFailedTranslations, mockTelemetry.failedTranslations) + assert.Equal(t, tt.expectedTimeSeries, mockTelemetry.translatedTimeSeries) assert.NoError(t, err) }) } diff --git a/exporter/prometheusremotewriteexporter/go.mod b/exporter/prometheusremotewriteexporter/go.mod index dab7359e2b611..c4ebe696213ed 100644 --- a/exporter/prometheusremotewriteexporter/go.mod +++ b/exporter/prometheusremotewriteexporter/go.mod @@ -23,6 +23,7 @@ require ( go.opentelemetry.io/collector/consumer v0.96.1-0.20240306115632-b2693620eff6 go.opentelemetry.io/collector/exporter v0.96.1-0.20240306115632-b2693620eff6 go.opentelemetry.io/collector/pdata v1.3.1-0.20240306115632-b2693620eff6 + go.opentelemetry.io/otel v1.24.0 go.opentelemetry.io/otel/metric v1.24.0 go.opentelemetry.io/otel/trace v1.24.0 go.uber.org/goleak v1.3.0 @@ -70,7 +71,6 @@ require ( go.opentelemetry.io/collector/receiver v0.96.1-0.20240306115632-b2693620eff6 // indirect go.opentelemetry.io/collector/semconv v0.96.1-0.20240306115632-b2693620eff6 // indirect go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0 // indirect - go.opentelemetry.io/otel v1.24.0 // indirect go.opentelemetry.io/otel/exporters/prometheus v0.46.0 // indirect go.opentelemetry.io/otel/sdk v1.24.0 // indirect go.opentelemetry.io/otel/sdk/metric v1.24.0 // indirect