Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add influxdb plugin #449

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions plugins/all/all.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
_ "github.com/influxdb/telegraf/plugins/exec"
_ "github.com/influxdb/telegraf/plugins/haproxy"
_ "github.com/influxdb/telegraf/plugins/httpjson"
_ "github.com/influxdb/telegraf/plugins/influxdb"
_ "github.com/influxdb/telegraf/plugins/jolokia"
_ "github.com/influxdb/telegraf/plugins/kafka_consumer"
_ "github.com/influxdb/telegraf/plugins/leofs"
Expand Down
73 changes: 73 additions & 0 deletions plugins/influxdb/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
# influxdb plugin

The influxdb plugin collects InfluxDB-formatted data from JSON endpoints.

With a configuration of:

```toml
[[plugins.influxdb]]
name = "produce"
urls = [
"http://127.0.0.1:8086/debug/vars",
"http://192.168.2.1:8086/debug/vars"
]
```

And if 127.0.0.1 responds with this JSON:

```json
{
"k1": {
"name": "fruit",
"tags": {
"kind": "apple"
},
"values": {
"inventory": 371,
"sold": 112
}
},
"k2": {
"name": "fruit",
"tags": {
"kind": "banana"
},
"values": {
"inventory": 1000,
"sold": 403
}
}
}
```

And if 192.168.2.1 responds like so:

```json
{
"k3": {
"name": "transactions",
"tags": {},
"values": {
"total": 100,
"balance": 184.75
}
}
}
```

Then the collected metrics will be:

```
influxdb_produce_fruit,url='http://127.0.0.1:8086/debug/vars',kind='apple' inventory=371.0,sold=112.0
influxdb_produce_fruit,url='http://127.0.0.1:8086/debug/vars',kind='banana' inventory=1000.0,sold=403.0

influxdb_produce_transactions,url='http://192.168.2.1:8086/debug/vars' total=100.0,balance=184.75
```

There are two important details to note about the collected metrics:

1. Even though the values in JSON are being displayed as integers, the metrics are reported as floats.
JSON encoders usually don't print the fractional part for round floats.
Because you cannot change the type of an existing field in InfluxDB, we assume all numbers are floats.

2. The top-level keys' names (in the example above, `"k1"`, `"k2"`, and `"k3"`) are not considered when recording the metrics.
150 changes: 150 additions & 0 deletions plugins/influxdb/influxdb.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
package influxdb

import (
"encoding/json"
"errors"
"fmt"
"net/http"
"strings"
"sync"

"github.com/influxdb/telegraf/plugins"
)

type InfluxDB struct {
Name string
URLs []string `toml:"urls"`
}

func (*InfluxDB) Description() string {
return "Read InfluxDB-formatted JSON metrics from one or more HTTP endpoints"
}

func (*InfluxDB) SampleConfig() string {
return `
# Reads InfluxDB-formatted JSON from given URLs.
# Works with InfluxDB debug endpoints out of the box, but other services can use this format too.
# See the influxdb plugin's README for more details.
[[plugins.influxdb]]
# Name to use for measurement
name = "influxdb"

# Multiple URLs from which to read InfluxDB-formatted JSON
urls = [
"http://localhost:8086/debug/vars"
]
`
}

func (i *InfluxDB) Gather(acc plugins.Accumulator) error {
errorChannel := make(chan error, len(i.URLs))

var wg sync.WaitGroup
for _, u := range i.URLs {
wg.Add(1)
go func(url string) {
defer wg.Done()
if err := i.gatherURL(acc, url); err != nil {
errorChannel <- fmt.Errorf("[name=%s][url=%s]: %s", i.Name, url, err)
}
}(u)
}

wg.Wait()
close(errorChannel)

// If there weren't any errors, we can return nil now.
if len(errorChannel) == 0 {
return nil
}

// There were errors, so join them all together as one big error.
errorStrings := make([]string, 0, len(errorChannel))
for err := range errorChannel {
errorStrings = append(errorStrings, err.Error())
}

return errors.New(strings.Join(errorStrings, "\n"))
}

type point struct {
Name string `json:"name"`
Tags map[string]string `json:"tags"`
Values map[string]interface{} `json:"values"`
}

// Gathers data from a particular URL
// Parameters:
// acc : The telegraf Accumulator to use
// url : endpoint to send request to
//
// Returns:
// error: Any error that may have occurred
func (i *InfluxDB) gatherURL(
acc plugins.Accumulator,
url string,
) error {
resp, err := http.Get(url)
if err != nil {
return err
}
defer resp.Body.Close()

// It would be nice to be able to decode into a map[string]point, but
// we'll get a decoder error like:
// `json: cannot unmarshal array into Go value of type influxdb.point`
// if any of the values aren't objects.
// To avoid that error, we decode by hand.
dec := json.NewDecoder(resp.Body)

// Parse beginning of object
if t, err := dec.Token(); err != nil {
return err
} else if t != json.Delim('{') {
return errors.New("document root must be a JSON object")
}

// Loop through rest of object
for {
// Nothing left in this object, we're done
if !dec.More() {
break
}

// Read in a string key. We don't do anything with the top-level keys, so it's discarded.
_, err := dec.Token()
if err != nil {
return err
}

// Attempt to parse a whole object into a point.
// It might be a non-object, like a string or array.
// If we fail to decode it into a point, ignore it and move on.
var p point
if err := dec.Decode(&p); err != nil {
continue
}

// If the object was a point, but was not fully initialized, ignore it and move on.
if p.Name == "" || p.Tags == nil || p.Values == nil || len(p.Values) == 0 {
continue
}

// Add a tag to indicate the source of the data.
p.Tags["url"] = url

acc.AddFields(
i.Name+"_"+p.Name,
p.Values,
p.Tags,
)
}

return nil
}

func init() {
plugins.Add("influxdb", func() plugins.Plugin {
return &InfluxDB{}
})
}
101 changes: 101 additions & 0 deletions plugins/influxdb/influxdb_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package influxdb_test

import (
"net/http"
"net/http/httptest"
"testing"

"github.com/influxdb/telegraf/plugins/influxdb"
"github.com/influxdb/telegraf/testutil"
"github.com/stretchr/testify/require"
)

func TestBasic(t *testing.T) {
js := `
{
"_1": {
"name": "foo",
"tags": {
"id": "ex1"
},
"values": {
"i": -1,
"f": 0.5,
"b": true,
"s": "string"
}
},
"ignored": {
"willBeRecorded": false
},
"ignoredAndNested": {
"hash": {
"is": "nested"
}
},
"array": [
"makes parsing more difficult than necessary"
],
"string": "makes parsing more difficult than necessary",
"_2": {
"name": "bar",
"tags": {
"id": "ex2"
},
"values": {
"x": "x"
}
},
"pointWithoutFields_willNotBeIncluded": {
"name": "asdf",
"tags": {
"id": "ex3"
},
"values": {}
}
}
`
fakeServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path == "/endpoint" {
_, _ = w.Write([]byte(js))
} else {
w.WriteHeader(http.StatusNotFound)
}
}))
defer fakeServer.Close()

plugin := &influxdb.InfluxDB{
Name: "test",
URLs: []string{fakeServer.URL + "/endpoint"},
}

var acc testutil.Accumulator
require.NoError(t, plugin.Gather(&acc))

require.Len(t, acc.Points, 2)
require.NoError(t, acc.ValidateTaggedFieldsValue(
"test_foo",
map[string]interface{}{
// JSON will truncate floats to integer representations.
// Since there's no distinction in JSON, we can't assume it's an int.
"i": -1.0,
"f": 0.5,
"b": true,
"s": "string",
},
map[string]string{
"id": "ex1",
"url": fakeServer.URL + "/endpoint",
},
))
require.NoError(t, acc.ValidateTaggedFieldsValue(
"test_bar",
map[string]interface{}{
"x": "x",
},
map[string]string{
"id": "ex2",
"url": fakeServer.URL + "/endpoint",
},
))
}