Skip to content

Commit

Permalink
Create metric API Callback type (#3564)
Browse files Browse the repository at this point in the history
* Create metric API Callback type

Document the type according the OTel specification requirements.

* Update all impls of the metric API with new type

* Add changes to changelog

* Update PR number in changelog entry
  • Loading branch information
MrAlias committed Jan 5, 2023
1 parent efd8a7d commit e368276
Show file tree
Hide file tree
Showing 7 changed files with 26 additions and 13 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Expand Up @@ -13,6 +13,8 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- Return a `Registration` from the `RegisterCallback` method of a `Meter` in the `go.opentelemetry.io/otel/metric` package.
This `Registration` can be used to unregister callbacks. (#3522)
- Add `Producer` interface and `Reader.RegisterProducer(Producer)` to `go.opentelemetry.io/otel/sdk/metric` to enable external metric Producers. (#3524)
- Add the `Callback` function type to the `go.opentelemetry.io/otel/metric` package.
This new named function type is registered with a `Meter`. (#3564)

### Changed

Expand Down Expand Up @@ -52,6 +54,8 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- `InstrumentKindAsyncCounter` is renamed to `InstrumentKindObservableCounter`
- `InstrumentKindAsyncUpDownCounter` is renamed to `InstrumentKindObservableUpDownCounter`
- `InstrumentKindAsyncGauge` is renamed to `InstrumentKindObservableGauge`
- Update the `RegisterCallback` method of the `Meter` in the `go.opentelemetry.io/otel/sdk/metric` package to accept the added `Callback` type instead of an inline function type definition.
The underlying type of a `Callback` is the same `func(context.Context)` that the method used to accept. (#3564)

### Deprecated

Expand Down
2 changes: 1 addition & 1 deletion metric/internal/global/meter.go
Expand Up @@ -283,7 +283,7 @@ func (m *meter) Float64ObservableGauge(name string, options ...instrument.Option
//
// It is only valid to call Observe within the scope of the passed function,
// and only on the instruments that were registered with this call.
func (m *meter) RegisterCallback(insts []instrument.Asynchronous, f func(context.Context)) (metric.Registration, error) {
func (m *meter) RegisterCallback(insts []instrument.Asynchronous, f metric.Callback) (metric.Registration, error) {
if del, ok := m.delegate.Load().(metric.Meter); ok {
insts = unwrapInstruments(insts)
return del.RegisterCallback(insts, f)
Expand Down
2 changes: 1 addition & 1 deletion metric/internal/global/meter_types_test.go
Expand Up @@ -119,7 +119,7 @@ func (m *testMeter) Float64ObservableGauge(name string, options ...instrument.Op
//
// It is only valid to call Observe within the scope of the passed function,
// and only on the instruments that were registered with this call.
func (m *testMeter) RegisterCallback(i []instrument.Asynchronous, f func(context.Context)) (metric.Registration, error) {
func (m *testMeter) RegisterCallback(i []instrument.Asynchronous, f metric.Callback) (metric.Registration, error) {
m.callbacks = append(m.callbacks, f)
return testReg{
f: func(idx int) func() {
Expand Down
15 changes: 14 additions & 1 deletion metric/meter.go
Expand Up @@ -106,9 +106,22 @@ type Meter interface {
//
// If no instruments are passed, f should not be registered nor called
// during collection.
RegisterCallback(instruments []instrument.Asynchronous, f func(context.Context)) (Registration, error)
RegisterCallback(instruments []instrument.Asynchronous, f Callback) (Registration, error)
}

// Callback is a function registered with a Meter that makes observations for
// the set of instruments it is registered with.
//
// The function needs to complete in a finite amount of time and the deadline
// of the passed context is expected to be honored.
//
// The function needs to make unique observations across all registered
// Callbacks. Meaning, it should not report measurements for an instrument with
// the same attributes as another Callback will report.
//
// The function needs to be concurrent safe.
type Callback func(context.Context)

// Registration is an token representing the unique registration of a callback
// for a set of instruments with a Meter.
type Registration interface {
Expand Down
2 changes: 1 addition & 1 deletion metric/noop.go
Expand Up @@ -92,7 +92,7 @@ func (noopMeter) Float64ObservableGauge(string, ...instrument.Option) (asyncfloa
}

// RegisterCallback creates a register callback that does not record any metrics.
func (noopMeter) RegisterCallback([]instrument.Asynchronous, func(context.Context)) (Registration, error) {
func (noopMeter) RegisterCallback([]instrument.Asynchronous, Callback) (Registration, error) {
return noopReg{}, nil
}

Expand Down
8 changes: 2 additions & 6 deletions sdk/metric/meter.go
Expand Up @@ -15,8 +15,6 @@
package metric // import "go.opentelemetry.io/otel/sdk/metric"

import (
"context"

"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/instrument"
"go.opentelemetry.io/otel/metric/instrument/asyncfloat64"
Expand Down Expand Up @@ -143,7 +141,7 @@ func (m *meter) Float64ObservableGauge(name string, options ...instrument.Option

// RegisterCallback registers the function f to be called when any of the
// insts Collect method is called.
func (m *meter) RegisterCallback(insts []instrument.Asynchronous, f func(context.Context)) (metric.Registration, error) {
func (m *meter) RegisterCallback(insts []instrument.Asynchronous, f metric.Callback) (metric.Registration, error) {
for _, inst := range insts {
// Only register if at least one instrument has a non-drop aggregation.
// Otherwise, calling f during collection will be wasted computation.
Expand Down Expand Up @@ -174,9 +172,7 @@ func (noopRegister) Unregister() error {
return nil
}

type callback func(context.Context)

func (m *meter) registerCallback(c callback) (metric.Registration, error) {
func (m *meter) registerCallback(c metric.Callback) (metric.Registration, error) {
return m.pipes.registerCallback(c), nil
}

Expand Down
6 changes: 3 additions & 3 deletions sdk/metric/pipeline.go
Expand Up @@ -96,7 +96,7 @@ func (p *pipeline) addSync(scope instrumentation.Scope, iSync instrumentSync) {
}

// addCallback registers a callback to be run when `produce()` is called.
func (p *pipeline) addCallback(c callback) (unregister func()) {
func (p *pipeline) addCallback(c metric.Callback) (unregister func()) {
p.Lock()
defer p.Unlock()
e := p.callbacks.PushBack(c)
Expand Down Expand Up @@ -126,7 +126,7 @@ func (p *pipeline) produce(ctx context.Context) (metricdata.ResourceMetrics, err

for e := p.callbacks.Front(); e != nil; e = e.Next() {
// TODO make the callbacks parallel. ( #3034 )
f := e.Value.(callback)
f := e.Value.(metric.Callback)
f(ctx)
if err := ctx.Err(); err != nil {
// This means the context expired before we finished running callbacks.
Expand Down Expand Up @@ -447,7 +447,7 @@ func newPipelines(res *resource.Resource, readers []Reader, views []View) pipeli
return pipes
}

func (p pipelines) registerCallback(c callback) metric.Registration {
func (p pipelines) registerCallback(c metric.Callback) metric.Registration {
unregs := make([]func(), len(p))
for i, pipe := range p {
unregs[i] = pipe.addCallback(c)
Expand Down

0 comments on commit e368276

Please sign in to comment.