Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow influxdb_listener to keep the database name from the query string if supplied #6257

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions plugins/inputs/influxdb_listener/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,16 @@ submits data to InfluxDB determines the destination database.
tls_cert = "/etc/telegraf/cert.pem"
tls_key = "/etc/telegraf/key.pem"

## If the write has a database on it then it should be kept
## for metrics further on. The database will be added as a tag.
## This tag can be used in downstream outputs.
keep_database = true
morfien101 marked this conversation as resolved.
Show resolved Hide resolved

## Optional tag name used to store the database if you want to change it to something custom.
## If not set it will be "database"
## Only used if keep_database is set to true.
# database_tag = database
morfien101 marked this conversation as resolved.
Show resolved Hide resolved

## Optional username and password to accept for HTTP basic authentication.
## You probably want to make sure you have TLS configured above for this.
# basic_username = "foobar"
Expand Down
53 changes: 41 additions & 12 deletions plugins/inputs/influxdb_listener/http_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,22 +32,28 @@ const (
// a single InfluxDB point.
// 64 KB
DEFAULT_MAX_LINE_SIZE = 64 * 1024

// DefaultDatabaseTag is the name of the tag that will be used to carry
// the database collected from the query string
DefaultDatabaseTag = "database"
)

type TimeFunc func() time.Time

type HTTPListener struct {
ServiceAddress string
ReadTimeout internal.Duration
WriteTimeout internal.Duration
MaxBodySize internal.Size
MaxLineSize internal.Size
Port int

ServiceAddress string `toml:"service_address"`
// Port gets pulled out of ServiceAddress
Port int
tlsint.ServerConfig

BasicUsername string
BasicPassword string
ReadTimeout internal.Duration `toml:"read_timeout"`
WriteTimeout internal.Duration `toml:"write_timeout"`
MaxBodySize internal.Size `toml:"max_body_size"`
MaxLineSize internal.Size `toml:"max_line_size"`
BasicUsername string `toml:"basic_username"`
BasicPassword string `toml:"basic_password"`
KeepDatabase bool `toml:"keep_database"`
DatabaseTag string `toml:"database_tag"`

TimeFunc

Expand Down Expand Up @@ -93,6 +99,16 @@ const sampleConfig = `
## Maximum line size allowed to be sent in bytes.
## 0 means to use the default of 65536 bytes (64 kibibytes)
max_line_size = "64KiB"

## If the write has a database on it then it should be kept
## for metrics further on. The database will be added as a tag.
## This tag can be used in downstream outputs.
keep_database = true

## Optional tag name used to store the database if you want to change it to something custom.
## If not set it will be "database"
## Only used if keep_database is set to true.
# database_tag = database

## Set one or more allowed client CA certificate file names to
## enable mutually authenticated TLS connections
Expand Down Expand Up @@ -258,6 +274,7 @@ func (h *HTTPListener) serveWrite(res http.ResponseWriter, req *http.Request) {
now := h.TimeFunc()

precision := req.URL.Query().Get("precision")
db := req.URL.Query().Get("db")

// Handle gzip request bodies
body := req.Body
Expand Down Expand Up @@ -315,7 +332,7 @@ func (h *HTTPListener) serveWrite(res http.ResponseWriter, req *http.Request) {

if err == io.ErrUnexpectedEOF {
// finished reading the request body
err = h.parse(buf[:n+bufStart], now, precision)
err = h.parse(buf[:n+bufStart], now, precision, db)
if err != nil {
log.Println("D! "+err.Error(), bufStart+n)
return400 = true
Expand Down Expand Up @@ -346,7 +363,7 @@ func (h *HTTPListener) serveWrite(res http.ResponseWriter, req *http.Request) {
bufStart = 0
continue
}
if err := h.parse(buf[:i+1], now, precision); err != nil {
if err := h.parse(buf[:i+1], now, precision, db); err != nil {
log.Println("D! " + err.Error())
return400 = true
}
Expand All @@ -359,7 +376,7 @@ func (h *HTTPListener) serveWrite(res http.ResponseWriter, req *http.Request) {
}
}

func (h *HTTPListener) parse(b []byte, t time.Time, precision string) error {
func (h *HTTPListener) parse(b []byte, t time.Time, precision, db string) error {
h.mu.Lock()
defer h.mu.Unlock()

Expand All @@ -371,6 +388,16 @@ func (h *HTTPListener) parse(b []byte, t time.Time, precision string) error {
}

for _, m := range metrics {
// Do we need to keep the database name in the query string
if h.KeepDatabase {
// Did we get a database argument. If we didn't get it. We can't set it.
if db != "" {
// Is there already a database set. If not use the database in the query string.
if _, ok := m.Tags()[h.DatabaseTag]; !ok {
morfien101 marked this conversation as resolved.
Show resolved Hide resolved
morfien101 marked this conversation as resolved.
Show resolved Hide resolved
m.AddTag(h.DatabaseTag, db)
}
}
}
h.acc.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time())
}

Expand Down Expand Up @@ -436,12 +463,14 @@ func init() {
return &HTTPListener{
ServiceAddress: ":8186",
TimeFunc: time.Now,
DatabaseTag: DefaultDatabaseTag,
}
})
inputs.Add("influxdb_listener", func() telegraf.Input {
return &HTTPListener{
ServiceAddress: ":8186",
TimeFunc: time.Now,
DatabaseTag: DefaultDatabaseTag,
}
})
}
20 changes: 5 additions & 15 deletions plugins/inputs/influxdb_listener/http_listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ func newTestHTTPListener() *HTTPListener {
listener := &HTTPListener{
ServiceAddress: "localhost:0",
TimeFunc: time.Now,
DatabaseTag: DefaultDatabaseTag,
}
return listener
}
Expand Down Expand Up @@ -146,8 +147,9 @@ func TestWriteHTTPBasicAuth(t *testing.T) {
require.EqualValues(t, http.StatusNoContent, resp.StatusCode)
}

func TestWriteHTTP(t *testing.T) {
func TestWriteHTTPKeepDatabase(t *testing.T) {
listener := newTestHTTPListener()
listener.KeepDatabase = true

acc := &testutil.Accumulator{}
require.NoError(t, listener.Start(acc))
Expand All @@ -162,7 +164,7 @@ func TestWriteHTTP(t *testing.T) {
acc.Wait(1)
acc.AssertContainsTaggedFields(t, "cpu_load_short",
map[string]interface{}{"value": float64(12)},
map[string]string{"host": "server01"},
map[string]string{"host": "server01", "database": "mydb"},
)

// post multiple message to listener
Expand All @@ -177,21 +179,9 @@ func TestWriteHTTP(t *testing.T) {
for _, hostTag := range hostTags {
acc.AssertContainsTaggedFields(t, "cpu_load_short",
map[string]interface{}{"value": float64(12)},
map[string]string{"host": hostTag},
map[string]string{"host": hostTag, "database": "mydb"},
)
}

// Post a gigantic metric to the listener and verify that an error is returned:
resp, err = http.Post(createURL(listener, "http", "/write", "db=mydb"), "", bytes.NewBuffer([]byte(hugeMetric)))
require.NoError(t, err)
resp.Body.Close()
require.EqualValues(t, 400, resp.StatusCode)

acc.Wait(3)
acc.AssertContainsTaggedFields(t, "cpu_load_short",
map[string]interface{}{"value": float64(12)},
map[string]string{"host": "server01"},
)
}

// http listener should add a newline at the end of the buffer if it's not there
Expand Down