Skip to content

Commit

Permalink
Add packetio interface for network buffering
Browse files Browse the repository at this point in the history
We're not able to use `bytes.Buffer` because it would combine multiple
Writes into a single Read. `packetio.Buffer` provides a similar but is
designed to work with packets, avoiding this situation.

There are similar buffers implemented separarely in webrtc/srtp/ice/mux.
These used channels and were too slow to keep up with the read loop. I
wrote a benchmark for comparison:

```
name                   old time/op    new time/op     delta
BenchmarkBuffer14-8       859ns ± 5%      129ns ± 4%   -84.93%
BenchmarkBuffer140-8      832ns ± 4%      154ns ± 4%   -81.43%
BenchmarkBuffer1400-8     825ns ± 8%      351ns ± 4%   -57.49%

name                   old speed      new speed       delta
BenchmarkBuffer14-8    16.3MB/s ± 5%  107.8MB/s ± 3%  +561.08%
BenchmarkBuffer140-8    168MB/s ± 4%    904MB/s ± 4%  +436.70%
BenchmarkBuffer1400-8  1.70GB/s ± 8%   3.99GB/s ± 4%  +134.74%
```

Note that this implementation has an unbounded buffer, while the channel
implementation has no buffer.
  • Loading branch information
kixelated committed Mar 5, 2019
1 parent 36a71eb commit 8a2aa8e
Show file tree
Hide file tree
Showing 2 changed files with 264 additions and 0 deletions.
126 changes: 126 additions & 0 deletions packetio/buffer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
package packetio

import (
"io"
"sync"
)

type Buffer struct {
mutex sync.Mutex
packets [][]byte
notify chan struct{}
subs int
closed bool
}

func NewBuffer() *Buffer {
return &Buffer{
notify: make(chan struct{}),
}
}

func (b *Buffer) Write(packet []byte) (n int, err error) {
// Copy the packet before adding it.
packet = append([]byte{}, packet...)

b.mutex.Lock()

// Make sure we're not closed.
if b.closed {
b.mutex.Unlock()
return 0, io.ErrClosedPipe
}

var notify chan struct{}

// Decide if we need to wake up any readers.
if b.subs > 0 {
// If so, close the notify channel and make a new one.
// This effectively behaves like a broadcast, waking up any blocked goroutines.
// We close after we release the lock to reduce contention.
notify = b.notify
b.notify = make(chan struct{})

// Reset the subs counter.
b.subs = 0
}

// Add the packet to the queue.
b.packets = append(b.packets, packet)
b.mutex.Unlock()

// Actually close the notify channel down here.
if notify != nil {
close(notify)
}

return len(packet), nil
}

func (b *Buffer) Read(packet []byte) (int, error) {
for {
b.mutex.Lock()

// See if there are any packets in the queue.
if len(b.packets) > 0 {
first := b.packets[0]

// This is a packet-based reader/writer so we can't truncate.
if len(first) > len(packet) {
b.mutex.Unlock()
return 0, io.ErrShortBuffer
}

// Remove our packet and continue.
b.packets = b.packets[1:]
b.mutex.Unlock()

// Actually transfer the data.
n := copy(packet, first)
return n, nil
}

// Make sure the reader isn't actually closed.
// This is done after checking packets to fully read the buffer.
if b.closed {
b.mutex.Unlock()
return 0, io.EOF
}

// Get the current notify channel.
// This will be closed when there is new data available, waking us up.
notify := b.notify

// Increment the subs counter, telling the writer we're waiting.
b.subs++

b.mutex.Unlock()

// Wake for the broadcast.
<-notify
}
}

func (b *Buffer) Close() (err error) {
// note: We don't use defer so we can close the notify channel after unlocking.
// This will unblock goroutines that can grab the lock immediately, instead of blocking again.
b.mutex.Lock()

if b.closed {
b.mutex.Unlock()
return nil
}

b.closed = true

notify := b.notify
subs := b.subs
b.mutex.Unlock()

// Overkill, but only close the notify when there are subs.
if subs > 0 {
close(notify)
}

return nil
}
138 changes: 138 additions & 0 deletions packetio/buffer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
package packetio

import (
"bytes"
"io"
"testing"
)

func TestBuffer(t *testing.T) {
buffer := NewBuffer()
packet := make([]byte, 4)

// Write once
n, err := buffer.Write([]byte{0, 1})
if err != nil {
t.Error(err)
}

if n != 2 {
t.Error("wrong size")
}

// Read once
n, err = buffer.Read(packet)
if err != nil {
t.Error(err)
}

if n != 2 {
t.Error("wrong size")
}

if !bytes.Equal(packet[:n], []byte{0, 1}) {
t.Error("wrong output")
}

// Write twice
n, err = buffer.Write([]byte{2, 3, 4})

if n != 3 {
t.Error("wrong size")
}

n, err = buffer.Write([]byte{5, 6, 7})
if err != nil {
t.Error(err)
}

if n != 3 {
t.Error("wrong size")
}

// Read twice
n, err = buffer.Read(packet)
if err != nil {
t.Error(err)
}

if n != 3 {
t.Error("wrong size")
}

if !bytes.Equal(packet[:n], []byte{2, 3, 4}) {
t.Error("wrong output")
}

n, err = buffer.Read(packet)
if err != nil {
t.Error(err)
}

if n != 3 {
t.Error("wrong size")
}

if !bytes.Equal(packet[:n], []byte{5, 6, 7}) {
t.Error("wrong output")
}

// Close
err = buffer.Close()
if err != nil {
t.Error(err)
}

_, err = buffer.Write([]byte{3})
if err == nil {
t.Error("expected error")
}

_, err = buffer.Read(packet)
if err != io.EOF {
t.Error("expected EOF")
}
}

func benchmarkBuffer(b *testing.B, size int64) {
buffer := NewBuffer()
b.SetBytes(size)

go func() {
packet := make([]byte, size)

for {
_, err := buffer.Read(packet)
if err == io.EOF {
return
} else if err != nil {
b.Fatal(err)
}
}
}()

packet := make([]byte, size)

b.ResetTimer()

for i := 0; i < b.N; i += 1 {
_, err := buffer.Write(packet)
if err != nil {
b.Fatal(err)
}
}

buffer.Close()
}

func BenchmarkBuffer14(b *testing.B) {
benchmarkBuffer(b, 14)
}

func BenchmarkBuffer140(b *testing.B) {
benchmarkBuffer(b, 140)
}

func BenchmarkBuffer1400(b *testing.B) {
benchmarkBuffer(b, 1400)
}

0 comments on commit 8a2aa8e

Please sign in to comment.