From ba0504d126256cd8cbd5dc4215cd63c3651529ba Mon Sep 17 00:00:00 2001 From: Andrew Glaude Date: Fri, 9 Feb 2024 15:04:02 -0500 Subject: [PATCH 01/13] use statswriter, client aggregator is for tracer stats only --- exporter/datadogexporter/factory.go | 31 +++++++++++++-------- exporter/datadogexporter/traces_exporter.go | 12 ++++++-- 2 files changed, 29 insertions(+), 14 deletions(-) diff --git a/exporter/datadogexporter/factory.go b/exporter/datadogexporter/factory.go index 058fb504afc2..226ad670252c 100644 --- a/exporter/datadogexporter/factory.go +++ b/exporter/datadogexporter/factory.go @@ -12,6 +12,8 @@ import ( pb "github.com/DataDog/datadog-agent/pkg/proto/pbgo/trace" "github.com/DataDog/datadog-agent/pkg/trace/agent" + "github.com/DataDog/datadog-agent/pkg/trace/telemetry" + "github.com/DataDog/datadog-agent/pkg/trace/writer" "github.com/DataDog/opentelemetry-mapping-go/pkg/inframetadata" "github.com/DataDog/opentelemetry-mapping-go/pkg/otlp/attributes" "github.com/DataDog/opentelemetry-mapping-go/pkg/otlp/attributes/source" @@ -237,7 +239,7 @@ func checkAndCastConfig(c component.Config, logger *zap.Logger) *Config { return cfg } -func (f *factory) consumeStatsPayload(ctx context.Context, out chan []byte, traceagent *agent.Agent, tracerVersion string, logger *zap.Logger) { +func (f *factory) consumeStatsPayload(ctx context.Context, statsIn <-chan []byte, statsToAgent chan<- *pb.StatsPayload, logger *zap.Logger) { for i := 0; i < runtime.NumCPU(); i++ { f.wg.Add(1) go func() { @@ -246,7 +248,7 @@ func (f *factory) consumeStatsPayload(ctx context.Context, out chan []byte, trac select { case <-ctx.Done(): return - case msg := <-out: + case msg := <-statsIn: sp := &pb.StatsPayload{} err := proto.Unmarshal(msg, sp) @@ -254,9 +256,7 @@ func (f *factory) consumeStatsPayload(ctx context.Context, out chan []byte, trac logger.Error("failed to unmarshal stats payload", zap.Error(err)) continue } - for _, sc := range sp.Stats { - traceagent.ProcessStats(sc, "", tracerVersion) - } + statsToAgent <- sp } } }() @@ -283,11 +283,18 @@ func (f *factory) createMetricsExporter( cancel() return nil, fmt.Errorf("failed to start trace-agent: %w", err) } - var statsOut chan []byte + acfg, err := newTraceAgentConfig(ctx, set, cfg, hostProvider) + if err != nil { + return nil, err + } + statsToAgent := make(chan *pb.StatsPayload) + statsWriter := writer.NewStatsWriter(acfg, statsToAgent, telemetry.NewNoopCollector()) + go statsWriter.Run() // TODO: stop this + + var statsIn chan []byte if datadog.ConnectorPerformanceFeatureGate.IsEnabled() { - statsOut = make(chan []byte, 1000) - statsv := set.BuildInfo.Command + set.BuildInfo.Version - f.consumeStatsPayload(ctx, statsOut, traceagent, statsv, set.Logger) + statsIn = make(chan []byte, 1000) + f.consumeStatsPayload(ctx, statsIn, statsToAgent, set.Logger) } pcfg := newMetadataConfigfromConfig(cfg) metadataReporter, err := f.Reporter(set, pcfg) @@ -321,7 +328,7 @@ func (f *factory) createMetricsExporter( return nil } } else { - exp, metricsErr := newMetricsExporter(ctx, set, cfg, &f.onceMetadata, attrsTranslator, hostProvider, traceagent, metadataReporter, statsOut) + exp, metricsErr := newMetricsExporter(ctx, set, cfg, &f.onceMetadata, attrsTranslator, hostProvider, traceagent, metadataReporter, statsIn) if metricsErr != nil { cancel() // first cancel context f.wg.Wait() // then wait for shutdown @@ -345,8 +352,8 @@ func (f *factory) createMetricsExporter( exporterhelper.WithShutdown(func(context.Context) error { cancel() f.StopReporter() - if statsOut != nil { - close(statsOut) + if statsIn != nil { + close(statsIn) } return nil }), diff --git a/exporter/datadogexporter/traces_exporter.go b/exporter/datadogexporter/traces_exporter.go index e9ce7a8ba65c..218457436569 100644 --- a/exporter/datadogexporter/traces_exporter.go +++ b/exporter/datadogexporter/traces_exporter.go @@ -182,6 +182,15 @@ func (exp *traceExporter) exportUsageMetrics(ctx context.Context, hosts map[stri } func newTraceAgent(ctx context.Context, params exporter.CreateSettings, cfg *Config, sourceProvider source.Provider) (*agent.Agent, error) { + acfg, err := newTraceAgentConfig(ctx, params, cfg, sourceProvider) + if err != nil { + return nil, err + } + tracelog.SetLogger(&zaplogger{params.Logger}) + return agent.NewAgent(ctx, acfg, telemetry.NewNoopCollector()), nil +} + +func newTraceAgentConfig(ctx context.Context, params exporter.CreateSettings, cfg *Config, sourceProvider source.Provider) (*traceconfig.AgentConfig, error) { acfg := traceconfig.New() src, err := sourceProvider.Source(ctx) if err != nil { @@ -209,6 +218,5 @@ func newTraceAgent(ctx context.Context, params exporter.CreateSettings, cfg *Con if addr := cfg.Traces.Endpoint; addr != "" { acfg.Endpoints[0].Host = addr } - tracelog.SetLogger(&zaplogger{params.Logger}) - return agent.NewAgent(ctx, acfg, telemetry.NewNoopCollector()), nil + return acfg, nil } From 78a1cd5ad3c62c8334955c0fa23dc3d4d337508f Mon Sep 17 00:00:00 2001 From: Andrew Glaude Date: Fri, 9 Feb 2024 15:50:34 -0500 Subject: [PATCH 02/13] Stop the statswriter on shutdown --- exporter/datadogexporter/factory.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/exporter/datadogexporter/factory.go b/exporter/datadogexporter/factory.go index 226ad670252c..d7d142e7d257 100644 --- a/exporter/datadogexporter/factory.go +++ b/exporter/datadogexporter/factory.go @@ -289,7 +289,7 @@ func (f *factory) createMetricsExporter( } statsToAgent := make(chan *pb.StatsPayload) statsWriter := writer.NewStatsWriter(acfg, statsToAgent, telemetry.NewNoopCollector()) - go statsWriter.Run() // TODO: stop this + go statsWriter.Run() var statsIn chan []byte if datadog.ConnectorPerformanceFeatureGate.IsEnabled() { @@ -352,6 +352,7 @@ func (f *factory) createMetricsExporter( exporterhelper.WithShutdown(func(context.Context) error { cancel() f.StopReporter() + statsWriter.Stop() if statsIn != nil { close(statsIn) } From 2aa0bcc4a61ac907256efd64bfe3fd7e6b6245d6 Mon Sep 17 00:00:00 2001 From: Andrew Glaude Date: Fri, 9 Feb 2024 16:15:40 -0500 Subject: [PATCH 03/13] Close the channel and log on startup --- exporter/datadogexporter/factory.go | 5 +++++ exporter/datadogexporter/traces_exporter.go | 2 +- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/exporter/datadogexporter/factory.go b/exporter/datadogexporter/factory.go index d7d142e7d257..f373dff9080d 100644 --- a/exporter/datadogexporter/factory.go +++ b/exporter/datadogexporter/factory.go @@ -289,6 +289,8 @@ func (f *factory) createMetricsExporter( } statsToAgent := make(chan *pb.StatsPayload) statsWriter := writer.NewStatsWriter(acfg, statsToAgent, telemetry.NewNoopCollector()) + + set.Logger.Debug("Starting Datadog Trace-Agent StatsWriter") go statsWriter.Run() var statsIn chan []byte @@ -356,6 +358,9 @@ func (f *factory) createMetricsExporter( if statsIn != nil { close(statsIn) } + if statsToAgent != nil { + close(statsToAgent) + } return nil }), ) diff --git a/exporter/datadogexporter/traces_exporter.go b/exporter/datadogexporter/traces_exporter.go index 218457436569..a25d3d80583a 100644 --- a/exporter/datadogexporter/traces_exporter.go +++ b/exporter/datadogexporter/traces_exporter.go @@ -186,7 +186,6 @@ func newTraceAgent(ctx context.Context, params exporter.CreateSettings, cfg *Con if err != nil { return nil, err } - tracelog.SetLogger(&zaplogger{params.Logger}) return agent.NewAgent(ctx, acfg, telemetry.NewNoopCollector()), nil } @@ -218,5 +217,6 @@ func newTraceAgentConfig(ctx context.Context, params exporter.CreateSettings, cf if addr := cfg.Traces.Endpoint; addr != "" { acfg.Endpoints[0].Host = addr } + tracelog.SetLogger(&zaplogger{params.Logger}) //TODO: This shouldn't be a singleton return acfg, nil } From 4b59856965af16729c8f4bcffdb9507ed9c7d998 Mon Sep 17 00:00:00 2001 From: Andrew Glaude Date: Mon, 12 Feb 2024 09:14:40 -0500 Subject: [PATCH 04/13] must call cancel before returning error --- exporter/datadogexporter/factory.go | 1 + 1 file changed, 1 insertion(+) diff --git a/exporter/datadogexporter/factory.go b/exporter/datadogexporter/factory.go index f373dff9080d..e3892c328fd4 100644 --- a/exporter/datadogexporter/factory.go +++ b/exporter/datadogexporter/factory.go @@ -285,6 +285,7 @@ func (f *factory) createMetricsExporter( } acfg, err := newTraceAgentConfig(ctx, set, cfg, hostProvider) if err != nil { + cancel() return nil, err } statsToAgent := make(chan *pb.StatsPayload) From 0430a477a6c966a77585336943d1653220bb2228 Mon Sep 17 00:00:00 2001 From: Andrew Glaude Date: Mon, 12 Feb 2024 10:22:17 -0500 Subject: [PATCH 05/13] Update integration test to cover this change, (test fails on main) --- .../integrationtest/integration_test.go | 23 ++++++++++++++++--- 1 file changed, 20 insertions(+), 3 deletions(-) diff --git a/exporter/datadogexporter/integrationtest/integration_test.go b/exporter/datadogexporter/integrationtest/integration_test.go index e10f8bc399f9..aea78582cc38 100644 --- a/exporter/datadogexporter/integrationtest/integration_test.go +++ b/exporter/datadogexporter/integrationtest/integration_test.go @@ -34,6 +34,7 @@ import ( "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc" + "go.opentelemetry.io/otel/sdk/resource" sdktrace "go.opentelemetry.io/otel/sdk/trace" "google.golang.org/protobuf/proto" @@ -104,7 +105,7 @@ func TestIntegration(t *testing.T) { for _, chunks := range tps.Chunks { spans = append(spans, chunks.Spans...) for _, span := range chunks.Spans { - assert.Equal(t, span.Meta["_dd.stats_computed"], "true") + assert.Equal(t, "true", span.Meta["_dd.stats_computed"]) } } } @@ -118,8 +119,8 @@ func TestIntegration(t *testing.T) { stats = append(stats, csbs.Stats...) for _, stat := range csbs.Stats { assert.True(t, strings.HasPrefix(stat.Resource, "TestSpan")) - assert.Equal(t, stat.Hits, uint64(1)) - assert.Equal(t, stat.TopLevelHits, uint64(1)) + assert.Equal(t, uint64(1), stat.Hits) + assert.Equal(t, uint64(1), stat.TopLevelHits) } } } @@ -279,18 +280,34 @@ func sendTraces(t *testing.T) { traceExporter, err := otlptracegrpc.New(ctx, otlptracegrpc.WithInsecure()) require.NoError(t, err) bsp := sdktrace.NewBatchSpanProcessor(traceExporter) + r1, _ := resource.New(ctx, resource.WithAttributes(attribute.String("k8s.node.name", "aaaa"))) + r2, _ := resource.New(ctx, resource.WithAttributes(attribute.String("k8s.node.name", "bbbb"))) tracerProvider := sdktrace.NewTracerProvider( sdktrace.WithSampler(sdktrace.AlwaysSample()), sdktrace.WithSpanProcessor(bsp), + sdktrace.WithResource(r1), + ) + tracerProvider2 := sdktrace.NewTracerProvider( + sdktrace.WithSampler(sdktrace.AlwaysSample()), + sdktrace.WithSpanProcessor(bsp), + sdktrace.WithResource(r2), ) otel.SetTracerProvider(tracerProvider) defer func() { require.NoError(t, tracerProvider.Shutdown(ctx)) + require.NoError(t, tracerProvider2.Shutdown(ctx)) }() tracer := otel.Tracer("test-tracer") for i := 0; i < 10; i++ { _, span := tracer.Start(ctx, fmt.Sprintf("TestSpan%d", i)) + + if i == 3 { + // Send some traces from a different resource + // This verifies that stats from different hosts don't accidentally create extraneous empty stats buckets + otel.SetTracerProvider(tracerProvider2) + tracer = otel.Tracer("test-tracer2") + } // Only sample 5 out of the 10 spans if i < 5 { span.SetAttributes(attribute.Bool("sampled", true)) From 1d2870cf1b416ab1403fbabbcbc8a3b7f8379b79 Mon Sep 17 00:00:00 2001 From: Andrew Glaude Date: Mon, 12 Feb 2024 10:28:36 -0500 Subject: [PATCH 06/13] Add changelog --- .../ddog-exporter-fix-multi-resc-stats.yaml | 27 +++++++++++++++++++ 1 file changed, 27 insertions(+) create mode 100755 .chloggen/ddog-exporter-fix-multi-resc-stats.yaml diff --git a/.chloggen/ddog-exporter-fix-multi-resc-stats.yaml b/.chloggen/ddog-exporter-fix-multi-resc-stats.yaml new file mode 100755 index 000000000000..a55c767e4c5e --- /dev/null +++ b/.chloggen/ddog-exporter-fix-multi-resc-stats.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: bug_fix + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: datadogexporter + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Fix bug where multiple resources would cause datadogexporter to send extraneous additional stats buckets. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [31173] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: ["user"] From 3bdca5da05ba4ddd94a3621af516e14455a4a3d4 Mon Sep 17 00:00:00 2001 From: Andrew Glaude Date: Mon, 12 Feb 2024 16:57:51 -0500 Subject: [PATCH 07/13] Fix other branch of metrics exporting logic, also needs to use stats writer --- exporter/datadogexporter/factory.go | 7 +-- exporter/datadogexporter/metrics_exporter.go | 43 +++++++++------- .../datadogexporter/metrics_exporter_test.go | 50 +++++++++++++------ 3 files changed, 61 insertions(+), 39 deletions(-) diff --git a/exporter/datadogexporter/factory.go b/exporter/datadogexporter/factory.go index e3892c328fd4..c5c3bffbe8f5 100644 --- a/exporter/datadogexporter/factory.go +++ b/exporter/datadogexporter/factory.go @@ -278,11 +278,6 @@ func (f *factory) createMetricsExporter( ctx, cancel := context.WithCancel(ctx) // cancel() runs on shutdown var pushMetricsFn consumer.ConsumeMetricsFunc - traceagent, err := f.TraceAgent(ctx, set, cfg, hostProvider) - if err != nil { - cancel() - return nil, fmt.Errorf("failed to start trace-agent: %w", err) - } acfg, err := newTraceAgentConfig(ctx, set, cfg, hostProvider) if err != nil { cancel() @@ -331,7 +326,7 @@ func (f *factory) createMetricsExporter( return nil } } else { - exp, metricsErr := newMetricsExporter(ctx, set, cfg, &f.onceMetadata, attrsTranslator, hostProvider, traceagent, metadataReporter, statsIn) + exp, metricsErr := newMetricsExporter(ctx, set, cfg, acfg, &f.onceMetadata, attrsTranslator, hostProvider, statsToAgent, metadataReporter, statsIn) if metricsErr != nil { cancel() // first cancel context f.wg.Wait() // then wait for shutdown diff --git a/exporter/datadogexporter/metrics_exporter.go b/exporter/datadogexporter/metrics_exporter.go index 1f7958963b00..527b7c7d6928 100644 --- a/exporter/datadogexporter/metrics_exporter.go +++ b/exporter/datadogexporter/metrics_exporter.go @@ -13,7 +13,7 @@ import ( "time" pb "github.com/DataDog/datadog-agent/pkg/proto/pbgo/trace" - "github.com/DataDog/datadog-agent/pkg/trace/api" + "github.com/DataDog/datadog-agent/pkg/trace/config" "github.com/DataDog/datadog-api-client-go/v2/api/datadogV2" "github.com/DataDog/opentelemetry-mapping-go/pkg/inframetadata" "github.com/DataDog/opentelemetry-mapping-go/pkg/otlp/attributes" @@ -37,6 +37,7 @@ import ( type metricsExporter struct { params exporter.CreateSettings cfg *Config + agntConfig *config.AgentConfig ctx context.Context client *zorkian.Client metricsAPI *datadogV2.MetricsApi @@ -48,8 +49,8 @@ type metricsExporter struct { metadataReporter *inframetadata.Reporter // getPushTime returns a Unix time in nanoseconds, representing the time pushing metrics. // It will be overwritten in tests. - getPushTime func() uint64 - apmStatsProcessor api.StatsProcessor + getPushTime func() uint64 + statsToAgent chan<- *pb.StatsPayload } // translatorFromConfig creates a new metrics translator from the exporter @@ -95,10 +96,11 @@ func newMetricsExporter( ctx context.Context, params exporter.CreateSettings, cfg *Config, + agntConfig *config.AgentConfig, onceMetadata *sync.Once, attrsTranslator *attributes.Translator, sourceProvider source.Provider, - apmStatsProcessor api.StatsProcessor, + statsToAgent chan<- *pb.StatsPayload, metadataReporter *inframetadata.Reporter, statsOut chan []byte, ) (*metricsExporter, error) { @@ -109,17 +111,18 @@ func newMetricsExporter( scrubber := scrub.NewScrubber() exporter := &metricsExporter{ - params: params, - cfg: cfg, - ctx: ctx, - tr: tr, - scrubber: scrubber, - retrier: clientutil.NewRetrier(params.Logger, cfg.BackOffConfig, scrubber), - onceMetadata: onceMetadata, - sourceProvider: sourceProvider, - getPushTime: func() uint64 { return uint64(time.Now().UTC().UnixNano()) }, - apmStatsProcessor: apmStatsProcessor, - metadataReporter: metadataReporter, + params: params, + cfg: cfg, + ctx: ctx, + agntConfig: agntConfig, + tr: tr, + scrubber: scrubber, + retrier: clientutil.NewRetrier(params.Logger, cfg.BackOffConfig, scrubber), + onceMetadata: onceMetadata, + sourceProvider: sourceProvider, + getPushTime: func() uint64 { return uint64(time.Now().UTC().UnixNano()) }, + statsToAgent: statsToAgent, + metadataReporter: metadataReporter, } errchan := make(chan error) if isMetricExportV2Enabled() { @@ -259,9 +262,13 @@ func (exp *metricsExporter) PushMetricsData(ctx context.Context, md pmetric.Metr if len(sp) > 0 { exp.params.Logger.Debug("exporting APM stats payloads", zap.Any("stats_payloads", sp)) - statsv := exp.params.BuildInfo.Command + exp.params.BuildInfo.Version - for _, p := range sp { - exp.apmStatsProcessor.ProcessStats(p, "", statsv) + exp.statsToAgent <- &pb.StatsPayload{ + AgentHostname: exp.agntConfig.Hostname, // This is "dead-code". We will be removing this code path entirely + AgentEnv: exp.agntConfig.DefaultEnv, + Stats: sp, + AgentVersion: exp.agntConfig.AgentVersion, + ClientComputed: false, + SplitPayload: false, } } diff --git a/exporter/datadogexporter/metrics_exporter_test.go b/exporter/datadogexporter/metrics_exporter_test.go index 55eb3e71b1c3..aaca971a07e5 100644 --- a/exporter/datadogexporter/metrics_exporter_test.go +++ b/exporter/datadogexporter/metrics_exporter_test.go @@ -16,6 +16,7 @@ import ( "github.com/DataDog/agent-payload/v5/gogen" pb "github.com/DataDog/datadog-agent/pkg/proto/pbgo/trace" + traceconfig "github.com/DataDog/datadog-agent/pkg/trace/config" "github.com/DataDog/datadog-api-client-go/v2/api/datadogV2" "github.com/DataDog/opentelemetry-mapping-go/pkg/inframetadata" "github.com/DataDog/opentelemetry-mapping-go/pkg/inframetadata/payload" @@ -297,23 +298,24 @@ func Test_metricsExporter_PushMetricsData(t *testing.T) { defer server.Close() var ( - once sync.Once - statsRecorder testutil.MockStatsProcessor + once sync.Once ) - + statsToAgent := make(chan *pb.StatsPayload, 1000) //just to unblock me pusher := newTestPusher(t) reporter, err := inframetadata.NewReporter(zap.NewNop(), pusher, 1*time.Second) require.NoError(t, err) attributesTranslator, err := attributes.NewTranslator(componenttest.NewNopTelemetrySettings()) require.NoError(t, err) + acfg := traceconfig.New() exp, err := newMetricsExporter( context.Background(), exportertest.NewNopCreateSettings(), newTestConfig(t, server.URL, tt.hostTags, tt.histogramMode), + acfg, &once, attributesTranslator, &testutil.MockSourceProvider{Src: tt.source}, - &statsRecorder, + statsToAgent, reporter, nil, ) @@ -357,10 +359,18 @@ func Test_metricsExporter_PushMetricsData(t *testing.T) { assert.NoError(t, err) assert.Equal(t, expected, sketchRecorder.ByteBody) } - if tt.expectedStats == nil { - assert.Len(t, statsRecorder.In, 0) - } else { - assert.ElementsMatch(t, statsRecorder.In, tt.expectedStats) + if tt.expectedStats != nil { + var actualStats []*pb.ClientStatsPayload + pullStats: + for len(actualStats) < len(tt.expectedStats) { + select { + case <-time.After(10 * time.Second): + break pullStats + case sp := <-statsToAgent: + actualStats = append(actualStats, sp.Stats...) + } + } + assert.ElementsMatch(t, actualStats, tt.expectedStats) } }) } @@ -690,22 +700,24 @@ func Test_metricsExporter_PushMetricsData_Zorkian(t *testing.T) { defer server.Close() var ( - once sync.Once - statsRecorder testutil.MockStatsProcessor + once sync.Once ) + statsToAgent := make(chan *pb.StatsPayload, 1000) pusher := newTestPusher(t) reporter, err := inframetadata.NewReporter(zap.NewNop(), pusher, 1*time.Second) require.NoError(t, err) attributesTranslator, err := attributes.NewTranslator(componenttest.NewNopTelemetrySettings()) require.NoError(t, err) + acfg := traceconfig.New() exp, err := newMetricsExporter( context.Background(), exportertest.NewNopCreateSettings(), newTestConfig(t, server.URL, tt.hostTags, tt.histogramMode), + acfg, &once, attributesTranslator, &testutil.MockSourceProvider{Src: tt.source}, - &statsRecorder, + statsToAgent, reporter, nil, ) @@ -744,10 +756,18 @@ func Test_metricsExporter_PushMetricsData_Zorkian(t *testing.T) { assert.NoError(t, err) assert.Equal(t, expected, sketchRecorder.ByteBody) } - if tt.expectedStats == nil { - assert.Len(t, statsRecorder.In, 0) - } else { - assert.ElementsMatch(t, statsRecorder.In, tt.expectedStats) + if tt.expectedStats != nil { + var actualStats []*pb.ClientStatsPayload + pullStats: + for len(actualStats) < len(tt.expectedStats) { + select { + case <-time.After(10 * time.Second): + break pullStats + case sp := <-statsToAgent: + actualStats = append(actualStats, sp.Stats...) + } + } + assert.ElementsMatch(t, actualStats, tt.expectedStats) } }) } From bbfdb351d0298815d67dfbe0d8b20a2df5c318ac Mon Sep 17 00:00:00 2001 From: Andrew Glaude Date: Tue, 13 Feb 2024 08:45:47 -0500 Subject: [PATCH 08/13] Fix linting error, clarify comment --- exporter/datadogexporter/metrics_exporter_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/exporter/datadogexporter/metrics_exporter_test.go b/exporter/datadogexporter/metrics_exporter_test.go index aaca971a07e5..bb0a6a685aa5 100644 --- a/exporter/datadogexporter/metrics_exporter_test.go +++ b/exporter/datadogexporter/metrics_exporter_test.go @@ -300,7 +300,7 @@ func Test_metricsExporter_PushMetricsData(t *testing.T) { var ( once sync.Once ) - statsToAgent := make(chan *pb.StatsPayload, 1000) //just to unblock me + statsToAgent := make(chan *pb.StatsPayload, 1000) // Buffer the channel to allow test to pass without go-routines pusher := newTestPusher(t) reporter, err := inframetadata.NewReporter(zap.NewNop(), pusher, 1*time.Second) require.NoError(t, err) From 2fe4344628115e22cd90d98464e518d44629ff3a Mon Sep 17 00:00:00 2001 From: Andrew Glaude Date: Tue, 13 Feb 2024 10:09:01 -0500 Subject: [PATCH 09/13] re-generate otelcontribcol --- cmd/otelcontribcol/go.mod | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/otelcontribcol/go.mod b/cmd/otelcontribcol/go.mod index 0b368473b08f..5d58b538ebc6 100644 --- a/cmd/otelcontribcol/go.mod +++ b/cmd/otelcontribcol/go.mod @@ -4,7 +4,7 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/cmd/otelcontrib go 1.21 -toolchain go1.21.6 +toolchain go1.21.5 require ( github.com/open-telemetry/opentelemetry-collector-contrib/connector/countconnector v0.94.0 From 41ebaffdf8c13b9bdeab1b19f34fa1ab3f7c2d53 Mon Sep 17 00:00:00 2001 From: Andrew Glaude Date: Tue, 13 Feb 2024 10:20:27 -0500 Subject: [PATCH 10/13] Keep tracerVersion behavior for perf feature gate disabled --- exporter/datadogexporter/metrics_exporter.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/exporter/datadogexporter/metrics_exporter.go b/exporter/datadogexporter/metrics_exporter.go index 527b7c7d6928..2d39f7f8675a 100644 --- a/exporter/datadogexporter/metrics_exporter.go +++ b/exporter/datadogexporter/metrics_exporter.go @@ -262,6 +262,12 @@ func (exp *metricsExporter) PushMetricsData(ctx context.Context, md pmetric.Metr if len(sp) > 0 { exp.params.Logger.Debug("exporting APM stats payloads", zap.Any("stats_payloads", sp)) + statsv := exp.params.BuildInfo.Command + exp.params.BuildInfo.Version + for _, csp := range sp { + if csp.TracerVersion == "" { + csp.TracerVersion = statsv + } + } exp.statsToAgent <- &pb.StatsPayload{ AgentHostname: exp.agntConfig.Hostname, // This is "dead-code". We will be removing this code path entirely AgentEnv: exp.agntConfig.DefaultEnv, From 8d62f2b7d556aae791c70f042ef392b3abe19621 Mon Sep 17 00:00:00 2001 From: Andrew Glaude Date: Tue, 13 Feb 2024 10:37:06 -0500 Subject: [PATCH 11/13] keep previous behavior of setting tracerVersion --- exporter/datadogexporter/factory.go | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/exporter/datadogexporter/factory.go b/exporter/datadogexporter/factory.go index c5c3bffbe8f5..1cc2b6c019cd 100644 --- a/exporter/datadogexporter/factory.go +++ b/exporter/datadogexporter/factory.go @@ -239,7 +239,7 @@ func checkAndCastConfig(c component.Config, logger *zap.Logger) *Config { return cfg } -func (f *factory) consumeStatsPayload(ctx context.Context, statsIn <-chan []byte, statsToAgent chan<- *pb.StatsPayload, logger *zap.Logger) { +func (f *factory) consumeStatsPayload(ctx context.Context, statsIn <-chan []byte, statsToAgent chan<- *pb.StatsPayload, tracerVersion string, logger *zap.Logger) { for i := 0; i < runtime.NumCPU(); i++ { f.wg.Add(1) go func() { @@ -256,6 +256,11 @@ func (f *factory) consumeStatsPayload(ctx context.Context, statsIn <-chan []byte logger.Error("failed to unmarshal stats payload", zap.Error(err)) continue } + for _, csp := range sp.Stats { + if csp.TracerVersion == "" { + csp.TracerVersion = tracerVersion + } + } statsToAgent <- sp } } @@ -292,7 +297,8 @@ func (f *factory) createMetricsExporter( var statsIn chan []byte if datadog.ConnectorPerformanceFeatureGate.IsEnabled() { statsIn = make(chan []byte, 1000) - f.consumeStatsPayload(ctx, statsIn, statsToAgent, set.Logger) + statsv := set.BuildInfo.Command + set.BuildInfo.Version + f.consumeStatsPayload(ctx, statsIn, statsToAgent, statsv, set.Logger) } pcfg := newMetadataConfigfromConfig(cfg) metadataReporter, err := f.Reporter(set, pcfg) From a862b2d960af8d8eead3281b181649e35f995276 Mon Sep 17 00:00:00 2001 From: Andrew Glaude Date: Tue, 13 Feb 2024 10:46:52 -0500 Subject: [PATCH 12/13] Set toolchain to latest to try and get genotelcontribcol check to pass --- cmd/otelcontribcol/go.mod | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/otelcontribcol/go.mod b/cmd/otelcontribcol/go.mod index 5d58b538ebc6..271a2734ef8d 100644 --- a/cmd/otelcontribcol/go.mod +++ b/cmd/otelcontribcol/go.mod @@ -4,7 +4,7 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/cmd/otelcontrib go 1.21 -toolchain go1.21.5 +toolchain go1.21.7 require ( github.com/open-telemetry/opentelemetry-collector-contrib/connector/countconnector v0.94.0 From df1c58e7a510d606e654150c7ca4a6179af875b0 Mon Sep 17 00:00:00 2001 From: Andrew Glaude Date: Tue, 13 Feb 2024 11:08:24 -0500 Subject: [PATCH 13/13] make genoteltestbedcol changes toolchain version --- cmd/oteltestbedcol/go.mod | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/oteltestbedcol/go.mod b/cmd/oteltestbedcol/go.mod index 20dfdb446402..efebc1cd90d7 100644 --- a/cmd/oteltestbedcol/go.mod +++ b/cmd/oteltestbedcol/go.mod @@ -4,7 +4,7 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/cmd/oteltestbed go 1.21 -toolchain go1.21.6 +toolchain go1.21.7 require ( github.com/open-telemetry/opentelemetry-collector-contrib/exporter/carbonexporter v0.94.0