Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Changes instruments uniqueness in pipeline. #3071

Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
36 changes: 22 additions & 14 deletions sdk/metric/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package metric // import "go.opentelemetry.io/otel/sdk/metric"

import (
"context"
"errors"
"fmt"
"sync"

Expand All @@ -32,9 +33,12 @@ type aggregator interface {
Aggregation() metricdata.Aggregation
}
type instrumentKey struct {
name string
name string
unit unit.Unit
}
MadVikingGod marked this conversation as resolved.
Show resolved Hide resolved
type instrumentValue struct {
description string
unit unit.Unit
aggregator aggregator
}

func newPipeline(res *resource.Resource) *pipeline {
Expand All @@ -43,7 +47,7 @@ func newPipeline(res *resource.Resource) *pipeline {
}
return &pipeline{
resource: res,
aggregations: make(map[instrumentation.Scope]map[instrumentKey]aggregator),
aggregations: make(map[instrumentation.Scope]map[instrumentKey]instrumentValue),
}
}

Expand All @@ -56,31 +60,35 @@ type pipeline struct {
resource *resource.Resource

sync.Mutex
aggregations map[instrumentation.Scope]map[instrumentKey]aggregator
aggregations map[instrumentation.Scope]map[instrumentKey]instrumentValue
callbacks []func(context.Context)
}

var errAlreadyRegistered = errors.New("instrument already registered")

// addAggregator will stores an aggregator with an instrument description. The aggregator
// is used when `produce()` is called.
func (p *pipeline) addAggregator(scope instrumentation.Scope, name, description string, instUnit unit.Unit, agg aggregator) error {
p.Lock()
defer p.Unlock()
if p.aggregations == nil {
p.aggregations = map[instrumentation.Scope]map[instrumentKey]aggregator{}
p.aggregations = map[instrumentation.Scope]map[instrumentKey]instrumentValue{}
}
if p.aggregations[scope] == nil {
p.aggregations[scope] = map[instrumentKey]aggregator{}
p.aggregations[scope] = map[instrumentKey]instrumentValue{}
}
inst := instrumentKey{
name: name,
description: description,
unit: instUnit,
name: name,
unit: instUnit,
}
if _, ok := p.aggregations[scope][inst]; ok {
return fmt.Errorf("instrument already registered: name %s, scope: %s", name, scope)
return fmt.Errorf("%w: name %s, scope: %s", errAlreadyRegistered, name, scope)
}

p.aggregations[scope][inst] = agg
p.aggregations[scope][inst] = instrumentValue{
description: description,
aggregator: agg,
}
return nil
}

Expand Down Expand Up @@ -110,12 +118,12 @@ func (p *pipeline) produce(ctx context.Context) (metricdata.ResourceMetrics, err
sm := make([]metricdata.ScopeMetrics, 0, len(p.aggregations))
for scope, instruments := range p.aggregations {
metrics := make([]metricdata.Metrics, 0, len(instruments))
for inst, aggregation := range instruments {
data := aggregation.Aggregation()
for inst, instValue := range instruments {
data := instValue.aggregator.Aggregation()
if data != nil {
metrics = append(metrics, metricdata.Metrics{
Name: inst.name,
Description: inst.description,
Description: instValue.description,
Unit: inst.unit,
Data: data,
})
Expand Down
102 changes: 90 additions & 12 deletions sdk/metric/pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,18 +86,96 @@ func TestNewPipeline(t *testing.T) {
}

func TestPipelineDuplicateRegistration(t *testing.T) {
pipe := newPipeline(nil)

err := pipe.addAggregator(instrumentation.Scope{}, "name", "desc", unit.Dimensionless, testSumAggregator{})
require.NoError(t, err)

err = pipe.addAggregator(instrumentation.Scope{}, "name", "desc", unit.Dimensionless, testSumAggregator{})
assert.Error(t, err)

output, err := pipe.produce(context.Background())
assert.NoError(t, err)
require.Len(t, output.ScopeMetrics, 1)
require.Len(t, output.ScopeMetrics[0].Metrics, 1)
type instrumentID struct {
scope instrumentation.Scope
name string
description string
unit unit.Unit
}
testCases := []struct {
name string
secondInst instrumentID
want error
wantScopeLen int
wantMetricsLen int
}{
{
name: "exact should error",
secondInst: instrumentID{
scope: instrumentation.Scope{},
name: "name",
description: "desc",
unit: unit.Dimensionless,
},
want: errAlreadyRegistered,
wantScopeLen: 1,
wantMetricsLen: 1,
},
{
name: "description should not be identifying",
secondInst: instrumentID{
scope: instrumentation.Scope{},
name: "name",
description: "other desc",
unit: unit.Dimensionless,
},
want: errAlreadyRegistered,
wantScopeLen: 1,
wantMetricsLen: 1,
},
{
name: "scope should be identifying",
secondInst: instrumentID{
scope: instrumentation.Scope{
Name: "newScope",
},
name: "name",
description: "desc",
unit: unit.Dimensionless,
},
wantScopeLen: 2,
wantMetricsLen: 1,
},
{
name: "name should be identifying",
secondInst: instrumentID{
scope: instrumentation.Scope{},
name: "newName",
description: "desc",
unit: unit.Dimensionless,
},
wantScopeLen: 1,
wantMetricsLen: 2,
},
{
name: "unit should be identifying",
secondInst: instrumentID{
scope: instrumentation.Scope{},
name: "name",
description: "desc",
unit: unit.Bytes,
},
wantScopeLen: 1,
wantMetricsLen: 2,
},
}
for _, tt := range testCases {
t.Run(tt.name, func(t *testing.T) {
pipe := newPipeline(nil)
err := pipe.addAggregator(instrumentation.Scope{}, "name", "desc", unit.Dimensionless, testSumAggregator{})
require.NoError(t, err)

err = pipe.addAggregator(tt.secondInst.scope, tt.secondInst.name, tt.secondInst.description, tt.secondInst.unit, testSumAggregator{})
assert.ErrorIs(t, err, tt.want)

if tt.wantScopeLen > 0 {
output, err := pipe.produce(context.Background())
assert.NoError(t, err)
require.Len(t, output.ScopeMetrics, tt.wantScopeLen)
require.Len(t, output.ScopeMetrics[0].Metrics, tt.wantMetricsLen)
}
})
}
}

func TestPipelineUsesResource(t *testing.T) {
Expand Down