diff --git a/.chloggen/ddog-exporter-fix-multi-resc-stats.yaml b/.chloggen/ddog-exporter-fix-multi-resc-stats.yaml new file mode 100755 index 0000000000000..a55c767e4c5ee --- /dev/null +++ b/.chloggen/ddog-exporter-fix-multi-resc-stats.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: bug_fix + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: datadogexporter + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Fix bug where multiple resources would cause datadogexporter to send extraneous stats buckets. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [31173] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. 
+# Default: '[user]' +change_logs: ["user"] diff --git a/exporter/datadogexporter/factory.go b/exporter/datadogexporter/factory.go index 058fb504afc27..1cc2b6c019cd9 100644 --- a/exporter/datadogexporter/factory.go +++ b/exporter/datadogexporter/factory.go @@ -12,6 +12,8 @@ import ( pb "github.com/DataDog/datadog-agent/pkg/proto/pbgo/trace" "github.com/DataDog/datadog-agent/pkg/trace/agent" + "github.com/DataDog/datadog-agent/pkg/trace/telemetry" + "github.com/DataDog/datadog-agent/pkg/trace/writer" "github.com/DataDog/opentelemetry-mapping-go/pkg/inframetadata" "github.com/DataDog/opentelemetry-mapping-go/pkg/otlp/attributes" "github.com/DataDog/opentelemetry-mapping-go/pkg/otlp/attributes/source" @@ -237,7 +239,7 @@ func checkAndCastConfig(c component.Config, logger *zap.Logger) *Config { return cfg } -func (f *factory) consumeStatsPayload(ctx context.Context, out chan []byte, traceagent *agent.Agent, tracerVersion string, logger *zap.Logger) { +func (f *factory) consumeStatsPayload(ctx context.Context, statsIn <-chan []byte, statsToAgent chan<- *pb.StatsPayload, tracerVersion string, logger *zap.Logger) { for i := 0; i < runtime.NumCPU(); i++ { f.wg.Add(1) go func() { @@ -246,7 +248,7 @@ func (f *factory) consumeStatsPayload(ctx context.Context, out chan []byte, trac select { case <-ctx.Done(): return - case msg := <-out: + case msg := <-statsIn: sp := &pb.StatsPayload{} err := proto.Unmarshal(msg, sp) @@ -254,9 +256,12 @@ func (f *factory) consumeStatsPayload(ctx context.Context, out chan []byte, trac logger.Error("failed to unmarshal stats payload", zap.Error(err)) continue } - for _, sc := range sp.Stats { - traceagent.ProcessStats(sc, "", tracerVersion) + for _, csp := range sp.Stats { + if csp.TracerVersion == "" { + csp.TracerVersion = tracerVersion + } } + statsToAgent <- sp } } }() @@ -278,16 +283,22 @@ func (f *factory) createMetricsExporter( ctx, cancel := context.WithCancel(ctx) // cancel() runs on shutdown var pushMetricsFn 
consumer.ConsumeMetricsFunc - traceagent, err := f.TraceAgent(ctx, set, cfg, hostProvider) + acfg, err := newTraceAgentConfig(ctx, set, cfg, hostProvider) if err != nil { cancel() - return nil, fmt.Errorf("failed to start trace-agent: %w", err) + return nil, err } - var statsOut chan []byte + statsToAgent := make(chan *pb.StatsPayload) + statsWriter := writer.NewStatsWriter(acfg, statsToAgent, telemetry.NewNoopCollector()) + + set.Logger.Debug("Starting Datadog Trace-Agent StatsWriter") + go statsWriter.Run() + + var statsIn chan []byte if datadog.ConnectorPerformanceFeatureGate.IsEnabled() { - statsOut = make(chan []byte, 1000) + statsIn = make(chan []byte, 1000) statsv := set.BuildInfo.Command + set.BuildInfo.Version - f.consumeStatsPayload(ctx, statsOut, traceagent, statsv, set.Logger) + f.consumeStatsPayload(ctx, statsIn, statsToAgent, statsv, set.Logger) } pcfg := newMetadataConfigfromConfig(cfg) metadataReporter, err := f.Reporter(set, pcfg) @@ -321,7 +332,7 @@ func (f *factory) createMetricsExporter( return nil } } else { - exp, metricsErr := newMetricsExporter(ctx, set, cfg, &f.onceMetadata, attrsTranslator, hostProvider, traceagent, metadataReporter, statsOut) + exp, metricsErr := newMetricsExporter(ctx, set, cfg, acfg, &f.onceMetadata, attrsTranslator, hostProvider, statsToAgent, metadataReporter, statsIn) if metricsErr != nil { cancel() // first cancel context f.wg.Wait() // then wait for shutdown @@ -345,8 +356,12 @@ func (f *factory) createMetricsExporter( exporterhelper.WithShutdown(func(context.Context) error { cancel() f.StopReporter() - if statsOut != nil { - close(statsOut) + statsWriter.Stop() + if statsIn != nil { + close(statsIn) + } + if statsToAgent != nil { + close(statsToAgent) } return nil }), diff --git a/exporter/datadogexporter/integrationtest/integration_test.go b/exporter/datadogexporter/integrationtest/integration_test.go index e10f8bc399f92..aea78582cc388 100644 --- a/exporter/datadogexporter/integrationtest/integration_test.go +++ 
b/exporter/datadogexporter/integrationtest/integration_test.go @@ -34,6 +34,7 @@ import ( "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc" + "go.opentelemetry.io/otel/sdk/resource" sdktrace "go.opentelemetry.io/otel/sdk/trace" "google.golang.org/protobuf/proto" @@ -104,7 +105,7 @@ func TestIntegration(t *testing.T) { for _, chunks := range tps.Chunks { spans = append(spans, chunks.Spans...) for _, span := range chunks.Spans { - assert.Equal(t, span.Meta["_dd.stats_computed"], "true") + assert.Equal(t, "true", span.Meta["_dd.stats_computed"]) } } } @@ -118,8 +119,8 @@ func TestIntegration(t *testing.T) { stats = append(stats, csbs.Stats...) for _, stat := range csbs.Stats { assert.True(t, strings.HasPrefix(stat.Resource, "TestSpan")) - assert.Equal(t, stat.Hits, uint64(1)) - assert.Equal(t, stat.TopLevelHits, uint64(1)) + assert.Equal(t, uint64(1), stat.Hits) + assert.Equal(t, uint64(1), stat.TopLevelHits) } } } @@ -279,18 +280,34 @@ func sendTraces(t *testing.T) { traceExporter, err := otlptracegrpc.New(ctx, otlptracegrpc.WithInsecure()) require.NoError(t, err) bsp := sdktrace.NewBatchSpanProcessor(traceExporter) + r1, _ := resource.New(ctx, resource.WithAttributes(attribute.String("k8s.node.name", "aaaa"))) + r2, _ := resource.New(ctx, resource.WithAttributes(attribute.String("k8s.node.name", "bbbb"))) tracerProvider := sdktrace.NewTracerProvider( sdktrace.WithSampler(sdktrace.AlwaysSample()), sdktrace.WithSpanProcessor(bsp), + sdktrace.WithResource(r1), + ) + tracerProvider2 := sdktrace.NewTracerProvider( + sdktrace.WithSampler(sdktrace.AlwaysSample()), + sdktrace.WithSpanProcessor(bsp), + sdktrace.WithResource(r2), ) otel.SetTracerProvider(tracerProvider) defer func() { require.NoError(t, tracerProvider.Shutdown(ctx)) + require.NoError(t, tracerProvider2.Shutdown(ctx)) }() tracer := otel.Tracer("test-tracer") for i := 0; i < 10; i++ { _, span := tracer.Start(ctx, 
fmt.Sprintf("TestSpan%d", i)) + + if i == 3 { + // Send some traces from a different resource + // This verifies that stats from different hosts don't accidentally create extraneous empty stats buckets + otel.SetTracerProvider(tracerProvider2) + tracer = otel.Tracer("test-tracer2") + } // Only sample 5 out of the 10 spans if i < 5 { span.SetAttributes(attribute.Bool("sampled", true)) diff --git a/exporter/datadogexporter/metrics_exporter.go b/exporter/datadogexporter/metrics_exporter.go index 1f7958963b008..2d39f7f8675a8 100644 --- a/exporter/datadogexporter/metrics_exporter.go +++ b/exporter/datadogexporter/metrics_exporter.go @@ -13,7 +13,7 @@ import ( "time" pb "github.com/DataDog/datadog-agent/pkg/proto/pbgo/trace" - "github.com/DataDog/datadog-agent/pkg/trace/api" + "github.com/DataDog/datadog-agent/pkg/trace/config" "github.com/DataDog/datadog-api-client-go/v2/api/datadogV2" "github.com/DataDog/opentelemetry-mapping-go/pkg/inframetadata" "github.com/DataDog/opentelemetry-mapping-go/pkg/otlp/attributes" @@ -37,6 +37,7 @@ import ( type metricsExporter struct { params exporter.CreateSettings cfg *Config + agntConfig *config.AgentConfig ctx context.Context client *zorkian.Client metricsAPI *datadogV2.MetricsApi @@ -48,8 +49,8 @@ type metricsExporter struct { metadataReporter *inframetadata.Reporter // getPushTime returns a Unix time in nanoseconds, representing the time pushing metrics. // It will be overwritten in tests. 
- getPushTime func() uint64 - apmStatsProcessor api.StatsProcessor + getPushTime func() uint64 + statsToAgent chan<- *pb.StatsPayload } // translatorFromConfig creates a new metrics translator from the exporter @@ -95,10 +96,11 @@ func newMetricsExporter( ctx context.Context, params exporter.CreateSettings, cfg *Config, + agntConfig *config.AgentConfig, onceMetadata *sync.Once, attrsTranslator *attributes.Translator, sourceProvider source.Provider, - apmStatsProcessor api.StatsProcessor, + statsToAgent chan<- *pb.StatsPayload, metadataReporter *inframetadata.Reporter, statsOut chan []byte, ) (*metricsExporter, error) { @@ -109,17 +111,18 @@ func newMetricsExporter( scrubber := scrub.NewScrubber() exporter := &metricsExporter{ - params: params, - cfg: cfg, - ctx: ctx, - tr: tr, - scrubber: scrubber, - retrier: clientutil.NewRetrier(params.Logger, cfg.BackOffConfig, scrubber), - onceMetadata: onceMetadata, - sourceProvider: sourceProvider, - getPushTime: func() uint64 { return uint64(time.Now().UTC().UnixNano()) }, - apmStatsProcessor: apmStatsProcessor, - metadataReporter: metadataReporter, + params: params, + cfg: cfg, + ctx: ctx, + agntConfig: agntConfig, + tr: tr, + scrubber: scrubber, + retrier: clientutil.NewRetrier(params.Logger, cfg.BackOffConfig, scrubber), + onceMetadata: onceMetadata, + sourceProvider: sourceProvider, + getPushTime: func() uint64 { return uint64(time.Now().UTC().UnixNano()) }, + statsToAgent: statsToAgent, + metadataReporter: metadataReporter, } errchan := make(chan error) if isMetricExportV2Enabled() { @@ -260,8 +263,18 @@ func (exp *metricsExporter) PushMetricsData(ctx context.Context, md pmetric.Metr if len(sp) > 0 { exp.params.Logger.Debug("exporting APM stats payloads", zap.Any("stats_payloads", sp)) statsv := exp.params.BuildInfo.Command + exp.params.BuildInfo.Version - for _, p := range sp { - exp.apmStatsProcessor.ProcessStats(p, "", statsv) + for _, csp := range sp { + if csp.TracerVersion == "" { + csp.TracerVersion = statsv + } 
+ } + exp.statsToAgent <- &pb.StatsPayload{ + AgentHostname: exp.agntConfig.Hostname, // This is "dead-code". We will be removing this code path entirely + AgentEnv: exp.agntConfig.DefaultEnv, + Stats: sp, + AgentVersion: exp.agntConfig.AgentVersion, + ClientComputed: false, + SplitPayload: false, } } diff --git a/exporter/datadogexporter/metrics_exporter_test.go b/exporter/datadogexporter/metrics_exporter_test.go index 55eb3e71b1c32..bb0a6a685aa53 100644 --- a/exporter/datadogexporter/metrics_exporter_test.go +++ b/exporter/datadogexporter/metrics_exporter_test.go @@ -16,6 +16,7 @@ import ( "github.com/DataDog/agent-payload/v5/gogen" pb "github.com/DataDog/datadog-agent/pkg/proto/pbgo/trace" + traceconfig "github.com/DataDog/datadog-agent/pkg/trace/config" "github.com/DataDog/datadog-api-client-go/v2/api/datadogV2" "github.com/DataDog/opentelemetry-mapping-go/pkg/inframetadata" "github.com/DataDog/opentelemetry-mapping-go/pkg/inframetadata/payload" @@ -297,23 +298,24 @@ func Test_metricsExporter_PushMetricsData(t *testing.T) { defer server.Close() var ( - once sync.Once - statsRecorder testutil.MockStatsProcessor + once sync.Once ) - + statsToAgent := make(chan *pb.StatsPayload, 1000) // Buffer the channel to allow the test to pass without goroutines pusher := newTestPusher(t) reporter, err := inframetadata.NewReporter(zap.NewNop(), pusher, 1*time.Second) require.NoError(t, err) attributesTranslator, err := attributes.NewTranslator(componenttest.NewNopTelemetrySettings()) require.NoError(t, err) + acfg := traceconfig.New() exp, err := newMetricsExporter( context.Background(), exportertest.NewNopCreateSettings(), newTestConfig(t, server.URL, tt.hostTags, tt.histogramMode), + acfg, &once, attributesTranslator, &testutil.MockSourceProvider{Src: tt.source}, - &statsRecorder, + statsToAgent, reporter, nil, ) @@ -357,10 +359,18 @@ func Test_metricsExporter_PushMetricsData(t *testing.T) { assert.NoError(t, err) assert.Equal(t, expected, sketchRecorder.ByteBody) } - if 
tt.expectedStats == nil { - assert.Len(t, statsRecorder.In, 0) - } else { - assert.ElementsMatch(t, statsRecorder.In, tt.expectedStats) + if tt.expectedStats != nil { + var actualStats []*pb.ClientStatsPayload + pullStats: + for len(actualStats) < len(tt.expectedStats) { + select { + case <-time.After(10 * time.Second): + break pullStats + case sp := <-statsToAgent: + actualStats = append(actualStats, sp.Stats...) + } + } + assert.ElementsMatch(t, actualStats, tt.expectedStats) } }) } @@ -690,22 +700,24 @@ func Test_metricsExporter_PushMetricsData_Zorkian(t *testing.T) { defer server.Close() var ( - once sync.Once - statsRecorder testutil.MockStatsProcessor + once sync.Once ) + statsToAgent := make(chan *pb.StatsPayload, 1000) pusher := newTestPusher(t) reporter, err := inframetadata.NewReporter(zap.NewNop(), pusher, 1*time.Second) require.NoError(t, err) attributesTranslator, err := attributes.NewTranslator(componenttest.NewNopTelemetrySettings()) require.NoError(t, err) + acfg := traceconfig.New() exp, err := newMetricsExporter( context.Background(), exportertest.NewNopCreateSettings(), newTestConfig(t, server.URL, tt.hostTags, tt.histogramMode), + acfg, &once, attributesTranslator, &testutil.MockSourceProvider{Src: tt.source}, - &statsRecorder, + statsToAgent, reporter, nil, ) @@ -744,10 +756,18 @@ func Test_metricsExporter_PushMetricsData_Zorkian(t *testing.T) { assert.NoError(t, err) assert.Equal(t, expected, sketchRecorder.ByteBody) } - if tt.expectedStats == nil { - assert.Len(t, statsRecorder.In, 0) - } else { - assert.ElementsMatch(t, statsRecorder.In, tt.expectedStats) + if tt.expectedStats != nil { + var actualStats []*pb.ClientStatsPayload + pullStats: + for len(actualStats) < len(tt.expectedStats) { + select { + case <-time.After(10 * time.Second): + break pullStats + case sp := <-statsToAgent: + actualStats = append(actualStats, sp.Stats...) 
+ } + } + assert.ElementsMatch(t, actualStats, tt.expectedStats) } }) } diff --git a/exporter/datadogexporter/traces_exporter.go b/exporter/datadogexporter/traces_exporter.go index dd9eaff0d619d..92ff1bbb7bd30 100644 --- a/exporter/datadogexporter/traces_exporter.go +++ b/exporter/datadogexporter/traces_exporter.go @@ -182,6 +182,14 @@ func (exp *traceExporter) exportUsageMetrics(ctx context.Context, hosts map[stri } func newTraceAgent(ctx context.Context, params exporter.CreateSettings, cfg *Config, sourceProvider source.Provider) (*agent.Agent, error) { + acfg, err := newTraceAgentConfig(ctx, params, cfg, sourceProvider) + if err != nil { + return nil, err + } + return agent.NewAgent(ctx, acfg, telemetry.NewNoopCollector()), nil +} + +func newTraceAgentConfig(ctx context.Context, params exporter.CreateSettings, cfg *Config, sourceProvider source.Provider) (*traceconfig.AgentConfig, error) { acfg := traceconfig.New() src, err := sourceProvider.Source(ctx) if err != nil { @@ -210,6 +218,6 @@ func newTraceAgent(ctx context.Context, params exporter.CreateSettings, cfg *Con if addr := cfg.Traces.Endpoint; addr != "" { acfg.Endpoints[0].Host = addr } - tracelog.SetLogger(&zaplogger{params.Logger}) - return agent.NewAgent(ctx, acfg, telemetry.NewNoopCollector()), nil + tracelog.SetLogger(&zaplogger{params.Logger}) // TODO: the logger set via tracelog.SetLogger shouldn't be a singleton + return acfg, nil }