Skip to content

Commit

Permalink
opt: refactor the inside AsyncTaskQueue to make it more generic
Browse files Browse the repository at this point in the history
  • Loading branch information
panjf2000 committed Jul 11, 2021
1 parent bce2c2b commit 2d1a463
Show file tree
Hide file tree
Showing 7 changed files with 42 additions and 43 deletions.
14 changes: 3 additions & 11 deletions acceptor_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,10 @@ package gnet
import (
"os"

"golang.org/x/sys/unix"

"github.com/panjf2000/gnet/errors"
"github.com/panjf2000/gnet/internal/socket"
"golang.org/x/sys/unix"
)

func (svr *server) acceptNewConnection(fd int) error {
Expand All @@ -47,16 +48,7 @@ func (svr *server) acceptNewConnection(fd int) error {
el := svr.lb.next(netAddr)
c := newTCPConn(nfd, el, sa, netAddr)

err = el.poller.UrgentTrigger(func(_ []byte) (err error) {
if err = el.poller.AddRead(nfd); err != nil {
_ = unix.Close(nfd)
c.releaseTCP()
return
}
el.connections[nfd] = c
err = el.loopOpen(c)
return
})
err = el.poller.UrgentTrigger(el.loopInsert, c)
if err != nil {
_ = unix.Close(nfd)
c.releaseTCP()
Expand Down
20 changes: 10 additions & 10 deletions connection_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,9 +110,6 @@ func (c *conn) read() ([]byte, error) {
}

func (c *conn) write(buf []byte) (err error) {
if !c.opened {
return
}
var outFrame []byte
if outFrame, err = c.codec.Encode(c, buf); err != nil {
return
Expand Down Expand Up @@ -142,6 +139,13 @@ func (c *conn) write(buf []byte) (err error) {
return
}

func (c *conn) asyncWrite(itf interface{}) error {
if !c.opened {
return nil
}
return c.write(itf.([]byte))
}

func (c *conn) sendTo(buf []byte) error {
return unix.Sendto(c.fd, buf, 0, c.sa)
}
Expand Down Expand Up @@ -222,23 +226,19 @@ func (c *conn) BufferLength() int {
}

func (c *conn) AsyncWrite(buf []byte) error {
return c.loop.poller.Trigger(c.write, buf)
return c.loop.poller.Trigger(c.asyncWrite, buf)
}

func (c *conn) SendTo(buf []byte) error {
return c.sendTo(buf)
}

func (c *conn) Wake() error {
return c.loop.poller.UrgentTrigger(func(_ []byte) error {
return c.loop.loopWake(c)
})
return c.loop.poller.UrgentTrigger(func(_ interface{}) error { return c.loop.loopWake(c) }, nil)
}

func (c *conn) Close() error {
return c.loop.poller.Trigger(func(_ []byte) error {
return c.loop.loopCloseConn(c, nil)
}, nil)
return c.loop.poller.Trigger(func(_ interface{}) error { return c.loop.loopCloseConn(c, nil) }, nil)
}

func (c *conn) Context() interface{} { return c.ctx }
Expand Down
13 changes: 12 additions & 1 deletion eventloop_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,17 @@ func (el *eventloop) loopAccept(fd int) error {
return nil
}

func (el *eventloop) loopInsert(itf interface{}) error {
c := itf.(*conn)
if err := el.poller.AddRead(c.fd); err != nil {
_ = unix.Close(c.fd)
c.releaseTCP()
return nil
}
el.connections[c.fd] = c
return el.loopOpen(c)
}

func (el *eventloop) loopOpen(c *conn) error {
c.opened = true
el.addConn(1)
Expand Down Expand Up @@ -281,7 +292,7 @@ func (el *eventloop) loopTicker(ctx context.Context) {
switch action {
case None:
case Shutdown:
err := el.poller.UrgentTrigger(func(_ []byte) error { return gerrors.ErrServerShutdown })
err := el.poller.UrgentTrigger(func(_ interface{}) error { return gerrors.ErrServerShutdown }, nil)
el.getLogger().Debugf("stopping ticker in event-loop(%d) from Tick(), UrgentTrigger:%v", el.idx, err)
}
if timer == nil {
Expand Down
12 changes: 6 additions & 6 deletions internal/netpoll/epoll.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,9 @@ var (
//
// Note that priorAsyncTaskQueue is a queue with high-priority and its size is expected to be small,
// so only those urgent tasks should be put into this queue.
func (p *Poller) UrgentTrigger(f queue.TaskFunc) (err error) {
func (p *Poller) UrgentTrigger(fn queue.TaskFunc, arg interface{}) (err error) {
task := queue.GetTask()
task.Run = f
task.Run, task.Arg = fn, arg
p.priorAsyncTaskQueue.Enqueue(task)
if atomic.CompareAndSwapInt32(&p.netpollWakeSig, 0, 1) {
for _, err = unix.Write(p.wfd, b); err == unix.EINTR || err == unix.EAGAIN; _, err = unix.Write(p.wfd, b) {
Expand All @@ -106,9 +106,9 @@ func (p *Poller) UrgentTrigger(f queue.TaskFunc) (err error) {
// call this method when the task is not so urgent, for instance writing data back to client.
//
// Note that asyncTaskQueue is a queue with low-priority whose size may grow large and tasks in it may backlog.
func (p *Poller) Trigger(f queue.TaskFunc, buf []byte) (err error) {
func (p *Poller) Trigger(fn queue.TaskFunc, arg interface{}) (err error) {
task := queue.GetTask()
task.Run, task.Buf = f, buf
task.Run, task.Arg = fn, arg
p.asyncTaskQueue.Enqueue(task)
if atomic.CompareAndSwapInt32(&p.netpollWakeSig, 0, 1) {
for _, err = unix.Write(p.wfd, b); err == unix.EINTR || err == unix.EAGAIN; _, err = unix.Write(p.wfd, b) {
Expand Down Expand Up @@ -154,7 +154,7 @@ func (p *Poller) Polling(callback func(fd int, ev uint32) error) error {
wakenUp = false
task := p.priorAsyncTaskQueue.Dequeue()
for ; task != nil; task = p.priorAsyncTaskQueue.Dequeue() {
switch err = task.Run(task.Buf); err {
switch err = task.Run(task.Arg); err {
case nil:
case errors.ErrServerShutdown:
return err
Expand All @@ -167,7 +167,7 @@ func (p *Poller) Polling(callback func(fd int, ev uint32) error) error {
if task = p.asyncTaskQueue.Dequeue(); task == nil {
break
}
switch err = task.Run(task.Buf); err {
switch err = task.Run(task.Arg); err {
case nil:
case errors.ErrServerShutdown:
return err
Expand Down
12 changes: 6 additions & 6 deletions internal/netpoll/kqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,9 @@ var wakeChanges = []unix.Kevent_t{{
//
// Note that priorAsyncTaskQueue is a queue with high-priority and its size is expected to be small,
// so only those urgent tasks should be put into this queue.
func (p *Poller) UrgentTrigger(f queue.TaskFunc) (err error) {
func (p *Poller) UrgentTrigger(fn queue.TaskFunc, arg interface{}) (err error) {
task := queue.GetTask()
task.Run = f
task.Run, task.Arg = fn, arg
p.priorAsyncTaskQueue.Enqueue(task)
if atomic.CompareAndSwapInt32(&p.netpollWakeSig, 0, 1) {
for _, err = unix.Kevent(p.fd, wakeChanges, nil, nil); err == unix.EINTR || err == unix.EAGAIN; _, err = unix.Kevent(p.fd, wakeChanges, nil, nil) {
Expand All @@ -97,9 +97,9 @@ func (p *Poller) UrgentTrigger(f queue.TaskFunc) (err error) {
// call this method when the task is not so urgent, for instance writing data back to client.
//
// Note that asyncTaskQueue is a queue with low-priority whose size may grow large and tasks in it may backlog.
func (p *Poller) Trigger(f queue.TaskFunc, buf []byte) (err error) {
func (p *Poller) Trigger(fn queue.TaskFunc, arg interface{}) (err error) {
task := queue.GetTask()
task.Run, task.Buf = f, buf
task.Run, task.Arg = fn, arg
p.asyncTaskQueue.Enqueue(task)
if atomic.CompareAndSwapInt32(&p.netpollWakeSig, 0, 1) {
for _, err = unix.Kevent(p.fd, wakeChanges, nil, nil); err == unix.EINTR || err == unix.EAGAIN; _, err = unix.Kevent(p.fd, wakeChanges, nil, nil) {
Expand Down Expand Up @@ -152,7 +152,7 @@ func (p *Poller) Polling(callback func(fd int, filter int16) error) error {
wakenUp = false
task := p.priorAsyncTaskQueue.Dequeue()
for ; task != nil; task = p.priorAsyncTaskQueue.Dequeue() {
switch err = task.Run(task.Buf); err {
switch err = task.Run(task.Arg); err {
case nil:
case errors.ErrServerShutdown:
return err
Expand All @@ -165,7 +165,7 @@ func (p *Poller) Polling(callback func(fd int, filter int16) error) error {
if task = p.asyncTaskQueue.Dequeue(); task == nil {
break
}
switch err = task.Run(task.Buf); err {
switch err = task.Run(task.Arg); err {
case nil:
case errors.ErrServerShutdown:
return err
Expand Down
6 changes: 3 additions & 3 deletions internal/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@ package queue
import "sync"

// TaskFunc is the callback function executed by poller.
type TaskFunc func([]byte) error
type TaskFunc func(interface{}) error

// Task is a wrapper that contains function and its argument.
type Task struct {
Run TaskFunc
Buf []byte
Arg interface{}
}

var taskPool = sync.Pool{New: func() interface{} { return new(Task) }}
Expand All @@ -40,7 +40,7 @@ func GetTask() *Task {

// PutTask puts the trashy Task back in pool.
func PutTask(task *Task) {
task.Run, task.Buf = nil, nil
task.Run, task.Arg = nil, nil
taskPool.Put(task)
}

Expand Down
8 changes: 2 additions & 6 deletions server_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,9 +200,7 @@ func (svr *server) stop(s Server) {

// Notify all loops to close by closing all listeners
svr.lb.iterate(func(i int, el *eventloop) bool {
err := el.poller.UrgentTrigger(func(_ []byte) error {
return errors.ErrServerShutdown
})
err := el.poller.UrgentTrigger(func(_ interface{}) error { return errors.ErrServerShutdown }, nil)
if err != nil {
svr.opts.Logger.Errorf("failed to call UrgentTrigger on sub event-loop when stopping server")
}
Expand All @@ -211,9 +209,7 @@ func (svr *server) stop(s Server) {

if svr.mainLoop != nil {
svr.ln.close()
err := svr.mainLoop.poller.UrgentTrigger(func(_ []byte) error {
return errors.ErrServerShutdown
})
err := svr.mainLoop.poller.UrgentTrigger(func(_ interface{}) error { return errors.ErrServerShutdown }, nil)
if err != nil {
svr.opts.Logger.Errorf("failed to call UrgentTrigger on main event-loop when stopping server")
}
Expand Down

0 comments on commit 2d1a463

Please sign in to comment.