From b8ae2721edae79e352300a54af6c698a6288e154 Mon Sep 17 00:00:00 2001 From: Joshua MacDonald Date: Mon, 15 Nov 2021 12:50:13 -0800 Subject: [PATCH] Remove metric aggregator Subtract interface (#2350) * Remove metric aggregator Subtract interface * Apply suggestions from code review Co-authored-by: Georg Pirklbauer * Apply suggestions from code review Co-authored-by: Tyler Yahn * make generate * update changelog Co-authored-by: Georg Pirklbauer Co-authored-by: Tyler Yahn --- CHANGELOG.md | 4 + sdk/export/metric/aggregation/aggregation.go | 5 +- sdk/export/metric/metric.go | 10 -- sdk/metric/aggregator/sum/sum.go | 17 -- sdk/metric/processor/basic/basic.go | 77 ++++----- sdk/metric/processor/basic/basic_test.go | 155 +++++++++---------- 6 files changed, 109 insertions(+), 159 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e5284adf9cc..90698867fa5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,10 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm ## [Unreleased] +### Removed + +- Remove the metric Processor's ability to convert cumulative to delta aggregation temporality. (#2350) + ## [1.2.0] - 2021-11-12 ### Changed diff --git a/sdk/export/metric/aggregation/aggregation.go b/sdk/export/metric/aggregation/aggregation.go index 51d3bd31c45..6d0f45e87cd 100644 --- a/sdk/export/metric/aggregation/aggregation.go +++ b/sdk/export/metric/aggregation/aggregation.go @@ -125,7 +125,10 @@ var ( ErrNegativeInput = fmt.Errorf("negative value is out of range for this instrument") ErrNaNInput = fmt.Errorf("NaN value is an invalid input") ErrInconsistentType = fmt.Errorf("inconsistent aggregator types") - ErrNoSubtraction = fmt.Errorf("aggregator does not subtract") + + // ErrNoCumulativeToDelta is returned when requesting delta + // export kind for a precomputed sum instrument. + ErrNoCumulativeToDelta = fmt.Errorf("cumulative to delta not implemented") // ErrNoData is returned when (due to a race with collection) // the Aggregator is check-pointed before the first value is set. diff --git a/sdk/export/metric/metric.go b/sdk/export/metric/metric.go index f077f74013f..65f99911fa6 100644 --- a/sdk/export/metric/metric.go +++ b/sdk/export/metric/metric.go @@ -193,16 +193,6 @@ type Aggregator interface { Merge(aggregator Aggregator, descriptor *sdkapi.Descriptor) error } -// Subtractor is an optional interface implemented by some -// Aggregators. An Aggregator must support `Subtract()` in order to -// be configured for a Precomputed-Sum instrument (CounterObserver, -// UpDownCounterObserver) using a DeltaExporter. -type Subtractor interface { - // Subtract subtracts the `operand` from this Aggregator and - // outputs the value in `result`. - Subtract(operand, result Aggregator, descriptor *sdkapi.Descriptor) error -} - // Exporter handles presentation of the checkpoint of aggregate // metrics. This is the final stage of a metrics export pipeline, // where metric data are formatted for a specific system. diff --git a/sdk/metric/aggregator/sum/sum.go b/sdk/metric/aggregator/sum/sum.go index 26390a61015..8c1d11c715d 100644 --- a/sdk/metric/aggregator/sum/sum.go +++ b/sdk/metric/aggregator/sum/sum.go @@ -32,7 +32,6 @@ type Aggregator struct { } var _ export.Aggregator = &Aggregator{} -var _ export.Subtractor = &Aggregator{} var _ aggregation.Sum = &Aggregator{} // New returns a new counter aggregator implemented by atomic @@ -88,19 +87,3 @@ func (c *Aggregator) Merge(oa export.Aggregator, desc *sdkapi.Descriptor) error c.value.AddNumber(desc.NumberKind(), o.value) return nil } - -func (c *Aggregator) Subtract(opAgg, resAgg export.Aggregator, descriptor *sdkapi.Descriptor) error { - op, _ := opAgg.(*Aggregator) - if op == nil { - return aggregator.NewInconsistentAggregatorError(c, opAgg) - } - - res, _ := resAgg.(*Aggregator) - if res == nil { - return aggregator.NewInconsistentAggregatorError(c, resAgg) - } - - res.value = c.value - res.value.AddNumber(descriptor.NumberKind(), number.NewNumberSignChange(descriptor.NumberKind(), op.value)) - return nil -} diff --git a/sdk/metric/processor/basic/basic.go b/sdk/metric/processor/basic/basic.go index 7e2fd26320a..7ea491cf417 100644 --- a/sdk/metric/processor/basic/basic.go +++ b/sdk/metric/processor/basic/basic.go @@ -76,11 +76,6 @@ type ( // values in a single collection round. current export.Aggregator - // delta, if non-nil, refers to an Aggregator owned by - // the processor used to compute deltas between - // precomputed sums. - delta export.Aggregator - // cumulative, if non-nil, refers to an Aggregator owned // by the processor used to store the last cumulative // value. @@ -94,9 +89,6 @@ type ( sync.RWMutex values map[stateKey]*stateValue - // Note: the timestamp logic currently assumes all - // exports are deltas. - processStart time.Time intervalStart time.Time intervalEnd time.Time @@ -124,8 +116,8 @@ var ErrInvalidTemporality = fmt.Errorf("invalid aggregation temporality") // New returns a basic Processor that is also a Checkpointer using the provided // AggregatorSelector to select Aggregators. The TemporalitySelector // is consulted to determine the kind(s) of exporter that will consume -// data, so that this Processor can prepare to compute Delta or -// Cumulative Aggregations as needed. +// data, so that this Processor can prepare to compute Cumulative Aggregations +// as needed. func New(aselector export.AggregatorSelector, tselector aggregation.TemporalitySelector, opts ...Option) *Processor { return NewFactory(aselector, tselector, opts...).NewCheckpointer().(*Processor) } @@ -191,13 +183,17 @@ func (b *Processor) Process(accum export.Accumulation) error { } if stateful { if desc.InstrumentKind().PrecomputedSum() { - // If we know we need to compute deltas, allocate two aggregators. - b.AggregatorFor(desc, &newValue.cumulative, &newValue.delta) - } else { - // In this case we are certain not to need a delta, only allocate - // a cumulative aggregator. - b.AggregatorFor(desc, &newValue.cumulative) + // To convert precomputed sums to + // deltas requires two aggregators to + // be allocated, one for the prior + // value and one for the output delta. + // This functionality was removed from + // the basic processor in PR #2350. + return aggregation.ErrNoCumulativeToDelta } + // In this case allocate one aggregator to + // save the current state. + b.AggregatorFor(desc, &newValue.cumulative) } b.state.values[key] = newValue return nil @@ -310,28 +306,15 @@ func (b *Processor) FinishCollection() error { continue } - // Update Aggregator state to support exporting either a - // delta or a cumulative aggregation. - var err error - if mkind.PrecomputedSum() { - if currentSubtractor, ok := value.current.(export.Subtractor); ok { - // This line is equivalent to: - // value.delta = currentSubtractor - value.cumulative - err = currentSubtractor.Subtract(value.cumulative, value.delta, key.descriptor) - - if err == nil { - err = value.current.SynchronizedMove(value.cumulative, key.descriptor) - } - } else { - err = aggregation.ErrNoSubtraction - } - } else { + // The only kind of aggregators that are not stateless + // are the ones needing delta to cumulative + // conversion. Merge aggregator state in this case. + if !mkind.PrecomputedSum() { // This line is equivalent to: - // value.cumulative = value.cumulative + value.delta - err = value.cumulative.Merge(value.current, key.descriptor) - } - if err != nil { - return err + // value.cumulative = value.cumulative + value.current + if err := value.cumulative.Merge(value.current, key.descriptor); err != nil { + return err + } } } return nil @@ -350,13 +333,8 @@ func (b *state) ForEach(exporter aggregation.TemporalitySelector, f func(export. var agg aggregation.Aggregation var start time.Time - // If the processor does not have Config.Memory and it was not updated - // in the prior round, do not visit this value. - if !b.config.Memory && value.updated != (b.finishedCollection-1) { - continue - } - aggTemp := exporter.TemporalityFor(key.descriptor, value.current.Aggregation().Kind()) + switch aggTemp { case aggregation.CumulativeTemporality: // If stateful, the sum has been computed. If stateless, the @@ -372,16 +350,23 @@ func (b *state) ForEach(exporter aggregation.TemporalitySelector, f func(export. case aggregation.DeltaTemporality: // Precomputed sums are a special case. if mkind.PrecomputedSum() { - agg = value.delta.Aggregation() - } else { - agg = value.current.Aggregation() + // This functionality was removed from + // the basic processor in PR #2350. + return aggregation.ErrNoCumulativeToDelta } + agg = value.current.Aggregation() start = b.intervalStart default: return fmt.Errorf("%v: %w", aggTemp, ErrInvalidTemporality) } + // If the processor does not have Config.Memory and it was not updated + // in the prior round, do not visit this value. + if !b.config.Memory && value.updated != (b.finishedCollection-1) { + continue + } + if err := f(export.NewRecord( key.descriptor, value.labels, diff --git a/sdk/metric/processor/basic/basic_test.go b/sdk/metric/processor/basic/basic_test.go index c972a963fb6..ced1b321759 100644 --- a/sdk/metric/processor/basic/basic_test.go +++ b/sdk/metric/processor/basic/basic_test.go @@ -125,6 +125,17 @@ func testProcessor( nkind number.Kind, akind aggregation.Kind, ) { + // This code tests for errors when the export kind is Delta + // and the instrument kind is PrecomputedSum(). + expectConversion := !(aggTemp == aggregation.DeltaTemporality && mkind.PrecomputedSum()) + requireConversion := func(t *testing.T, err error) { + if expectConversion { + require.NoError(t, err) + } else { + require.Equal(t, aggregation.ErrNoCumulativeToDelta, err) + } + } + // Note: this selector uses the instrument name to dictate // aggregation kind. selector := processorTest.AggregatorSelector() @@ -154,22 +165,13 @@ func testProcessor( processor.StartCollection() for na := 0; na < nAccum; na++ { - _ = processor.Process(updateFor(t, &desc1, selector, input, labs1...)) - _ = processor.Process(updateFor(t, &desc2, selector, input, labs2...)) + requireConversion(t, processor.Process(updateFor(t, &desc1, selector, input, labs1...))) + requireConversion(t, processor.Process(updateFor(t, &desc2, selector, input, labs2...))) } - err := processor.FinishCollection() - if err == aggregation.ErrNoSubtraction { - var subr export.Aggregator - selector.AggregatorFor(&desc1, &subr) - _, canSub := subr.(export.Subtractor) - - // Allow unsupported subraction case only when it is called for. - require.True(t, mkind.PrecomputedSum() && aggTemp == aggregation.DeltaTemporality && !canSub) - return - } else if err != nil { - t.Fatal("unexpected FinishCollection error: ", err) - } + // Note: in case of !expectConversion, we still get no error here + // because the Process() skipped entering state for those records. + require.NoError(t, processor.FinishCollection()) if nc < nCheckpoint-1 { continue @@ -182,19 +184,18 @@ func testProcessor( // We're repeating the test after another // interval with no updates. processor.StartCollection() - if err := processor.FinishCollection(); err != nil { - t.Fatal("unexpected collection error: ", err) - } + require.NoError(t, processor.FinishCollection()) } // Test the final checkpoint state. records1 := processorTest.NewOutput(attribute.DefaultEncoder()) - err = reader.ForEach(aggregation.ConstantTemporalitySelector(aggTemp), records1.AddRecord) + require.NoError(t, reader.ForEach(aggregation.ConstantTemporalitySelector(aggTemp), records1.AddRecord)) - // Test for an allowed error: - if err != nil && err != aggregation.ErrNoSubtraction { - t.Fatal("unexpected checkpoint error: ", err) + if !expectConversion { + require.EqualValues(t, map[string]float64{}, records1.Map()) + continue } + var multiplier int64 if mkind.Asynchronous() { @@ -202,9 +203,8 @@ func testProcessor( // number of Accumulators, unless LastValue aggregation. // If a precomputed sum, we expect cumulative inputs. if mkind.PrecomputedSum() { - if aggTemp == aggregation.DeltaTemporality && akind != aggregation.LastValueKind { - multiplier = int64(nAccum) - } else if akind == aggregation.LastValueKind { + require.NotEqual(t, aggTemp, aggregation.DeltaTemporality) + if akind == aggregation.LastValueKind { multiplier = cumulativeMultiplier } else { multiplier = cumulativeMultiplier * int64(nAccum) @@ -401,72 +401,44 @@ func TestStatefulNoMemoryCumulative(t *testing.T) { } } -func TestStatefulNoMemoryDelta(t *testing.T) { - aggTempSel := aggregation.DeltaTemporalitySelector() - - desc := metrictest.NewDescriptor("inst.sum", sdkapi.CounterObserverInstrumentKind, number.Int64Kind) - selector := processorTest.AggregatorSelector() - - processor := basic.New(selector, aggTempSel, basic.WithMemory(false)) - reader := processor.Reader() - - for i := 1; i < 3; i++ { - // Empty interval - processor.StartCollection() - require.NoError(t, processor.FinishCollection()) - - // Verify zero elements - records := processorTest.NewOutput(attribute.DefaultEncoder()) - require.NoError(t, reader.ForEach(aggTempSel, records.AddRecord)) - require.EqualValues(t, map[string]float64{}, records.Map()) - - // Add 10 - processor.StartCollection() - _ = processor.Process(updateFor(t, &desc, selector, int64(i*10), attribute.String("A", "B"))) - require.NoError(t, processor.FinishCollection()) - - // Verify one element - records = processorTest.NewOutput(attribute.DefaultEncoder()) - require.NoError(t, reader.ForEach(aggTempSel, records.AddRecord)) - require.EqualValues(t, map[string]float64{ - "inst.sum/A=B/": 10, - }, records.Map()) - } -} - func TestMultiObserverSum(t *testing.T) { - for _, aggTempSel := range []aggregation.TemporalitySelector{ - aggregation.CumulativeTemporalitySelector(), - aggregation.DeltaTemporalitySelector(), + for _, test := range []struct { + name string + aggregation.TemporalitySelector + expectProcessErr error + }{ + {"cumulative", aggregation.CumulativeTemporalitySelector(), nil}, + {"delta", aggregation.DeltaTemporalitySelector(), aggregation.ErrNoCumulativeToDelta}, } { + t.Run(test.name, func(t *testing.T) { + aggTempSel := test.TemporalitySelector + desc := metrictest.NewDescriptor("observe.sum", sdkapi.CounterObserverInstrumentKind, number.Int64Kind) + selector := processorTest.AggregatorSelector() - desc := metrictest.NewDescriptor("observe.sum", sdkapi.CounterObserverInstrumentKind, number.Int64Kind) - selector := processorTest.AggregatorSelector() - - processor := basic.New(selector, aggTempSel, basic.WithMemory(false)) - reader := processor.Reader() - - for i := 1; i < 3; i++ { - // Add i*10*3 times - processor.StartCollection() - _ = processor.Process(updateFor(t, &desc, selector, int64(i*10), attribute.String("A", "B"))) - _ = processor.Process(updateFor(t, &desc, selector, int64(i*10), attribute.String("A", "B"))) - _ = processor.Process(updateFor(t, &desc, selector, int64(i*10), attribute.String("A", "B"))) - require.NoError(t, processor.FinishCollection()) + processor := basic.New(selector, aggTempSel, basic.WithMemory(false)) + reader := processor.Reader() - // Multiplier is 1 for deltas, otherwise i. - multiplier := i - if aggTempSel.TemporalityFor(&desc, aggregation.SumKind) == aggregation.DeltaTemporality { - multiplier = 1 + for i := 1; i < 3; i++ { + // Add i*10*3 times + processor.StartCollection() + require.True(t, errors.Is(processor.Process(updateFor(t, &desc, selector, int64(i*10), attribute.String("A", "B"))), test.expectProcessErr)) + require.True(t, errors.Is(processor.Process(updateFor(t, &desc, selector, int64(i*10), attribute.String("A", "B"))), test.expectProcessErr)) + require.True(t, errors.Is(processor.Process(updateFor(t, &desc, selector, int64(i*10), attribute.String("A", "B"))), test.expectProcessErr)) + require.NoError(t, processor.FinishCollection()) + + // Verify one element + records := processorTest.NewOutput(attribute.DefaultEncoder()) + if test.expectProcessErr == nil { + require.NoError(t, reader.ForEach(aggTempSel, records.AddRecord)) + require.EqualValues(t, map[string]float64{ + "observe.sum/A=B/": float64(3 * 10 * i), + }, records.Map()) + } else { + require.NoError(t, reader.ForEach(aggTempSel, records.AddRecord)) + require.EqualValues(t, map[string]float64{}, records.Map()) + } } - - // Verify one element - records := processorTest.NewOutput(attribute.DefaultEncoder()) - require.NoError(t, reader.ForEach(aggTempSel, records.AddRecord)) - require.EqualValues(t, map[string]float64{ - "observe.sum/A=B/": float64(3 * 10 * multiplier), - }, records.Map()) - } + }) } } @@ -515,6 +487,19 @@ func TestCounterObserverEndToEnd(t *testing.T) { return nil })) + // Try again, but ask for a Delta + require.Equal( + t, + aggregation.ErrNoCumulativeToDelta, + data.ForEach( + aggregation.ConstantTemporalitySelector(aggregation.DeltaTemporality), + func(r export.Record) error { + t.Fail() + return nil + }, + ), + ) + startTime[i] = record.StartTime() endTime[i] = record.EndTime() data.Unlock()