Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We鈥檒l occasionally send you account related emails.

Already on GitHub? Sign in to your account

opt: enable ET mode on listener event-loop by default #585

Merged
merged 1 commit into from
Apr 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 37 additions & 31 deletions acceptor_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,39 +26,43 @@
"github.com/panjf2000/gnet/v2/internal/queue"
"github.com/panjf2000/gnet/v2/internal/socket"
"github.com/panjf2000/gnet/v2/pkg/errors"
"github.com/panjf2000/gnet/v2/pkg/logging"
)

func (eng *engine) accept1(fd int, _ netpoll.IOEvent, _ netpoll.IOFlags) error {
nfd, sa, err := socket.Accept(fd)
if err != nil {
switch err {
case unix.EINTR, unix.EAGAIN, unix.ECONNABORTED:
// ECONNABORTED indicates that a socket on the listen
// queue was closed before we Accept()ed it;
// it's a silly error, so try again.
return nil
default:
eng.opts.Logger.Errorf("Accept() failed due to error: %v", err)
return errors.ErrAcceptSocket
for {
nfd, sa, err := socket.Accept(fd)
if err != nil {
switch err {
case unix.EAGAIN: // the Accept queue has been drained, we can return now
return nil
case unix.EINTR, unix.ECONNRESET, unix.ECONNABORTED:

Check warning on line 38 in acceptor_unix.go

View check run for this annotation

Codecov / codecov/patch

acceptor_unix.go#L38

Added line #L38 was not covered by tests
// ECONNRESET or ECONNABORTED could indicate that a socket
// in the Accept queue was closed before we Accept()ed it.
// It's a silly error, let's retry it.
continue
default:
eng.opts.Logger.Errorf("Accept() failed due to error: %v", err)
return errors.ErrAcceptSocket

Check warning on line 45 in acceptor_unix.go

View check run for this annotation

Codecov / codecov/patch

acceptor_unix.go#L42-L45

Added lines #L42 - L45 were not covered by tests
}
}
}

remoteAddr := socket.SockaddrToTCPOrUnixAddr(sa)
if eng.opts.TCPKeepAlive > 0 && eng.listeners[fd].network == "tcp" {
err = socket.SetKeepAlivePeriod(nfd, int(eng.opts.TCPKeepAlive.Seconds()))
logging.Error(err)
}
remoteAddr := socket.SockaddrToTCPOrUnixAddr(sa)
if eng.opts.TCPKeepAlive > 0 && eng.listeners[fd].network == "tcp" {
err = socket.SetKeepAlivePeriod(nfd, int(eng.opts.TCPKeepAlive.Seconds()))
if err != nil {
eng.opts.Logger.Errorf("failed to set TCP keepalive on fd=%d: %v", fd, err)

Check warning on line 53 in acceptor_unix.go

View check run for this annotation

Codecov / codecov/patch

acceptor_unix.go#L53

Added line #L53 was not covered by tests
}
}

el := eng.eventLoops.next(remoteAddr)
c := newTCPConn(nfd, el, sa, el.listeners[fd].addr, remoteAddr)
err = el.poller.Trigger(queue.HighPriority, el.register, c)
if err != nil {
eng.opts.Logger.Errorf("failed to enqueue accepted socket of high-priority: %v", err)
_ = unix.Close(nfd)
c.release()
el := eng.eventLoops.next(remoteAddr)
c := newTCPConn(nfd, el, sa, el.listeners[fd].addr, remoteAddr)
err = el.poller.Trigger(queue.HighPriority, el.register, c)
if err != nil {
eng.opts.Logger.Errorf("failed to enqueue accepted socket of high-priority: %v", err)
_ = unix.Close(nfd)
c.release()

Check warning on line 63 in acceptor_unix.go

View check run for this annotation

Codecov / codecov/patch

acceptor_unix.go#L61-L63

Added lines #L61 - L63 were not covered by tests
}
}
return nil
}

func (el *eventloop) accept1(fd int, ev netpoll.IOEvent, flags netpoll.IOFlags) error {
Expand All @@ -69,10 +73,10 @@
nfd, sa, err := socket.Accept(fd)
if err != nil {
switch err {
case unix.EINTR, unix.EAGAIN, unix.ECONNABORTED:
// ECONNABORTED indicates that a socket on the listen
// queue was closed before we Accept()ed it;
// it's a silly error, so try again.
case unix.EINTR, unix.EAGAIN, unix.ECONNRESET, unix.ECONNABORTED:

Check warning on line 76 in acceptor_unix.go

View check run for this annotation

Codecov / codecov/patch

acceptor_unix.go#L76

Added line #L76 was not covered by tests
// ECONNRESET or ECONNABORTED could indicate that a socket
// in the Accept queue was closed before we Accept()ed it.
// It's a silly error, let's retry it.
return nil
default:
el.getLogger().Errorf("Accept() failed due to error: %v", err)
Expand All @@ -83,7 +87,9 @@
remoteAddr := socket.SockaddrToTCPOrUnixAddr(sa)
if el.engine.opts.TCPKeepAlive > 0 && el.listeners[fd].network == "tcp" {
err = socket.SetKeepAlivePeriod(nfd, int(el.engine.opts.TCPKeepAlive/time.Second))
logging.Error(err)
if err != nil {
el.getLogger().Errorf("failed to set TCP keepalive on fd=%d: %v", fd, err)

Check warning on line 91 in acceptor_unix.go

View check run for this annotation

Codecov / codecov/patch

acceptor_unix.go#L91

Added line #L91 was not covered by tests
}
}

c := newTCPConn(nfd, el, sa, el.listeners[fd].addr, remoteAddr)
Expand Down
2 changes: 1 addition & 1 deletion engine_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ func (eng *engine) activateReactors(numEventLoop int) error {
el.poller = p
el.eventHandler = eng.eventHandler
for _, ln := range eng.listeners {
if err = el.poller.AddRead(ln.packPollAttachment(eng.accept), false); err != nil {
if err = el.poller.AddRead(ln.packPollAttachment(eng.accept), true); err != nil {
return err
}
}
Expand Down
2 changes: 1 addition & 1 deletion eventloop_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@
func (el *eventloop) readUDP1(fd int, _ netpoll.IOEvent, _ netpoll.IOFlags) error {
n, sa, err := unix.Recvfrom(fd, el.buffer, 0)
if err != nil {
if err == unix.EAGAIN || err == unix.EWOULDBLOCK {
if err == unix.EAGAIN {

Check warning on line 313 in eventloop_unix.go

View check run for this annotation

Codecov / codecov/patch

eventloop_unix.go#L313

Added line #L313 was not covered by tests
return nil
}
return fmt.Errorf("failed to read UDP packet from fd=%d in event-loop(%d), %v",
Expand Down
Loading