Add the experimental exemplar feature #4871

Merged · 13 commits · Jan 31, 2024
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -11,6 +11,8 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
### Added

- Add `WithEndpointURL` option to the `exporters/otlp/otlpmetric/otlpmetricgrpc`, `exporters/otlp/otlpmetric/otlpmetrichttp`, `exporters/otlp/otlptrace/otlptracegrpc` and `exporters/otlp/otlptrace/otlptracehttp` packages. (#4808)
- Experimental exemplar exporting is added to the metric SDK.
See [metric documentation](./sdk/metric/EXPERIMENTAL.md#exemplars) for more information about this feature and how to enable it. (#4871)

### Fixed

22 changes: 22 additions & 0 deletions sdk/metric/EXPERIMENTAL.md
@@ -40,6 +40,28 @@ Disable the cardinality limit.
unset OTEL_GO_X_CARDINALITY_LIMIT
```

### Exemplars

A sample of measurements made may be exported directly as a set of exemplars.

This experimental feature can be enabled by setting the `OTEL_GO_X_EXEMPLAR` environment variable.
The value must be set to the case-insensitive string `"true"` to enable the feature.
All other values are ignored.

#### Examples

Enable exemplars to be exported.

```console
export OTEL_GO_X_EXEMPLAR=true
```

Disable exemplars from being exported.

```console
unset OTEL_GO_X_EXEMPLAR
```
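
As a further illustration, the following Go sketch exercises the feature end to end. It assumes the default `trace_based` exemplar filter, so the measurement is made inside a sampled span context; the meter, instrument, and ID values are illustrative.

```go
package main

import (
	"context"
	"fmt"
	"os"

	sdkmetric "go.opentelemetry.io/otel/sdk/metric"
	"go.opentelemetry.io/otel/sdk/metric/metricdata"
	"go.opentelemetry.io/otel/trace"
)

func main() {
	// The variable must be set before instruments are created.
	os.Setenv("OTEL_GO_X_EXEMPLAR", "true")

	reader := sdkmetric.NewManualReader()
	provider := sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader))
	meter := provider.Meter("exemplar-sketch")

	counter, err := meter.Int64Counter("requests")
	if err != nil {
		panic(err)
	}

	// With the default trace_based filter, only measurements made within a
	// sampled span context are offered to the exemplar reservoir.
	sc := trace.NewSpanContext(trace.SpanContextConfig{
		TraceID:    trace.TraceID{0x01},
		SpanID:     trace.SpanID{0x01},
		TraceFlags: trace.FlagsSampled,
	})
	ctx := trace.ContextWithSpanContext(context.Background(), sc)

	counter.Add(ctx, 1)

	var rm metricdata.ResourceMetrics
	if err := reader.Collect(context.Background(), &rm); err != nil {
		panic(err)
	}

	sum := rm.ScopeMetrics[0].Metrics[0].Data.(metricdata.Sum[int64])
	// Expected to print 1 while the feature is enabled.
	fmt.Println("exemplars:", len(sum.DataPoints[0].Exemplars))
}
```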

## Compatibility and Stability

Experimental features do not fall within the scope of the OpenTelemetry Go versioning and stability [policy](../../VERSIONING.md).
89 changes: 89 additions & 0 deletions sdk/metric/benchmark_test.go
@@ -16,6 +16,8 @@ package metric // import "go.opentelemetry.io/otel/sdk/metric"

import (
"context"
"fmt"
"runtime"
"strconv"
"testing"

@@ -24,6 +26,7 @@ import (
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/trace"
)

var viewBenchmarks = []struct {
@@ -369,3 +372,89 @@ func benchCollectAttrs(setup func(attribute.Set) Reader) func(*testing.B) {
b.Run("Attributes/10", run(setup(attribute.NewSet(attrs...))))
}
}

func BenchmarkExemplars(b *testing.B) {
sc := trace.NewSpanContext(trace.SpanContextConfig{
SpanID: trace.SpanID{0o1},
TraceID: trace.TraceID{0o1},
TraceFlags: trace.FlagsSampled,
})
ctx := trace.ContextWithSpanContext(context.Background(), sc)

attr := attribute.NewSet(
attribute.String("user", "Alice"),
attribute.Bool("admin", true),
)

setup := func(name string) (metric.Meter, Reader) {
r := NewManualReader()
v := NewView(Instrument{Name: "*"}, Stream{
AttributeFilter: func(kv attribute.KeyValue) bool {
return kv.Key == attribute.Key("user")
},
})
mp := NewMeterProvider(WithReader(r), WithView(v))
return mp.Meter(name), r
}
nCPU := runtime.NumCPU()

b.Setenv("OTEL_GO_X_EXEMPLAR", "true")

name := fmt.Sprintf("Int64Counter/%d", nCPU)
b.Run(name, func(b *testing.B) {
m, r := setup("Int64Counter")
i, err := m.Int64Counter("int64-counter")
assert.NoError(b, err)

rm := newRM(metricdata.Sum[int64]{
DataPoints: []metricdata.DataPoint[int64]{
{Exemplars: make([]metricdata.Exemplar[int64], 0, nCPU)},
},
})
e := &(rm.ScopeMetrics[0].Metrics[0].Data.(metricdata.Sum[int64]).DataPoints[0].Exemplars)

b.ReportAllocs()
b.ResetTimer()
for n := 0; n < b.N; n++ {
for j := 0; j < 2*nCPU; j++ {
i.Add(ctx, 1, metric.WithAttributeSet(attr))
}

_ = r.Collect(ctx, rm)
assert.Len(b, *e, nCPU)
}
})

name = fmt.Sprintf("Int64Histogram/%d", nCPU)
b.Run(name, func(b *testing.B) {
m, r := setup("Int64Histogram")
i, err := m.Int64Histogram("int64-histogram")
assert.NoError(b, err)

rm := newRM(metricdata.Histogram[int64]{
DataPoints: []metricdata.HistogramDataPoint[int64]{
{Exemplars: make([]metricdata.Exemplar[int64], 0, 1)},
},
})
e := &(rm.ScopeMetrics[0].Metrics[0].Data.(metricdata.Histogram[int64]).DataPoints[0].Exemplars)

b.ReportAllocs()
b.ResetTimer()
for n := 0; n < b.N; n++ {
for j := 0; j < 2*nCPU; j++ {
i.Record(ctx, 1, metric.WithAttributeSet(attr))
}

_ = r.Collect(ctx, rm)
assert.Len(b, *e, 1)
}
})
}

func newRM(a metricdata.Aggregation) *metricdata.ResourceMetrics {
return &metricdata.ResourceMetrics{
ScopeMetrics: []metricdata.ScopeMetrics{
{Metrics: []metricdata.Metrics{{Data: a}}},
},
}
}
96 changes: 96 additions & 0 deletions sdk/metric/exemplar.go
@@ -0,0 +1,96 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package metric // import "go.opentelemetry.io/otel/sdk/metric"

import (
"os"
"runtime"

"go.opentelemetry.io/otel/sdk/metric/internal/exemplar"
"go.opentelemetry.io/otel/sdk/metric/internal/x"
)

// reservoirFunc returns the appropriately configured exemplar reservoir
// creation func based on the passed Aggregation and user-defined
// environment variables.
//
// Note: This will only return non-nil values when the experimental exemplar
// feature is enabled and the OTEL_METRICS_EXEMPLAR_FILTER environment variable
// is not set to always_off.
func reservoirFunc[N int64 | float64](agg Aggregation) func() exemplar.Reservoir[N] {
if !x.Exemplars.Enabled() {
return nil
}

// https://github.com/open-telemetry/opentelemetry-specification/blob/d4b241f451674e8f611bb589477680341006ad2b/specification/metrics/sdk.md#exemplar-defaults
resF := func() func() exemplar.Reservoir[N] {
// Explicit bucket histogram aggregation with more than 1 bucket will
// use AlignedHistogramBucketExemplarReservoir.
a, ok := agg.(AggregationExplicitBucketHistogram)
if ok && len(a.Boundaries) > 0 {
cp := make([]float64, len(a.Boundaries))
copy(cp, a.Boundaries)
return func() exemplar.Reservoir[N] {
bounds := cp
return exemplar.Histogram[N](bounds)
}
}

var n int
if a, ok := agg.(AggregationBase2ExponentialHistogram); ok {
// Base2 Exponential Histogram Aggregation SHOULD use a
// SimpleFixedSizeExemplarReservoir with a reservoir equal to the
// smaller of the maximum number of buckets configured on the
// aggregation or twenty (e.g. min(20, max_buckets)).
n = int(a.MaxSize)
if n > 20 {
n = 20
}
} else {
// https://github.com/open-telemetry/opentelemetry-specification/blob/e94af89e3d0c01de30127a0f423e912f6cda7bed/specification/metrics/sdk.md#simplefixedsizeexemplarreservoir
// This Exemplar reservoir MAY take a configuration parameter for
// the size of the reservoir. If no size configuration is
// provided, the default size MAY be the number of possible
// concurrent threads (e.g. number of CPUs) to help reduce
// contention. Otherwise, a default size of 1 SHOULD be used.
n = runtime.NumCPU()
if n < 1 {
// Should never be the case, but be defensive.
n = 1
}
}

return func() exemplar.Reservoir[N] {
return exemplar.FixedSize[N](n)
}
}

// https://github.com/open-telemetry/opentelemetry-specification/blob/d4b241f451674e8f611bb589477680341006ad2b/specification/configuration/sdk-environment-variables.md#exemplar
const filterEnvKey = "OTEL_METRICS_EXEMPLAR_FILTER"

switch os.Getenv(filterEnvKey) {
case "always_on":
return resF()
case "always_off":
return exemplar.Drop[N]
case "trace_based":
fallthrough
default:
newR := resF()
return func() exemplar.Reservoir[N] {
return exemplar.SampledFilter(newR())
}
}
}
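
The selection above is internal, but its effect is observable through the public SDK. A minimal sketch is shown below, assuming `OTEL_GO_X_EXEMPLAR=true` and the `always_on` filter (so no sampled span context is required). With the default explicit-bucket histogram aggregation, the reservoir keeps at most one exemplar per bucket; the meter and instrument names are illustrative.

```go
package main

import (
	"context"
	"fmt"
	"os"

	sdkmetric "go.opentelemetry.io/otel/sdk/metric"
	"go.opentelemetry.io/otel/sdk/metric/metricdata"
)

func main() {
	// Both variables are read when the pipeline builds its aggregate
	// functions, i.e. when the instrument below is created.
	os.Setenv("OTEL_GO_X_EXEMPLAR", "true")
	os.Setenv("OTEL_METRICS_EXEMPLAR_FILTER", "always_on")

	reader := sdkmetric.NewManualReader()
	provider := sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader))
	meter := provider.Meter("reservoir-defaults-sketch")

	// Explicit-bucket histograms use the histogram-bucket reservoir: at most
	// one exemplar is retained per bucket.
	hist, err := meter.Float64Histogram("latency")
	if err != nil {
		panic(err)
	}
	for i := 0; i < 1000; i++ {
		hist.Record(context.Background(), float64(i))
	}

	var rm metricdata.ResourceMetrics
	if err := reader.Collect(context.Background(), &rm); err != nil {
		panic(err)
	}

	h := rm.ScopeMetrics[0].Metrics[0].Data.(metricdata.Histogram[float64])
	// Prints the number of populated buckets that captured an exemplar.
	fmt.Println("exemplars:", len(h.DataPoints[0].Exemplars))
}
```
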
37 changes: 28 additions & 9 deletions sdk/metric/internal/aggregate/aggregate.go
@@ -19,6 +19,7 @@
"time"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric/internal/exemplar"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)

@@ -44,6 +45,12 @@
// Filter is the attribute filter the aggregate function will use on the
// input of measurements.
Filter attribute.Filter
// ReservoirFunc is the factory function used by aggregate functions to
// create new exemplar reservoirs for each newly seen attribute set.
//
// If this is not provided, a default factory function that returns an
// exemplar.Drop reservoir will be used.
ReservoirFunc func() exemplar.Reservoir[N]
// AggregationLimit is the cardinality limit of measurement attributes. Any
// measurement for new attributes once the limit has been reached will be
// aggregated into a single aggregate for the "otel.metric.overflow"
@@ -54,15 +61,27 @@
AggregationLimit int
}

func (b Builder[N]) filter(f Measure[N]) Measure[N] {
func (b Builder[N]) resFunc() func() exemplar.Reservoir[N] {
if b.ReservoirFunc != nil {
return b.ReservoirFunc
}

return exemplar.Drop[N]
}

type fltrMeasure[N int64 | float64] func(ctx context.Context, value N, fltrAttr attribute.Set, droppedAttr []attribute.KeyValue)

func (b Builder[N]) filter(f fltrMeasure[N]) Measure[N] {
if b.Filter != nil {
fltr := b.Filter // Copy to make it immutable after assignment.
return func(ctx context.Context, n N, a attribute.Set) {
fAttr, _ := a.Filter(fltr)
f(ctx, n, fAttr)
fAttr, dropped := a.Filter(fltr)
f(ctx, n, fAttr, dropped)
}
}
return f
return func(ctx context.Context, n N, a attribute.Set) {
f(ctx, n, a, nil)
}
}

// LastValue returns a last-value aggregate function input and output.
@@ -71,7 +90,7 @@
func (b Builder[N]) LastValue() (Measure[N], ComputeAggregation) {
// Delta temporality is the only temporality that makes semantic sense for
// a last-value aggregate.
lv := newLastValue[N](b.AggregationLimit)
lv := newLastValue[N](b.AggregationLimit, b.resFunc())

return b.filter(lv.measure), func(dest *metricdata.Aggregation) int {
// Ignore if dest is not a metricdata.Gauge. The chance for memory
@@ -87,7 +106,7 @@
// PrecomputedSum returns a sum aggregate function input and output. The
// arguments passed to the input are expected to be the precomputed sum values.
func (b Builder[N]) PrecomputedSum(monotonic bool) (Measure[N], ComputeAggregation) {
s := newPrecomputedSum[N](monotonic, b.AggregationLimit)
s := newPrecomputedSum[N](monotonic, b.AggregationLimit, b.resFunc())
switch b.Temporality {
case metricdata.DeltaTemporality:
return b.filter(s.measure), s.delta
@@ -98,7 +117,7 @@

// Sum returns a sum aggregate function input and output.
func (b Builder[N]) Sum(monotonic bool) (Measure[N], ComputeAggregation) {
s := newSum[N](monotonic, b.AggregationLimit)
s := newSum[N](monotonic, b.AggregationLimit, b.resFunc())
switch b.Temporality {
case metricdata.DeltaTemporality:
return b.filter(s.measure), s.delta
@@ -110,7 +129,7 @@
// ExplicitBucketHistogram returns a histogram aggregate function input and
// output.
func (b Builder[N]) ExplicitBucketHistogram(boundaries []float64, noMinMax, noSum bool) (Measure[N], ComputeAggregation) {
h := newHistogram[N](boundaries, noMinMax, noSum, b.AggregationLimit)
h := newHistogram[N](boundaries, noMinMax, noSum, b.AggregationLimit, b.resFunc())
switch b.Temporality {
case metricdata.DeltaTemporality:
return b.filter(h.measure), h.delta
@@ -122,7 +141,7 @@
// ExponentialBucketHistogram returns a histogram aggregate function input and
// output.
func (b Builder[N]) ExponentialBucketHistogram(maxSize, maxScale int32, noMinMax, noSum bool) (Measure[N], ComputeAggregation) {
h := newExponentialHistogram[N](maxSize, maxScale, noMinMax, noSum, b.AggregationLimit)
h := newExponentialHistogram[N](maxSize, maxScale, noMinMax, noSum, b.AggregationLimit, b.resFunc())
switch b.Temporality {
case metricdata.DeltaTemporality:
return b.filter(h.measure), h.delta
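
For context on the reworked `filter` above: `attribute.Set.Filter` produces the two halves that are now both forwarded, letting reservoirs record exemplars with the measurement's dropped (filtered-out) attributes. Below is a small standalone sketch of that split, reusing the same kind of filter as the benchmark; the names are illustrative.

```go
package main

import (
	"fmt"

	"go.opentelemetry.io/otel/attribute"
)

func main() {
	set := attribute.NewSet(
		attribute.String("user", "Alice"),
		attribute.Bool("admin", true),
	)

	// Keep only the "user" attribute; everything else is reported as dropped.
	keepUser := attribute.Filter(func(kv attribute.KeyValue) bool {
		return kv.Key == "user"
	})

	filtered, dropped := set.Filter(keepUser)
	fmt.Println(filtered.Encoded(attribute.DefaultEncoder())) // user=Alice
	fmt.Println(len(dropped))                                 // 1 (the admin attribute)
}
```
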
16 changes: 11 additions & 5 deletions sdk/metric/internal/aggregate/aggregate_test.go
@@ -23,6 +23,7 @@ import (
"github.com/stretchr/testify/assert"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric/internal/exemplar"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
)
@@ -59,6 +60,10 @@ var (
}
)

func dropExemplars[N int64 | float64]() exemplar.Reservoir[N] {
return exemplar.Drop[N]()
}

func TestBuilderFilter(t *testing.T) {
t.Run("Int64", testBuilderFilter[int64]())
t.Run("Float64", testBuilderFilter[float64]())
@@ -69,20 +74,21 @@ func testBuilderFilter[N int64 | float64]() func(t *testing.T) {
t.Helper()

value, attr := N(1), alice
run := func(b Builder[N], wantA attribute.Set) func(*testing.T) {
run := func(b Builder[N], wantF attribute.Set, wantD []attribute.KeyValue) func(*testing.T) {
return func(t *testing.T) {
t.Helper()

meas := b.filter(func(_ context.Context, v N, a attribute.Set) {
meas := b.filter(func(_ context.Context, v N, f attribute.Set, d []attribute.KeyValue) {
assert.Equal(t, value, v, "measured incorrect value")
assert.Equal(t, wantA, a, "measured incorrect attributes")
assert.Equal(t, wantF, f, "measured incorrect filtered attributes")
assert.ElementsMatch(t, wantD, d, "measured incorrect dropped attributes")
})
meas(context.Background(), value, attr)
}
}

t.Run("NoFilter", run(Builder[N]{}, attr))
t.Run("Filter", run(Builder[N]{Filter: attrFltr}, fltrAlice))
t.Run("NoFilter", run(Builder[N]{}, attr, nil))
t.Run("Filter", run(Builder[N]{Filter: attrFltr}, fltrAlice, []attribute.KeyValue{adminTrue}))
}
}
