Skip to content

Commit

Permalink
Make Meter a struct, simplify the global Meter (#709)
Browse files Browse the repository at this point in the history
* Tests pass

* Precommit pass

* More lint

* Remove a few interfaces

* Final edits

* Fix comments

* Indentation

* registry->unique

* Comments
  • Loading branch information
jmacd committed May 12, 2020
1 parent 64afb05 commit 32ddc16
Show file tree
Hide file tree
Showing 7 changed files with 178 additions and 310 deletions.
2 changes: 1 addition & 1 deletion api/global/global_test.go
Expand Up @@ -37,7 +37,7 @@ func (*testTraceProvider) Tracer(_ string) trace.Tracer {
}

func (*testMeterProvider) Meter(_ string) metric.Meter {
return &metric.NoopMeter{}
return metric.Meter{}
}

func TestMultipleGlobalTracerProvider(t *testing.T) {
Expand Down
179 changes: 45 additions & 134 deletions api/global/internal/meter.go
Expand Up @@ -49,22 +49,27 @@ import (
type meterProvider struct {
delegate metric.Provider

lock sync.Mutex
meters map[string]*meter
}
// lock protects `delegate` and `meters`.
lock sync.Mutex

type meter struct {
delegate unsafe.Pointer // (*metric.Meter)
// meters maintains a unique entry for every named Meter
// that has been registered through the global instance.
meters map[string]*meterEntry
}

provider *meterProvider
name string
type meterImpl struct {
delegate unsafe.Pointer // (*metric.MeterImpl)

lock sync.Mutex
registry map[string]metric.InstrumentImpl
syncInsts []*syncImpl
asyncInsts []*asyncImpl
}

type meterEntry struct {
unique metric.MeterImpl
impl meterImpl
}

type instrument struct {
descriptor metric.Descriptor
}
Expand All @@ -73,16 +78,14 @@ type syncImpl struct {
delegate unsafe.Pointer // (*metric.SyncImpl)

instrument

constructor func(metric.Meter) (metric.SyncImpl, error)
}

type asyncImpl struct {
delegate unsafe.Pointer // (*metric.AsyncImpl)

instrument

constructor func(metric.Meter) (metric.AsyncImpl, error)
callback func(func(metric.Number, []core.KeyValue))
}

// SyncImpler is implemented by all of the sync metric
Expand All @@ -107,7 +110,7 @@ type syncHandle struct {
}

var _ metric.Provider = &meterProvider{}
var _ metric.Meter = &meter{}
var _ metric.MeterImpl = &meterImpl{}
var _ metric.InstrumentImpl = &syncImpl{}
var _ metric.BoundSyncImpl = &syncHandle{}
var _ metric.AsyncImpl = &asyncImpl{}
Expand All @@ -120,7 +123,7 @@ func (inst *instrument) Descriptor() metric.Descriptor {

func newMeterProvider() *meterProvider {
return &meterProvider{
meters: map[string]*meter{},
meters: map[string]*meterEntry{},
}
}

Expand All @@ -129,8 +132,8 @@ func (p *meterProvider) setDelegate(provider metric.Provider) {
defer p.lock.Unlock()

p.delegate = provider
for _, m := range p.meters {
m.setDelegate(provider)
for name, entry := range p.meters {
entry.impl.setDelegate(name, provider)
}
p.meters = nil
}
Expand All @@ -143,29 +146,24 @@ func (p *meterProvider) Meter(name string) metric.Meter {
return p.delegate.Meter(name)
}

if exm, ok := p.meters[name]; ok {
return exm
}
entry, ok := p.meters[name]
if !ok {
entry = &meterEntry{}
entry.unique = registry.NewUniqueInstrumentMeterImpl(&entry.impl)
p.meters[name] = entry

m := &meter{
provider: p,
name: name,
registry: map[string]metric.InstrumentImpl{},
syncInsts: []*syncImpl{},
asyncInsts: []*asyncImpl{},
}
p.meters[name] = m
return m
return metric.WrapMeterImpl(entry.unique, name)
}

// Meter interface and delegation

func (m *meter) setDelegate(provider metric.Provider) {
func (m *meterImpl) setDelegate(name string, provider metric.Provider) {
m.lock.Lock()
defer m.lock.Unlock()

d := new(metric.Meter)
*d = provider.Meter(m.name)
d := new(metric.MeterImpl)
*d = provider.Meter(name).MeterImpl()
m.delegate = unsafe.Pointer(d)

for _, inst := range m.syncInsts {
Expand All @@ -178,49 +176,30 @@ func (m *meter) setDelegate(provider metric.Provider) {
m.asyncInsts = nil
}

func (m *meter) newSync(desc metric.Descriptor, constructor func(metric.Meter) (metric.SyncImpl, error)) (metric.SyncImpl, error) {
func (m *meterImpl) NewSyncInstrument(desc metric.Descriptor) (metric.SyncImpl, error) {
m.lock.Lock()
defer m.lock.Unlock()

if meterPtr := (*metric.Meter)(atomic.LoadPointer(&m.delegate)); meterPtr != nil {
return constructor(*meterPtr)
}

if ex, ok := m.registry[desc.Name()]; ok {
if !registry.Compatible(desc, ex.Descriptor()) {
return nil, registry.NewMetricKindMismatchError(ex.Descriptor())
}
return ex.(metric.SyncImpl), nil
if meterPtr := (*metric.MeterImpl)(atomic.LoadPointer(&m.delegate)); meterPtr != nil {
return (*meterPtr).NewSyncInstrument(desc)
}

inst := &syncImpl{
instrument: instrument{
descriptor: desc,
},
constructor: constructor,
}
m.syncInsts = append(m.syncInsts, inst)
m.registry[desc.Name()] = inst
return inst, nil
}

func syncCheck(has SyncImpler, err error) (metric.SyncImpl, error) {
if has != nil {
return has.SyncImpl(), err
}
if err == nil {
err = metric.ErrSDKReturnedNilImpl
}
return nil, err
}

// Synchronous delegation

func (inst *syncImpl) setDelegate(d metric.Meter) {
func (inst *syncImpl) setDelegate(d metric.MeterImpl) {
implPtr := new(metric.SyncImpl)

var err error
*implPtr, err = inst.constructor(d)
*implPtr, err = d.NewSyncInstrument(inst.descriptor)

if err != nil {
// TODO: There is no standard way to deliver this error to the user.
Expand Down Expand Up @@ -264,29 +243,25 @@ func (bound *syncHandle) Unbind() {

// Async delegation

func (m *meter) newAsync(desc metric.Descriptor, constructor func(metric.Meter) (metric.AsyncImpl, error)) (metric.AsyncImpl, error) {
func (m *meterImpl) NewAsyncInstrument(
desc metric.Descriptor,
callback func(func(metric.Number, []core.KeyValue)),
) (metric.AsyncImpl, error) {

m.lock.Lock()
defer m.lock.Unlock()

if meterPtr := (*metric.Meter)(atomic.LoadPointer(&m.delegate)); meterPtr != nil {
return constructor(*meterPtr)
}

if ex, ok := m.registry[desc.Name()]; ok {
if !registry.Compatible(desc, ex.Descriptor()) {
return nil, registry.NewMetricKindMismatchError(ex.Descriptor())
}
return ex.(metric.AsyncImpl), nil
if meterPtr := (*metric.MeterImpl)(atomic.LoadPointer(&m.delegate)); meterPtr != nil {
return (*meterPtr).NewAsyncInstrument(desc, callback)
}

inst := &asyncImpl{
instrument: instrument{
descriptor: desc,
},
constructor: constructor,
callback: callback,
}
m.asyncInsts = append(m.asyncInsts, inst)
m.registry[desc.Name()] = inst
return inst, nil
}

Expand All @@ -297,21 +272,11 @@ func (obs *asyncImpl) Implementation() interface{} {
return obs
}

func asyncCheck(has AsyncImpler, err error) (metric.AsyncImpl, error) {
if has != nil {
return has.AsyncImpl(), err
}
if err == nil {
err = metric.ErrSDKReturnedNilImpl
}
return nil, err
}

func (obs *asyncImpl) setDelegate(d metric.Meter) {
func (obs *asyncImpl) setDelegate(d metric.MeterImpl) {
implPtr := new(metric.AsyncImpl)

var err error
*implPtr, err = obs.constructor(d)
*implPtr, err = d.NewAsyncInstrument(obs.descriptor, obs.callback)

if err != nil {
// TODO: There is no standard way to deliver this error to the user.
Expand All @@ -326,8 +291,8 @@ func (obs *asyncImpl) setDelegate(d metric.Meter) {

// Metric updates

func (m *meter) RecordBatch(ctx context.Context, labels []core.KeyValue, measurements ...metric.Measurement) {
if delegatePtr := (*metric.Meter)(atomic.LoadPointer(&m.delegate)); delegatePtr != nil {
func (m *meterImpl) RecordBatch(ctx context.Context, labels []core.KeyValue, measurements ...metric.Measurement) {
if delegatePtr := (*metric.MeterImpl)(atomic.LoadPointer(&m.delegate)); delegatePtr != nil {
(*delegatePtr).RecordBatch(ctx, labels, measurements...)
}
}
Expand Down Expand Up @@ -363,64 +328,10 @@ func (bound *syncHandle) RecordOne(ctx context.Context, number metric.Number) {
(*implPtr).RecordOne(ctx, number)
}

// Constructors

func (m *meter) withName(opts []metric.Option) []metric.Option {
return append(opts, metric.WithLibraryName(m.name))
}

func (m *meter) NewInt64Counter(name string, opts ...metric.Option) (metric.Int64Counter, error) {
return metric.WrapInt64CounterInstrument(m.newSync(
metric.NewDescriptor(name, metric.CounterKind, metric.Int64NumberKind, m.withName(opts)...),
func(other metric.Meter) (metric.SyncImpl, error) {
return syncCheck(other.NewInt64Counter(name, opts...))
}))
}

func (m *meter) NewFloat64Counter(name string, opts ...metric.Option) (metric.Float64Counter, error) {
return metric.WrapFloat64CounterInstrument(m.newSync(
metric.NewDescriptor(name, metric.CounterKind, metric.Float64NumberKind, m.withName(opts)...),
func(other metric.Meter) (metric.SyncImpl, error) {
return syncCheck(other.NewFloat64Counter(name, opts...))
}))
}

func (m *meter) NewInt64Measure(name string, opts ...metric.Option) (metric.Int64Measure, error) {
return metric.WrapInt64MeasureInstrument(m.newSync(
metric.NewDescriptor(name, metric.MeasureKind, metric.Int64NumberKind, m.withName(opts)...),
func(other metric.Meter) (metric.SyncImpl, error) {
return syncCheck(other.NewInt64Measure(name, opts...))
}))
}

func (m *meter) NewFloat64Measure(name string, opts ...metric.Option) (metric.Float64Measure, error) {
return metric.WrapFloat64MeasureInstrument(m.newSync(
metric.NewDescriptor(name, metric.MeasureKind, metric.Float64NumberKind, m.withName(opts)...),
func(other metric.Meter) (metric.SyncImpl, error) {
return syncCheck(other.NewFloat64Measure(name, opts...))
}))
}

func (m *meter) RegisterInt64Observer(name string, callback metric.Int64ObserverCallback, opts ...metric.Option) (metric.Int64Observer, error) {
return metric.WrapInt64ObserverInstrument(m.newAsync(
metric.NewDescriptor(name, metric.ObserverKind, metric.Int64NumberKind, m.withName(opts)...),
func(other metric.Meter) (metric.AsyncImpl, error) {
return asyncCheck(other.RegisterInt64Observer(name, callback, opts...))
}))
}

func (m *meter) RegisterFloat64Observer(name string, callback metric.Float64ObserverCallback, opts ...metric.Option) (metric.Float64Observer, error) {
return metric.WrapFloat64ObserverInstrument(m.newAsync(
metric.NewDescriptor(name, metric.ObserverKind, metric.Float64NumberKind, m.withName(opts)...),
func(other metric.Meter) (metric.AsyncImpl, error) {
return asyncCheck(other.RegisterFloat64Observer(name, callback, opts...))
}))
}

func AtomicFieldOffsets() map[string]uintptr {
return map[string]uintptr{
"meterProvider.delegate": unsafe.Offsetof(meterProvider{}.delegate),
"meter.delegate": unsafe.Offsetof(meter{}.delegate),
"meterImpl.delegate": unsafe.Offsetof(meterImpl{}.delegate),
"syncImpl.delegate": unsafe.Offsetof(syncImpl{}.delegate),
"asyncImpl.delegate": unsafe.Offsetof(asyncImpl{}.delegate),
"syncHandle.delegate": unsafe.Offsetof(syncHandle{}.delegate),
Expand Down
8 changes: 4 additions & 4 deletions api/global/internal/meter_test.go
Expand Up @@ -280,15 +280,15 @@ type meterProviderWithConstructorError struct {
}

type meterWithConstructorError struct {
metric.Meter
metric.MeterImpl
}

func (m *meterProviderWithConstructorError) Meter(name string) metric.Meter {
return &meterWithConstructorError{m.Provider.Meter(name)}
return metric.WrapMeterImpl(&meterWithConstructorError{m.Provider.Meter(name).MeterImpl()}, name)
}

func (m *meterWithConstructorError) NewInt64Counter(name string, opts ...metric.Option) (metric.Int64Counter, error) {
return metric.Int64Counter{}, errors.New("constructor error")
func (m *meterWithConstructorError) NewSyncInstrument(_ metric.Descriptor) (metric.SyncImpl, error) {
return metric.NoopSync{}, errors.New("constructor error")
}

func TestErrorInDeferredConstructor(t *testing.T) {
Expand Down

0 comments on commit 32ddc16

Please sign in to comment.