
Write batching strategy #156

Open
FZambia opened this issue Dec 7, 2022 · 16 comments
Labels
enhancement New feature or request

Comments

@FZambia
Contributor

FZambia commented Dec 7, 2022

Hey @rueian, this is me again.

I was preparing the Rueidis-based code for release and suddenly discovered an interesting thing. I did quite a lot of Go benchmarks to make sure the new Rueidis-based implementation produces better operation latency and better throughput. And it does.

I also expected that migrating to Rueidis would give Centrifugo better CPU utilization, since Rueidis produces fewer memory allocations. And here be dragons.

Before making the release I decided to do macro-benchmarks and found that Centrifugo consumes more CPU than before under equal conditions. Moreover, the Rueidis-based implementation results in more CPU usage on the Redis instance than we had with the previous implementation. I did not expect that at all. To investigate, I made a repo: https://github.com/FZambia/pipelines.

In that repo I implemented 3 benchmarks: pipelined Redigo, pipelined Go-Redis, and Rueidis.

After running benchmarks I observed the following:

❯ go test -run xxx -bench . -benchtime 10s
goos: darwin
goarch: arm64
pkg: github.com/FZambia/pipelines
BenchmarkRedigo-8    	12460600	       959.6 ns/op	     181 B/op	       4 allocs/op
BenchmarkGoredis-8   	 8069197	      1534 ns/op	     343 B/op	       5 allocs/op
BenchmarkRueidis-8   	19451470	       620.0 ns/op	      80 B/op	       1 allocs/op

Here we can see that CPU usage is:

                    Redigo    Goredis    Rueidis
Application CPU, %  285       270        470
Redis CPU, %        56        34         80

Nothing too special here – all numbers are more or less expected. Rueidis produced better throughput, so it loaded Redis more, and the price for that throughput is application CPU utilization.

But in the Centrifugo case I compared CPU usage of Redigo and Rueidis under equal conditions. So I added a rate limiter to the benchmarks in the https://github.com/FZambia/pipelines repo to generate the same load in all cases, limiting the load to 100 commands per millisecond (100k per second).
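For reference, the rate-limited load could look roughly like this (a sketch only – it assumes golang.org/x/time/rate and a hypothetical runLimited helper; the actual benchmark code in the pipelines repo may be structured differently):

package pipelines

import (
	"context"
	"log"

	"github.com/rueian/rueidis"
	"golang.org/x/time/rate"
)

// runLimited issues SET commands at roughly 100k commands per second,
// so every client library is benchmarked under the same load.
func runLimited(ctx context.Context, client rueidis.Client, total int) {
	limiter := rate.NewLimiter(rate.Limit(100_000), 100) // 100k tokens/sec, burst of 100
	for i := 0; i < total; i++ {
		if err := limiter.Wait(ctx); err != nil { // blocks until the next command is allowed
			log.Fatal(err)
		}
		cmd := client.B().Set().Key("key").Value("value").Build()
		if err := client.Do(ctx, cmd).Error(); err != nil {
			log.Fatal(err)
		}
	}
}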

❯ PIPE_LIMITED=1 go test -run xxx -bench . -benchtime 10s
goos: darwin
goarch: arm64
pkg: github.com/FZambia/pipelines
BenchmarkRedigo-8    	 1000000	     10000 ns/op	     198 B/op	       5 allocs/op
BenchmarkGoredis-8   	 1000000	     10000 ns/op	     350 B/op	       8 allocs/op
BenchmarkRueidis-8   	 1000000	     10000 ns/op	     113 B/op	       2 allocs/op
PASS
ok  	github.com/FZambia/pipelines	30.629s
                    Redigo    Goredis    Rueidis
Application CPU, %  91        96         118
Redis CPU, %        36        34         45

This is more interesting. We are generating the same load in all benchmarks, yet both application and Redis CPU usage are highest in the Rueidis case.

It turned out the difference here is the result of different batch sizes being sent to Redis. In the Redigo/Go-Redis case we have larger batches than in the Rueidis case; smaller batches mean more syscalls both in the application and on the Redis side. As we can see, CPU usage is very sensitive to this.

There is a project called Twemproxy which acts as a proxy between applications and Redis and batches commands automatically, thus reducing load on Redis. So in general, pipelining is known not only to increase throughput but also to reduce Redis CPU usage. Since Redis is single-threaded, its capacity is actually quite limited.

I tried to find a simple way to improve batching in Rueidis. The simplest solution I found at this point is this one: main...FZambia:rueidis:GetWriterEachConn

I.e. introducing an option to provide a custom bufio.Writer. I used it like this:

func rueidisClient() rueidis.Client {
	options := rueidis.ClientOption{
		InitAddress:  []string{":6379"},
		DisableCache: true,
	}
	if os.Getenv("PIPE_DELAYED") != "" {
		options.GetWriterEachConn = func(writer io.Writer) (*bufio.Writer, func()) {
			mlw := newDelayWriter(bufio.NewWriterSize(writer, 1<<19), time.Millisecond)
			w := bufio.NewWriterSize(mlw, 1<<19)
			return w, func() { mlw.close() }
		}
	}
	client, err := rueidis.NewClient(options)
	if err != nil {
		log.Fatal(err)
	}
	return client
}


type writeFlusher interface {
	io.Writer
	Flush() error
}

type delayWriter struct {
	dst   writeFlusher
	delay time.Duration // zero means to flush immediately

	mu           sync.Mutex // protects tm, flushPending, and dst.Flush
	tm           *time.Timer
	err          error
	flushPending bool
}

func newDelayWriter(dst writeFlusher, delay time.Duration) *delayWriter {
	return &delayWriter{dst: dst, delay: delay}
}

func (m *delayWriter) Write(p []byte) (n int, err error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	if m.err != nil {
		return 0, m.err // return the error stored by a previous failed flush
	}
	n, err = m.dst.Write(p)
	if m.delay <= 0 {
		err = m.dst.Flush()
		return
	}
	if m.flushPending {
		return
	}
	if m.tm == nil {
		m.tm = time.AfterFunc(m.delay, m.delayedFlush)
	} else {
		m.tm.Reset(m.delay)
	}
	m.flushPending = true
	return
}

func (m *delayWriter) delayedFlush() {
	m.mu.Lock()
	defer m.mu.Unlock()
	if !m.flushPending { // if stop was called but AfterFunc already started this goroutine
		return
	}
	err := m.dst.Flush()
	if err != nil {
		m.err = err
	}
	m.flushPending = false
}

func (m *delayWriter) close() {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.flushPending = false
	if m.tm != nil {
		m.tm.Stop()
	}
}

The code of the delayed writer is inspired by Caddy's code. It basically delays flushing writes to the connection.

We sacrifice latency for fewer syscalls.

❯ PIPE_LIMITED=1 PIPE_DELAYED=1 go test -run xxx -bench . -benchtime 10s
goos: darwin
goarch: arm64
pkg: github.com/FZambia/pipelines
BenchmarkRedigo-8    	 1000000	     10000 ns/op	     198 B/op	       5 allocs/op
BenchmarkGoredis-8   	 1000000	     10000 ns/op	     350 B/op	       8 allocs/op
BenchmarkRueidis-8   	 1000000	     10002 ns/op	     114 B/op	       2 allocs/op
PASS
ok  	github.com/FZambia/pipelines	30.712s
                    Redigo    Goredis    Rueidis
Application CPU, %  91        96         51
Redis CPU, %        36        34         6

From these results we can see that with better batching we can reduce both application and Redis CPU usage, as we make fewer read/write syscalls. For Rueidis, the CPU usage of the benchmark process dropped from 118% to 51%, and of the Redis process from 45% to 6%. An extra millisecond of latency seems tolerable for such a huge resource reduction.


Unfortunately, it may be that I missed something – so it would be interesting to hear your opinion on whether you see potential issues with this approach. Under different levels of parallelism the results may differ, since batch sizes change – all libraries in the test may perform better or worse.

I think a resource reduction like this is great to have. In the Centrifugo case users tend to add more Centrifugo nodes that work with a single Redis instance, so the possibility to keep Redis CPU as low as possible seems nice. Perhaps you can suggest a better approach to achieve this.

@rueian
Collaborator

rueian commented Dec 8, 2022

Hi @FZambia,

Thank you very much for your benchmark. It is very impressive.

Your benchmark clearly shows that rueidis' pipe._backgroundWrite() is more aggressive compared to the select-or-break pipelining technique you used with redigo and goredis. In other words, it loops faster than it receives commands from the Go channel and misses too many chances to batch commands.
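For readers unfamiliar with that technique: the writer goroutine blocks for the first command, keeps draining the channel until it is momentarily empty, and only then flushes the whole batch. A minimal sketch with redigo (the command type and channel are hypothetical, and reading replies is omitted for brevity):

package pipelines

import "github.com/gomodule/redigo/redis"

// command is a hypothetical envelope for a pending Redis command.
type command struct {
	name string
	args []interface{}
}

// writeLoop batches commands using the select-or-break technique.
func writeLoop(conn redis.Conn, cmds <-chan command) error {
	for {
		cmd := <-cmds // block until at least one command is pending
		if err := conn.Send(cmd.name, cmd.args...); err != nil {
			return err
		}
	batching:
		for {
			select {
			case cmd = <-cmds: // keep appending while more commands are ready
				if err := conn.Send(cmd.name, cmd.args...); err != nil {
					return err
				}
			default: // channel momentarily empty – flush the accumulated batch
				break batching
			}
		}
		if err := conn.Flush(); err != nil {
			return err
		}
	}
}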

Therefore, if we make it slow down a little bit, for example:

diff --git a/pipe.go b/pipe.go
index 8807779..e800a0d 100644
--- a/pipe.go
+++ b/pipe.go
@@ -292,6 +292,7 @@ func (p *pipe) _backgroundWrite() (err error) {
                                err = p.Error()
                        } else {
                                err = p.w.Flush()
+                               time.Sleep(time.Microsecond * 100)
                        }
                        if err == nil {
                                if atomic.LoadInt32(&p.state) == 1 {

Then we can have a similar result:

(video attachment: slowbatch.mov)

This approach is probably simpler than customizing the bufio writer.

Under different levels of parallelism the results may differ, since batch sizes change – all libraries in the test may perform better or worse.

Indeed. The real network environment should also be taken into consideration.

Although it is really hard or almost impossible for users to tweak this kind of delay, I think we can still have an option in ClientOption to slow pipe._backgroundWrite() down.

@FZambia
Contributor Author

FZambia commented Dec 8, 2022

Actually I also tried Sleep initially while looking for the reason, but then decided it was too aggressive a change to the library. Also, I was unsure how blocking at this stage might affect rueidis. In the additional buffer case I showed, we do not block the writer loop. But you think blocking it with a sleep won't introduce any downsides beyond the increased latency, right?

@rueian
Collaborator

rueian commented Dec 8, 2022

Yes, I think so. Maybe the sleep should be added under the if err == nil block.

@FZambia
Contributor Author

FZambia commented Dec 8, 2022

Thinking more... I am still worried that it's not a strict equivalent of the intermediate buffer. In the intermediate buffer approach we never block writing; the timer is asynchronous there. So the time interval used in the intermediate buffer does not play the same role as it does if we add a Sleep to the backgroundWrite loop directly. 🤔 Possibly we can somehow get the best of both worlds and avoid blocking.

@FZambia
Contributor Author

FZambia commented Dec 8, 2022

Probably something like this? (an optional FlushInterval in ClientOption):

diff --git a/pipe.go b/pipe.go
index 2eb242d..c7a50bd 100644
--- a/pipe.go
+++ b/pipe.go
@@ -46,6 +46,7 @@ type pipe struct {
 	cache           cache
 	r               *bufio.Reader
 	w               *bufio.Writer
+	flushInterval   time.Duration
 	close           chan struct{}
 	onInvalidations func([]RedisMessage)
 	r2psFn          func() (p *pipe, err error)
@@ -82,6 +83,8 @@ func _newPipe(connFn func() (net.Conn, error), option *ClientOption, r2ps bool)
 		r:     bufio.NewReaderSize(conn, option.ReadBufferEachConn),
 		w:     bufio.NewWriterSize(conn, option.WriteBufferEachConn),

+		flushInterval: time.Millisecond,
+
 		nsubs: newSubs(),
 		psubs: newSubs(),
 		ssubs: newSubs(),
@@ -286,13 +289,37 @@ func (p *pipe) _backgroundWrite() (err error) {
 		ch    chan RedisResult
 	)

+	var mu sync.Mutex
+
+	if p.flushInterval > 0 {
+		go func() {
+			for {
+				select {
+				case <-time.After(p.flushInterval):
+					mu.Lock()
+					err = p.w.Flush()
+					mu.Unlock()
+					if err != nil {
+						// TODO.
+					}
+				case <-p.close:
+					return
+				}
+			}
+		}()
+	}
+
 	for atomic.LoadInt32(&p.state) < 3 {
 		if ones[0], multi, ch = p.queue.NextWriteCmd(); ch == nil {
+			mu.Lock()
 			if p.w.Buffered() == 0 {
 				err = p.Error()
 			} else {
-				err = p.w.Flush()
+				if p.flushInterval == 0 {
+					err = p.w.Flush()
+				}
 			}
+			mu.Unlock()
 			if err == nil {
 				if atomic.LoadInt32(&p.state) == 1 {
 					ones[0], multi, ch = p.queue.WaitForWrite()
@@ -306,7 +333,9 @@ func (p *pipe) _backgroundWrite() (err error) {
 			multi = ones
 		}
 		for _, cmd := range multi {
+			mu.Lock()
 			err = writeCmd(p.w, cmd.Commands())
+			mu.Unlock()
 		}
 		if err != nil {
 			if err != ErrClosing { // ignore ErrClosing to allow final QUIT command to be sent

(Locks could be avoided when FlushInterval is not used.)

@rueian
Collaborator

rueian commented Dec 9, 2022

Oh… An additional goroutine and lock look too heavy. Furthermore, the lock would also have to be taken on the synchronous path.

Thinking more... I am still worried that it's not a strict equivalent of the intermediate buffer. In the intermediate buffer approach we never block writing; the timer is asynchronous there.

The only difference I can think of is missing some chances to trigger the implicit flush of the bufio writer when commands exceed the size of its buffer while we are sleeping. Given that the sleeping interval should be small, I think this difference is not a big deal.

@FZambia
Contributor Author

FZambia commented Dec 9, 2022

I tested both approaches - with extra flush goroutine and with:

if p.flushInterval > 0 {
	select {
	case <-time.After(time.Millisecond):
	case <-p.close:
		return
	}
}

– after Flush (i.e. basically Sleep).

For 128 parallelism (old is extra flush goroutine, new is pause in background write loop):

name       old time/op    new time/op    delta
Rueidis-8    1.09µs ± 1%    1.19µs ± 1%  +9.58%  (p=0.000 n=20+20)

name       old alloc/op   new alloc/op   delta
Rueidis-8     80.0B ± 0%     80.0B ± 0%    ~     (all equal)

name       old allocs/op  new allocs/op  delta
Rueidis-8      1.00 ± 0%      1.00 ± 0%    ~     (all equal)

So the approach with a pause gives slightly worse latency than flushing in a goroutine.

For 1024 parallelism:

name       old time/op    new time/op    delta
Rueidis-8    1.08µs ± 1%    1.21µs ± 1%  +12.01%  (p=0.000 n=17+20)

name       old alloc/op   new alloc/op   delta
Rueidis-8     82.0B ± 0%     82.0B ± 0%     ~     (all equal)

name       old allocs/op  new allocs/op  delta
Rueidis-8      1.00 ± 0%      1.00 ± 0%     ~     (all equal)

Similar picture.


Though for app and Redis CPU usage the picture is like this:

For 128 parallelism:

Flush goroutine: App cpu 290%, Redis CPU 42%
Sleep/Pause: App cpu 177%, Redis CPU 38%

For 1024 parallelism:

Flush goroutine: App cpu 388%, Redis CPU 40%
Sleep/Pause: App cpu 280%, Redis CPU 38%

The App CPU reduction seems significant in the sleep/pause case. So I think the sleep/pause approach is OK.

It's worth mentioning that when concurrency is small, both approaches result in significantly less throughput – something like 8k requests per second instead of 245k rps at parallelism == 1, and 77k rps instead of 1mln rps at parallelism == 10. For my use case it's totally OK since requests come from concurrent parts of the application. I can't figure out at the moment whether it's possible to introduce a more adaptive strategy here, i.e. some factor which increases batch sizes as concurrency and the number of requests grow (but again, for my use case it's not important).

@FZambia
Contributor Author

FZambia commented Dec 9, 2022

Also tested the sequential case: it's 1k rps vs 45k rps.

for my use case it's not important

Actually it seems that for some scenarios this may be important... I wonder whether it's possible to avoid sleeping at all and still have good batches in all cases 🤔 Probably by combining the current rueidis approach with the smart batching technique I had before.

@rueian
Collaborator

rueian commented Dec 9, 2022

Hi @FZambia,

So the approach with a pause gives slightly worse latency than flushing in a goroutine.

I also got similar results of +10% latency. But I quickly realized that pausing pipe._backgroundWrite() for 1 millisecond was too long to count as just slowing it down a little bit.

I think pausing it for 20 microseconds is enough for a local redis server. Here are the results:

For 128 parallelism (old is extra flush goroutine, new is pause in background write loop):

▶ benchstat old.txt new.txt
name        old time/op    new time/op    delta
Rueidis-10     608ns ± 2%     596ns ± 2%  -1.89%  (p=0.000 n=20+19)

name        old alloc/op   new alloc/op   delta
Rueidis-10     82.0B ± 0%     80.0B ± 0%  -2.44%  (p=0.000 n=20+20)

name        old allocs/op  new allocs/op  delta
Rueidis-10      1.00 ± 0%      1.00 ± 0%    ~     (all equal)

For 1024 parallelism:

▶ benchstat old.txt new.txt
name        old time/op    new time/op    delta
Rueidis-10     679ns ± 7%     663ns ± 7%  -2.35%  (p=0.013 n=20+20)

name        old alloc/op   new alloc/op   delta
Rueidis-10     83.4B ± 1%     81.0B ± 0%  -2.82%  (p=0.000 n=20+20)

name        old allocs/op  new allocs/op  delta
Rueidis-10      1.00 ± 0%      1.00 ± 0%    ~     (all equal)

And still have comparable throughput when parallelism = 1:

▶ go test -run xxx -bench . -benchtime 10s
goos: darwin
goarch: arm64
pkg: github.com/FZambia/pipelines
BenchmarkRedigo-10     	 2640541	      4532 ns/op	     173 B/op	       4 allocs/op
BenchmarkGoredis-10    	 2459900	      4871 ns/op	     328 B/op	       6 allocs/op
BenchmarkRueidis-10    	 2707039	      4435 ns/op	      80 B/op	       1 allocs/op
PASS
ok  	github.com/FZambia/pipelines	50.350s

However, for sequential usage, 20 microseconds is still too long. Thankfully it is possible to detect this case:

diff --git a/pipe.go b/pipe.go
index 8807779..28a4bb7 100644
--- a/pipe.go
+++ b/pipe.go
@@ -291,7 +291,13 @@ func (p *pipe) _backgroundWrite() (err error) {
                        if p.w.Buffered() == 0 {
                                err = p.Error()
                        } else {
-                               err = p.w.Flush()
+                               if atomic.LoadInt32(&p.waits) == 1 {
+                                       err = p.w.Flush()
+                               } else {
+                                       ts := time.Now()
+                                       err = p.w.Flush()
+                                       time.Sleep(time.Microsecond*20 - time.Since(ts))
+                               }
                        }
                        if err == nil {
                                if atomic.LoadInt32(&p.state) == 1 {

Also note that in the pausing case we should probably record the elapsed time of p.w.Flush() to take the network condition into account.


I can't figure out at the moment whether it's possible to introduce a more adaptive strategy here.

I think it is possible, and we can probably support multiple strategies by swapping pipes when a standalone monitoring goroutine finds a better pipe configuration.

@FZambia
Contributor Author

FZambia commented Dec 10, 2022

I think pausing it for 20 microseconds is enough for a local redis server. Here are the results:

20 microseconds seems to work fine in all scenarios, right! (And it actually produces better throughput for me in the non-limited scenario.) But I suppose it should be optional anyway? Or the default, since it improves throughput a bit? I think it would also be nice to be able to tune it for a non-local setup where the RTT is larger, so we can afford more time to collect batches. Should we also listen to the pipe's close channel while sleeping so we can return quickly?

More advanced strategies would be nice to have – though they are much more complex, I can't even imagine one now, and they should probably be tested with all the parallelism levels, request rates, and network latencies taken into account.

@rueian
Collaborator

rueian commented Dec 10, 2022

But I suppose it should be optional anyway? Or the default, since it improves throughput a bit?

Even though it produces better throughput in the non-limited scenario, I think it should be optional because latency is still critical to many users.

Should we also listen to the pipe's close channel while sleeping so we can return quickly?

Actually, we shouldn't, and it won't have any effect: p.close is closed only after pipe._backgroundWrite() returns.

I think it would also be nice to be able to tune it for a non-local setup where the RTT is larger, so we can afford more time to collect batches.

Yes, I propose adding an optional MaxFlushDelay time.Duration to ClientOption. Would you like to change #157 to this proposal?

More advanced strategies would be nice to have – though they are much more complex, I can't even imagine one now, and they should probably be tested with all the parallelism levels, request rates, and network latencies taken into account.

Sure. Thankfully, pipe._backgroundWrite() runs so fast that it leaves us room for future improvement. I think we can probably choose a better sleeping duration automatically based on some statistics, such as per-byte flush latency.
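Purely as an illustration of that idea (hypothetical names, not something rueidis does today): per-byte flush latency could be tracked with an EWMA and turned into a sleep budget, e.g.:

package sketch

import "time"

// flushStats is an illustrative sketch only, not part of rueidis.
type flushStats struct {
	perByte time.Duration // smoothed flush latency per flushed byte
}

// observe records one flush of n bytes that took elapsed.
func (s *flushStats) observe(n int, elapsed time.Duration) {
	if n == 0 {
		return
	}
	sample := elapsed / time.Duration(n)
	if s.perByte == 0 {
		s.perByte = sample
		return
	}
	s.perByte += (sample - s.perByte) / 8 // EWMA with alpha = 1/8
}

// suggestedDelay estimates how long flushing bufferedBytes would take,
// which could serve as an upper bound for the writer pause.
func (s *flushStats) suggestedDelay(bufferedBytes int) time.Duration {
	return s.perByte * time.Duration(bufferedBytes)
}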

rueian added a commit that referenced this issue Dec 10, 2022
rueian added a commit that referenced this issue Dec 10, 2022
@rueian
Collaborator

rueian commented Dec 10, 2022

Hi @FZambia, many thanks! The MaxFlushDelay option has already been released in v0.0.89. I think we can leave this issue open for future improvement.
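For anyone landing here later, enabling the option looks roughly like this (a minimal sketch – tune the delay to your RTT and latency budget):

package main

import (
	"log"
	"time"

	"github.com/rueian/rueidis"
)

func main() {
	client, err := rueidis.NewClient(rueidis.ClientOption{
		InitAddress: []string{"127.0.0.1:6379"},
		// Let the background writer wait up to 100µs between flushes,
		// trading a little latency for larger batches and fewer syscalls.
		MaxFlushDelay: 100 * time.Microsecond,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()
}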

@lgothard

lgothard commented Dec 20, 2022

@rueian I noticed the following being sent to Redis Enterprise with DisableCache: true after this change:

"HELLO" "3" "AUTH"
"ERR Syntax error in HELLO option 'AUTH'"

@rueian
Collaborator

rueian commented Dec 20, 2022

Hi @lgothard,

Would you mind providing the code snippet and how you set up Redis Enterprise?

I have tried to reproduce the error but failed. In my case, Redis Enterprise 6.2.6 returns unknown command 'HELLO' instead of ERR Syntax error in HELLO option 'AUTH'.

@lgothard

Hey @rueian,

This was my bad. I had my windows confused. I got this error on Redis Stack v6.2.7. On our Redis Enterprise v6.0.16 test server, I don’t see it. Sorry for the confusion.

@rueian
Collaborator

rueian commented Dec 29, 2022

This was my bad. I had my windows confused. I got this error on Redis Stack v6.2.7. On our Redis Enterprise v6.0.16 test server, I don’t see it. Sorry for the confusion.

Hi @lgothard,

The latest version of Redis Stack seems to be v6.2.6-v0. I have tested the following snippet with both v6.2.6-v0 and 7.0.6-RC2 and it worked fine:

package main

import (
	"github.com/rueian/rueidis"
)

func main() {
	c, err := rueidis.NewClient(rueidis.ClientOption{
		InitAddress:  []string{"127.0.0.1:6379"},
		DisableCache: true,
	})
	if err != nil {
		panic(err)
	}
	defer c.Close()
}

Would you mind providing your code snippet as well?

"HELLO" "3" "AUTH"
"ERR Syntax error in HELLO option 'AUTH'"

The error message seems to indicate that there was no username and password in the AUTH part, but I currently can't find a way to reproduce the situation and provide a proper fix.

@rueian rueian added the help wanted Extra attention is needed label May 14, 2023
@rueian rueian added the enhancement New feature or request label May 24, 2023
@rueian rueian removed the help wanted Extra attention is needed label Oct 3, 2023