Skip to content

Commit

Permalink
Feat/fast modulo (#1)
Browse files Browse the repository at this point in the history
* Use uint32 indices
* Use fastdiv for non-power-of-two sizes
* Improve README
* New ringbuffer no longer errors
  • Loading branch information
sevagh committed Dec 1, 2019
1 parent a8e0916 commit 51385df
Show file tree
Hide file tree
Showing 8 changed files with 65 additions and 80 deletions.
9 changes: 4 additions & 5 deletions README.md
Expand Up @@ -2,12 +2,11 @@ Two ringbuffers written in Go.

### ringbuffer1

[ringbuffer1](./ringbuffer1) stores bytes. It implements an indexing strategy learned from https://www.snellman.net/blog/archive/2016-12-13-ring-buffers/
[ringbuffer1](./ringbuffer1) stores bytes. It features:

Dependencies for ringbuffer1 are for running tests:

* https://github.com/flyingmutant/rapid
* https://github.com/google/gofuzz
1. indexing strategy learned from https://www.snellman.net/blog/archive/2016-12-13-ring-buffers/
2. https://github.com/bmkessler/fastdiv for faster modulo on the read/write indices
2. https://github.com/google/gofuzz and https://github.com/flyingmutant/rapid for testing

### ringbuffer2

Expand Down
2 changes: 1 addition & 1 deletion ringbuffer1/example_ringbuffer_test.go
Expand Up @@ -9,7 +9,7 @@ import (
func ExampleRingbuffer() {
coolText := []byte("Lorem ipsum dolor sit amet, consectetur adipiscing elit. Fusce tincidunt nisi tincidunt velit euismod gravida. Aenean at turpis nec lectus faucibus lobortis vitae at arcu. In hac habitasse platea dictumst. Praesent commodo tellus vitae massa maximus porta. Etiam tortor justo, pulvinar nec tempus sit amet, sodales sit amet libero. Nullam non massa mi. Aenean laoreet arcu nec efficitur interdum. Donec vel neque id enim ullamcorper congue nec cursus metus. In hac habitasse platea dictumst. Vestibulum gravida enim non luctus consectetur. Maecenas sit amet risus id orci ultrices viverra dapibus a justo. Nulla facilisi. Nulla velit justo, convallis vel rhoncus a, laoreet ac lectus. Duis ex augue, facilisis id tortor eu, vulputate consectetur felis. Curabitur vitae aliquam nisi. Fusce blandit placerat metus eu elementum. Nam sed lectus tellus. Etiam nec eros sed turpis pulvinar interdum in sed quam. Phasellus tempus lobortis justo. Nullam vulputate nisl sed felis porta, sit amet auctor lectus porttitor. Aenean neque quam, luctus viverra felis eu, feugiat venenatis diam. Nulla facilisi. Nullam luctus augue erat, sit amet suscipit tortor eleifend non. Maecenas congue luctus nisi, ac ullamcorper ligula tristique nec. Nullam non magna bibendum, pretium lorem eget, faucibus nisl. Praesent risus tellus, commodo vitae convallis ac, dictum at felis. Cras lacus risus, accumsan nec lectus sit amet, consectetur pharetra ante. Nulla eget bibendum erat, ut placerat sapien. Mauris nulla magna, fringilla sed turpis eget, sodales luctus orci. Vestibulum lectus sapien, facilisis nec eros vitae, dictum feugiat purus. Phasellus felis nunc, condimentum et dolor mattis, sagittis varius mauris. Proin diam turpis, efficitur vitae ipsum convallis, cursus imperdiet libero. Etiam porta sapien sapien, quis fringilla magna porttitor a. Maecenas ac urna vitae ante dapibus convallis. Pellentesque sit amet varius justo. Morbi tristique elementum pellentesque. Vivamus id sodales risus, eget condimentum libero. Sed maximus, metus vitae mollis ultricies, ligula purus sodales enim, ut accumsan felis nibh non enim. Phasellus in orci sem. Sed ac felis rutrum, aliquet felis eget, tristique sem. Quisque non justo pharetra, condimentum nibh sit amet, condimentum neque. Donec a odio at quam hendrerit eleifend at ac mi. Vivamus a iaculis sem, quis tempor ante. Vestibulum quis ligula nisi. Vestibulum commodo orci at lectus consequat, consectetur eleifend est tristique. Proin consectetur elementum metus eget fermentum. Sed facilisis sapien id orci cursus pellentesque.")

ringbuf, _ := ringbuffer.NewRingbuffer(16)
ringbuf := ringbuffer.NewRingbuffer(16)
oneByte := make([]byte, 1)

for _, b := range coolText {
Expand Down
1 change: 1 addition & 0 deletions ringbuffer1/go.mod
Expand Up @@ -3,6 +3,7 @@ module github.com/sevagh/ringworm/ringbuffer1
go 1.13

require (
github.com/bmkessler/fastdiv v0.0.0-20190227075523-41d5178f2044
github.com/flyingmutant/rapid v0.0.0-20190904072629-5761511f78c8
github.com/google/gofuzz v1.0.0
)
2 changes: 2 additions & 0 deletions ringbuffer1/go.sum
@@ -1,3 +1,5 @@
github.com/bmkessler/fastdiv v0.0.0-20190227075523-41d5178f2044 h1:8Rz0TcIbkvU+x53bDQgezQ3tbjrQSpZRr6h9JnR9lZU=
github.com/bmkessler/fastdiv v0.0.0-20190227075523-41d5178f2044/go.mod h1:OI0uaNyGvxANSxteY6/mFRZs9EcQGqK30Bd1wqQj9zQ=
github.com/flyingmutant/rapid v0.0.0-20190904072629-5761511f78c8 h1:OnvMbsaIwSDKu9PAFhSLcjJZcCpW/cTAwSa2ysl2NrQ=
github.com/flyingmutant/rapid v0.0.0-20190904072629-5761511f78c8/go.mod h1:DPTQ0FCCNncrUta8PWuWx0OTWIS4Q8RxPXn45rqMoPg=
github.com/flyingmutant/rapid v0.2.0 h1:MuUmVHJ+p4IoJ5BiUZgHHrgzeu9ieEPUYdPeR4or0NI=
Expand Down
57 changes: 29 additions & 28 deletions ringbuffer1/ringbuffer.go
Expand Up @@ -4,11 +4,10 @@ Package ringbuffer implements a simple circular buffer.
The indexing strategy is taken from the conversations here:
https://www.snellman.net/blog/archive/2016-12-13-ring-buffers/
The read and write pointers are int32s that are stored modulo 2*capacity,
The read and write pointers are uint32s that are stored modulo 2*capacity,
and are moduloed with capacity to index the underlying []byte storage.
Here are some of the characteristics:
- Size must be power of two (for efficient modulo)
- SPSC (single producer single consumer)
- Lock-free using sync.atomic
- Fixed size, no growing
Expand All @@ -21,29 +20,33 @@ package ringbuffer
import (
"fmt"
"sync/atomic"

"github.com/bmkessler/fastdiv"
)

// A Ringbuffer is a struct that allows users to store and read []byte data.
type Ringbuffer struct {
read int32
write int32
read uint32
write uint32
buf []byte
n1 fastdiv.Uint32
n2 fastdiv.Uint32
}

func (r *Ringbuffer) mask(ptr int32) int32 {
return ptr & (int32(len(r.buf)) - 1)
func (r *Ringbuffer) mask(ptr uint32) uint32 {
return r.n1.Mod(ptr)
}

func (r *Ringbuffer) mask2(ptr int32) int32 {
return ptr & (2*int32(len(r.buf)) - 1)
func (r *Ringbuffer) mask2(ptr uint32) uint32 {
return r.n2.Mod(ptr)
}

func (r *Ringbuffer) writePtr() int32 {
return atomic.LoadInt32(&r.write)
func (r *Ringbuffer) writePtr() uint32 {
return atomic.LoadUint32(&r.write)
}

func (r *Ringbuffer) readPtr() int32 {
return atomic.LoadInt32(&r.read)
func (r *Ringbuffer) readPtr() uint32 {
return atomic.LoadUint32(&r.read)
}

// Size returns the size (bytes written by the user) of the ringbuffer.
Expand All @@ -69,17 +72,15 @@ func (r *Ringbuffer) Capacity() int {
}

// NewRingbuffer creates a ringbuffer with the specified capacity.
// Note that the capacity must be a power of two or an error is returned.
func NewRingbuffer(capacity int) (Ringbuffer, error) {
if (capacity == 0) || ((capacity & (capacity - 1)) != 0) {
return Ringbuffer{}, fmt.Errorf("please use a power-of-two size")
}
func NewRingbuffer(capacity int) Ringbuffer {
buf := make([]byte, capacity)
return Ringbuffer{
read: 0,
write: 0,
buf: buf,
}, nil
n1: fastdiv.NewUint32(uint32(len(buf))),
n2: fastdiv.NewUint32(uint32(2 * len(buf))),
}
}

// Write copies all the bytes in the provided []byte slice into the ringbuffer.
Expand All @@ -93,9 +94,9 @@ func (r *Ringbuffer) Write(buf []byte) error {
if len(buf) > emptyCount {
return fmt.Errorf("write %d is too big for remaining capacity %d", len(buf), emptyCount)
}
desiredWrite := int32(len(buf))
desiredWrite := uint32(len(buf))

capacity := int32(len(r.buf))
capacity := uint32(len(r.buf))
writeIdx := r.mask(r.writePtr())

copy(r.buf[writeIdx:], buf)
Expand All @@ -106,8 +107,8 @@ func (r *Ringbuffer) Write(buf []byte) error {
copy(r.buf, buf[remain:])
}

atomic.AddInt32(&r.write, desiredWrite)
atomic.SwapInt32(&r.write, r.mask2(r.writePtr()))
atomic.AddUint32(&r.write, desiredWrite)
atomic.SwapUint32(&r.write, r.mask2(r.writePtr()))

return nil
}
Expand All @@ -128,13 +129,13 @@ func (r *Ringbuffer) Read(buf []byte) {
if size < readCountTmp {
readCountTmp = r.Size()
}
readCount := int32(readCountTmp)
readCount := uint32(readCountTmp)

capacity := int32(len(r.buf))
capacity := uint32(len(r.buf))
readIdx := r.mask(r.readPtr())

var remain int32 = 0
var firstChunk int32 = 0
var remain uint32 = 0
var firstChunk uint32 = 0
possibleFirstChunk := capacity - readIdx

if readCount > possibleFirstChunk {
Expand All @@ -147,8 +148,8 @@ func (r *Ringbuffer) Read(buf []byte) {
copy(buf, r.buf[readIdx:readIdx+firstChunk])
copy(buf[firstChunk:], r.buf[:remain])

atomic.AddInt32(&r.read, int32(readCount))
atomic.SwapInt32(&r.read, r.mask2(r.readPtr()))
atomic.AddUint32(&r.read, uint32(readCount))
atomic.SwapUint32(&r.read, r.mask2(r.readPtr()))
}

// Drain creates and returns a []byte slice containing all data in the
Expand Down
45 changes: 17 additions & 28 deletions ringbuffer1/ringbuffer_bench_test.go
@@ -1,43 +1,32 @@
package ringbuffer_test

import (
"encoding/binary"
"testing"

"github.com/sevagh/ringworm/ringbuffer1"
)

var CoolText []byte = []byte("Lorem ipsum dolor sit amet, consectetur adipiscing elit. Fusce tincidunt nisi tincidunt velit euismod gravida. Aenean at turpis nec lectus faucibus lobortis vitae at arcu. In hac habitasse platea dictumst. Praesent commodo tellus vitae massa maximus porta. Etiam tortor justo, pulvinar nec tempus sit amet, sodales sit amet libero. Nullam non massa mi. Aenean laoreet arcu nec efficitur interdum. Donec vel neque id enim ullamcorper congue nec cursus metus. In hac habitasse platea dictumst. Vestibulum gravida enim non luctus consectetur. Maecenas sit amet risus id orci ultrices viverra dapibus a justo. Nulla facilisi. Nulla velit justo, convallis vel rhoncus a, laoreet ac lectus. Duis ex augue, facilisis id tortor eu, vulputate consectetur felis. Curabitur vitae aliquam nisi. Fusce blandit placerat metus eu elementum. Nam sed lectus tellus. Etiam nec eros sed turpis pulvinar interdum in sed quam. Phasellus tempus lobortis justo. Nullam vulputate nisl sed felis porta, sit amet auctor lectus porttitor. Aenean neque quam, luctus viverra felis eu, feugiat venenatis diam. Nulla facilisi. Nullam luctus augue erat, sit amet suscipit tortor eleifend non. Maecenas congue luctus nisi, ac ullamcorper ligula tristique nec. Nullam non magna bibendum, pretium lorem eget, faucibus nisl. Praesent risus tellus, commodo vitae convallis ac, dictum at felis. Cras lacus risus, accumsan nec lectus sit amet, consectetur pharetra ante. Nulla eget bibendum erat, ut placerat sapien. Mauris nulla magna, fringilla sed turpis eget, sodales luctus orci. Vestibulum lectus sapien, facilisis nec eros vitae, dictum feugiat purus. Phasellus felis nunc, condimentum et dolor mattis, sagittis varius mauris. Proin diam turpis, efficitur vitae ipsum convallis, cursus imperdiet libero. Etiam porta sapien sapien, quis fringilla magna porttitor a. Maecenas ac urna vitae ante dapibus convallis. Pellentesque sit amet varius justo. Morbi tristique elementum pellentesque. Vivamus id sodales risus, eget condimentum libero. Sed maximus, metus vitae mollis ultricies, ligula purus sodales enim, ut accumsan felis nibh non enim. Phasellus in orci sem. Sed ac felis rutrum, aliquet felis eget, tristique sem. Quisque non justo pharetra, condimentum nibh sit amet, condimentum neque. Donec a odio at quam hendrerit eleifend at ac mi. Vivamus a iaculis sem, quis tempor ante. Vestibulum quis ligula nisi. Vestibulum commodo orci at lectus consequat, consectetur eleifend est tristique. Proin consectetur elementum metus eget fermentum. Sed facilisis sapien id orci cursus pellentesque.")

func BenchmarkRingbufferSingleByteManyRoundTrips(b *testing.B) {
//ringbuffer of size one so that every read and write of 1 byte in my bench loop is
// "maximally expensive" (wraparound logic etc.)? or is that crazy
ringbuf, _ := ringbuffer.NewRingbuffer(1)
oneByte := make([]byte, 1)

b.ResetTimer()
func BenchmarkManyRingbuffersBillionsOfIntegers(b *testing.B) {
for i := 0; i < b.N; i++ {
for _, b := range CoolText {
//just round-trip single bytes into the ringbuffer
oneByte[0] = b
ringbuf.Write(oneByte)
ringbuf.Read(oneByte)
}
}
}
ringbuf := ringbuffer.NewRingbuffer(64)
writeBuf := make([]byte, 4)
readBuf := make([]byte, 4)

func BenchmarkRingbufferManyBytesManyRoundTrips(b *testing.B) {
ringbuf, _ := ringbuffer.NewRingbuffer(64)
readBuf := make([]byte, 32)
var chunk int = 0
num := uint32(0)
for j := uint32(0); j < 1000000000; j++ {
num = num*17 + 255

b.ResetTimer()
for i := 0; i < b.N; i++ {
for chunk = 0; chunk < len(CoolText)-32; chunk += 32 {
ringbuf.Write(CoolText[chunk : chunk+32])
binary.LittleEndian.PutUint32(writeBuf, num)

ringbuf.Write(writeBuf)
ringbuf.Read(readBuf)

read := binary.LittleEndian.Uint32(readBuf)

if num != read {
b.Errorf("num not same after round trip")
}
}
//chunk -= 32
remain := len(CoolText) - chunk
ringbuf.Read(readBuf[:remain])
}
}
2 changes: 1 addition & 1 deletion ringbuffer1/ringbuffer_property_test.go
Expand Up @@ -17,7 +17,7 @@ type ringbufferMachine struct {
func (m *ringbufferMachine) Init(t *rapid.T) {
n := rapid.IntsRange(1, 20).Draw(t, "n").(int)
ringbufSizePowerOfTwo := 1 << n
m.r, _ = ringbuffer.NewRingbuffer(ringbufSizePowerOfTwo)
m.r = ringbuffer.NewRingbuffer(ringbufSizePowerOfTwo)

t.Logf("Created ringbuffer with size 2^%d = %d\n", n, ringbufSizePowerOfTwo)
m.n = n
Expand Down
27 changes: 10 additions & 17 deletions ringbuffer1/ringbuffer_test.go
Expand Up @@ -9,15 +9,8 @@ import (
"github.com/sevagh/ringworm/ringbuffer1"
)

func TestRingbufferNonPowerOfTwoSize(t *testing.T) {
_, err := ringbuffer.NewRingbuffer(3)
if err == nil {
t.Errorf("Expected error when creating non-power-of-two capacity ringbuffer")
}
}

func TestRingbufferEmptyReadDoesNothing(t *testing.T) {
ringbuf, _ := ringbuffer.NewRingbuffer(4)
ringbuf := ringbuffer.NewRingbuffer(4)

emptyBytes := make([]byte, 4)
readBuf := make([]byte, 4)
Expand All @@ -29,7 +22,7 @@ func TestRingbufferEmptyReadDoesNothing(t *testing.T) {
}

func TestRingbufferReadTooMuchOnlyDrains(t *testing.T) {
ringbuf, _ := ringbuffer.NewRingbuffer(4)
ringbuf := ringbuffer.NewRingbuffer(4)

writeBuf := []byte{0, 1, 2, 3}
err := ringbuf.Write(writeBuf)
Expand Down Expand Up @@ -101,7 +94,7 @@ func TestRingbufferReadTooMuchOnlyDrains(t *testing.T) {
}

func TestRingbufferFillCount(t *testing.T) {
ringbuf, _ := ringbuffer.NewRingbuffer(128)
ringbuf := ringbuffer.NewRingbuffer(128)
if !ringbuf.Empty() {
t.Errorf("Expected ringbuf to be empty")
}
Expand All @@ -127,7 +120,7 @@ func TestRingbufferFillCount(t *testing.T) {
}

func TestRingbufferWrite(t *testing.T) {
ringbuf, _ := ringbuffer.NewRingbuffer(32)
ringbuf := ringbuffer.NewRingbuffer(32)

testData := "hello, world!"
dataBuf := []byte(testData)
Expand All @@ -152,7 +145,7 @@ func TestRingbufferWrite(t *testing.T) {
}

func TestRingbufferWriteTooMuch(t *testing.T) {
ringbuf, _ := ringbuffer.NewRingbuffer(4)
ringbuf := ringbuffer.NewRingbuffer(4)

testData := "hello, world!"
dataBuf := []byte(testData)
Expand All @@ -164,7 +157,7 @@ func TestRingbufferWriteTooMuch(t *testing.T) {
}

func TestRingbufferWriteMultiple(t *testing.T) {
ringbuf, _ := ringbuffer.NewRingbuffer(32)
ringbuf := ringbuffer.NewRingbuffer(32)

testData := []string{
"aaaaaaaaaaaaaaaa",
Expand Down Expand Up @@ -198,7 +191,7 @@ func TestRingbufferWriteMultiple(t *testing.T) {
}

func TestRingbufferWriteConcurrent(t *testing.T) {
ringbuf, _ := ringbuffer.NewRingbuffer(512)
ringbuf := ringbuffer.NewRingbuffer(512)

wg := sync.WaitGroup{}
wg.Add(2)
Expand Down Expand Up @@ -230,7 +223,7 @@ func TestRingbufferWriteConcurrent(t *testing.T) {
}

func TestRingbufferDrain(t *testing.T) {
ringbuf, _ := ringbuffer.NewRingbuffer(32)
ringbuf := ringbuffer.NewRingbuffer(32)

testData := []string{
"aaaaaaaaaaaaaaaa",
Expand Down Expand Up @@ -259,7 +252,7 @@ func TestRingbufferDrain(t *testing.T) {
}

func TestRingbufferWriteptrAdvances(t *testing.T) {
ringbuf, _ := ringbuffer.NewRingbuffer(64)
ringbuf := ringbuffer.NewRingbuffer(64)

testData := []string{
"aaaaaaaaaaaaaaaa",
Expand Down Expand Up @@ -297,7 +290,7 @@ func TestRingbufferWriteptrAdvances(t *testing.T) {
}

func TestRingbufferFuzzBytes(t *testing.T) {
ringbuf, _ := ringbuffer.NewRingbuffer(128)
ringbuf := ringbuffer.NewRingbuffer(128)

writeBuf := make([]byte, 64)

Expand Down

0 comments on commit 51385df

Please sign in to comment.