Performance improvement - Introduce queue for raw statsd payloads #459

Closed

Conversation

pedro-stanaka
Contributor

Summary of this PR

In this PR I am introducing a queue for raw statsd payloads (I call them packets), and a worker pool to parse the packets in parallel.

Problem

I am currently managing a fleet of statsd exporters for a project, and I noticed they struggle to keep up with high-traffic applications. I have observed memory spikes that go over 500 MiB, and when profiling those high-memory instances I could see the hot path was mostly in statsd parsing (see example heap profile).

I could not reach a conclusion on what exactly was causing the high memory usage; at first I suspected the GC not reacting fast enough to the bursts of data, or dangling references to the maps that get created while parsing tags.
Some things I have tried:

  • Play with GOGC to make the GC more aggressive: this yielded limited success; in general memory was lower, but we still saw spikes and OOM kills.
  • Build the exporter with Go 1.19 and set GOMEMLIMIT: almost the same result as GOGC, with more stability, but still OOM kills (with the limits we think are acceptable for our case).
  • To confirm the problem was the rate at which payloads are delivered to the exporter, we tried rate-limiting with a custom tool in front of the exporter. This gave a good result, but metric correctness suffered, so we could not use it.

Solution

Drawing on knowledge from applications my team has built in the past, and on the code of the Datadog Agent, I introduced a "buffer" for statsd payloads before the exporter starts parsing them. This frees the "wire" of the UDP connection faster and allows parallelism for the hard job that is parsing (the hot path of heap allocation).

Apart from that, I moved the parsing out of each listener and into a pool of "workers" responsible for parsing the packets and relaying the resulting events to the EventQueue.
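To make the shape of the change concrete, here is a minimal, self-contained sketch of the idea. Everything in it is an illustrative stand-in rather than the PR's actual code; only the Consume(c <-chan string) signature mirrors the worker discussed later in the review.

package main

import (
	"net"
	"strings"
)

// Worker is a stand-in for the PR's parser worker.
type Worker struct{ events chan<- string }

// Consume drains the packet queue; in the PR this is where the heavy
// line parsing happens, in parallel across several workers.
func (w *Worker) Consume(c <-chan string) {
	for packet := range c {
		w.handle(packet)
	}
}

// handle is a placeholder for the real line parsing: split the packet
// into lines and relay "events" downstream.
func (w *Worker) handle(packet string) {
	for _, l := range strings.Split(packet, "\n") {
		w.events <- l
	}
}

func main() {
	const numWorkers = 4
	packetQueue := make(chan string, 10000) // raw statsd payloads ("packets")
	events := make(chan string, 10000)      // stand-in for the EventQueue

	go func() { // discard events in this sketch
		for range events {
		}
	}()
	for i := 0; i < numWorkers; i++ {
		go (&Worker{events: events}).Consume(packetQueue)
	}

	// Listener side: read datagrams and hand them off as fast as possible,
	// freeing the UDP socket for the next packet.
	conn, err := net.ListenUDP("udp", &net.UDPAddr{Port: 9125})
	if err != nil {
		panic(err)
	}
	buf := make([]byte, 65535)
	for {
		n, _, err := conn.ReadFromUDP(buf)
		if err != nil {
			return
		}
		// string(...) copies the bytes, so buf can be reused immediately.
		packetQueue <- string(buf[:n])
	}
}

The real PR wires the workers up in main and uses the existing line parser; the point of the sketch is only the hand-off between the read loop and the worker pool.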

Results

  1. First of all, we have not observed OOM kills anymore, even under higher loads (12 MiB/s of payload traffic).
  2. Memory usage and spikes are much lower, and the CPU throttling went away.
    [screenshot]

Bench comparison

I am omitting the benchmarks that are equal (parsing, line format, etc).

▶ benchstat /tmp/master-bench.txt /tmp/new-bench           
name                            old time/op    new time/op    delta
UDPListener1-8                    8.46µs ± 0%    3.39µs ± 0%   ~     (p=1.000 n=1+1)
UDPListener5-8                     179µs ± 0%     178µs ± 0%   ~     (p=1.000 n=1+1)
UDPListener50-8                   19.7ms ± 0%    17.7ms ± 0%   ~     (p=1.000 n=1+1)
ExporterListener-8                5.44ms ± 0%    6.07ms ± 0%   ~     (p=1.000 n=1+1)
LineToEventsMixed1-8              5.45µs ± 0%    6.14µs ± 0%   ~     (p=1.000 n=1+1)
LineToEventsMixed5-8              27.3µs ± 0%    30.9µs ± 0%   ~     (p=1.000 n=1+1)
LineToEventsMixed50-8              277µs ± 0%     308µs ± 0%   ~     (p=1.000 n=1+1)
LineFormats/dogStatsd-8            390ns ± 0%     454ns ± 0%   ~     (p=1.000 n=1+1)
LineFormats/invalidDogStatsd-8     468ns ± 0%     539ns ± 0%   ~     (p=1.000 n=1+1)
LineFormats/signalFx-8             411ns ± 0%     472ns ± 0%   ~     (p=1.000 n=1+1)
LineFormats/invalidSignalFx-8      303ns ± 0%     348ns ± 0%   ~     (p=1.000 n=1+1)
LineFormats/influxDb-8             381ns ± 0%     453ns ± 0%   ~     (p=1.000 n=1+1)
LineFormats/invalidInfluxDb-8      435ns ± 0%     529ns ± 0%   ~     (p=1.000 n=1+1)
LineFormats/statsd-8               254ns ± 0%     281ns ± 0%   ~     (p=1.000 n=1+1)
LineFormats/invalidStatsd-8        392ns ± 0%     431ns ± 0%   ~     (p=1.000 n=1+1)

name                            old alloc/op   new alloc/op   delta
UDPListener1-8                    6.85kB ± 0%    5.25kB ± 0%   ~     (p=1.000 n=1+1)
UDPListener5-8                     171kB ± 0%     222kB ± 0%   ~     (p=1.000 n=1+1)
UDPListener50-8                   17.1MB ± 0%    22.3MB ± 0%   ~     (p=1.000 n=1+1)
ExporterListener-8                3.44MB ± 0%    3.44MB ± 0%   ~     (p=1.000 n=1+1)
LineToEventsMixed1-8              4.56kB ± 0%    4.56kB ± 0%   ~     (p=1.000 n=1+1)
LineToEventsMixed5-8              22.8kB ± 0%    22.8kB ± 0%   ~     (p=1.000 n=1+1)
LineToEventsMixed50-8              228kB ± 0%     228kB ± 0%   ~     (p=1.000 n=1+1)

Future TODOs

There are some things I would like to have, but I will leave them for another PR to avoid making this PR too big to review:

  • I want to introduce metrics for the size of both queues; we did that in another application by wrapping the metric handler, passing the channels to it, and calling len() before reporting the metrics (a rough sketch follows below).
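For illustration only, a hedged sketch of what such a queue-length metric could look like with client_golang; the metric name, help text, and the registerQueueLengthMetric helper are placeholders, not something this PR adds:

import "github.com/prometheus/client_golang/prometheus"

// Expose the current length of a queue channel as a gauge by registering a
// GaugeFunc that calls len() on the channel at scrape time.
func registerQueueLengthMetric(packetQueue chan string) {
	prometheus.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{
		Name: "statsd_exporter_packet_queue_length", // placeholder name
		Help: "Number of raw statsd packets waiting to be parsed.",
	}, func() float64 {
		return float64(len(packetQueue))
	}))
}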

Fixing linting problems

Signed-off-by: Pedro Tanaka <pedro.stanaka@gmail.com>
@pedro-stanaka
Contributor Author

pedro-stanaka commented Aug 22, 2022

So, it seems that as soon as you introduce parallelism in the workers (more than one goroutine parsing) we start getting race conditions. I will hold off on this PR for now; feel free to close it if you want. Once I have a proper solution I will reopen it or open a new one.

Logs (if you happen to have a tip for me, I'd be grateful):


ts=2022-08-22T08:55:57.155Z caller=main.go:296 level=info msg="Starting StatsD -> Prometheus Exporter" version="(version=, branch=, revision=)"
ts=2022-08-22T08:55:57.156Z caller=main.go:297 level=info msg="Build context" context="(go=go1.19, user=, date=)"
ts=2022-08-22T08:55:57.159Z caller=main.go:359 level=info msg="Accepting StatsD Traffic" udp=:9125 tcp= unixgram=
ts=2022-08-22T08:55:57.159Z caller=main.go:360 level=info msg="Accepting Prometheus Requests" addr=:9102
fatal error: concurrent map writes

goroutine 53 [running]:
github.com/prometheus/statsd_exporter/pkg/exporter.(*Exporter).handleEvent(0xc0000ace40, {0xa6e148, 0xc00054e1a0})
        /go/github.com/prometheus/statsd-exporter/pkg/exporter/exporter.go:109 +0x12f4
github.com/prometheus/statsd_exporter/pkg/exporter.(*Exporter).Listen(0xc0000ace40, 0xc0000aca20)
        /go/github.com/prometheus/statsd-exporter/pkg/exporter/exporter.go:72 +0x1ff
created by main.main
        /go/github.com/prometheus/statsd-exporter/main.go:546 +0x5d9e

goroutine 1 [select]:
main.main()
        /go/github.com/prometheus/statsd-exporter/main.go:552 +0x5e89

goroutine 26 [chan receive]:
github.com/prometheus/statsd_exporter/pkg/event.NewEventQueue.func1()
        /go/github.com/prometheus/statsd-exporter/pkg/event/event.go:94 +0x30
created by github.com/prometheus/statsd_exporter/pkg/event.NewEventQueue
        /go/github.com/prometheus/statsd-exporter/pkg/event/event.go:92 +0x1b9

goroutine 30 [runnable]:
net.wrapSyscallError(...)
        /usr/local/go/src/net/error_posix.go:17
net.(*netFD).readFromInet6(0xc0000bde00, {0xc00040ff69?, 0xde1dc0?, 0xc00040fe30?}, 0x46b70e?)
        /usr/local/go/src/net/fd_posix.go:74 +0x30
net.(*UDPConn).readFrom(0x0?, {0xc00040ff69?, 0xc00040ff18?, 0xc00040ff18?}, 0xc00041ff88)
        /usr/local/go/src/net/udpsock_posix.go:59 +0x85
net.(*UDPConn).readFromUDP(0xc0000b4548, {0xc00040ff69?, 0x8e?, 0xffff?}, 0xc00041ff88?)
        /usr/local/go/src/net/udpsock.go:149 +0x31
net.(*UDPConn).ReadFromUDP(...)
        /usr/local/go/src/net/udpsock.go:141
github.com/prometheus/statsd_exporter/pkg/listener.(*StatsDUDPListener).Listen(0xc00036ccc0)
        /go/github.com/prometheus/statsd-exporter/pkg/listener/listener.go:48 +0xe8
created by main.main
        /go/github.com/prometheus/statsd-exporter/main.go:395 +0x48dd

goroutine 31 [runnable]:
github.com/prometheus/statsd_exporter/pkg/line.buildEvent({0xc0001108f6, 0x2}, {0xc0001108cf, 0x1b}, 0x4079104db163baba, 0x0, 0xc0004406f0)
        /go/github.com/prometheus/statsd-exporter/pkg/line/line.go:64 +0x371
github.com/prometheus/statsd_exporter/pkg/line.(*Parser).LineToEvents(0x0?, {0xc0001108cf, 0x2f}, {0xc0000a00f0}, {0xa701a8, 0xc00006e6c0}, {0xa701a8, 0xc0002182a0}, {0xa701a8, 0xc00006e780}, ...)
        /go/github.com/prometheus/statsd-exporter/pkg/line/line.go:306 +0xa65
github.com/prometheus/statsd_exporter/pkg/parser.(*Worker).handle(0xc0000bde80, {0xc000110870?, 0x65?})
        /go/github.com/prometheus/statsd-exporter/pkg/parser/worker.go:84 +0x266
github.com/prometheus/statsd_exporter/pkg/parser.(*Worker).Consume(0xc0000bde80, 0x0?)
        /go/github.com/prometheus/statsd-exporter/pkg/parser/worker.go:72 +0x34
created by main.main
        /go/github.com/prometheus/statsd-exporter/main.go:490 +0x558c

goroutine 51 [IO wait]:
internal/poll.runtime_pollWait(0x7f603fcf8028, 0x72)
        /usr/local/go/src/runtime/netpoll.go:305 +0x89
internal/poll.(*pollDesc).wait(0xc000220000?, 0x6?, 0x0)
        /usr/local/go/src/internal/poll/fd_poll_runtime.go:84 +0x32
internal/poll.(*pollDesc).waitRead(...)
        /usr/local/go/src/internal/poll/fd_poll_runtime.go:89
internal/poll.(*FD).Accept(0xc000220000)
        /usr/local/go/src/internal/poll/fd_unix.go:614 +0x234
net.(*netFD).accept(0xc000220000)
        /usr/local/go/src/net/fd_unix.go:172 +0x35
net.(*TCPListener).accept(0xc000206300)
        /usr/local/go/src/net/tcpsock_posix.go:142 +0x28
net.(*TCPListener).Accept(0xc000206300)
        /usr/local/go/src/net/tcpsock.go:288 +0x3d
net/http.(*Server).Serve(0xc00021e000, {0xa6d508, 0xc000206300})
        /usr/local/go/src/net/http/server.go:3070 +0x385
net/http.(*Server).ListenAndServe(0xc00021e000)
        /usr/local/go/src/net/http/server.go:2999 +0x7d
net/http.ListenAndServe(...)
        /usr/local/go/src/net/http/server.go:3255
main.serveHTTP({0xa6a440?, 0xdb5700}, {0x7fffc5173454, 0x5}, {0xa69c00?, 0xc0000ae580?})
        /go/github.com/prometheus/statsd-exporter/main.go:172 +0xb8
created by main.main
        /go/github.com/prometheus/statsd-exporter/main.go:543 +0x5c7e

goroutine 52 [chan receive]:
main.sighupConfigReloader({0x7fffc517359f, 0x20}, 0x0?, {0xa69c00, 0xc0000ae580})
        /go/github.com/prometheus/statsd-exporter/main.go:180 +0x9a
created by main.main
        /go/github.com/prometheus/statsd-exporter/main.go:545 +0x5d3e

goroutine 55 [syscall]:
os/signal.signal_recv()
        /usr/local/go/src/runtime/sigqueue.go:152 +0x2f
os/signal.loop()
        /usr/local/go/src/os/signal/signal_unix.go:23 +0x19
created by os/signal.Notify.func1.1
        /usr/local/go/src/os/signal/signal.go:151 +0x2a

goroutine 345 [runnable]:
github.com/prometheus/statsd_exporter/pkg/mappercache/lru.(*metricMapperLRUCache).Add.func1()
        /go/github.com/prometheus/statsd-exporter/pkg/mappercache/lru/lru.go:53
runtime.goexit()
        /usr/local/go/src/runtime/asm_amd64.s:1594 +0x1
created by github.com/prometheus/statsd_exporter/pkg/mappercache/lru.(*metricMapperLRUCache).Add
        /go/github.com/prometheus/statsd-exporter/pkg/mappercache/lru/lru.go:53 +0x6e

Contributor

@matthiasr matthiasr left a comment


I like this in substance, but I would like to make the transition as easy as possible for library users and make the APIs as clean as possible. I understand that these two goals are in conflict 😅

  • Is there a way we could preserve compatibility with the existing API, or make the necessary change as minimal as possible?
  • How can we make it as easy as possible to set up the workers? I don't like that every user of the library now has to write a loop to set up workers. Would it make sense to wrap this up in a convenience function, or even do it implicitly?
    • the line package kind of has a builder pattern, what would it take to add an optional .Parallel(10) step to that?
    • Or could line.Parser be an interface, implemented by a synchronous parser and a parallel/asynchronous one that wraps it? (A rough sketch follows after this list.)
  • Are there any other interface changes (like accepting []byte instead of string) that would make sense to do now so we don't have to break the API again later? I am least sure about this, maybe two small breaking changes are better than one complex one.
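To make that option concrete, here is a rough sketch of an interface plus an asynchronous wrapper. The names, method set, and channel sizes are invented for illustration; the real line package API is larger and different.

// Event and Parser are deliberately minimal stand-ins.
type Event interface{}

// Parser is the synchronous contract a line parser could satisfy.
type Parser interface {
	LineToEvents(line string) []Event
}

// ParallelParser wraps any synchronous Parser and fans lines out to a pool
// of goroutines, emitting results on a channel. A builder step such as
// .Parallel(10) could return one of these.
type ParallelParser struct {
	in  chan string
	out chan []Event
}

func NewParallelParser(p Parser, workers int) *ParallelParser {
	pp := &ParallelParser{
		in:  make(chan string, 1024),
		out: make(chan []Event, 1024),
	}
	for i := 0; i < workers; i++ {
		go func() {
			for line := range pp.in {
				pp.out <- p.LineToEvents(line)
			}
		}()
	}
	return pp
}

func (pp *ParallelParser) Submit(line string)     { pp.in <- line }
func (pp *ParallelParser) Events() <-chan []Event { return pp.out }

This makes the tension visible: the asynchronous wrapper cannot keep the exact synchronous signature, so either the interface is channel-based from the start or the parallel variant needs its own entry points.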

}
}

func (w *Worker) Consume(c <-chan string) {
Contributor


A bit of a side track …

I wonder if we can enable further improvements by accepting a []byte (or an io.Reader) here, and in handle below? I did some preliminary benchmarking, and while there is a large initial overhead for allocating a bufio.Scanner, its memory use stays constant even when there are many more lines.

strings_test.go
package main

import (
	"bufio"
	"bytes"
	"strconv"
	"strings"
	"testing"
)

func genInputString(n int) string {
	lines := make([]string, 0, n)
	for i := 0; i < n; i++ {
		lines = append(lines, "aaaaaaa")
	}
	return strings.Join(lines, "\n")
}

func genInputBytes(n int) []byte {
	return []byte(genInputString(n))
}

func BenchmarkMultiLineProcessing(b *testing.B) {
	ns := []int{1, 10, 1000, 10000, 100000}
	for _, n := range ns {
		b.Run("string_"+strconv.Itoa(n), func(b *testing.B) {
			in := genInputString(n)
			b.ResetTimer()
			for i := 0; i < b.N; i++ {
				for _, s := range strings.Split(in, "\n") {
					_ = s
				}
			}
		})

		b.Run("bytes_split_"+strconv.Itoa(n), func(b *testing.B) {
			in := genInputBytes(n)
			b.ResetTimer()
			for i := 0; i < b.N; i++ {
				for _, s := range bytes.Split(in, []byte{'\n'}) {
					_ = s
				}
			}
		})

		b.Run("bytes_scan_"+strconv.Itoa(n), func(b *testing.B) {
			in := genInputBytes(n)
			b.ResetTimer()
			for i := 0; i < b.N; i++ {
				reader := bytes.NewReader(in)
				scanner := bufio.NewScanner(reader)
				scanner.Split(bufio.ScanLines)
				for scanner.Scan() {
					_ = scanner.Bytes()
				}
			}
		})
	}
}
Results
goos: darwin
goarch: arm64
pkg: github.com/matthiasr/go-string-split
BenchmarkMultiLineProcessing
BenchmarkMultiLineProcessing/string_1
BenchmarkMultiLineProcessing/string_1-10         	54618621	        22.32 ns/op	      16 B/op	       1 allocs/op
BenchmarkMultiLineProcessing/bytes_split_1
BenchmarkMultiLineProcessing/bytes_split_1-10    	44746274	        25.73 ns/op	      24 B/op	       1 allocs/op
BenchmarkMultiLineProcessing/bytes_scan_1
BenchmarkMultiLineProcessing/bytes_scan_1-10     	 2749543	       394.7 ns/op	    4144 B/op	       2 allocs/op
BenchmarkMultiLineProcessing/string_10
BenchmarkMultiLineProcessing/string_10-10        	 8026826	       151.1 ns/op	     160 B/op	       1 allocs/op
BenchmarkMultiLineProcessing/bytes_split_10
BenchmarkMultiLineProcessing/bytes_split_10-10   	 7045616	       172.7 ns/op	     240 B/op	       1 allocs/op
BenchmarkMultiLineProcessing/bytes_scan_10
BenchmarkMultiLineProcessing/bytes_scan_10-10    	 2207896	       566.9 ns/op	    4144 B/op	       2 allocs/op
BenchmarkMultiLineProcessing/string_1000
BenchmarkMultiLineProcessing/string_1000-10      	   77553	     14974 ns/op	   16384 B/op	       1 allocs/op
BenchmarkMultiLineProcessing/bytes_split_1000
BenchmarkMultiLineProcessing/bytes_split_1000-10 	   77864	     15383 ns/op	   24576 B/op	       1 allocs/op
BenchmarkMultiLineProcessing/bytes_scan_1000
BenchmarkMultiLineProcessing/bytes_scan_1000-10  	   72218	     16174 ns/op	    4144 B/op	       2 allocs/op
BenchmarkMultiLineProcessing/string_10000
BenchmarkMultiLineProcessing/string_10000-10     	    7884	    147825 ns/op	  163840 B/op	       1 allocs/op
BenchmarkMultiLineProcessing/bytes_split_10000
BenchmarkMultiLineProcessing/bytes_split_10000-10         	    7954	    152173 ns/op	  245760 B/op	       1 allocs/op
BenchmarkMultiLineProcessing/bytes_scan_10000
BenchmarkMultiLineProcessing/bytes_scan_10000-10          	    7416	    158794 ns/op	    4144 B/op	       2 allocs/op
BenchmarkMultiLineProcessing/string_100000
BenchmarkMultiLineProcessing/string_100000-10             	     758	   1560920 ns/op	 1605635 B/op	       1 allocs/op
BenchmarkMultiLineProcessing/bytes_split_100000
BenchmarkMultiLineProcessing/bytes_split_100000-10        	     750	   1571428 ns/op	 2400256 B/op	       1 allocs/op
BenchmarkMultiLineProcessing/bytes_scan_100000
BenchmarkMultiLineProcessing/bytes_scan_100000-10         	     765	   1574084 ns/op	    4144 B/op	       2 allocs/op
PASS
ok  	github.com/matthiasr/go-string-split	21.298s

I guess a UDP or datagram receiver would benefit from this only if the input is >4 KiB, but if we could somehow hook up a single scanner to a TCP connection (not sure how to do this), we could get close to zero-copy parsing.
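For the TCP case, a minimal sketch of what hooking a single scanner up to the connection could look like (an assumption about how it might be wired, not code from this PR): net.Conn implements io.Reader, so a bufio.Scanner can read lines straight off the socket, and scanner.Bytes() returns a slice into the scanner's reused internal buffer, which is what keeps memory constant.

package main

import (
	"bufio"
	"log"
	"net"
)

func main() {
	ln, err := net.Listen("tcp", ":9125")
	if err != nil {
		log.Fatal(err)
	}
	for {
		conn, err := ln.Accept()
		if err != nil {
			log.Fatal(err)
		}
		go func(c net.Conn) {
			defer c.Close()
			// One scanner per connection; its internal buffer is reused
			// for every line, so memory stays constant no matter how much
			// the client sends.
			scanner := bufio.NewScanner(c)
			for scanner.Scan() {
				line := scanner.Bytes() // only valid until the next Scan call
				_ = line                // hand off to the parser here
			}
		}(conn)
	}
}

The catch is the one noted in the reply below: the rest of the pipeline would have to accept []byte, and the slice must be copied if it outlives the next Scan.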

Contributor Author


I tried the idea of using a byte slice, but I am running into weird behavior with the TCP listener. I don't think I have time to drill down into what is happening right now. It seems to be a problem when decoding invalid UTF-8 characters; at least, all the tests regarding that are failing.

@@ -118,7 +118,7 @@ func (eq *EventQueue) Flush() {
 
 func (eq *EventQueue) FlushUnlocked() {
 	eq.C <- eq.q
-	eq.q = make([]Event, 0, cap(eq.q))
+	eq.q = eq.q[:0]
Contributor


What exactly happens differently here? We just passed this eq.q into a channel – is it safe to [:0] it?

Contributor Author


This was actually the solution to my race/concurrent map access problems. The only thing is that memory now spikes again; not as badly as before, but I don't know if it is enough to justify this PR. I can try to work on the API improvements you mentioned, if you are still interested in this parallel parsing.

From what I saw, tuning the flush threshold to a really low value helps keep memory under control (even on master). So the culprit for my OOM kills is actually this internal queue. It is instantiated here and is long-lived (references to the maps are kept around until the code handling the registry is done with them).
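For context on why the [:0] variant is questionable: the slice just sent on eq.C and the re-sliced eq.q share the same backing array, so subsequent appends by the producer can overwrite elements the consumer is still reading. Below is a minimal, standalone illustration (not the exporter's code); the make(..., 0, cap(...)) variant avoids this by handing the consumer its own array, at the cost of a fresh allocation per flush, which matches the memory-vs-safety trade-off described here.

package main

import "fmt"

func main() {
	q := make([]int, 0, 4)
	q = append(q, 1, 2, 3)

	sent := q         // what the consumer received from the channel
	q = q[:0]         // producer keeps reusing the same backing array
	q = append(q, 99) // overwrites sent[0] in place

	fmt.Println(sent[0]) // prints 99, not 1 – the two batches alias each other
}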

Contributor


Ah, do I understand right that we have a fundamental throughput vs. memory-use trade-off here? This queueing/batching was introduced in #227 to relieve another (locking) bottleneck. Given this batching, how much of a difference can parallelized parsing make?

@matthiasr matthiasr marked this pull request as draft August 22, 2022 17:15
@matthiasr
Contributor

I marked the PR as draft for now – feel free to mark it as ready for review once you have tracked down the race conditions.

Signed-off-by: Pedro Tanaka <pedro.stanaka@gmail.com>
Signed-off-by: Pedro Tanaka <pedro.stanaka@gmail.com>
@matthiasr
Contributor

One more thought because it's not immediately obvious (or I missed it) – what happens on configuration reload? Should we keep using the same workers, or replace and cancel them?

@matthiasr
Contributor

One thing that worries me about the parallelization is that we are re-introducing a channel that handles every packet. It's not quite as bad as the problems that #227 addressed as long as there's more than one line per packet, but in edge cases it might be.

On reflection, I am also worried about accidentally reordering events. That's probably OK™ for counter and histogram events but could cause wrong results with gauges. How could we prevent that without introducing more locking?

@kullanici0606
Contributor

While I was investigating a k6 issue (grafana/k6#2044), I also saw that UDP packets get dropped because of parsing, and I likewise tried to fix the issue by separating reading UDP packets from the socket from parsing them:

https://github.com/kullanici0606/statsd_exporter

(for diff: master...kullanici0606:statsd_exporter:master)

Then, when I wanted to create a PR to get feedback on whether this was a good idea, I saw this PR, which is based on a very similar idea to mine.

The difference is that I only introduced one goroutine, which means packet ordering is not going to be an issue, but it may not be as performant as this PR.

Do you think my changes could be used as a transitional step towards this PR?

If this PR is already in the process of being merged, I will not create another PR, so as not to take up the time of this repo's developers.

@pedro-stanaka
Contributor Author

I would suggest writing some benchmarks for your solution in terms of memory usage. This loop with a slice instantiation looks like it will put pressure on the heap:

[screenshot of the loop in question]

If the changes are acceptable to the maintainers, though, I think it is a nice addition.

Regarding the order of packets, I don't think it will be such a huge problem. If you are scraping the exporter at a 15-second interval, you only keep the very last gauge sample from that 15 s window, and parsing takes milliseconds (or less), so only when two gauge observations for the same series arrive almost simultaneously would you occasionally end up with wrong data.

@kullanici0606
Contributor

Thank you for the feedback, I will try to write benchmarks for memory usage. My initial concern, and the problem in the issue, was UDP packet drops due to parsing.

In that code, I copy the bytes to a new slice because the listener reuses buf, so it would be modified while the other goroutine tries to parse it; copying is therefore necessary.

In your code, you actually do the same, but implicitly:

l.PacketBuffer <- string(packet)

Since string() also copies the bytes internally, I think the same amount of memory is used.

I will try to measure it though.
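A small sketch of the point being made here; the readLoop function and the channel names are illustrative (and assume "net" is imported), not code from either branch. Both hand-off styles copy the payload out of the read buffer, once explicitly via copy() and once implicitly via the string conversion, because the listener reuses the buffer for the next datagram.

// readLoop shows the two hand-off styles side by side. Without a copy, the
// parsing goroutine would see buf being overwritten by the next ReadFromUDP.
func readLoop(conn *net.UDPConn, byteQueue chan<- []byte, stringQueue chan<- string) {
	buf := make([]byte, 65535)
	for {
		n, _, err := conn.ReadFromUDP(buf)
		if err != nil {
			return
		}

		// Explicit copy into a fresh slice (the []byte variant):
		packet := make([]byte, n)
		copy(packet, buf[:n])
		byteQueue <- packet

		// Implicit copy via the string conversion (this PR's variant):
		stringQueue <- string(buf[:n])
	}
}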

@matthiasr
Contributor

Closing this, since a simplified form landed in #511. Many thanks @pedro-stanaka for collaborating on that!

@matthiasr matthiasr closed this Oct 23, 2023