Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

change to NewStreamParser to accept larger inputs from scanner #8892

Merged
merged 2 commits into from
Apr 13, 2021

Conversation

tootedom
Copy link
Contributor

Description

Whilst creating an external processor plugin, I was utilising the shim to provide the reading from stdin and writing to stdout:

   // create the shim. This is what will run your plugins.
   s := shim.New()

   // Check for settings from a config toml file,
   err = s.LoadConfig(configFile)
   if err != nil {
      fmt.Fprintf(os.Stderr, "Err loading input: %s\n", err)
      os.Exit(1)
   }

   // run the plugin until stdin closes or we receive a termination signal
   if err := s.Run(shim.PollIntervalDisabled); err != nil {
      fmt.Fprintf(os.Stderr, "Err: %s\n", err)
      os.Exit(1)
   }

When running the processor plugin I was seeing that the processor was being terminated and restarted with the error message in telegraf's stderr:

write error: Broken pipe

However, when I moved the plugin away from using the shim for the processor's stdin and stdout management to that of what is in the example listed on https://github.com/influxdata/telegraf/tree/master/plugins/processors/execd which is using parser := influx.NewStreamParser(os.Stdin) in a for { loop; I did not encounter the error.


Looking at the code for the shim's processor it is using: scanner := bufio.NewScanner(s.stdin)
This has a default buffer of 64k. Outputting the Err() on the scanner reported bufio.scanner token too long. I was hitting the max default buffer size of 64k for the Scanner.

The reason for hitting this 64k limit is that I was taking the "string" (data_type:value) from a kinesis stream and sending it to the processor for manipulation. On occasions the data from kinesis was larger than the 64k; and the Scanner reported the token too long which propagated up to the processor process exiting and being restarted.


PR Change

Moved the processor's shim from the bufio.NewScanner to that of the influx.NewStreamParser(os.Stdin) from the example. Moving to that I had no issues with the processor exiting.

I'm unsure if there's a reason for using the bufio.NewScanner in the shim vs the using the influx StreamParser.

I added a test to cover this larger than the 64k buffer. The test hangs on the old code that uses the scanner (I didn't into the test code to look into the why for the hang on the old code).

Copy link
Contributor

@telegraf-tiger telegraf-tiger bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤝 ✅ CLA has been signed. Thank you!

@telegraf-tiger telegraf-tiger bot added the feat Improvement on an existing feature such as adding a new setting/mode to an existing plugin label Feb 20, 2021
@ivorybilled ivorybilled merged commit f3229f5 into influxdata:master Apr 13, 2021
reimda pushed a commit that referenced this pull request Apr 28, 2021
* change to NewStreamParser to accept larger inputs from scanner

* fmt changes

(cherry picked from commit f3229f5)
arstercz pushed a commit to arstercz/telegraf that referenced this pull request Mar 5, 2023
…xdata#8892)

* change to NewStreamParser to accept larger inputs from scanner

* fmt changes

(cherry picked from commit f3229f5)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
feat Improvement on an existing feature such as adding a new setting/mode to an existing plugin
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants