Skip to content

Commit

Permalink
opt: improve the poller waking logic
Browse files Browse the repository at this point in the history
  • Loading branch information
panjf2000 committed Feb 5, 2021
1 parent 00bea60 commit 4d8accb
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 14 deletions.
8 changes: 6 additions & 2 deletions internal/netpoll/epoll.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ package netpoll
import (
"os"
"runtime"
"sync/atomic"
"unsafe"

"github.com/panjf2000/gnet/errors"
Expand All @@ -39,6 +40,7 @@ type Poller struct {
fd int // epoll fd
wfd int // wake fd
wfdBuf []byte // wfd buffer to read packet
netpollWakeSig int32
asyncTaskQueue queue.AsyncTaskQueue
}

Expand Down Expand Up @@ -83,8 +85,9 @@ var (

// Trigger wakes up the poller blocked in waiting for network-events and runs jobs in asyncTaskQueue.
func (p *Poller) Trigger(task queue.Task) (err error) {
if p.asyncTaskQueue.Enqueue(task) == 1 {
for _, err = unix.Write(p.wfd, b); err != nil; _, err = unix.Write(p.wfd, b) {
p.asyncTaskQueue.Enqueue(task)
if atomic.CompareAndSwapInt32(&p.netpollWakeSig, 0, 1) {
for _, err = unix.Write(p.wfd, b); err == unix.EINTR || err == unix.EAGAIN; _, err = unix.Write(p.wfd, b) {
}
}
return os.NewSyscallError("write", err)
Expand Down Expand Up @@ -134,6 +137,7 @@ func (p *Poller) Polling(callback func(fd int, ev uint32) error) error {
logging.DefaultLogger.Warnf("Error occurs in user-defined function, %v", err)
}
}
atomic.StoreInt32(&p.netpollWakeSig, 0)
}

if n == el.size {
Expand Down
16 changes: 10 additions & 6 deletions internal/netpoll/kqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ package netpoll
import (
"os"
"runtime"
"sync/atomic"

"github.com/panjf2000/gnet/errors"
"github.com/panjf2000/gnet/internal/logging"
Expand All @@ -36,6 +37,7 @@ import (
// Poller represents a poller which is in charge of monitoring file-descriptors.
type Poller struct {
fd int
netpollWakeSig int32
asyncTaskQueue queue.AsyncTaskQueue
}

Expand Down Expand Up @@ -73,9 +75,10 @@ var wakeChanges = []unix.Kevent_t{{
}}

// Trigger wakes up the poller blocked in waiting for network-events and runs jobs in asyncTaskQueue.
func (p *Poller) Trigger(job queue.Task) (err error) {
if p.asyncTaskQueue.Enqueue(job) == 1 {
for _, err = unix.Kevent(p.fd, wakeChanges, nil, nil); err != nil; _, err = unix.Kevent(p.fd, wakeChanges, nil, nil) {
func (p *Poller) Trigger(task queue.Task) (err error) {
p.asyncTaskQueue.Enqueue(task)
if atomic.CompareAndSwapInt32(&p.netpollWakeSig, 0, 1) {
for _, err = unix.Kevent(p.fd, wakeChanges, nil, nil); err == unix.EINTR || err == unix.EAGAIN; _, err = unix.Kevent(p.fd, wakeChanges, nil, nil) {
}
}
return os.NewSyscallError("kevent trigger", err)
Expand All @@ -84,11 +87,11 @@ func (p *Poller) Trigger(job queue.Task) (err error) {
// Polling blocks the current goroutine, waiting for network-events.
func (p *Poller) Polling(callback func(fd int, filter int16) error) error {
el := newEventList(InitEvents)
var wakenUp bool

var (
ts unix.Timespec
tsp *unix.Timespec
ts unix.Timespec
tsp *unix.Timespec
wakenUp bool
)
for {
n, err := unix.Kevent(p.fd, nil, el.events, tsp)
Expand Down Expand Up @@ -132,6 +135,7 @@ func (p *Poller) Polling(callback func(fd int, filter int16) error) error {
logging.DefaultLogger.Warnf("Error occurs in user-defined function, %v", err)
}
}
atomic.StoreInt32(&p.netpollWakeSig, 0)
}

if n == el.size {
Expand Down
6 changes: 2 additions & 4 deletions internal/netpoll/queue/lock_free_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,6 @@ import (
type lockFreeQueue struct {
head unsafe.Pointer
tail unsafe.Pointer
size int32
}

type node struct {
Expand All @@ -107,7 +106,7 @@ func NewLockFreeQueue() AsyncTaskQueue {
}

// Enqueue puts the given value v at the tail of the queue.
func (q *lockFreeQueue) Enqueue(task Task) int {
func (q *lockFreeQueue) Enqueue(task Task) {
n := &node{value: task}
loop:
tail := load(&q.tail)
Expand All @@ -119,7 +118,7 @@ loop:
if cas(&tail.next, next, n) {
// Enqueue is done. Try to swing tail to the inserted node.
cas(&q.tail, tail, n)
return int(atomic.AddInt32(&q.size, 1))
return
}
} else { // tail was not pointing to the last node
// Try to swing Tail to the next node.
Expand Down Expand Up @@ -149,7 +148,6 @@ loop:
// Read value before CAS, otherwise another dequeue might free the next node.
task := next.value
if cas(&q.head, head, next) {
atomic.AddInt32(&q.size, -1)
// Dequeue is done. return value.
return task
}
Expand Down
2 changes: 1 addition & 1 deletion internal/netpoll/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,6 @@ type Task func() error

// AsyncTaskQueue is a queue storing asynchronous tasks.
type AsyncTaskQueue interface {
Enqueue(Task) int
Enqueue(Task)
Dequeue() Task
}
2 changes: 1 addition & 1 deletion pool/goroutine/goroutine.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ const (
// ExpiryDuration is the interval time to clean up those expired workers.
ExpiryDuration = 10 * time.Second

// Nonblocking decides what to do when submitting a new job to a full worker pool: waiting for a available worker
// Nonblocking decides what to do when submitting a new task to a full worker pool: waiting for a available worker
// or returning nil directly.
Nonblocking = true
)
Expand Down

0 comments on commit 4d8accb

Please sign in to comment.