Skip to content

Commit

Permalink
Influx: populate selected message fields as tags
Browse files Browse the repository at this point in the history
  • Loading branch information
grandcat committed Oct 16, 2018
1 parent 2a6911a commit b4d9da8
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 21 deletions.
10 changes: 0 additions & 10 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,16 +40,6 @@ func LoadConf(path string) (MqttConf, InfluxDBConf, error) {
return MqttConf{}, InfluxDBConf{}, err
}

em := MqttConf{}
ei := InfluxDBConf{}

if cfg.Mqtt == em {
log.Fatal(`empty "mqforward-mqtt" configuration`)
}
if cfg.InfluxDB == ei {
log.Fatal(`empty "mqforward-influxdb" configuration`)
}

if cfg.General.Debug {
log.SetLevel(log.DebugLevel)
}
Expand Down
34 changes: 23 additions & 11 deletions influxdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,15 @@ const (
)

type InfluxDBConf struct {
Hostname string
Port int
Db string
UserName string
Password string
Tick int
UDP bool
Debug string
Hostname string
Port int
Db string
UserName string
Password string
Tick int
UDP bool
Debug string
TagsAttributes []string
}

type InfluxDBClient struct {
Expand All @@ -50,7 +51,7 @@ func NewInfluxDBClient(conf InfluxDBConf, ifChan chan Message, commandChan chan
}
// Make client
con, err := influxdb.NewHTTPClient(influxdb.HTTPConfig{
Addr: host,
Addr: host,
Username: conf.UserName,
Password: conf.Password,
})
Expand Down Expand Up @@ -166,7 +167,6 @@ func (ifc *InfluxDBClient) Msg2Series(msgs []Message) influxdb.BatchPoints {
return nil
}


for _, msg := range msgs {
if msg.Topic == "" && len(msg.Payload) == 0 {
break
Expand All @@ -178,10 +178,22 @@ func (ifc *InfluxDBClient) Msg2Series(msgs []Message) influxdb.BatchPoints {
}

name := strings.Replace(msg.Topic, "/", ".", -1)

// Store default tag attributes
tags := map[string]string{
"topic": msg.Topic,
}
pt, err := influxdb.NewPoint(name, tags, j, now);
// Transform user-defined JSON fields to tags
for _, tag := range ifc.Config.TagsAttributes {
if v, ok := j[tag]; ok {
if tagVal, ok := v.(string); ok {
tags[tag] = tagVal
delete(j, tag)
}
}
}

pt, err := influxdb.NewPoint(name, tags, j, now)
if err != nil {
log.Warn(err)
continue
Expand Down
1 change: 1 addition & 0 deletions mqforward.ini.example
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,4 @@ port = 8086
db = test
username = test
password = password
tagsAttributes = key1,key2

0 comments on commit b4d9da8

Please sign in to comment.