Skip to content

Commit

Permalink
statsd: allow template parsing fields. Default to value=
Browse files Browse the repository at this point in the history
closes #602
  • Loading branch information
sparrc committed Jan 28, 2016
1 parent 962325c commit b70900a
Show file tree
Hide file tree
Showing 2 changed files with 212 additions and 58 deletions.
112 changes: 69 additions & 43 deletions plugins/inputs/statsd/statsd.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"strconv"
"strings"
"sync"
"time"

"github.com/influxdata/influxdb/services/graphite"

Expand Down Expand Up @@ -51,6 +52,8 @@ type Statsd struct {
done chan struct{}

// Cache gauges, counters & sets so they can be aggregated as they arrive
// gauges and counters map measurement/tags hash -> field name -> metrics
// sets and timings map measurement/tags hash -> metrics
gauges map[string]cachedgauge
counters map[string]cachedcounter
sets map[string]cachedset
Expand Down Expand Up @@ -80,6 +83,7 @@ func NewStatsd() *Statsd {
// One statsd metric, form is <bucket>:<value>|<mtype>|@<samplerate>
type metric struct {
name string
field string
bucket string
hash string
intvalue int64
Expand All @@ -91,21 +95,21 @@ type metric struct {
}

type cachedset struct {
name string
set map[int64]bool
tags map[string]string
name string
fields map[string]map[int64]bool
tags map[string]string
}

type cachedgauge struct {
name string
value float64
tags map[string]string
name string
fields map[string]interface{}
tags map[string]string
}

type cachedcounter struct {
name string
value int64
tags map[string]string
name string
fields map[string]interface{}
tags map[string]string
}

type cachedtimings struct {
Expand Down Expand Up @@ -160,6 +164,7 @@ func (_ *Statsd) SampleConfig() string {
func (s *Statsd) Gather(acc telegraf.Accumulator) error {
s.Lock()
defer s.Unlock()
now := time.Now()

for _, metric := range s.timings {
fields := make(map[string]interface{})
Expand All @@ -172,28 +177,32 @@ func (s *Statsd) Gather(acc telegraf.Accumulator) error {
name := fmt.Sprintf("%v_percentile", percentile)
fields[name] = metric.stats.Percentile(percentile)
}
acc.AddFields(metric.name, fields, metric.tags)
acc.AddFields(metric.name, fields, metric.tags, now)
}
if s.DeleteTimings {
s.timings = make(map[string]cachedtimings)
}

for _, metric := range s.gauges {
acc.Add(metric.name, metric.value, metric.tags)
acc.AddFields(metric.name, metric.fields, metric.tags, now)
}
if s.DeleteGauges {
s.gauges = make(map[string]cachedgauge)
}

for _, metric := range s.counters {
acc.Add(metric.name, metric.value, metric.tags)
acc.AddFields(metric.name, metric.fields, metric.tags, now)
}
if s.DeleteCounters {
s.counters = make(map[string]cachedcounter)
}

for _, metric := range s.sets {
acc.Add(metric.name, int64(len(metric.set)), metric.tags)
fields := make(map[string]interface{})
for field, set := range metric.fields {
fields[field] = int64(len(set))
}
acc.AddFields(metric.name, fields, metric.tags, now)
}
if s.DeleteSets {
s.sets = make(map[string]cachedset)
Expand Down Expand Up @@ -358,7 +367,7 @@ func (s *Statsd) parseStatsdLine(line string) error {
}

// Parse the name & tags from bucket
m.name, m.tags = s.parseName(m.bucket)
m.name, m.field, m.tags = s.parseName(m.bucket)
switch m.mtype {
case "c":
m.tags["metric_type"] = "counter"
Expand Down Expand Up @@ -389,8 +398,8 @@ func (s *Statsd) parseStatsdLine(line string) error {
// parseName parses the given bucket name with the list of bucket maps in the
// config file. If there is a match, it will parse the name of the metric and
// map of tags.
// Return values are (<name>, <tags>)
func (s *Statsd) parseName(bucket string) (string, map[string]string) {
// Return values are (<name>, <field>, <tags>)
func (s *Statsd) parseName(bucket string) (string, string, map[string]string) {
tags := make(map[string]string)

bucketparts := strings.Split(bucket, ",")
Expand All @@ -410,17 +419,21 @@ func (s *Statsd) parseName(bucket string) (string, map[string]string) {
DefaultTags: tags,
}

var field string
name := bucketparts[0]
p, err := graphite.NewParserWithOptions(o)
if err == nil {
name, tags, _, _ = p.ApplyTemplate(name)
name, tags, field, _ = p.ApplyTemplate(name)
}
if s.ConvertNames {
name = strings.Replace(name, ".", "_", -1)
name = strings.Replace(name, "-", "__", -1)
}
if field == "" {
field = "value"
}

return name, tags
return name, field, tags
}

// Parse the key,value out of a string that looks like "key=value"
Expand Down Expand Up @@ -466,46 +479,59 @@ func (s *Statsd) aggregate(m metric) {
s.timings[m.hash] = cached
}
case "c":
cached, ok := s.counters[m.hash]
// check if the measurement exists
_, ok := s.counters[m.hash]
if !ok {
s.counters[m.hash] = cachedcounter{
name: m.name,
value: m.intvalue,
tags: m.tags,
name: m.name,
fields: make(map[string]interface{}),
tags: m.tags,
}
} else {
cached.value += m.intvalue
s.counters[m.hash] = cached
}
// check if the field exists
_, ok = s.counters[m.hash].fields[m.field]
if !ok {
s.counters[m.hash].fields[m.field] = int64(0)
}
s.counters[m.hash].fields[m.field] =
s.counters[m.hash].fields[m.field].(int64) + m.intvalue
case "g":
cached, ok := s.gauges[m.hash]
// check if the measurement exists
_, ok := s.gauges[m.hash]
if !ok {
s.gauges[m.hash] = cachedgauge{
name: m.name,
value: m.floatvalue,
tags: m.tags,
name: m.name,
fields: make(map[string]interface{}),
tags: m.tags,
}
}
// check if the field exists
_, ok = s.gauges[m.hash].fields[m.field]
if !ok {
s.gauges[m.hash].fields[m.field] = float64(0)
}
if m.additive {
s.gauges[m.hash].fields[m.field] =
s.gauges[m.hash].fields[m.field].(float64) + m.floatvalue
} else {
if m.additive {
cached.value = cached.value + m.floatvalue
} else {
cached.value = m.floatvalue
}
s.gauges[m.hash] = cached
s.gauges[m.hash].fields[m.field] = m.floatvalue
}
case "s":
cached, ok := s.sets[m.hash]
// check if the measurement exists
_, ok := s.sets[m.hash]
if !ok {
// Completely new metric (initialize with count of 1)
s.sets[m.hash] = cachedset{
name: m.name,
tags: m.tags,
set: map[int64]bool{m.intvalue: true},
name: m.name,
fields: make(map[string]map[int64]bool),
tags: m.tags,
}
} else {
cached.set[m.intvalue] = true
s.sets[m.hash] = cached
}
// check if the field exists
_, ok = s.sets[m.hash].fields[m.field]
if !ok {
s.sets[m.hash].fields[m.field] = make(map[int64]bool)
}
s.sets[m.hash].fields[m.field][m.intvalue] = true
}
}

Expand Down
Loading

0 comments on commit b70900a

Please sign in to comment.