From a5ac95a5057fb82e2f71cb6a7f4ffed83c967efb Mon Sep 17 00:00:00 2001 From: Andy Pan Date: Mon, 22 Nov 2021 02:02:02 +0800 Subject: [PATCH] fix: resolve all test failures of gnet client Fixes #260 --- .github/workflows/ci.yml | 2 ++ .github/workflows/ci_poll_opt.yml | 2 ++ .github/workflows/codeql.yml | 8 +++-- .github/workflows/lint.yml | 2 ++ client.go | 51 ++++++++++++++++++++----------- client_test.go | 7 +---- 6 files changed, 47 insertions(+), 25 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index a03bac276..7e8a2aa27 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -4,11 +4,13 @@ on: push: branches: - master + - dev paths-ignore: - '**.md' pull_request: branches: - master + - dev paths-ignore: - '**.md' diff --git a/.github/workflows/ci_poll_opt.yml b/.github/workflows/ci_poll_opt.yml index a4e886edd..b0af0b214 100644 --- a/.github/workflows/ci_poll_opt.yml +++ b/.github/workflows/ci_poll_opt.yml @@ -4,11 +4,13 @@ on: push: branches: - master + - dev paths-ignore: - '**.md' pull_request: branches: - master + - dev paths-ignore: - '**.md' diff --git a/.github/workflows/codeql.yml b/.github/workflows/codeql.yml index 33e225bdc..097d4344c 100644 --- a/.github/workflows/codeql.yml +++ b/.github/workflows/codeql.yml @@ -2,11 +2,15 @@ name: CodeQL on: push: - branches: [master] + branches: + - master + - dev paths-ignore: - '**.md' pull_request: - branches: [master] + branches: + - master + - dev paths-ignore: - '**.md' schedule: diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml index 951c55ec8..b57f14311 100644 --- a/.github/workflows/lint.yml +++ b/.github/workflows/lint.yml @@ -4,11 +4,13 @@ on: push: branches: - master + - dev paths-ignore: - '**.md' pull_request: branches: - master + - dev paths-ignore: - '**.md' diff --git a/client.go b/client.go index eecc125f6..d642e760b 100644 --- a/client.go +++ b/client.go @@ -20,8 +20,8 @@ package gnet import ( "context" "net" - "os" "sync" + "syscall" "golang.org/x/sys/unix" @@ -82,7 +82,11 @@ func NewClient(eventHandler EventHandler, opts ...Option) (cli *Client, err erro // Start starts the client event-loop, handing IO events. func (cli *Client) Start() error { cli.el.eventHandler.OnInitComplete(Server{}) - go cli.el.activateSubReactor(cli.opts.LockOSThread) + cli.el.svr.wg.Add(1) + go func() { + cli.el.activateSubReactor(cli.opts.LockOSThread) + cli.el.svr.wg.Done() + }() // Start the ticker. if cli.opts.Ticker { go cli.el.loopTicker(cli.el.svr.tickerCtx) @@ -93,7 +97,8 @@ func (cli *Client) Start() error { // Stop stops the client event-loop. func (cli *Client) Stop() (err error) { err = cli.el.poller.UrgentTrigger(func(_ interface{}) error { return gerrors.ErrServerShutdown }, nil) - cli.el.svr.waitForShutdown() + cli.el.svr.wg.Wait() + cli.el.poller.Close() cli.el.eventHandler.OnShutdown(Server{}) // Stop the ticker. if cli.opts.Ticker { @@ -107,43 +112,55 @@ func (cli *Client) Stop() (err error) { } // Dial is like net.Dial(). -func (cli *Client) Dial(network, address string) (gc Conn, err error) { - var c net.Conn - c, err = net.Dial(network, address) +func (cli *Client) Dial(network, address string) (Conn, error) { + c, err := net.Dial(network, address) if err != nil { return nil, err } defer c.Close() - v, ok := c.(interface{ File() (*os.File, error) }) + sc, ok := c.(interface { + SyscallConn() (syscall.RawConn, error) + }) if !ok { return nil, gerrors.ErrUnsupportedProtocol } + rc, err := sc.SyscallConn() + if err != nil { + return nil, gerrors.ErrUnsupportedProtocol + } - var file *os.File - file, err = v.File() + var DupFD int + e := rc.Control(func(fd uintptr) { + DupFD, err = unix.Dup(int(fd)) + }) if err != nil { return nil, err } - fd := int(file.Fd()) + if e != nil { + return nil, e + } - var sockAddr unix.Sockaddr + var ( + sockAddr unix.Sockaddr + gc Conn + ) switch c.(type) { case *net.UnixConn: if sockAddr, _, _, err = socket.GetUnixSockAddr(c.LocalAddr().Network(), c.LocalAddr().String()); err != nil { - return + return nil, err } - gc = newTCPConn(fd, cli.el, sockAddr, cli.opts.Codec, c.LocalAddr(), c.RemoteAddr()) + gc = newTCPConn(DupFD, cli.el, sockAddr, cli.opts.Codec, c.LocalAddr(), c.RemoteAddr()) case *net.TCPConn: if sockAddr, _, _, _, err = socket.GetTCPSockAddr(c.LocalAddr().Network(), c.LocalAddr().String()); err != nil { return nil, err } - gc = newTCPConn(fd, cli.el, sockAddr, cli.opts.Codec, c.LocalAddr(), c.RemoteAddr()) + gc = newTCPConn(DupFD, cli.el, sockAddr, cli.opts.Codec, c.LocalAddr(), c.RemoteAddr()) case *net.UDPConn: if sockAddr, _, _, _, err = socket.GetUDPSockAddr(c.LocalAddr().Network(), c.LocalAddr().String()); err != nil { - return + return nil, err } - gc = newUDPConn(fd, cli.el, c.LocalAddr(), sockAddr) + gc = newUDPConn(DupFD, cli.el, c.LocalAddr(), sockAddr) default: return nil, gerrors.ErrUnsupportedProtocol } @@ -151,5 +168,5 @@ func (cli *Client) Dial(network, address string) (gc Conn, err error) { if err != nil { gc.Close() } - return + return gc, nil } diff --git a/client_test.go b/client_test.go index b668d0c23..5ebdcd6d4 100644 --- a/client_test.go +++ b/client_test.go @@ -176,7 +176,6 @@ func (s *testCodecClientServer) OnOpened(c Conn) (out []byte, action Action) { func (s *testCodecClientServer) OnClosed(c Conn, err error) (action Action) { require.Equal(s.tester, c.Context(), c, "invalid context") - atomic.AddInt32(&s.disconnected, 1) if atomic.LoadInt32(&s.connected) == atomic.LoadInt32(&s.disconnected) && atomic.LoadInt32(&s.disconnected) == 1 { @@ -201,9 +200,8 @@ func (s *testCodecClientServer) React(packet []byte, c Conn) (out []byte, action } func (s *testCodecClientServer) Tick() (delay time.Duration, action Action) { - if atomic.LoadInt32(&s.started) == 0 { + if atomic.CompareAndSwapInt32(&s.started, 0, 1) { go startCodecGnetClient(s.tester, s.client, s.clientEV, s.network, s.addr, s.multicore, s.async, s.codec) - atomic.StoreInt32(&s.started, 1) } delay = time.Second / 5 return @@ -277,9 +275,6 @@ func testCodecServeWithGnetClient( WithMulticore(multicore), WithTicker(true), WithLogLevel(zapcore.DebugLevel), - WithTCPKeepAlive( - time.Minute*5, - ), WithSocketRecvBuffer(8*1024), WithSocketSendBuffer(8*1024), WithCodec(codec),