From ba3adb32e1094496b9bee89cd30fbb46e61cdd22 Mon Sep 17 00:00:00 2001 From: Rueian Date: Wed, 14 Dec 2022 23:03:54 +0800 Subject: [PATCH] perf: improve user specified flush delay automatically by statistics --- pipe.go | 46 ++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 38 insertions(+), 8 deletions(-) diff --git a/pipe.go b/pipe.go index e33378fa..e6437e1d 100644 --- a/pipe.go +++ b/pipe.go @@ -281,26 +281,50 @@ func (p *pipe) _background() { atomic.StoreInt32(&p.state, 4) } +type flush struct { + works time.Duration + waits time.Duration + bytes int +} + +const flushesKeep = 16 +const flushesMask = flushesKeep - 1 + func (p *pipe) _backgroundWrite() (err error) { var ( ones = make([]cmds.Completed, 1) multi []cmds.Completed ch chan RedisResult - flushDelay = p.maxFlushDelay - flushStart = time.Time{} + flushes [flushesKeep]flush + flushId = 0 + bytes = 1 // avoid zero + works = time.Duration(0) + waits = time.Duration(0) + + userDelay = p.maxFlushDelay ) + var ts1 = time.Now() for atomic.LoadInt32(&p.state) < 3 { if ones[0], multi, ch = p.queue.NextWriteCmd(); ch == nil { - if flushDelay != 0 { - flushStart = time.Now() - } - if p.w.Buffered() == 0 { + buf := p.w.Buffered() + if buf == 0 { err = p.Error() + } else if userDelay == 0 { + err = p.w.Flush() } else { + ts2 := time.Now() err = p.w.Flush() + dur, gap := time.Since(ts2), ts2.Sub(ts1) + bytes = bytes + buf - flushes[flushId].bytes + works = works + dur - flushes[flushId].works + waits = waits + gap - flushes[flushId].waits + flushes[flushId] = flush{bytes: buf, works: dur, waits: gap} + flushId = (flushId + 1) & flushesMask + ts1 = ts2 } + if err == nil { if atomic.LoadInt32(&p.state) == 1 { ones[0], multi, ch = p.queue.WaitForWrite() @@ -308,8 +332,14 @@ func (p *pipe) _backgroundWrite() (err error) { runtime.Gosched() continue } - if flushDelay != 0 && atomic.LoadInt32(&p.waits) > 1 { // do not delay for sequential usage - time.Sleep(flushDelay - time.Since(flushStart)) // ref: https://github.com/rueian/rueidis/issues/156 + if userDelay != 0 && atomic.LoadInt32(&p.waits) > 1 { // do not delay for sequential usage + byteWork := works / time.Duration(bytes) + byteWait := waits / time.Duration(bytes) + delay := byteWait * (works / flushesKeep) / (byteWait + byteWork + 1) // avoid zero + if delay > userDelay { + delay = userDelay + } + time.Sleep(delay) // ref: https://github.com/rueian/rueidis/issues/156 } } }