Skip to content

Commit

Permalink
fix: resolve all test failures of gnet client
Browse files Browse the repository at this point in the history
Fixes #260
  • Loading branch information
panjf2000 committed Nov 21, 2021
1 parent 10c619f commit a5ac95a
Show file tree
Hide file tree
Showing 6 changed files with 47 additions and 25 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@ on:
push:
branches:
- master
- dev
paths-ignore:
- '**.md'
pull_request:
branches:
- master
- dev
paths-ignore:
- '**.md'

Expand Down
2 changes: 2 additions & 0 deletions .github/workflows/ci_poll_opt.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@ on:
push:
branches:
- master
- dev
paths-ignore:
- '**.md'
pull_request:
branches:
- master
- dev
paths-ignore:
- '**.md'

Expand Down
8 changes: 6 additions & 2 deletions .github/workflows/codeql.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,15 @@ name: CodeQL

on:
push:
branches: [master]
branches:
- master
- dev
paths-ignore:
- '**.md'
pull_request:
branches: [master]
branches:
- master
- dev
paths-ignore:
- '**.md'
schedule:
Expand Down
2 changes: 2 additions & 0 deletions .github/workflows/lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@ on:
push:
branches:
- master
- dev
paths-ignore:
- '**.md'
pull_request:
branches:
- master
- dev
paths-ignore:
- '**.md'

Expand Down
51 changes: 34 additions & 17 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ package gnet
import (
"context"
"net"
"os"
"sync"
"syscall"

"golang.org/x/sys/unix"

Expand Down Expand Up @@ -82,7 +82,11 @@ func NewClient(eventHandler EventHandler, opts ...Option) (cli *Client, err erro
// Start starts the client event-loop, handing IO events.
func (cli *Client) Start() error {
cli.el.eventHandler.OnInitComplete(Server{})
go cli.el.activateSubReactor(cli.opts.LockOSThread)
cli.el.svr.wg.Add(1)
go func() {
cli.el.activateSubReactor(cli.opts.LockOSThread)
cli.el.svr.wg.Done()
}()
// Start the ticker.
if cli.opts.Ticker {
go cli.el.loopTicker(cli.el.svr.tickerCtx)
Expand All @@ -93,7 +97,8 @@ func (cli *Client) Start() error {
// Stop stops the client event-loop.
func (cli *Client) Stop() (err error) {
err = cli.el.poller.UrgentTrigger(func(_ interface{}) error { return gerrors.ErrServerShutdown }, nil)
cli.el.svr.waitForShutdown()
cli.el.svr.wg.Wait()
cli.el.poller.Close()
cli.el.eventHandler.OnShutdown(Server{})
// Stop the ticker.
if cli.opts.Ticker {
Expand All @@ -107,49 +112,61 @@ func (cli *Client) Stop() (err error) {
}

// Dial is like net.Dial().
func (cli *Client) Dial(network, address string) (gc Conn, err error) {
var c net.Conn
c, err = net.Dial(network, address)
func (cli *Client) Dial(network, address string) (Conn, error) {
c, err := net.Dial(network, address)
if err != nil {
return nil, err
}
defer c.Close()

v, ok := c.(interface{ File() (*os.File, error) })
sc, ok := c.(interface {
SyscallConn() (syscall.RawConn, error)
})
if !ok {
return nil, gerrors.ErrUnsupportedProtocol
}
rc, err := sc.SyscallConn()
if err != nil {
return nil, gerrors.ErrUnsupportedProtocol
}

var file *os.File
file, err = v.File()
var DupFD int
e := rc.Control(func(fd uintptr) {
DupFD, err = unix.Dup(int(fd))
})
if err != nil {
return nil, err
}
fd := int(file.Fd())
if e != nil {
return nil, e
}

var sockAddr unix.Sockaddr
var (
sockAddr unix.Sockaddr
gc Conn
)
switch c.(type) {
case *net.UnixConn:
if sockAddr, _, _, err = socket.GetUnixSockAddr(c.LocalAddr().Network(), c.LocalAddr().String()); err != nil {
return
return nil, err
}
gc = newTCPConn(fd, cli.el, sockAddr, cli.opts.Codec, c.LocalAddr(), c.RemoteAddr())
gc = newTCPConn(DupFD, cli.el, sockAddr, cli.opts.Codec, c.LocalAddr(), c.RemoteAddr())
case *net.TCPConn:
if sockAddr, _, _, _, err = socket.GetTCPSockAddr(c.LocalAddr().Network(), c.LocalAddr().String()); err != nil {
return nil, err
}
gc = newTCPConn(fd, cli.el, sockAddr, cli.opts.Codec, c.LocalAddr(), c.RemoteAddr())
gc = newTCPConn(DupFD, cli.el, sockAddr, cli.opts.Codec, c.LocalAddr(), c.RemoteAddr())
case *net.UDPConn:
if sockAddr, _, _, _, err = socket.GetUDPSockAddr(c.LocalAddr().Network(), c.LocalAddr().String()); err != nil {
return
return nil, err
}
gc = newUDPConn(fd, cli.el, c.LocalAddr(), sockAddr)
gc = newUDPConn(DupFD, cli.el, c.LocalAddr(), sockAddr)
default:
return nil, gerrors.ErrUnsupportedProtocol
}
err = cli.el.poller.UrgentTrigger(cli.el.loopRegister, gc)
if err != nil {
gc.Close()
}
return
return gc, nil
}
7 changes: 1 addition & 6 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,6 @@ func (s *testCodecClientServer) OnOpened(c Conn) (out []byte, action Action) {

func (s *testCodecClientServer) OnClosed(c Conn, err error) (action Action) {
require.Equal(s.tester, c.Context(), c, "invalid context")

atomic.AddInt32(&s.disconnected, 1)
if atomic.LoadInt32(&s.connected) == atomic.LoadInt32(&s.disconnected) &&
atomic.LoadInt32(&s.disconnected) == 1 {
Expand All @@ -201,9 +200,8 @@ func (s *testCodecClientServer) React(packet []byte, c Conn) (out []byte, action
}

func (s *testCodecClientServer) Tick() (delay time.Duration, action Action) {
if atomic.LoadInt32(&s.started) == 0 {
if atomic.CompareAndSwapInt32(&s.started, 0, 1) {
go startCodecGnetClient(s.tester, s.client, s.clientEV, s.network, s.addr, s.multicore, s.async, s.codec)
atomic.StoreInt32(&s.started, 1)
}
delay = time.Second / 5
return
Expand Down Expand Up @@ -277,9 +275,6 @@ func testCodecServeWithGnetClient(
WithMulticore(multicore),
WithTicker(true),
WithLogLevel(zapcore.DebugLevel),
WithTCPKeepAlive(
time.Minute*5,
),
WithSocketRecvBuffer(8*1024),
WithSocketSendBuffer(8*1024),
WithCodec(codec),
Expand Down

0 comments on commit a5ac95a

Please sign in to comment.