Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make Meter a struct, simplify the global Meter #709

Merged
merged 11 commits into from May 12, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion api/global/global_test.go
Expand Up @@ -37,7 +37,7 @@ func (*testTraceProvider) Tracer(_ string) trace.Tracer {
}

func (*testMeterProvider) Meter(_ string) metric.Meter {
return &metric.NoopMeter{}
return metric.Meter{}
}

func TestMultipleGlobalTracerProvider(t *testing.T) {
Expand Down
179 changes: 45 additions & 134 deletions api/global/internal/meter.go
Expand Up @@ -49,22 +49,27 @@ import (
type meterProvider struct {
delegate metric.Provider

lock sync.Mutex
meters map[string]*meter
}
// lock protects `delegate` and `meters`.
lock sync.Mutex

type meter struct {
delegate unsafe.Pointer // (*metric.Meter)
// meters maintains a unique entry for every named Meter
// that has been registered through the global instance.
meters map[string]*meterEntry
MrAlias marked this conversation as resolved.
Show resolved Hide resolved
}

provider *meterProvider
name string
type meterImpl struct {
delegate unsafe.Pointer // (*metric.MeterImpl)

lock sync.Mutex
registry map[string]metric.InstrumentImpl
syncInsts []*syncImpl
asyncInsts []*asyncImpl
}

type meterEntry struct {
unique metric.MeterImpl
impl meterImpl
}

type instrument struct {
descriptor metric.Descriptor
}
Expand All @@ -73,16 +78,14 @@ type syncImpl struct {
delegate unsafe.Pointer // (*metric.SyncImpl)

instrument

constructor func(metric.Meter) (metric.SyncImpl, error)
}

type asyncImpl struct {
delegate unsafe.Pointer // (*metric.AsyncImpl)

instrument

constructor func(metric.Meter) (metric.AsyncImpl, error)
callback func(func(metric.Number, []core.KeyValue))
}

// SyncImpler is implemented by all of the sync metric
Expand All @@ -107,7 +110,7 @@ type syncHandle struct {
}

var _ metric.Provider = &meterProvider{}
var _ metric.Meter = &meter{}
var _ metric.MeterImpl = &meterImpl{}
var _ metric.InstrumentImpl = &syncImpl{}
var _ metric.BoundSyncImpl = &syncHandle{}
var _ metric.AsyncImpl = &asyncImpl{}
Expand All @@ -120,7 +123,7 @@ func (inst *instrument) Descriptor() metric.Descriptor {

func newMeterProvider() *meterProvider {
return &meterProvider{
meters: map[string]*meter{},
meters: map[string]*meterEntry{},
}
}

Expand All @@ -129,8 +132,8 @@ func (p *meterProvider) setDelegate(provider metric.Provider) {
defer p.lock.Unlock()

p.delegate = provider
for _, m := range p.meters {
m.setDelegate(provider)
for name, entry := range p.meters {
entry.impl.setDelegate(name, provider)
}
p.meters = nil
}
Expand All @@ -143,29 +146,24 @@ func (p *meterProvider) Meter(name string) metric.Meter {
return p.delegate.Meter(name)
}

if exm, ok := p.meters[name]; ok {
return exm
}
entry, ok := p.meters[name]
if !ok {
entry = &meterEntry{}
entry.unique = registry.NewUniqueInstrumentMeterImpl(&entry.impl)
p.meters[name] = entry

m := &meter{
provider: p,
name: name,
registry: map[string]metric.InstrumentImpl{},
syncInsts: []*syncImpl{},
asyncInsts: []*asyncImpl{},
}
p.meters[name] = m
return m
return metric.WrapMeterImpl(entry.unique, name)
}

// Meter interface and delegation

func (m *meter) setDelegate(provider metric.Provider) {
func (m *meterImpl) setDelegate(name string, provider metric.Provider) {
m.lock.Lock()
defer m.lock.Unlock()

d := new(metric.Meter)
*d = provider.Meter(m.name)
d := new(metric.MeterImpl)
*d = provider.Meter(name).MeterImpl()
m.delegate = unsafe.Pointer(d)

for _, inst := range m.syncInsts {
Expand All @@ -178,49 +176,30 @@ func (m *meter) setDelegate(provider metric.Provider) {
m.asyncInsts = nil
}

func (m *meter) newSync(desc metric.Descriptor, constructor func(metric.Meter) (metric.SyncImpl, error)) (metric.SyncImpl, error) {
func (m *meterImpl) NewSyncInstrument(desc metric.Descriptor) (metric.SyncImpl, error) {
m.lock.Lock()
defer m.lock.Unlock()

if meterPtr := (*metric.Meter)(atomic.LoadPointer(&m.delegate)); meterPtr != nil {
return constructor(*meterPtr)
}

if ex, ok := m.registry[desc.Name()]; ok {
if !registry.Compatible(desc, ex.Descriptor()) {
return nil, registry.NewMetricKindMismatchError(ex.Descriptor())
}
return ex.(metric.SyncImpl), nil
if meterPtr := (*metric.MeterImpl)(atomic.LoadPointer(&m.delegate)); meterPtr != nil {
return (*meterPtr).NewSyncInstrument(desc)
}

inst := &syncImpl{
instrument: instrument{
descriptor: desc,
},
constructor: constructor,
}
m.syncInsts = append(m.syncInsts, inst)
m.registry[desc.Name()] = inst
return inst, nil
}

func syncCheck(has SyncImpler, err error) (metric.SyncImpl, error) {
if has != nil {
return has.SyncImpl(), err
}
if err == nil {
err = metric.ErrSDKReturnedNilImpl
}
return nil, err
}

// Synchronous delegation

func (inst *syncImpl) setDelegate(d metric.Meter) {
func (inst *syncImpl) setDelegate(d metric.MeterImpl) {
implPtr := new(metric.SyncImpl)

var err error
*implPtr, err = inst.constructor(d)
*implPtr, err = d.NewSyncInstrument(inst.descriptor)

if err != nil {
// TODO: There is no standard way to deliver this error to the user.
Expand Down Expand Up @@ -264,29 +243,25 @@ func (bound *syncHandle) Unbind() {

// Async delegation

func (m *meter) newAsync(desc metric.Descriptor, constructor func(metric.Meter) (metric.AsyncImpl, error)) (metric.AsyncImpl, error) {
func (m *meterImpl) NewAsyncInstrument(
desc metric.Descriptor,
callback func(func(metric.Number, []core.KeyValue)),
) (metric.AsyncImpl, error) {

m.lock.Lock()
defer m.lock.Unlock()

if meterPtr := (*metric.Meter)(atomic.LoadPointer(&m.delegate)); meterPtr != nil {
return constructor(*meterPtr)
}

if ex, ok := m.registry[desc.Name()]; ok {
if !registry.Compatible(desc, ex.Descriptor()) {
return nil, registry.NewMetricKindMismatchError(ex.Descriptor())
}
return ex.(metric.AsyncImpl), nil
if meterPtr := (*metric.MeterImpl)(atomic.LoadPointer(&m.delegate)); meterPtr != nil {
return (*meterPtr).NewAsyncInstrument(desc, callback)
}

inst := &asyncImpl{
instrument: instrument{
descriptor: desc,
},
constructor: constructor,
callback: callback,
}
m.asyncInsts = append(m.asyncInsts, inst)
m.registry[desc.Name()] = inst
return inst, nil
}

Expand All @@ -297,21 +272,11 @@ func (obs *asyncImpl) Implementation() interface{} {
return obs
}

func asyncCheck(has AsyncImpler, err error) (metric.AsyncImpl, error) {
if has != nil {
return has.AsyncImpl(), err
}
if err == nil {
err = metric.ErrSDKReturnedNilImpl
}
return nil, err
}

func (obs *asyncImpl) setDelegate(d metric.Meter) {
func (obs *asyncImpl) setDelegate(d metric.MeterImpl) {
implPtr := new(metric.AsyncImpl)

var err error
*implPtr, err = obs.constructor(d)
*implPtr, err = d.NewAsyncInstrument(obs.descriptor, obs.callback)

if err != nil {
// TODO: There is no standard way to deliver this error to the user.
Expand All @@ -326,8 +291,8 @@ func (obs *asyncImpl) setDelegate(d metric.Meter) {

// Metric updates

func (m *meter) RecordBatch(ctx context.Context, labels []core.KeyValue, measurements ...metric.Measurement) {
if delegatePtr := (*metric.Meter)(atomic.LoadPointer(&m.delegate)); delegatePtr != nil {
func (m *meterImpl) RecordBatch(ctx context.Context, labels []core.KeyValue, measurements ...metric.Measurement) {
if delegatePtr := (*metric.MeterImpl)(atomic.LoadPointer(&m.delegate)); delegatePtr != nil {
(*delegatePtr).RecordBatch(ctx, labels, measurements...)
}
}
Expand Down Expand Up @@ -363,64 +328,10 @@ func (bound *syncHandle) RecordOne(ctx context.Context, number metric.Number) {
(*implPtr).RecordOne(ctx, number)
}

// Constructors

func (m *meter) withName(opts []metric.Option) []metric.Option {
return append(opts, metric.WithLibraryName(m.name))
}

func (m *meter) NewInt64Counter(name string, opts ...metric.Option) (metric.Int64Counter, error) {
return metric.WrapInt64CounterInstrument(m.newSync(
metric.NewDescriptor(name, metric.CounterKind, metric.Int64NumberKind, m.withName(opts)...),
func(other metric.Meter) (metric.SyncImpl, error) {
return syncCheck(other.NewInt64Counter(name, opts...))
}))
}

func (m *meter) NewFloat64Counter(name string, opts ...metric.Option) (metric.Float64Counter, error) {
return metric.WrapFloat64CounterInstrument(m.newSync(
metric.NewDescriptor(name, metric.CounterKind, metric.Float64NumberKind, m.withName(opts)...),
func(other metric.Meter) (metric.SyncImpl, error) {
return syncCheck(other.NewFloat64Counter(name, opts...))
}))
}

func (m *meter) NewInt64Measure(name string, opts ...metric.Option) (metric.Int64Measure, error) {
return metric.WrapInt64MeasureInstrument(m.newSync(
metric.NewDescriptor(name, metric.MeasureKind, metric.Int64NumberKind, m.withName(opts)...),
func(other metric.Meter) (metric.SyncImpl, error) {
return syncCheck(other.NewInt64Measure(name, opts...))
}))
}

func (m *meter) NewFloat64Measure(name string, opts ...metric.Option) (metric.Float64Measure, error) {
return metric.WrapFloat64MeasureInstrument(m.newSync(
metric.NewDescriptor(name, metric.MeasureKind, metric.Float64NumberKind, m.withName(opts)...),
func(other metric.Meter) (metric.SyncImpl, error) {
return syncCheck(other.NewFloat64Measure(name, opts...))
}))
}

func (m *meter) RegisterInt64Observer(name string, callback metric.Int64ObserverCallback, opts ...metric.Option) (metric.Int64Observer, error) {
return metric.WrapInt64ObserverInstrument(m.newAsync(
metric.NewDescriptor(name, metric.ObserverKind, metric.Int64NumberKind, m.withName(opts)...),
func(other metric.Meter) (metric.AsyncImpl, error) {
return asyncCheck(other.RegisterInt64Observer(name, callback, opts...))
}))
}

func (m *meter) RegisterFloat64Observer(name string, callback metric.Float64ObserverCallback, opts ...metric.Option) (metric.Float64Observer, error) {
return metric.WrapFloat64ObserverInstrument(m.newAsync(
metric.NewDescriptor(name, metric.ObserverKind, metric.Float64NumberKind, m.withName(opts)...),
func(other metric.Meter) (metric.AsyncImpl, error) {
return asyncCheck(other.RegisterFloat64Observer(name, callback, opts...))
}))
}

func AtomicFieldOffsets() map[string]uintptr {
return map[string]uintptr{
"meterProvider.delegate": unsafe.Offsetof(meterProvider{}.delegate),
"meter.delegate": unsafe.Offsetof(meter{}.delegate),
"meterImpl.delegate": unsafe.Offsetof(meterImpl{}.delegate),
"syncImpl.delegate": unsafe.Offsetof(syncImpl{}.delegate),
"asyncImpl.delegate": unsafe.Offsetof(asyncImpl{}.delegate),
"syncHandle.delegate": unsafe.Offsetof(syncHandle{}.delegate),
Expand Down
8 changes: 4 additions & 4 deletions api/global/internal/meter_test.go
Expand Up @@ -280,15 +280,15 @@ type meterProviderWithConstructorError struct {
}

type meterWithConstructorError struct {
metric.Meter
metric.MeterImpl
}

func (m *meterProviderWithConstructorError) Meter(name string) metric.Meter {
return &meterWithConstructorError{m.Provider.Meter(name)}
return metric.WrapMeterImpl(&meterWithConstructorError{m.Provider.Meter(name).MeterImpl()}, name)
}

func (m *meterWithConstructorError) NewInt64Counter(name string, opts ...metric.Option) (metric.Int64Counter, error) {
return metric.Int64Counter{}, errors.New("constructor error")
func (m *meterWithConstructorError) NewSyncInstrument(_ metric.Descriptor) (metric.SyncImpl, error) {
return metric.NoopSync{}, errors.New("constructor error")
}

func TestErrorInDeferredConstructor(t *testing.T) {
Expand Down