Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Batch Observer callback support #717

Merged
merged 13 commits into from
May 13, 2020
10 changes: 5 additions & 5 deletions api/global/internal/meter.go
Expand Up @@ -85,7 +85,7 @@ type asyncImpl struct {

instrument

callback func(func(metric.Number, []core.KeyValue))
runner metric.AsyncRunner
}

// SyncImpler is implemented by all of the sync metric
Expand Down Expand Up @@ -245,21 +245,21 @@ func (bound *syncHandle) Unbind() {

func (m *meterImpl) NewAsyncInstrument(
desc metric.Descriptor,
callback func(func(metric.Number, []core.KeyValue)),
runner metric.AsyncRunner,
) (metric.AsyncImpl, error) {

m.lock.Lock()
defer m.lock.Unlock()

if meterPtr := (*metric.MeterImpl)(atomic.LoadPointer(&m.delegate)); meterPtr != nil {
return (*meterPtr).NewAsyncInstrument(desc, callback)
return (*meterPtr).NewAsyncInstrument(desc, runner)
}

inst := &asyncImpl{
instrument: instrument{
descriptor: desc,
},
callback: callback,
runner: runner,
}
m.asyncInsts = append(m.asyncInsts, inst)
return inst, nil
Expand All @@ -276,7 +276,7 @@ func (obs *asyncImpl) setDelegate(d metric.MeterImpl) {
implPtr := new(metric.AsyncImpl)

var err error
*implPtr, err = d.NewAsyncInstrument(obs.descriptor, obs.callback)
*implPtr, err = d.NewAsyncInstrument(obs.descriptor, obs.runner)

if err != nil {
// TODO: There is no standard way to deliver this error to the user.
Expand Down
25 changes: 9 additions & 16 deletions api/metric/api.go
Expand Up @@ -198,9 +198,7 @@ func (m Meter) RegisterInt64Observer(name string, callback Int64ObserverCallback
}
return wrapInt64ObserverInstrument(
m.newAsync(name, ObserverKind, Int64NumberKind, opts,
func(observe func(Number, []core.KeyValue)) {
callback(Int64ObserverResult{observe})
}))
newInt64AsyncRunner(callback)))
}

// RegisterFloat64Observer creates a new floating point Observer with
Expand All @@ -213,21 +211,16 @@ func (m Meter) RegisterFloat64Observer(name string, callback Float64ObserverCall
}
return wrapFloat64ObserverInstrument(
m.newAsync(name, ObserverKind, Float64NumberKind, opts,
func(observe func(Number, []core.KeyValue)) {
callback(Float64ObserverResult{observe})
}))
newFloat64AsyncRunner(callback)))
}

// Observe captures a single integer value from the associated
// instrument callback, with the given labels.
func (io Int64ObserverResult) Observe(value int64, labels ...core.KeyValue) {
io.observe(NewInt64Number(value), labels)
}

// Observe captures a single floating point value from the associated
// instrument callback, with the given labels.
func (fo Float64ObserverResult) Observe(value float64, labels ...core.KeyValue) {
fo.observe(NewFloat64Number(value), labels)
// NewBatchObserver creates a new BatchObserver that supports
// making batches of observations for multiple instruments.
func (m Meter) NewBatchObserver(callback BatchObserverCallback) BatchObserver {
return BatchObserver{
meter: m,
runner: newBatchAsyncRunner(callback),
}
}

// WithDescription applies provided description.
Expand Down
47 changes: 46 additions & 1 deletion api/metric/api_test.go
Expand Up @@ -211,6 +211,51 @@ func checkBatches(t *testing.T, ctx context.Context, labels []core.KeyValue, moc
}
}

func TestBatchObserver(t *testing.T) {
mockSDK, meter := mockTest.NewMeter()

var obs1 metric.Int64Observer
var obs2 metric.Float64Observer

labels := []core.KeyValue{
key.String("A", "B"),
key.String("C", "D"),
}

cb := Must(meter).NewBatchObserver(
func(result metric.BatchObserverResult) {
result.Observe(labels,
obs1.Observation(42),
obs2.Observation(42.0),
)
},
)
obs1 = cb.RegisterInt64Observer("test.observer.int")
obs2 = cb.RegisterFloat64Observer("test.observer.float")

mockSDK.RunAsyncInstruments()

require.Len(t, mockSDK.MeasurementBatches, 1)

impl1 := obs1.AsyncImpl().Implementation().(*mockTest.Async)
impl2 := obs2.AsyncImpl().Implementation().(*mockTest.Async)

require.NotNil(t, impl1)
require.NotNil(t, impl2)

got := mockSDK.MeasurementBatches[0]
require.Equal(t, labels, got.Labels)
require.Len(t, got.Measurements, 2)

m1 := got.Measurements[0]
require.Equal(t, impl1, m1.Instrument.Implementation().(*mockTest.Async))
require.Equal(t, 0, m1.Number.CompareNumber(metric.Int64NumberKind, fortyTwo(t, metric.Int64NumberKind)))

m2 := got.Measurements[1]
require.Equal(t, impl2, m2.Instrument.Implementation().(*mockTest.Async))
require.Equal(t, 0, m2.Number.CompareNumber(metric.Float64NumberKind, fortyTwo(t, metric.Float64NumberKind)))
}

func checkObserverBatch(t *testing.T, labels []core.KeyValue, mock *mockTest.MeterImpl, kind metric.NumberKind, observer metric.AsyncImpl) {
t.Helper()
assert.Len(t, mock.MeasurementBatches, 1)
Expand Down Expand Up @@ -257,7 +302,7 @@ func (testWrappedMeter) NewSyncInstrument(_ metric.Descriptor) (metric.SyncImpl,
return nil, nil
}

func (testWrappedMeter) NewAsyncInstrument(_ metric.Descriptor, _ func(func(metric.Number, []core.KeyValue))) (metric.AsyncImpl, error) {
func (testWrappedMeter) NewAsyncInstrument(_ metric.Descriptor, _ metric.AsyncRunner) (metric.AsyncImpl, error) {
return nil, errors.New("Test wrap error")
}

Expand Down
34 changes: 34 additions & 0 deletions api/metric/must.go
Expand Up @@ -20,6 +20,12 @@ type MeterMust struct {
meter Meter
}

// BatchObserverMust is a wrapper for BatchObserver that panics when
// any instrument constructor encounters an error.
type BatchObserverMust struct {
batch BatchObserver
}

// Must constructs a MeterMust implementation from a Meter, allowing
// the application to panic when any instrument constructor yields an
// error.
Expand Down Expand Up @@ -86,3 +92,31 @@ func (mm MeterMust) RegisterFloat64Observer(name string, callback Float64Observe
return inst
}
}

// NewBatchObserver returns a wrapper around BatchObserver that panics
// when any instrument constructor returns an error.
func (mm MeterMust) NewBatchObserver(callback BatchObserverCallback) BatchObserverMust {
return BatchObserverMust{
batch: mm.meter.NewBatchObserver(callback),
}
}

// RegisterInt64Observer calls `BatchObserver.RegisterInt64Observer` and
// returns the instrument, panicking if it encounters an error.
func (bm BatchObserverMust) RegisterInt64Observer(name string, oos ...Option) Int64Observer {
if inst, err := bm.batch.RegisterInt64Observer(name, oos...); err != nil {
panic(err)
} else {
return inst
}
}

// RegisterFloat64Observer calls `BatchObserver.RegisterFloat64Observer` and
// returns the instrument, panicking if it encounters an error.
func (bm BatchObserverMust) RegisterFloat64Observer(name string, oos ...Option) Float64Observer {
if inst, err := bm.batch.RegisterFloat64Observer(name, oos...); err != nil {
panic(err)
} else {
return inst
}
}