Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update the metric Export interface to accept a *ResourceMetrics instead of ResourceMetrics #3853

Merged
merged 11 commits into from
Mar 16, 2023
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

- Avoid creating new objects on all calls to `WithDeferredSetup` and `SkipContextSetup` in OpenTracing bridge. (#3833)
- The `New` and `Detect` functions from `go.opentelemetry.io/otel/sdk/resource` return errors that wrap underlying errors instead of just containing the underlying error strings. (#3844)
- The metric `Export` interface from `go.opentelemetry.io/otel/sdk/metric` accepts a `*ResourceMetrics` instead of `ResourceMetrics` (#3853)

### Removed

Expand Down
2 changes: 1 addition & 1 deletion bridge/opencensus/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func (e *exporter) ExportMetrics(ctx context.Context, ocmetrics []*ocmetricdata.
if len(otelmetrics) == 0 {
return nil
}
return e.base.Export(ctx, metricdata.ResourceMetrics{
return e.base.Export(ctx, &metricdata.ResourceMetrics{
Resource: e.res,
ScopeMetrics: []metricdata.ScopeMetrics{
{
Expand Down
4 changes: 2 additions & 2 deletions bridge/opencensus/metric_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,9 +278,9 @@ type fakeExporter struct {
err error
}

func (f *fakeExporter) Export(ctx context.Context, data metricdata.ResourceMetrics) error {
func (f *fakeExporter) Export(ctx context.Context, data *metricdata.ResourceMetrics) error {
if f.err == nil {
f.data = &data
f.data = data
}
return f.err
}
2 changes: 1 addition & 1 deletion exporters/otlp/otlpmetric/internal/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (e *exporter) Aggregation(k metric.InstrumentKind) aggregation.Aggregation
}

// Export transforms and transmits metric data to an OTLP receiver.
func (e *exporter) Export(ctx context.Context, rm metricdata.ResourceMetrics) error {
func (e *exporter) Export(ctx context.Context, rm *metricdata.ResourceMetrics) error {
otlpRm, err := transform.ResourceMetrics(rm)
// Best effort upload of transformable metrics.
e.clientMu.Lock()
Expand Down
2 changes: 1 addition & 1 deletion exporters/otlp/otlpmetric/internal/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func TestExporterClientConcurrency(t *testing.T) {
const goroutines = 5

exp := New(&client{})
rm := metricdata.ResourceMetrics{}
rm := new(metricdata.ResourceMetrics)
ctx := context.Background()

done := make(chan struct{})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
// ResourceMetrics returns an OTLP ResourceMetrics generated from rm. If rm
// contains invalid ScopeMetrics, an error will be returned along with an OTLP
// ResourceMetrics that contains partial OTLP ScopeMetrics.
func ResourceMetrics(rm metricdata.ResourceMetrics) (*mpb.ResourceMetrics, error) {
func ResourceMetrics(rm *metricdata.ResourceMetrics) (*mpb.ResourceMetrics, error) {
sms, err := ScopeMetrics(rm.ScopeMetrics)
return &mpb.ResourceMetrics{
Resource: &rpb.Resource{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ var (
},
}

otelResourceMetrics = metricdata.ResourceMetrics{
otelResourceMetrics = &metricdata.ResourceMetrics{
Resource: otelRes,
ScopeMetrics: otelScopeMetrics,
}
Expand Down
6 changes: 3 additions & 3 deletions exporters/otlp/otlpmetric/otlpmetricgrpc/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ func TestConfig(t *testing.T) {
exp, coll := factoryFunc(nil, WithHeaders(headers))
t.Cleanup(coll.Shutdown)
ctx := context.Background()
require.NoError(t, exp.Export(ctx, metricdata.ResourceMetrics{}))
require.NoError(t, exp.Export(ctx, &metricdata.ResourceMetrics{}))
// Ensure everything is flushed.
require.NoError(t, exp.Shutdown(ctx))

Expand All @@ -187,7 +187,7 @@ func TestConfig(t *testing.T) {
t.Cleanup(coll.Shutdown)
ctx := context.Background()
t.Cleanup(func() { require.NoError(t, exp.Shutdown(ctx)) })
err := exp.Export(ctx, metricdata.ResourceMetrics{})
err := exp.Export(ctx, &metricdata.ResourceMetrics{})
assert.ErrorContains(t, err, context.DeadlineExceeded.Error())
})

Expand All @@ -197,7 +197,7 @@ func TestConfig(t *testing.T) {
exp, coll := factoryFunc(nil, WithDialOption(grpc.WithUserAgent(customerUserAgent)))
t.Cleanup(coll.Shutdown)
ctx := context.Background()
require.NoError(t, exp.Export(ctx, metricdata.ResourceMetrics{}))
require.NoError(t, exp.Export(ctx, &metricdata.ResourceMetrics{}))
// Ensure everything is flushed.
require.NoError(t, exp.Shutdown(ctx))

Expand Down
16 changes: 8 additions & 8 deletions exporters/otlp/otlpmetric/otlpmetrichttp/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func TestConfig(t *testing.T) {
exp, coll := factoryFunc("", nil, WithHeaders(headers))
ctx := context.Background()
t.Cleanup(func() { require.NoError(t, coll.Shutdown(ctx)) })
require.NoError(t, exp.Export(ctx, metricdata.ResourceMetrics{}))
require.NoError(t, exp.Export(ctx, &metricdata.ResourceMetrics{}))
// Ensure everything is flushed.
require.NoError(t, exp.Shutdown(ctx))

Expand All @@ -94,7 +94,7 @@ func TestConfig(t *testing.T) {
// Push this after Shutdown so the HTTP server doesn't hang.
t.Cleanup(func() { close(rCh) })
t.Cleanup(func() { require.NoError(t, exp.Shutdown(ctx)) })
err := exp.Export(ctx, metricdata.ResourceMetrics{})
err := exp.Export(ctx, &metricdata.ResourceMetrics{})
assert.ErrorContains(t, err, context.DeadlineExceeded.Error())
})

Expand All @@ -103,7 +103,7 @@ func TestConfig(t *testing.T) {
ctx := context.Background()
t.Cleanup(func() { require.NoError(t, coll.Shutdown(ctx)) })
t.Cleanup(func() { require.NoError(t, exp.Shutdown(ctx)) })
assert.NoError(t, exp.Export(ctx, metricdata.ResourceMetrics{}))
assert.NoError(t, exp.Export(ctx, &metricdata.ResourceMetrics{}))
assert.Len(t, coll.Collect().Dump(), 1)
})

Expand Down Expand Up @@ -133,7 +133,7 @@ func TestConfig(t *testing.T) {
// Push this after Shutdown so the HTTP server doesn't hang.
t.Cleanup(func() { close(rCh) })
t.Cleanup(func() { require.NoError(t, exp.Shutdown(ctx)) })
assert.NoError(t, exp.Export(ctx, metricdata.ResourceMetrics{}), "failed retry")
assert.NoError(t, exp.Export(ctx, &metricdata.ResourceMetrics{}), "failed retry")
assert.Len(t, rCh, 0, "failed HTTP responses did not occur")
})

Expand All @@ -144,7 +144,7 @@ func TestConfig(t *testing.T) {
ctx := context.Background()
t.Cleanup(func() { require.NoError(t, coll.Shutdown(ctx)) })
t.Cleanup(func() { require.NoError(t, exp.Shutdown(ctx)) })
assert.NoError(t, exp.Export(ctx, metricdata.ResourceMetrics{}))
assert.NoError(t, exp.Export(ctx, &metricdata.ResourceMetrics{}))
assert.Len(t, coll.Collect().Dump(), 1)
})

Expand All @@ -155,7 +155,7 @@ func TestConfig(t *testing.T) {
ctx := context.Background()
t.Cleanup(func() { require.NoError(t, coll.Shutdown(ctx)) })
t.Cleanup(func() { require.NoError(t, exp.Shutdown(ctx)) })
assert.NoError(t, exp.Export(ctx, metricdata.ResourceMetrics{}))
assert.NoError(t, exp.Export(ctx, &metricdata.ResourceMetrics{}))
assert.Len(t, coll.Collect().Dump(), 1)
})

Expand All @@ -166,7 +166,7 @@ func TestConfig(t *testing.T) {
ctx := context.Background()
t.Cleanup(func() { require.NoError(t, coll.Shutdown(ctx)) })
t.Cleanup(func() { require.NoError(t, exp.Shutdown(ctx)) })
assert.NoError(t, exp.Export(ctx, metricdata.ResourceMetrics{}))
assert.NoError(t, exp.Export(ctx, &metricdata.ResourceMetrics{}))
assert.Len(t, coll.Collect().Dump(), 1)
})

Expand All @@ -176,7 +176,7 @@ func TestConfig(t *testing.T) {
exp, coll := factoryFunc("", nil, WithHeaders(headers))
ctx := context.Background()
t.Cleanup(func() { require.NoError(t, coll.Shutdown(ctx)) })
require.NoError(t, exp.Export(ctx, metricdata.ResourceMetrics{}))
require.NoError(t, exp.Export(ctx, &metricdata.ResourceMetrics{}))
// Ensure everything is flushed.
require.NoError(t, exp.Shutdown(ctx))

Expand Down
4 changes: 2 additions & 2 deletions exporters/stdout/stdoutmetric/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func Example() {
// This is where the sdk would be used to create a Meter and from that
// instruments that would make measurements of your code. To simulate that
// behavior, call export directly with mocked data.
_ = exp.Export(ctx, mockData)
_ = exp.Export(ctx, &mockData)

// Ensure the periodic reader is cleaned up by shutting down the sdk.
_ = sdk.Shutdown(ctx)
Expand Down Expand Up @@ -325,6 +325,6 @@ func Example() {
// }
// }
// ],
// "ScopeMetrics": []
// "ScopeMetrics": null
// }
}
26 changes: 7 additions & 19 deletions exporters/stdout/stdoutmetric/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func (e *exporter) Aggregation(k metric.InstrumentKind) aggregation.Aggregation
return e.aggregationSelector(k)
}

func (e *exporter) Export(ctx context.Context, data metricdata.ResourceMetrics) error {
func (e *exporter) Export(ctx context.Context, data *metricdata.ResourceMetrics) error {
select {
case <-ctx.Done():
// Don't do anything if the context has already timed out.
Expand All @@ -71,7 +71,7 @@ func (e *exporter) Export(ctx context.Context, data metricdata.ResourceMetrics)
// Context is still valid, continue.
}
if e.redactTimestamps {
data = redactTimestamps(data)
redactTimestamps(data)
}
return e.encVal.Load().(encoderHolder).Encode(data)
}
Expand All @@ -90,26 +90,14 @@ func (e *exporter) Shutdown(ctx context.Context) error {
return ctx.Err()
}

func redactTimestamps(orig metricdata.ResourceMetrics) metricdata.ResourceMetrics {
rm := metricdata.ResourceMetrics{
Resource: orig.Resource,
ScopeMetrics: make([]metricdata.ScopeMetrics, len(orig.ScopeMetrics)),
}
func redactTimestamps(orig *metricdata.ResourceMetrics) {
for i, sm := range orig.ScopeMetrics {
rm.ScopeMetrics[i] = metricdata.ScopeMetrics{
Scope: sm.Scope,
Metrics: make([]metricdata.Metrics, len(sm.Metrics)),
}
for j, m := range sm.Metrics {
rm.ScopeMetrics[i].Metrics[j] = metricdata.Metrics{
Name: m.Name,
Description: m.Description,
Unit: m.Unit,
Data: redactAggregationTimestamps(m.Data),
}
metrics := sm.Metrics
for j, m := range metrics {
data := m.Data
orig.ScopeMetrics[i].Metrics[j].Data = redactAggregationTimestamps(data)
}
}
return rm
}

var (
Expand Down
4 changes: 2 additions & 2 deletions exporters/stdout/stdoutmetric/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,15 +83,15 @@ func TestExporterHonorsContextErrors(t *testing.T) {
exp, err := stdoutmetric.New(testEncoderOption())
require.NoError(t, err)
return func(ctx context.Context) error {
var data metricdata.ResourceMetrics
data := new(metricdata.ResourceMetrics)
return exp.Export(ctx, data)
}
}))
}

func TestShutdownExporterReturnsShutdownErrorOnExport(t *testing.T) {
var (
data metricdata.ResourceMetrics
data = new(metricdata.ResourceMetrics)
ctx = context.Background()
exp, err = stdoutmetric.New(testEncoderOption())
)
Expand Down
2 changes: 1 addition & 1 deletion sdk/metric/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ type Exporter interface {
// The passed ResourceMetrics may be reused when the call completes. If an
// exporter needs to hold this data after it returns, it needs to make a
// copy.
Export(context.Context, metricdata.ResourceMetrics) error
Export(context.Context, *metricdata.ResourceMetrics) error

// ForceFlush flushes any metric data held by an exporter.
//
Expand Down
6 changes: 3 additions & 3 deletions sdk/metric/periodic_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ func (r *periodicReader) collectAndExport(ctx context.Context) error {
rm := r.rmPool.Get().(*metricdata.ResourceMetrics)
err := r.Collect(ctx, rm)
if err == nil {
err = r.export(ctx, *rm)
err = r.export(ctx, rm)
}
r.rmPool.Put(rm)
return err
Expand Down Expand Up @@ -275,7 +275,7 @@ func (r *periodicReader) collect(ctx context.Context, p interface{}, rm *metricd
}

// export exports metric data m using r's exporter.
func (r *periodicReader) export(ctx context.Context, m metricdata.ResourceMetrics) error {
func (r *periodicReader) export(ctx context.Context, m *metricdata.ResourceMetrics) error {
c, cancel := context.WithTimeout(ctx, r.timeout)
defer cancel()
return r.exporter.Export(c, m)
Expand Down Expand Up @@ -321,7 +321,7 @@ func (r *periodicReader) Shutdown(ctx context.Context) error {
m := r.rmPool.Get().(*metricdata.ResourceMetrics)
err = r.collect(ctx, ph, m)
if err == nil {
err = r.export(ctx, *m)
err = r.export(ctx, m)
}
r.rmPool.Put(m)
}
Expand Down
14 changes: 7 additions & 7 deletions sdk/metric/periodic_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ func TestIntervalEnvAndOption(t *testing.T) {
type fnExporter struct {
temporalityFunc TemporalitySelector
aggregationFunc AggregationSelector
exportFunc func(context.Context, metricdata.ResourceMetrics) error
exportFunc func(context.Context, *metricdata.ResourceMetrics) error
flushFunc func(context.Context) error
shutdownFunc func(context.Context) error
}
Expand All @@ -173,7 +173,7 @@ func (e *fnExporter) Aggregation(k InstrumentKind) aggregation.Aggregation {
return DefaultAggregationSelector(k)
}

func (e *fnExporter) Export(ctx context.Context, m metricdata.ResourceMetrics) error {
func (e *fnExporter) Export(ctx context.Context, m *metricdata.ResourceMetrics) error {
if e.exportFunc != nil {
return e.exportFunc(ctx, m)
}
Expand Down Expand Up @@ -204,7 +204,7 @@ func (ts *periodicReaderTestSuite) SetupTest() {
ts.readerTestSuite.SetupTest()

e := &fnExporter{
exportFunc: func(context.Context, metricdata.ResourceMetrics) error { return assert.AnError },
exportFunc: func(context.Context, *metricdata.ResourceMetrics) error { return assert.AnError },
flushFunc: func(context.Context) error { return assert.AnError },
shutdownFunc: func(context.Context) error { return assert.AnError },
}
Expand Down Expand Up @@ -282,9 +282,9 @@ func TestPeriodicReaderRun(t *testing.T) {
otel.SetErrorHandler(eh)

exp := &fnExporter{
exportFunc: func(_ context.Context, m metricdata.ResourceMetrics) error {
exportFunc: func(_ context.Context, m *metricdata.ResourceMetrics) error {
// The testSDKProducer produces testResourceMetricsAB.
assert.Equal(t, testResourceMetricsAB, m)
assert.Equal(t, testResourceMetricsAB, *m)
return assert.AnError
},
}
Expand All @@ -307,9 +307,9 @@ func TestPeriodicReaderFlushesPending(t *testing.T) {
expFunc := func(t *testing.T) (exp Exporter, called *bool) {
called = new(bool)
return &fnExporter{
exportFunc: func(_ context.Context, m metricdata.ResourceMetrics) error {
exportFunc: func(_ context.Context, m *metricdata.ResourceMetrics) error {
// The testSDKProducer produces testResourceMetricsA.
assert.Equal(t, testResourceMetricsAB, m)
assert.Equal(t, testResourceMetricsAB, *m)
*called = true
return assert.AnError
},
Expand Down