Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Goroutine Pool for UDP datagram reads #1416

Closed
wants to merge 3 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 49 additions & 34 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,12 @@ import (
"errors"
"io"
"net"
"runtime"
"strings"
"sync"

"golang.org/x/sync/errgroup"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This isn't formatted properly. Stdlib should be grouped up separately from other imports.


"time"
)

Expand Down Expand Up @@ -495,45 +499,56 @@ func (srv *Server) serveUDP(l net.PacketConn) error {
}

var wg sync.WaitGroup
defer func() {
wg.Wait()
close(srv.shutdown)
}()

rtimeout := srv.getReadTimeout()
// deadline is not used here
for srv.isStarted() {
var (
m []byte
sPC net.Addr
sUDP *SessionUDP
err error
)
if isUDP {
m, sUDP, err = reader.ReadUDP(lUDP, rtimeout)
} else {
m, sPC, err = readerPC.ReadPacketConn(l, rtimeout)
}
if err != nil {
if !srv.isStarted() {
return nil
}
if netErr, ok := err.(net.Error); ok && netErr.Temporary() {
continue
}
return err
}
if len(m) < headerSize {
if cap(m) == srv.UDPSize {
srv.udpPool.Put(m[:srv.UDPSize])

g := new(errgroup.Group)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

var g errgroup.Group


for i := 0; i < runtime.NumCPU(); i++ {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know if NumCPU is correct here or not.

g.Go(func() error {
// deadline is not used here
for srv.isStarted() {
var (
m []byte
sPC net.Addr
sUDP *SessionUDP
err error
)

if isUDP {
m, sUDP, err = reader.ReadUDP(lUDP, rtimeout)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's nothing to actually ensure this call unblocks when the errgroup returns with an error. This could easily end up still reading in one go routine while others have already exited.

} else {
m, sPC, err = readerPC.ReadPacketConn(l, rtimeout)
}
if err != nil {
if !srv.isStarted() {
return nil
}
if netErr, ok := err.(net.Error); ok && netErr.Temporary() {
continue
}

return err
}
if len(m) < headerSize {
if cap(m) == srv.UDPSize {
srv.udpPool.Put(m[:srv.UDPSize])
}
continue
}
wg.Add(1)
go srv.serveUDPPacket(&wg, m, l, sUDP, sPC)
}
continue
}
wg.Add(1)
go srv.serveUDPPacket(&wg, m, l, sUDP, sPC)

return nil
})
}

return nil
err := g.Wait()
wg.Wait()
close(srv.shutdown)

return err
}

// Serve a new TCP connection.
Expand Down