Skip to content

Commit

Permalink
fix: resolve the issue of closing one fd twice
Browse files Browse the repository at this point in the history
  • Loading branch information
panjf2000 committed Jul 4, 2020
1 parent a704e06 commit b5a5c71
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 13 deletions.
19 changes: 17 additions & 2 deletions eventloop_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,12 @@ func (el *eventloop) loopRun() {
if el.idx == 0 && el.svr.opts.Ticker {
go el.loopTicker()
}

switch err := el.poller.Polling(el.handleEvent); err {
case errors.ErrServerShutdown:
el.svr.logger.Infof("Event-loop(%d) is exiting normally on the signal error: %v", el.idx, err)
default:
el.svr.logger.Fatalf("Event-loop(%d) is exiting due to an unexpected error: %v", el.idx, err)
el.svr.logger.Errorf("Event-loop(%d) is exiting due to an unexpected error: %v", el.idx, err)

}
}
Expand All @@ -79,6 +80,7 @@ func (el *eventloop) loopAccept(fd int) error {
if el.ln.network == "udp" {
return el.loopReadUDP(fd)
}

nfd, sa, err := unix.Accept(fd)
if err != nil {
if err == unix.EAGAIN {
Expand All @@ -97,19 +99,21 @@ func (el *eventloop) loopAccept(fd int) error {
}
return err
}

return nil
}

func (el *eventloop) loopOpen(c *conn) error {
c.opened = true
c.localAddr = el.ln.lnaddr
c.remoteAddr = netpoll.SockaddrToTCPOrUnixAddr(c.sa)
out, action := el.eventHandler.OnOpened(c)
if el.svr.opts.TCPKeepAlive > 0 {
if proto := el.ln.network; proto == "tcp" || proto == "unix" {
_ = netpoll.SetKeepAlive(c.fd, int(el.svr.opts.TCPKeepAlive/time.Second))
}
}

out, action := el.eventHandler.OnOpened(c)
if out != nil {
c.open(out)
}
Expand Down Expand Up @@ -181,13 +185,20 @@ func (el *eventloop) loopWrite(c *conn) error {
if c.outboundBuffer.IsEmpty() {
_ = el.poller.ModRead(c.fd)
}

return nil
}

func (el *eventloop) loopCloseConn(c *conn, err error) error {
if !c.outboundBuffer.IsEmpty() && err == nil {
_ = el.loopWrite(c)
}

if !c.opened {
el.svr.logger.Debugf("The fd=%d in event-loop(%d) is already closed, skipping it", c.fd, el.idx)
return nil
}

err0, err1 := el.poller.Delete(c.fd), unix.Close(c.fd)
if err0 == nil && err1 == nil {
delete(el.connections, c.fd)
Expand All @@ -205,6 +216,7 @@ func (el *eventloop) loopCloseConn(c *conn, err error) error {
c.fd, el.idx, os.NewSyscallError("close", err1))
}
}

return nil
}

Expand All @@ -217,6 +229,7 @@ func (el *eventloop) loopWake(c *conn) error {
frame, _ := el.codec.Encode(c, out)
c.write(frame)
}

return el.handleAction(c, action)
}

Expand Down Expand Up @@ -271,6 +284,7 @@ func (el *eventloop) loopReadUDP(fd int) error {
}
return nil
}

c := newUDPConn(fd, el, sa)
out, action := el.eventHandler.React(el.packet[:n], c)
if out != nil {
Expand All @@ -281,5 +295,6 @@ func (el *eventloop) loopReadUDP(fd int) error {
return errors.ErrServerShutdown
}
c.releaseUDP()

return nil
}
21 changes: 14 additions & 7 deletions eventloop_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,11 @@ func (el *eventloop) loopRun() {
el.loopEgress()
el.svr.loopWG.Done()
}()

if el.idx == 0 && el.svr.opts.Ticker {
go el.loopTicker()
}

for v := range el.ch {
switch v := v.(type) {
case error:
Expand All @@ -72,7 +74,7 @@ func (el *eventloop) loopRun() {
err = v()
}
if err != nil {
el.svr.logger.Fatalf("Event-loop(%d) is exiting due to an unexpected error: %v", el.idx, err)
el.svr.logger.Infof("Event-loop(%d) is exiting due to an unexpected error: %v", el.idx, err)
break
}
}
Expand All @@ -83,18 +85,19 @@ func (el *eventloop) loopAccept(c *stdConn) error {
c.localAddr = el.svr.ln.lnaddr
c.remoteAddr = c.conn.RemoteAddr()
el.calibrateCallback(el, 1)

out, action := el.eventHandler.OnOpened(c)
if out != nil {
el.eventHandler.PreWrite()
_, _ = c.conn.Write(out)
}
if el.svr.opts.TCPKeepAlive > 0 {
if c, ok := c.conn.(*net.TCPConn); ok {
_ = c.SetKeepAlive(true)
_ = c.SetKeepAlivePeriod(el.svr.opts.TCPKeepAlive)
}
}

out, action := el.eventHandler.OnOpened(c)
if out != nil {
el.eventHandler.PreWrite()
_, _ = c.conn.Write(out)
}

return el.handleAction(c, action)
}

Expand Down Expand Up @@ -123,6 +126,7 @@ func (el *eventloop) loopRead(ti *tcpIn) (err error) {
_, _ = c.inboundBuffer.Write(c.buffer.Bytes())
bytebuffer.Put(c.buffer)
c.buffer = nil

return
}

Expand Down Expand Up @@ -185,6 +189,7 @@ func (el *eventloop) loopError(c *stdConn, err error) (e error) {
} else {
el.svr.logger.Warnf("Failed to close connection(%s), error: %v", c.remoteAddr.String(), e)
}

return
}

Expand All @@ -197,6 +202,7 @@ func (el *eventloop) loopWake(c *stdConn) error {
frame, _ := el.codec.Encode(c, out)
_, _ = c.conn.Write(frame)
}

return el.handleAction(c, action)
}

Expand Down Expand Up @@ -224,5 +230,6 @@ func (el *eventloop) loopReadUDP(c *stdConn) error {
return errors.ErrServerShutdown
}
c.releaseUDP()

return nil
}
4 changes: 2 additions & 2 deletions reactor_bsd.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func (svr *server) activateMainReactor() {
case errors.ErrServerShutdown:
svr.logger.Infof("Main reactor is exiting normally on the signal error: %v", err)
default:
svr.logger.Fatalf("Main reactor is exiting due to an unexpected error: %v", err)
svr.logger.Errorf("Main reactor is exiting due to an unexpected error: %v", err)

}
}
Expand Down Expand Up @@ -76,6 +76,6 @@ func (svr *server) activateSubReactor(el *eventloop) {
case errors.ErrServerShutdown:
svr.logger.Infof("Event-loop(%d) is exiting normally on the signal error: %v", el.idx, err)
default:
svr.logger.Fatalf("Event-loop(%d) is exiting due to an unexpected error: %v", el.idx, err)
svr.logger.Errorf("Event-loop(%d) is exiting due to an unexpected error: %v", el.idx, err)
}
}
4 changes: 2 additions & 2 deletions reactor_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func (svr *server) activateMainReactor() {
case errors.ErrServerShutdown:
svr.logger.Infof("Main reactor is exiting normally on the signal error: %v", err)
default:
svr.logger.Fatalf("Main reactor is exiting due to an unexpected error: %v", err)
svr.logger.Errorf("Main reactor is exiting due to an unexpected error: %v", err)
}
}

Expand Down Expand Up @@ -70,6 +70,6 @@ func (svr *server) activateSubReactor(el *eventloop) {
case errors.ErrServerShutdown:
svr.logger.Infof("Event-loop(%d) is exiting normally on the signal error: %v", el.idx, err)
default:
svr.logger.Fatalf("Event-loop(%d) is exiting due to an unexpected error: %v", el.idx, err)
svr.logger.Errorf("Event-loop(%d) is exiting due to an unexpected error: %v", el.idx, err)
}
}

0 comments on commit b5a5c71

Please sign in to comment.