Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Combine precomputed values of filtered attribute sets #3549

Merged
merged 29 commits into from
Jan 20, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
948932a
Combine spatially aggregated precomputed vals
MrAlias Dec 19, 2022
5896179
Ignore false positive lint error and test method
MrAlias Dec 19, 2022
222a1cb
Add fix to changelog
MrAlias Dec 19, 2022
b463070
Handle edge case of exact set after filter
MrAlias Dec 20, 2022
e219ee2
Fix filter and measure algo for precomp
MrAlias Dec 21, 2022
cd34714
Add tests for precomp sums
MrAlias Dec 21, 2022
954145f
Unify precomputedMap
MrAlias Dec 21, 2022
c483c19
Merge branch 'main' into fix-3439
MrAlias Dec 24, 2022
3f3180f
Merge branch 'main' into fix-3439
MrAlias Jan 3, 2023
da9e4b4
Merge branch 'main' into fix-3439
MrAlias Jan 3, 2023
d5d63f1
Merge branch 'main' into fix-3439
MrAlias Jan 4, 2023
e0177a7
Merge branch 'main' into fix-3439
hanyuancheung Jan 5, 2023
876827b
Adds example from supplimental guide
MadVikingGod Jan 5, 2023
79e0f4a
Fixes for lint
MadVikingGod Jan 5, 2023
52c340f
Merge branch 'main' into fix-3439
MrAlias Jan 10, 2023
813cb0d
Merge branch 'main' into fix-3439
MrAlias Jan 10, 2023
81043bd
Merge branch 'main' into fix-3439
MrAlias Jan 10, 2023
135bd8a
Merge branch 'main' into fix-3439
MrAlias Jan 12, 2023
649337a
Merge branch 'main' into fix-3439
MrAlias Jan 13, 2023
406a4bf
Update sdk/metric/meter_example_test.go
MrAlias Jan 13, 2023
36bf77b
Merge pull request #721 from MadVikingGod/mvg/example-3439
MrAlias Jan 13, 2023
657c069
Fix async example test
MrAlias Jan 13, 2023
f295494
Reduce duplicate code in TestAsynchronousExample
MrAlias Jan 13, 2023
38cd7f4
Clarify naming and documentation
MrAlias Jan 14, 2023
8ec850a
Fix spelling errors
MrAlias Jan 14, 2023
44d1b97
Merge branch 'main' into fix-3439
MadVikingGod Jan 17, 2023
af45187
Add a noop filter to default view
MrAlias Jan 17, 2023
c6b7069
Merge branch 'main' into fix-3439
MrAlias Jan 19, 2023
1c25a20
Merge branch 'main' into fix-3439
MrAlias Jan 20, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

- The deprecated `go.opentelemetry.io/otel/sdk/metric/view` package is removed. (#3520)

### Fixed

- Asynchronous instruments that use sum aggregators and attribute filters correctly add values from equivalent attribute sets that have been filtered. (#3439, #3549)

### Changed

- Global error handler uses an atomic value instead of a mutex. (#3543)
Expand Down
35 changes: 31 additions & 4 deletions sdk/metric/internal/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,19 +27,38 @@ type filter[N int64 | float64] struct {
filter attribute.Filter
aggregator Aggregator[N]

// Used to aggreagte if an aggregator aggregates values differently for
// spatically reaggregated attributes.
aggregateFiltered func(N, attribute.Set)

sync.Mutex
seen map[attribute.Set]attribute.Set
seen map[attribute.Set]attribute.Set
filtered map[attribute.Set]bool
}

// NewFilter wraps an Aggregator with an attribute filtering function.
func NewFilter[N int64 | float64](agg Aggregator[N], fn attribute.Filter) Aggregator[N] {
if fn == nil {
return agg
}
af, ok := agg.(interface{ aggregateFiltered(N, attribute.Set) })
if ok {
return &filter[N]{
filter: fn,
aggregator: agg,
aggregateFiltered: af.aggregateFiltered,
seen: make(map[attribute.Set]attribute.Set),
// Use distinct filtered and seen to ensure un-filtered attributes
// that match the same previously filtered attributes is treated
// the same (added, not set).
filtered: make(map[attribute.Set]bool),
}
}
return &filter[N]{
filter: fn,
aggregator: agg,
seen: map[attribute.Set]attribute.Set{},
seen: make(map[attribute.Set]attribute.Set),
// Don't allocate filtered as it won't be used
}
}

Expand All @@ -51,10 +70,18 @@ func (f *filter[N]) Aggregate(measurement N, attr attribute.Set) {
defer f.Unlock()
fAttr, ok := f.seen[attr]
if !ok {
fAttr, _ = attr.Filter(f.filter)
var na []attribute.KeyValue
fAttr, na = attr.Filter(f.filter)
f.seen[attr] = fAttr
if f.aggregateFiltered != nil && len(na) != 0 {
f.filtered[fAttr] = true
}
}
if f.aggregateFiltered != nil && f.filtered[fAttr] {
f.aggregateFiltered(measurement, fAttr)
} else {
f.aggregator.Aggregate(measurement, fAttr)
}
f.aggregator.Aggregate(measurement, fAttr)
}

// Aggregation returns an Aggregation, for all the aggregated
Expand Down
12 changes: 12 additions & 0 deletions sdk/metric/internal/sum.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,13 @@ func (s *precomputedDeltaSum[N]) Aggregate(value N, attr attribute.Set) {
s.Unlock()
}

// aggregateFiltered records value with spatially re-aggregated attrs.
func (s *precomputedDeltaSum[N]) aggregateFiltered(value N, attr attribute.Set) { // nolint: unused // used to filter.
s.Lock()
s.recorded[attr] += value
s.Unlock()
}

func (s *precomputedDeltaSum[N]) Aggregation() metricdata.Aggregation {
s.Lock()
defer s.Unlock()
Expand Down Expand Up @@ -260,3 +267,8 @@ type precomputedSum[N int64 | float64] struct {
func (s *precomputedSum[N]) Aggregate(value N, attr attribute.Set) {
s.set(value, attr)
}

// aggregateFiltered records value with spatially re-aggregated attrs.
func (s *precomputedSum[N]) aggregateFiltered(value N, attr attribute.Set) { // nolint: unused // used to filter.
s.valueMap.Aggregate(value, attr)
MrAlias marked this conversation as resolved.
Show resolved Hide resolved
}
24 changes: 24 additions & 0 deletions sdk/metric/internal/sum_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
Expand Down Expand Up @@ -163,6 +164,29 @@ func TestDeltaSumReset(t *testing.T) {
t.Run("Float64", testDeltaSumReset[float64])
}

func TestAggregateFiltered(t *testing.T) {
t.Run("PreComputedDelta", testAggregateFiltered(NewPrecomputedDeltaSum[int64](false)))
t.Run("PreComputedCumulativeSum", testAggregateFiltered(NewPrecomputedCumulativeSum[int64](false)))
}

type af interface{ aggregateFiltered(int64, attribute.Set) }

func testAggregateFiltered[N int64 | float64](a Aggregator[N]) func(*testing.T) {
attrs := attribute.NewSet(attribute.String("key", "val"))
return func(t *testing.T) {
a.Aggregate(1, attrs)

require.Implements(t, (*af)(nil), a)
a.(af).aggregateFiltered(1, attrs)

agg := a.Aggregation()
require.IsType(t, agg, metricdata.Sum[int64]{})
sum := agg.(metricdata.Sum[int64])
require.Len(t, sum.DataPoints, 1)
assert.Equal(t, N(2), sum.DataPoints[0].Value)
}
}

func TestEmptySumNilAggregation(t *testing.T) {
assert.Nil(t, NewCumulativeSum[int64](true).Aggregation())
assert.Nil(t, NewCumulativeSum[int64](false).Aggregation())
Expand Down
89 changes: 51 additions & 38 deletions sdk/metric/meter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -624,6 +624,11 @@ func TestRegisterCallbackDropAggregations(t *testing.T) {
}

func TestAttributeFilter(t *testing.T) {
t.Run("Delta", testAttributeFilter(metricdata.DeltaTemporality))
t.Run("Cumulative", testAttributeFilter(metricdata.CumulativeTemporality))
}

func testAttributeFilter(temporality metricdata.Temporality) func(*testing.T) {
one := 1.0
two := 2.0
testcases := []struct {
Expand All @@ -640,7 +645,8 @@ func TestAttributeFilter(t *testing.T) {
}
_, err = mtr.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) {
ctr.Observe(ctx, 1.0, attribute.String("foo", "bar"), attribute.Int("version", 1))
ctr.Observe(ctx, 2.0, attribute.String("foo", "bar"), attribute.Int("version", 2))
ctr.Observe(ctx, 2.0, attribute.String("foo", "bar"))
ctr.Observe(ctx, 1.0, attribute.String("foo", "bar"), attribute.Int("version", 2))
})
return err
},
Expand All @@ -650,10 +656,10 @@ func TestAttributeFilter(t *testing.T) {
DataPoints: []metricdata.DataPoint[float64]{
{
Attributes: attribute.NewSet(attribute.String("foo", "bar")),
Value: 2.0, // TODO (#3439): This should be 3.0.
Value: 4.0,
},
},
Temporality: metricdata.CumulativeTemporality,
Temporality: temporality,
IsMonotonic: true,
},
},
Expand All @@ -667,7 +673,8 @@ func TestAttributeFilter(t *testing.T) {
}
_, err = mtr.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) {
ctr.Observe(ctx, 1.0, attribute.String("foo", "bar"), attribute.Int("version", 1))
ctr.Observe(ctx, 2.0, attribute.String("foo", "bar"), attribute.Int("version", 2))
ctr.Observe(ctx, 2.0, attribute.String("foo", "bar"))
ctr.Observe(ctx, 1.0, attribute.String("foo", "bar"), attribute.Int("version", 2))
})
return err
},
Expand All @@ -677,10 +684,10 @@ func TestAttributeFilter(t *testing.T) {
DataPoints: []metricdata.DataPoint[float64]{
{
Attributes: attribute.NewSet(attribute.String("foo", "bar")),
Value: 2.0, // TODO (#3439): This should be 3.0.
Value: 4.0,
},
},
Temporality: metricdata.CumulativeTemporality,
Temporality: temporality,
IsMonotonic: false,
},
},
Expand Down Expand Up @@ -719,7 +726,8 @@ func TestAttributeFilter(t *testing.T) {
}
_, err = mtr.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) {
ctr.Observe(ctx, 10, attribute.String("foo", "bar"), attribute.Int("version", 1))
ctr.Observe(ctx, 20, attribute.String("foo", "bar"), attribute.Int("version", 2))
ctr.Observe(ctx, 20, attribute.String("foo", "bar"))
ctr.Observe(ctx, 10, attribute.String("foo", "bar"), attribute.Int("version", 2))
})
return err
},
Expand All @@ -729,10 +737,10 @@ func TestAttributeFilter(t *testing.T) {
DataPoints: []metricdata.DataPoint[int64]{
{
Attributes: attribute.NewSet(attribute.String("foo", "bar")),
Value: 20, // TODO (#3439): This should be 30.
Value: 40,
},
},
Temporality: metricdata.CumulativeTemporality,
Temporality: temporality,
IsMonotonic: true,
},
},
Expand All @@ -746,7 +754,8 @@ func TestAttributeFilter(t *testing.T) {
}
_, err = mtr.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) {
ctr.Observe(ctx, 10, attribute.String("foo", "bar"), attribute.Int("version", 1))
ctr.Observe(ctx, 20, attribute.String("foo", "bar"), attribute.Int("version", 2))
ctr.Observe(ctx, 20, attribute.String("foo", "bar"))
ctr.Observe(ctx, 10, attribute.String("foo", "bar"), attribute.Int("version", 2))
})
return err
},
Expand All @@ -756,10 +765,10 @@ func TestAttributeFilter(t *testing.T) {
DataPoints: []metricdata.DataPoint[int64]{
{
Attributes: attribute.NewSet(attribute.String("foo", "bar")),
Value: 20, // TODO (#3439): This should be 30.
Value: 40,
},
},
Temporality: metricdata.CumulativeTemporality,
Temporality: temporality,
IsMonotonic: false,
},
},
Expand Down Expand Up @@ -810,7 +819,7 @@ func TestAttributeFilter(t *testing.T) {
Value: 3.0,
},
},
Temporality: metricdata.CumulativeTemporality,
Temporality: temporality,
IsMonotonic: true,
},
},
Expand All @@ -836,7 +845,7 @@ func TestAttributeFilter(t *testing.T) {
Value: 3.0,
},
},
Temporality: metricdata.CumulativeTemporality,
Temporality: temporality,
IsMonotonic: false,
},
},
Expand Down Expand Up @@ -867,7 +876,7 @@ func TestAttributeFilter(t *testing.T) {
Sum: 3.0,
},
},
Temporality: metricdata.CumulativeTemporality,
Temporality: temporality,
},
},
},
Expand All @@ -892,7 +901,7 @@ func TestAttributeFilter(t *testing.T) {
Value: 30,
},
},
Temporality: metricdata.CumulativeTemporality,
Temporality: temporality,
IsMonotonic: true,
},
},
Expand All @@ -918,7 +927,7 @@ func TestAttributeFilter(t *testing.T) {
Value: 30,
},
},
Temporality: metricdata.CumulativeTemporality,
Temporality: temporality,
IsMonotonic: false,
},
},
Expand Down Expand Up @@ -949,34 +958,38 @@ func TestAttributeFilter(t *testing.T) {
Sum: 3.0,
},
},
Temporality: metricdata.CumulativeTemporality,
Temporality: temporality,
},
},
},
}

for _, tt := range testcases {
t.Run(tt.name, func(t *testing.T) {
rdr := NewManualReader()
mtr := NewMeterProvider(
WithReader(rdr),
WithView(NewView(
Instrument{Name: "*"},
Stream{AttributeFilter: func(kv attribute.KeyValue) bool {
return kv.Key == attribute.Key("foo")
}},
)),
).Meter("TestAttributeFilter")
require.NoError(t, tt.register(t, mtr))

m, err := rdr.Collect(context.Background())
assert.NoError(t, err)
return func(t *testing.T) {
for _, tt := range testcases {
t.Run(tt.name, func(t *testing.T) {
rdr := NewManualReader(WithTemporalitySelector(func(InstrumentKind) metricdata.Temporality {
return temporality
}))
mtr := NewMeterProvider(
WithReader(rdr),
WithView(NewView(
Instrument{Name: "*"},
Stream{AttributeFilter: func(kv attribute.KeyValue) bool {
return kv.Key == attribute.Key("foo")
}},
)),
).Meter("TestAttributeFilter")
require.NoError(t, tt.register(t, mtr))

m, err := rdr.Collect(context.Background())
assert.NoError(t, err)

require.Len(t, m.ScopeMetrics, 1)
require.Len(t, m.ScopeMetrics[0].Metrics, 1)
require.Len(t, m.ScopeMetrics, 1)
require.Len(t, m.ScopeMetrics[0].Metrics, 1)

metricdatatest.AssertEqual(t, tt.wantMetric, m.ScopeMetrics[0].Metrics[0], metricdatatest.IgnoreTimestamp())
})
metricdatatest.AssertEqual(t, tt.wantMetric, m.ScopeMetrics[0].Metrics[0], metricdatatest.IgnoreTimestamp())
})
}
}
}

Expand Down