Skip to content

Commit

Permalink
Merge branch 'master' into opentelemetry-exporter
Browse files Browse the repository at this point in the history
  • Loading branch information
rghetia committed Mar 5, 2020
2 parents 5e984ef + 79de90a commit 241940a
Show file tree
Hide file tree
Showing 29 changed files with 1,216 additions and 218 deletions.
24 changes: 19 additions & 5 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,16 @@ ALL_DOCS := $(shell find . -name '*.md' -type f | sort)
ALL_GO_MOD_DIRS := $(filter-out $(TOOLS_MOD_DIR), $(shell find . -type f -name 'go.mod' -exec dirname {} \; | sort))
ALL_COVERAGE_MOD_DIRS := $(shell find . -type f -name 'go.mod' -exec dirname {} \; | egrep -v '^./example|^$(TOOLS_MOD_DIR)' | sort)

# Mac OS Catalina 10.5.x doesn't support 386. Hence skip 386 test
SKIP_386_TEST = false
UNAME_S := $(shell uname -s)
ifeq ($(UNAME_S),Darwin)
SW_VERS := $(shell sw_vers -productVersion)
ifeq ($(shell echo $(SW_VERS) | egrep '^(10.1[5-9]|1[1-9]|[2-9])'), $(SW_VERS))
SKIP_386_TEST = true
endif
endif

GOTEST_MIN = go test -v -timeout 30s
GOTEST = $(GOTEST_MIN) -race
GOTEST_WITH_COVERAGE = $(GOTEST) -coverprofile=coverage.txt -covermode=atomic
Expand Down Expand Up @@ -73,11 +83,15 @@ test:

.PHONY: test-386
test-386:
set -e; for dir in $(ALL_GO_MOD_DIRS); do \
echo "go test ./... GOARCH 386 in $${dir}"; \
(cd "$${dir}" && \
GOARCH=386 $(GOTEST_MIN) ./...); \
done
if [ $(SKIP_386_TEST) = true ] ; then \
echo "skipping the test for GOARCH 386 as it is not supported on the current OS"; \
else \
set -e; for dir in $(ALL_GO_MOD_DIRS); do \
echo "go test ./... GOARCH 386 in $${dir}"; \
(cd "$${dir}" && \
GOARCH=386 $(GOTEST_MIN) ./...); \
done; \
fi

.PHONY: examples
examples:
Expand Down
13 changes: 13 additions & 0 deletions api/global/global.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package global
import (
"go.opentelemetry.io/otel/api/global/internal"
"go.opentelemetry.io/otel/api/metric"
"go.opentelemetry.io/otel/api/propagation"
"go.opentelemetry.io/otel/api/trace"
)

Expand Down Expand Up @@ -48,3 +49,15 @@ func MeterProvider() metric.Provider {
func SetMeterProvider(mp metric.Provider) {
internal.SetMeterProvider(mp)
}

// Propagators returns the registered global propagators instance. If
// none is registered then an instance of propagators.NoopPropagators
// is returned.
func Propagators() propagation.Propagators {
return internal.Propagators()
}

// SetPropagators registers `p` as the global propagators instance.
func SetPropagators(p propagation.Propagators) {
internal.SetPropagators(p)
}
26 changes: 26 additions & 0 deletions api/global/internal/internal_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package internal_test

import (
"os"
"testing"

"go.opentelemetry.io/otel/api/global/internal"
ottest "go.opentelemetry.io/otel/internal/testing"
)

// Ensure struct alignment prior to running tests.
func TestMain(m *testing.M) {
fieldsMap := internal.AtomicFieldOffsets()
fields := make([]ottest.FieldOffset, 0, len(fieldsMap))
for name, offset := range fieldsMap {
fields = append(fields, ottest.FieldOffset{
Name: name,
Offset: offset,
})
}
if !ottest.Aligned8Byte(fields, os.Stderr) {
os.Exit(1)
}

os.Exit(m.Run())
}
184 changes: 175 additions & 9 deletions api/global/internal/meter.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,44 +40,76 @@ const (
)

type meterProvider struct {
lock sync.Mutex
meters []*meter
delegate metric.Provider

lock sync.Mutex
meters []*meter
}

type meter struct {
delegate unsafe.Pointer // (*metric.Meter)

provider *meterProvider
name string

lock sync.Mutex
instruments []*instImpl

delegate unsafe.Pointer // (*metric.Meter)
lock sync.Mutex
instruments []*instImpl
liveObservers map[*obsImpl]struct{}
// orderedObservers slice contains observers in their order of
// registration. It may also contain unregistered
// observers. The liveObservers map should be consulted to
// check if the observer is registered or not.
orderedObservers []*obsImpl
}

type instImpl struct {
delegate unsafe.Pointer // (*metric.InstrumentImpl)

name string
mkind metricKind
nkind core.NumberKind
opts interface{}
}

delegate unsafe.Pointer // (*metric.InstrumentImpl)
type obsImpl struct {
delegate unsafe.Pointer // (*metric.Int64Observer or *metric.Float64Observer)

name string
nkind core.NumberKind
opts []metric.ObserverOptionApplier
meter *meter
callback interface{}
}

type int64ObsImpl struct {
observer *obsImpl
}

type float64ObsImpl struct {
observer *obsImpl
}

// this is a common subset of the metric observers interfaces
type observerUnregister interface {
Unregister()
}

type labelSet struct {
delegate unsafe.Pointer // (* metric.LabelSet)

meter *meter
value []core.KeyValue

initialize sync.Once
delegate unsafe.Pointer // (* metric.LabelSet)
}

type instHandle struct {
delegate unsafe.Pointer // (*metric.HandleImpl)

inst *instImpl
labels metric.LabelSet

initialize sync.Once
delegate unsafe.Pointer // (*metric.HandleImpl)
}

var _ metric.Provider = &meterProvider{}
Expand All @@ -86,6 +118,10 @@ var _ metric.LabelSet = &labelSet{}
var _ metric.LabelSetDelegate = &labelSet{}
var _ metric.InstrumentImpl = &instImpl{}
var _ metric.BoundInstrumentImpl = &instHandle{}
var _ metric.Int64Observer = int64ObsImpl{}
var _ metric.Float64Observer = float64ObsImpl{}
var _ observerUnregister = (metric.Int64Observer)(nil)
var _ observerUnregister = (metric.Float64Observer)(nil)

// Provider interface and delegation

Expand Down Expand Up @@ -130,6 +166,13 @@ func (m *meter) setDelegate(provider metric.Provider) {
inst.setDelegate(*d)
}
m.instruments = nil
for _, obs := range m.orderedObservers {
if _, ok := m.liveObservers[obs]; ok {
obs.setDelegate(*d)
}
}
m.liveObservers = nil
m.orderedObservers = nil
}

func (m *meter) newInst(name string, mkind metricKind, nkind core.NumberKind, opts interface{}) metric.InstrumentImpl {
Expand Down Expand Up @@ -203,6 +246,68 @@ func (bound *instHandle) Unbind() {
(*implPtr).Unbind()
}

// Any Observer delegation

func (obs *obsImpl) setDelegate(d metric.Meter) {
if obs.nkind == core.Int64NumberKind {
obs.setInt64Delegate(d)
} else {
obs.setFloat64Delegate(d)
}
}

func (obs *obsImpl) unregister() {
unreg := obs.getUnregister()
if unreg != nil {
unreg.Unregister()
return
}
obs.meter.lock.Lock()
defer obs.meter.lock.Unlock()
delete(obs.meter.liveObservers, obs)
if len(obs.meter.liveObservers) == 0 {
obs.meter.liveObservers = nil
obs.meter.orderedObservers = nil
}
}

func (obs *obsImpl) getUnregister() observerUnregister {
ptr := atomic.LoadPointer(&obs.delegate)
if ptr == nil {
return nil
}
if obs.nkind == core.Int64NumberKind {
return *(*metric.Int64Observer)(ptr)
}
return *(*metric.Float64Observer)(ptr)
}

// Int64Observer delegation

func (obs *obsImpl) setInt64Delegate(d metric.Meter) {
obsPtr := new(metric.Int64Observer)
cb := obs.callback.(metric.Int64ObserverCallback)
*obsPtr = d.RegisterInt64Observer(obs.name, cb, obs.opts...)
atomic.StorePointer(&obs.delegate, unsafe.Pointer(obsPtr))
}

func (obs int64ObsImpl) Unregister() {
obs.observer.unregister()
}

// Float64Observer delegation

func (obs *obsImpl) setFloat64Delegate(d metric.Meter) {
obsPtr := new(metric.Float64Observer)
cb := obs.callback.(metric.Float64ObserverCallback)
*obsPtr = d.RegisterFloat64Observer(obs.name, cb, obs.opts...)
atomic.StorePointer(&obs.delegate, unsafe.Pointer(obsPtr))
}

func (obs float64ObsImpl) Unregister() {
obs.observer.unregister()
}

// Metric updates

func (m *meter) RecordBatch(ctx context.Context, labels metric.LabelSet, measurements ...metric.Measurement) {
Expand Down Expand Up @@ -296,3 +401,64 @@ func (m *meter) NewInt64Measure(name string, opts ...metric.MeasureOptionApplier
func (m *meter) NewFloat64Measure(name string, opts ...metric.MeasureOptionApplier) metric.Float64Measure {
return metric.WrapFloat64MeasureInstrument(m.newInst(name, measureKind, core.Float64NumberKind, opts))
}

func (m *meter) RegisterInt64Observer(name string, callback metric.Int64ObserverCallback, oos ...metric.ObserverOptionApplier) metric.Int64Observer {
m.lock.Lock()
defer m.lock.Unlock()

if meterPtr := (*metric.Meter)(atomic.LoadPointer(&m.delegate)); meterPtr != nil {
return (*meterPtr).RegisterInt64Observer(name, callback, oos...)
}

obs := &obsImpl{
name: name,
nkind: core.Int64NumberKind,
opts: oos,
meter: m,
callback: callback,
}
m.addObserver(obs)
return int64ObsImpl{
observer: obs,
}
}

func (m *meter) RegisterFloat64Observer(name string, callback metric.Float64ObserverCallback, oos ...metric.ObserverOptionApplier) metric.Float64Observer {
m.lock.Lock()
defer m.lock.Unlock()

if meterPtr := (*metric.Meter)(atomic.LoadPointer(&m.delegate)); meterPtr != nil {
return (*meterPtr).RegisterFloat64Observer(name, callback, oos...)
}

obs := &obsImpl{
name: name,
nkind: core.Float64NumberKind,
opts: oos,
meter: m,
callback: callback,
}
m.addObserver(obs)
return float64ObsImpl{
observer: obs,
}
}

func (m *meter) addObserver(obs *obsImpl) {
if m.liveObservers == nil {
m.liveObservers = make(map[*obsImpl]struct{})
}
m.liveObservers[obs] = struct{}{}
m.orderedObservers = append(m.orderedObservers, obs)
}

func AtomicFieldOffsets() map[string]uintptr {
return map[string]uintptr{
"meterProvider.delegate": unsafe.Offsetof(meterProvider{}.delegate),
"meter.delegate": unsafe.Offsetof(meter{}.delegate),
"instImpl.delegate": unsafe.Offsetof(instImpl{}.delegate),
"obsImpl.delegate": unsafe.Offsetof(obsImpl{}.delegate),
"labelSet.delegate": unsafe.Offsetof(labelSet{}.delegate),
"instHandle.delegate": unsafe.Offsetof(instHandle{}.delegate),
}
}

0 comments on commit 241940a

Please sign in to comment.