Skip to content

Commit

Permalink
[datadogexporter] Further improvements to traces exporter performance (
Browse files Browse the repository at this point in the history
…#1707)

- Predefine map capacity wherever possible
- Use a single HTTP client for all trace edge connections
- Use a single sublayer calculator for all APM stats calculations within a `pushTraceData` call (one calculator per call, since the calculator is not thread-safe and so cannot be shared across calls)

**Link to tracking Issue:** aws-observability/aws-otel-collector#179

**Testing:** 

- Updated unit tests
- Tested in an end to end environment
  • Loading branch information
mx-psi committed Nov 27, 2020
1 parent 1baaf1c commit fabde15
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 57 deletions.
3 changes: 1 addition & 2 deletions exporter/datadogexporter/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,11 @@ const (
)

// ComputeAPMStats calculates the stats that should be submitted to APM about a given trace
func ComputeAPMStats(tracePayload *pb.TracePayload, pushTime int64) *stats.Payload {
func ComputeAPMStats(tracePayload *pb.TracePayload, calculator *stats.SublayerCalculator, pushTime int64) *stats.Payload {

statsRawBuckets := make(map[int64]*stats.RawBucket)

bucketTS := pushTime - statsBucketDuration
calculator := stats.NewSublayerCalculator()
for _, trace := range tracePayload.Traces {
spans := GetAnalyzedSpans(trace.Spans)
sublayers := calculator.ComputeSublayers(trace.Spans)
Expand Down
5 changes: 3 additions & 2 deletions exporter/datadogexporter/trace_connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type traceEdgeConnection struct {
traceURL string
statsURL string
apiKey string
client *http.Client
startInfo component.ApplicationStartInfo
InsecureSkipVerify bool
}
Expand All @@ -56,6 +57,7 @@ func CreateTraceEdgeConnection(rootURL, apiKey string, startInfo component.Appli
statsURL: rootURL + "/api/v0.2/stats",
startInfo: startInfo,
apiKey: apiKey,
client: utils.NewHTTPClient(traceEdgeTimeout),
}
}

Expand Down Expand Up @@ -155,8 +157,7 @@ func (con *traceEdgeConnection) sendPayloadToTraceEdge(ctx context.Context, apiK
utils.SetDDHeaders(req.Header, con.startInfo, apiKey)
utils.SetExtraHeaders(req.Header, payload.Headers)

client := utils.NewHTTPClient(traceEdgeTimeout)
resp, err := client.Do(req)
resp, err := con.client.Do(req)

if err != nil {
// in this case, the payload and client are malformed in some way, so we should not retry
Expand Down
7 changes: 4 additions & 3 deletions exporter/datadogexporter/traces_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ func (exp *traceExporter) pushTraceData(
) (int, error) {

calculator := stats.NewSublayerCalculator()

// convert traces to datadog traces and group trace payloads by env
// we largely apply the same logic as the serverless implementation, simplified a bit
// https://github.com/DataDog/datadog-serverless-functions/blob/f5c3aedfec5ba223b11b76a4239fcbf35ec7d045/aws/logs_monitoring/trace_forwarder/cmd/trace/main.go#L61-L83
Expand All @@ -116,7 +117,7 @@ func (exp *traceExporter) pushTraceData(
for _, ddTracePayload := range aggregatedTraces {
// currently we don't want to do retries since api endpoints may not dedupe in certain situations
// adding a helper function here to make custom retry logic easier in the future
exp.pushWithRetry(ctx, ddTracePayload, 1, pushTime, func() error {
exp.pushWithRetry(ctx, ddTracePayload, calculator, 1, pushTime, func() error {
return nil
})
}
Expand All @@ -130,15 +131,15 @@ func (exp *traceExporter) pushTraceData(
}

// gives us flexibility to add custom retry logic later
func (exp *traceExporter) pushWithRetry(ctx context.Context, ddTracePayload *pb.TracePayload, maxRetries int, pushTime int64, fn func() error) error {
func (exp *traceExporter) pushWithRetry(ctx context.Context, ddTracePayload *pb.TracePayload, calculator *stats.SublayerCalculator, maxRetries int, pushTime int64, fn func() error) error {
err := exp.edgeConnection.SendTraces(ctx, ddTracePayload, maxRetries)

if err != nil {
exp.logger.Info("failed to send traces", zap.Error(err))
}

// this is for generating metrics like hits, errors, and latency, it uses a separate endpoint than Traces
stats := ComputeAPMStats(ddTracePayload, pushTime)
stats := ComputeAPMStats(ddTracePayload, calculator, pushTime)
errStats := exp.edgeConnection.SendStats(context.Background(), stats, maxRetries)

if errStats != nil {
Expand Down
105 changes: 55 additions & 50 deletions exporter/datadogexporter/translate_traces.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,12 +203,10 @@ func spanToDatadogSpan(s pdata.Span,
if rsTagVersion := tags[conventions.AttributeServiceVersion]; rsTagVersion != "" {
tags[versionTag] = rsTagVersion
} else {
version := cfg.Version

// if no version tag exists, set it if provided via config
if version != "" {
if cfg.Version != "" {
if tagVersion := tags[versionTag]; tagVersion == "" {
tags[versionTag] = version
tags[versionTag] = cfg.Version
}
}
}
Expand All @@ -228,7 +226,9 @@ func spanToDatadogSpan(s pdata.Span,
duration = 0
}

datadogType := spanKindToDatadogType(s.Kind())
// by checking for error and setting error tags before creating datadog span
// we can then set Error field when creating and predefine a max meta capacity
isSpanError := getSpanErrorAndSetTags(s, tags)

span := &pb.Span{
TraceID: decodeAPMTraceID(s.TraceID().Bytes()),
Expand All @@ -239,54 +239,15 @@ func spanToDatadogSpan(s pdata.Span,
Start: int64(startTime),
Duration: duration,
Metrics: map[string]float64{},
Meta: map[string]string{},
Type: datadogType,
Meta: make(map[string]string, len(tags)),
Type: spanKindToDatadogType(s.Kind()),
Error: isSpanError,
}

if s.ParentSpanID().IsValid() {
span.ParentID = decodeAPMSpanID(s.ParentSpanID().Bytes())
}

// Set Span Status and any response or error details
if status := s.Status(); !status.IsNil() {
isError := okCode
switch status.Code() {
case pdata.StatusCodeOk:
isError = okCode
case pdata.StatusCodeError:
isError = errorCode
default:
isError = okCode
}

span.Error = isError

if isError == errorCode {
tags[ext.ErrorType] = "ERR_CODE_" + strconv.FormatInt(int64(status.Code()), 10)

// try to add a message if possible
if status.Message() != "" {
tags[ext.ErrorMsg] = status.Message()
} else {
tags[ext.ErrorMsg] = "ERR_CODE_" + strconv.FormatInt(int64(status.Code()), 10)
}
}

// if status code exists check if error depending on type
if tags[conventions.AttributeHTTPStatusCode] != "" {
httpStatusCode, err := strconv.ParseInt(tags[conventions.AttributeHTTPStatusCode], 10, 64)
if err == nil {
// for 500 type, always mark as error
if httpStatusCode >= 500 {
span.Error = errorCode
// for 400 type, mark as error if it is an http client
} else if s.Kind() == pdata.SpanKindCLIENT && httpStatusCode >= 400 {
span.Error = errorCode
}
}
}
}

// Set Attributes as Tags
for key, val := range tags {
setStringTag(span, key, val)
Expand All @@ -298,9 +259,10 @@ func spanToDatadogSpan(s pdata.Span,
func resourceToDatadogServiceNameAndAttributeMap(
resource pdata.Resource,
) (serviceName string, datadogTags map[string]string) {

datadogTags = make(map[string]string)
attrs := resource.Attributes()
// predefine capacity where possible
datadogTags = make(map[string]string, attrs.Len())

if attrs.Len() == 0 {
return tracetranslator.ResourceNoServiceName, datadogTags
}
Expand Down Expand Up @@ -334,7 +296,9 @@ func extractInstrumentationLibraryTags(il pdata.InstrumentationLibrary, datadogT
}

func aggregateSpanTags(span pdata.Span, datadogTags map[string]string) map[string]string {
spanTags := make(map[string]string)
// predefine capacity as at most the combined size of the span attributes and global tags;
// there may be overlap between the two, so this is an upper bound.
spanTags := make(map[string]string, span.Attributes().Len()+len(datadogTags))
for key, val := range datadogTags {
spanTags[key] = val
}
Expand Down Expand Up @@ -453,3 +417,44 @@ func getDatadogResourceName(s pdata.Span, datadogTags map[string]string) string

return s.Name()
}

// getSpanErrorAndSetTags works out the error flag for span s and, when the
// span status itself is an error, writes the error type and message into tags.
//
// The result is errorCode when any of the following holds, and okCode otherwise:
//   - the span status code is pdata.StatusCodeError;
//   - the HTTP status code tag parses as an integer >= 500;
//   - the HTTP status code tag parses as an integer >= 400 and s is a client span.
//
// A span whose status is nil is always reported as okCode and tags is left
// untouched; the HTTP status code tag is not consulted in that case.
func getSpanErrorAndSetTags(s pdata.Span, tags map[string]string) int32 {
	status := s.Status()
	if status.IsNil() {
		return okCode
	}

	result := okCode

	// Only an explicit error status flips the flag here; every other status
	// code (including Ok) leaves it at okCode.
	if code := status.Code(); code == pdata.StatusCodeError {
		result = errorCode

		errType := "ERR_CODE_" + strconv.FormatInt(int64(code), 10)
		tags[ext.ErrorType] = errType

		// Prefer the status message; fall back to the error code string.
		msg := status.Message()
		if msg == "" {
			msg = errType
		}
		tags[ext.ErrorMsg] = msg
	}

	// An HTTP status code tag, when present and numeric, can promote the span
	// to an error regardless of the span status code.
	if rawHTTPStatus := tags[conventions.AttributeHTTPStatusCode]; rawHTTPStatus != "" {
		if httpStatus, err := strconv.ParseInt(rawHTTPStatus, 10, 64); err == nil {
			switch {
			case httpStatus >= 500:
				// 5xx responses are always errors.
				result = errorCode
			case s.Kind() == pdata.SpanKindCLIENT && httpStatus >= 400:
				// 4xx responses are errors only for HTTP client spans.
				result = errorCode
			}
		}
	}

	return result
}

0 comments on commit fabde15

Please sign in to comment.