Skip to content

Commit

Permalink
Replace internal aggregate Aggregator with Measure/ComputeAggregation…
Browse files Browse the repository at this point in the history
… and a Builder (#4304)
  • Loading branch information
MrAlias committed Jul 17, 2023
1 parent fdbcb9a commit d18f201
Show file tree
Hide file tree
Showing 18 changed files with 577 additions and 363 deletions.
32 changes: 16 additions & 16 deletions sdk/metric/instrument.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ type streamID struct {
}

type int64Inst struct {
aggregators []aggregate.Aggregator[int64]
measures []aggregate.Measure[int64]

embedded.Int64Counter
embedded.Int64UpDownCounter
Expand All @@ -219,13 +219,13 @@ func (i *int64Inst) aggregate(ctx context.Context, val int64, s attribute.Set) {
if err := ctx.Err(); err != nil {
return
}
for _, agg := range i.aggregators {
agg.Aggregate(val, s)
for _, in := range i.measures {
in(ctx, val, s)
}
}

type float64Inst struct {
aggregators []aggregate.Aggregator[float64]
measures []aggregate.Measure[float64]

embedded.Float64Counter
embedded.Float64UpDownCounter
Expand All @@ -250,8 +250,8 @@ func (i *float64Inst) aggregate(ctx context.Context, val float64, s attribute.Se
if err := ctx.Err(); err != nil {
return
}
for _, agg := range i.aggregators {
agg.Aggregate(val, s)
for _, in := range i.measures {
in(ctx, val, s)
}
}

Expand All @@ -277,9 +277,9 @@ var _ metric.Float64ObservableCounter = float64Observable{}
var _ metric.Float64ObservableUpDownCounter = float64Observable{}
var _ metric.Float64ObservableGauge = float64Observable{}

func newFloat64Observable(scope instrumentation.Scope, kind InstrumentKind, name, desc, u string, agg []aggregate.Aggregator[float64]) float64Observable {
func newFloat64Observable(scope instrumentation.Scope, kind InstrumentKind, name, desc, u string, meas []aggregate.Measure[float64]) float64Observable {
return float64Observable{
observable: newObservable(scope, kind, name, desc, u, agg),
observable: newObservable(scope, kind, name, desc, u, meas),
}
}

Expand All @@ -296,20 +296,20 @@ var _ metric.Int64ObservableCounter = int64Observable{}
var _ metric.Int64ObservableUpDownCounter = int64Observable{}
var _ metric.Int64ObservableGauge = int64Observable{}

func newInt64Observable(scope instrumentation.Scope, kind InstrumentKind, name, desc, u string, agg []aggregate.Aggregator[int64]) int64Observable {
func newInt64Observable(scope instrumentation.Scope, kind InstrumentKind, name, desc, u string, meas []aggregate.Measure[int64]) int64Observable {
return int64Observable{
observable: newObservable(scope, kind, name, desc, u, agg),
observable: newObservable(scope, kind, name, desc, u, meas),
}
}

type observable[N int64 | float64] struct {
metric.Observable
observablID[N]

aggregators []aggregate.Aggregator[N]
measures []aggregate.Measure[N]
}

func newObservable[N int64 | float64](scope instrumentation.Scope, kind InstrumentKind, name, desc, u string, agg []aggregate.Aggregator[N]) *observable[N] {
func newObservable[N int64 | float64](scope instrumentation.Scope, kind InstrumentKind, name, desc, u string, meas []aggregate.Measure[N]) *observable[N] {
return &observable[N]{
observablID: observablID[N]{
name: name,
Expand All @@ -318,14 +318,14 @@ func newObservable[N int64 | float64](scope instrumentation.Scope, kind Instrume
unit: u,
scope: scope,
},
aggregators: agg,
measures: meas,
}
}

// observe records the val for the set of attrs.
func (o *observable[N]) observe(val N, s attribute.Set) {
for _, agg := range o.aggregators {
agg.Aggregate(val, s)
for _, in := range o.measures {
in(context.Background(), val, s)
}
}

Expand All @@ -336,7 +336,7 @@ var errEmptyAgg = errors.New("no aggregators for observable instrument")
// no-op because it does not have any aggregators. Also, an error is returned
// if scope defines a Meter other than the one o was created by.
func (o *observable[N]) registerable(scope instrumentation.Scope) error {
if len(o.aggregators) == 0 {
if len(o.measures) == 0 {
return errEmptyAgg
}
if scope != o.scope {
Expand Down
41 changes: 31 additions & 10 deletions sdk/metric/instrument_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)

func BenchmarkInstrument(b *testing.B) {
Expand All @@ -32,11 +33,21 @@ func BenchmarkInstrument(b *testing.B) {
}

b.Run("instrumentImpl/aggregate", func(b *testing.B) {
inst := int64Inst{aggregators: []aggregate.Aggregator[int64]{
aggregate.NewLastValue[int64](),
aggregate.NewCumulativeSum[int64](true),
aggregate.NewDeltaSum[int64](true),
}}
build := aggregate.Builder[int64]{}
var meas []aggregate.Measure[int64]

in, _ := build.LastValue()
meas = append(meas, in)

build.Temporality = metricdata.CumulativeTemporality
in, _ = build.Sum(true)
meas = append(meas, in)

build.Temporality = metricdata.DeltaTemporality
in, _ = build.Sum(true)
meas = append(meas, in)

inst := int64Inst{measures: meas}
ctx := context.Background()

b.ReportAllocs()
Expand All @@ -47,11 +58,21 @@ func BenchmarkInstrument(b *testing.B) {
})

b.Run("observable/observe", func(b *testing.B) {
o := observable[int64]{aggregators: []aggregate.Aggregator[int64]{
aggregate.NewLastValue[int64](),
aggregate.NewCumulativeSum[int64](true),
aggregate.NewDeltaSum[int64](true),
}}
build := aggregate.Builder[int64]{}
var meas []aggregate.Measure[int64]

in, _ := build.LastValue()
meas = append(meas, in)

build.Temporality = metricdata.CumulativeTemporality
in, _ = build.Sum(true)
meas = append(meas, in)

build.Temporality = metricdata.DeltaTemporality
in, _ = build.Sum(true)
meas = append(meas, in)

o := observable[int64]{measures: meas}

b.ReportAllocs()
b.ResetTimer()
Expand Down
127 changes: 127 additions & 0 deletions sdk/metric/internal/aggregate/aggregate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"

import (
"context"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric/aggregation"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)

// Measure receives measurements to be aggregated.
type Measure[N int64 | float64] func(context.Context, N, attribute.Set)

// ComputeAggregation stores the aggregate of measurements into dest and
// returns the number of aggregate data-points output.
type ComputeAggregation func(dest *metricdata.Aggregation) int

// Builder builds an aggregate function.
type Builder[N int64 | float64] struct {
// Temporality is the temporality used for the returned aggregate function.
//
// If this is not provided a default of cumulative will be used (except for
// the last-value aggregate function where delta is the only appropriate
// temporality).
Temporality metricdata.Temporality
// Filter is the attribute filter the aggregate function will use on the
// input of measurements.
Filter attribute.Filter
}

func (b Builder[N]) input(agg aggregator[N]) Measure[N] {
if b.Filter != nil {
agg = newFilter[N](agg, b.Filter)
}
return func(_ context.Context, n N, a attribute.Set) {
agg.Aggregate(n, a)
}
}

// LastValue returns a last-value aggregate function input and output.
//
// The Builder.Temporality is ignored and delta is use always.
func (b Builder[N]) LastValue() (Measure[N], ComputeAggregation) {
// Delta temporality is the only temporality that makes semantic sense for
// a last-value aggregate.
lv := newLastValue[N]()

return b.input(lv), func(dest *metricdata.Aggregation) int {
// TODO (#4220): optimize memory reuse here.
*dest = lv.Aggregation()

gData, _ := (*dest).(metricdata.Gauge[N])
return len(gData.DataPoints)
}
}

// PrecomputedSum returns a sum aggregate function input and output. The
// arguments passed to the input are expected to be the precomputed sum values.
func (b Builder[N]) PrecomputedSum(monotonic bool) (Measure[N], ComputeAggregation) {
var s aggregator[N]
switch b.Temporality {
case metricdata.DeltaTemporality:
s = newPrecomputedDeltaSum[N](monotonic)
default:
s = newPrecomputedCumulativeSum[N](monotonic)
}

return b.input(s), func(dest *metricdata.Aggregation) int {
// TODO (#4220): optimize memory reuse here.
*dest = s.Aggregation()

sData, _ := (*dest).(metricdata.Sum[N])
return len(sData.DataPoints)
}
}

// Sum returns a sum aggregate function input and output.
func (b Builder[N]) Sum(monotonic bool) (Measure[N], ComputeAggregation) {
var s aggregator[N]
switch b.Temporality {
case metricdata.DeltaTemporality:
s = newDeltaSum[N](monotonic)
default:
s = newCumulativeSum[N](monotonic)
}

return b.input(s), func(dest *metricdata.Aggregation) int {
// TODO (#4220): optimize memory reuse here.
*dest = s.Aggregation()

sData, _ := (*dest).(metricdata.Sum[N])
return len(sData.DataPoints)
}
}

// ExplicitBucketHistogram returns a histogram aggregate function input and
// output.
func (b Builder[N]) ExplicitBucketHistogram(cfg aggregation.ExplicitBucketHistogram) (Measure[N], ComputeAggregation) {
var h aggregator[N]
switch b.Temporality {
case metricdata.DeltaTemporality:
h = newDeltaHistogram[N](cfg)
default:
h = newCumulativeHistogram[N](cfg)
}
return b.input(h), func(dest *metricdata.Aggregation) int {
// TODO (#4220): optimize memory reuse here.
*dest = h.Aggregation()

hData, _ := (*dest).(metricdata.Histogram[N])
return len(hData.DataPoints)
}
}
6 changes: 3 additions & 3 deletions sdk/metric/internal/aggregate/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@ import (
// override the default time.Now function.
var now = time.Now

// Aggregator forms an aggregation from a collection of recorded measurements.
// aggregator forms an aggregation from a collection of recorded measurements.
//
// Aggregators need to be comparable so they can be de-duplicated by the SDK
// when it creates them for multiple views.
type Aggregator[N int64 | float64] interface {
type aggregator[N int64 | float64] interface {
// Aggregate records the measurement, scoped by attr, and aggregates it
// into an aggregation.
Aggregate(measurement N, attr attribute.Set)
Expand All @@ -45,7 +45,7 @@ type precomputeAggregator[N int64 | float64] interface {
// The Aggregate method of the embedded Aggregator is used to record
// pre-computed measurements, scoped by attributes that have not been
// filtered by an attribute filter.
Aggregator[N]
aggregator[N]

// aggregateFiltered records measurements scoped by attributes that have
// been filtered by an attribute filter.
Expand Down
6 changes: 3 additions & 3 deletions sdk/metric/internal/aggregate/aggregator_example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func (p *meter) Int64Counter(string, ...metric.Int64CounterOption) (metric.Int64
// temporality to used based on the Reader and View configuration. Assume
// here these are determined to be a cumulative sum.

aggregator := NewCumulativeSum[int64](true)
aggregator := newCumulativeSum[int64](true)
count := inst{aggregateFunc: aggregator.Aggregate}

p.aggregations = append(p.aggregations, aggregator.Aggregation())
Expand All @@ -54,7 +54,7 @@ func (p *meter) Int64UpDownCounter(string, ...metric.Int64UpDownCounterOption) (
// configuration. Assume here these are determined to be a last-value
// aggregation (the temporality does not affect the produced aggregations).

aggregator := NewLastValue[int64]()
aggregator := newLastValue[int64]()
upDownCount := inst{aggregateFunc: aggregator.Aggregate}

p.aggregations = append(p.aggregations, aggregator.Aggregation())
Expand All @@ -71,7 +71,7 @@ func (p *meter) Int64Histogram(string, ...metric.Int64HistogramOption) (metric.I
// Assume here these are determined to be a delta explicit-bucket
// histogram.

aggregator := NewDeltaHistogram[int64](aggregation.ExplicitBucketHistogram{
aggregator := newDeltaHistogram[int64](aggregation.ExplicitBucketHistogram{
Boundaries: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 1000},
NoMinMax: false,
})
Expand Down
10 changes: 5 additions & 5 deletions sdk/metric/internal/aggregate/aggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,12 +84,12 @@ type aggregatorTester[N int64 | float64] struct {
CycleN int
}

func (at *aggregatorTester[N]) Run(a Aggregator[N], incr setMap[N], eFunc expectFunc) func(*testing.T) {
func (at *aggregatorTester[N]) Run(a aggregator[N], incr setMap[N], eFunc expectFunc) func(*testing.T) {
m := at.MeasurementN * at.GoroutineN
return func(t *testing.T) {
t.Run("Comparable", func(t *testing.T) {
assert.NotPanics(t, func() {
_ = map[Aggregator[N]]struct{}{a: {}}
_ = map[aggregator[N]]struct{}{a: {}}
})
})

Expand Down Expand Up @@ -117,7 +117,7 @@ func (at *aggregatorTester[N]) Run(a Aggregator[N], incr setMap[N], eFunc expect

var bmarkResults metricdata.Aggregation

func benchmarkAggregatorN[N int64 | float64](b *testing.B, factory func() Aggregator[N], count int) {
func benchmarkAggregatorN[N int64 | float64](b *testing.B, factory func() aggregator[N], count int) {
attrs := make([]attribute.Set, count)
for i := range attrs {
attrs[i] = attribute.NewSet(attribute.Int("value", i))
Expand All @@ -137,7 +137,7 @@ func benchmarkAggregatorN[N int64 | float64](b *testing.B, factory func() Aggreg
})

b.Run("Aggregations", func(b *testing.B) {
aggs := make([]Aggregator[N], b.N)
aggs := make([]aggregator[N], b.N)
for n := range aggs {
a := factory()
for _, attr := range attrs {
Expand All @@ -155,7 +155,7 @@ func benchmarkAggregatorN[N int64 | float64](b *testing.B, factory func() Aggreg
})
}

func benchmarkAggregator[N int64 | float64](factory func() Aggregator[N]) func(*testing.B) {
func benchmarkAggregator[N int64 | float64](factory func() aggregator[N]) func(*testing.B) {
counts := []int{1, 10, 100}
return func(b *testing.B) {
for _, n := range counts {
Expand Down
Loading

0 comments on commit d18f201

Please sign in to comment.