Skip to content

Commit

Permalink
Implement variable length handling
Browse files Browse the repository at this point in the history
  • Loading branch information
srebhan committed Jun 17, 2024
1 parent 54740f6 commit b1cb469
Show file tree
Hide file tree
Showing 5 changed files with 194 additions and 60 deletions.
3 changes: 2 additions & 1 deletion plugins/inputs/mqtt_consumer/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,8 @@ to use them.
data_format = "influx"

## Enable extracting tag values from MQTT topics
## _ denotes an ignored entry in the topic path
## _ denotes an ignored entry in the topic path,
## # denotes a variable length path element (can only be used once per setting)
# [[inputs.mqtt_consumer.topic_parsing]]
# topic = ""
# measurement = ""
Expand Down
34 changes: 8 additions & 26 deletions plugins/inputs/mqtt_consumer/mqtt_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,14 @@ func (m *MQTTConsumer) onMessage(_ mqtt.Client, msg mqtt.Message) {
metric.AddTag(m.topicTagParse, msg.Topic())
}
for _, p := range m.topicParsers {
m.parseTopic(p, msg, metric)
if err := p.Parse(metric, msg.Topic()); err != nil {
if m.PersistentSession {
msg.Ack()
}
m.acc.AddError(err)
<-m.sem
return
}
}
}
m.messagesMutex.Lock()
Expand Down Expand Up @@ -326,31 +333,6 @@ func (m *MQTTConsumer) createOpts() (*mqtt.ClientOptions, error) {
return opts, nil
}

func (m *MQTTConsumer) parseTopic(p *TopicParser, msg mqtt.Message, metric telegraf.Metric) {
measurement, tags, fields, err := p.Parse(msg.Topic())
if err != nil {
if errors.Is(err, ErrNoMatch) {
return
}

if m.PersistentSession {
msg.Ack()
}
m.acc.AddError(err)
<-m.sem
return
}
if measurement != "" {
metric.SetName(measurement)
}
for k, v := range tags {
metric.AddTag(k, v)
}
for k, v := range fields {
metric.AddField(k, v)
}
}

func New(factory ClientFactory) *MQTTConsumer {
return &MQTTConsumer{
Servers: []string{"tcp://127.0.0.1:1883"},
Expand Down
63 changes: 63 additions & 0 deletions plugins/inputs/mqtt_consumer/mqtt_consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -454,6 +454,69 @@ func TestTopicTag(t *testing.T) {
),
},
},
{
name: "topic parsing with variable length",
topic: "/telegraf/123/foo/test/hello",
topicTag: func() *string {
tag := ""
return &tag
},
topicParsing: []TopicParsingConfig{
{
Topic: "/telegraf/#/test/hello",
Measurement: "/#/measurement/_",
Tags: "/testTag/#/moreTag/_/_",
Fields: "/_/testNumber/#/testString",
FieldTypes: map[string]string{
"testNumber": "int",
},
},
},
expected: []telegraf.Metric{
testutil.MustMetric(
"test",
map[string]string{
"testTag": "telegraf",
"moreTag": "foo",
},
map[string]interface{}{
"testNumber": 123,
"testString": "hello",
"time_idle": 42,
},
time.Unix(0, 0),
),
},
},
{
name: "topic parsing with variable length too short",
topic: "/telegraf/123",
topicTag: func() *string {
tag := ""
return &tag
},
topicParsing: []TopicParsingConfig{
{
Topic: "/telegraf/#",
Measurement: "/#/measurement/_",
Tags: "/testTag/#/moreTag/_/_",
Fields: "/_/testNumber/#/testString",
FieldTypes: map[string]string{
"testNumber": "int",
},
},
},
expected: []telegraf.Metric{
testutil.MustMetric(
"cpu",
map[string]string{},
map[string]interface{}{
"time_idle": 42,
},
time.Unix(0, 0),
),
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand Down
3 changes: 2 additions & 1 deletion plugins/inputs/mqtt_consumer/sample.conf
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,8 @@
data_format = "influx"

## Enable extracting tag values from MQTT topics
## _ denotes an ignored entry in the topic path
## _ denotes an ignored entry in the topic path,
## # denotes a variable length path element (can only be used once per setting)
# [[inputs.mqtt_consumer.topic_parsing]]
# topic = ""
# measurement = ""
Expand Down
151 changes: 119 additions & 32 deletions plugins/inputs/mqtt_consumer/topic_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ import (
"fmt"
"strconv"
"strings"
)

var ErrNoMatch = errors.New("not matching")
"github.com/influxdata/telegraf"
)

type TopicParsingConfig struct {
Topic string `toml:"topic"`
Expand All @@ -18,7 +18,9 @@ type TopicParsingConfig struct {
}

type TopicParser struct {
topic []string
topicIndices map[string]int
topicVarLength bool
topicMinLength int

extractMeasurement bool
measurementIndex int
Expand All @@ -29,88 +31,173 @@ type TopicParser struct {

func (cfg *TopicParsingConfig) NewParser() (*TopicParser, error) {
p := &TopicParser{
extractMeasurement: cfg.Measurement != "",
fieldTypes: cfg.FieldTypes,
topic: strings.Split(cfg.Topic, "/"),
fieldTypes: cfg.FieldTypes,
}

// Build a check list for topic elements
var topicMinLength int
var topicInvert bool
topicParts := strings.Split(cfg.Topic, "/")
p.topicIndices = make(map[string]int, len(topicParts))
for i, k := range topicParts {
switch k {
case "+":
topicMinLength++
case "#":
if p.topicVarLength {
return nil, errors.New("topic can only contain one hash")
}
p.topicVarLength = true
topicInvert = true
default:
if !topicInvert {
p.topicIndices[k] = i
} else {
p.topicIndices[k] = i - len(topicParts)
}
topicMinLength++
}
}

// Determine metric name selection
var measurementMinLength int
var measurementInvert bool
measurementParts := strings.Split(cfg.Measurement, "/")
for i, k := range measurementParts {
if k != "_" && k != "" {
if k == "_" || k == "" {
measurementMinLength++
continue
}

if k == "#" {
measurementInvert = true
continue
}

if p.extractMeasurement {
return nil, errors.New("measurement can only contain one element")
}

if !measurementInvert {
p.measurementIndex = i
break
} else {
p.measurementIndex = i - len(measurementParts)
}
p.extractMeasurement = true
measurementMinLength++
}

// Determine tag selections
var tagMinLength int
var tagInvert bool
tagParts := strings.Split(cfg.Tags, "/")
p.tagIndices = make(map[string]int, len(tagParts))
for i, k := range tagParts {
if k != "_" && k != "" {
if k == "_" || k == "" {
tagMinLength++
continue
}
if k == "#" {
tagInvert = true
continue
}
if !tagInvert {
p.tagIndices[k] = i
} else {
p.tagIndices[k] = i - len(tagParts)
}
tagMinLength++
}

// Determine tag selections
var fieldMinLength int
var fieldInvert bool
fieldParts := strings.Split(cfg.Fields, "/")
p.fieldIndices = make(map[string]int, len(fieldParts))
for i, k := range fieldParts {
if k != "_" && k != "" {
if k == "_" || k == "" {
fieldMinLength++
continue
}
if k == "#" {
fieldInvert = true
continue
}
if !fieldInvert {
p.fieldIndices[k] = i
} else {
p.fieldIndices[k] = i - len(fieldParts)
}
fieldMinLength++
}

if len(measurementParts) != len(p.topic) && len(measurementParts) != 1 {
return nil, errors.New("measurement length does not equal topic length")
}
if !p.topicVarLength {
if measurementMinLength != topicMinLength && p.extractMeasurement {
return nil, errors.New("measurement length does not equal topic length")
}

if len(fieldParts) != len(p.topic) && cfg.Fields != "" {
return nil, errors.New("fields length does not equal topic length")
}
if fieldMinLength != topicMinLength && cfg.Fields != "" {
return nil, errors.New("fields length does not equal topic length")
}

if len(tagParts) != len(p.topic) && cfg.Tags != "" {
return nil, errors.New("tags length does not equal topic length")
if tagMinLength != topicMinLength && cfg.Tags != "" {
return nil, errors.New("tags length does not equal topic length")
}
}

p.topicMinLength = max(topicMinLength, measurementMinLength, tagMinLength, fieldMinLength)

return p, nil
}

func (p *TopicParser) Parse(topic string) (string, map[string]string, map[string]interface{}, error) {
func (p *TopicParser) Parse(metric telegraf.Metric, topic string) error {
// Split the actual topic into its elements and check for a match
topicParts := strings.Split(topic, "/")
if len(p.topic) != len(topicParts) {
return "", nil, nil, ErrNoMatch
if p.topicVarLength && len(topicParts) < p.topicMinLength || !p.topicVarLength && len(topicParts) != p.topicMinLength {
return nil
}
for i, expected := range p.topic {
if topicParts[i] != expected && expected != "+" {
return "", nil, nil, ErrNoMatch
for expected, i := range p.topicIndices {
if i >= 0 && topicParts[i] != expected || i < 0 && topicParts[len(topicParts)+i] != expected {
return nil
}
}

// Extract the measurement name
var measurement string
if p.extractMeasurement {
measurement = topicParts[p.measurementIndex]
if p.measurementIndex >= 0 {
measurement = topicParts[p.measurementIndex]
} else {
measurement = topicParts[len(topicParts)+p.measurementIndex]
}
metric.SetName(measurement)
}

// Extract the tags
tags := make(map[string]string, len(p.tagIndices))
for k, i := range p.tagIndices {
tags[k] = topicParts[i]
if i >= 0 {
metric.AddTag(k, topicParts[i])
} else {
metric.AddTag(k, topicParts[len(topicParts)+i])
}
}

// Extract the fields
fields := make(map[string]interface{}, len(p.fieldIndices))
for k, i := range p.fieldIndices {
v, err := p.convertToFieldType(topicParts[i], k)
var raw string
if i >= 0 {
raw = topicParts[i]
} else {
raw = topicParts[len(topicParts)+i]
}
v, err := p.convertToFieldType(raw, k)
if err != nil {
return "", nil, nil, err
return err
}
fields[k] = v
metric.AddField(k, v)
}

return measurement, tags, fields, nil
return nil
}

func (p *TopicParser) convertToFieldType(value string, key string) (interface{}, error) {
Expand Down

0 comments on commit b1cb469

Please sign in to comment.