Skip to content

Commit

Permalink
fix: add mutex to span attributes (#3371)
Browse files Browse the repository at this point in the history
* fix: use mutex in span attributes

* fix problems

* fix tests

* fix unmarshal

* lock mutex on Values()

* fix
  • Loading branch information
mathnogueira committed Nov 17, 2023
1 parent 7f39b4a commit a369d6e
Show file tree
Hide file tree
Showing 30 changed files with 302 additions and 209 deletions.
40 changes: 20 additions & 20 deletions agent/workers/datastores/awsxray.go
Expand Up @@ -194,7 +194,7 @@ func segToSpans(seg segment, traceID string, parent *traces.Span) ([]traces.Span
}

func generateSpan(seg *segment, parent *traces.Span) (traces.Span, error) {
attributes := make(traces.Attributes, 0)
attributes := traces.NewAttributes()
span := traces.Span{
Parent: parent,
Name: *seg.Name,
Expand All @@ -206,9 +206,9 @@ func generateSpan(seg *segment, parent *traces.Span) (traces.Span, error) {
return span, err
}

attributes[traces.TracetestMetadataFieldParentID] = parentID.String()
attributes.Set(traces.TracetestMetadataFieldParentID, parentID.String())
} else if parent != nil {
attributes[traces.TracetestMetadataFieldParentID] = parent.ID.String()
attributes.Set(traces.TracetestMetadataFieldParentID, parent.ID.String())
}

// decode span id
Expand All @@ -229,7 +229,7 @@ func generateSpan(seg *segment, parent *traces.Span) (traces.Span, error) {
}

if seg.InProgress != nil {
attributes[AWSXRayInProgressAttribute] = strconv.FormatBool(*seg.InProgress)
attributes.Set(AWSXRayInProgressAttribute, strconv.FormatBool(*seg.InProgress))
}

attributes.SetPointerValue(conventions.AttributeEnduserID, seg.User)
Expand All @@ -241,7 +241,7 @@ func generateSpan(seg *segment, parent *traces.Span) (traces.Span, error) {
}

if seg.Traced != nil {
attributes[AWSXRayTracedAttribute] = strconv.FormatBool(*seg.Traced)
attributes.Set(AWSXRayTracedAttribute, strconv.FormatBool(*seg.Traced))
}

addAnnotations(seg.Annotations, attributes)
Expand Down Expand Up @@ -288,21 +288,21 @@ func addHTTP(seg *segment, attributes traces.Attributes) {
attributes.SetPointerValue(conventions.AttributeHTTPURL, req.URL)

if req.XForwardedFor != nil {
attributes[AWSXRayXForwardedForAttribute] = strconv.FormatBool(*req.XForwardedFor)
attributes.Set(AWSXRayXForwardedForAttribute, strconv.FormatBool(*req.XForwardedFor))
}
}

if resp := seg.HTTP.Response; resp != nil {
if resp.status != nil {
attributes[conventions.AttributeHTTPStatusCode] = fmt.Sprintf("%v", *resp.status)
attributes.Set(conventions.AttributeHTTPStatusCode, fmt.Sprintf("%v", *resp.status))
}

switch val := resp.contentLength.(type) {
case string:
attributes[conventions.AttributeHTTPResponseContentLength] = val
attributes.Set(conventions.AttributeHTTPResponseContentLength, val)
case float64:
lengthPointer := int64(val)
attributes[conventions.AttributeHTTPResponseContentLength] = fmt.Sprintf("%v", lengthPointer)
attributes.Set(conventions.AttributeHTTPResponseContentLength, fmt.Sprintf("%v", lengthPointer))
}
}
}
Expand All @@ -317,7 +317,7 @@ func addAWSToSpan(aws *aWSData, attrs traces.Attributes) {
attrs.SetPointerValue(AWSTableNameAttribute, aws.TableName)

if aws.Retries != nil {
attrs[AWSXrayRetriesAttribute] = fmt.Sprintf("%v", *aws.Retries)
attrs.Set(AWSXrayRetriesAttribute, fmt.Sprintf("%v", *aws.Retries))
}
}
}
Expand All @@ -333,8 +333,8 @@ func addSQLToSpan(sql *sQLData, attrs traces.Attributes) error {
return err
}

attrs[conventions.AttributeDBConnectionString] = dbURL
attrs[conventions.AttributeDBName] = dbName
attrs.Set(conventions.AttributeDBConnectionString, dbURL)
attrs.Set(conventions.AttributeDBName, dbName)
}
// not handling sql.ConnectionString for now because the X-Ray exporter
// does not support it
Expand All @@ -349,19 +349,19 @@ func addAnnotations(annos map[string]interface{}, attrs traces.Attributes) {
for k, v := range annos {
switch t := v.(type) {
case int:
attrs[k] = fmt.Sprintf("%v", t)
attrs.Set(k, fmt.Sprintf("%v", t))
case int32:
attrs[k] = fmt.Sprintf("%v", t)
attrs.Set(k, fmt.Sprintf("%v", t))
case int64:
attrs[k] = fmt.Sprintf("%v", t)
attrs.Set(k, fmt.Sprintf("%v", t))
case string:
attrs[k] = t
attrs.Set(k, t)
case bool:
attrs[k] = strconv.FormatBool(t)
attrs.Set(k, strconv.FormatBool(t))
case float32:
attrs[k] = fmt.Sprintf("%v", t)
attrs.Set(k, fmt.Sprintf("%v", t))
case float64:
attrs[k] = fmt.Sprintf("%v", t)
attrs.Set(k, fmt.Sprintf("%v", t))
default:
}
}
Expand All @@ -374,7 +374,7 @@ func addMetadata(meta map[string]map[string]interface{}, attrs traces.Attributes
if err != nil {
return err
}
attrs[AWSXraySegmentMetadataAttributePrefix+k] = string(val)
attrs.Set(AWSXraySegmentMetadataAttributePrefix+k, string(val))
}
return nil
}
Expand Down
14 changes: 7 additions & 7 deletions agent/workers/datastores/azureappinsights.go
Expand Up @@ -242,7 +242,7 @@ func parseEvent(row spanRow) (traces.SpanEvent, error) {

event.Timestamp = timestamp

attributes := make(traces.Attributes, 0)
attributes := traces.NewAttributes()
rawAttributes := row.Get("customDimensions").(string)
err = json.Unmarshal([]byte(rawAttributes), &attributes)
if err != nil {
Expand All @@ -255,7 +255,7 @@ func parseEvent(row spanRow) (traces.SpanEvent, error) {
}

func parseRowToSpan(row spanRow) (traces.Span, error) {
attributes := make(traces.Attributes, 0)
attributes := traces.NewAttributes()
span := traces.Span{
Attributes: attributes,
}
Expand Down Expand Up @@ -313,25 +313,25 @@ func parseSpanID(span *traces.Span, value any) error {
}

func parseAttributes(span *traces.Span, value any) error {
attributes := make(traces.Attributes, 0)
attributes := traces.NewAttributes()
rawAttributes := value.(string)
err := json.Unmarshal([]byte(rawAttributes), &attributes)
if err != nil {
return fmt.Errorf("failed to parse attributes: %w", err)
}

for key, value := range attributes {
span.Attributes[key] = value
for key, value := range attributes.Values() {
span.Attributes.Set(key, value)
}
return nil
}

func parseParentID(span *traces.Span, value any) error {
rawParentID, ok := value.(string)
if ok {
span.Attributes[traces.TracetestMetadataFieldParentID] = rawParentID
span.Attributes.Set(traces.TracetestMetadataFieldParentID, rawParentID)
} else {
span.Attributes[traces.TracetestMetadataFieldParentID] = ""
span.Attributes.Set(traces.TracetestMetadataFieldParentID, "")
}
return nil
}
Expand Down
6 changes: 3 additions & 3 deletions agent/workers/datastores/elasticsearchdb.go
Expand Up @@ -205,19 +205,19 @@ func convertElasticSearchSpanIntoSpan(input map[string]interface{}) traces.Span
endTime := startTime.Add(time.Microsecond * time.Duration(duration))

// Attributes
attributes := make(traces.Attributes, 0)
attributes := traces.NewAttributes()

for attrName, attrValue := range flatInput {
name := attrName
name = strings.ReplaceAll(name, "transaction.", "")
name = strings.ReplaceAll(name, "span.", "")
attributes[name] = fmt.Sprintf("%v", attrValue)
attributes.Set(name, fmt.Sprintf("%v", attrValue))
}

// ParentId
parentId := flatInput["parent.id"]
if parentId != nil {
attributes[traces.TracetestMetadataFieldParentID] = flatInput["parent.id"].(string)
attributes.Set(traces.TracetestMetadataFieldParentID, flatInput["parent.id"].(string))
}

return traces.Span{
Expand Down
8 changes: 4 additions & 4 deletions agent/workers/datastores/opensearchdb.go
Expand Up @@ -144,7 +144,7 @@ func convertOpensearchSpanIntoSpan(input map[string]interface{}) traces.Span {
startTime, _ := time.Parse(time.RFC3339, input["startTime"].(string))
endTime, _ := time.Parse(time.RFC3339, input["endTime"].(string))

attributes := make(traces.Attributes, 0)
attributes := traces.NewAttributes()

for attrName, attrValue := range input {
if !strings.HasPrefix(attrName, "span.attributes.") && !strings.HasPrefix(attrName, "resource.attributes.") {
Expand All @@ -158,11 +158,11 @@ func convertOpensearchSpanIntoSpan(input map[string]interface{}) traces.Span {
// Opensearch's data-prepper replaces "." with "@". We have to revert it. Example:
// "service.name" becomes "service@name"
name = strings.ReplaceAll(name, "@", ".")
attributes[name] = fmt.Sprintf("%v", attrValue)
attributes.Set(name, fmt.Sprintf("%v", attrValue))
}

attributes[traces.TracetestMetadataFieldKind] = input["kind"].(string)
attributes[traces.TracetestMetadataFieldKind] = input["parentSpanId"].(string)
attributes.Set(traces.TracetestMetadataFieldKind, input["kind"].(string))
attributes.Set(traces.TracetestMetadataFieldKind, input["parentSpanId"].(string))

return traces.Span{
ID: spanId,
Expand Down
12 changes: 6 additions & 6 deletions agent/workers/datastores/signalfxdb.go
Expand Up @@ -169,18 +169,18 @@ func (db signalfxDB) getSegmentSpans(ctx context.Context, traceID string, timest
}

func convertSignalFXSpan(in signalFXSpan) traces.Span {
attributes := make(traces.Attributes, 0)
attributes := traces.NewAttributes()
for name, value := range in.Tags {
attributes[name] = value
attributes.Set(name, value)
}

for name, value := range in.ProcessTags {
attributes[name] = value
attributes.Set(name, value)
}

attributes[traces.TracetestMetadataFieldParentID] = in.ParentID
attributes[traces.TracetestMetadataFieldKind] = attributes["span.kind"]
delete(attributes, "span.kind")
attributes.Set(traces.TracetestMetadataFieldParentID, in.ParentID)
attributes.Set(traces.TracetestMetadataFieldKind, attributes.Get("span.kind"))
attributes.Delete("span.kind")

spanID, _ := trace.SpanIDFromHex(in.SpanID)
startTime, _ := time.Parse(time.RFC3339, in.StartTime)
Expand Down
4 changes: 2 additions & 2 deletions agent/workers/poller.go
Expand Up @@ -198,8 +198,8 @@ func convertProtoToDataStore(r *proto.DataStore) (*datastore.DataStore, error) {
func convertTraceInToProtoSpans(trace traces.Trace) []*proto.Span {
spans := make([]*proto.Span, 0, len(trace.Flat))
for _, span := range trace.Flat {
attributes := make([]*proto.KeyValuePair, 0, len(span.Attributes))
for name, value := range span.Attributes {
attributes := make([]*proto.KeyValuePair, 0, span.Attributes.Len())
for name, value := range span.Attributes.Values() {
attributes = append(attributes, &proto.KeyValuePair{
Key: name,
Value: value,
Expand Down
16 changes: 8 additions & 8 deletions server/assertions/selectors/selector_test.go
Expand Up @@ -22,38 +22,38 @@ var pokeshopTrace = traces.Trace{
ID: gen.TraceID(),
RootSpan: traces.Span{
ID: postImportSpanID,
Attributes: traces.Attributes{
Attributes: traces.NewAttributes(map[string]string{
"service.name": "Pokeshop",
"tracetest.span.type": "http",
"http.status_code": "201",
},
}),
Name: "POST /import",
Children: []*traces.Span{
{
ID: insertPokemonDatabaseSpanID,
Attributes: traces.Attributes{
Attributes: traces.NewAttributes(map[string]string{
"service.name": "Pokeshop",
"tracetest.span.type": "db",
"db.statement": "INSERT INTO pokemon (id) values (?)",
},
}),
Name: "Insert pokemon into database",
},
{
ID: getPokemonFromExternalAPISpanID,
Attributes: traces.Attributes{
Attributes: traces.NewAttributes(map[string]string{
"service.name": "Pokeshop-worker",
"tracetest.span.type": "http",
"http.status_code": "200",
},
}),
Name: "Get pokemon from external API",
Children: []*traces.Span{
{
ID: updatePokemonDatabaseSpanID,
Attributes: traces.Attributes{
Attributes: traces.NewAttributes(map[string]string{
"service.name": "Pokeshop-worker",
"tracetest.span.type": "db",
"db.statement": "UPDATE pokemon (name = ?) WHERE id = ?",
},
}),
Name: "Update pokemon on database",
},
},
Expand Down
20 changes: 10 additions & 10 deletions server/executor/assetion_executor_test.go
Expand Up @@ -39,10 +39,10 @@ func TestAssertion(t *testing.T) {
trace: traces.Trace{
RootSpan: traces.Span{
ID: spanID,
Attributes: traces.Attributes{
Attributes: traces.NewAttributes(map[string]string{
"service.name": "Pokeshop",
"tracetest.span.duration": "2000",
},
}),
},
},
expectedAllPassed: true,
Expand Down Expand Up @@ -78,10 +78,10 @@ func TestAssertion(t *testing.T) {
trace: traces.Trace{
RootSpan: traces.Span{
ID: spanID,
Attributes: traces.Attributes{
Attributes: traces.NewAttributes(map[string]string{
"service.name": "Pokeshop",
"tracetest.span.duration": "2000",
},
}),
},
},
expectedAllPassed: true,
Expand Down Expand Up @@ -124,11 +124,11 @@ func TestAssertion(t *testing.T) {
trace: traces.Trace{
RootSpan: traces.Span{
ID: spanID,
Attributes: traces.Attributes{
Attributes: traces.NewAttributes(map[string]string{
"service.name": "Pokeshop",
"http.response.body": `{"id":52}`,
"tracetest.span.duration": "21000000",
},
}),
},
},
expectedAllPassed: true,
Expand Down Expand Up @@ -169,11 +169,11 @@ func TestAssertion(t *testing.T) {
trace: traces.Trace{
RootSpan: traces.Span{
ID: spanID,
Attributes: traces.Attributes{
Attributes: traces.NewAttributes(map[string]string{
"service.name": "Pokeshop",
"http.response.body": `{"id":52}`,
"tracetest.span.duration": "25187564", // 25ms
},
}),
},
},
expectedAllPassed: true,
Expand Down Expand Up @@ -204,11 +204,11 @@ func TestAssertion(t *testing.T) {
trace: traces.Trace{
RootSpan: traces.Span{
ID: spanID,
Attributes: traces.Attributes{
Attributes: traces.NewAttributes(map[string]string{
"service.name": "Pokeshop",
"http.response.body": `{"id":52}`,
"tracetest.span.duration": "35000000", // 35ms
},
}),
},
},
expectedAllPassed: false,
Expand Down

0 comments on commit a369d6e

Please sign in to comment.