Skip to content

Commit

Permalink
feat: support edge-triggered I/O (panjf2000#576)
Browse files Browse the repository at this point in the history
  • Loading branch information
panjf2000 authored and andyl committed Apr 22, 2024
1 parent ef12c18 commit 5025980
Show file tree
Hide file tree
Showing 32 changed files with 1,154 additions and 737 deletions.
12 changes: 8 additions & 4 deletions acceptor_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func (eng *engine) accept1(fd int, _ netpoll.IOEvent, _ netpoll.IOFlags) error {
if err != nil {
switch err {
case unix.EINTR, unix.EAGAIN, unix.ECONNABORTED:
// ECONNABORTED means that a socket on the listen
// ECONNABORTED indicates that a socket on the listen
// queue was closed before we Accept()ed it;
// it's a silly error, so try again.
return nil
Expand Down Expand Up @@ -66,11 +66,11 @@ func (el *eventloop) accept1(fd int, ev netpoll.IOEvent, flags netpoll.IOFlags)
return el.readUDP1(fd, ev, flags)
}

nfd, sa, err := socket.Accept(el.ln.fd)
nfd, sa, err := socket.Accept(fd)
if err != nil {
switch err {
case unix.EINTR, unix.EAGAIN, unix.ECONNABORTED:
// ECONNABORTED means that a socket on the listen
// ECONNABORTED indicates that a socket on the listen
// queue was closed before we Accept()ed it;
// it's a silly error, so try again.
return nil
Expand All @@ -87,7 +87,11 @@ func (el *eventloop) accept1(fd int, ev netpoll.IOEvent, flags netpoll.IOFlags)
}

c := newTCPConn(nfd, el, sa, el.ln.addr, remoteAddr)
if err = el.poller.AddRead(&c.pollAttachment); err != nil {
addEvents := el.poller.AddRead
if el.engine.opts.EdgeTriggeredIO {
addEvents = el.poller.AddReadWrite
}
if err = addEvents(&c.pollAttachment, el.engine.opts.EdgeTriggeredIO); err != nil {
return err
}
el.connections.addConn(c, el.idx)
Expand Down
178 changes: 140 additions & 38 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ type connHandler struct {
type clientEvents struct {
*BuiltinEventEngine
tester *testing.T
svr *testClientServer
svr *testClient
packetLen int
}

Expand Down Expand Up @@ -87,117 +87,219 @@ func (ev *clientEvents) OnShutdown(e Engine) {
assert.EqualValuesf(ev.tester, fd, -1, "expected -1, but got: %d", fd)
}

func TestServeWithGnetClient(t *testing.T) {
func TestClient(t *testing.T) {
// start an engine
// connect 10 clients
// each client will pipe random data for 1-3 seconds.
// the writes to the engine will be random sizes. 0KB - 1MB.
// the engine will echo back the data.
// waits for graceful connection closing.
t.Run("poll", func(t *testing.T) {
t.Run("poll-LT", func(t *testing.T) {
t.Run("tcp", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
testServeWithGnetClient(t, "tcp", ":9991", false, false, false, false, 10, RoundRobin)
runClient(t, "tcp", ":9991", false, false, false, false, 10, RoundRobin)
})
t.Run("N-loop", func(t *testing.T) {
testServeWithGnetClient(t, "tcp", ":9992", false, false, true, false, 10, LeastConnections)
runClient(t, "tcp", ":9992", false, false, true, false, 10, LeastConnections)
})
})
t.Run("tcp-async", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
testServeWithGnetClient(t, "tcp", ":9991", false, false, false, true, 10, RoundRobin)
runClient(t, "tcp", ":9991", false, false, false, true, 10, RoundRobin)
})
t.Run("N-loop", func(t *testing.T) {
testServeWithGnetClient(t, "tcp", ":9992", false, false, true, true, 10, LeastConnections)
runClient(t, "tcp", ":9992", false, false, true, true, 10, LeastConnections)
})
})
t.Run("udp", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
testServeWithGnetClient(t, "udp", ":9991", false, false, false, false, 10, RoundRobin)
runClient(t, "udp", ":9991", false, false, false, false, 10, RoundRobin)
})
t.Run("N-loop", func(t *testing.T) {
testServeWithGnetClient(t, "udp", ":9992", false, false, true, false, 10, LeastConnections)
runClient(t, "udp", ":9992", false, false, true, false, 10, LeastConnections)
})
})
t.Run("udp-async", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
testServeWithGnetClient(t, "udp", ":9991", false, false, false, true, 10, RoundRobin)
runClient(t, "udp", ":9991", false, false, false, true, 10, RoundRobin)
})
t.Run("N-loop", func(t *testing.T) {
testServeWithGnetClient(t, "udp", ":9992", false, false, true, true, 10, LeastConnections)
runClient(t, "udp", ":9992", false, false, true, true, 10, LeastConnections)
})
})
t.Run("unix", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
testServeWithGnetClient(t, "unix", "gnet1.sock", false, false, false, false, 10, RoundRobin)
runClient(t, "unix", "gnet1.sock", false, false, false, false, 10, RoundRobin)
})
t.Run("N-loop", func(t *testing.T) {
testServeWithGnetClient(t, "unix", "gnet2.sock", false, false, true, false, 10, SourceAddrHash)
runClient(t, "unix", "gnet2.sock", false, false, true, false, 10, SourceAddrHash)
})
})
t.Run("unix-async", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
testServeWithGnetClient(t, "unix", "gnet1.sock", false, false, false, true, 10, RoundRobin)
runClient(t, "unix", "gnet1.sock", false, false, false, true, 10, RoundRobin)
})
t.Run("N-loop", func(t *testing.T) {
testServeWithGnetClient(t, "unix", "gnet2.sock", false, false, true, true, 10, SourceAddrHash)
runClient(t, "unix", "gnet2.sock", false, false, true, true, 10, SourceAddrHash)
})
})
})

t.Run("poll-reuseport", func(t *testing.T) {
t.Run("poll-ET", func(t *testing.T) {
t.Run("tcp", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
testServeWithGnetClient(t, "tcp", ":9991", true, true, false, false, 10, RoundRobin)
runClient(t, "tcp", ":9991", true, false, false, false, 10, RoundRobin)
})
t.Run("N-loop", func(t *testing.T) {
testServeWithGnetClient(t, "tcp", ":9992", true, true, true, false, 10, LeastConnections)
runClient(t, "tcp", ":9992", true, false, true, false, 10, LeastConnections)
})
})
t.Run("tcp-async", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
testServeWithGnetClient(t, "tcp", ":9991", true, true, false, true, 10, RoundRobin)
runClient(t, "tcp", ":9991", true, false, false, true, 10, RoundRobin)
})
t.Run("N-loop", func(t *testing.T) {
testServeWithGnetClient(t, "tcp", ":9992", true, true, true, false, 10, LeastConnections)
runClient(t, "tcp", ":9992", true, false, true, true, 10, LeastConnections)
})
})
t.Run("udp", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
testServeWithGnetClient(t, "udp", ":9991", true, true, false, false, 10, RoundRobin)
runClient(t, "udp", ":9991", true, false, false, false, 10, RoundRobin)
})
t.Run("N-loop", func(t *testing.T) {
testServeWithGnetClient(t, "udp", ":9992", true, true, true, false, 10, LeastConnections)
runClient(t, "udp", ":9992", true, false, true, false, 10, LeastConnections)
})
})
t.Run("udp-async", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
testServeWithGnetClient(t, "udp", ":9991", true, true, false, false, 10, RoundRobin)
runClient(t, "udp", ":9991", true, false, false, true, 10, RoundRobin)
})
t.Run("N-loop", func(t *testing.T) {
testServeWithGnetClient(t, "udp", ":9992", true, true, true, true, 10, LeastConnections)
runClient(t, "udp", ":9992", true, false, true, true, 10, LeastConnections)
})
})
t.Run("unix", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
testServeWithGnetClient(t, "unix", "gnet1.sock", true, true, false, false, 10, RoundRobin)
runClient(t, "unix", "gnet1.sock", true, false, false, false, 10, RoundRobin)
})
t.Run("N-loop", func(t *testing.T) {
testServeWithGnetClient(t, "unix", "gnet2.sock", true, true, true, false, 10, LeastConnections)
runClient(t, "unix", "gnet2.sock", true, false, true, false, 10, SourceAddrHash)
})
})
t.Run("unix-async", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
testServeWithGnetClient(t, "unix", "gnet1.sock", true, true, false, true, 10, RoundRobin)
runClient(t, "unix", "gnet1.sock", true, false, false, true, 10, RoundRobin)
})
t.Run("N-loop", func(t *testing.T) {
testServeWithGnetClient(t, "unix", "gnet2.sock", true, true, true, true, 10, LeastConnections)
runClient(t, "unix", "gnet2.sock", true, false, true, true, 10, SourceAddrHash)
})
})
})

t.Run("poll-LT-reuseport", func(t *testing.T) {
t.Run("tcp", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runClient(t, "tcp", ":9991", false, true, false, false, 10, RoundRobin)
})
t.Run("N-loop", func(t *testing.T) {
runClient(t, "tcp", ":9992", false, true, true, false, 10, LeastConnections)
})
})
t.Run("tcp-async", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runClient(t, "tcp", ":9991", false, true, false, true, 10, RoundRobin)
})
t.Run("N-loop", func(t *testing.T) {
runClient(t, "tcp", ":9992", false, true, true, false, 10, LeastConnections)
})
})
t.Run("udp", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runClient(t, "udp", ":9991", false, true, false, false, 10, RoundRobin)
})
t.Run("N-loop", func(t *testing.T) {
runClient(t, "udp", ":9992", false, true, true, false, 10, LeastConnections)
})
})
t.Run("udp-async", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runClient(t, "udp", ":9991", false, true, false, false, 10, RoundRobin)
})
t.Run("N-loop", func(t *testing.T) {
runClient(t, "udp", ":9992", false, true, true, true, 10, LeastConnections)
})
})
t.Run("unix", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runClient(t, "unix", "gnet1.sock", false, true, false, false, 10, RoundRobin)
})
t.Run("N-loop", func(t *testing.T) {
runClient(t, "unix", "gnet2.sock", false, true, true, false, 10, LeastConnections)
})
})
t.Run("unix-async", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runClient(t, "unix", "gnet1.sock", false, true, false, true, 10, RoundRobin)
})
t.Run("N-loop", func(t *testing.T) {
runClient(t, "unix", "gnet2.sock", false, true, true, true, 10, LeastConnections)
})
})
})

t.Run("poll-ET-reuseport", func(t *testing.T) {
t.Run("tcp", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runClient(t, "tcp", ":9991", true, true, false, false, 10, RoundRobin)
})
t.Run("N-loop", func(t *testing.T) {
runClient(t, "tcp", ":9992", true, true, true, false, 10, LeastConnections)
})
})
t.Run("tcp-async", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runClient(t, "tcp", ":9991", true, true, false, true, 10, RoundRobin)
})
t.Run("N-loop", func(t *testing.T) {
runClient(t, "tcp", ":9992", true, true, true, false, 10, LeastConnections)
})
})
t.Run("udp", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runClient(t, "udp", ":9991", true, true, false, false, 10, RoundRobin)
})
t.Run("N-loop", func(t *testing.T) {
runClient(t, "udp", ":9992", true, true, true, false, 10, LeastConnections)
})
})
t.Run("udp-async", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runClient(t, "udp", ":9991", true, true, false, false, 10, RoundRobin)
})
t.Run("N-loop", func(t *testing.T) {
runClient(t, "udp", ":9992", true, true, true, true, 10, LeastConnections)
})
})
t.Run("unix", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runClient(t, "unix", "gnet1.sock", true, true, false, false, 10, RoundRobin)
})
t.Run("N-loop", func(t *testing.T) {
runClient(t, "unix", "gnet2.sock", true, true, true, false, 10, LeastConnections)
})
})
t.Run("unix-async", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runClient(t, "unix", "gnet1.sock", true, true, false, true, 10, RoundRobin)
})
t.Run("N-loop", func(t *testing.T) {
runClient(t, "unix", "gnet2.sock", true, true, true, true, 10, LeastConnections)
})
})
})
}

type testClientServer struct {
type testClient struct {
*BuiltinEventEngine
client *Client
tester *testing.T
Expand All @@ -215,20 +317,20 @@ type testClientServer struct {
udpReadHeader int32
}

func (s *testClientServer) OnBoot(eng Engine) (action Action) {
func (s *testClient) OnBoot(eng Engine) (action Action) {
s.eng = eng
return
}

func (s *testClientServer) OnOpen(c Conn) (out []byte, action Action) {
func (s *testClient) OnOpen(c Conn) (out []byte, action Action) {
c.SetContext(&sync.Once{})
atomic.AddInt32(&s.connected, 1)
require.NotNil(s.tester, c.LocalAddr(), "nil local addr")
require.NotNil(s.tester, c.RemoteAddr(), "nil remote addr")
return
}

func (s *testClientServer) OnClose(c Conn, err error) (action Action) {
func (s *testClient) OnClose(c Conn, err error) (action Action) {
if err != nil {
logging.Debugf("error occurred on closed, %v\n", err)
}
Expand All @@ -246,13 +348,13 @@ func (s *testClientServer) OnClose(c Conn, err error) (action Action) {
return
}

func (s *testClientServer) OnShutdown(Engine) {
func (s *testClient) OnShutdown(Engine) {
if s.network == "udp" {
require.EqualValues(s.tester, int32(s.nclients), atomic.LoadInt32(&s.udpReadHeader))
}
}

func (s *testClientServer) OnTraffic(c Conn) (action Action) {
func (s *testClient) OnTraffic(c Conn) (action Action) {
readHeader := func() {
ping := make([]byte, len(pingMsg))
n, err := io.ReadFull(c, ping)
Expand Down Expand Up @@ -302,7 +404,7 @@ func (s *testClientServer) OnTraffic(c Conn) (action Action) {
return
}

func (s *testClientServer) OnTick() (delay time.Duration, action Action) {
func (s *testClient) OnTick() (delay time.Duration, action Action) {
delay = time.Second / 5
if atomic.CompareAndSwapInt32(&s.started, 0, 1) {
for i := 0; i < s.nclients; i++ {
Expand All @@ -321,8 +423,8 @@ func (s *testClientServer) OnTick() (delay time.Duration, action Action) {
return
}

func testServeWithGnetClient(t *testing.T, network, addr string, reuseport, reuseaddr, multicore, async bool, nclients int, lb LoadBalancing) {
ts := &testClientServer{
func runClient(t *testing.T, network, addr string, et, reuseport, multicore, async bool, nclients int, lb LoadBalancing) {
ts := &testClient{
tester: t,
network: network,
addr: addr,
Expand All @@ -347,10 +449,10 @@ func testServeWithGnetClient(t *testing.T, network, addr string, reuseport, reus

err = Run(ts,
network+"://"+addr,
WithEdgeTriggeredIO(et),
WithLockOSThread(async),
WithMulticore(multicore),
WithReusePort(reuseport),
WithReuseAddr(reuseaddr),
WithTicker(true),
WithTCPKeepAlive(time.Minute*1),
WithTCPNoDelay(TCPDelay),
Expand Down
Loading

0 comments on commit 5025980

Please sign in to comment.