Skip to content

Commit

Permalink
Improve prometheus plugin
Browse files Browse the repository at this point in the history
closes #707
  • Loading branch information
titilambert authored and sparrc committed Mar 17, 2016
1 parent 57f7582 commit bac1c22
Show file tree
Hide file tree
Showing 7 changed files with 450 additions and 44 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -1,6 +1,7 @@
## v0.11.2 [unreleased]

### Features
- [#707](https://github.com/influxdata/telegraf/pull/707): Improved prometheus plugin. Thanks @titilambert!

### Bugfixes

Expand Down
75 changes: 75 additions & 0 deletions plugins/inputs/prometheus/README.md
@@ -0,0 +1,75 @@
# Prometheus Input Plugin

The prometheus input plugin gathers metrics from any webpage
exposing metrics with Prometheus format

### Configuration:

Example for Kubernetes apiserver
```toml
# Get all metrics from Kube-apiserver
[[inputs.prometheus]]
# An array of urls to scrape metrics from.
urls = ["http://my-kube-apiserver:8080/metrics"]
```

You can use more complex configuration
to filter and some tags

```toml
# Get all metrics from Kube-apiserver
[[inputs.prometheus]]
# An array of urls to scrape metrics from.
urls = ["http://my-kube-apiserver:8080/metrics"]
# Get only metrics with "apiserver_" string is in metric name
namepass = ["apiserver_"]
# Add a metric name prefix
name_prefix = "k8s_"
# Add tags to be able to make beautiful dashboards
[inputs.prometheus.tags]
kubeservice = "kube-apiserver"
```

### Measurements & Fields & Tags:

Measurements and fields could be any thing.
It just depends of what you're quering.

Example:

```
# HELP go_gc_duration_seconds A summary of the GC invocation durations.
# TYPE go_gc_duration_seconds summary
go_gc_duration_seconds{quantile="0"} 0.00010425500000000001
go_gc_duration_seconds{quantile="0.25"} 0.000139108
go_gc_duration_seconds{quantile="0.5"} 0.00015749400000000002
go_gc_duration_seconds{quantile="0.75"} 0.000331463
go_gc_duration_seconds{quantile="1"} 0.000667154
go_gc_duration_seconds_sum 0.0018183950000000002
go_gc_duration_seconds_count 7
# HELP go_goroutines Number of goroutines that currently exist.
# TYPE go_goroutines gauge
go_goroutines 15
```

- go_goroutines,
- gauge (integer, unit)
- go_gc_duration_seconds
- field3 (integer, bytes)

- All measurements have the following tags:
- url=http://my-kube-apiserver:8080/metrics
- go_goroutines has the following tags:
- kubeservice=kube-apiserver
- go_gc_duration_seconds has the following tags:
- kubeservice=kube-apiserver

### Example Output:

Example of output with configuration given above:

```
$ ./telegraf -config telegraf.conf -test
k8s_go_goroutines,kubeservice=kube-apiserver,url=http://my-kube-apiserver:8080/metrics gauge=536 1456857329391929813
k8s_go_gc_duration_seconds,kubeservice=kube-apiserver,url=http://my-kube-apiserver:8080/metrics 0=0.038002142,0.25=0.041732467,0.5=0.04336492,0.75=0.047271799,1=0.058295811,count=0,sum=208.334617406 1456857329391929813
```
171 changes: 171 additions & 0 deletions plugins/inputs/prometheus/parser.go
@@ -0,0 +1,171 @@
package prometheus

// Parser inspired from
// https://github.com/prometheus/prom2json/blob/master/main.go

import (
"bufio"
"bytes"
"fmt"
"io"
"math"
"mime"

"github.com/influxdata/telegraf"

"github.com/matttproud/golang_protobuf_extensions/pbutil"
dto "github.com/prometheus/client_model/go"
"github.com/prometheus/common/expfmt"
)

// PrometheusParser is an object for Parsing incoming metrics.
type PrometheusParser struct {
// PromFormat
PromFormat map[string]string
// DefaultTags will be added to every parsed metric
// DefaultTags map[string]string
}

// Parse returns a slice of Metrics from a text representation of a
// metrics
func (p *PrometheusParser) Parse(buf []byte) ([]telegraf.Metric, error) {
var metrics []telegraf.Metric
var parser expfmt.TextParser
// parse even if the buffer begins with a newline
buf = bytes.TrimPrefix(buf, []byte("\n"))
// Read raw data
buffer := bytes.NewBuffer(buf)
reader := bufio.NewReader(buffer)

// Get format
mediatype, params, err := mime.ParseMediaType(p.PromFormat["Content-Type"])
// Prepare output
metricFamilies := make(map[string]*dto.MetricFamily)
if err == nil && mediatype == "application/vnd.google.protobuf" &&
params["encoding"] == "delimited" &&
params["proto"] == "io.prometheus.client.MetricFamily" {
for {
metricFamily := &dto.MetricFamily{}
if _, err = pbutil.ReadDelimited(reader, metricFamily); err != nil {
if err == io.EOF {
break
}
return nil, fmt.Errorf("reading metric family protocol buffer failed: %s", err)
}
metricFamilies[metricFamily.GetName()] = metricFamily
}
} else {
metricFamilies, err = parser.TextToMetricFamilies(reader)
if err != nil {
return nil, fmt.Errorf("reading text format failed: %s", err)
}
// read metrics
for metricName, mf := range metricFamilies {
for _, m := range mf.Metric {
// reading tags
tags := makeLabels(m)
/*
for key, value := range p.DefaultTags {
tags[key] = value
}
*/
// reading fields
fields := make(map[string]interface{})
if mf.GetType() == dto.MetricType_SUMMARY {
// summary metric
fields = makeQuantiles(m)
fields["count"] = float64(m.GetHistogram().GetSampleCount())
fields["sum"] = float64(m.GetSummary().GetSampleSum())
} else if mf.GetType() == dto.MetricType_HISTOGRAM {
// historgram metric
fields = makeBuckets(m)
fields["count"] = float64(m.GetHistogram().GetSampleCount())
fields["sum"] = float64(m.GetSummary().GetSampleSum())

} else {
// standard metric
fields = getNameAndValue(m)
}
// converting to telegraf metric
if len(fields) > 0 {
metric, err := telegraf.NewMetric(metricName, tags, fields)
if err == nil {
metrics = append(metrics, metric)
}
}
}
}
}
return metrics, err
}

// Parse one line
func (p *PrometheusParser) ParseLine(line string) (telegraf.Metric, error) {
metrics, err := p.Parse([]byte(line + "\n"))

if err != nil {
return nil, err
}

if len(metrics) < 1 {
return nil, fmt.Errorf(
"Can not parse the line: %s, for data format: prometheus", line)
}

return metrics[0], nil
}

/*
// Set default tags
func (p *PrometheusParser) SetDefaultTags(tags map[string]string) {
p.DefaultTags = tags
}
*/

// Get Quantiles from summary metric
func makeQuantiles(m *dto.Metric) map[string]interface{} {
fields := make(map[string]interface{})
for _, q := range m.GetSummary().Quantile {
if !math.IsNaN(q.GetValue()) {
fields[fmt.Sprint(q.GetQuantile())] = float64(q.GetValue())
}
}
return fields
}

// Get Buckets from histogram metric
func makeBuckets(m *dto.Metric) map[string]interface{} {
fields := make(map[string]interface{})
for _, b := range m.GetHistogram().Bucket {
fields[fmt.Sprint(b.GetUpperBound())] = float64(b.GetCumulativeCount())
}
return fields
}

// Get labels from metric
func makeLabels(m *dto.Metric) map[string]string {
result := map[string]string{}
for _, lp := range m.Label {
result[lp.GetName()] = lp.GetValue()
}
return result
}

// Get name and value from metric
func getNameAndValue(m *dto.Metric) map[string]interface{} {
fields := make(map[string]interface{})
if m.Gauge != nil {
if !math.IsNaN(m.GetGauge().GetValue()) {
fields["gauge"] = float64(m.GetGauge().GetValue())
}
} else if m.Counter != nil {
if !math.IsNaN(m.GetGauge().GetValue()) {
fields["counter"] = float64(m.GetCounter().GetValue())
}
} else if m.Untyped != nil {
if !math.IsNaN(m.GetGauge().GetValue()) {
fields["value"] = float64(m.GetUntyped().GetValue())
}
}
return fields
}

0 comments on commit bac1c22

Please sign in to comment.