diff --git a/connection_unix.go b/connection_unix.go index 2853de2a4..a8bbd60b2 100644 --- a/connection_unix.go +++ b/connection_unix.go @@ -27,6 +27,7 @@ import ( "github.com/panjf2000/gnet/internal/netpoll" "github.com/panjf2000/gnet/internal/socket" "github.com/panjf2000/gnet/pkg/mixedbuffer" + "github.com/panjf2000/gnet/pkg/pool/bytebuffer" rbPool "github.com/panjf2000/gnet/pkg/pool/ringbuffer" "github.com/panjf2000/gnet/pkg/ringbuffer" ) @@ -40,8 +41,9 @@ type conn struct { opened bool // connection opened event fired localAddr net.Addr // local addr remoteAddr net.Addr // remote addr - inboundBuffer *ringbuffer.RingBuffer // buffer for data from the peer - outboundBuffer *mixedbuffer.Buffer // buffer for data that is ready to write to the peer + inboundBuffer *ringbuffer.RingBuffer // buffer for leftover data from the peer + transitBuffer *bytebuffer.ByteBuffer // buffer for a complete packet + outboundBuffer *mixedbuffer.Buffer // buffer for data that is eligible to be sent to the peer pollAttachment *netpoll.PollAttachment // connection attachment for poller } @@ -161,26 +163,51 @@ func (c *conn) sendTo(buf []byte) error { // ================================== Non-concurrency-safe API's ================================== func (c *conn) Read() []byte { - buf, _ := c.inboundBuffer.PeekAll() - return buf + head, tail := c.inboundBuffer.PeekAll() + if tail == nil { + return head + } + if c.transitBuffer == nil { + c.transitBuffer = c.inboundBuffer.ByteBuffer() + return c.transitBuffer.B + } + c.transitBuffer.Reset() + _, _ = c.transitBuffer.Write(head) + _, _ = c.transitBuffer.Write(tail) + return c.transitBuffer.B } func (c *conn) ResetBuffer() { c.inboundBuffer.Reset() + if c.transitBuffer != nil { + c.transitBuffer.Reset() + } } func (c *conn) ReadN(n int) (int, []byte) { inBufferLen := c.inboundBuffer.Length() if inBufferLen <= n || n <= 0 { - buf, _ := c.inboundBuffer.PeekAll() - return inBufferLen, buf + return inBufferLen, c.Read() } - buf, _ := c.inboundBuffer.Peek(n) - return len(buf), buf + head, tail := c.inboundBuffer.Peek(n) + if tail == nil { + return n, head + } + if c.transitBuffer == nil { + c.transitBuffer = bytebuffer.Get() + } else { + c.transitBuffer.Reset() + } + _, _ = c.transitBuffer.Write(head) + _, _ = c.transitBuffer.Write(tail) + return n, c.transitBuffer.B } func (c *conn) ShiftN(n int) int { c.inboundBuffer.Discard(n) + if c.transitBuffer != nil { + c.transitBuffer.Reset() + } return n } diff --git a/eventloop_unix.go b/eventloop_unix.go index 4ae60fc4d..20428fc1c 100644 --- a/eventloop_unix.go +++ b/eventloop_unix.go @@ -127,7 +127,6 @@ func (el *eventloop) loopRead(c *conn) error { return nil } } - _ = c.inboundBuffer.Rewind() return nil } diff --git a/pkg/ringbuffer/ring_buffer_unix.go b/pkg/ringbuffer/ring_buffer_unix.go index 3161896f3..95638fb5c 100644 --- a/pkg/ringbuffer/ring_buffer_unix.go +++ b/pkg/ringbuffer/ring_buffer_unix.go @@ -17,19 +17,46 @@ package ringbuffer -import "golang.org/x/sys/unix" +import ( + "golang.org/x/sys/unix" + + "github.com/panjf2000/gnet/internal/io" +) // ========================= gnet specific APIs ========================= // CopyFromSocket copies data from a socket fd into ring-buffer. func (rb *RingBuffer) CopyFromSocket(fd int) (n int, err error) { - n, err = unix.Read(fd, rb.buf[rb.w:]) + if rb.r == rb.w { + if !rb.isEmpty { + return + } + rb.Reset() + n, err = unix.Read(fd, rb.buf) + if n > 0 { + rb.w += n + rb.isEmpty = false + if rb.w == rb.size { + rb.w = 0 + } + } + return + } + if rb.w < rb.r { + n, err = unix.Read(fd, rb.buf[rb.w:rb.r]) + if n > 0 { + rb.w += n + rb.isEmpty = false + if rb.w == rb.size { + rb.w = 0 + } + } + return + } + n, err = io.Readv(fd, [][]byte{rb.buf[rb.w:], rb.buf[:rb.r]}) if n > 0 { + rb.w = (rb.w + n) % rb.size rb.isEmpty = false - rb.w += n - if rb.w == rb.size { - rb.w = 0 - } } return }