Skip to content

Commit

Permalink
Merge branch 'main' into log-keyvalue-sorted-equal
Browse files Browse the repository at this point in the history
  • Loading branch information
dmathieu committed May 7, 2024
2 parents 1aa5721 + 2f662db commit 482ba99
Show file tree
Hide file tree
Showing 30 changed files with 390 additions and 133 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- Apply the value length limits to `Record` attributes in `go.opentelemetry.io/otel/sdk/log`. (#5230)
- De-duplicate map attributes added to a `Record` in `go.opentelemetry.io/otel/sdk/log`. (#5230)
- The `go.opentelemetry.io/otel/exporters/stdout/stdoutlog` exporter no longer prints the `AttributeValueLengthLimit` and `AttributeCountLimit` fields; instead, it prints the `DroppedAttributes` field. (#5272)
- Improved performance in the `Stringer` implementation of `go.opentelemetry.io/otel/baggage.Member` by reducing the number of allocations. (#5286)

### Fixed

Expand Down Expand Up @@ -54,6 +55,10 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- Improve performance of baggage member character validation in `go.opentelemetry.io/otel/baggage`. (#5214)
- The `otel-collector` example now uses Docker Compose to bring up services instead of Kubernetes. (#5244)

### Fixed

- Slice attribute values in `go.opentelemetry.io/otel/attribute` are now emitted as their JSON representation. (#5159)

## [1.25.0/0.47.0/0.0.8/0.1.0-alpha] 2024-04-05

### Added
Expand Down
15 changes: 15 additions & 0 deletions attribute/key_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,21 @@ func TestEmit(t *testing.T) {
v: attribute.BoolValue(true),
want: "true",
},
{
name: `test Key.Emit() can emit a string representing self.INT64SLICE`,
v: attribute.Int64SliceValue([]int64{1, 42}),
want: `[1,42]`,
},
{
name: `test Key.Emit() can emit a string representing self.INT64`,
v: attribute.Int64Value(42),
want: "42",
},
{
name: `test Key.Emit() can emit a string representing self.FLOAT64SLICE`,
v: attribute.Float64SliceValue([]float64{1.0, 42.5}),
want: `[1,42.5]`,
},
{
name: `test Key.Emit() can emit a string representing self.FLOAT64`,
v: attribute.Float64Value(42.1),
Expand All @@ -78,6 +88,11 @@ func TestEmit(t *testing.T) {
v: attribute.StringValue("foo"),
want: "foo",
},
{
name: `test Key.Emit() can emit a string representing self.STRINGSLICE`,
v: attribute.StringSliceValue([]string{"foo", "bar"}),
want: `["foo","bar"]`,
},
} {
t.Run(testcase.name, func(t *testing.T) {
// proto: func (v attribute.Value) Emit() string {
Expand Down
18 changes: 15 additions & 3 deletions attribute/value.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,15 +231,27 @@ func (v Value) Emit() string {
case BOOL:
return strconv.FormatBool(v.AsBool())
case INT64SLICE:
return fmt.Sprint(v.asInt64Slice())
j, err := json.Marshal(v.asInt64Slice())
if err != nil {
return fmt.Sprintf("invalid: %v", v.asInt64Slice())
}
return string(j)
case INT64:
return strconv.FormatInt(v.AsInt64(), 10)
case FLOAT64SLICE:
return fmt.Sprint(v.asFloat64Slice())
j, err := json.Marshal(v.asFloat64Slice())
if err != nil {
return fmt.Sprintf("invalid: %v", v.asFloat64Slice())
}
return string(j)
case FLOAT64:
return fmt.Sprint(v.AsFloat64())
case STRINGSLICE:
return fmt.Sprint(v.asStringSlice())
j, err := json.Marshal(v.asStringSlice())
if err != nil {
return fmt.Sprintf("invalid: %v", v.asStringSlice())
}
return string(j)
case STRING:
return v.stringly
default:
Expand Down
4 changes: 2 additions & 2 deletions baggage/baggage.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,9 +335,9 @@ func (m Member) String() string {
// A key is just an ASCII string. A value is restricted to be
// US-ASCII characters excluding CTLs, whitespace,
// DQUOTE, comma, semicolon, and backslash.
s := fmt.Sprintf("%s%s%s", m.key, keyValueDelimiter, valueEscape(m.value))
s := m.key + keyValueDelimiter + valueEscape(m.value)
if len(m.properties) > 0 {
s = fmt.Sprintf("%s%s%s", s, propertyDelimiter, m.properties.String())
s += propertyDelimiter + m.properties.String()
}
return s
}
Expand Down
15 changes: 15 additions & 0 deletions baggage/baggage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1027,3 +1027,18 @@ func BenchmarkValueEscape(b *testing.B) {
})
}
}

// BenchmarkMemberString measures the cost of Member.String for a member
// that carries 26 single-letter properties, reporting allocations so the
// string-concatenation fast path can be tracked over time.
func BenchmarkMemberString(b *testing.B) {
	const letters = "abcdefghijklmnopqrstuvwxyz"
	properties := make([]Property, 0, len(letters))
	for _, c := range letters {
		properties = append(properties, Property{key: string(c)})
	}
	m, err := NewMember(letters, letters, properties...)
	require.NoError(b, err)
	b.ReportAllocs()
	b.ResetTimer()
	for n := 0; n < b.N; n++ {
		_ = m.String()
	}
}
16 changes: 8 additions & 8 deletions sdk/metric/exemplar.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,21 @@ import (
// Note: This will only return non-nil values when the experimental exemplar
// feature is enabled and the OTEL_METRICS_EXEMPLAR_FILTER environment variable
// is not set to always_off.
func reservoirFunc[N int64 | float64](agg Aggregation) func() exemplar.Reservoir[N] {
func reservoirFunc(agg Aggregation) func() exemplar.Reservoir {
if !x.Exemplars.Enabled() {
return nil
}

// https://github.com/open-telemetry/opentelemetry-specification/blob/d4b241f451674e8f611bb589477680341006ad2b/specification/metrics/sdk.md#exemplar-defaults
resF := func() func() exemplar.Reservoir[N] {
resF := func() func() exemplar.Reservoir {
// Explicit bucket histogram aggregation with more than 1 bucket will
// use AlignedHistogramBucketExemplarReservoir.
a, ok := agg.(AggregationExplicitBucketHistogram)
if ok && len(a.Boundaries) > 0 {
cp := slices.Clone(a.Boundaries)
return func() exemplar.Reservoir[N] {
return func() exemplar.Reservoir {
bounds := cp
return exemplar.Histogram[N](bounds)
return exemplar.Histogram(bounds)
}
}

Expand Down Expand Up @@ -61,8 +61,8 @@ func reservoirFunc[N int64 | float64](agg Aggregation) func() exemplar.Reservoir
}
}

return func() exemplar.Reservoir[N] {
return exemplar.FixedSize[N](n)
return func() exemplar.Reservoir {
return exemplar.FixedSize(n)
}
}

Expand All @@ -73,12 +73,12 @@ func reservoirFunc[N int64 | float64](agg Aggregation) func() exemplar.Reservoir
case "always_on":
return resF()
case "always_off":
return exemplar.Drop[N]
return exemplar.Drop
case "trace_based":
fallthrough
default:
newR := resF()
return func() exemplar.Reservoir[N] {
return func() exemplar.Reservoir {
return exemplar.SampledFilter(newR())
}
}
Expand Down
6 changes: 3 additions & 3 deletions sdk/metric/internal/aggregate/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ type Builder[N int64 | float64] struct {
//
// If this is not provided a default factory function that returns an
// exemplar.Drop reservoir will be used.
ReservoirFunc func() exemplar.Reservoir[N]
ReservoirFunc func() exemplar.Reservoir
// AggregationLimit is the cardinality limit of measurement attributes. Any
// measurement for new attributes once the limit has been reached will be
// aggregated into a single aggregate for the "otel.metric.overflow"
Expand All @@ -50,12 +50,12 @@ type Builder[N int64 | float64] struct {
AggregationLimit int
}

func (b Builder[N]) resFunc() func() exemplar.Reservoir[N] {
func (b Builder[N]) resFunc() func() exemplar.Reservoir {
if b.ReservoirFunc != nil {
return b.ReservoirFunc
}

return exemplar.Drop[N]
return exemplar.Drop
}

type fltrMeasure[N int64 | float64] func(ctx context.Context, value N, fltrAttr attribute.Set, droppedAttr []attribute.KeyValue)
Expand Down
4 changes: 2 additions & 2 deletions sdk/metric/internal/aggregate/aggregate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ var (
}
)

func dropExemplars[N int64 | float64]() exemplar.Reservoir[N] {
return exemplar.Drop[N]()
func dropExemplars[N int64 | float64]() exemplar.Reservoir {
return exemplar.Drop()
}

func TestBuilderFilter(t *testing.T) {
Expand Down
42 changes: 42 additions & 0 deletions sdk/metric/internal/aggregate/exemplar.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"

import (
"sync"

"go.opentelemetry.io/otel/sdk/metric/internal/exemplar"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)

// exemplarPool recycles scratch *[]exemplar.Exemplar destinations so that
// collectExemplars does not allocate a fresh slice on every collection cycle.
var exemplarPool = sync.Pool{
	New: func() any { return new([]exemplar.Exemplar) },
}

// collectExemplars invokes f with a pooled scratch slice and copies the
// exemplars f writes into it out to the metricdata representation in out,
// converting each opaque exemplar.Value into the concrete numeric type N.
//
// The scratch slice is returned to exemplarPool (truncated to zero length)
// when this function returns, so f must not retain the slice it is given.
func collectExemplars[N int64 | float64](out *[]metricdata.Exemplar[N], f func(*[]exemplar.Exemplar)) {
	dest := exemplarPool.Get().(*[]exemplar.Exemplar)
	defer func() {
		// Truncate (keeping capacity) before returning to the pool so the
		// next Get starts empty but reuses the backing array.
		*dest = (*dest)[:0]
		exemplarPool.Put(dest)
	}()

	// reset is a package-level helper defined elsewhere in this package;
	// presumably it resizes the slice to the given length/capacity — the
	// scratch slice is sized to match out before f fills it.
	*dest = reset(*dest, len(*out), cap(*out))

	f(dest)

	// Size out to hold exactly what f produced, then copy field by field.
	*out = reset(*out, len(*dest), cap(*dest))
	for i, e := range *dest {
		(*out)[i].FilteredAttributes = e.FilteredAttributes
		(*out)[i].Time = e.Time
		(*out)[i].SpanID = e.SpanID
		(*out)[i].TraceID = e.TraceID

		// Unwrap the variant value into N. A value of any other type
		// (including an unset value) leaves (*out)[i].Value at its zero value.
		switch e.Value.Type() {
		case exemplar.Int64ValueType:
			(*out)[i].Value = N(e.Value.Int64())
		case exemplar.Float64ValueType:
			(*out)[i].Value = N(e.Value.Float64())
		}
	}
}
50 changes: 50 additions & 0 deletions sdk/metric/internal/aggregate/exemplar_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package aggregate

import (
"testing"
"time"

"github.com/stretchr/testify/assert"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric/internal/exemplar"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)

// TestCollectExemplars exercises collectExemplars for both supported
// numeric exemplar value types.
func TestCollectExemplars(t *testing.T) {
	t.Run("Int64", testCollectExemplars[int64]())
	t.Run("Float64", testCollectExemplars[float64]())
}

// testCollectExemplars returns a subtest that feeds a single exemplar
// through collectExemplars and asserts every field (attributes, time,
// span/trace IDs, and the value converted to N) is copied faithfully.
func testCollectExemplars[N int64 | float64]() func(t *testing.T) {
	return func(t *testing.T) {
		now := time.Now()
		alice := attribute.String("user", "Alice")
		value := N(1)
		spanID := [8]byte{0x1}
		traceID := [16]byte{0x1}

		out := new([]metricdata.Exemplar[N])
		// The callback stands in for a Reservoir.Collect: it writes one
		// exemplar into the scratch slice provided by collectExemplars.
		collectExemplars(out, func(in *[]exemplar.Exemplar) {
			*in = reset(*in, 1, 1)
			(*in)[0] = exemplar.Exemplar{
				FilteredAttributes: []attribute.KeyValue{alice},
				Time:               now,
				Value:              exemplar.NewValue(value),
				SpanID:             spanID[:],
				TraceID:            traceID[:],
			}
		})

		assert.Equal(t, []metricdata.Exemplar[N]{{
			FilteredAttributes: []attribute.KeyValue{alice},
			Time:               now,
			Value:              value,
			SpanID:             spanID[:],
			TraceID:            traceID[:],
		}}, *out)
	}
}
12 changes: 6 additions & 6 deletions sdk/metric/internal/aggregate/exponential_histogram.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ const (
// expoHistogramDataPoint is a single data point in an exponential histogram.
type expoHistogramDataPoint[N int64 | float64] struct {
attrs attribute.Set
res exemplar.Reservoir[N]
res exemplar.Reservoir

count uint64
min N
Expand Down Expand Up @@ -282,7 +282,7 @@ func (b *expoBuckets) downscale(delta int) {
// newExponentialHistogram returns an Aggregator that summarizes a set of
// measurements as an exponential histogram. Each histogram is scoped by attributes
// and the aggregation cycle the measurements were made in.
func newExponentialHistogram[N int64 | float64](maxSize, maxScale int32, noMinMax, noSum bool, limit int, r func() exemplar.Reservoir[N]) *expoHistogram[N] {
func newExponentialHistogram[N int64 | float64](maxSize, maxScale int32, noMinMax, noSum bool, limit int, r func() exemplar.Reservoir) *expoHistogram[N] {
return &expoHistogram[N]{
noSum: noSum,
noMinMax: noMinMax,
Expand All @@ -305,7 +305,7 @@ type expoHistogram[N int64 | float64] struct {
maxSize int
maxScale int

newRes func() exemplar.Reservoir[N]
newRes func() exemplar.Reservoir
limit limiter[*expoHistogramDataPoint[N]]
values map[attribute.Distinct]*expoHistogramDataPoint[N]
valuesMu sync.Mutex
Expand Down Expand Up @@ -333,7 +333,7 @@ func (e *expoHistogram[N]) measure(ctx context.Context, value N, fltrAttr attrib
e.values[attr.Equivalent()] = v
}
v.record(value)
v.res.Offer(ctx, t, value, droppedAttr)
v.res.Offer(ctx, t, exemplar.NewValue(value), droppedAttr)
}

func (e *expoHistogram[N]) delta(dest *metricdata.Aggregation) int {
Expand Down Expand Up @@ -376,7 +376,7 @@ func (e *expoHistogram[N]) delta(dest *metricdata.Aggregation) int {
hDPts[i].Max = metricdata.NewExtrema(val.max)
}

val.res.Collect(&hDPts[i].Exemplars)
collectExemplars(&hDPts[i].Exemplars, val.res.Collect)

i++
}
Expand Down Expand Up @@ -429,7 +429,7 @@ func (e *expoHistogram[N]) cumulative(dest *metricdata.Aggregation) int {
hDPts[i].Max = metricdata.NewExtrema(val.max)
}

val.res.Collect(&hDPts[i].Exemplars)
collectExemplars(&hDPts[i].Exemplars, val.res.Collect)

i++
// TODO (#3006): This will use an unbounded amount of memory if there
Expand Down
Loading

0 comments on commit 482ba99

Please sign in to comment.