From f53bd40a82e881e77b9032a058950d46e4f4dc08 Mon Sep 17 00:00:00 2001 From: Pablo Baeyens Date: Fri, 11 Dec 2020 17:11:32 +0100 Subject: [PATCH] [datadogexporter] Use a singleton for sublayer calculation (#1759) * Use a single Sublayer Calculator Ideally, we would use one sublayer calculator per worker as the Datadog Agent does, but we can't here since we don't have access to the queue consumers. * Amend tests * Address linter issue * Unexport all functions that can be kept private --- exporter/datadogexporter/model.go | 12 +++---- exporter/datadogexporter/stats.go | 7 ++-- exporter/datadogexporter/trace_connection.go | 4 +-- exporter/datadogexporter/traces_exporter.go | 35 ++++++++++++++----- exporter/datadogexporter/translate_traces.go | 11 +++--- .../datadogexporter/translate_traces_test.go | 34 ++++++++++-------- 6 files changed, 61 insertions(+), 42 deletions(-) diff --git a/exporter/datadogexporter/model.go b/exporter/datadogexporter/model.go index 34a5a6169011..d0fc4525e3a4 100644 --- a/exporter/datadogexporter/model.go +++ b/exporter/datadogexporter/model.go @@ -23,8 +23,8 @@ import ( "github.com/DataDog/datadog-agent/pkg/trace/exportable/traceutil" ) -// ObfuscatePayload applies obfuscator rules to the trace payloads -func ObfuscatePayload(obfuscator *obfuscate.Obfuscator, tracePayloads []*pb.TracePayload) { +// obfuscatePayload applies obfuscator rules to the trace payloads +func obfuscatePayload(obfuscator *obfuscate.Obfuscator, tracePayloads []*pb.TracePayload) { for _, tracePayload := range tracePayloads { // Obfuscate the traces in the payload @@ -36,7 +36,7 @@ func ObfuscatePayload(obfuscator *obfuscate.Obfuscator, tracePayloads []*pb.Trac } } -// GetAnalyzedSpans finds all the analyzed spans in a trace, including top level spans +// getAnalyzedSpans finds all the analyzed spans in a trace, including top level spans // and spans marked as analyzed by the tracer. // A span is considered top-level if: // - it's a root span @@ -44,7 +44,7 @@ func ObfuscatePayload(obfuscator *obfuscate.Obfuscator, tracePayloads []*pb.Trac // - its parent belongs to another service (in that case it's a "local root" // being the highest ancestor of other spans belonging to this service and // attached to it). 
-func GetAnalyzedSpans(sps []*pb.Span) []*pb.Span { +func getAnalyzedSpans(sps []*pb.Span) []*pb.Span { // build a lookup map spanIDToIdx := make(map[uint64]int, len(sps)) for i, span := range sps { @@ -77,14 +77,14 @@ func GetAnalyzedSpans(sps []*pb.Span) []*pb.Span { // Compute Sublayers updates a spans metrics with relevant metadata so that it's duration and breakdown between different services can // be accurately displayed in the Datadog UI -func ComputeSublayerMetrics(calculator *stats.SublayerCalculator, t pb.Trace) { +func computeSublayerMetrics(calculator *sublayerCalculator, t pb.Trace) { root := traceutil.GetRoot(t) traceutil.ComputeTopLevel(t) subtraces := stats.ExtractSubtraces(t, root) sublayers := make(map[*pb.Span][]stats.SublayerValue) for _, subtrace := range subtraces { - subtraceSublayers := calculator.ComputeSublayers(subtrace.Trace) + subtraceSublayers := calculator.computeSublayers(subtrace.Trace) sublayers[subtrace.Root] = subtraceSublayers stats.SetSublayersOnSpan(subtrace.Root, subtraceSublayers) } diff --git a/exporter/datadogexporter/stats.go b/exporter/datadogexporter/stats.go index 8dd55f0260d1..d72785896a44 100644 --- a/exporter/datadogexporter/stats.go +++ b/exporter/datadogexporter/stats.go @@ -26,14 +26,13 @@ const ( ) // ComputeAPMStats calculates the stats that should be submitted to APM about a given trace -func ComputeAPMStats(tracePayload *pb.TracePayload, calculator *stats.SublayerCalculator, pushTime int64) *stats.Payload { - +func computeAPMStats(tracePayload *pb.TracePayload, calculator *sublayerCalculator, pushTime int64) *stats.Payload { statsRawBuckets := make(map[int64]*stats.RawBucket) bucketTS := pushTime - statsBucketDuration for _, trace := range tracePayload.Traces { - spans := GetAnalyzedSpans(trace.Spans) - sublayers := calculator.ComputeSublayers(trace.Spans) + spans := getAnalyzedSpans(trace.Spans) + sublayers := calculator.computeSublayers(trace.Spans) for _, span := range spans { // TODO: While this is hardcoded to assume a single 10s buckets for now, diff --git a/exporter/datadogexporter/trace_connection.go b/exporter/datadogexporter/trace_connection.go index fac5cfb684e8..a6aa3a481a4f 100644 --- a/exporter/datadogexporter/trace_connection.go +++ b/exporter/datadogexporter/trace_connection.go @@ -49,8 +49,8 @@ const ( traceEdgeRetryInterval time.Duration = 10 * time.Second ) -// CreateTraceEdgeConnection returns a new TraceEdgeConnection -func CreateTraceEdgeConnection(rootURL, apiKey string, startInfo component.ApplicationStartInfo) TraceEdgeConnection { +// createTraceEdgeConnection returns a new TraceEdgeConnection +func createTraceEdgeConnection(rootURL, apiKey string, startInfo component.ApplicationStartInfo) TraceEdgeConnection { return &traceEdgeConnection{ traceURL: rootURL + "/api/v0.2/traces", diff --git a/exporter/datadogexporter/traces_exporter.go b/exporter/datadogexporter/traces_exporter.go index 2d9416de810b..f022896a401f 100644 --- a/exporter/datadogexporter/traces_exporter.go +++ b/exporter/datadogexporter/traces_exporter.go @@ -16,6 +16,7 @@ package datadogexporter import ( "context" + "sync" "time" "github.com/DataDog/datadog-agent/pkg/trace/exportable/config/configdefs" @@ -32,11 +33,27 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/datadogexporter/utils" ) +// sublayerCalculator is thread safe wrapper of a sublayer +// calculator. 
Each trace exporter has a single sublayer +// calculator that is reused by each push +type sublayerCalculator struct { + sc *stats.SublayerCalculator + mutex sync.Mutex +} + +// ComputeSublayers computes the sublayers of a trace +func (s *sublayerCalculator) computeSublayers(trace pb.Trace) []stats.SublayerValue { + s.mutex.Lock() + defer s.mutex.Unlock() + return s.sc.ComputeSublayers(trace) +} + type traceExporter struct { logger *zap.Logger cfg *config.Config edgeConnection TraceEdgeConnection obfuscator *obfuscate.Obfuscator + calculator *sublayerCalculator client *datadog.Client } @@ -67,11 +84,13 @@ func newTraceExporter(params component.ExporterCreateParams, cfg *config.Config) // https://github.com/DataDog/datadog-serverless-functions/blob/11f170eac105d66be30f18eda09eca791bc0d31b/aws/logs_monitoring/trace_forwarder/cmd/trace/main.go#L43 obfuscator := obfuscate.NewObfuscator(obfuscatorConfig) + calculator := &sublayerCalculator{sc: stats.NewSublayerCalculator()} exporter := &traceExporter{ logger: params.Logger, cfg: cfg, - edgeConnection: CreateTraceEdgeConnection(cfg.Traces.TCPAddr.Endpoint, cfg.API.Key, params.ApplicationStartInfo), + edgeConnection: createTraceEdgeConnection(cfg.Traces.TCPAddr.Endpoint, cfg.API.Key, params.ApplicationStartInfo), obfuscator: obfuscator, + calculator: calculator, client: client, } @@ -94,12 +113,10 @@ func (exp *traceExporter) pushTraceData( td pdata.Traces, ) (int, error) { - calculator := stats.NewSublayerCalculator() - // convert traces to datadog traces and group trace payloads by env // we largely apply the same logic as the serverless implementation, simplified a bit // https://github.com/DataDog/datadog-serverless-functions/blob/f5c3aedfec5ba223b11b76a4239fcbf35ec7d045/aws/logs_monitoring/trace_forwarder/cmd/trace/main.go#L61-L83 - ddTraces, err := ConvertToDatadogTd(td, calculator, exp.cfg) + ddTraces, err := convertToDatadogTd(td, exp.calculator, exp.cfg) if err != nil { exp.logger.Info("failed to convert traces", zap.Error(err)) @@ -107,17 +124,17 @@ func (exp *traceExporter) pushTraceData( } // group the traces by env to reduce the number of flushes - aggregatedTraces := AggregateTracePayloadsByEnv(ddTraces) + aggregatedTraces := aggregateTracePayloadsByEnv(ddTraces) // security/obfuscation for db, query strings, stack traces, pii, etc // TODO: is there any config we want here? 
OTEL has their own pipeline for regex obfuscation - ObfuscatePayload(exp.obfuscator, aggregatedTraces) + obfuscatePayload(exp.obfuscator, aggregatedTraces) pushTime := time.Now().UTC().UnixNano() for _, ddTracePayload := range aggregatedTraces { // currently we don't want to do retries since api endpoints may not dedupe in certain situations // adding a helper function here to make custom retry logic easier in the future - exp.pushWithRetry(ctx, ddTracePayload, calculator, 1, pushTime, func() error { + exp.pushWithRetry(ctx, ddTracePayload, 1, pushTime, func() error { return nil }) } @@ -131,7 +148,7 @@ func (exp *traceExporter) pushTraceData( } // gives us flexibility to add custom retry logic later -func (exp *traceExporter) pushWithRetry(ctx context.Context, ddTracePayload *pb.TracePayload, calculator *stats.SublayerCalculator, maxRetries int, pushTime int64, fn func() error) error { +func (exp *traceExporter) pushWithRetry(ctx context.Context, ddTracePayload *pb.TracePayload, maxRetries int, pushTime int64, fn func() error) error { err := exp.edgeConnection.SendTraces(ctx, ddTracePayload, maxRetries) if err != nil { @@ -139,7 +156,7 @@ func (exp *traceExporter) pushWithRetry(ctx context.Context, ddTracePayload *pb. } // this is for generating metrics like hits, errors, and latency, it uses a separate endpoint than Traces - stats := ComputeAPMStats(ddTracePayload, calculator, pushTime) + stats := computeAPMStats(ddTracePayload, exp.calculator, pushTime) errStats := exp.edgeConnection.SendStats(context.Background(), stats, maxRetries) if errStats != nil { diff --git a/exporter/datadogexporter/translate_traces.go b/exporter/datadogexporter/translate_traces.go index e347a9dcd0ca..894dde5521a4 100644 --- a/exporter/datadogexporter/translate_traces.go +++ b/exporter/datadogexporter/translate_traces.go @@ -20,7 +20,6 @@ import ( "strconv" "github.com/DataDog/datadog-agent/pkg/trace/exportable/pb" - "github.com/DataDog/datadog-agent/pkg/trace/exportable/stats" "go.opentelemetry.io/collector/consumer/pdata" "go.opentelemetry.io/collector/translator/conventions" tracetranslator "go.opentelemetry.io/collector/translator/trace" @@ -45,7 +44,7 @@ const ( ) // converts Traces into an array of datadog trace payloads grouped by env -func ConvertToDatadogTd(td pdata.Traces, calculator *stats.SublayerCalculator, cfg *config.Config) ([]*pb.TracePayload, error) { +func convertToDatadogTd(td pdata.Traces, calculator *sublayerCalculator, cfg *config.Config) ([]*pb.TracePayload, error) { // TODO: // do we apply other global tags, like version+service, to every span or only root spans of a service // should globalTags['service'] take precedence over a trace's resource.service.name? 
I don't believe so, need to confirm @@ -75,7 +74,7 @@ func ConvertToDatadogTd(td pdata.Traces, calculator *stats.SublayerCalculator, c return traces, nil } -func AggregateTracePayloadsByEnv(tracePayloads []*pb.TracePayload) []*pb.TracePayload { +func aggregateTracePayloadsByEnv(tracePayloads []*pb.TracePayload) []*pb.TracePayload { lookup := make(map[string]*pb.TracePayload) for _, tracePayload := range tracePayloads { key := fmt.Sprintf("%s|%s", tracePayload.HostName, tracePayload.Env) @@ -102,7 +101,7 @@ func AggregateTracePayloadsByEnv(tracePayloads []*pb.TracePayload) []*pb.TracePa } // converts a Trace's resource spans into a trace payload -func resourceSpansToDatadogSpans(rs pdata.ResourceSpans, calculator *stats.SublayerCalculator, hostname string, cfg *config.Config) (pb.TracePayload, error) { +func resourceSpansToDatadogSpans(rs pdata.ResourceSpans, calculator *sublayerCalculator, hostname string, cfg *config.Config) (pb.TracePayload, error) { // get env tag env := cfg.Env @@ -164,12 +163,12 @@ func resourceSpansToDatadogSpans(rs pdata.ResourceSpans, calculator *stats.Subla // calculates analyzed spans for use in trace search and app analytics // appends a specific piece of metadata to these spans marking them as analyzed // TODO: allow users to configure specific spans to be marked as an analyzed spans for app analytics - top := GetAnalyzedSpans(apiTrace.Spans) + top := getAnalyzedSpans(apiTrace.Spans) // calculates span metrics for representing direction and timing among it's different services for display in // service overview graphs // see: https://github.com/DataDog/datadog-agent/blob/f69a7d35330c563e9cad4c5b8865a357a87cd0dc/pkg/trace/stats/sublayers.go#L204 - ComputeSublayerMetrics(calculator, apiTrace.Spans) + computeSublayerMetrics(calculator, apiTrace.Spans) payload.Transactions = append(payload.Transactions, top...) 
payload.Traces = append(payload.Traces, apiTrace) } diff --git a/exporter/datadogexporter/translate_traces_test.go b/exporter/datadogexporter/translate_traces_test.go index 16e5edb1dd4a..172f4834eb21 100644 --- a/exporter/datadogexporter/translate_traces_test.go +++ b/exporter/datadogexporter/translate_traces_test.go @@ -113,12 +113,16 @@ func NewResourceSpansData(mockTraceID [16]byte, mockSpanID [8]byte, mockParentSp return rs } +func newSublayerCalculator() *sublayerCalculator { + return &sublayerCalculator{sc: stats.NewSublayerCalculator()} +} + func TestConvertToDatadogTd(t *testing.T) { traces := pdata.NewTraces() traces.ResourceSpans().Resize(1) - calculator := stats.NewSublayerCalculator() + calculator := newSublayerCalculator() - outputTraces, err := ConvertToDatadogTd(traces, calculator, &config.Config{}) + outputTraces, err := convertToDatadogTd(traces, calculator, &config.Config{}) assert.NoError(t, err) assert.Equal(t, 1, len(outputTraces)) @@ -126,16 +130,16 @@ func TestConvertToDatadogTd(t *testing.T) { func TestConvertToDatadogTdNoResourceSpans(t *testing.T) { traces := pdata.NewTraces() - calculator := stats.NewSublayerCalculator() + calculator := newSublayerCalculator() - outputTraces, err := ConvertToDatadogTd(traces, calculator, &config.Config{}) + outputTraces, err := convertToDatadogTd(traces, calculator, &config.Config{}) assert.NoError(t, err) assert.Equal(t, 0, len(outputTraces)) } func TestObfuscation(t *testing.T) { - calculator := stats.NewSublayerCalculator() + calculator := newSublayerCalculator() traces := pdata.NewTraces() traces.ResourceSpans().Resize(1) @@ -157,22 +161,22 @@ func TestObfuscation(t *testing.T) { // of them is currently not supported. span.Attributes().InsertString("testinfo?=123", "http.route") - outputTraces, err := ConvertToDatadogTd(traces, calculator, &config.Config{}) + outputTraces, err := convertToDatadogTd(traces, calculator, &config.Config{}) assert.NoError(t, err) - aggregatedTraces := AggregateTracePayloadsByEnv(outputTraces) + aggregatedTraces := aggregateTracePayloadsByEnv(outputTraces) obfuscator := obfuscate.NewObfuscator(obfuscatorConfig) - ObfuscatePayload(obfuscator, aggregatedTraces) + obfuscatePayload(obfuscator, aggregatedTraces) assert.Equal(t, 1, len(aggregatedTraces)) } func TestBasicTracesTranslation(t *testing.T) { hostname := "testhostname" - calculator := stats.NewSublayerCalculator() + calculator := newSublayerCalculator() // generate mock trace, span and parent span ids mockTraceID := [16]byte{0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F} @@ -239,7 +243,7 @@ func TestBasicTracesTranslation(t *testing.T) { func TestTracesTranslationErrorsAndResource(t *testing.T) { hostname := "testhostname" - calculator := stats.NewSublayerCalculator() + calculator := newSublayerCalculator() // generate mock trace, span and parent span ids mockTraceID := [16]byte{0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F} @@ -290,7 +294,7 @@ func TestTracesTranslationErrorsAndResource(t *testing.T) { func TestTracesTranslationOkStatus(t *testing.T) { hostname := "testhostname" - calculator := stats.NewSublayerCalculator() + calculator := newSublayerCalculator() // generate mock trace, span and parent span ids mockTraceID := [16]byte{0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F} @@ -342,7 +346,7 @@ func TestTracesTranslationOkStatus(t *testing.T) { // ensure that the datadog span uses the 
configured unified service tags func TestTracesTranslationConfig(t *testing.T) { hostname := "testhostname" - calculator := stats.NewSublayerCalculator() + calculator := newSublayerCalculator() // generate mock trace, span and parent span ids mockTraceID := [16]byte{0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F} @@ -392,7 +396,7 @@ func TestTracesTranslationConfig(t *testing.T) { // ensure that the translation returns early if no resource instrumentation library spans func TestTracesTranslationNoIls(t *testing.T) { hostname := "testhostname" - calculator := stats.NewSublayerCalculator() + calculator := newSublayerCalculator() rs := pdata.NewResourceSpans() @@ -541,7 +545,7 @@ func TestTracePayloadAggr(t *testing.T) { originalPayload = append(originalPayload, &payloadOne) originalPayload = append(originalPayload, &payloadTwo) - updatedPayloads := AggregateTracePayloadsByEnv(originalPayload) + updatedPayloads := aggregateTracePayloadsByEnv(originalPayload) assert.Equal(t, 2, len(originalPayload)) assert.Equal(t, 1, len(updatedPayloads)) @@ -567,7 +571,7 @@ func TestTracePayloadAggr(t *testing.T) { originalPayloadDifferentEnv = append(originalPayloadDifferentEnv, &payloadThree) originalPayloadDifferentEnv = append(originalPayloadDifferentEnv, &payloadFour) - updatedPayloadsDifferentEnv := AggregateTracePayloadsByEnv(originalPayloadDifferentEnv) + updatedPayloadsDifferentEnv := aggregateTracePayloadsByEnv(originalPayloadDifferentEnv) assert.Equal(t, 2, len(originalPayloadDifferentEnv)) assert.Equal(t, 2, len(updatedPayloadsDifferentEnv))
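As a reference for the pattern this patch introduces (one mutex-guarded sublayer calculator shared by every push, rather than a fresh calculator per pushTraceData call), here is a minimal, self-contained Go sketch. It is not the exporter code itself: the fakeCalculator type below is a hypothetical stand-in for stats.SublayerCalculator, used only so the example compiles and runs without the datadog-agent packages.

package main

import (
	"fmt"
	"sync"
)

// fakeCalculator stands in for a calculator that keeps reusable internal
// state between calls, which is what makes it unsafe for concurrent use.
type fakeCalculator struct {
	scratch []int
}

func (c *fakeCalculator) compute(trace []int) int {
	c.scratch = c.scratch[:0] // reuse the buffer across calls
	total := 0
	for _, d := range trace {
		c.scratch = append(c.scratch, d)
		total += d
	}
	return total
}

// sublayerCalculator mirrors the wrapper added in traces_exporter.go: a
// single shared instance whose method serializes access with a mutex.
type sublayerCalculator struct {
	sc    *fakeCalculator
	mutex sync.Mutex
}

func (s *sublayerCalculator) computeSublayers(trace []int) int {
	s.mutex.Lock()
	defer s.mutex.Unlock()
	return s.sc.compute(trace)
}

func main() {
	// Created once per exporter and reused by every push, as in
	// newTraceExporter in the patch.
	calc := &sublayerCalculator{sc: &fakeCalculator{}}

	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			// Concurrent pushes are safe because the mutex serializes
			// access to the shared calculator.
			fmt.Println(calc.computeSublayers([]int{n, n + 1, n + 2}))
		}(i)
	}
	wg.Wait()
}

As the commit message notes, one calculator per queue worker (as in the Datadog Agent) would avoid the lock contention entirely, but the exporter has no access to the queue consumers, so a single locked instance is the simplest safe choice here.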