-
Notifications
You must be signed in to change notification settings - Fork 5.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fixed race condition in the Wavefront parser #5764
Fixed race condition in the Wavefront parser #5764
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
BTW, someday I want to make it so the parsers don't need to be threadsafe and can be stateful. I messed around with this on the tail plugin a bit, we needed the parser to know what file it was parsing in order to name metrics. Ended up with this interface as a quick fix:
type ParserFuncInput interface {
// GetParser returns a new parser.
SetParserFunc(fn ParserFunc)
}
I think it's not quite the interface we want long term, but what we really need is a way for plugins to get shared things from Telegraf (perhaps a logger or other objects besides just the parser). On inputs we could stick things into the accumulator, but for outputs/processors it doesn't really work.
plugins/parsers/wavefront/parser.go
Outdated
@@ -61,6 +99,8 @@ func (p *PointParser) Parse(buf []byte) ([]telegraf.Metric, error) { | |||
buf = append(buf, []byte("\n")...) | |||
} | |||
|
|||
//log.Printf("D! [parsers.wavefront] Received data: %s", string(buf)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you remove this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done!
plugins/parsers/wavefront/parser.go
Outdated
wp := &WavefrontParser{defaultTags: defaultTags} | ||
wp.parsers = &sync.Pool{ | ||
New: func() interface{} { | ||
log.Printf("D! [parsers.wavefront] Creating new parser for %s", wp) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(This is the ci test failure) I would just remove the for %s
part completely. Default tags never change unless Telegraf is reloaded and are shared on all inputs.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done!
(cherry picked from commit f32b064)
Required for all PRs:
The Wavefront parser had some statefulness to it making it panic when receiving heavy traffic from multiple sources. Solved it by moving stateful objects to a separate struct and using a sync.Pool to make sure they are reused but never shared across goroutines.