Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[datadogexporter] Use a singleton for sublayer calculation #1759

Merged
merged 4 commits into from
Dec 11, 2020
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion exporter/datadogexporter/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func GetAnalyzedSpans(sps []*pb.Span) []*pb.Span {

// Compute Sublayers updates a spans metrics with relevant metadata so that it's duration and breakdown between different services can
// be accurately displayed in the Datadog UI
func ComputeSublayerMetrics(calculator *stats.SublayerCalculator, t pb.Trace) {
func ComputeSublayerMetrics(calculator *sublayerCalculator, t pb.Trace) {
root := traceutil.GetRoot(t)
traceutil.ComputeTopLevel(t)

Expand Down
3 changes: 1 addition & 2 deletions exporter/datadogexporter/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,7 @@ const (
)

// ComputeAPMStats calculates the stats that should be submitted to APM about a given trace
func ComputeAPMStats(tracePayload *pb.TracePayload, calculator *stats.SublayerCalculator, pushTime int64) *stats.Payload {

func ComputeAPMStats(tracePayload *pb.TracePayload, calculator *sublayerCalculator, pushTime int64) *stats.Payload {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why public?

Copy link
Member Author

@mx-psi mx-psi Dec 8, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was trying to maintain the visibility of existing functions that were already public. I have double checked with the person who wrote this code and these and other functions can be made private, so I have done so in commit 1345a4d.

I can also make these changes in a separate PR if need be.

statsRawBuckets := make(map[int64]*stats.RawBucket)

bucketTS := pushTime - statsBucketDuration
Expand Down
29 changes: 23 additions & 6 deletions exporter/datadogexporter/traces_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package datadogexporter

import (
"context"
"sync"
"time"

"github.com/DataDog/datadog-agent/pkg/trace/exportable/config/configdefs"
Expand All @@ -32,11 +33,27 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/datadogexporter/utils"
)

// sublayerCalculator is thread safe wrapper of a sublayer
// calculator. Each trace exporter has a single sublayer
// calculator that is reused by each push
type sublayerCalculator struct {
sc *stats.SublayerCalculator
mutex sync.Mutex
}

// ComputeSublayers computes the sublayers of a trace
func (s *sublayerCalculator) ComputeSublayers(trace pb.Trace) []stats.SublayerValue {
mx-psi marked this conversation as resolved.
Show resolved Hide resolved
s.mutex.Lock()
defer s.mutex.Unlock()
return s.sc.ComputeSublayers(trace)
}

type traceExporter struct {
logger *zap.Logger
cfg *config.Config
edgeConnection TraceEdgeConnection
obfuscator *obfuscate.Obfuscator
calculator *sublayerCalculator
client *datadog.Client
}

Expand Down Expand Up @@ -67,11 +84,13 @@ func newTraceExporter(params component.ExporterCreateParams, cfg *config.Config)
// https://github.com/DataDog/datadog-serverless-functions/blob/11f170eac105d66be30f18eda09eca791bc0d31b/aws/logs_monitoring/trace_forwarder/cmd/trace/main.go#L43
obfuscator := obfuscate.NewObfuscator(obfuscatorConfig)

calculator := &sublayerCalculator{sc: stats.NewSublayerCalculator()}
exporter := &traceExporter{
logger: params.Logger,
cfg: cfg,
edgeConnection: CreateTraceEdgeConnection(cfg.Traces.TCPAddr.Endpoint, cfg.API.Key, params.ApplicationStartInfo),
obfuscator: obfuscator,
calculator: calculator,
client: client,
}

Expand All @@ -94,12 +113,10 @@ func (exp *traceExporter) pushTraceData(
td pdata.Traces,
) (int, error) {

calculator := stats.NewSublayerCalculator()

// convert traces to datadog traces and group trace payloads by env
// we largely apply the same logic as the serverless implementation, simplified a bit
// https://github.com/DataDog/datadog-serverless-functions/blob/f5c3aedfec5ba223b11b76a4239fcbf35ec7d045/aws/logs_monitoring/trace_forwarder/cmd/trace/main.go#L61-L83
ddTraces, err := ConvertToDatadogTd(td, calculator, exp.cfg)
ddTraces, err := ConvertToDatadogTd(td, exp.calculator, exp.cfg)

if err != nil {
exp.logger.Info("failed to convert traces", zap.Error(err))
Expand All @@ -117,7 +134,7 @@ func (exp *traceExporter) pushTraceData(
for _, ddTracePayload := range aggregatedTraces {
// currently we don't want to do retries since api endpoints may not dedupe in certain situations
// adding a helper function here to make custom retry logic easier in the future
exp.pushWithRetry(ctx, ddTracePayload, calculator, 1, pushTime, func() error {
exp.pushWithRetry(ctx, ddTracePayload, 1, pushTime, func() error {
return nil
})
}
Expand All @@ -131,15 +148,15 @@ func (exp *traceExporter) pushTraceData(
}

// gives us flexibility to add custom retry logic later
func (exp *traceExporter) pushWithRetry(ctx context.Context, ddTracePayload *pb.TracePayload, calculator *stats.SublayerCalculator, maxRetries int, pushTime int64, fn func() error) error {
func (exp *traceExporter) pushWithRetry(ctx context.Context, ddTracePayload *pb.TracePayload, maxRetries int, pushTime int64, fn func() error) error {
err := exp.edgeConnection.SendTraces(ctx, ddTracePayload, maxRetries)

if err != nil {
exp.logger.Info("failed to send traces", zap.Error(err))
}

// this is for generating metrics like hits, errors, and latency, it uses a separate endpoint than Traces
stats := ComputeAPMStats(ddTracePayload, calculator, pushTime)
stats := ComputeAPMStats(ddTracePayload, exp.calculator, pushTime)
errStats := exp.edgeConnection.SendStats(context.Background(), stats, maxRetries)

if errStats != nil {
Expand Down
5 changes: 2 additions & 3 deletions exporter/datadogexporter/translate_traces.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"strconv"

"github.com/DataDog/datadog-agent/pkg/trace/exportable/pb"
"github.com/DataDog/datadog-agent/pkg/trace/exportable/stats"
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/translator/conventions"
tracetranslator "go.opentelemetry.io/collector/translator/trace"
Expand All @@ -45,7 +44,7 @@ const (
)

// converts Traces into an array of datadog trace payloads grouped by env
func ConvertToDatadogTd(td pdata.Traces, calculator *stats.SublayerCalculator, cfg *config.Config) ([]*pb.TracePayload, error) {
func ConvertToDatadogTd(td pdata.Traces, calculator *sublayerCalculator, cfg *config.Config) ([]*pb.TracePayload, error) {
// TODO:
// do we apply other global tags, like version+service, to every span or only root spans of a service
// should globalTags['service'] take precedence over a trace's resource.service.name? I don't believe so, need to confirm
Expand Down Expand Up @@ -102,7 +101,7 @@ func AggregateTracePayloadsByEnv(tracePayloads []*pb.TracePayload) []*pb.TracePa
}

// converts a Trace's resource spans into a trace payload
func resourceSpansToDatadogSpans(rs pdata.ResourceSpans, calculator *stats.SublayerCalculator, hostname string, cfg *config.Config) (pb.TracePayload, error) {
func resourceSpansToDatadogSpans(rs pdata.ResourceSpans, calculator *sublayerCalculator, hostname string, cfg *config.Config) (pb.TracePayload, error) {
// get env tag
env := cfg.Env

Expand Down
20 changes: 12 additions & 8 deletions exporter/datadogexporter/translate_traces_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,10 +115,14 @@ func NewResourceSpansData(mockTraceID [16]byte, mockSpanID [8]byte, mockParentSp
return rs
}

func newSublayerCalculator() *sublayerCalculator {
return &sublayerCalculator{sc: stats.NewSublayerCalculator()}
}

func TestConvertToDatadogTd(t *testing.T) {
traces := pdata.NewTraces()
traces.ResourceSpans().Resize(1)
calculator := stats.NewSublayerCalculator()
calculator := newSublayerCalculator()

outputTraces, err := ConvertToDatadogTd(traces, calculator, &config.Config{})

Expand All @@ -128,7 +132,7 @@ func TestConvertToDatadogTd(t *testing.T) {

func TestConvertToDatadogTdNoResourceSpans(t *testing.T) {
traces := pdata.NewTraces()
calculator := stats.NewSublayerCalculator()
calculator := newSublayerCalculator()

outputTraces, err := ConvertToDatadogTd(traces, calculator, &config.Config{})

Expand All @@ -137,7 +141,7 @@ func TestConvertToDatadogTdNoResourceSpans(t *testing.T) {
}

func TestObfuscation(t *testing.T) {
calculator := stats.NewSublayerCalculator()
calculator := newSublayerCalculator()

traces := pdata.NewTraces()
traces.ResourceSpans().Resize(1)
Expand Down Expand Up @@ -174,7 +178,7 @@ func TestObfuscation(t *testing.T) {

func TestBasicTracesTranslation(t *testing.T) {
hostname := "testhostname"
calculator := stats.NewSublayerCalculator()
calculator := newSublayerCalculator()

// generate mock trace, span and parent span ids
mockTraceID := [16]byte{0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F}
Expand Down Expand Up @@ -241,7 +245,7 @@ func TestBasicTracesTranslation(t *testing.T) {

func TestTracesTranslationErrorsAndResource(t *testing.T) {
hostname := "testhostname"
calculator := stats.NewSublayerCalculator()
calculator := newSublayerCalculator()

// generate mock trace, span and parent span ids
mockTraceID := [16]byte{0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F}
Expand Down Expand Up @@ -292,7 +296,7 @@ func TestTracesTranslationErrorsAndResource(t *testing.T) {

func TestTracesTranslationOkStatus(t *testing.T) {
hostname := "testhostname"
calculator := stats.NewSublayerCalculator()
calculator := newSublayerCalculator()

// generate mock trace, span and parent span ids
mockTraceID := [16]byte{0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F}
Expand Down Expand Up @@ -344,7 +348,7 @@ func TestTracesTranslationOkStatus(t *testing.T) {
// ensure that the datadog span uses the configured unified service tags
func TestTracesTranslationConfig(t *testing.T) {
hostname := "testhostname"
calculator := stats.NewSublayerCalculator()
calculator := newSublayerCalculator()

// generate mock trace, span and parent span ids
mockTraceID := [16]byte{0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F}
Expand Down Expand Up @@ -394,7 +398,7 @@ func TestTracesTranslationConfig(t *testing.T) {
// ensure that the translation returns early if no resource instrumentation library spans
func TestTracesTranslationNoIls(t *testing.T) {
hostname := "testhostname"
calculator := stats.NewSublayerCalculator()
calculator := newSublayerCalculator()

rs := pdata.NewResourceSpans()

Expand Down