Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove comparable constraint on Readers. #3202

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
16 changes: 8 additions & 8 deletions sdk/metric/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
// config contains configuration options for a MeterProvider.
type config struct {
res *resource.Resource
readers map[Reader][]view.View
entries []registryEntry
}

// readerSignals returns a force-flush and shutdown function for a
Expand All @@ -38,9 +38,9 @@ type config struct {
// single functions.
func (c config) readerSignals() (forceFlush, shutdown func(context.Context) error) {
var fFuncs, sFuncs []func(context.Context) error
for r := range c.readers {
sFuncs = append(sFuncs, r.Shutdown)
fFuncs = append(fFuncs, r.ForceFlush)
for _, ent := range c.entries {
sFuncs = append(sFuncs, ent.reader.Shutdown)
fFuncs = append(fFuncs, ent.reader.ForceFlush)
}

return unify(fFuncs), unifyShutdown(sFuncs)
Expand Down Expand Up @@ -126,14 +126,14 @@ func WithResource(res *resource.Resource) Option {
// operations; no data will be exported without a Reader.
func WithReader(r Reader, views ...view.View) Option {
return optionFunc(func(cfg config) config {
if cfg.readers == nil {
cfg.readers = make(map[Reader][]view.View)
}
if len(views) == 0 {
views = []view.View{{}}
}

cfg.readers[r] = views
cfg.entries = append(cfg.entries, registryEntry{
reader: r,
views: views,
})
return cfg
})
}
2 changes: 1 addition & 1 deletion sdk/metric/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,5 +130,5 @@ func TestWithResource(t *testing.T) {
func TestWithReader(t *testing.T) {
r := &reader{}
c := newConfig([]Option{WithReader(r)})
assert.Contains(t, c.readers, r)
assert.Contains(t, c.entries, registryEntry{reader: r, views: []view.View{{}}})
}
39 changes: 24 additions & 15 deletions sdk/metric/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,27 +161,34 @@ func (p *pipeline) produce(ctx context.Context) (metricdata.ResourceMetrics, err
// pipelineRegistry manages creating pipelines, and aggregators. Meters retrieve
// new Aggregators from a pipelineRegistry.
type pipelineRegistry struct {
views map[Reader][]view.View
pipelines map[Reader]*pipeline
entries []registryEntry
}

func newPipelineRegistries(views map[Reader][]view.View) *pipelineRegistry {
pipelines := map[Reader]*pipeline{}
for rdr := range views {
pipe := &pipeline{}
rdr.register(pipe)
pipelines[rdr] = pipe
type registryEntry struct {
reader Reader
views []view.View
pipeline *pipeline
}

func newPipelineRegistries(entries []registryEntry) *pipelineRegistry {
reg := &pipelineRegistry{
entries: make([]registryEntry, len(entries)),
}
return &pipelineRegistry{
views: views,
pipelines: pipelines,
for i, entry := range entries {
entry.pipeline = &pipeline{}
entry.reader.register(entry.pipeline)
reg.entries[i] = entry
}
return reg
}

// TODO (#3053) Only register callbacks if any instrument matches in a view.
func (reg *pipelineRegistry) registerCallback(fn func(context.Context)) {
for _, pipe := range reg.pipelines {
pipe.addCallback(fn)
// for _, pipe := range reg.pipelines {
// pipe.addCallback(fn)
// }
for _, entry := range reg.entries {
entry.pipeline.addCallback(fn)
}
}

Expand All @@ -192,8 +199,10 @@ func createAggregators[N int64 | float64](reg *pipelineRegistry, inst view.Instr
var aggs []internal.Aggregator[N]

errs := &multierror{}
for rdr, views := range reg.views {
pipe := reg.pipelines[rdr]
for _, entry := range reg.entries {
rdr := entry.reader
views := entry.views
pipe := entry.pipeline
rdrAggs, err := createAggregatorsForReader[N](rdr, views, inst)
if err != nil {
errs.append(err)
Expand Down
94 changes: 56 additions & 38 deletions sdk/metric/pipeline_registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ func TestPipelineRegistryCreateAggregators(t *testing.T) {

testCases := []struct {
name string
views map[Reader][]view.View
entries []registryEntry
inst view.Instrument
wantCount int
}{
Expand All @@ -263,59 +263,75 @@ func TestPipelineRegistryCreateAggregators(t *testing.T) {
{
name: "1 reader 1 view gets 1 aggregator",
inst: view.Instrument{Name: "foo"},
views: map[Reader][]view.View{
testRdr: {
{},
},
entries: []registryEntry{
{reader: testRdr, views: []view.View{{}}},
},
wantCount: 1,
},
{
name: "1 reader 2 views gets 2 aggregator",
inst: view.Instrument{Name: "foo"},
views: map[Reader][]view.View{
testRdr: {
{},
renameView,
entries: []registryEntry{
{
reader: testRdr,
views: []view.View{
{},
renameView,
},
},
},
wantCount: 2,
},
{
name: "2 readers 1 view each gets 2 aggregators",
inst: view.Instrument{Name: "foo"},
views: map[Reader][]view.View{
testRdr: {
{},
entries: []registryEntry{
{
reader: testRdr,
views: []view.View{
{},
},
},
testRdrHistogram: {
{},
{
reader: testRdrHistogram,
views: []view.View{
{},
},
},
},
wantCount: 2,
},
{
name: "2 reader 2 views each gets 4 aggregators",
inst: view.Instrument{Name: "foo"},
views: map[Reader][]view.View{
testRdr: {
{},
renameView,
entries: []registryEntry{
{
reader: testRdr,
views: []view.View{
{},
renameView,
},
},
testRdrHistogram: {
{},
renameView,
{
reader: testRdrHistogram,
views: []view.View{
{},
renameView,
},
},
},
wantCount: 4,
},
{
name: "An instrument is duplicated in two views share the same aggregator",
inst: view.Instrument{Name: "foo"},
views: map[Reader][]view.View{
testRdr: {
{},
{},
entries: []registryEntry{
{
reader: testRdr,
views: []view.View{
{},
{},
},
},
},
wantCount: 1,
Expand All @@ -324,9 +340,9 @@ func TestPipelineRegistryCreateAggregators(t *testing.T) {

for _, tt := range testCases {
t.Run(tt.name, func(t *testing.T) {
reg := newPipelineRegistries(tt.views)
reg := newPipelineRegistries(tt.entries)
testPipelineRegistryCreateIntAggregators(t, reg, tt.wantCount)
reg = newPipelineRegistries(tt.views)
reg = newPipelineRegistries(tt.entries)
testPipelineRegistryCreateFloatAggregators(t, reg, tt.wantCount)
})
}
Expand All @@ -353,19 +369,17 @@ func testPipelineRegistryCreateFloatAggregators(t *testing.T, reg *pipelineRegis
func TestPipelineRegistryCreateAggregatorsIncompatibleInstrument(t *testing.T) {
testRdrHistogram := NewManualReader(WithAggregationSelector(func(ik view.InstrumentKind) aggregation.Aggregation { return aggregation.ExplicitBucketHistogram{} }))

views := map[Reader][]view.View{
testRdrHistogram: {
{},
},
entries := []registryEntry{
{reader: testRdrHistogram, views: []view.View{{}}},
}
reg := newPipelineRegistries(views)
reg := newPipelineRegistries(entries)
inst := view.Instrument{Name: "foo", Kind: view.AsyncGauge}

intAggs, err := createAggregators[int64](reg, inst, unit.Dimensionless)
assert.Error(t, err)
assert.Len(t, intAggs, 0)

reg = newPipelineRegistries(views)
reg = newPipelineRegistries(entries)

floatAggs, err := createAggregators[float64](reg, inst, unit.Dimensionless)
assert.Error(t, err)
Expand All @@ -377,17 +391,21 @@ func TestPipelineRegistryCreateAggregatorsDuplicateErrors(t *testing.T) {
view.MatchInstrumentName("bar"),
view.WithRename("foo"),
)
views := map[Reader][]view.View{
NewManualReader(): {
{},
renameView,

entries := []registryEntry{
{
reader: NewManualReader(),
views: []view.View{
{},
renameView,
},
},
}

fooInst := view.Instrument{Name: "foo", Kind: view.SyncCounter}
barInst := view.Instrument{Name: "bar", Kind: view.SyncCounter}

reg := newPipelineRegistries(views)
reg := newPipelineRegistries(entries)

intAggs, err := createAggregators[int64](reg, fooInst, unit.Dimensionless)
assert.NoError(t, err)
Expand Down
2 changes: 1 addition & 1 deletion sdk/metric/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func NewMeterProvider(options ...Option) *MeterProvider {

flush, sdown := conf.readerSignals()

registry := newPipelineRegistries(conf.readers)
registry := newPipelineRegistries(conf.entries)

return &MeterProvider{
res: conf.res,
Expand Down