input plugin for hystrix-stream-servlet #3309

Open: wants to merge 17 commits into base: master

6 participants
@nilsmagnus

nilsmagnus commented Oct 5, 2017

Support scraping of the hystrix-stream-servlet to collect metrics.

Required for all PRs:

  • Signed CLA.
  • Associated README.md updated.
  • Has appropriate unit tests.

nilsmagnus added some commits Oct 5, 2017

@nilsmagnus nilsmagnus changed the title from #3308 input plugin for hystrix-stream-servlet to input plugin for hystrix-stream-servlet Oct 5, 2017

@danielnelson danielnelson added the triage label Oct 5, 2017

nilsmagnus commented Nov 16, 2017

@danielnelson, do you know whether this will be merged into master, or does it need any refinement?

Contributor

danielnelson commented Nov 17, 2017

I'll try to take a look at it soon

@russorat russorat added new plugin and removed triage labels Mar 22, 2018

darjisanket commented May 2, 2018

@danielnelson I am interested in this plugin. May I know what the plan is for releasing this feature?


const sampleConfig = `
## Hystrix stream servlet to connect to (with port and full path)
hystrix_servlet_url = "http://localhost:8090/hystrix"

danielnelson May 9, 2018

Contributor

Shorten this to url

`

type HystrixData struct {
Hystrix_servlet_url string

danielnelson May 9, 2018

Contributor

Just an aside, in case we rename this field: you can use a struct tag here to control the key used in the config file:

HystrixServletURL string `toml:"hystrix_servlet_url"`
RollingCountShortCircuited int `json:"rollingCountShortCircuited"`
RollingCountSuccess int `json:"rollingCountSuccess"`
RollingCountThreadPoolRejected int `json:"rollingCountThreadPoolRejected"`
RollingCountTimeout int `json:"rollingCountTimeout"`

danielnelson May 9, 2018

Contributor

Don't define fields that won't be processed; keys in the input JSON that have no matching struct field are simply ignored.
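To illustrate the point about unused fields, here is a minimal sketch using only encoding/json. The circuitEntry type, the parseEntry helper, the "name" key and the sample payload are assumptions made for the example; only the two rollingCount field names come from the diff above.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// circuitEntry declares only the fields the plugin would report.
// The selection is illustrative, not the PR's final field list.
type circuitEntry struct {
	Name                string `json:"name"` // assumed key, not shown in the diff
	RollingCountSuccess int    `json:"rollingCountSuccess"`
	RollingCountTimeout int    `json:"rollingCountTimeout"`
}

// parseEntry decodes one JSON payload. Keys without a matching struct
// field are skipped by encoding/json instead of causing an error.
func parseEntry(payload []byte) (circuitEntry, error) {
	var e circuitEntry
	err := json.Unmarshal(payload, &e)
	return e, err
}

func main() {
	// "someOtherKey" has no struct field and is silently dropped.
	payload := []byte(`{"name":"demo","rollingCountSuccess":7,"rollingCountTimeout":1,"someOtherKey":42}`)
	e, err := parseEntry(payload)
	if err != nil {
		fmt.Println("decode error:", err)
		return
	}
	fmt.Printf("%+v\n", e)
}
```

Because unknown keys are dropped by default, the struct can stay as small as the set of metrics that are actually emitted.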

cachedEntries []HystrixStreamEntry
reader io.ReadCloser
cacheLock sync.Mutex
)

danielnelson May 9, 2018

Contributor

The plugin can be defined multiple times, so this code would need to be thread-safe. I think it makes the most sense to make this a new type so each plugin would have its own parser.

nilsmagnus May 30, 2018

Good point, I will update the code with a new type
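A rough sketch of what moving the package-level state into a per-instance type could look like. The names streamCache, entry, add and drain are hypothetical and do not come from the PR; the point is only that each plugin instance carries its own lock and slice.

```go
package main

import (
	"fmt"
	"sync"
)

// entry stands in for the PR's HystrixStreamEntry; one field is enough here.
type entry struct {
	Name string
}

// streamCache is a hypothetical per-plugin type. Every instance owns its
// own mutex and slice, so two configured plugins never share state.
type streamCache struct {
	mu      sync.Mutex
	entries []entry
}

// add appends an entry under this instance's lock.
func (c *streamCache) add(e entry) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.entries = append(c.entries, e)
}

// drain returns the buffered entries and resets the buffer.
func (c *streamCache) drain() []entry {
	c.mu.Lock()
	defer c.mu.Unlock()
	out := c.entries
	c.entries = nil
	return out
}

func main() {
	a, b := &streamCache{}, &streamCache{}
	a.add(entry{Name: "circuit-a"})
	// Only a holds an entry; b is unaffected.
	fmt.Println(len(a.drain()), len(b.drain()))
}
```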

func latestEntries(url string) ([]HystrixStreamEntry, error) {

if !healthy {
resp, err := http.Get(url)

danielnelson May 9, 2018

Contributor

Since we are doing HTTP, we will need to add support for a timeout, TLS, and use a custom http.Client. The http input is probably the simplest example.

cachedEntries = make([]HystrixStreamEntry, 0)
go fillCacheForever(scanner)
healthy = true
log.Printf("I! Initialized hystrix-input with url : [%s]", url)

danielnelson May 9, 2018

Contributor

Remove this log message; it is too verbose.

result := make([]string, 0)
for scanner.Scan() {
text := scanner.Text()
if isData(text) {

danielnelson May 9, 2018

Contributor

Don't add or call this function; use if text == "" instead, which is the Go convention.

text := scanner.Text()
if isData(text) {
result = append(result, scanner.Text())
break

danielnelson May 9, 2018

Contributor

Is this function supposed to only return the first non-empty line? If so why return a slice of strings?

nilsmagnus May 30, 2018

Thanks, I don't know what I was thinking here.


func fillCacheForeverMax(scanner *bufio.Scanner, maxEntries int) {
newEntryCounter := 0
for scanner.Err() == nil {

danielnelson May 9, 2018

Contributor

I think this should be:

for scanner.Scan() {
	text := scanner.Text()
	if text == "" {
		continue
	}

	// other stuff
}
entries, err := parseChunk(chunk)
if err == nil {
for _, entry := range entries {
cacheLock.Lock()

danielnelson May 9, 2018

Contributor

I don't understand the purpose of the cache, can you explain it to me? It seems to me that it would be better to parse without the goroutine and just return the entries.

nilsmagnus May 30, 2018

The hystrix-servlet writes to a stream that never ends, so I cannot wait for it to finish. If we remove the goroutine and cache, we could instead wait for N entries or for a timeout, but anything the servlet writes between our reads would be lost.

For instance, if you curl a hystrix-servlet, the request never finishes, because the servlet is constantly writing data to the response. The entries in the response contain different information (the next entry may describe a different hystrix circuit than the previous one), so if they are not parsed into a cache they are lost.

Do you know of other input plugins that parse streams and solve this differently, so that I can study them for inspiration?

danielnelson May 30, 2018

Contributor

You will probably need to use the telegraf.ServiceInput interface which adds Start and Stop functions. In the Start function you can launch your goroutine that connects over http to start streaming, and also have it handle reconnects. In the Stop function you can cancel the goroutine and wait for it to complete. I suggest using a context.Context for this.

I recommend hanging onto the Accumulator passed into Start and calling AddFields immediately instead of having the cache.

I would say this is most like one of the queue consumer plugins such as kafka_consumer.

scolytus commented May 26, 2018

@nilsmagnus any chance you could give us a hint if you will take care of the review comments? No pressure, just curious :)

nilsmagnus commented May 26, 2018

@scolytus, I have set aside time on Wednesday for this 😉

nilsmagnus commented May 30, 2018

@scolytus, I have made some changes based on your input. Could you have another look at it when you have time?

scolytus commented Jun 4, 2018

@nilsmagnus it wasn't me who reviewed your pull request, it was @danielnelson :-)

hekmekk commented Jun 16, 2018

First off, I'm very interested in this plugin and thankful that you wrote it! 👍 I hope it will be merged soon!

When building your fork, @nilsmagnus, however, I was put off by the fact that ./telegraf --usage hystrix-stream (as suggested by your README) returned an error indicating the plugin could not be found. I had to guess (./telegraf config | grep -i "inputs\.hystrix") to find out which name the plugin actually goes by.

I only just got involved with both Telegraf and Go, so I'm not sure whether the camel case has some meaning I missed or whether this is an accidental mismatch between the documentation and the code. Mind clarifying? :)

nilsmagnus commented Jun 17, 2018

@hekmekk, you are right: it was declared as "HystrixStream", which does not follow conventions. I have corrected this now.

nilsmagnus commented Jun 17, 2018

@danielnelson, I think the pull-request can be re-reviewed now, whenever you have time for it 👍
