Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Combine precomputed values of filtered attribute sets #3549

Merged
merged 29 commits into from
Jan 20, 2023
Merged
Show file tree
Hide file tree
Changes from 27 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
948932a
Combine spatially aggregated precomputed vals
MrAlias Dec 19, 2022
5896179
Ignore false positive lint error and test method
MrAlias Dec 19, 2022
222a1cb
Add fix to changelog
MrAlias Dec 19, 2022
b463070
Handle edge case of exact set after filter
MrAlias Dec 20, 2022
e219ee2
Fix filter and measure algo for precomp
MrAlias Dec 21, 2022
cd34714
Add tests for precomp sums
MrAlias Dec 21, 2022
954145f
Unify precomputedMap
MrAlias Dec 21, 2022
c483c19
Merge branch 'main' into fix-3439
MrAlias Dec 24, 2022
3f3180f
Merge branch 'main' into fix-3439
MrAlias Jan 3, 2023
da9e4b4
Merge branch 'main' into fix-3439
MrAlias Jan 3, 2023
d5d63f1
Merge branch 'main' into fix-3439
MrAlias Jan 4, 2023
e0177a7
Merge branch 'main' into fix-3439
hanyuancheung Jan 5, 2023
876827b
Adds example from supplimental guide
MadVikingGod Jan 5, 2023
79e0f4a
Fixes for lint
MadVikingGod Jan 5, 2023
52c340f
Merge branch 'main' into fix-3439
MrAlias Jan 10, 2023
813cb0d
Merge branch 'main' into fix-3439
MrAlias Jan 10, 2023
81043bd
Merge branch 'main' into fix-3439
MrAlias Jan 10, 2023
135bd8a
Merge branch 'main' into fix-3439
MrAlias Jan 12, 2023
649337a
Merge branch 'main' into fix-3439
MrAlias Jan 13, 2023
406a4bf
Update sdk/metric/meter_example_test.go
MrAlias Jan 13, 2023
36bf77b
Merge pull request #721 from MadVikingGod/mvg/example-3439
MrAlias Jan 13, 2023
657c069
Fix async example test
MrAlias Jan 13, 2023
f295494
Reduce duplicate code in TestAsynchronousExample
MrAlias Jan 13, 2023
38cd7f4
Clarify naming and documentation
MrAlias Jan 14, 2023
8ec850a
Fix spelling errors
MrAlias Jan 14, 2023
44d1b97
Merge branch 'main' into fix-3439
MadVikingGod Jan 17, 2023
af45187
Add a noop filter to default view
MrAlias Jan 17, 2023
c6b7069
Merge branch 'main' into fix-3439
MrAlias Jan 19, 2023
1c25a20
Merge branch 'main' into fix-3439
MrAlias Jan 20, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- `Int64Histogram` replaces the `syncint64.Histogram`
- Add `NewTracerProvider` to `go.opentelemetry.io/otel/bridge/opentracing` to create `WrapperTracer` instances from a `TracerProvider`. (#3316)

### Fixed

- Asynchronous instruments that use sum aggregators and attribute filters correctly add values from equivalent attribute sets that have been filtered. (#3439, #3549)

### Changed

- Instrument configuration in `go.opentelemetry.io/otel/metric/instrument` is split into specific options and confguration based on the instrument type. (#3507)
Expand Down
25 changes: 22 additions & 3 deletions sdk/metric/internal/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@ import (
)

// now is used to return the current local time while allowing tests to
// override the the default time.Now function.
// override the default time.Now function.
var now = time.Now

// Aggregator forms an aggregation from a collection of recorded measurements.
//
// Aggregators need to be comparable so they can be de-duplicated by the SDK when
// it creates them for multiple views.
// Aggregators need to be comparable so they can be de-duplicated by the SDK
// when it creates them for multiple views.
type Aggregator[N int64 | float64] interface {
// Aggregate records the measurement, scoped by attr, and aggregates it
// into an aggregation.
Expand All @@ -38,3 +38,22 @@ type Aggregator[N int64 | float64] interface {
// measurements made and ends an aggregation cycle.
Aggregation() metricdata.Aggregation
}

// precomputeAggregator is an Aggregator that receives values to aggregate that
// have been pre-computed by the caller.
type precomputeAggregator[N int64 | float64] interface {
// The Aggregate method of the embedded Aggregator is used to record
// pre-computed measurements, scoped by attributes that have not been
// filtered by an attribute filter.
Aggregator[N]

// aggregateFiltered records measurements scoped by attributes that have
// been filtered by an attribute filter.
//
// Pre-computed measurements of filtered attributes need to be recorded
// separate from those that haven't been filtered so they can be added to
// the non-filtered pre-computed measurements in a collection cycle and
// then resets after the cycle (the non-filtered pre-computed measurements
// are not reset).
aggregateFiltered(N, attribute.Set)
}
86 changes: 78 additions & 8 deletions sdk/metric/internal/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,26 @@ import (
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)

// filter is an aggregator that applies attribute filter when Aggregating. filters
// do not have any backing memory, and must be constructed with a backing Aggregator.
// NewFilter returns an Aggregator that wraps an agg with an attribute
// filtering function. Both pre-computed non-pre-computed Aggregators can be
// passed for agg. An appropriate Aggregator will be returned for the detected
// type.
func NewFilter[N int64 | float64](agg Aggregator[N], fn attribute.Filter) Aggregator[N] {
if fn == nil {
return agg
}
if fa, ok := agg.(precomputeAggregator[N]); ok {
return newPrecomputedFilter(fa, fn)
}
return newFilter(agg, fn)
}

// filter wraps an aggregator with an attribute filter. All recorded
// measurements will have their attributes filtered before they are passed to
// the underlying aggregator's Aggregate method.
//
// This should not be used to wrap a pre-computed Aggregator. Use a
// precomputedFilter instead.
type filter[N int64 | float64] struct {
filter attribute.Filter
aggregator Aggregator[N]
Expand All @@ -31,15 +49,16 @@ type filter[N int64 | float64] struct {
seen map[attribute.Set]attribute.Set
}

// NewFilter wraps an Aggregator with an attribute filtering function.
func NewFilter[N int64 | float64](agg Aggregator[N], fn attribute.Filter) Aggregator[N] {
if fn == nil {
return agg
}
// newFilter returns an filter Aggregator that wraps agg with the attribute
// filter fn.
//
// This should not be used to wrap a pre-computed Aggregator. Use a
// precomputedFilter instead.
func newFilter[N int64 | float64](agg Aggregator[N], fn attribute.Filter) *filter[N] {
MrAlias marked this conversation as resolved.
Show resolved Hide resolved
return &filter[N]{
filter: fn,
aggregator: agg,
seen: map[attribute.Set]attribute.Set{},
seen: make(map[attribute.Set]attribute.Set),
}
}

Expand All @@ -62,3 +81,54 @@ func (f *filter[N]) Aggregate(measurement N, attr attribute.Set) {
func (f *filter[N]) Aggregation() metricdata.Aggregation {
return f.aggregator.Aggregation()
}

// precomputedFilter is an aggregator that applies attribute filter when
// Aggregating for pre-computed Aggregations. The pre-computed Aggregations
// need to operate normally when no attribute filtering is done (for sums this
// means setting the value), but when attribute filtering is done it needs to
// be added to any set value.
type precomputedFilter[N int64 | float64] struct {
filter attribute.Filter
aggregator precomputeAggregator[N]

sync.Mutex
seen map[attribute.Set]attribute.Set
}

// newPrecomputedFilter returns a precomputedFilter Aggregator that wraps agg
// with the attribute filter fn.
//
// This should not be used to wrap a non-pre-computed Aggregator. Use a
// precomputedFilter instead.
func newPrecomputedFilter[N int64 | float64](agg precomputeAggregator[N], fn attribute.Filter) *precomputedFilter[N] {
return &precomputedFilter[N]{
filter: fn,
aggregator: agg,
seen: make(map[attribute.Set]attribute.Set),
}
}

// Aggregate records the measurement, scoped by attr, and aggregates it
// into an aggregation.
func (f *precomputedFilter[N]) Aggregate(measurement N, attr attribute.Set) {
// TODO (#3006): drop stale attributes from seen.
f.Lock()
defer f.Unlock()
fAttr, ok := f.seen[attr]
if !ok {
fAttr, _ = attr.Filter(f.filter)
f.seen[attr] = fAttr
}
if fAttr.Equals(&attr) {
// No filtering done.
f.aggregator.Aggregate(measurement, fAttr)
} else {
f.aggregator.aggregateFiltered(measurement, fAttr)
}
}

// Aggregation returns an Aggregation, for all the aggregated
// measurements made and ends an aggregation cycle.
func (f *precomputedFilter[N]) Aggregation() metricdata.Aggregation {
return f.aggregator.Aggregation()
}
89 changes: 89 additions & 0 deletions sdk/metric/internal/filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package internal // import "go.opentelemetry.io/otel/sdk/metric/internal"

import (
"fmt"
"strings"
"sync"
"testing"

Expand Down Expand Up @@ -194,3 +196,90 @@ func TestFilterConcurrent(t *testing.T) {
testFilterConcurrent[float64](t)
})
}

func TestPrecomputedFilter(t *testing.T) {
t.Run("Int64", testPrecomputedFilter[int64]())
t.Run("Float64", testPrecomputedFilter[float64]())
}

func testPrecomputedFilter[N int64 | float64]() func(t *testing.T) {
return func(t *testing.T) {
agg := newTestFilterAgg[N]()
f := NewFilter[N](agg, testAttributeFilter)
require.IsType(t, &precomputedFilter[N]{}, f)

var (
powerLevel = attribute.Int("power-level", 9000)
user = attribute.String("user", "Alice")
admin = attribute.Bool("admin", true)
)
a := attribute.NewSet(powerLevel)
key := a
f.Aggregate(1, a)
assert.Equal(t, N(1), agg.values[key].measured, str(a))
assert.Equal(t, N(0), agg.values[key].filtered, str(a))

a = attribute.NewSet(powerLevel, user)
f.Aggregate(2, a)
assert.Equal(t, N(1), agg.values[key].measured, str(a))
assert.Equal(t, N(2), agg.values[key].filtered, str(a))

a = attribute.NewSet(powerLevel, user, admin)
f.Aggregate(3, a)
assert.Equal(t, N(1), agg.values[key].measured, str(a))
assert.Equal(t, N(5), agg.values[key].filtered, str(a))

a = attribute.NewSet(powerLevel)
f.Aggregate(2, a)
assert.Equal(t, N(2), agg.values[key].measured, str(a))
assert.Equal(t, N(5), agg.values[key].filtered, str(a))

a = attribute.NewSet(user)
f.Aggregate(3, a)
assert.Equal(t, N(2), agg.values[key].measured, str(a))
assert.Equal(t, N(5), agg.values[key].filtered, str(a))
assert.Equal(t, N(3), agg.values[*attribute.EmptySet()].filtered, str(a))

_ = f.Aggregation()
assert.Equal(t, 1, agg.aggregationN, "failed to propagate Aggregation")
}
}

func str(a attribute.Set) string {
iter := a.Iter()
out := make([]string, 0, iter.Len())
for iter.Next() {
kv := iter.Attribute()
out = append(out, fmt.Sprintf("%s:%#v", kv.Key, kv.Value.AsInterface()))
}
return strings.Join(out, ",")
}

type testFilterAgg[N int64 | float64] struct {
values map[attribute.Set]precomputedValue[N]
aggregationN int
}

func newTestFilterAgg[N int64 | float64]() *testFilterAgg[N] {
return &testFilterAgg[N]{
values: make(map[attribute.Set]precomputedValue[N]),
}
}

func (a *testFilterAgg[N]) Aggregate(val N, attr attribute.Set) {
v := a.values[attr]
v.measured = val
a.values[attr] = v
}

// nolint: unused // Used to agg filtered.
func (a *testFilterAgg[N]) aggregateFiltered(val N, attr attribute.Set) {
v := a.values[attr]
v.filtered += val
a.values[attr] = v
}

func (a *testFilterAgg[N]) Aggregation() metricdata.Aggregation {
a.aggregationN++
return nil
}
Loading