diff --git a/CHANGELOG.md b/CHANGELOG.md index 258c09bda9f7..0234ea5e921e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -25,6 +25,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - Count the Collect time in the PeriodicReader timeout. (#4221) - `New` in `go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc` returns `*Exporter` instead of `"go.opentelemetry.io/otel/sdk/metric".Exporter`. (#4272) - `New` in `go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp` returns `*Exporter` instead of `"go.opentelemetry.io/otel/sdk/metric".Exporter`. (#4272) +- If an attribute set is omitted from an async callback, the previous value will no longer be exported. (#4290) ### Fixed diff --git a/sdk/metric/internal/aggregate/sum.go b/sdk/metric/internal/aggregate/sum.go index 50a59697e163..1d1a7b0320c3 100644 --- a/sdk/metric/internal/aggregate/sum.go +++ b/sdk/metric/internal/aggregate/sum.go @@ -255,10 +255,12 @@ type precomputedDeltaSum[N int64 | float64] struct { // collection cycle, and the unfiltered-sum is kept for the next collection // cycle. func (s *precomputedDeltaSum[N]) Aggregation() metricdata.Aggregation { + newReported := make(map[attribute.Set]N) s.Lock() defer s.Unlock() if len(s.values) == 0 { + s.reported = newReported return nil } @@ -277,16 +279,12 @@ func (s *precomputedDeltaSum[N]) Aggregation() metricdata.Aggregation { Time: t, Value: delta, }) - if delta != 0 { - s.reported[attr] = v - } - value.filtered = N(0) - s.values[attr] = value - // TODO (#3006): This will use an unbounded amount of memory if there - // are unbounded number of attribute sets being aggregated. Attribute - // sets that become "stale" need to be forgotten so this will not - // overload the system. + newReported[attr] = v + // Unused attribute sets do not report. + delete(s.values, attr) } + // Unused attribute sets are forgotten. + s.reported = newReported // The delta collection cycle resets. s.start = t return out @@ -349,12 +347,8 @@ func (s *precomputedCumulativeSum[N]) Aggregation() metricdata.Aggregation { Time: t, Value: value.measured + value.filtered, }) - value.filtered = N(0) - s.values[attr] = value - // TODO (#3006): This will use an unbounded amount of memory if there - // are unbounded number of attribute sets being aggregated. Attribute - // sets that become "stale" need to be forgotten so this will not - // overload the system. + // Unused attribute sets do not report. + delete(s.values, attr) } return out } diff --git a/sdk/metric/internal/aggregate/sum_test.go b/sdk/metric/internal/aggregate/sum_test.go index 3d206df5412a..ee66a382ce95 100644 --- a/sdk/metric/internal/aggregate/sum_test.go +++ b/sdk/metric/internal/aggregate/sum_test.go @@ -180,10 +180,9 @@ func TestPreComputedDeltaSum(t *testing.T) { opt := metricdatatest.IgnoreTimestamp() metricdatatest.AssertAggregationsEqual(t, want, got, opt) - // Delta values should zero. + // No observation means no metric data got = agg.Aggregation() - want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 0)} - metricdatatest.AssertAggregationsEqual(t, want, got, opt) + metricdatatest.AssertAggregationsEqual(t, nil, got, opt) agg.(precomputeAggregator[int64]).aggregateFiltered(1, attrs) got = agg.Aggregation() @@ -193,13 +192,8 @@ func TestPreComputedDeltaSum(t *testing.T) { // Filtered values should not persist. got = agg.Aggregation() - // measured(+): 1, previous(-): 2, filtered(+): 0 - want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, -1)} - metricdatatest.AssertAggregationsEqual(t, want, got, opt) - got = agg.Aggregation() - // measured(+): 1, previous(-): 1, filtered(+): 0 - want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 0)} - metricdatatest.AssertAggregationsEqual(t, want, got, opt) + // No observation means no metric data + metricdatatest.AssertAggregationsEqual(t, nil, got, opt) // Override set value. agg.Aggregate(2, attrs) @@ -208,8 +202,8 @@ func TestPreComputedDeltaSum(t *testing.T) { agg.(precomputeAggregator[int64]).aggregateFiltered(3, attrs) agg.(precomputeAggregator[int64]).aggregateFiltered(10, attrs) got = agg.Aggregation() - // measured(+): 5, previous(-): 1, filtered(+): 13 - want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 17)} + // measured(+): 5, previous(-): 8, filtered(+): 13 + want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 18)} metricdatatest.AssertAggregationsEqual(t, want, got, opt) // Filtered values should not persist. @@ -251,19 +245,18 @@ func TestPreComputedCumulativeSum(t *testing.T) { opt := metricdatatest.IgnoreTimestamp() metricdatatest.AssertAggregationsEqual(t, want, got, opt) - // Cumulative values should persist. + // Cumulative values should not persist. got = agg.Aggregation() - metricdatatest.AssertAggregationsEqual(t, want, got, opt) + metricdatatest.AssertAggregationsEqual(t, nil, got, opt) agg.(precomputeAggregator[int64]).aggregateFiltered(1, attrs) got = agg.Aggregation() - want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 2)} + want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 1)} metricdatatest.AssertAggregationsEqual(t, want, got, opt) // Filtered values should not persist. got = agg.Aggregation() - want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 1)} - metricdatatest.AssertAggregationsEqual(t, want, got, opt) + metricdatatest.AssertAggregationsEqual(t, nil, got, opt) // Override set value. agg.Aggregate(5, attrs) @@ -276,8 +269,7 @@ func TestPreComputedCumulativeSum(t *testing.T) { // Filtered values should not persist. got = agg.Aggregation() - want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 5)} - metricdatatest.AssertAggregationsEqual(t, want, got, opt) + metricdatatest.AssertAggregationsEqual(t, nil, got, opt) // Order should not affect measure. // Filtered should add. @@ -287,9 +279,6 @@ func TestPreComputedCumulativeSum(t *testing.T) { got = agg.Aggregation() want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 20)} metricdatatest.AssertAggregationsEqual(t, want, got, opt) - got = agg.Aggregation() - want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 7)} - metricdatatest.AssertAggregationsEqual(t, want, got, opt) } func TestEmptySumNilAggregation(t *testing.T) { diff --git a/sdk/metric/meter_test.go b/sdk/metric/meter_test.go index 4fea8fccf90e..0ab1199a6a4c 100644 --- a/sdk/metric/meter_test.go +++ b/sdk/metric/meter_test.go @@ -1692,8 +1692,7 @@ func TestObservableExample(t *testing.T) { Temporality: temporality, IsMonotonic: true, DataPoints: []metricdata.DataPoint[int64]{ - // Thread 1 remains at last measured value. - {Attributes: thread1, Value: 60}, + // Thread 1 is no longer exported. {Attributes: thread2, Value: 53}, {Attributes: thread3, Value: 5}, }, @@ -1767,8 +1766,7 @@ func TestObservableExample(t *testing.T) { Temporality: temporality, IsMonotonic: true, DataPoints: []metricdata.DataPoint[int64]{ - // Thread 1 remains at last measured value. - {Attributes: thread1, Value: 0}, + // Thread 1 is no longer exported. {Attributes: thread2, Value: 6}, {Attributes: thread3, Value: 5}, },