Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Migrate wavefront parser to new style #11374

Merged
merged 1 commit into from Jun 27, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions plugins/parsers/all/all.go
Expand Up @@ -5,5 +5,6 @@ import (
_ "github.com/influxdata/telegraf/plugins/parsers/csv"
_ "github.com/influxdata/telegraf/plugins/parsers/json"
_ "github.com/influxdata/telegraf/plugins/parsers/json_v2"
_ "github.com/influxdata/telegraf/plugins/parsers/wavefront"
_ "github.com/influxdata/telegraf/plugins/parsers/xpath"
)
7 changes: 0 additions & 7 deletions plugins/parsers/registry.go
Expand Up @@ -18,7 +18,6 @@ import (
"github.com/influxdata/telegraf/plugins/parsers/temporary/json_v2"
"github.com/influxdata/telegraf/plugins/parsers/temporary/xpath"
"github.com/influxdata/telegraf/plugins/parsers/value"
"github.com/influxdata/telegraf/plugins/parsers/wavefront"
)

// Creator is the function to create a new parser
Expand Down Expand Up @@ -232,8 +231,6 @@ func NewParser(config *Config) (Parser, error) {
config.DefaultTags,
config.Separator,
config.Templates)
case "wavefront":
parser, err = NewWavefrontParser(config.DefaultTags)
case "grok":
parser, err = newGrokParser(
config.MetricName,
Expand Down Expand Up @@ -365,10 +362,6 @@ func NewLogFmtParser(metricName string, defaultTags map[string]string, tagKeys [
return parser, err
}

func NewWavefrontParser(defaultTags map[string]string) (Parser, error) {
return wavefront.NewWavefrontParser(defaultTags), nil
}

func NewFormUrlencodedParser(
metricName string,
defaultTags map[string]string,
Expand Down
45 changes: 26 additions & 19 deletions plugins/parsers/wavefront/parser.go
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/plugins/parsers"
)

const MaxBufferSize = 2
Expand All @@ -22,10 +23,10 @@ type Point struct {
Tags map[string]string
}

type WavefrontParser struct {
type Parser struct {
parsers *sync.Pool
defaultTags map[string]string
Log telegraf.Logger `toml:"-"`
DefaultTags map[string]string `toml:"-"`
Log telegraf.Logger `toml:"-"`
}

// PointParser is a thread-unsafe parser and must be kept in a pool.
Expand All @@ -39,7 +40,7 @@ type PointParser struct {
scanBuf bytes.Buffer // buffer reused for scanning tokens
writeBuf bytes.Buffer // buffer reused for parsing elements
Elements []ElementParser
parent *WavefrontParser
parent *Parser
}

// NewWavefrontElements returns a slice of ElementParser's for the Graphite format
Expand All @@ -53,22 +54,17 @@ func NewWavefrontElements() []ElementParser {
return elements
}

func NewWavefrontParser(defaultTags map[string]string) *WavefrontParser {
wp := &WavefrontParser{defaultTags: defaultTags}
wp.parsers = &sync.Pool{
func (p *Parser) Init() error {
p.parsers = &sync.Pool{
New: func() interface{} {
return NewPointParser(wp)
elements := NewWavefrontElements()
return &PointParser{Elements: elements, parent: p}
},
}
return wp
return nil
}

func NewPointParser(parent *WavefrontParser) *PointParser {
elements := NewWavefrontElements()
return &PointParser{Elements: elements, parent: parent}
}

func (p *WavefrontParser) ParseLine(line string) (telegraf.Metric, error) {
func (p *Parser) ParseLine(line string) (telegraf.Metric, error) {
buf := []byte(line)

metrics, err := p.Parse(buf)
Expand All @@ -83,7 +79,7 @@ func (p *WavefrontParser) ParseLine(line string) (telegraf.Metric, error) {
return nil, nil
}

func (p *WavefrontParser) Parse(buf []byte) ([]telegraf.Metric, error) {
func (p *Parser) Parse(buf []byte) ([]telegraf.Metric, error) {
pp := p.parsers.Get().(*PointParser)
defer p.parsers.Put(pp)
return pp.Parse(buf)
Expand Down Expand Up @@ -127,8 +123,8 @@ func (p *PointParser) Parse(buf []byte) ([]telegraf.Metric, error) {
return metrics, nil
}

func (p *WavefrontParser) SetDefaultTags(tags map[string]string) {
p.defaultTags = tags
func (p *Parser) SetDefaultTags(tags map[string]string) {
p.DefaultTags = tags
}

func (p *PointParser) convertPointToTelegrafMetric(points []Point) ([]telegraf.Metric, error) {
Expand All @@ -140,7 +136,7 @@ func (p *PointParser) convertPointToTelegrafMetric(points []Point) ([]telegraf.M
tags[k] = v
}
// apply default tags after parsed tags
for k, v := range p.parent.defaultTags {
for k, v := range p.parent.DefaultTags {
tags[k] = v
}

Expand Down Expand Up @@ -218,3 +214,14 @@ func (p *PointParser) reset(buf []byte) {
}
p.buf.n = 0
}

func (p *Parser) InitFromConfig(_ *parsers.Config) error {
return p.Init()
}

func init() {
parsers.Add("wavefront",
func(_ string) telegraf.Parser {
return &Parser{}
})
}
19 changes: 13 additions & 6 deletions plugins/parsers/wavefront/parser_test.go
Expand Up @@ -11,7 +11,8 @@ import (
)

func TestParse(t *testing.T) {
parser := NewWavefrontParser(nil)
parser := &Parser{}
require.NoError(t, parser.Init())

parsedMetrics, err := parser.Parse([]byte("test.metric 1"))
require.NoError(t, err)
Expand Down Expand Up @@ -78,7 +79,8 @@ func TestParse(t *testing.T) {
}

func TestParseLine(t *testing.T) {
parser := NewWavefrontParser(nil)
parser := &Parser{}
require.NoError(t, parser.Init())

parsedMetric, err := parser.ParseLine("test.metric 1")
require.NoError(t, err)
Expand Down Expand Up @@ -113,7 +115,8 @@ func TestParseLine(t *testing.T) {
}

func TestParseMultiple(t *testing.T) {
parser := NewWavefrontParser(nil)
parser := &Parser{}
require.NoError(t, parser.Init())

parsedMetrics, err := parser.Parse([]byte("test.metric 1\ntest.metric2 2 1530939936"))
require.NoError(t, err)
Expand Down Expand Up @@ -148,7 +151,8 @@ func TestParseMultiple(t *testing.T) {
}

func TestParseSpecial(t *testing.T) {
parser := NewWavefrontParser(nil)
parser := &Parser{}
require.NoError(t, parser.Init())

parsedMetric, err := parser.ParseLine("\"test.metric\" 1 1530939936")
require.NoError(t, err)
Expand All @@ -162,7 +166,8 @@ func TestParseSpecial(t *testing.T) {
}

func TestParseInvalid(t *testing.T) {
parser := NewWavefrontParser(nil)
parser := &Parser{}
require.NoError(t, parser.Init())

_, err := parser.Parse([]byte("test.metric"))
require.Error(t, err)
Expand Down Expand Up @@ -193,7 +198,9 @@ func TestParseInvalid(t *testing.T) {
}

func TestParseDefaultTags(t *testing.T) {
parser := NewWavefrontParser(map[string]string{"myDefault": "value1", "another": "test2"})
parser := &Parser{}
require.NoError(t, parser.Init())
parser.SetDefaultTags(map[string]string{"myDefault": "value1", "another": "test2"})

parsedMetrics, err := parser.Parse([]byte("test.metric 1 1530939936"))
require.NoError(t, err)
Expand Down