Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Have multi-instrument callback return an error #3576

Merged
merged 3 commits into from
Jan 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- `InstrumentKindAsyncGauge` is renamed to `InstrumentKindObservableGauge`
- Update the `RegisterCallback` method of the `Meter` in the `go.opentelemetry.io/otel/sdk/metric` package to accept the added `Callback` type instead of an inline function type definition.
The underlying type of a `Callback` is the same `func(context.Context)` that the method used to accept. (#3564)
- The callback function registered with a `Meter` from the `go.opentelemetry.io/otel/metric` package is required to return an error now. (#3576)

### Deprecated

Expand Down
3 changes: 2 additions & 1 deletion example/prometheus/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,10 @@ func main() {
if err != nil {
log.Fatal(err)
}
_, err = meter.RegisterCallback([]instrument.Asynchronous{gauge}, func(ctx context.Context) {
_, err = meter.RegisterCallback([]instrument.Asynchronous{gauge}, func(ctx context.Context) error {
n := -10. + rand.Float64()*(90.) // [-10, 100)
gauge.Observe(ctx, n, attrs...)
return nil
})
if err != nil {
log.Fatal(err)
Expand Down
6 changes: 4 additions & 2 deletions metric/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,14 @@ func ExampleMeter_asynchronous_single() {
}

_, err = meter.RegisterCallback([]instrument.Asynchronous{memoryUsage},
func(ctx context.Context) {
func(ctx context.Context) error {
// instrument.WithCallbackFunc(func(ctx context.Context) {
//Do Work to get the real memoryUsage
// mem := GatherMemory(ctx)
mem := 75000

memoryUsage.Observe(ctx, int64(mem))
return nil
})
if err != nil {
fmt.Println("Failed to register callback")
Expand All @@ -90,7 +91,7 @@ func ExampleMeter_asynchronous_multiple() {
heapAlloc,
gcCount,
},
func(ctx context.Context) {
func(ctx context.Context) error {
memStats := &runtime.MemStats{}
// This call does work
runtime.ReadMemStats(memStats)
Expand All @@ -100,6 +101,7 @@ func ExampleMeter_asynchronous_multiple() {

// This function synchronously records the pauses
computeGCPauses(ctx, gcPause, memStats.PauseNs[:])
return nil
},
)

Expand Down
3 changes: 1 addition & 2 deletions metric/internal/global/meter.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package global // import "go.opentelemetry.io/otel/metric/internal/global"

import (
"container/list"
"context"
"sync"
"sync/atomic"

Expand Down Expand Up @@ -323,7 +322,7 @@ func unwrapInstruments(instruments []instrument.Asynchronous) []instrument.Async

type registration struct {
instruments []instrument.Asynchronous
function func(context.Context)
function metric.Callback

unreg func() error
unregMu sync.Mutex
Expand Down
13 changes: 8 additions & 5 deletions metric/internal/global/meter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func TestMeterRace(t *testing.T) {
_, _ = mtr.Int64Counter(name)
_, _ = mtr.Int64UpDownCounter(name)
_, _ = mtr.Int64Histogram(name)
_, _ = mtr.RegisterCallback(nil, func(ctx context.Context) {})
_, _ = mtr.RegisterCallback(nil, func(ctx context.Context) error { return nil })
if !once {
wg.Done()
once = true
Expand All @@ -88,7 +88,7 @@ func TestMeterRace(t *testing.T) {

func TestUnregisterRace(t *testing.T) {
mtr := &meter{}
reg, err := mtr.RegisterCallback(nil, func(ctx context.Context) {})
reg, err := mtr.RegisterCallback(nil, func(ctx context.Context) error { return nil })
require.NoError(t, err)

wg := &sync.WaitGroup{}
Expand Down Expand Up @@ -130,8 +130,9 @@ func testSetupAllInstrumentTypes(t *testing.T, m metric.Meter) (syncfloat64.Coun
_, err = m.Int64ObservableGauge("test_Async_Gauge")
assert.NoError(t, err)

_, err = m.RegisterCallback([]instrument.Asynchronous{afcounter}, func(ctx context.Context) {
_, err = m.RegisterCallback([]instrument.Asynchronous{afcounter}, func(ctx context.Context) error {
afcounter.Observe(ctx, 3)
return nil
})
require.NoError(t, err)

Expand Down Expand Up @@ -324,8 +325,9 @@ func TestRegistrationDelegation(t *testing.T) {
require.NoError(t, err)

var called0 bool
reg0, err := m.RegisterCallback([]instrument.Asynchronous{actr}, func(context.Context) {
reg0, err := m.RegisterCallback([]instrument.Asynchronous{actr}, func(context.Context) error {
called0 = true
return nil
})
require.NoError(t, err)
require.Equal(t, 1, mImpl.registry.Len(), "callback not registered")
Expand All @@ -334,8 +336,9 @@ func TestRegistrationDelegation(t *testing.T) {
assert.Equal(t, 0, mImpl.registry.Len(), "callback not unregistered")

var called1 bool
reg1, err := m.RegisterCallback([]instrument.Asynchronous{actr}, func(context.Context) {
reg1, err := m.RegisterCallback([]instrument.Asynchronous{actr}, func(context.Context) error {
called1 = true
return nil
})
require.NoError(t, err)
require.Equal(t, 1, mImpl.registry.Len(), "second callback not registered")
Expand Down
4 changes: 2 additions & 2 deletions metric/internal/global/meter_types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ type testMeter struct {
siUDCount int
siHist int

callbacks []func(context.Context)
callbacks []metric.Callback
}

func (m *testMeter) Int64Counter(name string, options ...instrument.Int64Option) (syncint64.Counter, error) {
Expand Down Expand Up @@ -145,6 +145,6 @@ func (m *testMeter) collect() {
// Unregister.
continue
}
f(ctx)
_ = f(ctx)
}
}
2 changes: 1 addition & 1 deletion metric/meter.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ type Meter interface {
// the same attributes as another Callback will report.
//
// The function needs to be concurrent safe.
type Callback func(context.Context)
type Callback func(context.Context) error

// Registration is an token representing the unique registration of a callback
// for a set of instruments with a Meter.
Expand Down
64 changes: 43 additions & 21 deletions sdk/metric/meter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ func TestMeterInstrumentConcurrency(t *testing.T) {
wg.Wait()
}

var emptyCallback metric.Callback = func(ctx context.Context) error { return nil }

// A Meter Should be able register Callbacks Concurrently.
func TestMeterCallbackCreationConcurrency(t *testing.T) {
wg := &sync.WaitGroup{}
Expand All @@ -103,19 +105,19 @@ func TestMeterCallbackCreationConcurrency(t *testing.T) {
m := NewMeterProvider().Meter("callback-concurrency")

go func() {
_, _ = m.RegisterCallback([]instrument.Asynchronous{}, func(ctx context.Context) {})
_, _ = m.RegisterCallback([]instrument.Asynchronous{}, emptyCallback)
wg.Done()
}()
go func() {
_, _ = m.RegisterCallback([]instrument.Asynchronous{}, func(ctx context.Context) {})
_, _ = m.RegisterCallback([]instrument.Asynchronous{}, emptyCallback)
wg.Done()
}()
wg.Wait()
}

func TestNoopCallbackUnregisterConcurrency(t *testing.T) {
m := NewMeterProvider().Meter("noop-unregister-concurrency")
reg, err := m.RegisterCallback(nil, func(ctx context.Context) {})
reg, err := m.RegisterCallback(nil, emptyCallback)
require.NoError(t, err)

wg := &sync.WaitGroup{}
Expand Down Expand Up @@ -143,11 +145,11 @@ func TestCallbackUnregisterConcurrency(t *testing.T) {
require.NoError(t, err)

i := []instrument.Asynchronous{actr}
regCtr, err := meter.RegisterCallback(i, func(ctx context.Context) {})
regCtr, err := meter.RegisterCallback(i, emptyCallback)
require.NoError(t, err)

i = []instrument.Asynchronous{ag}
regG, err := meter.RegisterCallback(i, func(ctx context.Context) {})
regG, err := meter.RegisterCallback(i, emptyCallback)
require.NoError(t, err)

wg := &sync.WaitGroup{}
Expand Down Expand Up @@ -183,8 +185,9 @@ func TestMeterCreatesInstruments(t *testing.T) {
}
ctr, err := m.Int64ObservableCounter("aint", instrument.WithInt64Callback(cback))
assert.NoError(t, err)
_, err = m.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) {
_, err = m.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) error {
ctr.Observe(ctx, 3)
return nil
})
assert.NoError(t, err)

Expand Down Expand Up @@ -212,8 +215,9 @@ func TestMeterCreatesInstruments(t *testing.T) {
}
ctr, err := m.Int64ObservableUpDownCounter("aint", instrument.WithInt64Callback(cback))
assert.NoError(t, err)
_, err = m.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) {
_, err = m.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) error {
ctr.Observe(ctx, 11)
return nil
})
assert.NoError(t, err)

Expand Down Expand Up @@ -241,8 +245,9 @@ func TestMeterCreatesInstruments(t *testing.T) {
}
gauge, err := m.Int64ObservableGauge("agauge", instrument.WithInt64Callback(cback))
assert.NoError(t, err)
_, err = m.RegisterCallback([]instrument.Asynchronous{gauge}, func(ctx context.Context) {
_, err = m.RegisterCallback([]instrument.Asynchronous{gauge}, func(ctx context.Context) error {
gauge.Observe(ctx, 11)
return nil
})
assert.NoError(t, err)

Expand All @@ -268,8 +273,9 @@ func TestMeterCreatesInstruments(t *testing.T) {
}
ctr, err := m.Float64ObservableCounter("afloat", instrument.WithFloat64Callback(cback))
assert.NoError(t, err)
_, err = m.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) {
_, err = m.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) error {
ctr.Observe(ctx, 3)
return nil
})
assert.NoError(t, err)

Expand Down Expand Up @@ -297,8 +303,9 @@ func TestMeterCreatesInstruments(t *testing.T) {
}
ctr, err := m.Float64ObservableUpDownCounter("afloat", instrument.WithFloat64Callback(cback))
assert.NoError(t, err)
_, err = m.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) {
_, err = m.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) error {
ctr.Observe(ctx, 11)
return nil
})
assert.NoError(t, err)

Expand Down Expand Up @@ -326,8 +333,9 @@ func TestMeterCreatesInstruments(t *testing.T) {
}
gauge, err := m.Float64ObservableGauge("agauge", instrument.WithFloat64Callback(cback))
assert.NoError(t, err)
_, err = m.RegisterCallback([]instrument.Asynchronous{gauge}, func(ctx context.Context) {
_, err = m.RegisterCallback([]instrument.Asynchronous{gauge}, func(ctx context.Context) error {
gauge.Observe(ctx, 11)
return nil
})
assert.NoError(t, err)

Expand Down Expand Up @@ -501,16 +509,18 @@ func TestMetersProvideScope(t *testing.T) {
m1 := mp.Meter("scope1")
ctr1, err := m1.Float64ObservableCounter("ctr1")
assert.NoError(t, err)
_, err = m1.RegisterCallback([]instrument.Asynchronous{ctr1}, func(ctx context.Context) {
_, err = m1.RegisterCallback([]instrument.Asynchronous{ctr1}, func(ctx context.Context) error {
ctr1.Observe(ctx, 5)
return nil
})
assert.NoError(t, err)

m2 := mp.Meter("scope2")
ctr2, err := m2.Int64ObservableCounter("ctr2")
assert.NoError(t, err)
_, err = m1.RegisterCallback([]instrument.Asynchronous{ctr2}, func(ctx context.Context) {
_, err = m1.RegisterCallback([]instrument.Asynchronous{ctr2}, func(ctx context.Context) error {
ctr2.Observe(ctx, 7)
return nil
})
assert.NoError(t, err)

Expand Down Expand Up @@ -594,7 +604,10 @@ func TestUnregisterUnregisters(t *testing.T) {
floag64Counter,
floag64UpDownCounter,
floag64Gauge,
}, func(context.Context) { called = true })
}, func(context.Context) error {
called = true
return nil
})
require.NoError(t, err)

ctx := context.Background()
Expand Down Expand Up @@ -644,7 +657,10 @@ func TestRegisterCallbackDropAggregations(t *testing.T) {
floag64Counter,
floag64UpDownCounter,
floag64Gauge,
}, func(context.Context) { called = true })
}, func(context.Context) error {
called = true
return nil
})
require.NoError(t, err)

data, err := r.Collect(context.Background())
Expand All @@ -669,9 +685,10 @@ func TestAttributeFilter(t *testing.T) {
if err != nil {
return err
}
_, err = mtr.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) {
_, err = mtr.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) error {
ctr.Observe(ctx, 1.0, attribute.String("foo", "bar"), attribute.Int("version", 1))
ctr.Observe(ctx, 2.0, attribute.String("foo", "bar"), attribute.Int("version", 2))
return nil
})
return err
},
Expand All @@ -696,9 +713,10 @@ func TestAttributeFilter(t *testing.T) {
if err != nil {
return err
}
_, err = mtr.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) {
_, err = mtr.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) error {
ctr.Observe(ctx, 1.0, attribute.String("foo", "bar"), attribute.Int("version", 1))
ctr.Observe(ctx, 2.0, attribute.String("foo", "bar"), attribute.Int("version", 2))
return nil
})
return err
},
Expand All @@ -723,9 +741,10 @@ func TestAttributeFilter(t *testing.T) {
if err != nil {
return err
}
_, err = mtr.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) {
_, err = mtr.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) error {
ctr.Observe(ctx, 1.0, attribute.String("foo", "bar"), attribute.Int("version", 1))
ctr.Observe(ctx, 2.0, attribute.String("foo", "bar"), attribute.Int("version", 2))
return nil
})
return err
},
Expand All @@ -748,9 +767,10 @@ func TestAttributeFilter(t *testing.T) {
if err != nil {
return err
}
_, err = mtr.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) {
_, err = mtr.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) error {
ctr.Observe(ctx, 10, attribute.String("foo", "bar"), attribute.Int("version", 1))
ctr.Observe(ctx, 20, attribute.String("foo", "bar"), attribute.Int("version", 2))
return nil
})
return err
},
Expand All @@ -775,9 +795,10 @@ func TestAttributeFilter(t *testing.T) {
if err != nil {
return err
}
_, err = mtr.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) {
_, err = mtr.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) error {
ctr.Observe(ctx, 10, attribute.String("foo", "bar"), attribute.Int("version", 1))
ctr.Observe(ctx, 20, attribute.String("foo", "bar"), attribute.Int("version", 2))
return nil
})
return err
},
Expand All @@ -802,9 +823,10 @@ func TestAttributeFilter(t *testing.T) {
if err != nil {
return err
}
_, err = mtr.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) {
_, err = mtr.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) error {
ctr.Observe(ctx, 10, attribute.String("foo", "bar"), attribute.Int("version", 1))
ctr.Observe(ctx, 20, attribute.String("foo", "bar"), attribute.Int("version", 2))
return nil
})
return err
},
Expand Down
4 changes: 3 additions & 1 deletion sdk/metric/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,9 @@ func (p *pipeline) produce(ctx context.Context) (metricdata.ResourceMetrics, err
for e := p.multiCallbacks.Front(); e != nil; e = e.Next() {
// TODO make the callbacks parallel. ( #3034 )
f := e.Value.(metric.Callback)
f(ctx)
if err := f(ctx); err != nil {
errs.append(err)
}
if err := ctx.Err(); err != nil {
// This means the context expired before we finished running callbacks.
return metricdata.ResourceMetrics{}, err
Expand Down