Skip to content

Commit

Permalink
opt: take off the codec from eventloop
Browse files Browse the repository at this point in the history
Fixes #115
  • Loading branch information
panjf2000 committed Jul 10, 2020
1 parent 240c9fb commit bc85c34
Show file tree
Hide file tree
Showing 7 changed files with 46 additions and 45 deletions.
44 changes: 23 additions & 21 deletions connection_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func newTCPConn(fd int, el *eventloop, sa unix.Sockaddr) *conn {
fd: fd,
sa: sa,
loop: el,
codec: el.codec,
codec: el.svr.codec,
inboundBuffer: prb.Get(),
outboundBuffer: prb.Get(),
}
Expand Down Expand Up @@ -106,25 +106,31 @@ func (c *conn) read() ([]byte, error) {
return c.codec.Decode(c)
}

func (c *conn) write(buf []byte) {
func (c *conn) write(buf []byte) (err error) {
var outFrame []byte
if outFrame, err = c.codec.Encode(c, buf); err != nil {
return
}
if !c.outboundBuffer.IsEmpty() {
_, _ = c.outboundBuffer.Write(buf)
_, _ = c.outboundBuffer.Write(outFrame)
return
}
n, err := unix.Write(c.fd, buf)
if err != nil {

var n int
if n, err = unix.Write(c.fd, outFrame); err != nil {
if err == unix.EAGAIN {
_, _ = c.outboundBuffer.Write(buf)
_ = c.loop.poller.ModReadWrite(c.fd)
_, _ = c.outboundBuffer.Write(outFrame)
err = c.loop.poller.ModReadWrite(c.fd)
return
}
_ = c.loop.loopCloseConn(c, os.NewSyscallError("write", err))
return
}
if n < len(buf) {
_, _ = c.outboundBuffer.Write(buf[n:])
_ = c.loop.poller.ModReadWrite(c.fd)
if n < len(outFrame) {
_, _ = c.outboundBuffer.Write(outFrame[n:])
err = c.loop.poller.ModReadWrite(c.fd)
}
return
}

func (c *conn) sendTo(buf []byte) error {
Expand Down Expand Up @@ -206,17 +212,13 @@ func (c *conn) BufferLength() int {
return c.inboundBuffer.Length() + len(c.buffer)
}

func (c *conn) AsyncWrite(buf []byte) (err error) {
var encodedBuf []byte
if encodedBuf, err = c.codec.Encode(c, buf); err == nil {
return c.loop.poller.Trigger(func() error {
if c.opened {
c.write(encodedBuf)
}
return nil
})
}
return
func (c *conn) AsyncWrite(buf []byte) error {
return c.loop.poller.Trigger(func() (err error) {
if c.opened {
err = c.write(buf)
}
return
})
}

func (c *conn) SendTo(buf []byte) error {
Expand Down
8 changes: 4 additions & 4 deletions connection_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func newTCPConn(conn net.Conn, el *eventloop) *stdConn {
return &stdConn{
conn: conn,
loop: el,
codec: el.codec,
codec: el.svr.codec,
inboundBuffer: prb.Get(),
}
}
Expand Down Expand Up @@ -176,9 +176,9 @@ func (c *stdConn) BufferLength() int {
func (c *stdConn) AsyncWrite(buf []byte) (err error) {
var encodedBuf []byte
if encodedBuf, err = c.codec.Encode(c, buf); err == nil {
c.loop.ch <- func() error {
_, _ = c.conn.Write(encodedBuf)
return nil
c.loop.ch <- func() (err error) {
_, err = c.conn.Write(encodedBuf)
return
}
}
return
Expand Down
14 changes: 7 additions & 7 deletions eventloop_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ type eventloop struct {
ln *listener // listener
idx int // loop index in the server loops list
svr *server // server in loop
codec ICodec // codec for TCP
packet []byte // read packet buffer
poller *netpoll.Poller // epoll or kqueue
connCount int32 // number of active connections in event-loop
Expand Down Expand Up @@ -138,9 +137,10 @@ func (el *eventloop) loopRead(c *conn) error {
for inFrame, _ := c.read(); inFrame != nil; inFrame, _ = c.read() {
out, action := el.eventHandler.React(inFrame, c)
if out != nil {
outFrame, _ := el.codec.Encode(c, out)
el.eventHandler.PreWrite()
c.write(outFrame)
if err = c.write(out); err != nil {
return err
}
}
switch action {
case None:
Expand Down Expand Up @@ -199,8 +199,7 @@ func (el *eventloop) loopCloseConn(c *conn, err error) error {
_ = el.loopWrite(c)
}

err0, err1 := el.poller.Delete(c.fd), unix.Close(c.fd)
if err0 == nil && err1 == nil {
if err0, err1 := el.poller.Delete(c.fd), unix.Close(c.fd); err0 == nil && err1 == nil {
delete(el.connections, c.fd)
el.calibrateCallback(el, -1)
if el.eventHandler.OnClosed(c, err) == Shutdown {
Expand All @@ -226,8 +225,9 @@ func (el *eventloop) loopWake(c *conn) error {
//}
out, action := el.eventHandler.React(nil, c)
if out != nil {
frame, _ := el.codec.Encode(c, out)
c.write(frame)
if err := c.write(out); err != nil {
return err
}
}

return el.handleAction(c, action)
Expand Down
17 changes: 9 additions & 8 deletions eventloop_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ type eventloop struct {
ch chan interface{} // command channel
idx int // loop index
svr *server // server in loop
codec ICodec // codec for TCP
connCount int32 // number of active connections in event-loop
connections map[*stdConn]struct{} // track all the sockets bound to this loop
eventHandler EventHandler // user eventHandler
Expand Down Expand Up @@ -108,9 +107,11 @@ func (el *eventloop) loopRead(ti *tcpIn) (err error) {
for inFrame, _ := c.read(); inFrame != nil; inFrame, _ = c.read() {
out, action := el.eventHandler.React(inFrame, c)
if out != nil {
outFrame, _ := el.codec.Encode(c, out)
outFrame, _ := c.codec.Encode(c, out)
el.eventHandler.PreWrite()
_, err = c.conn.Write(outFrame)
if _, err = c.conn.Write(outFrame); err != nil {
return el.loopError(c, err)
}
}
switch action {
case None:
Expand All @@ -119,9 +120,6 @@ func (el *eventloop) loopRead(ti *tcpIn) (err error) {
case Shutdown:
return errors.ErrServerShutdown
}
if err != nil {
return el.loopError(c, err)
}
}
_, _ = c.inboundBuffer.Write(c.buffer.Bytes())
bytebuffer.Put(c.buffer)
Expand Down Expand Up @@ -199,8 +197,11 @@ func (el *eventloop) loopWake(c *stdConn) error {
//}
out, action := el.eventHandler.React(nil, c)
if out != nil {
frame, _ := el.codec.Encode(c, out)
_, _ = c.conn.Write(frame)
if frame, err := c.codec.Encode(c, out); err != nil {
return err
} else if _, err = c.conn.Write(frame); err != nil {
return err
}
}

return el.handleAction(c, action)
Expand Down
5 changes: 3 additions & 2 deletions gnet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ package gnet

import (
"bufio"
"bytes"
"encoding/binary"
"fmt"
"io"
Expand Down Expand Up @@ -298,7 +299,7 @@ func startCodecClient(network, addr string, multicore, async bool, codec ICodec)
if _, err := io.ReadFull(rd, data2); err != nil {
panic(err)
}
if string(encodedData) != string(data2) && !async {
if !bytes.Equal(encodedData, data2) && !async {
// panic(fmt.Sprintf("mismatch %s/multi-core:%t: %d vs %d bytes, %s:%s", network, multicore,
// len(encodedData), len(data2), string(encodedData), string(data2)))
panic(fmt.Sprintf("mismatch %s/multi-core:%t: %d vs %d bytes", network, multicore, len(encodedData), len(data2)))
Expand Down Expand Up @@ -569,7 +570,7 @@ func startClient(network, addr string, multicore, async bool) {
if _, err := io.ReadFull(rd, data2); err != nil {
panic(err)
}
if string(data) != string(data2) && !async {
if !bytes.Equal(data, data2) && !async {
panic(fmt.Sprintf("mismatch %s/multi-core:%t: %d vs %d bytes\n", network, multicore, len(data), len(data2)))
}
}
Expand Down
2 changes: 0 additions & 2 deletions server_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,6 @@ func (svr *server) activateEventLoops(numEventLoop int) (err error) {
el := &eventloop{
ln: l,
svr: svr,
codec: svr.codec,
poller: p,
packet: make([]byte, 0x10000),
connections: make(map[int]*conn),
Expand All @@ -136,7 +135,6 @@ func (svr *server) activateReactors(numEventLoop int) error {
el := &eventloop{
ln: svr.ln,
svr: svr,
codec: svr.codec,
poller: p,
packet: make([]byte, 0x10000),
connections: make(map[int]*conn),
Expand Down
1 change: 0 additions & 1 deletion server_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@ func (svr *server) startEventLoops(numEventLoop int) {
el := &eventloop{
ch: make(chan interface{}, commandBufferSize),
svr: svr,
codec: svr.codec,
connections: make(map[*stdConn]struct{}),
eventHandler: svr.eventHandler,
calibrateCallback: svr.subEventLoopSet.calibrate,
Expand Down

0 comments on commit bc85c34

Please sign in to comment.