Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix duplicate instrument registration to return same instrument #3238

Closed
wants to merge 14 commits into from
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- Upgrade `golang.org/x/sys/unix` from `v0.0.0-20210423185535-09eb48e85fd7` to `v0.0.0-20220919091848-fb04ddd9f9c8`.
This addresses [GO-2022-0493](https://pkg.go.dev/vuln/GO-2022-0493). (#3235)

### Fixed

- Return the same instrument for equivalent creation calls. (#3229, #3238)

## [0.32.1] Metric SDK (Alpha) - 2022-09-22

### Changed
Expand Down
52 changes: 52 additions & 0 deletions sdk/metric/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package metric // import "go.opentelemetry.io/otel/sdk/metric"

import "sync"

// cache is a locking storage used to quickly return already computed values.
//
// The zero value of a cache is empty and ready to use.
//
// A cache must not be copied after first use.
//
// All methods of a cache are safe to call concurrently.
type cache[K comparable, V any] struct {
sync.Mutex
data map[K]V
}

// Lookup returns the value stored in the cache with the accociated key if it
// exists. Otherwise, f is called and its returned value is set in the cache
// for key and returned.
//
// Lookup is safe to call concurrently. It will hold the cache lock, so f
// should not block excessively.
func (c *cache[K, V]) Lookup(key K, f func() V) V {
c.Lock()
defer c.Unlock()

if c.data == nil {
val := f()
c.data = map[K]V{key: val}
return val
}
if v, ok := c.data[key]; ok {
return v
}
val := f()
c.data[key] = val
return val
}
39 changes: 39 additions & 0 deletions sdk/metric/cache_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package metric // import "go.opentelemetry.io/otel/sdk/metric"

import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestCache(t *testing.T) {
k0, k1 := "one", "two"
v0, v1 := 1, 2

c := cache[string, int]{}

var got int
require.NotPanics(t, func() {
got = c.Lookup(k0, func() int { return v0 })
}, "zero-value cache panics on Lookup")
assert.Equal(t, v0, got, "zero-value cache did not return fallback")

assert.Equal(t, v0, c.Lookup(k0, func() int { return v1 }), "existing key")

assert.Equal(t, v1, c.Lookup(k1, func() int { return v1 }), "non-existing key")
}
13 changes: 9 additions & 4 deletions sdk/metric/meter.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,19 +84,22 @@ type meter struct {
instrumentation.Scope

pipes pipelines
cache cache[instrumentID, any]
}

// Compile-time check meter implements metric.Meter.
var _ metric.Meter = (*meter)(nil)

// AsyncInt64 returns the asynchronous integer instrument provider.
func (m *meter) AsyncInt64() asyncint64.InstrumentProvider {
return asyncInt64Provider{scope: m.Scope, resolve: newResolver[int64](m.pipes)}
c := newInstrumentCache[int64](&m.cache)
return asyncInt64Provider{scope: m.Scope, resolve: newResolver(m.pipes, c)}
}

// AsyncFloat64 returns the asynchronous floating-point instrument provider.
func (m *meter) AsyncFloat64() asyncfloat64.InstrumentProvider {
return asyncFloat64Provider{scope: m.Scope, resolve: newResolver[float64](m.pipes)}
c := newInstrumentCache[float64](&m.cache)
return asyncFloat64Provider{scope: m.Scope, resolve: newResolver(m.pipes, c)}
}

// RegisterCallback registers the function f to be called when any of the
Expand All @@ -108,10 +111,12 @@ func (m *meter) RegisterCallback(insts []instrument.Asynchronous, f func(context

// SyncInt64 returns the synchronous integer instrument provider.
func (m *meter) SyncInt64() syncint64.InstrumentProvider {
return syncInt64Provider{scope: m.Scope, resolve: newResolver[int64](m.pipes)}
c := newInstrumentCache[int64](&m.cache)
return syncInt64Provider{scope: m.Scope, resolve: newResolver(m.pipes, c)}
}

// SyncFloat64 returns the synchronous floating-point instrument provider.
func (m *meter) SyncFloat64() syncfloat64.InstrumentProvider {
return syncFloat64Provider{scope: m.Scope, resolve: newResolver[float64](m.pipes)}
c := newInstrumentCache[float64](&m.cache)
return syncFloat64Provider{scope: m.Scope, resolve: newResolver(m.pipes, c)}
}
73 changes: 63 additions & 10 deletions sdk/metric/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ var (
errCreatingAggregators = errors.New("could not create all aggregators")
errIncompatibleAggregation = errors.New("incompatible aggregation")
errUnknownAggregation = errors.New("unrecognized aggregation")

errCacheNumberConflict = errors.New("instrument already exists: conflicting number type")
)

type aggregator interface {
Expand Down Expand Up @@ -345,31 +347,82 @@ func (p pipelines) registerCallback(fn func(context.Context)) {
// measurements with while updating all pipelines that need to pull from those
// aggregations.
type resolver[N int64 | float64] struct {
cache instrumentCache[N]
inserters []*inserter[N]
}

func newResolver[N int64 | float64](p pipelines) *resolver[N] {
func newResolver[N int64 | float64](p pipelines, c instrumentCache[N]) *resolver[N] {
in := make([]*inserter[N], len(p))
for i := range in {
in[i] = newInserter[N](p[i])
}
return &resolver[N]{in}
return &resolver[N]{cache: c, inserters: in}
}

// Aggregators returns the Aggregators instrument inst needs to update when it
// makes a measurement.
func (r *resolver[N]) Aggregators(inst view.Instrument, instUnit unit.Unit) ([]internal.Aggregator[N], error) {
var aggs []internal.Aggregator[N]
id := instrumentID{
scope: inst.Scope,
name: inst.Name,
description: inst.Description,
}

errs := &multierror{}
for _, i := range r.inserters {
a, err := i.Instrument(inst, instUnit)
if err != nil {
errs.append(err)
return r.cache.Lookup(id, func() ([]internal.Aggregator[N], error) {
var aggs []internal.Aggregator[N]
errs := &multierror{}
for _, i := range r.inserters {
a, err := i.Instrument(inst, instUnit)
if err != nil {
errs.append(err)
}
aggs = append(aggs, a...)
}
aggs = append(aggs, a...)
return aggs, errs.errorOrNil()
})
}

// resolvedAggregators is the result of resolving aggregators for an instrument.
type resolvedAggregators[N int64 | float64] struct {
aggregators []internal.Aggregator[N]
err error
}

type instrumentCache[N int64 | float64] struct {
cache *cache[instrumentID, any]
}

func newInstrumentCache[N int64 | float64](c *cache[instrumentID, any]) instrumentCache[N] {
if c == nil {
c = &cache[instrumentID, any]{}
}
return aggs, errs.errorOrNil()
return instrumentCache[N]{cache: c}
}

// Lookup returns the Aggregators and error for a cached instrumentID if they
// exist in the cache. Otherwise, f is called and its returned values are set
// in the cache and returned.
//
// If an instrumentID has been stored in the cache for a different N, an error
// is returned describing the conflict.
//
// Lookup is safe to call concurrently.
func (c instrumentCache[N]) Lookup(key instrumentID, f func() ([]internal.Aggregator[N], error)) (aggs []internal.Aggregator[N], err error) {
vAny := c.cache.Lookup(key, func() any {
a, err := f()
return &resolvedAggregators[N]{
aggregators: a,
err: err,
}
})

switch v := vAny.(type) {
case *resolvedAggregators[N]:
aggs, err = v.aggregators, v.err
default:
err = errCacheNumberConflict
}
return aggs, err
}

type multierror struct {
Expand Down
12 changes: 6 additions & 6 deletions sdk/metric/pipeline_registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ func TestPipelineRegistryCreateAggregators(t *testing.T) {
func testPipelineRegistryResolveIntAggregators(t *testing.T, p pipelines, wantCount int) {
inst := view.Instrument{Name: "foo", Kind: view.SyncCounter}

r := newResolver[int64](p)
r := newResolver(p, newInstrumentCache[int64](nil))
aggs, err := r.Aggregators(inst, unit.Dimensionless)
assert.NoError(t, err)

Expand All @@ -344,7 +344,7 @@ func testPipelineRegistryResolveIntAggregators(t *testing.T, p pipelines, wantCo
func testPipelineRegistryResolveFloatAggregators(t *testing.T, p pipelines, wantCount int) {
inst := view.Instrument{Name: "foo", Kind: view.SyncCounter}

r := newResolver[float64](p)
r := newResolver(p, newInstrumentCache[float64](nil))
aggs, err := r.Aggregators(inst, unit.Dimensionless)
assert.NoError(t, err)

Expand Down Expand Up @@ -375,14 +375,14 @@ func TestPipelineRegistryCreateAggregatorsIncompatibleInstrument(t *testing.T) {
p := newPipelines(resource.Empty(), views)
inst := view.Instrument{Name: "foo", Kind: view.AsyncGauge}

ri := newResolver[int64](p)
ri := newResolver(p, newInstrumentCache[int64](nil))
intAggs, err := ri.Aggregators(inst, unit.Dimensionless)
assert.Error(t, err)
assert.Len(t, intAggs, 0)

p = newPipelines(resource.Empty(), views)

rf := newResolver[float64](p)
rf := newResolver(p, newInstrumentCache[float64](nil))
floatAggs, err := rf.Aggregators(inst, unit.Dimensionless)
assert.Error(t, err)
assert.Len(t, floatAggs, 0)
Expand All @@ -405,7 +405,7 @@ func TestPipelineRegistryCreateAggregatorsDuplicateErrors(t *testing.T) {

p := newPipelines(resource.Empty(), views)

ri := newResolver[int64](p)
ri := newResolver(p, newInstrumentCache[int64](nil))
intAggs, err := ri.Aggregators(fooInst, unit.Dimensionless)
assert.NoError(t, err)
assert.Len(t, intAggs, 1)
Expand All @@ -416,7 +416,7 @@ func TestPipelineRegistryCreateAggregatorsDuplicateErrors(t *testing.T) {
assert.Len(t, intAggs, 2)

// Creating a float foo instrument should error because there is an int foo instrument.
rf := newResolver[float64](p)
rf := newResolver(p, newInstrumentCache[float64](nil))
floatAggs, err := rf.Aggregators(fooInst, unit.Dimensionless)
assert.Error(t, err)
assert.Len(t, floatAggs, 1)
Expand Down
28 changes: 28 additions & 0 deletions sdk/metric/pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric/unit"
"go.opentelemetry.io/otel/sdk/instrumentation"
"go.opentelemetry.io/otel/sdk/metric/internal"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/resource"
)
Expand Down Expand Up @@ -211,3 +212,30 @@ func TestPipelineConcurrency(t *testing.T) {
}
wg.Wait()
}

func TestInstrumentCacheNumberConflict(t *testing.T) {
c := cache[instrumentID, any]{}

key := instrumentID{
scope: instrumentation.Scope{Name: "scope name"},
name: "name",
description: "description",
}
aggs := []internal.Aggregator[int64]{internal.NewCumulativeSum[int64](true)}

instCachI := newInstrumentCache[int64](&c)
gotI, err := instCachI.Lookup(key, func() ([]internal.Aggregator[int64], error) {
return aggs, nil
})
require.NoError(t, err)
require.Equal(t, aggs, gotI)

instCachF := newInstrumentCache[float64](&c)
gotF, err := instCachF.Lookup(key, func() ([]internal.Aggregator[float64], error) {
return []internal.Aggregator[float64]{
internal.NewCumulativeSum[float64](true),
}, nil
})
assert.ErrorIs(t, err, errCacheNumberConflict)
assert.Nil(t, gotF, "cache conflict should not return a value")
}