Skip to content

Commit

Permalink
Add unit tests coverage for OTel model serialization
Browse files Browse the repository at this point in the history
  • Loading branch information
aleksmaus committed Jul 13, 2024
1 parent 76824b2 commit 59e272d
Show file tree
Hide file tree
Showing 3 changed files with 305 additions and 14 deletions.
2 changes: 1 addition & 1 deletion exporter/elasticsearchexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ behaviours, which may be configured through the following settings:
- `mode` (default=none): The fields naming mode. valid modes are:
- `none`: Use original fields and event structure from the OTLP event.
- `ecs`: Try to map fields to [Elastic Common Schema (ECS)][ECS]
- `otel`: Try to map fields defined in the
- `otel`: Try to adopt fields defined in the
[OpenTelemetry Semantic Conventions](https://github.com/open-telemetry/semantic-conventions) (version 1.22.0)
to Elastic's preferred "OTel-native" convention. :warning: This mode's behavior is unstable; it is currently experimental and undergoing changes.
There's a special treatment for the following attributes: `data_stream.type`, `data_stream.dataset`, `data_stream.namespace`. Instead of serializing these values under the `*attributes.*` namespace, they're put at the root of the document, to conform with the conventions of the data stream naming scheme that maps these as `constant_keyword` fields.
Expand Down
15 changes: 2 additions & 13 deletions exporter/elasticsearchexporter/internal/objmodel/objmodel.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,13 +284,6 @@ func (doc *Document) iterJSONFlat(w *json.Visitor, otel bool) error {
return nil
}

// otelPrefixSet lists the object prefixes of the OTel attribute namespaces
// whose keys need to stay flattened.
var otelPrefixSet = map[string]struct{}{
	"attributes.":          {},
	"resource.attributes.": {},
	"scope.attributes.":    {},
}

func (doc *Document) iterJSONDedot(w *json.Visitor, otel bool) error {
objPrefix := ""
level := 0
Expand Down Expand Up @@ -345,12 +338,8 @@ func (doc *Document) iterJSONDedot(w *json.Visitor, otel bool) error {
for {

// Otel mode serialization
if otel {
// Check the prefix
_, isOtelPrefix := otelPrefixSet[objPrefix]
if isOtelPrefix {
break
}
if otel && strings.HasPrefix(objPrefix, "attributes.") {
break
}

start := len(objPrefix)
Expand Down
302 changes: 302 additions & 0 deletions exporter/elasticsearchexporter/model_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
package elasticsearchexporter

import (
"encoding/hex"
"encoding/json"
"fmt"
"os"
"sort"
Expand Down Expand Up @@ -886,3 +888,303 @@ func TestMapLogAttributesToECS(t *testing.T) {
})
}
}

// JSON serializable structs for OTel test convenience

// OTelRecord mirrors the JSON document shape used by the OTel-mode tests. It
// is decoded both from the test fixture and from the encoder output so the two
// can be compared as Go values.
//
// The dropped-attributes tag is spelled "dropped_attributes_count" to match
// the key used by the test fixture; with a misspelled tag the value is never
// decoded and the field is compared as zero on both sides.
type OTelRecord struct {
	TraceID                OTelTraceID          `json:"trace_id"`
	SpanID                 OTelSpanID           `json:"span_id"`
	Timestamp              time.Time            `json:"@timestamp"`
	ObservedTimestamp      time.Time            `json:"observed_timestamp"`
	TraceFlags             uint32               `json:"trace_flags"`
	SeverityNumber         int32                `json:"severity_number"`
	SeverityText           string               `json:"severity_text"`
	Attributes             map[string]any       `json:"attributes"`
	DroppedAttributesCount uint32               `json:"dropped_attributes_count"`
	Scope                  OTelScope            `json:"scope"`
	Resource               OTelResource         `json:"resource"`
	Datastream             OTelRecordDatastream `json:"data_stream"`
}

// OTelRecordDatastream is the JSON shape of the root-level "data_stream"
// object: the dataset, namespace and type values of a record.
type OTelRecordDatastream struct {
	Dataset   string `json:"dataset"`
	Namespace string `json:"namespace"`
	Type      string `json:"type"`
}

// OTelScope is the JSON shape of the "scope" object in the OTel-mode test
// documents.
//
// The tags use "dropped_attributes_count" and "schema_url", the keys present
// in the test fixture; with any other spelling these fields are silently left
// at their zero values when decoding.
type OTelScope struct {
	Name                   string         `json:"name"`
	Version                string         `json:"version"`
	Attributes             map[string]any `json:"attributes"`
	DroppedAttributesCount uint32         `json:"dropped_attributes_count"`
	Schema                 string         `json:"schema_url"`
}

// OTelResource is the JSON shape of the "resource" object in the OTel-mode
// test documents.
//
// The tags use "dropped_attributes_count" and "schema_url", the keys present
// in the test fixture; with any other spelling these fields are silently left
// at their zero values when decoding.
type OTelResource struct {
	Attributes             map[string]any `json:"attributes"`
	DroppedAttributesCount uint32         `json:"dropped_attributes_count"`
	Schema                 string         `json:"schema_url"`
}

// OTelSpanID is a test-local named type over pcommon.SpanID; the JSON
// marshalling methods for it are declared in this file.
type OTelSpanID pcommon.SpanID

// MarshalJSON encodes the span ID as a JSON string of hexadecimal digits.
//
// Returning empty output with a nil error is not valid JSON and makes
// json.Marshal fail for any value containing this type, so the ID is always
// written out as a hex string.
func (o OTelSpanID) MarshalJSON() ([]byte, error) {
	return json.Marshal(hex.EncodeToString(o[:]))
}

// UnmarshalJSON decodes a JSON string of hexadecimal digits into the span ID.
//
// An empty decoded value leaves the ID untouched. Any other value whose
// decoded length differs from the ID size is rejected: a plain copy would
// silently truncate or zero-pad it, which could hide a wrongly sized ID in
// the data under test.
func (o *OTelSpanID) UnmarshalJSON(data []byte) error {
	b, err := decodeOTelID(data)
	if err != nil {
		return err
	}
	if len(b) == 0 {
		return nil
	}
	if len(b) != len(o) {
		return fmt.Errorf("invalid span ID length: got %d bytes, want %d", len(b), len(o))
	}
	copy(o[:], b)
	return nil
}

// OTelTraceID is a test-local named type over pcommon.TraceID; the JSON
// marshalling methods for it are declared in this file.
type OTelTraceID pcommon.TraceID

// MarshalJSON encodes the trace ID as a JSON string of hexadecimal digits.
//
// Returning empty output with a nil error is not valid JSON and makes
// json.Marshal fail for any value containing this type, so the ID is always
// written out as a hex string.
func (o OTelTraceID) MarshalJSON() ([]byte, error) {
	return json.Marshal(hex.EncodeToString(o[:]))
}

// UnmarshalJSON decodes a JSON string of hexadecimal digits into the trace ID.
//
// An empty decoded value leaves the ID untouched. Any other value whose
// decoded length differs from the ID size is rejected: a plain copy would
// silently truncate or zero-pad it, which could hide a wrongly sized ID in
// the data under test.
func (o *OTelTraceID) UnmarshalJSON(data []byte) error {
	b, err := decodeOTelID(data)
	if err != nil {
		return err
	}
	if len(b) == 0 {
		return nil
	}
	if len(b) != len(o) {
		return fmt.Errorf("invalid trace ID length: got %d bytes, want %d", len(b), len(o))
	}
	copy(o[:], b)
	return nil
}

// decodeOTelID reads data as a JSON string and returns the bytes obtained by
// hex-decoding that string. A JSON decoding error is returned as is; otherwise
// the result of hex.DecodeString is returned unchanged.
func decodeOTelID(data []byte) ([]byte, error) {
	var hexText string
	err := json.Unmarshal(data, &hexText)
	if err != nil {
		return nil, err
	}

	decoded, decodeErr := hex.DecodeString(hexText)
	return decoded, decodeErr
}

// TestEncodeLogOtelMode checks the OTel-mode log encoding. Every case builds a
// record from the shared fixture, routes and encodes it, decodes the produced
// JSON back into an OTelRecord and compares it with the expected record.
//
// Each case runs as a named subtest so a failure is reported against the case
// that produced it.
func TestEncodeLogOtelMode(t *testing.T) {
	tests := []struct {
		name   string
		rec    OTelRecord
		wantFn func(OTelRecord) OTelRecord // Allows each test to customize the expectations from the original test record data
	}{
		{
			name: "default", // Expecting default data_stream values
			rec:  buildOTelRecordTestData(t, nil),
			wantFn: func(or OTelRecord) OTelRecord {
				return assignDatastreamData(or)
			},
		},
		{
			name: "custom dataset",
			rec: buildOTelRecordTestData(t, func(or OTelRecord) OTelRecord {
				or.Attributes["data_stream.dataset"] = "custom"
				return or
			}),
			wantFn: func(or OTelRecord) OTelRecord {
				// Datastream attributes are expected to be deleted from under the attributes
				deleteDatasetAttributes(or)
				return assignDatastreamData(or, "", "custom.otel")
			},
		},
		{
			name: "custom dataset with otel suffix",
			rec: buildOTelRecordTestData(t, func(or OTelRecord) OTelRecord {
				or.Attributes["data_stream.dataset"] = "custom.otel"
				return or
			}),
			wantFn: func(or OTelRecord) OTelRecord {
				deleteDatasetAttributes(or)
				return assignDatastreamData(or, "", "custom.otel.otel")
			},
		},
		{
			name: "custom dataset/namespace",
			rec: buildOTelRecordTestData(t, func(or OTelRecord) OTelRecord {
				or.Attributes["data_stream.dataset"] = "customds"
				or.Attributes["data_stream.namespace"] = "customns"
				return or
			}),
			wantFn: func(or OTelRecord) OTelRecord {
				deleteDatasetAttributes(or)
				return assignDatastreamData(or, "", "customds.otel", "customns")
			},
		},
		{
			// The record-level value is expected to win over scope and resource.
			name: "dataset attributes priority",
			rec: buildOTelRecordTestData(t, func(or OTelRecord) OTelRecord {
				or.Attributes["data_stream.dataset"] = "first"
				or.Scope.Attributes["data_stream.dataset"] = "second"
				or.Resource.Attributes["data_stream.dataset"] = "third"
				return or
			}),
			wantFn: func(or OTelRecord) OTelRecord {
				deleteDatasetAttributes(or)
				return assignDatastreamData(or, "", "first.otel")
			},
		},
		{
			// Without a record-level value the scope value is expected to win over resource.
			name: "dataset scope attribute priority",
			rec: buildOTelRecordTestData(t, func(or OTelRecord) OTelRecord {
				or.Scope.Attributes["data_stream.dataset"] = "second"
				or.Resource.Attributes["data_stream.dataset"] = "third"
				return or
			}),
			wantFn: func(or OTelRecord) OTelRecord {
				deleteDatasetAttributes(or)
				return assignDatastreamData(or, "", "second.otel")
			},
		},
		{
			name: "dataset resource attribute priority",
			rec: buildOTelRecordTestData(t, func(or OTelRecord) OTelRecord {
				or.Resource.Attributes["data_stream.dataset"] = "third"
				return or
			}),
			wantFn: func(or OTelRecord) OTelRecord {
				deleteDatasetAttributes(or)
				return assignDatastreamData(or, "", "third.otel")
			},
		},
	}

	m := encodeModel{
		dedot: true, // default
		mode:  MappingOTel,
	}

	for _, tc := range tests {
		t.Run(tc.name, func(t *testing.T) {
			record, scope, resource := createTestOTelLogRecord(t, tc.rec)

			// This sets the data_stream values default or derived from the record/scope/resources
			routeLogRecord(record, scope, resource, "", true)

			b, err := m.encodeLog(resource, tc.rec.Resource.Schema, record, scope, tc.rec.Scope.Schema)
			require.NoError(t, err)

			// Build the expectation only after encoding: wantFn may delete keys
			// from the attribute maps, which the copy shares with tc.rec.
			want := tc.rec
			if tc.wantFn != nil {
				want = tc.wantFn(want)
			}

			var got OTelRecord
			err = json.Unmarshal(b, &got)
			require.NoError(t, err)

			assert.Equal(t, want, got)
		})
	}
}

// createTestOTelLogRecord builds the log record, instrumentation scope and
// resource described by rec. Timestamps, IDs, flags, severity, dropped
// attribute counts and attributes are copied over; the test fails immediately
// if any attribute map cannot be converted.
func createTestOTelLogRecord(t *testing.T, rec OTelRecord) (plog.LogRecord, pcommon.InstrumentationScope, pcommon.Resource) {
	// Report failures at the caller's line rather than inside this helper.
	t.Helper()

	record := plog.NewLogRecord()
	record.SetTimestamp(pcommon.Timestamp(uint64(rec.Timestamp.UnixNano())))
	record.SetObservedTimestamp(pcommon.Timestamp(uint64(rec.ObservedTimestamp.UnixNano())))

	record.SetTraceID(pcommon.TraceID(rec.TraceID))
	record.SetSpanID(pcommon.SpanID(rec.SpanID))
	record.SetFlags(plog.LogRecordFlags(rec.TraceFlags))
	record.SetSeverityNumber(plog.SeverityNumber(rec.SeverityNumber))
	record.SetSeverityText(rec.SeverityText)
	record.SetDroppedAttributesCount(rec.DroppedAttributesCount)

	err := record.Attributes().FromRaw(rec.Attributes)
	require.NoError(t, err)

	scope := pcommon.NewInstrumentationScope()
	scope.SetName(rec.Scope.Name)
	scope.SetVersion(rec.Scope.Version)
	scope.SetDroppedAttributesCount(rec.Scope.DroppedAttributesCount)
	err = scope.Attributes().FromRaw(rec.Scope.Attributes)
	require.NoError(t, err)

	resource := pcommon.NewResource()
	resource.SetDroppedAttributesCount(rec.Resource.DroppedAttributesCount)
	err = resource.Attributes().FromRaw(rec.Resource.Attributes)
	require.NoError(t, err)

	return record, scope, resource
}

// buildOTelRecordTestData decodes the shared JSON fixture into an OTelRecord
// and, when fn is non-nil, returns the result of applying fn to it.
//
// A fixture that fails to decode stops the test immediately: continuing would
// hand fn a record with nil attribute maps, and writing to those panics.
func buildOTelRecordTestData(t *testing.T, fn func(OTelRecord) OTelRecord) OTelRecord {
	t.Helper()

	s := `{
"@timestamp": "2024-03-12T20:00:41.123456780Z",
"attributes": {
"event.name": "user-password-change",
"foo.some": "bar"
},
"dropped_attributes_count": 1,
"observed_timestamp": "2024-03-12T20:00:41.123456789Z",
"resource": {
"attributes": {
"host.name": "lebuntu",
"host.os.type": "linux"
},
"dropped_attributes_count": 2,
"schema_url": "https://opentelemetry.io/schemas/1.6.0"
},
"scope": {
"attributes": {
"attr.num": 1234,
"attr.str": "val1"
},
"dropped_attributes_count": 2,
"name": "foobar",
"schema_url": "https://opentelemetry.io/schemas/1.6.1",
"version": "42"
},
"severity_number": 17,
"severity_text": "ERROR",
"span_id": "0102030405060708",
"trace_flags": 1234,
"trace_id": "01020304050607080900010203040506"
}`

	var record OTelRecord
	err := json.Unmarshal([]byte(s), &record)
	require.NoError(t, err)
	if fn != nil {
		record = fn(record)
	}
	return record
}

// deleteDatasetAttributes removes the data_stream.* keys from the record,
// scope and resource attribute maps of or. The maps are modified in place.
func deleteDatasetAttributes(or OTelRecord) {
	attrMaps := []map[string]any{
		or.Attributes,
		or.Scope.Attributes,
		or.Resource.Attributes,
	}
	for _, attrs := range attrMaps {
		deleteDatasetAttributesFromMap(attrs)
	}
}

// deleteDatasetAttributesFromMap removes the "data_stream.dataset",
// "data_stream.namespace" and "data_stream.type" keys from m, if present.
func deleteDatasetAttributesFromMap(m map[string]any) {
	for _, key := range [...]string{"data_stream.dataset", "data_stream.namespace", "data_stream.type"} {
		delete(m, key)
	}
}

// assignDatastreamData returns or with its Datastream replaced. The defaults
// are type "logs", dataset "generic.otel" and namespace "default"; the optional
// positional arguments override them in the order type, dataset, namespace.
// Empty strings and any arguments past the third are ignored.
func assignDatastreamData(or OTelRecord, a ...string) OTelRecord {
	ds := OTelRecordDatastream{
		Dataset:   "generic.otel",
		Namespace: "default",
		Type:      "logs",
	}

	// Positional targets, in the order the overrides are accepted.
	targets := []*string{&ds.Type, &ds.Dataset, &ds.Namespace}
	for i, override := range a {
		if i >= len(targets) {
			break
		}
		if override != "" {
			*targets[i] = override
		}
	}

	or.Datastream = ds
	return or
}

0 comments on commit 59e272d

Please sign in to comment.