Skip to content

Commit

Permalink
implement and use ringbuffer in framer
Browse files Browse the repository at this point in the history
  • Loading branch information
Glonee committed Jun 1, 2023
1 parent c96fbd2 commit 0c91c74
Show file tree
Hide file tree
Showing 5 changed files with 146 additions and 8 deletions.
16 changes: 8 additions & 8 deletions framer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/quic-go/quic-go/internal/ackhandler"
"github.com/quic-go/quic-go/internal/protocol"
"github.com/quic-go/quic-go/internal/utils/ringbuffer"
"github.com/quic-go/quic-go/internal/wire"
"github.com/quic-go/quic-go/quicvarint"
)
Expand All @@ -28,7 +29,7 @@ type framerI struct {
streamGetter streamGetter

activeStreams map[protocol.StreamID]struct{}
streamQueue []protocol.StreamID
streamQueue ringbuffer.RingBuffer[protocol.StreamID]

controlFrameMutex sync.Mutex
controlFrames []wire.Frame
Expand All @@ -45,7 +46,7 @@ func newFramer(streamGetter streamGetter) framer {

func (f *framerI) HasData() bool {
f.mutex.Lock()
hasData := len(f.streamQueue) > 0
hasData := !f.streamQueue.Empty()
f.mutex.Unlock()
if hasData {
return true
Expand Down Expand Up @@ -84,7 +85,7 @@ func (f *framerI) AppendControlFrames(frames []*ackhandler.Frame, maxLen protoco
func (f *framerI) AddActiveStream(id protocol.StreamID) {
f.mutex.Lock()
if _, ok := f.activeStreams[id]; !ok {
f.streamQueue = append(f.streamQueue, id)
f.streamQueue.PushBack(id)
f.activeStreams[id] = struct{}{}
}
f.mutex.Unlock()
Expand All @@ -95,13 +96,12 @@ func (f *framerI) AppendStreamFrames(frames []*ackhandler.Frame, maxLen protocol
var lastFrame *ackhandler.Frame
f.mutex.Lock()
// pop STREAM frames, until less than MinStreamFrameSize bytes are left in the packet
numActiveStreams := len(f.streamQueue)
numActiveStreams := f.streamQueue.Len()
for i := 0; i < numActiveStreams; i++ {
if protocol.MinStreamFrameSize+length > maxLen {
break
}
id := f.streamQueue[0]
f.streamQueue = f.streamQueue[1:]
id := f.streamQueue.PopFront()
// This should never return an error. Better check it anyway.
// The stream will only be in the streamQueue, if it enqueued itself there.
str, err := f.streamGetter.GetOrOpenSendStream(id)
Expand All @@ -117,7 +117,7 @@ func (f *framerI) AppendStreamFrames(frames []*ackhandler.Frame, maxLen protocol
remainingLen += quicvarint.Len(uint64(remainingLen))
frame, hasMoreData := str.popStreamFrame(remainingLen, v)
if hasMoreData { // put the stream back in the queue (at the end)
f.streamQueue = append(f.streamQueue, id)
f.streamQueue.PushBack(id)
} else { // no more data to send. Stream is not active any more
delete(f.activeStreams, id)
}
Expand Down Expand Up @@ -146,7 +146,7 @@ func (f *framerI) Handle0RTTRejection() error {
defer f.mutex.Unlock()

f.controlFrameMutex.Lock()
f.streamQueue = f.streamQueue[:0]
f.streamQueue.Clear()
for id := range f.activeStreams {
delete(f.activeStreams, id)
}
Expand Down
75 changes: 75 additions & 0 deletions internal/utils/ringbuffer/ringbuffer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package ringbuffer

type RingBuffer[T any] struct {
ring []T
headPos, tailPos int
full bool
}

func (r *RingBuffer[T]) Init(size int) {
r.ring = make([]T, size)
}

func (r *RingBuffer[T]) Len() int {
if r.full {
return len(r.ring)
}
if r.tailPos >= r.headPos {
return r.tailPos - r.headPos
}
return r.tailPos - r.headPos + len(r.ring)
}

func (r *RingBuffer[T]) Empty() bool {
return !r.full && r.headPos == r.tailPos
}

func (r *RingBuffer[T]) PushBack(t T) {
if r.full || len(r.ring) == 0 {
r.grow()
}
r.ring[r.tailPos] = t
r.tailPos++
if r.tailPos == len(r.ring) {
r.tailPos = 0
}
if r.tailPos == r.headPos {
r.full = true
}
}

func (r *RingBuffer[T]) PopFront() T {
if r.Empty() {
panic("github.com/quic-go/quic-go/internal/utils/ringbuffer: pop from an empty queue")
}
r.full = false
t := r.ring[r.headPos]
r.ring[r.headPos] = *new(T)
r.headPos++
if r.headPos == len(r.ring) {
r.headPos = 0
}
return t
}

// Grow the maximum size of the queue.
// This method assume the queue is full.
func (r *RingBuffer[T]) grow() {
oldRing := r.ring
newSize := len(oldRing) * 2
if newSize == 0 {
newSize = 1
}
r.ring = make([]T, newSize)
headLen := copy(r.ring, oldRing[r.headPos:])
copy(r.ring[headLen:], oldRing[:r.headPos])
r.headPos, r.tailPos, r.full = 0, len(oldRing), false
}

func (r *RingBuffer[T]) Clear() {
var zeroValue T
for i := range r.ring {
r.ring[i] = zeroValue
}
r.headPos, r.tailPos, r.full = 0, 0, false
}
12 changes: 12 additions & 0 deletions internal/utils/ringbuffer/ringbuffer_bench_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package ringbuffer

import "testing"

func BenchmarkRingBuffer(b *testing.B) {
r := RingBuffer[int]{}
b.ResetTimer()
for i := 0; i < b.N; i++ {
r.PushBack(i)
r.PopFront()
}
}
13 changes: 13 additions & 0 deletions internal/utils/ringbuffer/ringbuffer_suite_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package ringbuffer

import (
"testing"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)

func TestTestdata(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "ringbuffer suite")
}
38 changes: 38 additions & 0 deletions internal/utils/ringbuffer/ringbuffer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package ringbuffer

import (
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)

var _ = Describe("RingBuffer", func() {
It("push and pop", func() {
r := RingBuffer[int]{}
Expect(len(r.ring)).To(Equal(0))
Expect(func() { r.PopFront() }).To(Panic())
r.PushBack(1)
r.PushBack(2)
r.PushBack(3)
Expect(r.PopFront()).To(Equal(1))
Expect(r.PopFront()).To(Equal(2))
r.PushBack(4)
r.PushBack(5)
Expect(r.Len()).To(Equal(3))
r.PushBack(6)
Expect(r.Len()).To(Equal(4))
Expect(r.PopFront()).To(Equal(3))
Expect(r.PopFront()).To(Equal(4))
Expect(r.PopFront()).To(Equal(5))
Expect(r.PopFront()).To(Equal(6))
})
It("clear", func() {
r := RingBuffer[int]{}
r.Init(2)
r.PushBack(1)
r.PushBack(2)
Expect(r.full).To(BeTrue())
r.Clear()
Expect(r.full).To(BeFalse())
Expect(r.Len()).To(Equal(0))
})
})

0 comments on commit 0c91c74

Please sign in to comment.