Skip to content

Commit

Permalink
opt: improve the network read with ring-buffer and readv
Browse files Browse the repository at this point in the history
  • Loading branch information
panjf2000 committed Dec 3, 2021
1 parent dfa473d commit 0dcf599
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 15 deletions.
43 changes: 35 additions & 8 deletions connection_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/panjf2000/gnet/internal/netpoll"
"github.com/panjf2000/gnet/internal/socket"
"github.com/panjf2000/gnet/pkg/mixedbuffer"
"github.com/panjf2000/gnet/pkg/pool/bytebuffer"
rbPool "github.com/panjf2000/gnet/pkg/pool/ringbuffer"
"github.com/panjf2000/gnet/pkg/ringbuffer"
)
Expand All @@ -40,8 +41,9 @@ type conn struct {
opened bool // connection opened event fired
localAddr net.Addr // local addr
remoteAddr net.Addr // remote addr
inboundBuffer *ringbuffer.RingBuffer // buffer for data from the peer
outboundBuffer *mixedbuffer.Buffer // buffer for data that is ready to write to the peer
inboundBuffer *ringbuffer.RingBuffer // buffer for leftover data from the peer
transitBuffer *bytebuffer.ByteBuffer // buffer for a complete packet
outboundBuffer *mixedbuffer.Buffer // buffer for data that is eligible to be sent to the peer
pollAttachment *netpoll.PollAttachment // connection attachment for poller
}

Expand Down Expand Up @@ -161,26 +163,51 @@ func (c *conn) sendTo(buf []byte) error {
// ================================== Non-concurrency-safe API's ==================================

func (c *conn) Read() []byte {
buf, _ := c.inboundBuffer.PeekAll()
return buf
head, tail := c.inboundBuffer.PeekAll()
if tail == nil {
return head
}
if c.transitBuffer == nil {
c.transitBuffer = c.inboundBuffer.ByteBuffer()
return c.transitBuffer.B
}
c.transitBuffer.Reset()
_, _ = c.transitBuffer.Write(head)
_, _ = c.transitBuffer.Write(tail)
return c.transitBuffer.B
}

func (c *conn) ResetBuffer() {
c.inboundBuffer.Reset()
if c.transitBuffer != nil {
c.transitBuffer.Reset()
}
}

func (c *conn) ReadN(n int) (int, []byte) {
inBufferLen := c.inboundBuffer.Length()
if inBufferLen <= n || n <= 0 {
buf, _ := c.inboundBuffer.PeekAll()
return inBufferLen, buf
return inBufferLen, c.Read()
}
buf, _ := c.inboundBuffer.Peek(n)
return len(buf), buf
head, tail := c.inboundBuffer.Peek(n)
if tail == nil {
return n, head
}
if c.transitBuffer == nil {
c.transitBuffer = bytebuffer.Get()
} else {
c.transitBuffer.Reset()
}
_, _ = c.transitBuffer.Write(head)
_, _ = c.transitBuffer.Write(tail)
return n, c.transitBuffer.B
}

func (c *conn) ShiftN(n int) int {
c.inboundBuffer.Discard(n)
if c.transitBuffer != nil {
c.transitBuffer.Reset()
}
return n
}

Expand Down
1 change: 0 additions & 1 deletion eventloop_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,6 @@ func (el *eventloop) loopRead(c *conn) error {
return nil
}
}
_ = c.inboundBuffer.Rewind()
return nil
}

Expand Down
39 changes: 33 additions & 6 deletions pkg/ringbuffer/ring_buffer_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,46 @@

package ringbuffer

import "golang.org/x/sys/unix"
import (
"golang.org/x/sys/unix"

"github.com/panjf2000/gnet/internal/io"
)

// ========================= gnet specific APIs =========================

// CopyFromSocket copies data from a socket fd into ring-buffer.
func (rb *RingBuffer) CopyFromSocket(fd int) (n int, err error) {
n, err = unix.Read(fd, rb.buf[rb.w:])
if rb.r == rb.w {
if !rb.isEmpty {
return
}
rb.Reset()
n, err = unix.Read(fd, rb.buf)
if n > 0 {
rb.w += n
rb.isEmpty = false
if rb.w == rb.size {
rb.w = 0
}
}
return
}
if rb.w < rb.r {
n, err = unix.Read(fd, rb.buf[rb.w:rb.r])
if n > 0 {
rb.w += n
rb.isEmpty = false
if rb.w == rb.size {
rb.w = 0
}
}
return
}
n, err = io.Readv(fd, [][]byte{rb.buf[rb.w:], rb.buf[:rb.r]})
if n > 0 {
rb.w = (rb.w + n) % rb.size
rb.isEmpty = false
rb.w += n
if rb.w == rb.size {
rb.w = 0
}
}
return
}
Expand Down

0 comments on commit 0dcf599

Please sign in to comment.