Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create metric API Callback type #3564

Merged
merged 5 commits into from
Jan 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- Return a `Registration` from the `RegisterCallback` method of a `Meter` in the `go.opentelemetry.io/otel/metric` package.
This `Registration` can be used to unregister callbacks. (#3522)
- Add `Producer` interface and `Reader.RegisterProducer(Producer)` to `go.opentelemetry.io/otel/sdk/metric` to enable external metric Producers. (#3524)
- Add the `Callback` function type to the `go.opentelemetry.io/otel/metric` package.
This new named function type is registered with a `Meter`. (#3564)

### Changed

Expand Down Expand Up @@ -52,6 +54,8 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- `InstrumentKindAsyncCounter` is renamed to `InstrumentKindObservableCounter`
- `InstrumentKindAsyncUpDownCounter` is renamed to `InstrumentKindObservableUpDownCounter`
- `InstrumentKindAsyncGauge` is renamed to `InstrumentKindObservableGauge`
- Update the `RegisterCallback` method of the `Meter` in the `go.opentelemetry.io/otel/sdk/metric` package to accept the added `Callback` type instead of an inline function type definition.
The underlying type of a `Callback` is the same `func(context.Context)` that the method used to accept. (#3564)

### Deprecated

Expand Down
2 changes: 1 addition & 1 deletion metric/internal/global/meter.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ func (m *meter) Float64ObservableGauge(name string, options ...instrument.Option
//
// It is only valid to call Observe within the scope of the passed function,
// and only on the instruments that were registered with this call.
func (m *meter) RegisterCallback(insts []instrument.Asynchronous, f func(context.Context)) (metric.Registration, error) {
func (m *meter) RegisterCallback(insts []instrument.Asynchronous, f metric.Callback) (metric.Registration, error) {
if del, ok := m.delegate.Load().(metric.Meter); ok {
insts = unwrapInstruments(insts)
return del.RegisterCallback(insts, f)
Expand Down
2 changes: 1 addition & 1 deletion metric/internal/global/meter_types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func (m *testMeter) Float64ObservableGauge(name string, options ...instrument.Op
//
// It is only valid to call Observe within the scope of the passed function,
// and only on the instruments that were registered with this call.
func (m *testMeter) RegisterCallback(i []instrument.Asynchronous, f func(context.Context)) (metric.Registration, error) {
func (m *testMeter) RegisterCallback(i []instrument.Asynchronous, f metric.Callback) (metric.Registration, error) {
m.callbacks = append(m.callbacks, f)
return testReg{
f: func(idx int) func() {
Expand Down
15 changes: 14 additions & 1 deletion metric/meter.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,22 @@ type Meter interface {
//
// If no instruments are passed, f should not be registered nor called
// during collection.
RegisterCallback(instruments []instrument.Asynchronous, f func(context.Context)) (Registration, error)
RegisterCallback(instruments []instrument.Asynchronous, f Callback) (Registration, error)
}

// Callback is a function registered with a Meter that makes observations for
// the set of instruments it is registered with.
//
// The function needs to complete in a finite amount of time and the deadline
// of the passed context is expected to be honored.
//
// The function needs to make unique observations across all registered
// Callbacks. Meaning, it should not report measurements for an instrument with
// the same attributes as another Callback will report.
//
// The function needs to be concurrent safe.
type Callback func(context.Context)

// Registration is an token representing the unique registration of a callback
// for a set of instruments with a Meter.
type Registration interface {
Expand Down
2 changes: 1 addition & 1 deletion metric/noop.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func (noopMeter) Float64ObservableGauge(string, ...instrument.Option) (asyncfloa
}

// RegisterCallback creates a register callback that does not record any metrics.
func (noopMeter) RegisterCallback([]instrument.Asynchronous, func(context.Context)) (Registration, error) {
func (noopMeter) RegisterCallback([]instrument.Asynchronous, Callback) (Registration, error) {
return noopReg{}, nil
}

Expand Down
8 changes: 2 additions & 6 deletions sdk/metric/meter.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
package metric // import "go.opentelemetry.io/otel/sdk/metric"

import (
"context"

"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/instrument"
"go.opentelemetry.io/otel/metric/instrument/asyncfloat64"
Expand Down Expand Up @@ -143,7 +141,7 @@ func (m *meter) Float64ObservableGauge(name string, options ...instrument.Option

// RegisterCallback registers the function f to be called when any of the
// insts Collect method is called.
func (m *meter) RegisterCallback(insts []instrument.Asynchronous, f func(context.Context)) (metric.Registration, error) {
func (m *meter) RegisterCallback(insts []instrument.Asynchronous, f metric.Callback) (metric.Registration, error) {
for _, inst := range insts {
// Only register if at least one instrument has a non-drop aggregation.
// Otherwise, calling f during collection will be wasted computation.
Expand Down Expand Up @@ -174,9 +172,7 @@ func (noopRegister) Unregister() error {
return nil
}

type callback func(context.Context)

func (m *meter) registerCallback(c callback) (metric.Registration, error) {
func (m *meter) registerCallback(c metric.Callback) (metric.Registration, error) {
return m.pipes.registerCallback(c), nil
}

Expand Down
6 changes: 3 additions & 3 deletions sdk/metric/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func (p *pipeline) addSync(scope instrumentation.Scope, iSync instrumentSync) {
}

// addCallback registers a callback to be run when `produce()` is called.
func (p *pipeline) addCallback(c callback) (unregister func()) {
func (p *pipeline) addCallback(c metric.Callback) (unregister func()) {
p.Lock()
defer p.Unlock()
e := p.callbacks.PushBack(c)
Expand Down Expand Up @@ -126,7 +126,7 @@ func (p *pipeline) produce(ctx context.Context) (metricdata.ResourceMetrics, err

for e := p.callbacks.Front(); e != nil; e = e.Next() {
// TODO make the callbacks parallel. ( #3034 )
f := e.Value.(callback)
f := e.Value.(metric.Callback)
f(ctx)
if err := ctx.Err(); err != nil {
// This means the context expired before we finished running callbacks.
Expand Down Expand Up @@ -447,7 +447,7 @@ func newPipelines(res *resource.Resource, readers []Reader, views []View) pipeli
return pipes
}

func (p pipelines) registerCallback(c callback) metric.Registration {
func (p pipelines) registerCallback(c metric.Callback) metric.Registration {
unregs := make([]func(), len(p))
for i, pipe := range p {
unregs[i] = pipe.addCallback(c)
Expand Down