Skip to content

Commit

Permalink
opt: make it more robust when running async jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
panjf2000 committed Feb 4, 2021
1 parent 035f614 commit 6509b85
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 4 deletions.
8 changes: 6 additions & 2 deletions internal/netpoll/epoll.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ var (
// Trigger wakes up the poller blocked in waiting for network-events and runs jobs in asyncJobQueue.
func (p *Poller) Trigger(job internal.Job) (err error) {
if p.asyncJobQueue.Push(job) == 1 {
_, err = unix.Write(p.wfd, b)
for _, err = unix.Write(p.wfd, b); err != nil; _, err = unix.Write(p.wfd, b) {
}
}
return os.NewSyscallError("write", err)
}
Expand Down Expand Up @@ -124,14 +125,17 @@ func (p *Poller) Polling(callback func(fd int, ev uint32) error) error {

if wakenUp {
wakenUp = false
runAsyncJobs:
leftover, err := p.asyncJobQueue.ForEach()
switch err {
case nil:
case errors.ErrServerShutdown:
return err
default:
if q := len(leftover); q > 0 && q == p.asyncJobQueue.Batch(leftover) {
_, err = unix.Write(p.wfd, b)
if _, err = unix.Write(p.wfd, b); err != nil {
goto runAsyncJobs
}
}
logging.DefaultLogger.Warnf("Error occurs in user-defined function, %v", err)
}
Expand Down
8 changes: 6 additions & 2 deletions internal/netpoll/kqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ var wakeChanges = []unix.Kevent_t{{
// Trigger wakes up the poller blocked in waiting for network-events and runs jobs in asyncJobQueue.
func (p *Poller) Trigger(job internal.Job) (err error) {
if p.asyncJobQueue.Push(job) == 1 {
_, err = unix.Kevent(p.fd, wakeChanges, nil, nil)
for _, err = unix.Kevent(p.fd, wakeChanges, nil, nil); err != nil; _, err = unix.Kevent(p.fd, wakeChanges, nil, nil) {
}
}
return os.NewSyscallError("kevent trigger", err)
}
Expand Down Expand Up @@ -122,14 +123,17 @@ func (p *Poller) Polling(callback func(fd int, filter int16) error) error {

if wakenUp {
wakenUp = false
runAsyncJobs:
leftover, err := p.asyncJobQueue.ForEach()
switch err {
case nil:
case errors.ErrServerShutdown:
return err
default:
if q := len(leftover); q > 0 && q == p.asyncJobQueue.Batch(leftover) {
_, err = unix.Kevent(p.fd, wakeChanges, nil, nil)
if _, err = unix.Kevent(p.fd, wakeChanges, nil, nil); err != nil {
goto runAsyncJobs
}
}
logging.DefaultLogger.Warnf("Error occurs in user-defined function, %v", err)
}
Expand Down

0 comments on commit 6509b85

Please sign in to comment.