Skip to content

Commit

Permalink
fix and optimize varint encoding (#305)
Browse files Browse the repository at this point in the history
* fix and optimize varint encofing

* make the algorithm more robust and add documentation

* move sz check out of the inner loop
  • Loading branch information
Achille committed Jul 8, 2019
1 parent 1aea340 commit 03ea927
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 16 deletions.
57 changes: 46 additions & 11 deletions read.go
Expand Up @@ -43,17 +43,52 @@ func readInt64(r *bufio.Reader, sz int, v *int64) (int, error) {
}

func readVarInt(r *bufio.Reader, sz int, v *int64) (remain int, err error) {
l := 0
remain = sz
for done := false; !done && err == nil; {
remain, err = peekRead(r, remain, 1, func(b []byte) {
done = b[0]&0x80 == 0
*v |= int64(b[0]&0x7f) << uint(l*7)
})
l++
}
*v = (*v >> 1) ^ -(*v & 1)
return
// Optimistically assume that most of the time, there will be data buffered
// in the reader. If this is not the case, the buffer will be refilled after
// consuming zero bytes from the input.
input, _ := r.Peek(r.Buffered())
x := uint64(0)
s := uint(0)

for {
if len(input) > sz {
input = input[:sz]
}

for i, b := range input {
if b < 0x80 {
x |= uint64(b) << s
*v = int64(x>>1) ^ -(int64(x) & 1)
n, err := r.Discard(i + 1)
return sz - n, err
}

x |= uint64(b&0x7f) << s
s += 7
}

// Make room in the input buffer to load more data from the underlying
// stream. The x and s variables are left untouched, ensuring that the
// varint decoding can continue on the next loop iteration.
n, _ := r.Discard(len(input))
sz -= n
if sz == 0 {
return 0, errShortRead
}

// Fill the buffer: ask for one more byte, but in practice the reader
// will load way more from the underlying stream.
if _, err := r.Peek(1); err != nil {
if err == io.EOF {
err = errShortRead
}
return sz, err
}

// Grab as many bytes as possible from the buffer, then go on to the
// next loop iteration which is going to consume it.
input, _ = r.Peek(r.Buffered())
}
}

func readBool(r *bufio.Reader, sz int, v *bool) (int, error) {
Expand Down
44 changes: 44 additions & 0 deletions read_test.go
Expand Up @@ -3,6 +3,8 @@ package kafka
import (
"bufio"
"bytes"
"io/ioutil"
"math"
"reflect"
"testing"
)
Expand Down Expand Up @@ -167,3 +169,45 @@ func TestReadNewBytes(t *testing.T) {
}
})
}

func BenchmarkWriteVarInt(b *testing.B) {
wb := bufio.NewWriter(ioutil.Discard)

for i := 0; i < b.N; i++ {
writeVarInt(wb, math.MaxInt64)
wb.Flush()
}
}

func BenchmarkReadVarInt(b *testing.B) {
b1 := new(bytes.Buffer)
wb := bufio.NewWriter(b1)

const N = math.MaxInt64
writeVarInt(wb, N)
wb.Flush()

b2 := bytes.NewReader(b1.Bytes())
rb := bufio.NewReader(b2)
n := b1.Len()

for i := 0; i < b.N; i++ {
v := int64(0)
r, err := readVarInt(rb, n, &v)

if err != nil {
b.Fatalf("unexpected error reading a varint from the input: %v", err)
}

if r != 0 {
b.Fatalf("unexpected bytes remaining to be read in the input (%d B)", r)
}

if v != N {
b.Fatalf("value mismatch, expected %d but found %d", N, v)
}

b2.Reset(b1.Bytes())
rb.Reset(b2)
}
}
12 changes: 7 additions & 5 deletions write.go
Expand Up @@ -47,12 +47,14 @@ func writeInt64(w *bufio.Writer, i int64) {
}

func writeVarInt(w *bufio.Writer, i int64) {
i = i<<1 ^ i>>63
for i&0x7f != i {
w.WriteByte(byte(i&0x7f | 0x80))
i >>= 7
u := uint64((i << 1) ^ (i >> 63))

for u >= 0x80 {
w.WriteByte(byte(u) | 0x80)
u >>= 7
}
w.WriteByte(byte(i))

w.WriteByte(byte(u))
}

func varIntLen(i int64) (l int) {
Expand Down

0 comments on commit 03ea927

Please sign in to comment.