From fc08f9bd506bfcf52b1b44e8bd3d6624bb72d2a1 Mon Sep 17 00:00:00 2001 From: Anthony Regeda Date: Wed, 8 Feb 2023 11:47:30 +0100 Subject: [PATCH] No memory leakage in attributes filter The attributes filter collects seen attributes in order to avoid filtration on the same attribute set. However, the `attribute.Set` is not comparable type and new allocations of sets with same attributes will be considered as new sets. Metrics with a high cardinality of attributes consume a lot of memory even if we set a filter to reduce that cardinality. --- sdk/metric/internal/filter.go | 28 ++-------------------------- 1 file changed, 2 insertions(+), 26 deletions(-) diff --git a/sdk/metric/internal/filter.go b/sdk/metric/internal/filter.go index 4d24b62819a..5c60c5e0d0b 100644 --- a/sdk/metric/internal/filter.go +++ b/sdk/metric/internal/filter.go @@ -15,8 +15,6 @@ package internal // import "go.opentelemetry.io/otel/sdk/metric/internal" import ( - "sync" - "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/sdk/metric/metricdata" ) @@ -44,9 +42,6 @@ func NewFilter[N int64 | float64](agg Aggregator[N], fn attribute.Filter) Aggreg type filter[N int64 | float64] struct { filter attribute.Filter aggregator Aggregator[N] - - sync.Mutex - seen map[attribute.Set]attribute.Set } // newFilter returns an filter Aggregator that wraps agg with the attribute @@ -58,21 +53,13 @@ func newFilter[N int64 | float64](agg Aggregator[N], fn attribute.Filter) *filte return &filter[N]{ filter: fn, aggregator: agg, - seen: make(map[attribute.Set]attribute.Set), } } // Aggregate records the measurement, scoped by attr, and aggregates it // into an aggregation. func (f *filter[N]) Aggregate(measurement N, attr attribute.Set) { - // TODO (#3006): drop stale attributes from seen. - f.Lock() - defer f.Unlock() - fAttr, ok := f.seen[attr] - if !ok { - fAttr, _ = attr.Filter(f.filter) - f.seen[attr] = fAttr - } + fAttr, _ := attr.Filter(f.filter) f.aggregator.Aggregate(measurement, fAttr) } @@ -90,9 +77,6 @@ func (f *filter[N]) Aggregation() metricdata.Aggregation { type precomputedFilter[N int64 | float64] struct { filter attribute.Filter aggregator precomputeAggregator[N] - - sync.Mutex - seen map[attribute.Set]attribute.Set } // newPrecomputedFilter returns a precomputedFilter Aggregator that wraps agg @@ -104,21 +88,13 @@ func newPrecomputedFilter[N int64 | float64](agg precomputeAggregator[N], fn att return &precomputedFilter[N]{ filter: fn, aggregator: agg, - seen: make(map[attribute.Set]attribute.Set), } } // Aggregate records the measurement, scoped by attr, and aggregates it // into an aggregation. func (f *precomputedFilter[N]) Aggregate(measurement N, attr attribute.Set) { - // TODO (#3006): drop stale attributes from seen. - f.Lock() - defer f.Unlock() - fAttr, ok := f.seen[attr] - if !ok { - fAttr, _ = attr.Filter(f.filter) - f.seen[attr] = fAttr - } + fAttr, _ := attr.Filter(f.filter) if fAttr.Equals(&attr) { // No filtering done. f.aggregator.Aggregate(measurement, fAttr)