Skip to content

Commit

Permalink
opt: refine the logic of eventfd in poller
Browse files Browse the repository at this point in the history
  • Loading branch information
panjf2000 committed Mar 12, 2022
1 parent a22af33 commit 38aa2e0
Show file tree
Hide file tree
Showing 4 changed files with 124 additions and 104 deletions.
61 changes: 33 additions & 28 deletions internal/netpoll/epoll_default_poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,12 @@ import (

// Poller represents a poller which is in charge of monitoring file-descriptors.
type Poller struct {
fd int // epoll fd
wfd int // wake fd
wfdBuf []byte // wfd buffer to read packet
wakeupCall int32
asyncTaskQueue queue.AsyncTaskQueue // queue with low priority
priorAsyncTaskQueue queue.AsyncTaskQueue // queue with high priority
fd int // epoll fd
efd int // eventfd
efdBuf []byte // efd buffer to read an 8-byte integer
wakeupCall int32
asyncTaskQueue queue.AsyncTaskQueue // queue with low priority
urgentAsyncTaskQueue queue.AsyncTaskQueue // queue with high priority
}

// OpenPoller instantiates a poller.
Expand All @@ -49,20 +49,20 @@ func OpenPoller() (poller *Poller, err error) {
err = os.NewSyscallError("epoll_create1", err)
return
}
if poller.wfd, err = unix.Eventfd(0, unix.EFD_NONBLOCK|unix.EFD_CLOEXEC); err != nil {
if poller.efd, err = unix.Eventfd(0, unix.EFD_NONBLOCK|unix.EFD_CLOEXEC); err != nil {
_ = poller.Close()
poller = nil
err = os.NewSyscallError("eventfd", err)
return
}
poller.wfdBuf = make([]byte, 8)
if err = poller.AddRead(&PollAttachment{FD: poller.wfd}); err != nil {
poller.efdBuf = make([]byte, 8)
if err = poller.AddRead(&PollAttachment{FD: poller.efd}); err != nil {
_ = poller.Close()
poller = nil
return
}
poller.asyncTaskQueue = queue.NewLockFreeQueue()
poller.priorAsyncTaskQueue = queue.NewLockFreeQueue()
poller.urgentAsyncTaskQueue = queue.NewLockFreeQueue()
return
}

Expand All @@ -71,7 +71,7 @@ func (p *Poller) Close() error {
if err := os.NewSyscallError("close", unix.Close(p.fd)); err != nil {
return err
}
return os.NewSyscallError("close", unix.Close(p.wfd))
return os.NewSyscallError("close", unix.Close(p.efd))
}

// Make the endianness of bytes compatible with more linux OSs under different processor-architectures,
Expand All @@ -81,17 +81,18 @@ var (
b = (*(*[8]byte)(unsafe.Pointer(&u)))[:]
)

// UrgentTrigger puts task into priorAsyncTaskQueue and wakes up the poller which is waiting for network-events,
// then the poller will get tasks from priorAsyncTaskQueue and run them.
// UrgentTrigger puts task into urgentAsyncTaskQueue and wakes up the poller which is waiting for network-events,
// then the poller will get tasks from urgentAsyncTaskQueue and run them.
//
// Note that priorAsyncTaskQueue is a queue with high-priority and its size is expected to be small,
// Note that urgentAsyncTaskQueue is a queue with high-priority and its size is expected to be small,
// so only those urgent tasks should be put into this queue.
func (p *Poller) UrgentTrigger(fn queue.TaskFunc, arg interface{}) (err error) {
task := queue.GetTask()
task.Run, task.Arg = fn, arg
p.priorAsyncTaskQueue.Enqueue(task)
p.urgentAsyncTaskQueue.Enqueue(task)
if atomic.CompareAndSwapInt32(&p.wakeupCall, 0, 1) {
for _, err = unix.Write(p.wfd, b); err == unix.EINTR || err == unix.EAGAIN; _, err = unix.Write(p.wfd, b) {
if _, err = unix.Write(p.efd, b); err == unix.EAGAIN {
err = nil
}
}
return os.NewSyscallError("write", err)
Expand All @@ -106,7 +107,8 @@ func (p *Poller) Trigger(fn queue.TaskFunc, arg interface{}) (err error) {
task.Run, task.Arg = fn, arg
p.asyncTaskQueue.Enqueue(task)
if atomic.CompareAndSwapInt32(&p.wakeupCall, 0, 1) {
for _, err = unix.Write(p.wfd, b); err == unix.EINTR || err == unix.EAGAIN; _, err = unix.Write(p.wfd, b) {
if _, err = unix.Write(p.efd, b); err == unix.EAGAIN {
err = nil
}
}
return os.NewSyscallError("write", err)
Expand All @@ -115,7 +117,7 @@ func (p *Poller) Trigger(fn queue.TaskFunc, arg interface{}) (err error) {
// Polling blocks the current goroutine, waiting for network-events.
func (p *Poller) Polling(callback func(fd int, ev uint32) error) error {
el := newEventList(InitPollEventsCap)
var wakenUp bool
var doChores bool

msec := -1
for {
Expand All @@ -132,24 +134,24 @@ func (p *Poller) Polling(callback func(fd int, ev uint32) error) error {

for i := 0; i < n; i++ {
ev := &el.events[i]
if fd := int(ev.Fd); fd != p.wfd {
if fd := int(ev.Fd); fd != p.efd {
switch err = callback(fd, ev.Events); err {
case nil:
case errors.ErrAcceptSocket, errors.ErrEngineShutdown:
return err
default:
logging.Warnf("error occurs in event-loop: %v", err)
}
} else { // poller is awaken to run tasks in queues.
wakenUp = true
_, _ = unix.Read(p.wfd, p.wfdBuf)
} else { // poller is awakened to run tasks in queues.
doChores = true
_, _ = unix.Read(p.efd, p.efdBuf)
}
}

if wakenUp {
wakenUp = false
task := p.priorAsyncTaskQueue.Dequeue()
for ; task != nil; task = p.priorAsyncTaskQueue.Dequeue() {
if doChores {
doChores = false
task := p.urgentAsyncTaskQueue.Dequeue()
for ; task != nil; task = p.urgentAsyncTaskQueue.Dequeue() {
switch err = task.Run(task.Arg); err {
case nil:
case errors.ErrEngineShutdown:
Expand All @@ -173,8 +175,11 @@ func (p *Poller) Polling(callback func(fd int, ev uint32) error) error {
queue.PutTask(task)
}
atomic.StoreInt32(&p.wakeupCall, 0)
if (!p.asyncTaskQueue.IsEmpty() || !p.priorAsyncTaskQueue.IsEmpty()) && atomic.CompareAndSwapInt32(&p.wakeupCall, 0, 1) {
for _, err = unix.Write(p.wfd, b); err == unix.EINTR || err == unix.EAGAIN; _, err = unix.Write(p.wfd, b) {
if (!p.asyncTaskQueue.IsEmpty() || !p.urgentAsyncTaskQueue.IsEmpty()) && atomic.CompareAndSwapInt32(&p.wakeupCall, 0, 1) {
switch _, err = unix.Write(p.efd, b); err {
case nil, unix.EAGAIN:
default:
doChores = true
}
}
}
Expand Down
65 changes: 35 additions & 30 deletions internal/netpoll/epoll_optimized_poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,12 @@ import (

// Poller represents a poller which is in charge of monitoring file-descriptors.
type Poller struct {
fd int // epoll fd
wpa *PollAttachment // PollAttachment for wake events
wfdBuf []byte // wfd buffer to read packet
wakeupCall int32
asyncTaskQueue queue.AsyncTaskQueue // queue with low priority
priorAsyncTaskQueue queue.AsyncTaskQueue // queue with high priority
fd int // epoll fd
epa *PollAttachment // PollAttachment for waking events
efdBuf []byte // efd buffer to read an 8-byte integer
wakeupCall int32
asyncTaskQueue queue.AsyncTaskQueue // queue with low priority
urgentAsyncTaskQueue queue.AsyncTaskQueue // queue with high priority
}

// OpenPoller instantiates a poller.
Expand All @@ -48,22 +48,22 @@ func OpenPoller() (poller *Poller, err error) {
err = os.NewSyscallError("epoll_create1", err)
return
}
var wfd int
if wfd, err = unix.Eventfd(0, unix.EFD_NONBLOCK|unix.EFD_CLOEXEC); err != nil {
var efd int
if efd, err = unix.Eventfd(0, unix.EFD_NONBLOCK|unix.EFD_CLOEXEC); err != nil {
_ = poller.Close()
poller = nil
err = os.NewSyscallError("eventfd", err)
return
}
poller.wfdBuf = make([]byte, 8)
poller.wpa = &PollAttachment{FD: wfd}
if err = poller.AddRead(poller.wpa); err != nil {
poller.efdBuf = make([]byte, 8)
poller.epa = &PollAttachment{FD: efd}
if err = poller.AddRead(poller.epa); err != nil {
_ = poller.Close()
poller = nil
return
}
poller.asyncTaskQueue = queue.NewLockFreeQueue()
poller.priorAsyncTaskQueue = queue.NewLockFreeQueue()
poller.urgentAsyncTaskQueue = queue.NewLockFreeQueue()
return
}

Expand All @@ -72,7 +72,7 @@ func (p *Poller) Close() error {
if err := os.NewSyscallError("close", unix.Close(p.fd)); err != nil {
return err
}
return os.NewSyscallError("close", unix.Close(p.wpa.FD))
return os.NewSyscallError("close", unix.Close(p.epa.FD))
}

// Make the endianness of bytes compatible with more linux OSs under different processor-architectures,
Expand All @@ -82,17 +82,18 @@ var (
b = (*(*[8]byte)(unsafe.Pointer(&u)))[:]
)

// UrgentTrigger puts task into priorAsyncTaskQueue and wakes up the poller which is waiting for network-events,
// then the poller will get tasks from priorAsyncTaskQueue and run them.
// UrgentTrigger puts task into urgentAsyncTaskQueue and wakes up the poller which is waiting for network-events,
// then the poller will get tasks from urgentAsyncTaskQueue and run them.
//
// Note that priorAsyncTaskQueue is a queue with high-priority and its size is expected to be small,
// Note that urgentAsyncTaskQueue is a queue with high-priority and its size is expected to be small,
// so only those urgent tasks should be put into this queue.
func (p *Poller) UrgentTrigger(fn queue.TaskFunc, arg interface{}) (err error) {
task := queue.GetTask()
task.Run, task.Arg = fn, arg
p.priorAsyncTaskQueue.Enqueue(task)
p.urgentAsyncTaskQueue.Enqueue(task)
if atomic.CompareAndSwapInt32(&p.wakeupCall, 0, 1) {
for _, err = unix.Write(p.wpa.FD, b); err == unix.EINTR || err == unix.EAGAIN; _, err = unix.Write(p.wpa.FD, b) {
if _, err = unix.Write(p.epa.FD, b); err == unix.EAGAIN {
err = nil
}
}
return os.NewSyscallError("write", err)
Expand All @@ -107,7 +108,8 @@ func (p *Poller) Trigger(fn queue.TaskFunc, arg interface{}) (err error) {
task.Run, task.Arg = fn, arg
p.asyncTaskQueue.Enqueue(task)
if atomic.CompareAndSwapInt32(&p.wakeupCall, 0, 1) {
for _, err = unix.Write(p.wpa.FD, b); err == unix.EINTR || err == unix.EAGAIN; _, err = unix.Write(p.wpa.FD, b) {
if _, err = unix.Write(p.epa.FD, b); err == unix.EAGAIN {
err = nil
}
}
return os.NewSyscallError("write", err)
Expand All @@ -116,7 +118,7 @@ func (p *Poller) Trigger(fn queue.TaskFunc, arg interface{}) (err error) {
// Polling blocks the current goroutine, waiting for network-events.
func (p *Poller) Polling() error {
el := newEventList(InitPollEventsCap)
var wakenUp bool
var doChores bool

msec := -1
for {
Expand All @@ -134,24 +136,24 @@ func (p *Poller) Polling() error {
for i := 0; i < n; i++ {
ev := &el.events[i]
pollAttachment := *(**PollAttachment)(unsafe.Pointer(&ev.data))
if pollAttachment.FD != p.wpa.FD {
if pollAttachment.FD != p.epa.FD {
switch err = pollAttachment.Callback(pollAttachment.FD, ev.events); err {
case nil:
case errors.ErrAcceptSocket, errors.ErrEngineShutdown:
return err
default:
logging.Warnf("error occurs in event-loop: %v", err)
}
} else { // poller is awaken to run tasks in queues.
wakenUp = true
_, _ = unix.Read(p.wpa.FD, p.wfdBuf)
} else { // poller is awakened to run tasks in queues.
doChores = true
_, _ = unix.Read(p.epa.FD, p.efdBuf)
}
}

if wakenUp {
wakenUp = false
task := p.priorAsyncTaskQueue.Dequeue()
for ; task != nil; task = p.priorAsyncTaskQueue.Dequeue() {
if doChores {
doChores = false
task := p.urgentAsyncTaskQueue.Dequeue()
for ; task != nil; task = p.urgentAsyncTaskQueue.Dequeue() {
switch err = task.Run(task.Arg); err {
case nil:
case errors.ErrEngineShutdown:
Expand All @@ -175,8 +177,11 @@ func (p *Poller) Polling() error {
queue.PutTask(task)
}
atomic.StoreInt32(&p.wakeupCall, 0)
if (!p.asyncTaskQueue.IsEmpty() || !p.priorAsyncTaskQueue.IsEmpty()) && atomic.CompareAndSwapInt32(&p.wakeupCall, 0, 1) {
for _, err = unix.Write(p.wpa.FD, b); err == unix.EINTR || err == unix.EAGAIN; _, err = unix.Write(p.wpa.FD, b) {
if (!p.asyncTaskQueue.IsEmpty() || !p.urgentAsyncTaskQueue.IsEmpty()) && atomic.CompareAndSwapInt32(&p.wakeupCall, 0, 1) {
switch _, err = unix.Write(p.epa.FD, b); err {
case nil, unix.EAGAIN:
default:
doChores = true
}
}
}
Expand Down

0 comments on commit 38aa2e0

Please sign in to comment.