Skip to content

Commit

Permalink
Changes instruments uniqueness in pipeline. (#3071)
Browse files Browse the repository at this point in the history
* Changes instruments uniquness in pipeline.

* Fix lint

* Update sdk/metric/pipeline.go

Co-authored-by: Tyler Yahn <MrAlias@users.noreply.github.com>

Co-authored-by: Tyler Yahn <MrAlias@users.noreply.github.com>
  • Loading branch information
MadVikingGod and MrAlias committed Aug 12, 2022
1 parent 1c08d20 commit a33d917
Show file tree
Hide file tree
Showing 2 changed files with 112 additions and 26 deletions.
36 changes: 22 additions & 14 deletions sdk/metric/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,13 @@ type aggregator interface {
Aggregation() metricdata.Aggregation
}
type instrumentKey struct {
name string
name string
unit unit.Unit
}

type instrumentValue struct {
description string
unit unit.Unit
aggregator aggregator
}

func newPipeline(res *resource.Resource) *pipeline {
Expand All @@ -48,7 +52,7 @@ func newPipeline(res *resource.Resource) *pipeline {
}
return &pipeline{
resource: res,
aggregations: make(map[instrumentation.Scope]map[instrumentKey]aggregator),
aggregations: make(map[instrumentation.Scope]map[instrumentKey]instrumentValue),
}
}

Expand All @@ -61,31 +65,35 @@ type pipeline struct {
resource *resource.Resource

sync.Mutex
aggregations map[instrumentation.Scope]map[instrumentKey]aggregator
aggregations map[instrumentation.Scope]map[instrumentKey]instrumentValue
callbacks []func(context.Context)
}

var errAlreadyRegistered = errors.New("instrument already registered")

// addAggregator will stores an aggregator with an instrument description. The aggregator
// is used when `produce()` is called.
func (p *pipeline) addAggregator(scope instrumentation.Scope, name, description string, instUnit unit.Unit, agg aggregator) error {
p.Lock()
defer p.Unlock()
if p.aggregations == nil {
p.aggregations = map[instrumentation.Scope]map[instrumentKey]aggregator{}
p.aggregations = map[instrumentation.Scope]map[instrumentKey]instrumentValue{}
}
if p.aggregations[scope] == nil {
p.aggregations[scope] = map[instrumentKey]aggregator{}
p.aggregations[scope] = map[instrumentKey]instrumentValue{}
}
inst := instrumentKey{
name: name,
description: description,
unit: instUnit,
name: name,
unit: instUnit,
}
if _, ok := p.aggregations[scope][inst]; ok {
return fmt.Errorf("instrument already registered: name %s, scope: %s", name, scope)
return fmt.Errorf("%w: name %s, scope: %s", errAlreadyRegistered, name, scope)
}

p.aggregations[scope][inst] = agg
p.aggregations[scope][inst] = instrumentValue{
description: description,
aggregator: agg,
}
return nil
}

Expand Down Expand Up @@ -115,12 +123,12 @@ func (p *pipeline) produce(ctx context.Context) (metricdata.ResourceMetrics, err
sm := make([]metricdata.ScopeMetrics, 0, len(p.aggregations))
for scope, instruments := range p.aggregations {
metrics := make([]metricdata.Metrics, 0, len(instruments))
for inst, agg := range instruments {
data := agg.Aggregation()
for inst, instValue := range instruments {
data := instValue.aggregator.Aggregation()
if data != nil {
metrics = append(metrics, metricdata.Metrics{
Name: inst.name,
Description: inst.description,
Description: instValue.description,
Unit: inst.unit,
Data: data,
})
Expand Down
102 changes: 90 additions & 12 deletions sdk/metric/pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,18 +86,96 @@ func TestNewPipeline(t *testing.T) {
}

func TestPipelineDuplicateRegistration(t *testing.T) {
pipe := newPipeline(nil)

err := pipe.addAggregator(instrumentation.Scope{}, "name", "desc", unit.Dimensionless, testSumAggregator{})
require.NoError(t, err)

err = pipe.addAggregator(instrumentation.Scope{}, "name", "desc", unit.Dimensionless, testSumAggregator{})
assert.Error(t, err)

output, err := pipe.produce(context.Background())
assert.NoError(t, err)
require.Len(t, output.ScopeMetrics, 1)
require.Len(t, output.ScopeMetrics[0].Metrics, 1)
type instrumentID struct {
scope instrumentation.Scope
name string
description string
unit unit.Unit
}
testCases := []struct {
name string
secondInst instrumentID
want error
wantScopeLen int
wantMetricsLen int
}{
{
name: "exact should error",
secondInst: instrumentID{
scope: instrumentation.Scope{},
name: "name",
description: "desc",
unit: unit.Dimensionless,
},
want: errAlreadyRegistered,
wantScopeLen: 1,
wantMetricsLen: 1,
},
{
name: "description should not be identifying",
secondInst: instrumentID{
scope: instrumentation.Scope{},
name: "name",
description: "other desc",
unit: unit.Dimensionless,
},
want: errAlreadyRegistered,
wantScopeLen: 1,
wantMetricsLen: 1,
},
{
name: "scope should be identifying",
secondInst: instrumentID{
scope: instrumentation.Scope{
Name: "newScope",
},
name: "name",
description: "desc",
unit: unit.Dimensionless,
},
wantScopeLen: 2,
wantMetricsLen: 1,
},
{
name: "name should be identifying",
secondInst: instrumentID{
scope: instrumentation.Scope{},
name: "newName",
description: "desc",
unit: unit.Dimensionless,
},
wantScopeLen: 1,
wantMetricsLen: 2,
},
{
name: "unit should be identifying",
secondInst: instrumentID{
scope: instrumentation.Scope{},
name: "name",
description: "desc",
unit: unit.Bytes,
},
wantScopeLen: 1,
wantMetricsLen: 2,
},
}
for _, tt := range testCases {
t.Run(tt.name, func(t *testing.T) {
pipe := newPipeline(nil)
err := pipe.addAggregator(instrumentation.Scope{}, "name", "desc", unit.Dimensionless, testSumAggregator{})
require.NoError(t, err)

err = pipe.addAggregator(tt.secondInst.scope, tt.secondInst.name, tt.secondInst.description, tt.secondInst.unit, testSumAggregator{})
assert.ErrorIs(t, err, tt.want)

if tt.wantScopeLen > 0 {
output, err := pipe.produce(context.Background())
assert.NoError(t, err)
require.Len(t, output.ScopeMetrics, tt.wantScopeLen)
require.Len(t, output.ScopeMetrics[0].Metrics, tt.wantMetricsLen)
}
})
}
}

func TestPipelineUsesResource(t *testing.T) {
Expand Down

0 comments on commit a33d917

Please sign in to comment.