Skip to content

Commit

Permalink
feat: invoke OnClosed() when a UDP socket is closed
Browse files Browse the repository at this point in the history
  • Loading branch information
panjf2000 committed Dec 4, 2021
1 parent 72cd0ea commit 7be4b2a
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 16 deletions.
35 changes: 22 additions & 13 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,38 +36,48 @@ import (

type clientEvents struct {
*EventServer
svr *testClientServer
packetLen int
rspChMap sync.Map
}

func (cli *clientEvents) OnOpened(c Conn) ([]byte, Action) {
func (ev *clientEvents) OnOpened(c Conn) ([]byte, Action) {
c.SetContext([]byte{})
rspCh := make(chan []byte, 1)
cli.rspChMap.Store(c.LocalAddr().String(), rspCh)
ev.rspChMap.Store(c.LocalAddr().String(), rspCh)
return nil, None
}

func (cli *clientEvents) React(packet []byte, c Conn) (out []byte, action Action) {
func (ev *clientEvents) OnClosed(c Conn, err error) Action {
if ev.svr != nil {
if atomic.AddInt32(&ev.svr.clientActive, -1) == 0 {
return Shutdown
}
}
return None
}

func (ev *clientEvents) React(packet []byte, c Conn) (out []byte, action Action) {
ctx := c.Context()
var p []byte
if ctx != nil {
p = ctx.([]byte)
} else { // UDP
cli.packetLen = 1024
ev.packetLen = 1024
}
p = append(p, packet...)
if len(p) < cli.packetLen {
if len(p) < ev.packetLen {
c.SetContext(p)
return
}
v, _ := cli.rspChMap.Load(c.LocalAddr().String())
v, _ := ev.rspChMap.Load(c.LocalAddr().String())
rspCh := v.(chan []byte)
rspCh <- p
c.SetContext([]byte{})
return
}

func (cli *clientEvents) Tick() (delay time.Duration, action Action) {
func (ev *clientEvents) Tick() (delay time.Duration, action Action) {
delay = 200 * time.Millisecond
return
}
Expand Down Expand Up @@ -549,7 +559,9 @@ func (s *testClientServer) OnClosed(c Conn, err error) (action Action) {
if err != nil {
logging.Debugf("error occurred on closed, %v\n", err)
}
require.Equal(s.tester, c.Context(), c, "invalid context")
if s.network != "udp" {
require.Equal(s.tester, c.Context(), c, "invalid context")
}

atomic.AddInt32(&s.disconnected, 1)
if atomic.LoadInt32(&s.connected) == atomic.LoadInt32(&s.disconnected) &&
Expand Down Expand Up @@ -593,10 +605,7 @@ func (s *testClientServer) Tick() (delay time.Duration, action Action) {
if atomic.CompareAndSwapInt32(&s.started, 0, 1) {
for i := 0; i < s.nclients; i++ {
atomic.AddInt32(&s.clientActive, 1)
go func() {
startGnetClient(s.tester, s.client, s.clientEV, s.network, s.addr, s.multicore, s.async)
atomic.AddInt32(&s.clientActive, -1)
}()
go startGnetClient(s.tester, s.client, s.clientEV, s.network, s.addr, s.multicore, s.async)
}
}
if s.network == "udp" && atomic.LoadInt32(&s.clientActive) == 0 {
Expand All @@ -618,7 +627,7 @@ func testServeWithGnetClient(t *testing.T, network, addr string, reuseport, reus
workerPool: goroutine.Default(),
}
var err error
ts.clientEV = &clientEvents{packetLen: streamLen}
ts.clientEV = &clientEvents{packetLen: streamLen, svr: ts}
ts.client, err = NewClient(
ts.clientEV,
WithLogLevel(logging.DebugLevel),
Expand Down
20 changes: 18 additions & 2 deletions eventloop_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"errors"
"fmt"
"os"
"strings"
"sync/atomic"
"time"

Expand Down Expand Up @@ -65,10 +66,10 @@ func (el *eventloop) closeAllSockets() {
_ = el.loopCloseConn(c, nil)
}
for _, c := range el.clientUDPSockets {
c.releaseUDP()
_ = el.loopCloseConn(c, nil)
}
for _, c := range el.serverUDPSockets {
c.releaseUDP()
_ = el.loopCloseConn(c, nil)
}
}

Expand Down Expand Up @@ -174,6 +175,21 @@ func (el *eventloop) loopWrite(c *conn) error {
}

func (el *eventloop) loopCloseConn(c *conn, err error) (rerr error) {
if addr := c.localAddr; addr != nil && strings.HasPrefix(c.localAddr.Network(), "udp") {
rerr = el.poller.Delete(c.fd)
if c.fd == el.ln.fd {
el.serverUDPSockets = nil
} else {
rerr = unix.Close(c.fd)
delete(el.clientUDPSockets, c.fd)
}
if el.eventHandler.OnClosed(c, err) == Shutdown {
return gerrors.ErrServerShutdown
}
c.releaseUDP()
return
}

if !c.opened {
return
}
Expand Down
4 changes: 3 additions & 1 deletion gnet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -508,7 +508,9 @@ func (s *testServer) OnClosed(c Conn, err error) (action Action) {
if err != nil {
logging.Debugf("error occurred on closed, %v\n", err)
}
require.Equal(s.tester, c.Context(), c, "invalid context")
if s.network != "udp" {
require.Equal(s.tester, c.Context(), c, "invalid context")
}

atomic.AddInt32(&s.disconnected, 1)
if atomic.LoadInt32(&s.connected) == atomic.LoadInt32(&s.disconnected) &&
Expand Down

0 comments on commit 7be4b2a

Please sign in to comment.