Skip to content

Commit

Permalink
fix: avoid starving sockets
Browse files Browse the repository at this point in the history
  • Loading branch information
panjf2000 committed Feb 8, 2021
1 parent 4d8accb commit e315252
Show file tree
Hide file tree
Showing 6 changed files with 31 additions and 2 deletions.
10 changes: 9 additions & 1 deletion internal/netpoll/epoll.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,11 @@ func (p *Poller) Polling(callback func(fd int, ev uint32) error) error {

if wakenUp {
wakenUp = false
for task := p.asyncTaskQueue.Dequeue(); task != nil; task = p.asyncTaskQueue.Dequeue() {
var task queue.Task
for i := 0; i < AsyncTasks; i++ {
if task = p.asyncTaskQueue.Dequeue(); task == nil {
break
}
switch err = task(); err {
case nil:
case errors.ErrServerShutdown:
Expand All @@ -138,6 +142,10 @@ func (p *Poller) Polling(callback func(fd int, ev uint32) error) error {
}
}
atomic.StoreInt32(&p.netpollWakeSig, 0)
if !p.asyncTaskQueue.Empty() {
for _, err = unix.Write(p.wfd, b); err == unix.EINTR || err == unix.EAGAIN; _, err = unix.Write(p.wfd, b) {
}
}
}

if n == el.size {
Expand Down
2 changes: 2 additions & 0 deletions internal/netpoll/epoll_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import "golang.org/x/sys/unix"
const (
// InitEvents represents the initial length of poller event-list.
InitEvents = 128
// AsyncTasks is the maximum number of asynchronous tasks that the event-loop will process at one time.
AsyncTasks = 64
// ErrEvents represents exceptional events that are not read/write, like socket being closed,
// reading/writing from/to a closed socket, etc.
ErrEvents = unix.EPOLLERR | unix.EPOLLHUP | unix.EPOLLRDHUP
Expand Down
10 changes: 9 additions & 1 deletion internal/netpoll/kqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,11 @@ func (p *Poller) Polling(callback func(fd int, filter int16) error) error {

if wakenUp {
wakenUp = false
for task := p.asyncTaskQueue.Dequeue(); task != nil; task = p.asyncTaskQueue.Dequeue() {
var task queue.Task
for i := 0; i < AsyncTasks; i++ {
if task = p.asyncTaskQueue.Dequeue(); task == nil {
break
}
switch err = task(); err {
case nil:
case errors.ErrServerShutdown:
Expand All @@ -136,6 +140,10 @@ func (p *Poller) Polling(callback func(fd int, filter int16) error) error {
}
}
atomic.StoreInt32(&p.netpollWakeSig, 0)
if !p.asyncTaskQueue.Empty() {
for _, err = unix.Kevent(p.fd, wakeChanges, nil, nil); err == unix.EINTR || err == unix.EAGAIN; _, err = unix.Kevent(p.fd, wakeChanges, nil, nil) {
}
}
}

if n == el.size {
Expand Down
2 changes: 2 additions & 0 deletions internal/netpoll/kqueue_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import "golang.org/x/sys/unix"
const (
// InitEvents represents the initial length of poller event-list.
InitEvents = 64
// AsyncTasks is the maximum number of asynchronous tasks that the event-loop will process at one time.
AsyncTasks = 48
// EVFilterWrite represents writeable events from sockets.
EVFilterWrite = unix.EVFILT_WRITE
// EVFilterRead represents readable events from sockets.
Expand Down
8 changes: 8 additions & 0 deletions internal/netpoll/queue/lock_free_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ import (
type lockFreeQueue struct {
head unsafe.Pointer
tail unsafe.Pointer
len int32
}

type node struct {
Expand All @@ -118,6 +119,7 @@ loop:
if cas(&tail.next, next, n) {
// Enqueue is done. Try to swing tail to the inserted node.
cas(&q.tail, tail, n)
atomic.AddInt32(&q.len, 1)
return
}
} else { // tail was not pointing to the last node
Expand Down Expand Up @@ -149,13 +151,19 @@ loop:
task := next.value
if cas(&q.head, head, next) {
// Dequeue is done. return value.
atomic.AddInt32(&q.len, -1)
return task
}
}
}
goto loop
}

// Empty indicates whether this queue is empty or not.
func (q *lockFreeQueue) Empty() bool {
return atomic.LoadInt32(&q.len) == 0
}

func load(p *unsafe.Pointer) (n *node) {
return (*node)(atomic.LoadPointer(p))
}
Expand Down
1 change: 1 addition & 0 deletions internal/netpoll/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,5 @@ type Task func() error
type AsyncTaskQueue interface {
Enqueue(Task)
Dequeue() Task
Empty() bool
}

0 comments on commit e315252

Please sign in to comment.