Skip to content
This repository has been archived by the owner on Oct 27, 2020. It is now read-only.

Commit

Permalink
Create InfluxDB if it doesn't exist
Browse files Browse the repository at this point in the history
  • Loading branch information
GRECO, FRANK committed Aug 16, 2017
1 parent 5099244 commit 6bff9ea
Show file tree
Hide file tree
Showing 5 changed files with 150 additions and 30 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/)
and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html).

## [Unreleased]
### Changed
- If InfluxDB database doesn't exist when writing, it will be created.

## [1.1.1] - 2017-08-15
### Changed
- `TrafficFactory` now implements the `Store` interface.
Expand Down
2 changes: 1 addition & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ var (
FlagInfluxdbAddr = flag{
long: "influxdb-addr",
short: "i",
value: "monitoring-influxdb.kube-system.svc.cluster.local",
value: "http://monitoring-influxdb.kube-system.svc.cluster.local",
usage: "Influxdb address. Addr should be of the form 'http://host:port' or 'http://[ipv6-host%zone]:port'",
}
// FlagJaegerSamplerServerURL specifies the endpoint to the Jaeger sampler server
Expand Down
2 changes: 1 addition & 1 deletion helm/values.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@

imageRegistry: northwesternmutual

dockerImageTag: v1.1.1
dockerImageTag: v1.1.2

pullPolicy: Always

Expand Down
68 changes: 40 additions & 28 deletions monitor/influxdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"fmt"
"time"

"github.com/Sirupsen/logrus"
influx "github.com/influxdata/influxdb/client/v2"
"github.com/northwesternmutual/kanali/config"
"github.com/northwesternmutual/kanali/metrics"
Expand All @@ -40,7 +39,6 @@ type InfluxController struct {
// NewInfluxdbController creates a new controller allowing
// connection to Influxdb
func NewInfluxdbController() (*InfluxController, error) {

influxClient, err := influx.NewHTTPClient(influx.HTTPConfig{
Addr: viper.GetString(config.FlagInfluxdbAddr.GetLong()),
Username: viper.GetString(config.FlagInfluxdbUsername.GetLong()),
Expand All @@ -49,57 +47,71 @@ func NewInfluxdbController() (*InfluxController, error) {
if err != nil {
return nil, err
}

// create db
q := influx.NewQuery(fmt.Sprintf("CREATE DATABASE %s", viper.GetString(config.FlagInfluxdbDatabase.GetLong())), "", "")
if response, err := influxClient.Query(q); err != nil || response.Error() != nil {
return nil, err
}

return &InfluxController{Client: influxClient}, nil

}

// WriteRequestData writes contextual request metrics to Influxdb
func (c *InfluxController) WriteRequestData(m *metrics.Metrics) (err error) {

defer func() {
if r := recover(); r != nil {
err = errors.New("influxdb paniced while attempting to writing - this probably means that Kanali was unable to establish a connection on startup")
err = errors.New("influxdb paniced while attempting to write")
}
}()

bp, err := influx.NewBatchPoints(influx.BatchPointsConfig{
Database: viper.GetString(config.FlagInfluxdbDatabase.GetLong()),
Precision: "s",
Database: viper.GetString(config.FlagInfluxdbDatabase.GetLong()),
})
if err != nil {
return err
}
tags, err := getTags(m)
if err != nil {
return err
}
pt, err := influx.NewPoint("request_details", tags, getFields(m), time.Now())
if err != nil {
return err
}
bp.AddPoint(pt)
if err := c.Client.Write(bp); err == nil {
return nil
}
if err := createDatabase(c.Client); err != nil {
return err
}
return c.Client.Write(bp)
}

tags := make(map[string]string)
fields := make(map[string]interface{})
func createDatabase(c influx.Client) error {
q := influx.NewQuery(fmt.Sprintf("CREATE DATABASE %s", viper.GetString(config.FlagInfluxdbDatabase.GetLong())), "", "")
response, err := c.Query(q)
if err != nil {
return err
}
if response != nil && response.Error() != nil {
return response.Error()
}
return nil
}

func getTags(m *metrics.Metrics) (map[string]string, error) {
tags := make(map[string]string)
for _, metric := range *m {
fields[metric.Name] = metric.Value
if !metric.Index {
continue
}
tagValue, ok := metric.Value.(string)
if !ok {
logrus.Errorf("the metric %s is configured to be indexed. InfluxDB requires that indexed fields have a string value", metric.Name)
continue
return nil, fmt.Errorf("InfluxDB requires that the indexed field %s be of type string", metric.Name)
}
tags[metric.Name] = tagValue
}
return tags, nil
}

pt, err := influx.NewPoint("request_details", tags, fields, time.Now())
if err != nil {
return err
func getFields(m *metrics.Metrics) map[string]interface{} {
fields := make(map[string]interface{})
for _, metric := range *m {
fields[metric.Name] = metric.Value
}

bp.AddPoint(pt)

return c.Client.Write(bp)

return fields
}
104 changes: 104 additions & 0 deletions monitor/influxdb_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
package monitor

import (
"errors"
"regexp"
"testing"
"time"

influx "github.com/influxdata/influxdb/client/v2"
"github.com/northwesternmutual/kanali/config"
"github.com/northwesternmutual/kanali/metrics"
"github.com/spf13/viper"
"github.com/stretchr/testify/assert"
)

type mockClient struct {
db string
}

func (c *mockClient) Ping(timeout time.Duration) (time.Duration, string, error) {
return 123456789, "", nil
}

func (c *mockClient) Write(bp influx.BatchPoints) error {
if bp.Database() == "" || c.db != bp.Database() {
return errors.New("database does not exist")
}
return nil
}

func (c *mockClient) Query(q influx.Query) (*influx.Response, error) {
re := regexp.MustCompile("^CREATE DATABASE (.*)")
match := re.FindStringSubmatch(q.Command)
if len(match) != 2 {
return nil, errors.New("query incorrect")
}
if match[1] == "" {
return nil, errors.New("no database name")
}
c.db = match[1]
return nil, nil
}

func (c *mockClient) Close() error {
return nil
}

func TestWriteRequestData(t *testing.T) {
ctlr := &InfluxController{Client: &mockClient{}}
m := &metrics.Metrics{
metrics.Metric{"metric-one", "value-one", true},
metrics.Metric{"metric-two", "value-two", false},
}
assert.Equal(t, ctlr.WriteRequestData(m).Error(), "no database name")
viper.SetDefault(config.FlagInfluxdbDatabase.GetLong(), "mydb")
assert.Nil(t, ctlr.WriteRequestData(m))
assert.Nil(t, ctlr.WriteRequestData(m))
viper.SetDefault(config.FlagInfluxdbDatabase.GetLong(), "")
ctlr = &InfluxController{Client: nil}
assert.Equal(t, ctlr.WriteRequestData(m).Error(), "influxdb paniced while attempting to write")
}

func TestNewInfluxdbController(t *testing.T) {
_, err := NewInfluxdbController()
assert.NotNil(t, err)
viper.SetDefault(config.FlagInfluxdbAddr.GetLong(), "http://foo.bar.com")
_, err = NewInfluxdbController()
assert.Nil(t, err)
}

func TestCreateDatabase(t *testing.T) {
err := createDatabase(&mockClient{})
assert.Equal(t, err.Error(), "no database name")
viper.SetDefault(config.FlagInfluxdbDatabase.GetLong(), "mydb")
assert.Nil(t, createDatabase(&mockClient{}))
viper.SetDefault(config.FlagInfluxdbDatabase.GetLong(), "")
}

func TestGetFields(t *testing.T) {
assert.Equal(t, getFields(&metrics.Metrics{
metrics.Metric{"metric-one", "value-one", true},
metrics.Metric{"metric-two", "value-two", false},
}), map[string]interface{}{
"metric-one": "value-one",
"metric-two": "value-two",
})
}

func TestGetTags(t *testing.T) {
tags, err := getTags(&metrics.Metrics{
metrics.Metric{"metric-one", "value-one", true},
metrics.Metric{"metric-two", "value-two", false},
})
assert.Nil(t, err)
assert.Equal(t, tags, map[string]string{
"metric-one": "value-one",
})
tags, err = getTags(&metrics.Metrics{
metrics.Metric{"metric-one", 5, true},
metrics.Metric{"metric-two", "value-two", false},
})
assert.Nil(t, tags)
assert.Equal(t, err.Error(), "InfluxDB requires that the indexed field metric-one be of type string")
}

0 comments on commit 6bff9ea

Please sign in to comment.