From 131671cf8962d5bac1612286b83bdcd20b9d5d91 Mon Sep 17 00:00:00 2001 From: Jimmie Han Date: Fri, 25 Mar 2022 13:15:48 +0800 Subject: [PATCH 1/4] Implement unmarshal traces with jsoniter, make 40x faster than jsonpb. Signed-off-by: Jimmie Han --- CHANGELOG.md | 1 + cmd/otelcorecol/go.mod | 3 + cmd/otelcorecol/go.sum | 3 + go.mod | 3 + go.sum | 3 + pdata/go.mod | 5 +- pdata/go.sum | 11 +- pdata/ptrace/jsoniter.go | 369 +++++++++++++++++++++++++++++++ pdata/ptrace/jsoniter_test.go | 401 ++++++++++++++++++++++++++++++++++ 9 files changed, 797 insertions(+), 2 deletions(-) create mode 100644 pdata/ptrace/jsoniter.go create mode 100644 pdata/ptrace/jsoniter_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index a22084c03da..57f53215c79 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -40,6 +40,7 @@ - Deprecate global `featuregate` funcs in favor of `GetRegistry` and a public `Registry` type (#5160) ### 💡 Enhancements 💡 +- Add `jsoniter` Unmarshaller (#4817) - Extend config.Map.Unmarshal hook to check map key string to any TextUnmarshaler not only ComponentID (#5244) - Collector will no longer print error with stack trace when the collector is shutdown due to a context cancel. (#5258) diff --git a/cmd/otelcorecol/go.mod b/cmd/otelcorecol/go.mod index d5adab9a601..00cd2613546 100644 --- a/cmd/otelcorecol/go.mod +++ b/cmd/otelcorecol/go.mod @@ -28,6 +28,7 @@ require ( github.com/golang/snappy v0.0.4 // indirect github.com/google/uuid v1.3.0 // indirect github.com/inconshreveable/mousetrap v1.0.0 // indirect + github.com/json-iterator/go v1.1.12 // indirect github.com/klauspost/compress v1.15.1 // indirect github.com/knadh/koanf v1.4.1 // indirect github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect @@ -36,6 +37,8 @@ require ( github.com/mitchellh/copystructure v1.2.0 // indirect github.com/mitchellh/mapstructure v1.5.0 // indirect github.com/mitchellh/reflectwalk v1.0.2 // indirect + github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect + github.com/modern-go/reflect2 v1.0.2 // indirect github.com/mostynb/go-grpc-compression v1.1.16 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect diff --git a/cmd/otelcorecol/go.sum b/cmd/otelcorecol/go.sum index ad1fd366028..eace5618a58 100644 --- a/cmd/otelcorecol/go.sum +++ b/cmd/otelcorecol/go.sum @@ -231,6 +231,7 @@ github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= github.com/json-iterator/go v1.1.11/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= +github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU= github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk= @@ -278,9 +279,11 @@ github.com/mitchellh/reflectwalk v1.0.0/go.mod h1:mSTlrgnPZtwu0c4WaC2kGObEpuNDbx github.com/mitchellh/reflectwalk v1.0.2 h1:G2LzWKi524PWgd3mLHV8Y5k7s6XUvT0Gef6zxSIeXaQ= github.com/mitchellh/reflectwalk v1.0.2/go.mod h1:mSTlrgnPZtwu0c4WaC2kGObEpuNDbx0jmZXqmk4esnw= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= +github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/mostynb/go-grpc-compression v1.1.16 h1:D9tGUINmcII049pxOj9dl32Fzhp26TrDVQXECoKJqQg= github.com/mostynb/go-grpc-compression v1.1.16/go.mod h1:xxa6UoYynYS2h+5HB/Hglu81iYAp87ARaNmhhwi0s1s= diff --git a/go.mod b/go.mod index a39100fc1ab..cb3d9e5c698 100644 --- a/go.mod +++ b/go.mod @@ -56,10 +56,13 @@ require ( github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect github.com/golang/protobuf v1.5.2 // indirect github.com/inconshreveable/mousetrap v1.0.0 // indirect + github.com/json-iterator/go v1.1.12 // indirect github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect github.com/mitchellh/copystructure v1.2.0 // indirect github.com/mitchellh/reflectwalk v1.0.2 // indirect + github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect + github.com/modern-go/reflect2 v1.0.2 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect github.com/prometheus/client_golang v1.12.1 // indirect diff --git a/go.sum b/go.sum index 999852aa46f..738b66fc6d4 100644 --- a/go.sum +++ b/go.sum @@ -231,6 +231,7 @@ github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= github.com/json-iterator/go v1.1.11/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= +github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU= github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk= @@ -278,9 +279,11 @@ github.com/mitchellh/reflectwalk v1.0.0/go.mod h1:mSTlrgnPZtwu0c4WaC2kGObEpuNDbx github.com/mitchellh/reflectwalk v1.0.2 h1:G2LzWKi524PWgd3mLHV8Y5k7s6XUvT0Gef6zxSIeXaQ= github.com/mitchellh/reflectwalk v1.0.2/go.mod h1:mSTlrgnPZtwu0c4WaC2kGObEpuNDbx0jmZXqmk4esnw= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= +github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/mostynb/go-grpc-compression v1.1.16 h1:D9tGUINmcII049pxOj9dl32Fzhp26TrDVQXECoKJqQg= github.com/mostynb/go-grpc-compression v1.1.16/go.mod h1:xxa6UoYynYS2h+5HB/Hglu81iYAp87ARaNmhhwi0s1s= diff --git a/pdata/go.mod b/pdata/go.mod index 75eafc1acf2..b3307d1c4fe 100644 --- a/pdata/go.mod +++ b/pdata/go.mod @@ -4,14 +4,17 @@ go 1.17 require ( github.com/gogo/protobuf v1.3.2 + github.com/json-iterator/go v1.1.12 github.com/stretchr/testify v1.7.1 google.golang.org/grpc v1.46.0 google.golang.org/protobuf v1.28.0 ) require ( - github.com/davecgh/go-spew v1.1.0 // indirect + github.com/davecgh/go-spew v1.1.1 // indirect github.com/golang/protobuf v1.5.2 // indirect + github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 // indirect + github.com/modern-go/reflect2 v1.0.2 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect golang.org/x/net v0.0.0-20201021035429-f5854403a974 // indirect golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4 // indirect diff --git a/pdata/go.sum b/pdata/go.sum index 1a03346f130..745e4cde04f 100644 --- a/pdata/go.sum +++ b/pdata/go.sum @@ -11,8 +11,9 @@ github.com/cncf/udpa/go v0.0.0-20210930031921-04548b0d99d4/go.mod h1:6pvJx4me5XP github.com/cncf/xds/go v0.0.0-20210922020428-25de7278fc84/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/cncf/xds/go v0.0.0-20211001041855-01bcc9b48dfe/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/cncf/xds/go v0.0.0-20211011173535-cb28da3451f1/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= -github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= @@ -46,15 +47,23 @@ github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.6 h1:BKbKCqvP6I+rmFHt06ZmyQtvB8xAkWdhFyr0ZUNZcxQ= github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw= +github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= +github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 h1:ZqeYNhU3OHLH3mGKHDcjJRFFRrJa6eAM5H+CtDdOsPc= +github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= +github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMTY= diff --git a/pdata/ptrace/jsoniter.go b/pdata/ptrace/jsoniter.go new file mode 100644 index 00000000000..68ace5efae7 --- /dev/null +++ b/pdata/ptrace/jsoniter.go @@ -0,0 +1,369 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package ptrace // import "go.opentelemetry.io/collector/pdata/ptrace" + +import ( + "encoding/base64" + "fmt" + + jsoniter "github.com/json-iterator/go" + + "go.opentelemetry.io/collector/pdata/internal" + otlpcommon "go.opentelemetry.io/collector/pdata/internal/data/protogen/common/v1" + otlptrace "go.opentelemetry.io/collector/pdata/internal/data/protogen/trace/v1" +) + +// NewJSONIterUnmarshaler returns a model.Unmarshaler. Unmarshals from OTLP json bytes. +func NewJSONIterUnmarshaler() Unmarshaler { + return newJSONIterUnmarshaler() +} + +type jsonIterUnmarshaler struct { +} + +func newJSONIterUnmarshaler() *jsonIterUnmarshaler { + return &jsonIterUnmarshaler{} +} + +func (d *jsonIterUnmarshaler) UnmarshalTraces(buf []byte) (Traces, error) { + iter := jsoniter.ConfigFastest.BorrowIterator(buf) + td := readTraceData(iter) + err := iter.Error + jsoniter.ConfigFastest.ReturnIterator(iter) + return internal.TracesFromProto(td), err +} + +func readTraceData(iter *jsoniter.Iterator) otlptrace.TracesData { + td := otlptrace.TracesData{} + iter.ReadObjectCB(func(iter *jsoniter.Iterator, f string) bool { + switch f { + case "resourceSpans", "resource_spans": + iter.ReadArrayCB(func(iter *jsoniter.Iterator) bool { + td.ResourceSpans = append(td.ResourceSpans, readResourceSpans(iter)) + return true + }) + default: + iter.ReportError("root", fmt.Sprintf("unknown field:%v", f)) + } + return true + }) + return td +} + +func readResourceSpans(iter *jsoniter.Iterator) *otlptrace.ResourceSpans { + rs := &otlptrace.ResourceSpans{} + + iter.ReadObjectCB(func(iter *jsoniter.Iterator, f string) bool { + switch f { + case "resource": + iter.ReadObjectCB(func(iter *jsoniter.Iterator, f string) bool { + switch f { + case "attributes": + iter.ReadArrayCB(func(iter *jsoniter.Iterator) bool { + rs.Resource.Attributes = append(rs.Resource.Attributes, readAttribute(iter)) + return true + }) + case "droppedAttributesCount", "dropped_attributes_count": + rs.Resource.DroppedAttributesCount = iter.ReadUint32() + default: + iter.ReportError("readResourceSpans.resource", fmt.Sprintf("unknown field:%v", f)) + } + return true + }) + case "instrumentationLibrarySpans", "instrumentation_library_spans", "scopeSpans", "scope_spans": + iter.ReadArrayCB(func(iter *jsoniter.Iterator) bool { + rs.ScopeSpans = append(rs.ScopeSpans, + readInstrumentationLibrarySpans(iter)) + return true + }) + case "schemaUrl", "schema_url": + rs.SchemaUrl = iter.ReadString() + default: + iter.ReportError("readResourceSpans", fmt.Sprintf("unknown field:%v", f)) + } + return true + }) + return rs +} + +func readInstrumentationLibrarySpans(iter *jsoniter.Iterator) *otlptrace.ScopeSpans { + ils := &otlptrace.ScopeSpans{} + + iter.ReadObjectCB(func(iter *jsoniter.Iterator, f string) bool { + switch f { + case "instrumentationLibrary", "instrumentation_library", "scope": + iter.ReadObjectCB(func(iter *jsoniter.Iterator, f string) bool { + switch f { + case "name": + ils.Scope.Name = iter.ReadString() + case "version": + ils.Scope.Version = iter.ReadString() + default: + iter.ReportError("readInstrumentationLibrarySpans.instrumentationLibrary", fmt.Sprintf("unknown field:%v", f)) + } + return true + }) + case "spans": + iter.ReadArrayCB(func(iter *jsoniter.Iterator) bool { + ils.Spans = append(ils.Spans, readSpan(iter)) + return true + }) + case "schemaUrl", "schema_url": + ils.SchemaUrl = iter.ReadString() + default: + iter.ReportError("readInstrumentationLibrarySpans", fmt.Sprintf("unknown field:%v", f)) + } + return true + }) + return ils +} + +func readSpan(iter *jsoniter.Iterator) *otlptrace.Span { + sp := &otlptrace.Span{} + + iter.ReadObjectCB(func(iter *jsoniter.Iterator, f string) bool { + switch f { + case "traceId", "trace_id": + if err := sp.TraceId.UnmarshalJSON([]byte(iter.ReadString())); err != nil { + iter.ReportError("readSpan.traceId", fmt.Sprintf("parse trace_id:%v", err)) + } + case "spanId", "span_id": + if err := sp.SpanId.UnmarshalJSON([]byte(iter.ReadString())); err != nil { + iter.ReportError("readSpan.spanId", fmt.Sprintf("parse span_id:%v", err)) + } + case "traceState", "trace_state": + sp.TraceState = iter.ReadString() + case "parentSpanId", "parent_span_id": + if err := sp.ParentSpanId.UnmarshalJSON([]byte(iter.ReadString())); err != nil { + iter.ReportError("readSpan.parentSpanId", fmt.Sprintf("parse parent_span_id:%v", err)) + } + case "name": + sp.Name = iter.ReadString() + case "kind": + sp.Kind = readSpanKind(iter) + case "startTimeUnixNano", "start_time_unix_nano": + sp.StartTimeUnixNano = uint64(readInt64(iter)) + case "endTimeUnixNano", "end_time_unix_nano": + sp.EndTimeUnixNano = uint64(readInt64(iter)) + case "attributes": + iter.ReadArrayCB(func(iter *jsoniter.Iterator) bool { + sp.Attributes = append(sp.Attributes, readAttribute(iter)) + return true + }) + case "droppedAttributesCount", "dropped_attributes_count": + sp.DroppedAttributesCount = iter.ReadUint32() + case "events": + iter.ReadArrayCB(func(iter *jsoniter.Iterator) bool { + sp.Events = append(sp.Events, readSpanEvent(iter)) + return true + }) + case "droppedEventsCount", "dropped_events_count": + sp.DroppedEventsCount = iter.ReadUint32() + case "links": + iter.ReadArrayCB(func(iter *jsoniter.Iterator) bool { + sp.Links = append(sp.Links, readSpanLink(iter)) + return true + }) + case "droppedLinksCount", "dropped_links_count": + sp.DroppedLinksCount = iter.ReadUint32() + case "status": + iter.ReadObjectCB(func(iter *jsoniter.Iterator, f string) bool { + switch f { + case "message": + sp.Status.Message = iter.ReadString() + case "code": + sp.Status.Code = readStatusCode(iter) + default: + iter.ReportError("readSpan.status", fmt.Sprintf("unknown field:%v", f)) + } + return true + }) + default: + iter.ReportError("readSpan", fmt.Sprintf("unknown field:%v", f)) + } + return true + }) + return sp +} + +func readSpanLink(iter *jsoniter.Iterator) *otlptrace.Span_Link { + link := &otlptrace.Span_Link{} + + iter.ReadObjectCB(func(iter *jsoniter.Iterator, f string) bool { + switch f { + case "traceId", "trace_id": + if err := link.TraceId.UnmarshalJSON([]byte(iter.ReadString())); err != nil { + iter.ReportError("readSpanLink", fmt.Sprintf("parse trace_id:%v", err)) + } + case "spanId", "span_id": + if err := link.SpanId.UnmarshalJSON([]byte(iter.ReadString())); err != nil { + iter.ReportError("readSpanLink", fmt.Sprintf("parse span_id:%v", err)) + } + case "traceState", "trace_state": + link.TraceState = iter.ReadString() + case "attributes": + iter.ReadArrayCB(func(iter *jsoniter.Iterator) bool { + link.Attributes = append(link.Attributes, readAttribute(iter)) + return true + }) + case "droppedAttributesCount", "dropped_attributes_count": + link.DroppedAttributesCount = iter.ReadUint32() + default: + iter.ReportError("readSpanLink", fmt.Sprintf("unknown field:%v", f)) + } + return true + }) + return link +} + +func readSpanEvent(iter *jsoniter.Iterator) *otlptrace.Span_Event { + event := &otlptrace.Span_Event{} + + iter.ReadObjectCB(func(iter *jsoniter.Iterator, f string) bool { + switch f { + case "timeUnixNano", "time_unix_nano": + event.TimeUnixNano = uint64(readInt64(iter)) + case "name": + event.Name = iter.ReadString() + case "attributes": + iter.ReadArrayCB(func(iter *jsoniter.Iterator) bool { + event.Attributes = append(event.Attributes, readAttribute(iter)) + return true + }) + case "droppedAttributesCount", "dropped_attributes_count": + event.DroppedAttributesCount = iter.ReadUint32() + default: + iter.ReportError("readSpanEvent", fmt.Sprintf("unknown field:%v", f)) + } + return true + }) + return event +} + +func readAttribute(iter *jsoniter.Iterator) otlpcommon.KeyValue { + var ( + key string + value otlpcommon.AnyValue + ) + iter.ReadObjectCB(func(iter *jsoniter.Iterator, f string) bool { + switch f { + case "key": + key = iter.ReadString() + case "value": + iter.ReadObjectCB(func(iter *jsoniter.Iterator, f string) bool { + value = readAnyValue(iter, f) + return true + }) + default: + iter.ReportError("readAttribute", fmt.Sprintf("unknown field:%v", f)) + } + return true + }) + return otlpcommon.KeyValue{ + Key: key, + Value: value, + } +} + +func readAnyValue(iter *jsoniter.Iterator, f string) otlpcommon.AnyValue { + switch f { + case "stringValue", "string_value": + return otlpcommon.AnyValue{ + Value: &otlpcommon.AnyValue_StringValue{ + StringValue: iter.ReadString(), + }, + } + case "boolValue", "bool_value": + return otlpcommon.AnyValue{ + Value: &otlpcommon.AnyValue_BoolValue{ + BoolValue: iter.ReadBool(), + }, + } + case "intValue", "int_value": + return otlpcommon.AnyValue{ + Value: &otlpcommon.AnyValue_IntValue{ + IntValue: readInt64(iter), + }, + } + case "doubleValue", "double_value": + return otlpcommon.AnyValue{ + Value: &otlpcommon.AnyValue_DoubleValue{ + DoubleValue: iter.ReadFloat64(), + }, + } + case "bytesValue", "bytes_value": + v, err := base64.StdEncoding.DecodeString(iter.ReadString()) + if err != nil { + iter.ReportError("bytesValue", fmt.Sprintf("base64 decode:%v", err)) + return otlpcommon.AnyValue{} + } + return otlpcommon.AnyValue{ + Value: &otlpcommon.AnyValue_BytesValue{ + BytesValue: v, + }, + } + case "arrayValue", "array_value": + return otlpcommon.AnyValue{ + Value: &otlpcommon.AnyValue_ArrayValue{ + ArrayValue: readArray(iter), + }, + } + default: + iter.ReportError("readAnyValue", fmt.Sprintf("unknown field:%v", f)) + return otlpcommon.AnyValue{} + } +} + +func readArray(iter *jsoniter.Iterator) *otlpcommon.ArrayValue { + v := &otlpcommon.ArrayValue{} + iter.ReadObjectCB(func(iter *jsoniter.Iterator, f string) bool { + switch f { + case "values": + iter.ReadArrayCB(func(iter *jsoniter.Iterator) bool { + iter.ReadObjectCB(func(iter *jsoniter.Iterator, f string) bool { + v.Values = append(v.Values, readAnyValue(iter, f)) + return true + }) + return true + }) + default: + iter.ReportError("readArray", fmt.Sprintf("unknown field:%s", f)) + } + return true + }) + return v +} + +func readInt64(iter *jsoniter.Iterator) int64 { + return iter.ReadAny().ToInt64() +} + +func readSpanKind(iter *jsoniter.Iterator) otlptrace.Span_SpanKind { + any := iter.ReadAny() + if v := any.ToInt(); v > 0 { + return otlptrace.Span_SpanKind(v) + } + v := any.ToString() + return otlptrace.Span_SpanKind(otlptrace.Span_SpanKind_value[v]) +} + +func readStatusCode(iter *jsoniter.Iterator) otlptrace.Status_StatusCode { + any := iter.ReadAny() + if v := any.ToInt(); v > 0 { + return otlptrace.Status_StatusCode(v) + } + v := any.ToString() + return otlptrace.Status_StatusCode(otlptrace.Status_StatusCode_value[v]) +} diff --git a/pdata/ptrace/jsoniter_test.go b/pdata/ptrace/jsoniter_test.go new file mode 100644 index 00000000000..73ef4e494d8 --- /dev/null +++ b/pdata/ptrace/jsoniter_test.go @@ -0,0 +1,401 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package ptrace + +import ( + "fmt" + "testing" + "time" + + jsoniter "github.com/json-iterator/go" + "github.com/stretchr/testify/assert" + + "go.opentelemetry.io/collector/pdata/internal" + otlptrace "go.opentelemetry.io/collector/pdata/internal/data/protogen/trace/v1" +) + +var tracesOTLPFull = func() Traces { + traceID := internal.NewTraceID([16]byte{0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F, 0x10}) + spanID := internal.NewSpanID([8]byte{0x11, 0x12, 0x13, 0x14, 0x15, 0x16, 0x17, 0x18}) + td := NewTraces() + // Add ResourceSpans. + rs := td.ResourceSpans().AppendEmpty() + rs.SetSchemaUrl("schemaURL") + // Add resource. + rs.Resource().Attributes().UpsertString("host.name", "testHost") + rs.Resource().Attributes().UpsertString("service.name", "testService") + rs.Resource().SetDroppedAttributesCount(1) + // Add InstrumentationLibrarySpans. + il := rs.ScopeSpans().AppendEmpty() + il.Scope().SetName("instrumentation name") + il.Scope().SetVersion("instrumentation version") + il.SetSchemaUrl("schemaURL") + // Add spans. + sp := il.Spans().AppendEmpty() + sp.SetName("testSpan") + sp.SetKind(internal.SpanKindClient) + sp.SetDroppedAttributesCount(1) + sp.SetStartTimestamp(internal.NewTimestampFromTime(time.Now())) + sp.SetTraceID(traceID) + sp.SetSpanID(spanID) + sp.SetDroppedEventsCount(1) + sp.SetDroppedLinksCount(1) + sp.SetEndTimestamp(internal.NewTimestampFromTime(time.Now())) + sp.SetParentSpanID(spanID) + sp.SetTraceState("state") + sp.Status().SetCode(internal.StatusCodeOk) + sp.Status().SetMessage("message") + // Add attributes. + sp.Attributes().UpsertString("string", "value") + sp.Attributes().UpsertBool("bool", true) + sp.Attributes().UpsertInt("int", 1) + sp.Attributes().UpsertDouble("double", 1.1) + sp.Attributes().UpsertBytes("bytes", []byte("foo")) + arr := internal.NewValueSlice() + arr.SliceVal().AppendEmpty().SetIntVal(1) + sp.Attributes().Upsert("array", arr) + // Add events. + event := sp.Events().AppendEmpty() + event.SetName("eventName") + event.SetTimestamp(internal.NewTimestampFromTime(time.Now())) + event.SetDroppedAttributesCount(1) + event.Attributes().UpsertString("string", "value") + event.Attributes().UpsertBool("bool", true) + event.Attributes().UpsertInt("int", 1) + event.Attributes().UpsertDouble("double", 1.1) + event.Attributes().UpsertBytes("bytes", []byte("foo")) + // Add links. + link := sp.Links().AppendEmpty() + link.SetTraceState("state") + link.SetTraceID(traceID) + link.SetSpanID(spanID) + link.SetDroppedAttributesCount(1) + link.Attributes().UpsertString("string", "value") + link.Attributes().UpsertBool("bool", true) + link.Attributes().UpsertInt("int", 1) + link.Attributes().UpsertDouble("double", 1.1) + link.Attributes().UpsertBytes("bytes", []byte("foo")) + // Add another span. + sp2 := il.Spans().AppendEmpty() + sp2.SetName("testSpan2") + return td +}() + +func TestJSONIter(t *testing.T) { + encoder := NewJSONMarshaler() + jsonBuf, err := encoder.MarshalTraces(tracesOTLPFull) + assert.NoError(t, err) + + decoder := newJSONIterUnmarshaler() + got, err := decoder.UnmarshalTraces(jsonBuf) + assert.NoError(t, err) + assert.EqualValues(t, tracesOTLPFull, got) +} + +func BenchmarkJSONUnmarshal(b *testing.B) { + b.ReportAllocs() + + encoder := NewJSONMarshaler() + jsonBuf, err := encoder.MarshalTraces(tracesOTLPFull) + assert.NoError(b, err) + decoder := newJSONUnmarshaler() + + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + _, err := decoder.UnmarshalTraces(jsonBuf) + assert.NoError(b, err) + } + }) +} + +func BenchmarkTracesJSONiterUnmarshal(b *testing.B) { + b.ReportAllocs() + + encoder := NewJSONMarshaler() + jsonBuf, err := encoder.MarshalTraces(tracesOTLPFull) + assert.NoError(b, err) + decoder := newJSONIterUnmarshaler() + + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + _, err := decoder.UnmarshalTraces(jsonBuf) + assert.NoError(b, err) + } + }) +} + +func TestReadInt64(t *testing.T) { + var data = `{"intAsNumber":1,"intAsString":"1"}` + iter := jsoniter.ConfigFastest.BorrowIterator([]byte(data)) + iter.ReadObjectCB(func(iter *jsoniter.Iterator, f string) bool { + switch f { + case "intAsNumber": + v := readInt64(iter) + assert.Equal(t, int64(1), v) + case "intAsString": + v := readInt64(iter) + assert.Equal(t, int64(1), v) + } + return true + }) + assert.NoError(t, iter.Error) +} + +func Test_readTraceData(t *testing.T) { + t.Run("unknown field", func(t *testing.T) { + jsonStr := `{"extra":""}` + iter := jsoniter.ConfigFastest.BorrowIterator([]byte(jsonStr)) + defer jsoniter.ConfigFastest.ReturnIterator(iter) + readTraceData(iter) + if assert.Error(t, iter.Error) { + assert.Contains(t, iter.Error.Error(), "unknown field") + } + }) +} + +func Test_readResourceSpans(t *testing.T) { + t.Run("unknown field", func(t *testing.T) { + jsonStr := `{"extra":""}` + iter := jsoniter.ConfigFastest.BorrowIterator([]byte(jsonStr)) + defer jsoniter.ConfigFastest.ReturnIterator(iter) + readResourceSpans(iter) + if assert.Error(t, iter.Error) { + assert.Contains(t, iter.Error.Error(), "unknown field") + } + }) + t.Run("unknown resource field", func(t *testing.T) { + jsonStr := `{"resource":{"extra":""}}` + iter := jsoniter.ConfigFastest.BorrowIterator([]byte(jsonStr)) + defer jsoniter.ConfigFastest.ReturnIterator(iter) + readResourceSpans(iter) + if assert.Error(t, iter.Error) { + assert.Contains(t, iter.Error.Error(), "unknown field") + } + }) +} + +func Test_readInstrumentationLibrarySpans(t *testing.T) { + t.Run("unknown field", func(t *testing.T) { + jsonStr := `{"extra":""}` + iter := jsoniter.ConfigFastest.BorrowIterator([]byte(jsonStr)) + defer jsoniter.ConfigFastest.ReturnIterator(iter) + readInstrumentationLibrarySpans(iter) + if assert.Error(t, iter.Error) { + assert.Contains(t, iter.Error.Error(), "unknown field") + } + }) + t.Run("unknown instrumentationLibrary field", func(t *testing.T) { + jsonStr := `{"instrumentationLibrary":{"extra":""}}` + iter := jsoniter.ConfigFastest.BorrowIterator([]byte(jsonStr)) + defer jsoniter.ConfigFastest.ReturnIterator(iter) + readInstrumentationLibrarySpans(iter) + if assert.Error(t, iter.Error) { + assert.Contains(t, iter.Error.Error(), "unknown field") + } + }) +} + +func Test_readSpan(t *testing.T) { + t.Run("unknown field", func(t *testing.T) { + jsonStr := `{"extra":""}` + iter := jsoniter.ConfigFastest.BorrowIterator([]byte(jsonStr)) + defer jsoniter.ConfigFastest.ReturnIterator(iter) + readSpan(iter) + if assert.Error(t, iter.Error) { + assert.Contains(t, iter.Error.Error(), "unknown field") + } + }) + t.Run("unknown status field", func(t *testing.T) { + jsonStr := `{"status":{"extra":""}}` + iter := jsoniter.ConfigFastest.BorrowIterator([]byte(jsonStr)) + defer jsoniter.ConfigFastest.ReturnIterator(iter) + readSpan(iter) + if assert.Error(t, iter.Error) { + assert.Contains(t, iter.Error.Error(), "unknown field") + } + }) + t.Run("invalid trace_id field", func(t *testing.T) { + jsonStr := `{"trace_id":"--"}` + iter := jsoniter.ConfigFastest.BorrowIterator([]byte(jsonStr)) + defer jsoniter.ConfigFastest.ReturnIterator(iter) + readSpan(iter) + if assert.Error(t, iter.Error) { + assert.Contains(t, iter.Error.Error(), "parse trace_id") + } + }) + t.Run("invalid span_id field", func(t *testing.T) { + jsonStr := `{"span_id":"--"}` + iter := jsoniter.ConfigFastest.BorrowIterator([]byte(jsonStr)) + defer jsoniter.ConfigFastest.ReturnIterator(iter) + readSpan(iter) + if assert.Error(t, iter.Error) { + assert.Contains(t, iter.Error.Error(), "parse span_id") + } + }) + t.Run("invalid parent_span_id field", func(t *testing.T) { + jsonStr := `{"parent_span_id":"--"}` + iter := jsoniter.ConfigFastest.BorrowIterator([]byte(jsonStr)) + defer jsoniter.ConfigFastest.ReturnIterator(iter) + readSpan(iter) + if assert.Error(t, iter.Error) { + assert.Contains(t, iter.Error.Error(), "parse parent_span_id") + } + }) +} + +func Test_readSpanLink(t *testing.T) { + t.Run("unknown field", func(t *testing.T) { + jsonStr := `{"extra":""}` + iter := jsoniter.ConfigFastest.BorrowIterator([]byte(jsonStr)) + defer jsoniter.ConfigFastest.ReturnIterator(iter) + readSpanLink(iter) + if assert.Error(t, iter.Error) { + assert.Contains(t, iter.Error.Error(), "unknown field") + } + }) + t.Run("invalid trace_id field", func(t *testing.T) { + jsonStr := `{"trace_id":"--"}` + iter := jsoniter.ConfigFastest.BorrowIterator([]byte(jsonStr)) + defer jsoniter.ConfigFastest.ReturnIterator(iter) + readSpanLink(iter) + if assert.Error(t, iter.Error) { + assert.Contains(t, iter.Error.Error(), "parse trace_id") + } + }) + t.Run("invalid span_id field", func(t *testing.T) { + jsonStr := `{"span_id":"--"}` + iter := jsoniter.ConfigFastest.BorrowIterator([]byte(jsonStr)) + defer jsoniter.ConfigFastest.ReturnIterator(iter) + readSpanLink(iter) + if assert.Error(t, iter.Error) { + assert.Contains(t, iter.Error.Error(), "parse span_id") + } + }) +} + +func Test_readSpanEvent(t *testing.T) { + t.Run("unknown field", func(t *testing.T) { + jsonStr := `{"extra":""}` + iter := jsoniter.ConfigFastest.BorrowIterator([]byte(jsonStr)) + defer jsoniter.ConfigFastest.ReturnIterator(iter) + readSpanEvent(iter) + if assert.Error(t, iter.Error) { + assert.Contains(t, iter.Error.Error(), "unknown field") + } + }) +} + +func Test_readAttribute(t *testing.T) { + t.Run("unknown field", func(t *testing.T) { + jsonStr := `{"extra":""}` + iter := jsoniter.ConfigFastest.BorrowIterator([]byte(jsonStr)) + defer jsoniter.ConfigFastest.ReturnIterator(iter) + readAttribute(iter) + if assert.Error(t, iter.Error) { + assert.Contains(t, iter.Error.Error(), "unknown field") + } + }) +} + +func Test_readAnyValue(t *testing.T) { + t.Run("unknown field", func(t *testing.T) { + jsonStr := `{"extra":""}` + iter := jsoniter.ConfigFastest.BorrowIterator([]byte(jsonStr)) + defer jsoniter.ConfigFastest.ReturnIterator(iter) + readAnyValue(iter, "") + if assert.Error(t, iter.Error) { + assert.Contains(t, iter.Error.Error(), "unknown field") + } + }) + t.Run("invalid bytesValue", func(t *testing.T) { + jsonStr := `"--"` + iter := jsoniter.ConfigFastest.BorrowIterator([]byte(jsonStr)) + defer jsoniter.ConfigFastest.ReturnIterator(iter) + readAnyValue(iter, "bytesValue") + if assert.Error(t, iter.Error) { + assert.Contains(t, iter.Error.Error(), "base64") + } + }) +} + +func Test_readArray(t *testing.T) { + t.Run("unknown field", func(t *testing.T) { + jsonStr := `{"extra":""}` + iter := jsoniter.ConfigFastest.BorrowIterator([]byte(jsonStr)) + defer jsoniter.ConfigFastest.ReturnIterator(iter) + readArray(iter) + if assert.Error(t, iter.Error) { + assert.Contains(t, iter.Error.Error(), "unknown field") + } + }) +} + +func Test_readSpanKind(t *testing.T) { + tests := []struct { + name string + jsonStr string + want otlptrace.Span_SpanKind + }{ + { + name: "string", + jsonStr: fmt.Sprintf(`"%s"`, otlptrace.Span_SPAN_KIND_INTERNAL.String()), + want: otlptrace.Span_SPAN_KIND_INTERNAL, + }, + { + name: "int", + jsonStr: fmt.Sprintf("%d", otlptrace.Span_SPAN_KIND_INTERNAL), + want: otlptrace.Span_SPAN_KIND_INTERNAL, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + iter := jsoniter.ConfigFastest.BorrowIterator([]byte(tt.jsonStr)) + defer jsoniter.ConfigFastest.ReturnIterator(iter) + if got := readSpanKind(iter); got != tt.want { + t.Errorf("readSpanKind() = %v, want %v", got, tt.want) + } + }) + } +} + +func Test_readStatusCode(t *testing.T) { + tests := []struct { + name string + jsonStr string + want otlptrace.Status_StatusCode + }{ + { + name: "string", + jsonStr: fmt.Sprintf(`"%s"`, otlptrace.Status_STATUS_CODE_ERROR.String()), + want: otlptrace.Status_STATUS_CODE_ERROR, + }, + { + name: "int", + jsonStr: fmt.Sprintf("%d", otlptrace.Status_STATUS_CODE_ERROR), + want: otlptrace.Status_STATUS_CODE_ERROR, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + iter := jsoniter.ConfigFastest.BorrowIterator([]byte(tt.jsonStr)) + defer jsoniter.ConfigFastest.ReturnIterator(iter) + if got := readStatusCode(iter); got != tt.want { + t.Errorf("readStatusCode() = %v, want %v", got, tt.want) + } + }) + } +} From b70e19d60465d936f3e97682094eeef875134c3a Mon Sep 17 00:00:00 2001 From: Jimmie Han Date: Wed, 27 Apr 2022 19:53:43 +0800 Subject: [PATCH 2/4] ptrace: json unmarshaller use defer --- pdata/ptrace/jsoniter.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pdata/ptrace/jsoniter.go b/pdata/ptrace/jsoniter.go index 68ace5efae7..b6e41bf04e6 100644 --- a/pdata/ptrace/jsoniter.go +++ b/pdata/ptrace/jsoniter.go @@ -39,9 +39,9 @@ func newJSONIterUnmarshaler() *jsonIterUnmarshaler { func (d *jsonIterUnmarshaler) UnmarshalTraces(buf []byte) (Traces, error) { iter := jsoniter.ConfigFastest.BorrowIterator(buf) + defer jsoniter.ConfigFastest.ReturnIterator(iter) td := readTraceData(iter) err := iter.Error - jsoniter.ConfigFastest.ReturnIterator(iter) return internal.TracesFromProto(td), err } From 8ece8e7f3cd9e5c3107827ce93920397a7671408 Mon Sep 17 00:00:00 2001 From: Jimmie Han Date: Sun, 1 May 2022 22:26:53 +0800 Subject: [PATCH 3/4] Use jsoniter unmarshaller as default trace unmarshaler. Update unit test style design. --- pdata/ptrace/json.go | 353 +++++++++++++++++++++++++++++- pdata/ptrace/json_test.go | 368 +++++++++++++++++++++++++++++++ pdata/ptrace/jsoniter.go | 369 ------------------------------- pdata/ptrace/jsoniter_test.go | 401 ---------------------------------- 4 files changed, 716 insertions(+), 775 deletions(-) delete mode 100644 pdata/ptrace/jsoniter.go delete mode 100644 pdata/ptrace/jsoniter_test.go diff --git a/pdata/ptrace/json.go b/pdata/ptrace/json.go index befe1d9d386..3b0b726bbc9 100644 --- a/pdata/ptrace/json.go +++ b/pdata/ptrace/json.go @@ -16,10 +16,14 @@ package ptrace // import "go.opentelemetry.io/collector/pdata/ptrace" import ( "bytes" + "encoding/base64" + "fmt" "github.com/gogo/protobuf/jsonpb" + jsoniter "github.com/json-iterator/go" "go.opentelemetry.io/collector/pdata/internal" + otlpcommon "go.opentelemetry.io/collector/pdata/internal/data/protogen/common/v1" otlptrace "go.opentelemetry.io/collector/pdata/internal/data/protogen/trace/v1" "go.opentelemetry.io/collector/pdata/internal/otlp" ) @@ -44,20 +48,359 @@ func (e *jsonMarshaler) MarshalTraces(td Traces) ([]byte, error) { return buf.Bytes(), err } -type jsonUnmarshaler struct { - delegate jsonpb.Unmarshaler -} - // NewJSONUnmarshaler returns a model.Unmarshaler. Unmarshals from OTLP json bytes. func NewJSONUnmarshaler() Unmarshaler { return newJSONUnmarshaler() } +type jsonUnmarshaler struct { +} + func newJSONUnmarshaler() *jsonUnmarshaler { - return &jsonUnmarshaler{delegate: jsonpb.Unmarshaler{}} + return &jsonUnmarshaler{} } func (d *jsonUnmarshaler) UnmarshalTraces(buf []byte) (Traces, error) { + iter := jsoniter.ConfigFastest.BorrowIterator(buf) + defer jsoniter.ConfigFastest.ReturnIterator(iter) + td := readTraceData(iter) + err := iter.Error + return internal.TracesFromProto(td), err +} + +func readTraceData(iter *jsoniter.Iterator) otlptrace.TracesData { + td := otlptrace.TracesData{} + iter.ReadObjectCB(func(iter *jsoniter.Iterator, f string) bool { + switch f { + case "resourceSpans", "resource_spans": + iter.ReadArrayCB(func(iter *jsoniter.Iterator) bool { + td.ResourceSpans = append(td.ResourceSpans, readResourceSpans(iter)) + return true + }) + default: + iter.ReportError("root", fmt.Sprintf("unknown field:%v", f)) + } + return true + }) + return td +} + +func readResourceSpans(iter *jsoniter.Iterator) *otlptrace.ResourceSpans { + rs := &otlptrace.ResourceSpans{} + + iter.ReadObjectCB(func(iter *jsoniter.Iterator, f string) bool { + switch f { + case "resource": + iter.ReadObjectCB(func(iter *jsoniter.Iterator, f string) bool { + switch f { + case "attributes": + iter.ReadArrayCB(func(iter *jsoniter.Iterator) bool { + rs.Resource.Attributes = append(rs.Resource.Attributes, readAttribute(iter)) + return true + }) + case "droppedAttributesCount", "dropped_attributes_count": + rs.Resource.DroppedAttributesCount = iter.ReadUint32() + default: + iter.ReportError("readResourceSpans.resource", fmt.Sprintf("unknown field:%v", f)) + } + return true + }) + case "instrumentationLibrarySpans", "instrumentation_library_spans", "scopeSpans", "scope_spans": + iter.ReadArrayCB(func(iter *jsoniter.Iterator) bool { + rs.ScopeSpans = append(rs.ScopeSpans, + readInstrumentationLibrarySpans(iter)) + return true + }) + case "schemaUrl", "schema_url": + rs.SchemaUrl = iter.ReadString() + default: + iter.ReportError("readResourceSpans", fmt.Sprintf("unknown field:%v", f)) + } + return true + }) + return rs +} + +func readInstrumentationLibrarySpans(iter *jsoniter.Iterator) *otlptrace.ScopeSpans { + ils := &otlptrace.ScopeSpans{} + + iter.ReadObjectCB(func(iter *jsoniter.Iterator, f string) bool { + switch f { + case "instrumentationLibrary", "instrumentation_library", "scope": + iter.ReadObjectCB(func(iter *jsoniter.Iterator, f string) bool { + switch f { + case "name": + ils.Scope.Name = iter.ReadString() + case "version": + ils.Scope.Version = iter.ReadString() + default: + iter.ReportError("readInstrumentationLibrarySpans.instrumentationLibrary", fmt.Sprintf("unknown field:%v", f)) + } + return true + }) + case "spans": + iter.ReadArrayCB(func(iter *jsoniter.Iterator) bool { + ils.Spans = append(ils.Spans, readSpan(iter)) + return true + }) + case "schemaUrl", "schema_url": + ils.SchemaUrl = iter.ReadString() + default: + iter.ReportError("readInstrumentationLibrarySpans", fmt.Sprintf("unknown field:%v", f)) + } + return true + }) + return ils +} + +func readSpan(iter *jsoniter.Iterator) *otlptrace.Span { + sp := &otlptrace.Span{} + + iter.ReadObjectCB(func(iter *jsoniter.Iterator, f string) bool { + switch f { + case "traceId", "trace_id": + if err := sp.TraceId.UnmarshalJSON([]byte(iter.ReadString())); err != nil { + iter.ReportError("readSpan.traceId", fmt.Sprintf("parse trace_id:%v", err)) + } + case "spanId", "span_id": + if err := sp.SpanId.UnmarshalJSON([]byte(iter.ReadString())); err != nil { + iter.ReportError("readSpan.spanId", fmt.Sprintf("parse span_id:%v", err)) + } + case "traceState", "trace_state": + sp.TraceState = iter.ReadString() + case "parentSpanId", "parent_span_id": + if err := sp.ParentSpanId.UnmarshalJSON([]byte(iter.ReadString())); err != nil { + iter.ReportError("readSpan.parentSpanId", fmt.Sprintf("parse parent_span_id:%v", err)) + } + case "name": + sp.Name = iter.ReadString() + case "kind": + sp.Kind = readSpanKind(iter) + case "startTimeUnixNano", "start_time_unix_nano": + sp.StartTimeUnixNano = uint64(readInt64(iter)) + case "endTimeUnixNano", "end_time_unix_nano": + sp.EndTimeUnixNano = uint64(readInt64(iter)) + case "attributes": + iter.ReadArrayCB(func(iter *jsoniter.Iterator) bool { + sp.Attributes = append(sp.Attributes, readAttribute(iter)) + return true + }) + case "droppedAttributesCount", "dropped_attributes_count": + sp.DroppedAttributesCount = iter.ReadUint32() + case "events": + iter.ReadArrayCB(func(iter *jsoniter.Iterator) bool { + sp.Events = append(sp.Events, readSpanEvent(iter)) + return true + }) + case "droppedEventsCount", "dropped_events_count": + sp.DroppedEventsCount = iter.ReadUint32() + case "links": + iter.ReadArrayCB(func(iter *jsoniter.Iterator) bool { + sp.Links = append(sp.Links, readSpanLink(iter)) + return true + }) + case "droppedLinksCount", "dropped_links_count": + sp.DroppedLinksCount = iter.ReadUint32() + case "status": + iter.ReadObjectCB(func(iter *jsoniter.Iterator, f string) bool { + switch f { + case "message": + sp.Status.Message = iter.ReadString() + case "code": + sp.Status.Code = readStatusCode(iter) + default: + iter.ReportError("readSpan.status", fmt.Sprintf("unknown field:%v", f)) + } + return true + }) + default: + iter.ReportError("readSpan", fmt.Sprintf("unknown field:%v", f)) + } + return true + }) + return sp +} + +func readSpanLink(iter *jsoniter.Iterator) *otlptrace.Span_Link { + link := &otlptrace.Span_Link{} + + iter.ReadObjectCB(func(iter *jsoniter.Iterator, f string) bool { + switch f { + case "traceId", "trace_id": + if err := link.TraceId.UnmarshalJSON([]byte(iter.ReadString())); err != nil { + iter.ReportError("readSpanLink", fmt.Sprintf("parse trace_id:%v", err)) + } + case "spanId", "span_id": + if err := link.SpanId.UnmarshalJSON([]byte(iter.ReadString())); err != nil { + iter.ReportError("readSpanLink", fmt.Sprintf("parse span_id:%v", err)) + } + case "traceState", "trace_state": + link.TraceState = iter.ReadString() + case "attributes": + iter.ReadArrayCB(func(iter *jsoniter.Iterator) bool { + link.Attributes = append(link.Attributes, readAttribute(iter)) + return true + }) + case "droppedAttributesCount", "dropped_attributes_count": + link.DroppedAttributesCount = iter.ReadUint32() + default: + iter.ReportError("readSpanLink", fmt.Sprintf("unknown field:%v", f)) + } + return true + }) + return link +} + +func readSpanEvent(iter *jsoniter.Iterator) *otlptrace.Span_Event { + event := &otlptrace.Span_Event{} + + iter.ReadObjectCB(func(iter *jsoniter.Iterator, f string) bool { + switch f { + case "timeUnixNano", "time_unix_nano": + event.TimeUnixNano = uint64(readInt64(iter)) + case "name": + event.Name = iter.ReadString() + case "attributes": + iter.ReadArrayCB(func(iter *jsoniter.Iterator) bool { + event.Attributes = append(event.Attributes, readAttribute(iter)) + return true + }) + case "droppedAttributesCount", "dropped_attributes_count": + event.DroppedAttributesCount = iter.ReadUint32() + default: + iter.ReportError("readSpanEvent", fmt.Sprintf("unknown field:%v", f)) + } + return true + }) + return event +} + +func readAttribute(iter *jsoniter.Iterator) otlpcommon.KeyValue { + var ( + key string + value otlpcommon.AnyValue + ) + iter.ReadObjectCB(func(iter *jsoniter.Iterator, f string) bool { + switch f { + case "key": + key = iter.ReadString() + case "value": + iter.ReadObjectCB(func(iter *jsoniter.Iterator, f string) bool { + value = readAnyValue(iter, f) + return true + }) + default: + iter.ReportError("readAttribute", fmt.Sprintf("unknown field:%v", f)) + } + return true + }) + return otlpcommon.KeyValue{ + Key: key, + Value: value, + } +} + +func readAnyValue(iter *jsoniter.Iterator, f string) otlpcommon.AnyValue { + switch f { + case "stringValue", "string_value": + return otlpcommon.AnyValue{ + Value: &otlpcommon.AnyValue_StringValue{ + StringValue: iter.ReadString(), + }, + } + case "boolValue", "bool_value": + return otlpcommon.AnyValue{ + Value: &otlpcommon.AnyValue_BoolValue{ + BoolValue: iter.ReadBool(), + }, + } + case "intValue", "int_value": + return otlpcommon.AnyValue{ + Value: &otlpcommon.AnyValue_IntValue{ + IntValue: readInt64(iter), + }, + } + case "doubleValue", "double_value": + return otlpcommon.AnyValue{ + Value: &otlpcommon.AnyValue_DoubleValue{ + DoubleValue: iter.ReadFloat64(), + }, + } + case "bytesValue", "bytes_value": + v, err := base64.StdEncoding.DecodeString(iter.ReadString()) + if err != nil { + iter.ReportError("bytesValue", fmt.Sprintf("base64 decode:%v", err)) + return otlpcommon.AnyValue{} + } + return otlpcommon.AnyValue{ + Value: &otlpcommon.AnyValue_BytesValue{ + BytesValue: v, + }, + } + case "arrayValue", "array_value": + return otlpcommon.AnyValue{ + Value: &otlpcommon.AnyValue_ArrayValue{ + ArrayValue: readArray(iter), + }, + } + default: + iter.ReportError("readAnyValue", fmt.Sprintf("unknown field:%v", f)) + return otlpcommon.AnyValue{} + } +} + +func readArray(iter *jsoniter.Iterator) *otlpcommon.ArrayValue { + v := &otlpcommon.ArrayValue{} + iter.ReadObjectCB(func(iter *jsoniter.Iterator, f string) bool { + switch f { + case "values": + iter.ReadArrayCB(func(iter *jsoniter.Iterator) bool { + iter.ReadObjectCB(func(iter *jsoniter.Iterator, f string) bool { + v.Values = append(v.Values, readAnyValue(iter, f)) + return true + }) + return true + }) + default: + iter.ReportError("readArray", fmt.Sprintf("unknown field:%s", f)) + } + return true + }) + return v +} + +func readInt64(iter *jsoniter.Iterator) int64 { + return iter.ReadAny().ToInt64() +} + +func readSpanKind(iter *jsoniter.Iterator) otlptrace.Span_SpanKind { + any := iter.ReadAny() + if v := any.ToInt(); v > 0 { + return otlptrace.Span_SpanKind(v) + } + v := any.ToString() + return otlptrace.Span_SpanKind(otlptrace.Span_SpanKind_value[v]) +} + +func readStatusCode(iter *jsoniter.Iterator) otlptrace.Status_StatusCode { + any := iter.ReadAny() + if v := any.ToInt(); v > 0 { + return otlptrace.Status_StatusCode(v) + } + v := any.ToString() + return otlptrace.Status_StatusCode(otlptrace.Status_StatusCode_value[v]) +} + +// jsonpbUnmarshaler use standard `jsonpb.Unmarshaler` for benchmark and unit test. +type jsonpbUnmarshaler struct { + delegate jsonpb.Unmarshaler +} + +func newJSONPBUnmarshaler() *jsonpbUnmarshaler { + return &jsonpbUnmarshaler{delegate: jsonpb.Unmarshaler{}} +} + +func (d *jsonpbUnmarshaler) UnmarshalTraces(buf []byte) (Traces, error) { td := otlptrace.TracesData{} if err := d.delegate.Unmarshal(bytes.NewReader(buf), &td); err != nil { return Traces{}, err diff --git a/pdata/ptrace/json_test.go b/pdata/ptrace/json_test.go index 78a6eaf1fc1..2457dcb190d 100644 --- a/pdata/ptrace/json_test.go +++ b/pdata/ptrace/json_test.go @@ -15,9 +15,15 @@ package ptrace import ( + "fmt" "testing" + "time" + jsoniter "github.com/json-iterator/go" "github.com/stretchr/testify/assert" + + "go.opentelemetry.io/collector/pdata/internal" + otlptrace "go.opentelemetry.io/collector/pdata/internal/data/protogen/trace/v1" ) var tracesOTLP = func() Traces { @@ -52,3 +58,365 @@ func TestTracesJSON_Marshal(t *testing.T) { assert.NoError(t, err) assert.Equal(t, tracesJSON, string(jsonBuf)) } + +var tracesOTLPFull = func() Traces { + traceID := internal.NewTraceID([16]byte{0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F, 0x10}) + spanID := internal.NewSpanID([8]byte{0x11, 0x12, 0x13, 0x14, 0x15, 0x16, 0x17, 0x18}) + td := NewTraces() + // Add ResourceSpans. + rs := td.ResourceSpans().AppendEmpty() + rs.SetSchemaUrl("schemaURL") + // Add resource. + rs.Resource().Attributes().UpsertString("host.name", "testHost") + rs.Resource().Attributes().UpsertString("service.name", "testService") + rs.Resource().SetDroppedAttributesCount(1) + // Add InstrumentationLibrarySpans. + il := rs.ScopeSpans().AppendEmpty() + il.Scope().SetName("instrumentation name") + il.Scope().SetVersion("instrumentation version") + il.SetSchemaUrl("schemaURL") + // Add spans. + sp := il.Spans().AppendEmpty() + sp.SetName("testSpan") + sp.SetKind(internal.SpanKindClient) + sp.SetDroppedAttributesCount(1) + sp.SetStartTimestamp(internal.NewTimestampFromTime(time.Now())) + sp.SetTraceID(traceID) + sp.SetSpanID(spanID) + sp.SetDroppedEventsCount(1) + sp.SetDroppedLinksCount(1) + sp.SetEndTimestamp(internal.NewTimestampFromTime(time.Now())) + sp.SetParentSpanID(spanID) + sp.SetTraceState("state") + sp.Status().SetCode(internal.StatusCodeOk) + sp.Status().SetMessage("message") + // Add attributes. + sp.Attributes().UpsertString("string", "value") + sp.Attributes().UpsertBool("bool", true) + sp.Attributes().UpsertInt("int", 1) + sp.Attributes().UpsertDouble("double", 1.1) + sp.Attributes().UpsertBytes("bytes", []byte("foo")) + arr := internal.NewValueSlice() + arr.SliceVal().AppendEmpty().SetIntVal(1) + sp.Attributes().Upsert("array", arr) + // Add events. + event := sp.Events().AppendEmpty() + event.SetName("eventName") + event.SetTimestamp(internal.NewTimestampFromTime(time.Now())) + event.SetDroppedAttributesCount(1) + event.Attributes().UpsertString("string", "value") + event.Attributes().UpsertBool("bool", true) + event.Attributes().UpsertInt("int", 1) + event.Attributes().UpsertDouble("double", 1.1) + event.Attributes().UpsertBytes("bytes", []byte("foo")) + // Add links. + link := sp.Links().AppendEmpty() + link.SetTraceState("state") + link.SetTraceID(traceID) + link.SetSpanID(spanID) + link.SetDroppedAttributesCount(1) + link.Attributes().UpsertString("string", "value") + link.Attributes().UpsertBool("bool", true) + link.Attributes().UpsertInt("int", 1) + link.Attributes().UpsertDouble("double", 1.1) + link.Attributes().UpsertBytes("bytes", []byte("foo")) + // Add another span. + sp2 := il.Spans().AppendEmpty() + sp2.SetName("testSpan2") + return td +}() + +func TestJSONFull(t *testing.T) { + encoder := NewJSONMarshaler() + jsonBuf, err := encoder.MarshalTraces(tracesOTLPFull) + assert.NoError(t, err) + + decoder := newJSONUnmarshaler() + got, err := decoder.UnmarshalTraces(jsonBuf) + assert.NoError(t, err) + assert.EqualValues(t, tracesOTLPFull, got) +} + +func BenchmarkJSONPBUnmarshal(b *testing.B) { + b.ReportAllocs() + + encoder := NewJSONMarshaler() + jsonBuf, err := encoder.MarshalTraces(tracesOTLPFull) + assert.NoError(b, err) + decoder := newJSONPBUnmarshaler() + + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + _, err := decoder.UnmarshalTraces(jsonBuf) + assert.NoError(b, err) + } + }) +} + +func BenchmarkJSONUnmarshal(b *testing.B) { + b.ReportAllocs() + + encoder := NewJSONMarshaler() + jsonBuf, err := encoder.MarshalTraces(tracesOTLPFull) + assert.NoError(b, err) + decoder := newJSONUnmarshaler() + + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + _, err := decoder.UnmarshalTraces(jsonBuf) + assert.NoError(b, err) + } + }) +} + +func TestReadInt64(t *testing.T) { + var data = `{"intAsNumber":1,"intAsString":"1"}` + iter := jsoniter.ConfigFastest.BorrowIterator([]byte(data)) + iter.ReadObjectCB(func(iter *jsoniter.Iterator, f string) bool { + switch f { + case "intAsNumber": + v := readInt64(iter) + assert.Equal(t, int64(1), v) + case "intAsString": + v := readInt64(iter) + assert.Equal(t, int64(1), v) + } + return true + }) + assert.NoError(t, iter.Error) +} + +func TestReadTraceDataUnknownField(t *testing.T) { + jsonStr := `{"extra":""}` + iter := jsoniter.ConfigFastest.BorrowIterator([]byte(jsonStr)) + defer jsoniter.ConfigFastest.ReturnIterator(iter) + readTraceData(iter) + if assert.Error(t, iter.Error) { + assert.Contains(t, iter.Error.Error(), "unknown field") + } +} + +func TestReadResourceSpansUnknownField(t *testing.T) { + jsonStr := `{"extra":""}` + iter := jsoniter.ConfigFastest.BorrowIterator([]byte(jsonStr)) + defer jsoniter.ConfigFastest.ReturnIterator(iter) + readResourceSpans(iter) + if assert.Error(t, iter.Error) { + assert.Contains(t, iter.Error.Error(), "unknown field") + } +} + +func TestReadResourceSpansUnknownResourceField(t *testing.T) { + jsonStr := `{"resource":{"extra":""}}` + iter := jsoniter.ConfigFastest.BorrowIterator([]byte(jsonStr)) + defer jsoniter.ConfigFastest.ReturnIterator(iter) + readResourceSpans(iter) + if assert.Error(t, iter.Error) { + assert.Contains(t, iter.Error.Error(), "unknown field") + } +} + +func TestReadInstrumentationLibrarySpansUnknownField(t *testing.T) { + jsonStr := `{"extra":""}` + iter := jsoniter.ConfigFastest.BorrowIterator([]byte(jsonStr)) + defer jsoniter.ConfigFastest.ReturnIterator(iter) + readInstrumentationLibrarySpans(iter) + if assert.Error(t, iter.Error) { + assert.Contains(t, iter.Error.Error(), "unknown field") + } +} + +func TestReadInstrumentationLibrarySpansUnknownInstrumentationLibraryField(t *testing.T) { + jsonStr := `{"instrumentationLibrary":{"extra":""}}` + iter := jsoniter.ConfigFastest.BorrowIterator([]byte(jsonStr)) + defer jsoniter.ConfigFastest.ReturnIterator(iter) + readInstrumentationLibrarySpans(iter) + if assert.Error(t, iter.Error) { + assert.Contains(t, iter.Error.Error(), "unknown field") + } +} + +func TestReadSpanUnknownField(t *testing.T) { + jsonStr := `{"extra":""}` + iter := jsoniter.ConfigFastest.BorrowIterator([]byte(jsonStr)) + defer jsoniter.ConfigFastest.ReturnIterator(iter) + readSpan(iter) + if assert.Error(t, iter.Error) { + assert.Contains(t, iter.Error.Error(), "unknown field") + } +} + +func TestReadSpanUnknownStatusField(t *testing.T) { + jsonStr := `{"status":{"extra":""}}` + iter := jsoniter.ConfigFastest.BorrowIterator([]byte(jsonStr)) + defer jsoniter.ConfigFastest.ReturnIterator(iter) + readSpan(iter) + if assert.Error(t, iter.Error) { + assert.Contains(t, iter.Error.Error(), "unknown field") + } +} +func TestReadSpanInvalidTraceIDField(t *testing.T) { + jsonStr := `{"trace_id":"--"}` + iter := jsoniter.ConfigFastest.BorrowIterator([]byte(jsonStr)) + defer jsoniter.ConfigFastest.ReturnIterator(iter) + readSpan(iter) + if assert.Error(t, iter.Error) { + assert.Contains(t, iter.Error.Error(), "parse trace_id") + } +} +func TestReadSpanInvalidSpanIDField(t *testing.T) { + jsonStr := `{"span_id":"--"}` + iter := jsoniter.ConfigFastest.BorrowIterator([]byte(jsonStr)) + defer jsoniter.ConfigFastest.ReturnIterator(iter) + readSpan(iter) + if assert.Error(t, iter.Error) { + assert.Contains(t, iter.Error.Error(), "parse span_id") + } +} +func TestReadSpanInvalidParentSpanIDField(t *testing.T) { + jsonStr := `{"parent_span_id":"--"}` + iter := jsoniter.ConfigFastest.BorrowIterator([]byte(jsonStr)) + defer jsoniter.ConfigFastest.ReturnIterator(iter) + readSpan(iter) + if assert.Error(t, iter.Error) { + assert.Contains(t, iter.Error.Error(), "parse parent_span_id") + } +} + +func TestReadSpanLinkUnknownField(t *testing.T) { + jsonStr := `{"extra":""}` + iter := jsoniter.ConfigFastest.BorrowIterator([]byte(jsonStr)) + defer jsoniter.ConfigFastest.ReturnIterator(iter) + readSpanLink(iter) + if assert.Error(t, iter.Error) { + assert.Contains(t, iter.Error.Error(), "unknown field") + } +} + +func TestReadSpanLinkInvalidTraceIDField(t *testing.T) { + jsonStr := `{"trace_id":"--"}` + iter := jsoniter.ConfigFastest.BorrowIterator([]byte(jsonStr)) + defer jsoniter.ConfigFastest.ReturnIterator(iter) + readSpanLink(iter) + if assert.Error(t, iter.Error) { + assert.Contains(t, iter.Error.Error(), "parse trace_id") + } +} + +func TestReadSpanLinkInvalidSpanIDField(t *testing.T) { + jsonStr := `{"span_id":"--"}` + iter := jsoniter.ConfigFastest.BorrowIterator([]byte(jsonStr)) + defer jsoniter.ConfigFastest.ReturnIterator(iter) + readSpanLink(iter) + if assert.Error(t, iter.Error) { + assert.Contains(t, iter.Error.Error(), "parse span_id") + } +} + +func TestReadSpanEventUnknownField(t *testing.T) { + jsonStr := `{"extra":""}` + iter := jsoniter.ConfigFastest.BorrowIterator([]byte(jsonStr)) + defer jsoniter.ConfigFastest.ReturnIterator(iter) + readSpanEvent(iter) + if assert.Error(t, iter.Error) { + assert.Contains(t, iter.Error.Error(), "unknown field") + } +} + +func TestReadAttributeUnknownField(t *testing.T) { + jsonStr := `{"extra":""}` + iter := jsoniter.ConfigFastest.BorrowIterator([]byte(jsonStr)) + defer jsoniter.ConfigFastest.ReturnIterator(iter) + readAttribute(iter) + if assert.Error(t, iter.Error) { + assert.Contains(t, iter.Error.Error(), "unknown field") + } +} + +func TestReadAnyValueUnknownField(t *testing.T) { + jsonStr := `{"extra":""}` + iter := jsoniter.ConfigFastest.BorrowIterator([]byte(jsonStr)) + defer jsoniter.ConfigFastest.ReturnIterator(iter) + readAnyValue(iter, "") + if assert.Error(t, iter.Error) { + assert.Contains(t, iter.Error.Error(), "unknown field") + } +} + +func TestReadAnyValueInvliadBytesValue(t *testing.T) { + jsonStr := `"--"` + iter := jsoniter.ConfigFastest.BorrowIterator([]byte(jsonStr)) + defer jsoniter.ConfigFastest.ReturnIterator(iter) + readAnyValue(iter, "bytesValue") + if assert.Error(t, iter.Error) { + assert.Contains(t, iter.Error.Error(), "base64") + } +} + +func TestReadArrayUnknownField(t *testing.T) { + jsonStr := `{"extra":""}` + iter := jsoniter.ConfigFastest.BorrowIterator([]byte(jsonStr)) + defer jsoniter.ConfigFastest.ReturnIterator(iter) + readArray(iter) + if assert.Error(t, iter.Error) { + assert.Contains(t, iter.Error.Error(), "unknown field") + } +} + +func TestReadSpanKind(t *testing.T) { + tests := []struct { + name string + jsonStr string + want otlptrace.Span_SpanKind + }{ + { + name: "string", + jsonStr: fmt.Sprintf(`"%s"`, otlptrace.Span_SPAN_KIND_INTERNAL.String()), + want: otlptrace.Span_SPAN_KIND_INTERNAL, + }, + { + name: "int", + jsonStr: fmt.Sprintf("%d", otlptrace.Span_SPAN_KIND_INTERNAL), + want: otlptrace.Span_SPAN_KIND_INTERNAL, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + iter := jsoniter.ConfigFastest.BorrowIterator([]byte(tt.jsonStr)) + defer jsoniter.ConfigFastest.ReturnIterator(iter) + if got := readSpanKind(iter); got != tt.want { + t.Errorf("readSpanKind() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestReadStatusCode(t *testing.T) { + tests := []struct { + name string + jsonStr string + want otlptrace.Status_StatusCode + }{ + { + name: "string", + jsonStr: fmt.Sprintf(`"%s"`, otlptrace.Status_STATUS_CODE_ERROR.String()), + want: otlptrace.Status_STATUS_CODE_ERROR, + }, + { + name: "int", + jsonStr: fmt.Sprintf("%d", otlptrace.Status_STATUS_CODE_ERROR), + want: otlptrace.Status_STATUS_CODE_ERROR, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + iter := jsoniter.ConfigFastest.BorrowIterator([]byte(tt.jsonStr)) + defer jsoniter.ConfigFastest.ReturnIterator(iter) + if got := readStatusCode(iter); got != tt.want { + t.Errorf("readStatusCode() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/pdata/ptrace/jsoniter.go b/pdata/ptrace/jsoniter.go deleted file mode 100644 index b6e41bf04e6..00000000000 --- a/pdata/ptrace/jsoniter.go +++ /dev/null @@ -1,369 +0,0 @@ -// Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package ptrace // import "go.opentelemetry.io/collector/pdata/ptrace" - -import ( - "encoding/base64" - "fmt" - - jsoniter "github.com/json-iterator/go" - - "go.opentelemetry.io/collector/pdata/internal" - otlpcommon "go.opentelemetry.io/collector/pdata/internal/data/protogen/common/v1" - otlptrace "go.opentelemetry.io/collector/pdata/internal/data/protogen/trace/v1" -) - -// NewJSONIterUnmarshaler returns a model.Unmarshaler. Unmarshals from OTLP json bytes. -func NewJSONIterUnmarshaler() Unmarshaler { - return newJSONIterUnmarshaler() -} - -type jsonIterUnmarshaler struct { -} - -func newJSONIterUnmarshaler() *jsonIterUnmarshaler { - return &jsonIterUnmarshaler{} -} - -func (d *jsonIterUnmarshaler) UnmarshalTraces(buf []byte) (Traces, error) { - iter := jsoniter.ConfigFastest.BorrowIterator(buf) - defer jsoniter.ConfigFastest.ReturnIterator(iter) - td := readTraceData(iter) - err := iter.Error - return internal.TracesFromProto(td), err -} - -func readTraceData(iter *jsoniter.Iterator) otlptrace.TracesData { - td := otlptrace.TracesData{} - iter.ReadObjectCB(func(iter *jsoniter.Iterator, f string) bool { - switch f { - case "resourceSpans", "resource_spans": - iter.ReadArrayCB(func(iter *jsoniter.Iterator) bool { - td.ResourceSpans = append(td.ResourceSpans, readResourceSpans(iter)) - return true - }) - default: - iter.ReportError("root", fmt.Sprintf("unknown field:%v", f)) - } - return true - }) - return td -} - -func readResourceSpans(iter *jsoniter.Iterator) *otlptrace.ResourceSpans { - rs := &otlptrace.ResourceSpans{} - - iter.ReadObjectCB(func(iter *jsoniter.Iterator, f string) bool { - switch f { - case "resource": - iter.ReadObjectCB(func(iter *jsoniter.Iterator, f string) bool { - switch f { - case "attributes": - iter.ReadArrayCB(func(iter *jsoniter.Iterator) bool { - rs.Resource.Attributes = append(rs.Resource.Attributes, readAttribute(iter)) - return true - }) - case "droppedAttributesCount", "dropped_attributes_count": - rs.Resource.DroppedAttributesCount = iter.ReadUint32() - default: - iter.ReportError("readResourceSpans.resource", fmt.Sprintf("unknown field:%v", f)) - } - return true - }) - case "instrumentationLibrarySpans", "instrumentation_library_spans", "scopeSpans", "scope_spans": - iter.ReadArrayCB(func(iter *jsoniter.Iterator) bool { - rs.ScopeSpans = append(rs.ScopeSpans, - readInstrumentationLibrarySpans(iter)) - return true - }) - case "schemaUrl", "schema_url": - rs.SchemaUrl = iter.ReadString() - default: - iter.ReportError("readResourceSpans", fmt.Sprintf("unknown field:%v", f)) - } - return true - }) - return rs -} - -func readInstrumentationLibrarySpans(iter *jsoniter.Iterator) *otlptrace.ScopeSpans { - ils := &otlptrace.ScopeSpans{} - - iter.ReadObjectCB(func(iter *jsoniter.Iterator, f string) bool { - switch f { - case "instrumentationLibrary", "instrumentation_library", "scope": - iter.ReadObjectCB(func(iter *jsoniter.Iterator, f string) bool { - switch f { - case "name": - ils.Scope.Name = iter.ReadString() - case "version": - ils.Scope.Version = iter.ReadString() - default: - iter.ReportError("readInstrumentationLibrarySpans.instrumentationLibrary", fmt.Sprintf("unknown field:%v", f)) - } - return true - }) - case "spans": - iter.ReadArrayCB(func(iter *jsoniter.Iterator) bool { - ils.Spans = append(ils.Spans, readSpan(iter)) - return true - }) - case "schemaUrl", "schema_url": - ils.SchemaUrl = iter.ReadString() - default: - iter.ReportError("readInstrumentationLibrarySpans", fmt.Sprintf("unknown field:%v", f)) - } - return true - }) - return ils -} - -func readSpan(iter *jsoniter.Iterator) *otlptrace.Span { - sp := &otlptrace.Span{} - - iter.ReadObjectCB(func(iter *jsoniter.Iterator, f string) bool { - switch f { - case "traceId", "trace_id": - if err := sp.TraceId.UnmarshalJSON([]byte(iter.ReadString())); err != nil { - iter.ReportError("readSpan.traceId", fmt.Sprintf("parse trace_id:%v", err)) - } - case "spanId", "span_id": - if err := sp.SpanId.UnmarshalJSON([]byte(iter.ReadString())); err != nil { - iter.ReportError("readSpan.spanId", fmt.Sprintf("parse span_id:%v", err)) - } - case "traceState", "trace_state": - sp.TraceState = iter.ReadString() - case "parentSpanId", "parent_span_id": - if err := sp.ParentSpanId.UnmarshalJSON([]byte(iter.ReadString())); err != nil { - iter.ReportError("readSpan.parentSpanId", fmt.Sprintf("parse parent_span_id:%v", err)) - } - case "name": - sp.Name = iter.ReadString() - case "kind": - sp.Kind = readSpanKind(iter) - case "startTimeUnixNano", "start_time_unix_nano": - sp.StartTimeUnixNano = uint64(readInt64(iter)) - case "endTimeUnixNano", "end_time_unix_nano": - sp.EndTimeUnixNano = uint64(readInt64(iter)) - case "attributes": - iter.ReadArrayCB(func(iter *jsoniter.Iterator) bool { - sp.Attributes = append(sp.Attributes, readAttribute(iter)) - return true - }) - case "droppedAttributesCount", "dropped_attributes_count": - sp.DroppedAttributesCount = iter.ReadUint32() - case "events": - iter.ReadArrayCB(func(iter *jsoniter.Iterator) bool { - sp.Events = append(sp.Events, readSpanEvent(iter)) - return true - }) - case "droppedEventsCount", "dropped_events_count": - sp.DroppedEventsCount = iter.ReadUint32() - case "links": - iter.ReadArrayCB(func(iter *jsoniter.Iterator) bool { - sp.Links = append(sp.Links, readSpanLink(iter)) - return true - }) - case "droppedLinksCount", "dropped_links_count": - sp.DroppedLinksCount = iter.ReadUint32() - case "status": - iter.ReadObjectCB(func(iter *jsoniter.Iterator, f string) bool { - switch f { - case "message": - sp.Status.Message = iter.ReadString() - case "code": - sp.Status.Code = readStatusCode(iter) - default: - iter.ReportError("readSpan.status", fmt.Sprintf("unknown field:%v", f)) - } - return true - }) - default: - iter.ReportError("readSpan", fmt.Sprintf("unknown field:%v", f)) - } - return true - }) - return sp -} - -func readSpanLink(iter *jsoniter.Iterator) *otlptrace.Span_Link { - link := &otlptrace.Span_Link{} - - iter.ReadObjectCB(func(iter *jsoniter.Iterator, f string) bool { - switch f { - case "traceId", "trace_id": - if err := link.TraceId.UnmarshalJSON([]byte(iter.ReadString())); err != nil { - iter.ReportError("readSpanLink", fmt.Sprintf("parse trace_id:%v", err)) - } - case "spanId", "span_id": - if err := link.SpanId.UnmarshalJSON([]byte(iter.ReadString())); err != nil { - iter.ReportError("readSpanLink", fmt.Sprintf("parse span_id:%v", err)) - } - case "traceState", "trace_state": - link.TraceState = iter.ReadString() - case "attributes": - iter.ReadArrayCB(func(iter *jsoniter.Iterator) bool { - link.Attributes = append(link.Attributes, readAttribute(iter)) - return true - }) - case "droppedAttributesCount", "dropped_attributes_count": - link.DroppedAttributesCount = iter.ReadUint32() - default: - iter.ReportError("readSpanLink", fmt.Sprintf("unknown field:%v", f)) - } - return true - }) - return link -} - -func readSpanEvent(iter *jsoniter.Iterator) *otlptrace.Span_Event { - event := &otlptrace.Span_Event{} - - iter.ReadObjectCB(func(iter *jsoniter.Iterator, f string) bool { - switch f { - case "timeUnixNano", "time_unix_nano": - event.TimeUnixNano = uint64(readInt64(iter)) - case "name": - event.Name = iter.ReadString() - case "attributes": - iter.ReadArrayCB(func(iter *jsoniter.Iterator) bool { - event.Attributes = append(event.Attributes, readAttribute(iter)) - return true - }) - case "droppedAttributesCount", "dropped_attributes_count": - event.DroppedAttributesCount = iter.ReadUint32() - default: - iter.ReportError("readSpanEvent", fmt.Sprintf("unknown field:%v", f)) - } - return true - }) - return event -} - -func readAttribute(iter *jsoniter.Iterator) otlpcommon.KeyValue { - var ( - key string - value otlpcommon.AnyValue - ) - iter.ReadObjectCB(func(iter *jsoniter.Iterator, f string) bool { - switch f { - case "key": - key = iter.ReadString() - case "value": - iter.ReadObjectCB(func(iter *jsoniter.Iterator, f string) bool { - value = readAnyValue(iter, f) - return true - }) - default: - iter.ReportError("readAttribute", fmt.Sprintf("unknown field:%v", f)) - } - return true - }) - return otlpcommon.KeyValue{ - Key: key, - Value: value, - } -} - -func readAnyValue(iter *jsoniter.Iterator, f string) otlpcommon.AnyValue { - switch f { - case "stringValue", "string_value": - return otlpcommon.AnyValue{ - Value: &otlpcommon.AnyValue_StringValue{ - StringValue: iter.ReadString(), - }, - } - case "boolValue", "bool_value": - return otlpcommon.AnyValue{ - Value: &otlpcommon.AnyValue_BoolValue{ - BoolValue: iter.ReadBool(), - }, - } - case "intValue", "int_value": - return otlpcommon.AnyValue{ - Value: &otlpcommon.AnyValue_IntValue{ - IntValue: readInt64(iter), - }, - } - case "doubleValue", "double_value": - return otlpcommon.AnyValue{ - Value: &otlpcommon.AnyValue_DoubleValue{ - DoubleValue: iter.ReadFloat64(), - }, - } - case "bytesValue", "bytes_value": - v, err := base64.StdEncoding.DecodeString(iter.ReadString()) - if err != nil { - iter.ReportError("bytesValue", fmt.Sprintf("base64 decode:%v", err)) - return otlpcommon.AnyValue{} - } - return otlpcommon.AnyValue{ - Value: &otlpcommon.AnyValue_BytesValue{ - BytesValue: v, - }, - } - case "arrayValue", "array_value": - return otlpcommon.AnyValue{ - Value: &otlpcommon.AnyValue_ArrayValue{ - ArrayValue: readArray(iter), - }, - } - default: - iter.ReportError("readAnyValue", fmt.Sprintf("unknown field:%v", f)) - return otlpcommon.AnyValue{} - } -} - -func readArray(iter *jsoniter.Iterator) *otlpcommon.ArrayValue { - v := &otlpcommon.ArrayValue{} - iter.ReadObjectCB(func(iter *jsoniter.Iterator, f string) bool { - switch f { - case "values": - iter.ReadArrayCB(func(iter *jsoniter.Iterator) bool { - iter.ReadObjectCB(func(iter *jsoniter.Iterator, f string) bool { - v.Values = append(v.Values, readAnyValue(iter, f)) - return true - }) - return true - }) - default: - iter.ReportError("readArray", fmt.Sprintf("unknown field:%s", f)) - } - return true - }) - return v -} - -func readInt64(iter *jsoniter.Iterator) int64 { - return iter.ReadAny().ToInt64() -} - -func readSpanKind(iter *jsoniter.Iterator) otlptrace.Span_SpanKind { - any := iter.ReadAny() - if v := any.ToInt(); v > 0 { - return otlptrace.Span_SpanKind(v) - } - v := any.ToString() - return otlptrace.Span_SpanKind(otlptrace.Span_SpanKind_value[v]) -} - -func readStatusCode(iter *jsoniter.Iterator) otlptrace.Status_StatusCode { - any := iter.ReadAny() - if v := any.ToInt(); v > 0 { - return otlptrace.Status_StatusCode(v) - } - v := any.ToString() - return otlptrace.Status_StatusCode(otlptrace.Status_StatusCode_value[v]) -} diff --git a/pdata/ptrace/jsoniter_test.go b/pdata/ptrace/jsoniter_test.go deleted file mode 100644 index 73ef4e494d8..00000000000 --- a/pdata/ptrace/jsoniter_test.go +++ /dev/null @@ -1,401 +0,0 @@ -// Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package ptrace - -import ( - "fmt" - "testing" - "time" - - jsoniter "github.com/json-iterator/go" - "github.com/stretchr/testify/assert" - - "go.opentelemetry.io/collector/pdata/internal" - otlptrace "go.opentelemetry.io/collector/pdata/internal/data/protogen/trace/v1" -) - -var tracesOTLPFull = func() Traces { - traceID := internal.NewTraceID([16]byte{0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F, 0x10}) - spanID := internal.NewSpanID([8]byte{0x11, 0x12, 0x13, 0x14, 0x15, 0x16, 0x17, 0x18}) - td := NewTraces() - // Add ResourceSpans. - rs := td.ResourceSpans().AppendEmpty() - rs.SetSchemaUrl("schemaURL") - // Add resource. - rs.Resource().Attributes().UpsertString("host.name", "testHost") - rs.Resource().Attributes().UpsertString("service.name", "testService") - rs.Resource().SetDroppedAttributesCount(1) - // Add InstrumentationLibrarySpans. - il := rs.ScopeSpans().AppendEmpty() - il.Scope().SetName("instrumentation name") - il.Scope().SetVersion("instrumentation version") - il.SetSchemaUrl("schemaURL") - // Add spans. - sp := il.Spans().AppendEmpty() - sp.SetName("testSpan") - sp.SetKind(internal.SpanKindClient) - sp.SetDroppedAttributesCount(1) - sp.SetStartTimestamp(internal.NewTimestampFromTime(time.Now())) - sp.SetTraceID(traceID) - sp.SetSpanID(spanID) - sp.SetDroppedEventsCount(1) - sp.SetDroppedLinksCount(1) - sp.SetEndTimestamp(internal.NewTimestampFromTime(time.Now())) - sp.SetParentSpanID(spanID) - sp.SetTraceState("state") - sp.Status().SetCode(internal.StatusCodeOk) - sp.Status().SetMessage("message") - // Add attributes. - sp.Attributes().UpsertString("string", "value") - sp.Attributes().UpsertBool("bool", true) - sp.Attributes().UpsertInt("int", 1) - sp.Attributes().UpsertDouble("double", 1.1) - sp.Attributes().UpsertBytes("bytes", []byte("foo")) - arr := internal.NewValueSlice() - arr.SliceVal().AppendEmpty().SetIntVal(1) - sp.Attributes().Upsert("array", arr) - // Add events. - event := sp.Events().AppendEmpty() - event.SetName("eventName") - event.SetTimestamp(internal.NewTimestampFromTime(time.Now())) - event.SetDroppedAttributesCount(1) - event.Attributes().UpsertString("string", "value") - event.Attributes().UpsertBool("bool", true) - event.Attributes().UpsertInt("int", 1) - event.Attributes().UpsertDouble("double", 1.1) - event.Attributes().UpsertBytes("bytes", []byte("foo")) - // Add links. - link := sp.Links().AppendEmpty() - link.SetTraceState("state") - link.SetTraceID(traceID) - link.SetSpanID(spanID) - link.SetDroppedAttributesCount(1) - link.Attributes().UpsertString("string", "value") - link.Attributes().UpsertBool("bool", true) - link.Attributes().UpsertInt("int", 1) - link.Attributes().UpsertDouble("double", 1.1) - link.Attributes().UpsertBytes("bytes", []byte("foo")) - // Add another span. - sp2 := il.Spans().AppendEmpty() - sp2.SetName("testSpan2") - return td -}() - -func TestJSONIter(t *testing.T) { - encoder := NewJSONMarshaler() - jsonBuf, err := encoder.MarshalTraces(tracesOTLPFull) - assert.NoError(t, err) - - decoder := newJSONIterUnmarshaler() - got, err := decoder.UnmarshalTraces(jsonBuf) - assert.NoError(t, err) - assert.EqualValues(t, tracesOTLPFull, got) -} - -func BenchmarkJSONUnmarshal(b *testing.B) { - b.ReportAllocs() - - encoder := NewJSONMarshaler() - jsonBuf, err := encoder.MarshalTraces(tracesOTLPFull) - assert.NoError(b, err) - decoder := newJSONUnmarshaler() - - b.ResetTimer() - b.RunParallel(func(pb *testing.PB) { - for pb.Next() { - _, err := decoder.UnmarshalTraces(jsonBuf) - assert.NoError(b, err) - } - }) -} - -func BenchmarkTracesJSONiterUnmarshal(b *testing.B) { - b.ReportAllocs() - - encoder := NewJSONMarshaler() - jsonBuf, err := encoder.MarshalTraces(tracesOTLPFull) - assert.NoError(b, err) - decoder := newJSONIterUnmarshaler() - - b.ResetTimer() - b.RunParallel(func(pb *testing.PB) { - for pb.Next() { - _, err := decoder.UnmarshalTraces(jsonBuf) - assert.NoError(b, err) - } - }) -} - -func TestReadInt64(t *testing.T) { - var data = `{"intAsNumber":1,"intAsString":"1"}` - iter := jsoniter.ConfigFastest.BorrowIterator([]byte(data)) - iter.ReadObjectCB(func(iter *jsoniter.Iterator, f string) bool { - switch f { - case "intAsNumber": - v := readInt64(iter) - assert.Equal(t, int64(1), v) - case "intAsString": - v := readInt64(iter) - assert.Equal(t, int64(1), v) - } - return true - }) - assert.NoError(t, iter.Error) -} - -func Test_readTraceData(t *testing.T) { - t.Run("unknown field", func(t *testing.T) { - jsonStr := `{"extra":""}` - iter := jsoniter.ConfigFastest.BorrowIterator([]byte(jsonStr)) - defer jsoniter.ConfigFastest.ReturnIterator(iter) - readTraceData(iter) - if assert.Error(t, iter.Error) { - assert.Contains(t, iter.Error.Error(), "unknown field") - } - }) -} - -func Test_readResourceSpans(t *testing.T) { - t.Run("unknown field", func(t *testing.T) { - jsonStr := `{"extra":""}` - iter := jsoniter.ConfigFastest.BorrowIterator([]byte(jsonStr)) - defer jsoniter.ConfigFastest.ReturnIterator(iter) - readResourceSpans(iter) - if assert.Error(t, iter.Error) { - assert.Contains(t, iter.Error.Error(), "unknown field") - } - }) - t.Run("unknown resource field", func(t *testing.T) { - jsonStr := `{"resource":{"extra":""}}` - iter := jsoniter.ConfigFastest.BorrowIterator([]byte(jsonStr)) - defer jsoniter.ConfigFastest.ReturnIterator(iter) - readResourceSpans(iter) - if assert.Error(t, iter.Error) { - assert.Contains(t, iter.Error.Error(), "unknown field") - } - }) -} - -func Test_readInstrumentationLibrarySpans(t *testing.T) { - t.Run("unknown field", func(t *testing.T) { - jsonStr := `{"extra":""}` - iter := jsoniter.ConfigFastest.BorrowIterator([]byte(jsonStr)) - defer jsoniter.ConfigFastest.ReturnIterator(iter) - readInstrumentationLibrarySpans(iter) - if assert.Error(t, iter.Error) { - assert.Contains(t, iter.Error.Error(), "unknown field") - } - }) - t.Run("unknown instrumentationLibrary field", func(t *testing.T) { - jsonStr := `{"instrumentationLibrary":{"extra":""}}` - iter := jsoniter.ConfigFastest.BorrowIterator([]byte(jsonStr)) - defer jsoniter.ConfigFastest.ReturnIterator(iter) - readInstrumentationLibrarySpans(iter) - if assert.Error(t, iter.Error) { - assert.Contains(t, iter.Error.Error(), "unknown field") - } - }) -} - -func Test_readSpan(t *testing.T) { - t.Run("unknown field", func(t *testing.T) { - jsonStr := `{"extra":""}` - iter := jsoniter.ConfigFastest.BorrowIterator([]byte(jsonStr)) - defer jsoniter.ConfigFastest.ReturnIterator(iter) - readSpan(iter) - if assert.Error(t, iter.Error) { - assert.Contains(t, iter.Error.Error(), "unknown field") - } - }) - t.Run("unknown status field", func(t *testing.T) { - jsonStr := `{"status":{"extra":""}}` - iter := jsoniter.ConfigFastest.BorrowIterator([]byte(jsonStr)) - defer jsoniter.ConfigFastest.ReturnIterator(iter) - readSpan(iter) - if assert.Error(t, iter.Error) { - assert.Contains(t, iter.Error.Error(), "unknown field") - } - }) - t.Run("invalid trace_id field", func(t *testing.T) { - jsonStr := `{"trace_id":"--"}` - iter := jsoniter.ConfigFastest.BorrowIterator([]byte(jsonStr)) - defer jsoniter.ConfigFastest.ReturnIterator(iter) - readSpan(iter) - if assert.Error(t, iter.Error) { - assert.Contains(t, iter.Error.Error(), "parse trace_id") - } - }) - t.Run("invalid span_id field", func(t *testing.T) { - jsonStr := `{"span_id":"--"}` - iter := jsoniter.ConfigFastest.BorrowIterator([]byte(jsonStr)) - defer jsoniter.ConfigFastest.ReturnIterator(iter) - readSpan(iter) - if assert.Error(t, iter.Error) { - assert.Contains(t, iter.Error.Error(), "parse span_id") - } - }) - t.Run("invalid parent_span_id field", func(t *testing.T) { - jsonStr := `{"parent_span_id":"--"}` - iter := jsoniter.ConfigFastest.BorrowIterator([]byte(jsonStr)) - defer jsoniter.ConfigFastest.ReturnIterator(iter) - readSpan(iter) - if assert.Error(t, iter.Error) { - assert.Contains(t, iter.Error.Error(), "parse parent_span_id") - } - }) -} - -func Test_readSpanLink(t *testing.T) { - t.Run("unknown field", func(t *testing.T) { - jsonStr := `{"extra":""}` - iter := jsoniter.ConfigFastest.BorrowIterator([]byte(jsonStr)) - defer jsoniter.ConfigFastest.ReturnIterator(iter) - readSpanLink(iter) - if assert.Error(t, iter.Error) { - assert.Contains(t, iter.Error.Error(), "unknown field") - } - }) - t.Run("invalid trace_id field", func(t *testing.T) { - jsonStr := `{"trace_id":"--"}` - iter := jsoniter.ConfigFastest.BorrowIterator([]byte(jsonStr)) - defer jsoniter.ConfigFastest.ReturnIterator(iter) - readSpanLink(iter) - if assert.Error(t, iter.Error) { - assert.Contains(t, iter.Error.Error(), "parse trace_id") - } - }) - t.Run("invalid span_id field", func(t *testing.T) { - jsonStr := `{"span_id":"--"}` - iter := jsoniter.ConfigFastest.BorrowIterator([]byte(jsonStr)) - defer jsoniter.ConfigFastest.ReturnIterator(iter) - readSpanLink(iter) - if assert.Error(t, iter.Error) { - assert.Contains(t, iter.Error.Error(), "parse span_id") - } - }) -} - -func Test_readSpanEvent(t *testing.T) { - t.Run("unknown field", func(t *testing.T) { - jsonStr := `{"extra":""}` - iter := jsoniter.ConfigFastest.BorrowIterator([]byte(jsonStr)) - defer jsoniter.ConfigFastest.ReturnIterator(iter) - readSpanEvent(iter) - if assert.Error(t, iter.Error) { - assert.Contains(t, iter.Error.Error(), "unknown field") - } - }) -} - -func Test_readAttribute(t *testing.T) { - t.Run("unknown field", func(t *testing.T) { - jsonStr := `{"extra":""}` - iter := jsoniter.ConfigFastest.BorrowIterator([]byte(jsonStr)) - defer jsoniter.ConfigFastest.ReturnIterator(iter) - readAttribute(iter) - if assert.Error(t, iter.Error) { - assert.Contains(t, iter.Error.Error(), "unknown field") - } - }) -} - -func Test_readAnyValue(t *testing.T) { - t.Run("unknown field", func(t *testing.T) { - jsonStr := `{"extra":""}` - iter := jsoniter.ConfigFastest.BorrowIterator([]byte(jsonStr)) - defer jsoniter.ConfigFastest.ReturnIterator(iter) - readAnyValue(iter, "") - if assert.Error(t, iter.Error) { - assert.Contains(t, iter.Error.Error(), "unknown field") - } - }) - t.Run("invalid bytesValue", func(t *testing.T) { - jsonStr := `"--"` - iter := jsoniter.ConfigFastest.BorrowIterator([]byte(jsonStr)) - defer jsoniter.ConfigFastest.ReturnIterator(iter) - readAnyValue(iter, "bytesValue") - if assert.Error(t, iter.Error) { - assert.Contains(t, iter.Error.Error(), "base64") - } - }) -} - -func Test_readArray(t *testing.T) { - t.Run("unknown field", func(t *testing.T) { - jsonStr := `{"extra":""}` - iter := jsoniter.ConfigFastest.BorrowIterator([]byte(jsonStr)) - defer jsoniter.ConfigFastest.ReturnIterator(iter) - readArray(iter) - if assert.Error(t, iter.Error) { - assert.Contains(t, iter.Error.Error(), "unknown field") - } - }) -} - -func Test_readSpanKind(t *testing.T) { - tests := []struct { - name string - jsonStr string - want otlptrace.Span_SpanKind - }{ - { - name: "string", - jsonStr: fmt.Sprintf(`"%s"`, otlptrace.Span_SPAN_KIND_INTERNAL.String()), - want: otlptrace.Span_SPAN_KIND_INTERNAL, - }, - { - name: "int", - jsonStr: fmt.Sprintf("%d", otlptrace.Span_SPAN_KIND_INTERNAL), - want: otlptrace.Span_SPAN_KIND_INTERNAL, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - iter := jsoniter.ConfigFastest.BorrowIterator([]byte(tt.jsonStr)) - defer jsoniter.ConfigFastest.ReturnIterator(iter) - if got := readSpanKind(iter); got != tt.want { - t.Errorf("readSpanKind() = %v, want %v", got, tt.want) - } - }) - } -} - -func Test_readStatusCode(t *testing.T) { - tests := []struct { - name string - jsonStr string - want otlptrace.Status_StatusCode - }{ - { - name: "string", - jsonStr: fmt.Sprintf(`"%s"`, otlptrace.Status_STATUS_CODE_ERROR.String()), - want: otlptrace.Status_STATUS_CODE_ERROR, - }, - { - name: "int", - jsonStr: fmt.Sprintf("%d", otlptrace.Status_STATUS_CODE_ERROR), - want: otlptrace.Status_STATUS_CODE_ERROR, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - iter := jsoniter.ConfigFastest.BorrowIterator([]byte(tt.jsonStr)) - defer jsoniter.ConfigFastest.ReturnIterator(iter) - if got := readStatusCode(iter); got != tt.want { - t.Errorf("readStatusCode() = %v, want %v", got, tt.want) - } - }) - } -} From 1e46b4db23473f470e399283eba39414a6dcabb5 Mon Sep 17 00:00:00 2001 From: Jimmie Han Date: Mon, 2 May 2022 16:10:45 +0800 Subject: [PATCH 4/4] Add kvlist support --- pdata/ptrace/json.go | 37 +++++++++++++++++++++++++++---------- pdata/ptrace/json_test.go | 15 +++++++++++++++ 2 files changed, 42 insertions(+), 10 deletions(-) diff --git a/pdata/ptrace/json.go b/pdata/ptrace/json.go index 3b0b726bbc9..2ad28adf209 100644 --- a/pdata/ptrace/json.go +++ b/pdata/ptrace/json.go @@ -276,17 +276,14 @@ func readSpanEvent(iter *jsoniter.Iterator) *otlptrace.Span_Event { } func readAttribute(iter *jsoniter.Iterator) otlpcommon.KeyValue { - var ( - key string - value otlpcommon.AnyValue - ) + kv := otlpcommon.KeyValue{} iter.ReadObjectCB(func(iter *jsoniter.Iterator, f string) bool { switch f { case "key": - key = iter.ReadString() + kv.Key = iter.ReadString() case "value": iter.ReadObjectCB(func(iter *jsoniter.Iterator, f string) bool { - value = readAnyValue(iter, f) + kv.Value = readAnyValue(iter, f) return true }) default: @@ -294,10 +291,7 @@ func readAttribute(iter *jsoniter.Iterator) otlpcommon.KeyValue { } return true }) - return otlpcommon.KeyValue{ - Key: key, - Value: value, - } + return kv } func readAnyValue(iter *jsoniter.Iterator, f string) otlpcommon.AnyValue { @@ -343,6 +337,12 @@ func readAnyValue(iter *jsoniter.Iterator, f string) otlpcommon.AnyValue { ArrayValue: readArray(iter), }, } + case "kvlistValue", "kvlist_value": + return otlpcommon.AnyValue{ + Value: &otlpcommon.AnyValue_KvlistValue{ + KvlistValue: readKvlistValue(iter), + }, + } default: iter.ReportError("readAnyValue", fmt.Sprintf("unknown field:%v", f)) return otlpcommon.AnyValue{} @@ -369,6 +369,23 @@ func readArray(iter *jsoniter.Iterator) *otlpcommon.ArrayValue { return v } +func readKvlistValue(iter *jsoniter.Iterator) *otlpcommon.KeyValueList { + v := &otlpcommon.KeyValueList{} + iter.ReadObjectCB(func(iter *jsoniter.Iterator, f string) bool { + switch f { + case "values": + iter.ReadArrayCB(func(iter *jsoniter.Iterator) bool { + v.Values = append(v.Values, readAttribute(iter)) + return true + }) + default: + iter.ReportError("readKvlistValue", fmt.Sprintf("unknown field:%s", f)) + } + return true + }) + return v +} + func readInt64(iter *jsoniter.Iterator) int64 { return iter.ReadAny().ToInt64() } diff --git a/pdata/ptrace/json_test.go b/pdata/ptrace/json_test.go index 2457dcb190d..b02a10828ca 100644 --- a/pdata/ptrace/json_test.go +++ b/pdata/ptrace/json_test.go @@ -98,7 +98,12 @@ var tracesOTLPFull = func() Traces { sp.Attributes().UpsertBytes("bytes", []byte("foo")) arr := internal.NewValueSlice() arr.SliceVal().AppendEmpty().SetIntVal(1) + arr.SliceVal().AppendEmpty().SetStringVal("str") sp.Attributes().Upsert("array", arr) + kvList := internal.NewValueMap() + kvList.MapVal().Upsert("int", internal.NewValueInt(1)) + kvList.MapVal().Upsert("string", internal.NewValueString("string")) + sp.Attributes().Upsert("kvList", kvList) // Add events. event := sp.Events().AppendEmpty() event.SetName("eventName") @@ -365,6 +370,16 @@ func TestReadArrayUnknownField(t *testing.T) { } } +func TestReadKvlistValueUnknownField(t *testing.T) { + jsonStr := `{"extra":""}` + iter := jsoniter.ConfigFastest.BorrowIterator([]byte(jsonStr)) + defer jsoniter.ConfigFastest.ReturnIterator(iter) + readKvlistValue(iter) + if assert.Error(t, iter.Error) { + assert.Contains(t, iter.Error.Error(), "unknown field") + } +} + func TestReadSpanKind(t *testing.T) { tests := []struct { name string