Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add hook for statsd integration #7417

Merged
merged 4 commits into from Feb 8, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 5 additions & 0 deletions go/stats/histogram.go
Expand Up @@ -27,6 +27,7 @@ import (
// splitting the counts under different buckets
// using specified cutoffs.
type Histogram struct {
name string
help string
cutoffs []int64
labels []string
Expand Down Expand Up @@ -60,6 +61,7 @@ func NewGenericHistogram(name, help string, cutoffs []int64, labels []string, co
panic("mismatched cutoff and label lengths")
}
h := &Histogram{
name: name,
help: help,
cutoffs: cutoffs,
labels: labels,
Expand All @@ -85,6 +87,9 @@ func (h *Histogram) Add(value int64) {
if h.hook != nil {
h.hook(value)
}
if defaultStatsdHook.histogramHook != nil && h.name != "" {
defaultStatsdHook.histogramHook(h.name, value)
}
}

// String returns a string representation of the Histogram.
Expand Down
34 changes: 34 additions & 0 deletions go/stats/hooks.go
@@ -0,0 +1,34 @@
/*
Copyright 2019 The Vitess Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package stats

type statsdHook struct {
timerHook func(string, string, int64, *Timings)
histogramHook func(string, int64)
}

var defaultStatsdHook = statsdHook{}

// RegisterTimerHook registers timer hook
func RegisterTimerHook(hook func(string, string, int64, *Timings)) {
defaultStatsdHook.timerHook = hook
}

// RegisterHistogramHook registers timer hook
func RegisterHistogramHook(hook func(string, int64)) {
defaultStatsdHook.histogramHook = hook
}
58 changes: 15 additions & 43 deletions go/stats/statsd/statsd.go
Expand Up @@ -46,25 +46,6 @@ func makeLabels(labelNames []string, labelValsCombined string) []string {
return tags
}

func (sb StatsBackend) addHistogram(name string, h *stats.Histogram, tags []string) {
labels := h.Labels()
buckets := h.Buckets()
for i := range labels {
name := fmt.Sprintf("%s.%s", name, labels[i])
sb.statsdClient.Gauge(name, float64(buckets[i]), tags, sb.sampleRate)
}
sb.statsdClient.Gauge(fmt.Sprintf("%s.%s", name, h.CountLabel()),
(float64)(h.Count()),
tags,
sb.sampleRate,
)
sb.statsdClient.Gauge(fmt.Sprintf("%s.%s", name, h.TotalLabel()),
(float64)(h.Total()),
tags,
sb.sampleRate,
)
}

// Init initializes the statsd with the given namespace.
func Init(namespace string) {
servenv.OnRun(func() {
Expand All @@ -81,16 +62,23 @@ func Init(namespace string) {
sb.statsdClient = statsdC
sb.sampleRate = *statsdSampleRate
stats.RegisterPushBackend("statsd", sb)
stats.RegisterTimerHook(func(statsName, name string, value int64, timings *stats.Timings) {
tags := makeLabels(strings.Split(timings.Label(), "."), name)
if err := statsdC.TimeInMilliseconds(statsName, float64(value), tags, sb.sampleRate); err != nil {
log.Errorf("Fail to TimeInMilliseconds %v: %v", statsName, err)
}
})
stats.RegisterHistogramHook(func(name string, val int64) {
if err := statsdC.Histogram(name, float64(val), []string{}, sb.sampleRate); err != nil {
log.Errorf("Fail to Histogram for %v: %v", name, err)
}
})
})
}

func (sb StatsBackend) addExpVar(kv expvar.KeyValue) {
k := kv.Key
switch v := kv.Value.(type) {
case *stats.String:
if err := sb.statsdClient.Set(k, v.Get(), nil, sb.sampleRate); err != nil {
log.Errorf("Failed to add String %v for key %v", v, k)
}
case *stats.Counter:
if err := sb.statsdClient.Count(k, v.Get(), nil, sb.sampleRate); err != nil {
log.Errorf("Failed to add Counter %v for key %v", v, k)
Expand Down Expand Up @@ -159,25 +147,9 @@ func (sb StatsBackend) addExpVar(kv expvar.KeyValue) {
log.Errorf("Failed to add GaugesWithSingleLabel %v for key %v", v, k)
}
}
case *stats.MultiTimings:
labels := v.Labels()
hists := v.Histograms()
for labelValsCombined, histogram := range hists {
sb.addHistogram(k, histogram, makeLabels(labels, labelValsCombined))
}
case *stats.Timings:
// TODO: for statsd.timing metrics, there is no good way to transfer the histogram to it
// If we store a in memory buffer for stats.Timings and flush it here it's hard to make the stats
// thread safe.
// Instead, we export the timings stats as histogram here. We won't have the percentile breakdown
// for the metrics, but we can still get the average from total and count
labels := []string{v.Label()}
hists := v.Histograms()
for labelValsCombined, histogram := range hists {
sb.addHistogram(k, histogram, makeLabels(labels, labelValsCombined))
}
case *stats.Histogram:
sb.addHistogram(k, v, []string{})
case *stats.Timings, *stats.MultiTimings, *stats.Histogram:
// it does not make sense to export static expvar to statsd,
// instead we rely on hooks to integrate with statsd' timing and histogram api directly
case expvar.Func:
// Export memstats as gauge so that we don't need to call extra ReadMemStats
if k == "memstats" {
Expand Down Expand Up @@ -209,7 +181,7 @@ func (sb StatsBackend) addExpVar(kv expvar.KeyValue) {
}
}
}
case *stats.StringMapFunc, *stats.Rates, *stats.RatesFunc:
case *stats.Rates, *stats.RatesFunc, *stats.String, *stats.StringFunc, *stats.StringMapFunc:
// Silently ignore metrics that does not make sense to be exported to statsd
default:
log.Warningf("Silently ignore metrics with key %v [%T]", k, kv.Value)
Expand Down
76 changes: 14 additions & 62 deletions go/stats/statsd/statsd_test.go
Expand Up @@ -25,35 +25,14 @@ func getBackend(t *testing.T) (StatsBackend, *net.UDPConn) {
sb.namespace = "foo"
sb.sampleRate = 1
sb.statsdClient = client
return sb, server
}

func TestStatsdString(t *testing.T) {
sb, server := getBackend(t)
defer server.Close()
name := "string_name"
stats.NewString(name).Set("foo")
found := false
expvar.Do(func(kv expvar.KeyValue) {
if kv.Key == name {
found = true
sb.addExpVar(kv)
if err := sb.statsdClient.Flush(); err != nil {
t.Errorf("Error flushing: %s", err)
}
bytes := make([]byte, 4096)
n, err := server.Read(bytes)
if err != nil {
t.Fatal(err)
}
result := string(bytes[:n])
expected := "test.string_name:foo|s"
assert.Equal(t, result, expected)
}
stats.RegisterTimerHook(func(stats, name string, value int64, timings *stats.Timings) {
tags := makeLabels(strings.Split(timings.Label(), "."), name)
client.TimeInMilliseconds(stats, float64(value), tags, sb.sampleRate)
})
if !found {
t.Errorf("Stat %s not found...", name)
}
stats.RegisterHistogramHook(func(name string, val int64) {
client.Histogram(name, float64(val), []string{}, sb.sampleRate)
})
return sb, server
}

func TestStatsdCounter(t *testing.T) {
Expand Down Expand Up @@ -393,20 +372,8 @@ func TestStatsdMultiTimings(t *testing.T) {
t.Fatal(err)
}
result := string(bytes[:n])
expected := []string{
"test.multi_timings_name.500000:0.000000|g|#label1:foo,label2:bar",
"test.multi_timings_name.1000000:0.000000|g|#label1:foo,label2:bar",
"test.multi_timings_name.5000000:0.000000|g|#label1:foo,label2:bar",
"test.multi_timings_name.10000000:1.000000|g|#label1:foo,label2:bar",
"test.multi_timings_name.50000000:0.000000|g|#label1:foo,label2:bar",
"test.multi_timings_name.100000000:0.000000|g|#label1:foo,label2:bar",
"test.multi_timings_name.500000000:0.000000|g|#label1:foo,label2:bar",
"test.multi_timings_name.1000000000:0.000000|g|#label1:foo,label2:bar",
"test.multi_timings_name.5000000000:0.000000|g|#label1:foo,label2:bar",
}
for i, res := range strings.Split(result, "\n") {
assert.Equal(t, res, expected[i])
}
expected := "test.multi_timings_name:10.000000|ms|#label1:foo,label2:bar"
assert.Equal(t, result, expected)
}
})
if !found {
Expand Down Expand Up @@ -434,20 +401,8 @@ func TestStatsdTimings(t *testing.T) {
t.Fatal(err)
}
result := string(bytes[:n])
expected := []string{
"test.timings_name.500000:0.000000|g|#label1:foo",
"test.timings_name.1000000:0.000000|g|#label1:foo",
"test.timings_name.5000000:1.000000|g|#label1:foo",
"test.timings_name.10000000:0.000000|g|#label1:foo",
"test.timings_name.50000000:0.000000|g|#label1:foo",
"test.timings_name.100000000:0.000000|g|#label1:foo",
"test.timings_name.500000000:0.000000|g|#label1:foo",
"test.timings_name.1000000000:0.000000|g|#label1:foo",
"test.timings_name.5000000000:0.000000|g|#label1:foo",
}
for i, res := range strings.Split(result, "\n") {
assert.Equal(t, res, expected[i])
}
expected := "test.timings_name:2.000000|ms|#label1:foo"
assert.Equal(t, result, expected)
}
})
if !found {
Expand Down Expand Up @@ -478,12 +433,9 @@ func TestStatsdHistogram(t *testing.T) {
}
result := string(bytes[:n])
expected := []string{
"test.histogram_name.1:0.000000|g",
"test.histogram_name.5:2.000000|g",
"test.histogram_name.10:1.000000|g",
"test.histogram_name.inf:0.000000|g",
"test.histogram_name.Count:3.000000|g",
"test.histogram_name.Total:11.000000|g",
"test.histogram_name:2.000000|h",
"test.histogram_name:3.000000|h",
"test.histogram_name:6.000000|h",
}
for i, res := range strings.Split(result, "\n") {
assert.Equal(t, res, expected[i])
Expand Down
16 changes: 12 additions & 4 deletions go/stats/timings.go
Expand Up @@ -34,6 +34,7 @@ type Timings struct {
mu sync.RWMutex
histograms map[string]*Histogram

name string
help string
label string
labelCombined bool
Expand All @@ -46,6 +47,7 @@ type Timings struct {
func NewTimings(name, help, label string, categories ...string) *Timings {
t := &Timings{
histograms: make(map[string]*Histogram),
name: name,
help: help,
label: label,
labelCombined: IsDimensionCombined(label),
Expand Down Expand Up @@ -87,6 +89,9 @@ func (t *Timings) Add(name string, elapsed time.Duration) {
}
t.mu.Unlock()
}
if defaultStatsdHook.timerHook != nil && t.name != "" {
defaultStatsdHook.timerHook(t.name, name, elapsed.Milliseconds(), t)
}

elapsedNs := int64(elapsed)
hist.Add(elapsedNs)
Expand Down Expand Up @@ -198,16 +203,19 @@ type MultiTimings struct {

// NewMultiTimings creates a new MultiTimings object.
func NewMultiTimings(name string, help string, labels []string) *MultiTimings {
combinedLabels := make([]bool, len(labels))
for i, label := range labels {
combinedLabels[i] = IsDimensionCombined(label)
}
t := &MultiTimings{
Timings: Timings{
histograms: make(map[string]*Histogram),
name: name,
help: help,
label: safeJoinLabels(labels, combinedLabels),
},
labels: labels,
combinedLabels: make([]bool, len(labels)),
}
for i, label := range labels {
t.combinedLabels[i] = IsDimensionCombined(label)
combinedLabels: combinedLabels,
}
if name != "" {
publish(name, t)
Expand Down