Skip to content

Commit

Permalink
[datadogexporter] Use a singleton for sublayer calculation (#1759)
Browse files Browse the repository at this point in the history
* Use a single Sublayer Calculator

Ideally, we would use one sublayer calculator per worker as the Datadog
Agent does, but we can't here since we don't have access to the queue
consumers.

* Amend tests

* Address linter issue

* Unexport all functions that can be kept private
  • Loading branch information
mx-psi committed Dec 11, 2020
1 parent 241c509 commit f53bd40
Show file tree
Hide file tree
Showing 6 changed files with 61 additions and 42 deletions.
12 changes: 6 additions & 6 deletions exporter/datadogexporter/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ import (
"github.com/DataDog/datadog-agent/pkg/trace/exportable/traceutil"
)

// ObfuscatePayload applies obfuscator rules to the trace payloads
func ObfuscatePayload(obfuscator *obfuscate.Obfuscator, tracePayloads []*pb.TracePayload) {
// obfuscatePayload applies obfuscator rules to the trace payloads
func obfuscatePayload(obfuscator *obfuscate.Obfuscator, tracePayloads []*pb.TracePayload) {
for _, tracePayload := range tracePayloads {

// Obfuscate the traces in the payload
Expand All @@ -36,15 +36,15 @@ func ObfuscatePayload(obfuscator *obfuscate.Obfuscator, tracePayloads []*pb.Trac
}
}

// GetAnalyzedSpans finds all the analyzed spans in a trace, including top level spans
// getAnalyzedSpans finds all the analyzed spans in a trace, including top level spans
// and spans marked as analyzed by the tracer.
// A span is considered top-level if:
// - it's a root span
// - its parent is unknown (other part of the code, distributed trace)
// - its parent belongs to another service (in that case it's a "local root"
// being the highest ancestor of other spans belonging to this service and
// attached to it).
func GetAnalyzedSpans(sps []*pb.Span) []*pb.Span {
func getAnalyzedSpans(sps []*pb.Span) []*pb.Span {
// build a lookup map
spanIDToIdx := make(map[uint64]int, len(sps))
for i, span := range sps {
Expand Down Expand Up @@ -77,14 +77,14 @@ func GetAnalyzedSpans(sps []*pb.Span) []*pb.Span {

// computeSublayerMetrics updates a span's metrics with relevant metadata so that its duration and breakdown between different services can
// be accurately displayed in the Datadog UI
func ComputeSublayerMetrics(calculator *stats.SublayerCalculator, t pb.Trace) {
func computeSublayerMetrics(calculator *sublayerCalculator, t pb.Trace) {
root := traceutil.GetRoot(t)
traceutil.ComputeTopLevel(t)

subtraces := stats.ExtractSubtraces(t, root)
sublayers := make(map[*pb.Span][]stats.SublayerValue)
for _, subtrace := range subtraces {
subtraceSublayers := calculator.ComputeSublayers(subtrace.Trace)
subtraceSublayers := calculator.computeSublayers(subtrace.Trace)
sublayers[subtrace.Root] = subtraceSublayers
stats.SetSublayersOnSpan(subtrace.Root, subtraceSublayers)
}
Expand Down
7 changes: 3 additions & 4 deletions exporter/datadogexporter/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,13 @@ const (
)

// ComputeAPMStats calculates the stats that should be submitted to APM about a given trace
func ComputeAPMStats(tracePayload *pb.TracePayload, calculator *stats.SublayerCalculator, pushTime int64) *stats.Payload {

func computeAPMStats(tracePayload *pb.TracePayload, calculator *sublayerCalculator, pushTime int64) *stats.Payload {
statsRawBuckets := make(map[int64]*stats.RawBucket)

bucketTS := pushTime - statsBucketDuration
for _, trace := range tracePayload.Traces {
spans := GetAnalyzedSpans(trace.Spans)
sublayers := calculator.ComputeSublayers(trace.Spans)
spans := getAnalyzedSpans(trace.Spans)
sublayers := calculator.computeSublayers(trace.Spans)
for _, span := range spans {

// TODO: While this is hardcoded to assume a single 10s buckets for now,
Expand Down
4 changes: 2 additions & 2 deletions exporter/datadogexporter/trace_connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ const (
traceEdgeRetryInterval time.Duration = 10 * time.Second
)

// CreateTraceEdgeConnection returns a new TraceEdgeConnection
func CreateTraceEdgeConnection(rootURL, apiKey string, startInfo component.ApplicationStartInfo) TraceEdgeConnection {
// createTraceEdgeConnection returns a new TraceEdgeConnection
func createTraceEdgeConnection(rootURL, apiKey string, startInfo component.ApplicationStartInfo) TraceEdgeConnection {

return &traceEdgeConnection{
traceURL: rootURL + "/api/v0.2/traces",
Expand Down
35 changes: 26 additions & 9 deletions exporter/datadogexporter/traces_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package datadogexporter

import (
"context"
"sync"
"time"

"github.com/DataDog/datadog-agent/pkg/trace/exportable/config/configdefs"
Expand All @@ -32,11 +33,27 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/datadogexporter/utils"
)

// sublayerCalculator is a thread-safe wrapper of a sublayer
// calculator. Each trace exporter has a single sublayer
// calculator that is reused by each push
type sublayerCalculator struct {
	// sc is the wrapped calculator; access it only through
	// computeSublayers, which holds mutex for the duration of the call.
	sc *stats.SublayerCalculator
	// mutex serializes concurrent pushes' access to sc.
	mutex sync.Mutex
}

// computeSublayers computes the sublayers of a trace. It serializes
// access to the wrapped stats.SublayerCalculator, which is shared by
// all pushes of this exporter, by holding the mutex for the whole call.
func (s *sublayerCalculator) computeSublayers(trace pb.Trace) []stats.SublayerValue {
	s.mutex.Lock()
	defer s.mutex.Unlock()
	return s.sc.ComputeSublayers(trace)
}

// traceExporter pushes traces and the APM stats derived from them to
// the Datadog backend.
type traceExporter struct {
	logger *zap.Logger
	cfg    *config.Config
	// edgeConnection is used to send traces and stats (SendTraces/SendStats).
	edgeConnection TraceEdgeConnection
	// obfuscator scrubs sensitive data from trace payloads before sending.
	obfuscator *obfuscate.Obfuscator
	// calculator is the shared, mutex-guarded sublayer calculator
	// reused by every push of this exporter.
	calculator *sublayerCalculator
	client     *datadog.Client
}

Expand Down Expand Up @@ -67,11 +84,13 @@ func newTraceExporter(params component.ExporterCreateParams, cfg *config.Config)
// https://github.com/DataDog/datadog-serverless-functions/blob/11f170eac105d66be30f18eda09eca791bc0d31b/aws/logs_monitoring/trace_forwarder/cmd/trace/main.go#L43
obfuscator := obfuscate.NewObfuscator(obfuscatorConfig)

calculator := &sublayerCalculator{sc: stats.NewSublayerCalculator()}
exporter := &traceExporter{
logger: params.Logger,
cfg: cfg,
edgeConnection: CreateTraceEdgeConnection(cfg.Traces.TCPAddr.Endpoint, cfg.API.Key, params.ApplicationStartInfo),
edgeConnection: createTraceEdgeConnection(cfg.Traces.TCPAddr.Endpoint, cfg.API.Key, params.ApplicationStartInfo),
obfuscator: obfuscator,
calculator: calculator,
client: client,
}

Expand All @@ -94,30 +113,28 @@ func (exp *traceExporter) pushTraceData(
td pdata.Traces,
) (int, error) {

calculator := stats.NewSublayerCalculator()

// convert traces to datadog traces and group trace payloads by env
// we largely apply the same logic as the serverless implementation, simplified a bit
// https://github.com/DataDog/datadog-serverless-functions/blob/f5c3aedfec5ba223b11b76a4239fcbf35ec7d045/aws/logs_monitoring/trace_forwarder/cmd/trace/main.go#L61-L83
ddTraces, err := ConvertToDatadogTd(td, calculator, exp.cfg)
ddTraces, err := convertToDatadogTd(td, exp.calculator, exp.cfg)

if err != nil {
exp.logger.Info("failed to convert traces", zap.Error(err))
return 0, err
}

// group the traces by env to reduce the number of flushes
aggregatedTraces := AggregateTracePayloadsByEnv(ddTraces)
aggregatedTraces := aggregateTracePayloadsByEnv(ddTraces)

// security/obfuscation for db, query strings, stack traces, pii, etc
// TODO: is there any config we want here? OTEL has their own pipeline for regex obfuscation
ObfuscatePayload(exp.obfuscator, aggregatedTraces)
obfuscatePayload(exp.obfuscator, aggregatedTraces)

pushTime := time.Now().UTC().UnixNano()
for _, ddTracePayload := range aggregatedTraces {
// currently we don't want to do retries since api endpoints may not dedupe in certain situations
// adding a helper function here to make custom retry logic easier in the future
exp.pushWithRetry(ctx, ddTracePayload, calculator, 1, pushTime, func() error {
exp.pushWithRetry(ctx, ddTracePayload, 1, pushTime, func() error {
return nil
})
}
Expand All @@ -131,15 +148,15 @@ func (exp *traceExporter) pushTraceData(
}

// gives us flexibility to add custom retry logic later
func (exp *traceExporter) pushWithRetry(ctx context.Context, ddTracePayload *pb.TracePayload, calculator *stats.SublayerCalculator, maxRetries int, pushTime int64, fn func() error) error {
func (exp *traceExporter) pushWithRetry(ctx context.Context, ddTracePayload *pb.TracePayload, maxRetries int, pushTime int64, fn func() error) error {
err := exp.edgeConnection.SendTraces(ctx, ddTracePayload, maxRetries)

if err != nil {
exp.logger.Info("failed to send traces", zap.Error(err))
}

// this is for generating metrics like hits, errors, and latency, it uses a separate endpoint than Traces
stats := ComputeAPMStats(ddTracePayload, calculator, pushTime)
stats := computeAPMStats(ddTracePayload, exp.calculator, pushTime)
errStats := exp.edgeConnection.SendStats(context.Background(), stats, maxRetries)

if errStats != nil {
Expand Down
11 changes: 5 additions & 6 deletions exporter/datadogexporter/translate_traces.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"strconv"

"github.com/DataDog/datadog-agent/pkg/trace/exportable/pb"
"github.com/DataDog/datadog-agent/pkg/trace/exportable/stats"
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/translator/conventions"
tracetranslator "go.opentelemetry.io/collector/translator/trace"
Expand All @@ -45,7 +44,7 @@ const (
)

// converts Traces into an array of datadog trace payloads grouped by env
func ConvertToDatadogTd(td pdata.Traces, calculator *stats.SublayerCalculator, cfg *config.Config) ([]*pb.TracePayload, error) {
func convertToDatadogTd(td pdata.Traces, calculator *sublayerCalculator, cfg *config.Config) ([]*pb.TracePayload, error) {
// TODO:
// do we apply other global tags, like version+service, to every span or only root spans of a service
// should globalTags['service'] take precedence over a trace's resource.service.name? I don't believe so, need to confirm
Expand Down Expand Up @@ -75,7 +74,7 @@ func ConvertToDatadogTd(td pdata.Traces, calculator *stats.SublayerCalculator, c
return traces, nil
}

func AggregateTracePayloadsByEnv(tracePayloads []*pb.TracePayload) []*pb.TracePayload {
func aggregateTracePayloadsByEnv(tracePayloads []*pb.TracePayload) []*pb.TracePayload {
lookup := make(map[string]*pb.TracePayload)
for _, tracePayload := range tracePayloads {
key := fmt.Sprintf("%s|%s", tracePayload.HostName, tracePayload.Env)
Expand All @@ -102,7 +101,7 @@ func AggregateTracePayloadsByEnv(tracePayloads []*pb.TracePayload) []*pb.TracePa
}

// converts a Trace's resource spans into a trace payload
func resourceSpansToDatadogSpans(rs pdata.ResourceSpans, calculator *stats.SublayerCalculator, hostname string, cfg *config.Config) (pb.TracePayload, error) {
func resourceSpansToDatadogSpans(rs pdata.ResourceSpans, calculator *sublayerCalculator, hostname string, cfg *config.Config) (pb.TracePayload, error) {
// get env tag
env := cfg.Env

Expand Down Expand Up @@ -164,12 +163,12 @@ func resourceSpansToDatadogSpans(rs pdata.ResourceSpans, calculator *stats.Subla
// calculates analyzed spans for use in trace search and app analytics
// appends a specific piece of metadata to these spans marking them as analyzed
// TODO: allow users to configure specific spans to be marked as an analyzed spans for app analytics
top := GetAnalyzedSpans(apiTrace.Spans)
top := getAnalyzedSpans(apiTrace.Spans)

// calculates span metrics for representing direction and timing among it's different services for display in
// service overview graphs
// see: https://github.com/DataDog/datadog-agent/blob/f69a7d35330c563e9cad4c5b8865a357a87cd0dc/pkg/trace/stats/sublayers.go#L204
ComputeSublayerMetrics(calculator, apiTrace.Spans)
computeSublayerMetrics(calculator, apiTrace.Spans)
payload.Transactions = append(payload.Transactions, top...)
payload.Traces = append(payload.Traces, apiTrace)
}
Expand Down
34 changes: 19 additions & 15 deletions exporter/datadogexporter/translate_traces_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,29 +113,33 @@ func NewResourceSpansData(mockTraceID [16]byte, mockSpanID [8]byte, mockParentSp
return rs
}

// newSublayerCalculator builds a fresh thread-safe sublayer calculator
// for use in tests.
func newSublayerCalculator() *sublayerCalculator {
	calc := stats.NewSublayerCalculator()
	return &sublayerCalculator{sc: calc}
}

// TestConvertToDatadogTd checks that a single resource span yields a
// single Datadog trace payload.
func TestConvertToDatadogTd(t *testing.T) {
	calculator := newSublayerCalculator()
	traces := pdata.NewTraces()
	traces.ResourceSpans().Resize(1)

	payloads, err := convertToDatadogTd(traces, calculator, &config.Config{})

	assert.NoError(t, err)
	assert.Len(t, payloads, 1)
}

// TestConvertToDatadogTdNoResourceSpans checks that empty input traces
// produce no Datadog trace payloads.
func TestConvertToDatadogTdNoResourceSpans(t *testing.T) {
	calculator := newSublayerCalculator()
	traces := pdata.NewTraces()

	payloads, err := convertToDatadogTd(traces, calculator, &config.Config{})

	assert.NoError(t, err)
	assert.Len(t, payloads, 0)
}

func TestObfuscation(t *testing.T) {
calculator := stats.NewSublayerCalculator()
calculator := newSublayerCalculator()

traces := pdata.NewTraces()
traces.ResourceSpans().Resize(1)
Expand All @@ -157,22 +161,22 @@ func TestObfuscation(t *testing.T) {
// of them is currently not supported.
span.Attributes().InsertString("testinfo?=123", "http.route")

outputTraces, err := ConvertToDatadogTd(traces, calculator, &config.Config{})
outputTraces, err := convertToDatadogTd(traces, calculator, &config.Config{})

assert.NoError(t, err)

aggregatedTraces := AggregateTracePayloadsByEnv(outputTraces)
aggregatedTraces := aggregateTracePayloadsByEnv(outputTraces)

obfuscator := obfuscate.NewObfuscator(obfuscatorConfig)

ObfuscatePayload(obfuscator, aggregatedTraces)
obfuscatePayload(obfuscator, aggregatedTraces)

assert.Equal(t, 1, len(aggregatedTraces))
}

func TestBasicTracesTranslation(t *testing.T) {
hostname := "testhostname"
calculator := stats.NewSublayerCalculator()
calculator := newSublayerCalculator()

// generate mock trace, span and parent span ids
mockTraceID := [16]byte{0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F}
Expand Down Expand Up @@ -239,7 +243,7 @@ func TestBasicTracesTranslation(t *testing.T) {

func TestTracesTranslationErrorsAndResource(t *testing.T) {
hostname := "testhostname"
calculator := stats.NewSublayerCalculator()
calculator := newSublayerCalculator()

// generate mock trace, span and parent span ids
mockTraceID := [16]byte{0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F}
Expand Down Expand Up @@ -290,7 +294,7 @@ func TestTracesTranslationErrorsAndResource(t *testing.T) {

func TestTracesTranslationOkStatus(t *testing.T) {
hostname := "testhostname"
calculator := stats.NewSublayerCalculator()
calculator := newSublayerCalculator()

// generate mock trace, span and parent span ids
mockTraceID := [16]byte{0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F}
Expand Down Expand Up @@ -342,7 +346,7 @@ func TestTracesTranslationOkStatus(t *testing.T) {
// ensure that the datadog span uses the configured unified service tags
func TestTracesTranslationConfig(t *testing.T) {
hostname := "testhostname"
calculator := stats.NewSublayerCalculator()
calculator := newSublayerCalculator()

// generate mock trace, span and parent span ids
mockTraceID := [16]byte{0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F}
Expand Down Expand Up @@ -392,7 +396,7 @@ func TestTracesTranslationConfig(t *testing.T) {
// ensure that the translation returns early if no resource instrumentation library spans
func TestTracesTranslationNoIls(t *testing.T) {
hostname := "testhostname"
calculator := stats.NewSublayerCalculator()
calculator := newSublayerCalculator()

rs := pdata.NewResourceSpans()

Expand Down Expand Up @@ -541,7 +545,7 @@ func TestTracePayloadAggr(t *testing.T) {
originalPayload = append(originalPayload, &payloadOne)
originalPayload = append(originalPayload, &payloadTwo)

updatedPayloads := AggregateTracePayloadsByEnv(originalPayload)
updatedPayloads := aggregateTracePayloadsByEnv(originalPayload)

assert.Equal(t, 2, len(originalPayload))
assert.Equal(t, 1, len(updatedPayloads))
Expand All @@ -567,7 +571,7 @@ func TestTracePayloadAggr(t *testing.T) {
originalPayloadDifferentEnv = append(originalPayloadDifferentEnv, &payloadThree)
originalPayloadDifferentEnv = append(originalPayloadDifferentEnv, &payloadFour)

updatedPayloadsDifferentEnv := AggregateTracePayloadsByEnv(originalPayloadDifferentEnv)
updatedPayloadsDifferentEnv := aggregateTracePayloadsByEnv(originalPayloadDifferentEnv)

assert.Equal(t, 2, len(originalPayloadDifferentEnv))
assert.Equal(t, 2, len(updatedPayloadsDifferentEnv))
Expand Down

0 comments on commit f53bd40

Please sign in to comment.