Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Metric Producer as a new interface, which returns scope metrics #3524

Merged
merged 15 commits into from
Dec 15, 2022
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

- The deprecated `go.opentelemetry.io/otel/sdk/metric/view` package is removed. (#3520)

### Added

- Add `Producer` interface and `Reader.RegisterProducer(Producer)` to `go.opentelemetry.io/otel/sdk/metric` to enable external metric Producers. (#3524)

## [1.11.2/0.34.0] 2022-12-05

### Added
Expand Down
21 changes: 13 additions & 8 deletions sdk/metric/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,19 @@ func unify(funcs []func(context.Context) error) func(context.Context) error {
errs = append(errs, err)
}
}
switch len(errs) {
case 0:
return nil
case 1:
return errs[0]
default:
return fmt.Errorf("%v", errs)
}
return unifyErrors(errs)
}
}

// unifyErrors combines multiple errors into a single error
func unifyErrors(errs []error) error {
switch len(errs) {
case 0:
return nil
case 1:
return errs[0]
default:
return fmt.Errorf("%v", errs)
}
}

Expand Down
16 changes: 9 additions & 7 deletions sdk/metric/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,13 @@ import (
)

type reader struct {
producer producer
temporalityFunc TemporalitySelector
aggregationFunc AggregationSelector
collectFunc func(context.Context) (metricdata.ResourceMetrics, error)
forceFlushFunc func(context.Context) error
shutdownFunc func(context.Context) error
producer sdkProducer
externalProducers []Producer
temporalityFunc TemporalitySelector
aggregationFunc AggregationSelector
collectFunc func(context.Context) (metricdata.ResourceMetrics, error)
forceFlushFunc func(context.Context) error
shutdownFunc func(context.Context) error
}

var _ Reader = (*reader)(nil)
Expand All @@ -42,7 +43,8 @@ func (r *reader) aggregation(kind InstrumentKind) aggregation.Aggregation { // n
return r.aggregationFunc(kind)
}

func (r *reader) register(p producer) { r.producer = p }
func (r *reader) register(p sdkProducer) { r.producer = p }
func (r *reader) RegisterProducer(p Producer) { r.externalProducers = append(r.externalProducers, p) }
func (r *reader) temporality(kind InstrumentKind) metricdata.Temporality {
return r.temporalityFunc(kind)
}
Expand Down
51 changes: 40 additions & 11 deletions sdk/metric/manual_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,12 @@ import (
// manualReader is a simple Reader that allows an application to
// read metrics on demand.
type manualReader struct {
producer atomic.Value
sdkProducer atomic.Value
shutdownOnce sync.Once

mu sync.Mutex
externalProducers atomic.Value

temporalitySelector TemporalitySelector
aggregationSelector AggregationSelector
}
Expand All @@ -41,22 +44,36 @@ var _ = map[Reader]struct{}{&manualReader{}: {}}
// NewManualReader returns a Reader which is directly called to collect metrics.
func NewManualReader(opts ...ManualReaderOption) Reader {
cfg := newManualReaderConfig(opts)
return &manualReader{
r := &manualReader{
temporalitySelector: cfg.temporalitySelector,
aggregationSelector: cfg.aggregationSelector,
}
r.externalProducers.Store([]Producer{})
return r
}

// register stores the Producer which enables the caller to read
// metrics on demand.
func (mr *manualReader) register(p producer) {
// register stores the sdkProducer which enables the caller
// to read metrics from the SDK on demand.
func (mr *manualReader) register(p sdkProducer) {
// Only register once. If producer is already set, do nothing.
if !mr.producer.CompareAndSwap(nil, produceHolder{produce: p.produce}) {
if !mr.sdkProducer.CompareAndSwap(nil, produceHolder{produce: p.produce}) {
msg := "did not register manual reader"
global.Error(errDuplicateRegister, msg)
}
}

// RegisterProducer stores the external Producer which enables the caller
// to read metrics on demand.
func (mr *manualReader) RegisterProducer(p Producer) {
mr.mu.Lock()
defer mr.mu.Unlock()
currentProducers := mr.externalProducers.Load().([]Producer)
newProducers := []Producer{}
newProducers = append(newProducers, currentProducers...)
newProducers = append(newProducers, p)
mr.externalProducers.Store(newProducers)
}

// temporality reports the Temporality for the instrument kind provided.
func (mr *manualReader) temporality(kind InstrumentKind) metricdata.Temporality {
return mr.temporalitySelector(kind)
Expand All @@ -77,18 +94,18 @@ func (mr *manualReader) Shutdown(context.Context) error {
err := ErrReaderShutdown
mr.shutdownOnce.Do(func() {
// Any future call to Collect will now return ErrReaderShutdown.
mr.producer.Store(produceHolder{
mr.sdkProducer.Store(produceHolder{
produce: shutdownProducer{}.produce,
})
err = nil
})
return err
}

// Collect gathers all metrics from the SDK, calling any callbacks necessary.
// Collect will return an error if called after shutdown.
// Collect gathers all metrics from the SDK and other Producers, calling any
// callbacks necessary. Collect will return an error if called after shutdown.
func (mr *manualReader) Collect(ctx context.Context) (metricdata.ResourceMetrics, error) {
p := mr.producer.Load()
p := mr.sdkProducer.Load()
if p == nil {
return metricdata.ResourceMetrics{}, ErrReaderNotRegistered
}
Expand All @@ -103,7 +120,19 @@ func (mr *manualReader) Collect(ctx context.Context) (metricdata.ResourceMetrics
return metricdata.ResourceMetrics{}, err
}

return ph.produce(ctx)
rm, err := ph.produce(ctx)
if err != nil {
MrAlias marked this conversation as resolved.
Show resolved Hide resolved
return metricdata.ResourceMetrics{}, err
}
var errs []error
for _, producer := range mr.externalProducers.Load().([]Producer) {
externalMetrics, err := producer.Produce(ctx)
if err != nil {
errs = append(errs, err)
}
rm.ScopeMetrics = append(rm.ScopeMetrics, externalMetrics...)
}
return rm, unifyErrors(errs)
}

// manualReaderConfig contains configuration options for a ManualReader.
Expand Down
45 changes: 37 additions & 8 deletions sdk/metric/periodic_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ func NewPeriodicReader(exporter Exporter, options ...PeriodicReaderOption) Reade
cancel: cancel,
done: make(chan struct{}),
}
r.externalProducers.Store([]Producer{})
dashpole marked this conversation as resolved.
Show resolved Hide resolved

go func() {
defer func() { close(r.done) }()
Expand All @@ -126,7 +127,10 @@ func NewPeriodicReader(exporter Exporter, options ...PeriodicReaderOption) Reade
// periodicReader is a Reader that continuously collects and exports metric
// data at a set interval.
type periodicReader struct {
producer atomic.Value
sdkProducer atomic.Value

mu sync.Mutex
externalProducers atomic.Value

timeout time.Duration
exporter Exporter
Expand Down Expand Up @@ -166,14 +170,25 @@ func (r *periodicReader) run(ctx context.Context, interval time.Duration) {
}

// register registers p as the producer of this reader.
func (r *periodicReader) register(p producer) {
func (r *periodicReader) register(p sdkProducer) {
// Only register once. If producer is already set, do nothing.
if !r.producer.CompareAndSwap(nil, produceHolder{produce: p.produce}) {
if !r.sdkProducer.CompareAndSwap(nil, produceHolder{produce: p.produce}) {
msg := "did not register periodic reader"
global.Error(errDuplicateRegister, msg)
}
}

// RegisterProducer registers p as an external Producer of this reader.
func (r *periodicReader) RegisterProducer(p Producer) {
r.mu.Lock()
defer r.mu.Unlock()
currentProducers := r.externalProducers.Load().([]Producer)
newProducers := []Producer{}
newProducers = append(newProducers, currentProducers...)
newProducers = append(newProducers, p)
r.externalProducers.Store(newProducers)
}

// temporality reports the Temporality for the instrument kind provided.
func (r *periodicReader) temporality(kind InstrumentKind) metricdata.Temporality {
return r.exporter.Temporality(kind)
Expand All @@ -195,12 +210,13 @@ func (r *periodicReader) collectAndExport(ctx context.Context) error {
}

// Collect gathers and returns all metric data related to the Reader from
// the SDK. The returned metric data is not exported to the configured
// exporter, it is left to the caller to handle that if desired.
// the SDK and other Producers. The returned metric data is not exported
// to the configured exporter, it is left to the caller to handle that if
// desired.
//
// An error is returned if this is called after Shutdown.
func (r *periodicReader) Collect(ctx context.Context) (metricdata.ResourceMetrics, error) {
return r.collect(ctx, r.producer.Load())
return r.collect(ctx, r.sdkProducer.Load())
}

// collect unwraps p as a produceHolder and returns its produce results.
Expand All @@ -218,7 +234,20 @@ func (r *periodicReader) collect(ctx context.Context, p interface{}) (metricdata
err := fmt.Errorf("periodic reader: invalid producer: %T", p)
return metricdata.ResourceMetrics{}, err
}
return ph.produce(ctx)

rm, err := ph.produce(ctx)
if err != nil {
return metricdata.ResourceMetrics{}, err
}
var errs []error
for _, producer := range r.externalProducers.Load().([]Producer) {
externalMetrics, err := producer.Produce(ctx)
if err != nil {
errs = append(errs, err)
}
rm.ScopeMetrics = append(rm.ScopeMetrics, externalMetrics...)
}
return rm, unifyErrors(errs)
}

// export exports metric data m using r's exporter.
Expand Down Expand Up @@ -259,7 +288,7 @@ func (r *periodicReader) Shutdown(ctx context.Context) error {
<-r.done

// Any future call to Collect will now return ErrReaderShutdown.
ph := r.producer.Swap(produceHolder{
ph := r.sdkProducer.Swap(produceHolder{
produce: shutdownProducer{}.produce,
})

Expand Down
20 changes: 12 additions & 8 deletions sdk/metric/periodic_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,8 @@ func (ts *periodicReaderTestSuite) SetupTest() {
}

ts.ErrReader = NewPeriodicReader(e)
ts.ErrReader.register(testProducer{})
ts.ErrReader.register(testSDKProducer{})
ts.ErrReader.RegisterProducer(testExternalProducer{})
}

func (ts *periodicReaderTestSuite) TearDownTest() {
Expand Down Expand Up @@ -186,14 +187,15 @@ func TestPeriodicReaderRun(t *testing.T) {

exp := &fnExporter{
exportFunc: func(_ context.Context, m metricdata.ResourceMetrics) error {
// The testProducer produces testMetrics.
assert.Equal(t, testMetrics, m)
// The testSDKProducer produces testResourceMetricsAB.
assert.Equal(t, testResourceMetricsAB, m)
return assert.AnError
},
}

r := NewPeriodicReader(exp)
r.register(testProducer{})
r.register(testSDKProducer{})
r.RegisterProducer(testExternalProducer{})
trigger <- time.Now()
assert.Equal(t, assert.AnError, <-eh.Err)

Expand All @@ -210,8 +212,8 @@ func TestPeriodicReaderFlushesPending(t *testing.T) {
called = new(bool)
return &fnExporter{
exportFunc: func(_ context.Context, m metricdata.ResourceMetrics) error {
// The testProducer produces testMetrics.
assert.Equal(t, testMetrics, m)
// The testSDKProducer produces testResourceMetricsA.
assert.Equal(t, testResourceMetricsAB, m)
*called = true
return assert.AnError
},
Expand All @@ -221,7 +223,8 @@ func TestPeriodicReaderFlushesPending(t *testing.T) {
t.Run("ForceFlush", func(t *testing.T) {
exp, called := expFunc(t)
r := NewPeriodicReader(exp)
r.register(testProducer{})
r.register(testSDKProducer{})
r.RegisterProducer(testExternalProducer{})
assert.Equal(t, assert.AnError, r.ForceFlush(context.Background()), "export error not returned")
assert.True(t, *called, "exporter Export method not called, pending telemetry not flushed")

Expand All @@ -232,7 +235,8 @@ func TestPeriodicReaderFlushesPending(t *testing.T) {
t.Run("Shutdown", func(t *testing.T) {
exp, called := expFunc(t)
r := NewPeriodicReader(exp)
r.register(testProducer{})
r.register(testSDKProducer{})
r.RegisterProducer(testExternalProducer{})
assert.Equal(t, assert.AnError, r.Shutdown(context.Background()), "export error not returned")
assert.True(t, *called, "exporter Export method not called, pending telemetry not flushed")
})
Expand Down
19 changes: 16 additions & 3 deletions sdk/metric/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,12 @@ type Reader interface {
// register registers a Reader with a MeterProvider.
// The producer argument allows the Reader to signal the sdk to collect
// and send aggregated metric measurements.
register(producer)
register(sdkProducer)

// RegisterProducer registers a an external Producer with this Reader.
// The Producer is used as a source of aggregated metric data which is
// incorporated into metrics collected from the SDK.
RegisterProducer(Producer)

// temporality reports the Temporality for the instrument kind provided.
temporality(InstrumentKind) metricdata.Temporality
Expand Down Expand Up @@ -84,14 +89,22 @@ type Reader interface {
Shutdown(context.Context) error
}

// producer produces metrics for a Reader.
type producer interface {
// sdkProducer produces metrics for a Reader.
type sdkProducer interface {
// produce returns aggregated metrics from a single collection.
//
// This method is safe to call concurrently.
produce(context.Context) (metricdata.ResourceMetrics, error)
}

// Producer produces metrics for a Reader from an external source.
type Producer interface {
// Produce returns aggregated metrics from an external source.
//
// This method should be safe to call concurrently.
Produce(context.Context) ([]metricdata.ScopeMetrics, error)
}

// produceHolder is used as an atomic.Value to wrap the non-concrete producer
// type.
type produceHolder struct {
Expand Down