Skip to content

Commit

Permalink
chore: code cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
panjf2000 committed May 24, 2023
1 parent 5746a14 commit 9ff529b
Show file tree
Hide file tree
Showing 6 changed files with 46 additions and 43 deletions.
2 changes: 1 addition & 1 deletion acceptor_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func (eng *engine) accept(fd int, _ netpoll.IOEvent) error {
logging.Error(err)
}

el := eng.lb.next(remoteAddr)
el := eng.eventLoops.next(remoteAddr)
c := newTCPConn(nfd, el, sa, el.ln.addr, remoteAddr)
err = el.poller.UrgentTrigger(el.register, c)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions acceptor_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func (eng *engine) listen() (err error) {
return
}

el := eng.lb.next(addr)
el := eng.eventLoops.next(addr)
c := newUDPConn(el, eng.ln.addr, addr)
el.ch <- packUDPConn(c, buffer[:n])
} else {
Expand All @@ -49,7 +49,7 @@ func (eng *engine) listen() (err error) {
eng.opts.Logger.Errorf("Accept() fails due to error: %v", err)
return
}
el := eng.lb.next(tc.RemoteAddr())
el := eng.eventLoops.next(tc.RemoteAddr())
c := newTCPConn(tc, el)
el.ch <- c
go func(c *conn, tc net.Conn, el *eventloop) {
Expand Down
46 changes: 23 additions & 23 deletions engine_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ import (

type engine struct {
ln *listener // the listener for accepting new connections
lb loadBalancer // event-loops for handling events
opts *Options // options with engine
mainLoop *eventloop // main event-loop for accepting connections
acceptor *eventloop // main event-loop for accepting connections
eventLoops loadBalancer // event-loops for handling events
inShutdown int32 // whether the engine is in shutdown
ticker struct {
ctx context.Context // context for ticker
Expand Down Expand Up @@ -66,22 +66,29 @@ func (eng *engine) shutdown(err error) {
}

func (eng *engine) startEventLoops() {
eng.lb.iterate(func(i int, el *eventloop) bool {
eng.eventLoops.iterate(func(i int, el *eventloop) bool {
eng.workerPool.Go(el.run)
return true
})
}

func (eng *engine) closeEventLoops() {
eng.lb.iterate(func(i int, el *eventloop) bool {
eng.eventLoops.iterate(func(i int, el *eventloop) bool {
el.ln.close()
_ = el.poller.Close()
return true
})
if eng.acceptor != nil {
eng.ln.close()
err := eng.acceptor.poller.Close()
if err != nil {
eng.opts.Logger.Errorf("failed to close poller when stopping engine: %v", err)
}
}
}

func (eng *engine) startSubReactors() {
eng.lb.iterate(func(i int, el *eventloop) bool {
eng.eventLoops.iterate(func(i int, el *eventloop) bool {
eng.workerPool.Go(el.activateSubReactor)
return true
})
Expand Down Expand Up @@ -110,7 +117,7 @@ func (eng *engine) activateEventLoops(numEventLoop int) (err error) {
if err = el.poller.AddRead(el.ln.packPollAttachment(el.accept)); err != nil {
return
}
eng.lb.register(el)
eng.eventLoops.register(el)

// Start the ticker.
if el.idx == 0 && eng.opts.Ticker {
Expand Down Expand Up @@ -142,7 +149,7 @@ func (eng *engine) activateReactors(numEventLoop int) error {
el.buffer = make([]byte, eng.opts.ReadBufferCap)
el.connections.init()
el.eventHandler = eng.eventHandler
eng.lb.register(el)
eng.eventLoops.register(el)
} else {
return err
}
Expand All @@ -161,7 +168,7 @@ func (eng *engine) activateReactors(numEventLoop int) error {
if err = el.poller.AddRead(eng.ln.packPollAttachment(eng.accept)); err != nil {
return err
}
eng.mainLoop = el
eng.acceptor = el

// Start main reactor in background.
eng.workerPool.Go(el.activateMainReactor)
Expand All @@ -172,7 +179,7 @@ func (eng *engine) activateReactors(numEventLoop int) error {
// Start the ticker.
if eng.opts.Ticker {
eng.workerPool.Go(func() error {
eng.mainLoop.ticker(eng.ticker.ctx)
eng.acceptor.ticker(eng.ticker.ctx)
return nil
})
}
Expand All @@ -195,15 +202,15 @@ func (eng *engine) stop(s Engine) {
eng.eventHandler.OnShutdown(s)

// Notify all event-loops to exit.
eng.lb.iterate(func(i int, el *eventloop) bool {
eng.eventLoops.iterate(func(i int, el *eventloop) bool {
err := el.poller.UrgentTrigger(func(_ interface{}) error { return errors.ErrEngineShutdown }, nil)
if err != nil {
eng.opts.Logger.Errorf("failed to call UrgentTrigger on sub event-loop when stopping engine: %v", err)
}
return true
})
if eng.mainLoop != nil {
err := eng.mainLoop.poller.UrgentTrigger(func(_ interface{}) error { return errors.ErrEngineShutdown }, nil)
if eng.acceptor != nil {
err := eng.acceptor.poller.UrgentTrigger(func(_ interface{}) error { return errors.ErrEngineShutdown }, nil)
if err != nil {
eng.opts.Logger.Errorf("failed to call UrgentTrigger on main event-loop when stopping engine: %v", err)
}
Expand All @@ -220,13 +227,6 @@ func (eng *engine) stop(s Engine) {

// Close all listeners and pollers of event-loops.
eng.closeEventLoops()
if eng.mainLoop != nil {
eng.ln.close()
err := eng.mainLoop.poller.Close()
if err != nil {
eng.opts.Logger.Errorf("failed to close poller when stopping engine: %v", err)
}
}

// Put the engine into the shutdown state.
atomic.StoreInt32(&eng.inShutdown, 1)
Expand Down Expand Up @@ -259,11 +259,11 @@ func run(eventHandler EventHandler, listener *listener, options *Options, protoA
}
switch options.LB {
case RoundRobin:
eng.lb = new(roundRobinLoadBalancer)
eng.eventLoops = new(roundRobinLoadBalancer)
case LeastConnections:
eng.lb = new(leastConnectionsLoadBalancer)
eng.eventLoops = new(leastConnectionsLoadBalancer)
case SourceAddrHash:
eng.lb = new(sourceAddrHashLoadBalancer)
eng.eventLoops = new(sourceAddrHashLoadBalancer)
}

if eng.opts.Ticker {
Expand Down Expand Up @@ -294,7 +294,7 @@ func (eng *engine) sendCmd(cmd *asyncCmd, urgent bool) error {
if !gfd.Validate(cmd.fd) {
return errors.ErrInvalidConn
}
el := eng.lb.index(cmd.fd.EventLoopIndex())
el := eng.eventLoops.index(cmd.fd.EventLoopIndex())
if el == nil {
return errors.ErrInvalidConn
}
Expand Down
33 changes: 18 additions & 15 deletions engine_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@ import (
)

type engine struct {
ln *listener
lb loadBalancer // event-loops for handling events
opts *Options // options with engine
ticker struct {
ln *listener
opts *Options // options with engine
eventLoops loadBalancer // event-loops for handling events
ticker struct {
ctx context.Context
cancel context.CancelFunc
}
Expand All @@ -56,6 +56,14 @@ func (eng *engine) shutdown(err error) {
eng.workerPool.shutdown()
}

func (eng *engine) closeEventLoops() {
eng.eventLoops.iterate(func(i int, el *eventloop) bool {
el.ch <- errorx.ErrEngineShutdown
return true
})
eng.ln.close()
}

func (eng *engine) start(numEventLoop int) error {
for i := 0; i < numEventLoop; i++ {
el := eventloop{
Expand All @@ -65,7 +73,7 @@ func (eng *engine) start(numEventLoop int) error {
connections: make(map[*conn]struct{}),
eventHandler: eng.eventHandler,
}
eng.lb.register(&el)
eng.eventLoops.register(&el)
eng.workerPool.Go(el.run)
if i == 0 && eng.opts.Ticker {
eng.workerPool.Go(func() error {
Expand All @@ -86,17 +94,12 @@ func (eng *engine) stop(engine Engine) error {
eng.opts.Logger.Infof("engine is being shutdown...")
eng.eventHandler.OnShutdown(engine)

eng.ln.close()

eng.lb.iterate(func(i int, el *eventloop) bool {
el.ch <- errorx.ErrEngineShutdown
return true
})

if eng.ticker.cancel != nil {
eng.ticker.cancel()
}

eng.closeEventLoops()

if err := eng.workerPool.Wait(); err != nil {
eng.opts.Logger.Errorf("engine shutdown error: %v", err)
}
Expand Down Expand Up @@ -131,11 +134,11 @@ func run(eventHandler EventHandler, listener *listener, options *Options, protoA

switch options.LB {
case RoundRobin:
eng.lb = new(roundRobinLoadBalancer)
eng.eventLoops = new(roundRobinLoadBalancer)
case LeastConnections:
eng.lb = new(leastConnectionsLoadBalancer)
eng.eventLoops = new(leastConnectionsLoadBalancer)
case SourceAddrHash:
eng.lb = new(sourceAddrHashLoadBalancer)
eng.eventLoops = new(sourceAddrHashLoadBalancer)
}

if options.Ticker {
Expand Down
2 changes: 1 addition & 1 deletion eventloop_unix_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func (s *benchmarkServerGC) OnBoot(eng Engine) (action Action) {
s.eng = eng
go func() {
for {
if s.eng.eng.lb.len() == s.elNum && s.eng.CountConnections() == s.elNum*int(s.initConnCount) {
if s.eng.eng.eventLoops.len() == s.elNum && s.eng.CountConnections() == s.elNum*int(s.initConnCount) {
break
}
time.Sleep(time.Millisecond)
Expand Down
2 changes: 1 addition & 1 deletion gnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func (e Engine) CountConnections() (count int) {
return -1
}

e.eng.lb.iterate(func(i int, el *eventloop) bool {
e.eng.eventLoops.iterate(func(i int, el *eventloop) bool {
count += int(el.countConn())
return true
})
Expand Down

0 comments on commit 9ff529b

Please sign in to comment.