Skip to content

Commit

Permalink
Add AWS X-Ray Exporter (#41)
Browse files Browse the repository at this point in the history
* initial def of aws xray exporter

* initial dev of xray data structures and converters

* initial dev of xray data structures and converters

* initial dev of xray data structures and converters

* initial dev of xray data structures and converters

* initial dev of xray data structures and converters

* initial dev of xray data structures and converters

* initial dev of xray data structures and converters

* initial dev of export handler

* fix formatting and lint errors

* initial dev of export handler

* added to component list

* fix issues raised during code review

* switch user attribute name to constant

* fixed additional code review issues

* fixed additional code review issues

* temporarily change package name

* temporarily change package name

* revert temporarily change package name

* fixed additional code review issues

* switched to constants defined in collector

* switched to status conversion functions defined in collector

* fix latest code review issues

* fix new static check issues

* fix test that breaks if no valid aws session available

* Update Collector Core dependency to latest `master` (#61)

- Updated go.mod and testbed/go.mod to point to latest `master` commit
  for github.com/open-telemetry/opentelemetry-collector dependencies and
  fixed the code as needed.
- Run `go mod tidy` on both go.mod files.

* Ported kinesis exporter from Omnition (#60)

Porting the existing kinesis exporter from Omnition's Otel distribution
to contrib.

Porting from: https://github.com/Omnition/omnition-opentelemetry-collector/tree/master/exporter/kinesis

* fix pull request issues

* Updated SAPM exporter dependencies (#59)

* Add E2E test for SAPM Receiver (#63)

- Added DataSender for SAPM protocol.
- Added SAPM protocol receiver test to TestTrace10K scenario.

* Updated component dependencies (#64)

* Fix build for go 1.13 (#65)

* Add E2E test for SAPM Exporter (#66)

SAPM trace test was using SAPM Receiver and Jaeger Exporter.
Now SAPM is used for both ends.

* initial def of aws xray exporter

* initial dev of xray data structures and converters

* initial dev of xray data structures and converters

* initial dev of xray data structures and converters

* initial dev of xray data structures and converters

* initial dev of xray data structures and converters

* initial dev of xray data structures and converters

* initial dev of xray data structures and converters

* initial dev of export handler

* fix formatting and lint errors

* initial dev of export handler

* added to component list

* fix issues raised during code review

* switch user attribute name to constant

* fixed additional code review issues

* fixed additional code review issues

* temporarily change package name

* temporarily change package name

* revert temporarily change package name

* fixed additional code review issues

* switched to constants defined in collector

* switched to status conversion functions defined in collector

* fix latest code review issues

* fix new static check issues

* fix test that breaks if no valid aws session available

* fix pull request issues
  • Loading branch information
kbrockhoff authored and Paulo Janotti committed Dec 16, 2019
1 parent b5057f5 commit 2b856d9
Show file tree
Hide file tree
Showing 30 changed files with 4,379 additions and 1 deletion.
1 change: 1 addition & 0 deletions exporter/awsxrayexporter/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
include ../../Makefile.Common
64 changes: 64 additions & 0 deletions exporter/awsxrayexporter/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
# AWS X-Ray Tracing Exporter for OpenTelemetry Collector

This exporter converts OpenTelemetry spans to
[AWS X-Ray Segment Documents](https://docs.aws.amazon.com/xray/latest/devguide/xray-api-segmentdocuments.html)
and then sends them directly to X-Ray using the
[PutTraceSegements](https://docs.aws.amazon.com/xray/latest/api/API_PutTraceSegments.html) API.

## Data Conversion

Trace IDs and Span IDs are expected to be originally generated by either AWS API Gateway or AWS ALB and
propagated by them using the `X-Amzn-Trace-Id` HTTP header. However, other generation sources are
supported by replacing Trace IDs where necessary. For consistency, you may want to consider using the
X-Ray approach if generating Trace IDs within the application.

> AWS X-Ray IDs are the same size as W3C Trace Context IDs but differ in that the first 32 bits of a Trace ID
> is the Unix epoch time when the trace was started. Since X-Ray only allows submission of Trace IDs from the
> past 30 days, received Trace IDs are checked. If outside the allowed range, a replacement is generated using
> the current time.
The `http` object is populated when the `component` attribute value is `grpc` as well as `http`. Other
synchronous call types should also result in the `http` object being populated.

## AWS Specific Attributes

The following AWS-specific Span attributes are supported in addition to the standard names and values
defined in the OpenTelemetry Semantic Conventions.

| Attribute name | Notes and examples | Required? |
| :--------------- | :--------------------------------------------------------------------- | --------- |
| `aws.operation` | The name of the API action invoked against an AWS service or resource. | No |
| `aws.account_id` | The AWS account number if accessing resource in different account. | No |
| `aws.region` | The AWS region if accessing resource in different region from app. | No |
| `aws.request_id` | AWS-generated unique identifier for the request. | No |
| `aws.queue_url` | For operations on an Amazon SQS queue, the queue's URL. | No |
| `aws.table_name` | For operations on a DynamoDB table, the name of the table. | No |

Any of these values supplied are used to populate the `aws` object in addition to any relevant data supplied
by the Span Resource object. X-Ray uses this data to generate inferred segments for the remote APIs.

## Exporter Configuration

The following exporter configuration parameters are supported. They mirror and have the same affect as the
comparable AWS X-Ray Daemon configuration values.

| Name | Description | Default |
| :---------------- | :--------------------------------------------------------------------- | ------- |
| `num_workers` | Maximum number of concurrent calls to AWS X-Ray to upload documents. | 8 |
| `endpoint` | Optionally override the default X-Ray service endpoint. | |
| `request_timeout` | Number of seconds before timing out a request. | 30 |
| `max_retries` | Maximun number of attempts to post a batch before failing. | 2 |
| `no_verify_ssl` | Enable or disable TLS certificate verification. | false |
| `proxy_address` | Upload segments to AWS X-Ray through a proxy. | |
| `region` | Send segments to AWS X-Ray service in a specific region. | |
| `local_mode` | Local mode to skip EC2 instance metadata check. | false |
| `resource_arn` | Amazon Resource Name (ARN) of the AWS resource running the collector. | |
| `role_arn` | IAM role to upload segments to a different account. | |

## AWS Credential Configuration

This exporter follows default credential resolution for the
[aws-sdk-go](https://docs.aws.amazon.com/sdk-for-go/api/index.html).

Follow the [guidelines](https://docs.aws.amazon.com/sdk-for-go/v1/developer-guide/configuring-sdk.html) for the
credential configuration.
80 changes: 80 additions & 0 deletions exporter/awsxrayexporter/awsxray.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
// Copyright 2019, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package awsxrayexporter

import (
"context"

"github.com/aws/aws-sdk-go/service/xray"
"github.com/open-telemetry/opentelemetry-collector/config/configmodels"
"github.com/open-telemetry/opentelemetry-collector/consumer/consumerdata"
"github.com/open-telemetry/opentelemetry-collector/exporter"
"github.com/open-telemetry/opentelemetry-collector/exporter/exporterhelper"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/awsxrayexporter/translator"
)

// NewTraceExporter creates an exporter.TraceExporter that converts to an X-Ray PutTraceSegments
// request and then posts the request to the configured region's X-Ray endpoint.
func NewTraceExporter(config configmodels.Exporter, logger *zap.Logger, cn connAttr) (exporter.TraceExporter, error) {
typeLog := zap.String("type", config.Type())
nameLog := zap.String("name", config.Name())
awsConfig, session, err := GetAWSConfigSession(logger, cn, config.(*Config))
if err != nil {
return nil, err
}
xrayClient := NewXRay(logger, awsConfig, session)
return exporterhelper.NewTraceExporter(
config,
func(ctx context.Context, td consumerdata.TraceData) (int, error) {
logger.Debug("TraceExporter", typeLog, nameLog, zap.Int("#spans", len(td.Spans)))
droppedSpans, input := assembleRequest(td, logger)
logger.Debug("request: " + input.String())
output, err := xrayClient.PutTraceSegments(input)
if config.(*Config).LocalMode {
err = nil // test mode, ignore errors
}
logger.Debug("response: " + output.String())
if output != nil && output.UnprocessedTraceSegments != nil {
droppedSpans += len(output.UnprocessedTraceSegments)
}
return droppedSpans, err
},
exporterhelper.WithTracing(true),
exporterhelper.WithMetrics(false),
exporterhelper.WithShutdown(logger.Sync),
)
}

func assembleRequest(td consumerdata.TraceData, logger *zap.Logger) (int, *xray.PutTraceSegmentsInput) {
documents := make([]*string, len(td.Spans))
droppedSpans := int(0)
for i, span := range td.Spans {
if span == nil || span.Name == nil {
droppedSpans++
continue
}
spanName := span.Name.Value
jsonStr, err := translator.MakeSegmentDocumentString(spanName, span)
if err != nil {
droppedSpans++
logger.Warn("Unable to convert span", zap.Error(err))
}
logger.Debug(jsonStr)
documents[i] = &jsonStr
}
return droppedSpans, &xray.PutTraceSegmentsInput{TraceSegmentDocuments: documents}
}
219 changes: 219 additions & 0 deletions exporter/awsxrayexporter/awsxray_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,219 @@
// Copyright 2019, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package awsxrayexporter

import (
"context"
"encoding/binary"
"fmt"
"math/rand"
"os"
"reflect"
"testing"
"time"

resourcepb "github.com/census-instrumentation/opencensus-proto/gen-go/resource/v1"
tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1"
"github.com/golang/protobuf/ptypes/timestamp"
"github.com/golang/protobuf/ptypes/wrappers"
"github.com/open-telemetry/opentelemetry-collector/consumer/consumerdata"
"github.com/open-telemetry/opentelemetry-collector/exporter"
semconventions "github.com/open-telemetry/opentelemetry-collector/translator/conventions"
"github.com/stretchr/testify/assert"
"go.uber.org/zap"
)

func TestTraceExport(t *testing.T) {
traceExporter := initializeTraceExporter()
ctx := context.Background()
td := constructSpanData()
err := traceExporter.ConsumeTraceData(ctx, td)
assert.Nil(t, err)
}

func initializeTraceExporter() exporter.TraceExporter {
os.Setenv("AWS_ACCESS_KEY_ID", "AKIASSWVJUY4PZXXXXXX")
os.Setenv("AWS_SECRET_ACCESS_KEY", "XYrudg2H87u+ADAAq19Wqx3D41a09RsTXXXXXXXX")
os.Setenv("AWS_DEFAULT_REGION", "us-east-1")
os.Setenv("AWS_REGION", "us-east-1")
logger := zap.NewNop()
factory := Factory{}
config := factory.CreateDefaultConfig()
config.(*Config).Region = "us-east-1"
config.(*Config).LocalMode = true
mconn := new(mockConn)
mconn.sn, _ = getDefaultSession(logger)
traceExporter, err := NewTraceExporter(config, logger, mconn)
if err != nil {
panic(err)
}
return traceExporter
}

func constructSpanData() consumerdata.TraceData {
resource := constructResource()
spans := make([]*tracepb.Span, 2)
spans[0] = constructHTTPClientSpan()
spans[0].Resource = resource
spans[1] = constructHTTPServerSpan()
spans[1].Resource = resource
return consumerdata.TraceData{
Node: nil,
Resource: resource,
Spans: spans,
SourceFormat: "oc",
}
}

func constructResource() *resourcepb.Resource {
labels := make(map[string]string)
labels[semconventions.AttributeServiceName] = "signup_aggregator"
labels[semconventions.AttributeContainerName] = "signup_aggregator"
labels[semconventions.AttributeContainerImage] = "otel/signupaggregator"
labels[semconventions.AttributeContainerTag] = "v1"
labels[semconventions.AttributeCloudProvider] = "aws"
labels[semconventions.AttributeCloudAccount] = "999999998"
labels[semconventions.AttributeCloudRegion] = "us-west-2"
labels[semconventions.AttributeCloudZone] = "us-west-1b"
return &resourcepb.Resource{
Type: "container",
Labels: labels,
}
}

func constructHTTPClientSpan() *tracepb.Span {
attributes := make(map[string]interface{})
attributes[semconventions.AttributeComponent] = semconventions.ComponentTypeHTTP
attributes[semconventions.AttributeHTTPMethod] = "GET"
attributes[semconventions.AttributeHTTPURL] = "https://api.example.com/users/junit"
attributes[semconventions.AttributeHTTPStatusCode] = 200
endTime := time.Now().Round(time.Second)
startTime := endTime.Add(-90 * time.Second)
spanAttributes := constructSpanAttributes(attributes)

return &tracepb.Span{
TraceId: newTraceID(),
SpanId: newSegmentID(),
ParentSpanId: newSegmentID(),
Name: &tracepb.TruncatableString{Value: "/users/junit"},
Kind: tracepb.Span_CLIENT,
StartTime: convertTimeToTimestamp(startTime),
EndTime: convertTimeToTimestamp(endTime),
Status: &tracepb.Status{
Code: 0,
Message: "OK",
},
SameProcessAsParentSpan: &wrappers.BoolValue{Value: false},
Tracestate: &tracepb.Span_Tracestate{
Entries: []*tracepb.Span_Tracestate_Entry{
{Key: "foo", Value: "bar"},
{Key: "a", Value: "b"},
},
},
Attributes: &tracepb.Span_Attributes{
AttributeMap: spanAttributes,
},
}
}

func constructHTTPServerSpan() *tracepb.Span {
attributes := make(map[string]interface{})
attributes[semconventions.AttributeComponent] = semconventions.ComponentTypeHTTP
attributes[semconventions.AttributeHTTPMethod] = "GET"
attributes[semconventions.AttributeHTTPURL] = "https://api.example.com/users/junit"
attributes[semconventions.AttributeHTTPClientIP] = "192.168.15.32"
attributes[semconventions.AttributeHTTPStatusCode] = 200
endTime := time.Now().Round(time.Second)
startTime := endTime.Add(-90 * time.Second)
spanAttributes := constructSpanAttributes(attributes)

return &tracepb.Span{
TraceId: newTraceID(),
SpanId: newSegmentID(),
ParentSpanId: newSegmentID(),
Name: &tracepb.TruncatableString{Value: "/users/junit"},
Kind: tracepb.Span_SERVER,
StartTime: convertTimeToTimestamp(startTime),
EndTime: convertTimeToTimestamp(endTime),
Status: &tracepb.Status{
Code: 0,
Message: "OK",
},
SameProcessAsParentSpan: &wrappers.BoolValue{Value: false},
Tracestate: &tracepb.Span_Tracestate{
Entries: []*tracepb.Span_Tracestate_Entry{
{Key: "foo", Value: "bar"},
{Key: "a", Value: "b"},
},
},
Attributes: &tracepb.Span_Attributes{
AttributeMap: spanAttributes,
},
}
}

func convertTimeToTimestamp(t time.Time) *timestamp.Timestamp {
if t.IsZero() {
return nil
}
nanoTime := t.UnixNano()
return &timestamp.Timestamp{
Seconds: nanoTime / 1e9,
Nanos: int32(nanoTime % 1e9),
}
}

func constructSpanAttributes(attributes map[string]interface{}) map[string]*tracepb.AttributeValue {
attrs := make(map[string]*tracepb.AttributeValue)
for key, value := range attributes {
valType := reflect.TypeOf(value)
var attrVal tracepb.AttributeValue
if valType.Kind() == reflect.Int {
attrVal = tracepb.AttributeValue{Value: &tracepb.AttributeValue_IntValue{
IntValue: int64(value.(int)),
}}
} else if valType.Kind() == reflect.Int64 {
attrVal = tracepb.AttributeValue{Value: &tracepb.AttributeValue_IntValue{
IntValue: value.(int64),
}}
} else {
attrVal = tracepb.AttributeValue{Value: &tracepb.AttributeValue_StringValue{
StringValue: &tracepb.TruncatableString{Value: fmt.Sprintf("%v", value)},
}}
}
attrs[key] = &attrVal
}
return attrs
}

func newTraceID() []byte {
var r [16]byte
epoch := time.Now().Unix()
binary.BigEndian.PutUint32(r[0:4], uint32(epoch))
_, err := rand.Read(r[4:])
if err != nil {
panic(err)
}
return r[:]
}

func newSegmentID() []byte {
var r [8]byte
_, err := rand.Read(r[:])
if err != nil {
panic(err)
}
return r[:]
}
Loading

0 comments on commit 2b856d9

Please sign in to comment.