Skip to content

Commit

Permalink
Optimize open-census usage (#416)
Browse files Browse the repository at this point in the history
  • Loading branch information
howardjohn committed Sep 14, 2021
1 parent b208a23 commit da9399e
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 18 deletions.
93 changes: 77 additions & 16 deletions monitoring/monitoring.go
Expand Up @@ -231,8 +231,14 @@ func (d *derivedFloat64Metric) Register() error {
type float64Metric struct {
*stats.Float64Measure

// tags stores all tags for the metrics
tags []tag.Mutator
// ctx is a precomputed context holding tags, as an optimization
ctx context.Context
view *view.View

incrementMeasure []stats.Measurement
decrementMeasure []stats.Measurement
}

func createOptions(opts ...Options) *options {
Expand All @@ -249,19 +255,23 @@ func newFloat64Metric(name, description string, aggregation *view.Aggregation, o
for _, l := range opts.labels {
tagKeys = append(tagKeys, tag.Key(l))
}
ctx, _ := tag.New(context.Background()) //nolint:errcheck
return &float64Metric{
measure,
make([]tag.Mutator, 0),
&view.View{Measure: measure, TagKeys: tagKeys, Aggregation: aggregation},
Float64Measure: measure,
tags: make([]tag.Mutator, 0),
ctx: ctx,
view: &view.View{Measure: measure, TagKeys: tagKeys, Aggregation: aggregation},
incrementMeasure: []stats.Measurement{measure.M(1)},
decrementMeasure: []stats.Measurement{measure.M(-1)},
}
}

func (f *float64Metric) Increment() {
f.Record(1)
f.recordMeasurements(f.incrementMeasure)
}

func (f *float64Metric) Decrement() {
f.Record(-1)
f.recordMeasurements(f.decrementMeasure)
}

func (f *float64Metric) Name() string {
Expand All @@ -274,20 +284,40 @@ func (f *float64Metric) Record(value float64) {
rh.OnRecordFloat64Measure(f.Float64Measure, f.tags, value)
}
recordHookMutex.RUnlock()
stats.RecordWithTags(context.Background(), f.tags, f.M(value)) //nolint:errcheck
m := f.M(value)
stats.Record(f.ctx, m) //nolint:errcheck
}

func (f *float64Metric) recordMeasurements(m []stats.Measurement) {
recordHookMutex.RLock()
if rh, ok := recordHooks[f.Name()]; ok {
for _, mv := range m {
rh.OnRecordFloat64Measure(f.Float64Measure, f.tags, mv.Value())
}
}
recordHookMutex.RUnlock()
stats.Record(f.ctx, m...)
}

func (f *float64Metric) RecordInt(value int64) {
f.Record(float64(value))
}

func (f *float64Metric) With(labelValues ...LabelValue) Metric {
t := make([]tag.Mutator, len(f.tags))
t := make([]tag.Mutator, len(f.tags), len(f.tags)+len(labelValues))
copy(t, f.tags)
for _, tagValue := range labelValues {
t = append(t, tag.Mutator(tagValue))
}
return &float64Metric{f.Float64Measure, t, f.view}
ctx, _ := tag.New(context.Background(), t...) //nolint:errcheck
return &float64Metric{
Float64Measure: f.Float64Measure,
tags: t,
ctx: ctx,
view: f.view,
incrementMeasure: f.incrementMeasure,
decrementMeasure: f.decrementMeasure,
}
}

func (f *float64Metric) Register() error {
Expand All @@ -297,8 +327,16 @@ func (f *float64Metric) Register() error {
type int64Metric struct {
*stats.Int64Measure

// tags stores all tags for the metrics
tags []tag.Mutator
// ctx is a precomputed context holding tags, as an optimization
ctx context.Context
view *view.View

// incrementMeasure is a precomputed +1 measurement to avoid extra allocations in Increment()
incrementMeasure []stats.Measurement
// decrementMeasure is a precomputed -1 measurement to avoid extra allocations in Decrement()
decrementMeasure []stats.Measurement
}

func newInt64Metric(name, description string, aggregation *view.Aggregation, opts *options) *int64Metric {
Expand All @@ -307,19 +345,23 @@ func newInt64Metric(name, description string, aggregation *view.Aggregation, opt
for _, l := range opts.labels {
tagKeys = append(tagKeys, tag.Key(l))
}
ctx, _ := tag.New(context.Background()) //nolint:errcheck
return &int64Metric{
measure,
make([]tag.Mutator, 0),
&view.View{Measure: measure, TagKeys: tagKeys, Aggregation: aggregation},
Int64Measure: measure,
tags: make([]tag.Mutator, 0),
ctx: ctx,
view: &view.View{Measure: measure, TagKeys: tagKeys, Aggregation: aggregation},
incrementMeasure: []stats.Measurement{measure.M(1)},
decrementMeasure: []stats.Measurement{measure.M(-1)},
}
}

func (i *int64Metric) Increment() {
i.RecordInt(1)
i.recordMeasurements(i.incrementMeasure)
}

func (i *int64Metric) Decrement() {
i.RecordInt(-1)
i.recordMeasurements(i.decrementMeasure)
}

func (i *int64Metric) Name() string {
Expand All @@ -330,22 +372,41 @@ func (i *int64Metric) Record(value float64) {
i.RecordInt(int64(math.Floor(value)))
}

func (i *int64Metric) recordMeasurements(m []stats.Measurement) {
recordHookMutex.RLock()
if rh, ok := recordHooks[i.Name()]; ok {
for _, mv := range m {
rh.OnRecordInt64Measure(i.Int64Measure, i.tags, int64(math.Floor(mv.Value())))
}
}
recordHookMutex.RUnlock()
stats.Record(i.ctx, m...) //nolint:errcheck
}

func (i *int64Metric) RecordInt(value int64) {
recordHookMutex.RLock()
if rh, ok := recordHooks[i.Name()]; ok {
rh.OnRecordInt64Measure(i.Int64Measure, i.tags, value)
}
recordHookMutex.RUnlock()
stats.RecordWithTags(context.Background(), i.tags, i.M(value)) //nolint:errcheck
stats.Record(i.ctx, i.M(value)) //nolint:errcheck
}

func (i *int64Metric) With(labelValues ...LabelValue) Metric {
t := make([]tag.Mutator, len(i.tags))
t := make([]tag.Mutator, len(i.tags), len(i.tags)+len(labelValues))
copy(t, i.tags)
for _, tagValue := range labelValues {
t = append(t, tag.Mutator(tagValue))
}
return &int64Metric{i.Int64Measure, t, i.view}
ctx, _ := tag.New(context.Background(), t...) //nolint:errcheck
return &int64Metric{
Int64Measure: i.Int64Measure,
tags: t,
ctx: ctx,
view: i.view,
incrementMeasure: i.incrementMeasure,
decrementMeasure: i.decrementMeasure,
}
}

func (i *int64Metric) Register() error {
Expand Down
14 changes: 12 additions & 2 deletions monitoring/monitoring_test.go
Expand Up @@ -111,7 +111,7 @@ func TestSum(t *testing.T) {
testSumVal = sd.Value
}
} else {
return fmt.Errorf("unknown row in results: %v", r)
return fmt.Errorf("unknown row in results: %+v", r)
}
}
if got, want := goofySumVal, 44.0; got != want {
Expand All @@ -127,7 +127,7 @@ func TestSum(t *testing.T) {
int64SumVal = int64(sd.Value)
}
} else {
return fmt.Errorf("unknown row in results: %v", r)
return fmt.Errorf("unknown row in results: %+v", r)
}
}
if got, want := int64SumVal, int64(21); got != want {
Expand Down Expand Up @@ -458,3 +458,13 @@ func (r *testRecordHook) OnRecordFloat64Measure(f *stats.Float64Measure, tags []
}
hookSum.With(name.Value(v)).Record(value)
}

func BenchmarkCounter(b *testing.B) {
exp := &testExporter{rows: make(map[string][]*view.Row)}
view.RegisterExporter(exp)
view.SetReportingPeriod(1 * time.Millisecond)

for n := 0; n < b.N; n++ {
int64Sum.Increment()
}
}

0 comments on commit da9399e

Please sign in to comment.