Skip to content

Commit

Permalink
feat: support UDP and Unix protocol in client
Browse files Browse the repository at this point in the history
  • Loading branch information
panjf2000 committed Dec 4, 2021
1 parent 5d8fe64 commit 7159b95
Show file tree
Hide file tree
Showing 17 changed files with 459 additions and 53 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ jobs:
run: go test $(go list ./... | tail -n +2)

- name: Run integration testing
run: go test -v -race -coverprofile="coverage.report" -covermode=atomic -timeout 60s
run: go test -v -race -coverprofile="coverage.report" -covermode=atomic -timeout 120s

- name: Upload the code coverage report to codecov.io
uses: codecov/codecov-action@v2
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/ci_poll_opt.yml
Original file line number Diff line number Diff line change
Expand Up @@ -60,4 +60,4 @@ jobs:
${{ runner.os }}-${{ matrix.go }}-go-ci
- name: Run integration testing with poll_opt build tag
run: go test -v -tags=poll_opt -timeout 60s
run: go test -v -tags=poll_opt -timeout 120s
14 changes: 7 additions & 7 deletions acceptor_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ import (
"github.com/panjf2000/gnet/pkg/logging"
)

func (svr *server) acceptNewConnection(_ netpoll.IOEvent) error {
nfd, sa, err := unix.Accept(svr.ln.fd)
func (svr *server) acceptNewConnection(fd int, _ netpoll.IOEvent) error {
nfd, sa, err := unix.Accept(fd)
if err != nil {
if err == unix.EAGAIN {
return nil
Expand All @@ -42,14 +42,14 @@ func (svr *server) acceptNewConnection(_ netpoll.IOEvent) error {
return err
}

netAddr := socket.SockaddrToTCPOrUnixAddr(sa)
remoteAddr := socket.SockaddrToTCPOrUnixAddr(sa)
if svr.opts.TCPKeepAlive > 0 && svr.ln.network == "tcp" {
err = socket.SetKeepAlive(nfd, int(svr.opts.TCPKeepAlive/time.Second))
logging.Error(err)
}

el := svr.lb.next(netAddr)
c := newTCPConn(nfd, el, sa, svr.opts.Codec, el.ln.lnaddr, netAddr)
el := svr.lb.next(remoteAddr)
c := newTCPConn(nfd, el, sa, svr.opts.Codec, el.ln.lnaddr, remoteAddr)

err = el.poller.UrgentTrigger(el.loopRegister, c)
if err != nil {
Expand All @@ -59,9 +59,9 @@ func (svr *server) acceptNewConnection(_ netpoll.IOEvent) error {
return nil
}

func (el *eventloop) loopAccept(_ netpoll.IOEvent) error {
func (el *eventloop) loopAccept(fd int, _ netpoll.IOEvent) error {
if el.ln.network == "udp" {
return el.loopReadUDP(el.ln.fd)
return el.loopReadUDP(fd)
}

nfd, sa, err := unix.Accept(el.ln.fd)
Expand Down
37 changes: 23 additions & 14 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"context"
"errors"
"net"
"strconv"
"strings"
"sync"
"syscall"
"time"
Expand All @@ -45,6 +47,7 @@ type Client struct {
func NewClient(eventHandler EventHandler, opts ...Option) (cli *Client, err error) {
options := loadOptions(opts...)
cli = new(Client)
cli.opts = options
var logger logging.Logger
if options.LogPath != "" {
if logger, cli.logFlush, err = logging.CreateLoggerAsLocalFile(options.LogPath, options.LogLevel); err != nil {
Expand All @@ -59,7 +62,6 @@ func NewClient(eventHandler EventHandler, opts ...Option) (cli *Client, err erro
if options.Codec == nil {
cli.opts.Codec = new(BuiltInFrameCodec)
}
cli.opts = options
var p *netpoll.Poller
if p, err = netpoll.OpenPoller(); err != nil {
return
Expand All @@ -68,7 +70,7 @@ func NewClient(eventHandler EventHandler, opts ...Option) (cli *Client, err erro
svr.opts = options
svr.eventHandler = eventHandler
svr.ln = new(listener)

svr.ln.network = "udp"
svr.cond = sync.NewCond(&sync.Mutex{})
if options.Ticker {
svr.tickerCtx, svr.cancelTicker = context.WithCancel(context.Background())
Expand All @@ -83,6 +85,7 @@ func NewClient(eventHandler EventHandler, opts ...Option) (cli *Client, err erro
options.ReadBufferCap = toolkit.CeilToPowerOfTwo(rbc)
}
el.buffer = make([]byte, options.ReadBufferCap)
el.udpSockets = make(map[int]*conn)
el.connections = make(map[int]*conn)
el.eventHandler = eventHandler
cli.el = el
Expand All @@ -94,7 +97,7 @@ func (cli *Client) Start() error {
cli.el.eventHandler.OnInitComplete(Server{})
cli.el.svr.wg.Add(1)
go func() {
cli.el.activateSubReactor(cli.opts.LockOSThread)
cli.el.loopRun(cli.opts.LockOSThread)
cli.el.svr.wg.Done()
}()
// Start the ticker.
Expand Down Expand Up @@ -149,16 +152,19 @@ func (cli *Client) Dial(network, address string) (Conn, error) {
return nil, e
}

if cli.opts.TCPNoDelay == TCPNoDelay {
if err = socket.SetNoDelay(DupFD, 1); err != nil {
return nil, err
if strings.HasPrefix(network, "tcp") {
if cli.opts.TCPNoDelay == TCPNoDelay {
if err = socket.SetNoDelay(DupFD, 1); err != nil {
return nil, err
}
}
}
if cli.opts.TCPKeepAlive > 0 {
if err = socket.SetKeepAlive(DupFD, int(cli.opts.TCPKeepAlive/time.Second)); err != nil {
return nil, err
if cli.opts.TCPKeepAlive > 0 {
if err = socket.SetKeepAlive(DupFD, int(cli.opts.TCPKeepAlive/time.Second)); err != nil {
return nil, err
}
}
}

if cli.opts.SocketSendBuffer > 0 {
if err = socket.SetSendBuffer(DupFD, cli.opts.SocketSendBuffer); err != nil {
return nil, err
Expand All @@ -176,26 +182,29 @@ func (cli *Client) Dial(network, address string) (Conn, error) {
)
switch c.(type) {
case *net.UnixConn:
if sockAddr, _, _, err = socket.GetUnixSockAddr(c.LocalAddr().Network(), c.LocalAddr().String()); err != nil {
if sockAddr, _, _, err = socket.GetUnixSockAddr(c.RemoteAddr().Network(), c.RemoteAddr().String()); err != nil {
return nil, err
}
ua := c.LocalAddr().(*net.UnixAddr)
ua.Name = c.RemoteAddr().String() + "." + strconv.Itoa(DupFD)
gc = newTCPConn(DupFD, cli.el, sockAddr, cli.opts.Codec, c.LocalAddr(), c.RemoteAddr())
case *net.TCPConn:
if sockAddr, _, _, _, err = socket.GetTCPSockAddr(c.LocalAddr().Network(), c.LocalAddr().String()); err != nil {
if sockAddr, _, _, _, err = socket.GetTCPSockAddr(c.RemoteAddr().Network(), c.RemoteAddr().String()); err != nil {
return nil, err
}
gc = newTCPConn(DupFD, cli.el, sockAddr, cli.opts.Codec, c.LocalAddr(), c.RemoteAddr())
case *net.UDPConn:
if sockAddr, _, _, _, err = socket.GetUDPSockAddr(c.LocalAddr().Network(), c.LocalAddr().String()); err != nil {
if sockAddr, _, _, _, err = socket.GetUDPSockAddr(c.RemoteAddr().Network(), c.RemoteAddr().String()); err != nil {
return nil, err
}
gc = newUDPConn(DupFD, cli.el, c.LocalAddr(), sockAddr)
gc = newUDPConn(DupFD, cli.el, c.LocalAddr(), sockAddr, true)
default:
return nil, gerrors.ErrUnsupportedProtocol
}
err = cli.el.poller.UrgentTrigger(cli.el.loopRegister, gc)
if err != nil {
gc.Close()
return nil, err
}
return gc, nil
}
Loading

0 comments on commit 7159b95

Please sign in to comment.