Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

opt: implement the lock-free queue for dispatching tasks faster #181

Merged
merged 7 commits into from
Feb 5, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ jobs:
- name: Run unit tests for utils
run: go test $(go list ./... | tail -n +2)
- name: Run unit tests for server
run: go test -v -coverprofile=coverage -covermode=count -timeout 60s
run: go test -v -race -coverprofile=coverage -covermode=atomic -timeout 60s
- name: Upload code coverage report to Codecov
uses: codecov/codecov-action@v1.2.1
with:
Expand Down
7 changes: 0 additions & 7 deletions eventloop_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,16 +72,9 @@ func (el *eventloop) loopRun(lockOSThread bool) {
defer func() {
el.closeAllConns()
el.ln.close()
if el.idx == 0 && el.svr.opts.Ticker {
close(el.svr.ticktock)
}
el.svr.signalShutdown()
}()

if el.idx == 0 && el.svr.opts.Ticker {
go el.loopTicker()
}

err := el.poller.Polling(el.handleEvent)
el.svr.logger.Infof("Event-loop(%d) is exiting due to error: %v", el.idx, err)
}
Expand Down
7 changes: 0 additions & 7 deletions eventloop_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,19 +57,12 @@ func (el *eventloop) loopRun(lockOSThread bool) {

var err error
defer func() {
if el.idx == 0 && el.svr.opts.Ticker {
close(el.svr.ticktock)
}
el.svr.signalShutdownWithErr(err)
el.svr.loopWG.Done()
el.loopEgress()
el.svr.loopWG.Done()
}()

if el.idx == 0 && el.svr.opts.Ticker {
go el.loopTicker()
}

for v := range el.ch {
switch v := v.(type) {
case error:
Expand Down
25 changes: 8 additions & 17 deletions gnet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ import (
"github.com/panjf2000/gnet/errors"
"github.com/panjf2000/gnet/pool/bytebuffer"
"github.com/panjf2000/gnet/pool/goroutine"
"github.com/valyala/bytebufferpool"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -259,13 +258,8 @@ func testCodecServe(network, addr string, multicore, async bool, nclients int, r
network: network, addr: addr, multicore: multicore, async: async, nclients: nclients,
codec: codec, workerPool: goroutine.Default(),
}
if reuseport {
err = Serve(ts, network+"://"+addr, WithMulticore(multicore), WithTicker(true),
WithTCPKeepAlive(time.Minute*5), WithCodec(codec), WithReusePort(true))
} else {
err = Serve(ts, network+"://"+addr, WithMulticore(multicore), WithTicker(true),
WithTCPKeepAlive(time.Minute*5), WithCodec(codec))
}
err = Serve(ts, network+"://"+addr, WithMulticore(multicore), WithTicker(true),
WithTCPKeepAlive(time.Minute*5), WithCodec(codec), WithReusePort(reuseport))
if err != nil {
panic(err)
}
Expand Down Expand Up @@ -431,7 +425,6 @@ type testServer struct {
clientActive int32
disconnected int32
workerPool *goroutine.Pool
bytesList []*bytebufferpool.ByteBuffer
}

func (s *testServer) OnInitComplete(svr Server) (action Action) {
Expand Down Expand Up @@ -464,9 +457,6 @@ func (s *testServer) OnClosed(c Conn, err error) (action Action) {
if atomic.LoadInt32(&s.connected) == atomic.LoadInt32(&s.disconnected) &&
atomic.LoadInt32(&s.disconnected) == int32(s.nclients) {
action = Shutdown
for i := range s.bytesList {
bytebuffer.Put(s.bytesList[i])
}
s.workerPool.Release()
}

Expand All @@ -477,7 +467,6 @@ func (s *testServer) React(frame []byte, c Conn) (out []byte, action Action) {
if s.async {
buf := bytebuffer.Get()
_, _ = buf.Write(frame)
s.bytesList = append(s.bytesList, buf)

if s.network == "tcp" || s.network == "unix" {
// just for test
Expand Down Expand Up @@ -632,12 +621,13 @@ type testWakeConnServer struct {
*EventServer
network string
addr string
conn Conn
conn chan Conn
c Conn
wake bool
}

func (t *testWakeConnServer) OnOpened(c Conn) (out []byte, action Action) {
t.conn = c
t.conn <- c
return
}

Expand Down Expand Up @@ -669,13 +659,14 @@ func (t *testWakeConnServer) Tick() (delay time.Duration, action Action) {
}()
return
}
_ = t.conn.Wake()
t.c = <-t.conn
_ = t.c.Wake()
delay = time.Millisecond * 100
return
}

func testWakeConn(network, addr string) {
svr := &testWakeConnServer{network: network, addr: addr}
svr := &testWakeConnServer{network: network, addr: addr, conn: make(chan Conn, 1)}
logger := zap.NewExample()
must(Serve(svr, network+"://"+addr, WithTicker(true), WithNumEventLoop(2*runtime.NumCPU()),
WithLogger(logger.Sugar())))
Expand Down
39 changes: 17 additions & 22 deletions internal/netpoll/epoll.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,17 @@ import (
"unsafe"

"github.com/panjf2000/gnet/errors"
"github.com/panjf2000/gnet/internal"
"github.com/panjf2000/gnet/internal/logging"
"github.com/panjf2000/gnet/internal/netpoll/queue"
"golang.org/x/sys/unix"
)

// Poller represents a poller which is in charge of monitoring file-descriptors.
type Poller struct {
fd int // epoll fd
wfd int // wake fd
wfdBuf []byte // wfd buffer to read packet
asyncJobQueue internal.AsyncJobQueue
fd int // epoll fd
wfd int // wake fd
wfdBuf []byte // wfd buffer to read packet
asyncTaskQueue queue.AsyncTaskQueue
}

// OpenPoller instantiates a poller.
Expand All @@ -62,7 +62,7 @@ func OpenPoller() (poller *Poller, err error) {
poller = nil
return
}
poller.asyncJobQueue = internal.NewAsyncJobQueue()
poller.asyncTaskQueue = queue.NewLockFreeQueue()
return
}

Expand All @@ -81,9 +81,9 @@ var (
b = (*(*[8]byte)(unsafe.Pointer(&u)))[:]
)

// Trigger wakes up the poller blocked in waiting for network-events and runs jobs in asyncJobQueue.
func (p *Poller) Trigger(job internal.Job) (err error) {
if p.asyncJobQueue.Push(job) == 1 {
// Trigger wakes up the poller blocked in waiting for network-events and runs jobs in asyncTaskQueue.
func (p *Poller) Trigger(task queue.Task) (err error) {
if p.asyncTaskQueue.Enqueue(task) == 1 {
for _, err = unix.Write(p.wfd, b); err != nil; _, err = unix.Write(p.wfd, b) {
}
}
Expand All @@ -95,7 +95,7 @@ func (p *Poller) Polling(callback func(fd int, ev uint32) error) error {
el := newEventList(InitEvents)
var wakenUp bool

var msec = -1
msec := -1
for {
n, err := unix.EpollWait(p.fd, el.events, msec)
if n == 0 || (n < 0 && err == unix.EINTR) {
Expand Down Expand Up @@ -125,19 +125,14 @@ func (p *Poller) Polling(callback func(fd int, ev uint32) error) error {

if wakenUp {
wakenUp = false
runAsyncJobs:
leftover, err := p.asyncJobQueue.ForEach()
switch err {
case nil:
case errors.ErrServerShutdown:
return err
default:
if q := len(leftover); q > 0 && q == p.asyncJobQueue.Batch(leftover) {
if _, err = unix.Write(p.wfd, b); err != nil {
goto runAsyncJobs
}
for task := p.asyncTaskQueue.Dequeue(); task != nil; task = p.asyncTaskQueue.Dequeue() {
switch err = task(); err {
case nil:
case errors.ErrServerShutdown:
return err
default:
logging.DefaultLogger.Warnf("Error occurs in user-defined function, %v", err)
}
logging.DefaultLogger.Warnf("Error occurs in user-defined function, %v", err)
}
}

Expand Down
33 changes: 14 additions & 19 deletions internal/netpoll/kqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,15 @@ import (
"runtime"

"github.com/panjf2000/gnet/errors"
"github.com/panjf2000/gnet/internal"
"github.com/panjf2000/gnet/internal/logging"
"github.com/panjf2000/gnet/internal/netpoll/queue"
"golang.org/x/sys/unix"
)

// Poller represents a poller which is in charge of monitoring file-descriptors.
type Poller struct {
fd int
asyncJobQueue internal.AsyncJobQueue
fd int
asyncTaskQueue queue.AsyncTaskQueue
}

// OpenPoller instantiates a poller.
Expand All @@ -57,7 +57,7 @@ func OpenPoller() (poller *Poller, err error) {
err = os.NewSyscallError("kevent add|clear", err)
return
}
poller.asyncJobQueue = internal.NewAsyncJobQueue()
poller.asyncTaskQueue = queue.NewLockFreeQueue()
return
}

Expand All @@ -72,9 +72,9 @@ var wakeChanges = []unix.Kevent_t{{
Fflags: unix.NOTE_TRIGGER,
}}

// Trigger wakes up the poller blocked in waiting for network-events and runs jobs in asyncJobQueue.
func (p *Poller) Trigger(job internal.Job) (err error) {
if p.asyncJobQueue.Push(job) == 1 {
// Trigger wakes up the poller blocked in waiting for network-events and runs jobs in asyncTaskQueue.
func (p *Poller) Trigger(job queue.Task) (err error) {
if p.asyncTaskQueue.Enqueue(job) == 1 {
for _, err = unix.Kevent(p.fd, wakeChanges, nil, nil); err != nil; _, err = unix.Kevent(p.fd, wakeChanges, nil, nil) {
}
}
Expand Down Expand Up @@ -123,19 +123,14 @@ func (p *Poller) Polling(callback func(fd int, filter int16) error) error {

if wakenUp {
wakenUp = false
runAsyncJobs:
leftover, err := p.asyncJobQueue.ForEach()
switch err {
case nil:
case errors.ErrServerShutdown:
return err
default:
if q := len(leftover); q > 0 && q == p.asyncJobQueue.Batch(leftover) {
if _, err = unix.Kevent(p.fd, wakeChanges, nil, nil); err != nil {
goto runAsyncJobs
}
for task := p.asyncTaskQueue.Dequeue(); task != nil; task = p.asyncTaskQueue.Dequeue() {
switch err = task(); err {
case nil:
case errors.ErrServerShutdown:
return err
default:
logging.DefaultLogger.Warnf("Error occurs in user-defined function, %v", err)
}
logging.DefaultLogger.Warnf("Error occurs in user-defined function, %v", err)
}
}

Expand Down
Loading