Skip to content

Commit

Permalink
feat: support multiple network addresses binding (#578)
Browse files Browse the repository at this point in the history
Fixes #428
  • Loading branch information
panjf2000 committed Apr 21, 2024
1 parent 791cc48 commit bf7121d
Show file tree
Hide file tree
Showing 21 changed files with 646 additions and 358 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ English | [中文](README_ZH.md)
- [x] Implementation of `gnet` Client
- [x] **Windows** platform support (For compatibility in development only, do not use it in production)
- [x] **Edge-triggered** I/O support
- [ ] Multiple network addresses binding
- [x] Multiple network addresses binding
- [ ] **TLS** support
- [ ] [io_uring](https://kernel.dk/io_uring.pdf) support

Expand Down
2 changes: 1 addition & 1 deletion README_ZH.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
- [x] 实现 `gnet` 客户端
- [x] 支持 **Windows** 平台 (仅用于开发环境的兼容性,不要在生产环境中使用)
- [x] **Edge-triggered** I/O 支持
- [ ] 多网络地址绑定
- [x] 多网络地址绑定
- [ ] 支持 **TLS**
- [ ] 支持 [io_uring](https://kernel.dk/io_uring.pdf)

Expand Down
10 changes: 5 additions & 5 deletions acceptor_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,13 @@ func (eng *engine) accept1(fd int, _ netpoll.IOEvent, _ netpoll.IOFlags) error {
}

remoteAddr := socket.SockaddrToTCPOrUnixAddr(sa)
if eng.opts.TCPKeepAlive > 0 && eng.ln.network == "tcp" {
if eng.opts.TCPKeepAlive > 0 && eng.listeners[fd].network == "tcp" {
err = socket.SetKeepAlivePeriod(nfd, int(eng.opts.TCPKeepAlive.Seconds()))
logging.Error(err)
}

el := eng.eventLoops.next(remoteAddr)
c := newTCPConn(nfd, el, sa, el.ln.addr, remoteAddr)
c := newTCPConn(nfd, el, sa, el.listeners[fd].addr, remoteAddr)
err = el.poller.Trigger(queue.HighPriority, el.register, c)
if err != nil {
eng.opts.Logger.Errorf("failed to enqueue accepted socket of high-priority: %v", err)
Expand All @@ -62,7 +62,7 @@ func (eng *engine) accept1(fd int, _ netpoll.IOEvent, _ netpoll.IOFlags) error {
}

func (el *eventloop) accept1(fd int, ev netpoll.IOEvent, flags netpoll.IOFlags) error {
if el.ln.network == "udp" {
if el.listeners[fd].network == "udp" {
return el.readUDP1(fd, ev, flags)
}

Expand All @@ -81,12 +81,12 @@ func (el *eventloop) accept1(fd int, ev netpoll.IOEvent, flags netpoll.IOFlags)
}

remoteAddr := socket.SockaddrToTCPOrUnixAddr(sa)
if el.engine.opts.TCPKeepAlive > 0 && el.ln.network == "tcp" {
if el.engine.opts.TCPKeepAlive > 0 && el.listeners[fd].network == "tcp" {
err = socket.SetKeepAlivePeriod(nfd, int(el.engine.opts.TCPKeepAlive/time.Second))
logging.Error(err)
}

c := newTCPConn(nfd, el, sa, el.ln.addr, remoteAddr)
c := newTCPConn(nfd, el, sa, el.listeners[fd].addr, remoteAddr)
addEvents := el.poller.AddRead
if el.engine.opts.EdgeTriggeredIO {
addEvents = el.poller.AddReadWrite
Expand Down
100 changes: 54 additions & 46 deletions acceptor_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,64 +23,72 @@ import (
errorx "github.com/panjf2000/gnet/v2/pkg/errors"
)

func (eng *engine) listen() (err error) {
func (eng *engine) listenStream(ln net.Listener) (err error) {
if eng.opts.LockOSThread {
runtime.LockOSThread()
defer runtime.UnlockOSThread()
}

defer func() { eng.shutdown(err) }()

var buffer [0x10000]byte
for {
if eng.ln.pc != nil {
// Read data from UDP socket.
n, addr, e := eng.ln.pc.ReadFrom(buffer[:])
if e != nil {
err = e
if atomic.LoadInt32(&eng.beingShutdown) == 0 {
eng.opts.Logger.Errorf("failed to receive data from UDP fd due to error:%v", err)
} else if errors.Is(err, net.ErrClosed) {
err = errorx.ErrEngineShutdown
// TODO: errors.Join() is not supported until Go 1.20,
// we will uncomment this line after we bump up the
// minimal supported go version to 1.20.
// err = errors.Join(err, errorx.ErrEngineShutdown)
// Accept TCP socket.
tc, e := ln.Accept()
if e != nil {
err = e
if atomic.LoadInt32(&eng.beingShutdown) == 0 {
eng.opts.Logger.Errorf("Accept() fails due to error: %v", err)
} else if errors.Is(err, net.ErrClosed) {
err = errorx.ErrEngineShutdown
// TODO: errors.Join() is not supported until Go 1.20,
// we will uncomment this line after we bump up the
// minimal supported go version to 1.20.
// err = errors.Join(err, errorx.ErrEngineShutdown)
}
return
}
el := eng.eventLoops.next(tc.RemoteAddr())
c := newTCPConn(tc, el)
el.ch <- &openConn{c: c}
go func(c *conn, tc net.Conn, el *eventloop) {
var buffer [0x10000]byte
for {
n, err := tc.Read(buffer[:])
if err != nil {
el.ch <- &netErr{c, err}
return
}
return
el.ch <- packTCPConn(c, buffer[:n])
}
}(c, tc, el)
}
}

el := eng.eventLoops.next(addr)
c := newUDPConn(el, eng.ln.addr, addr)
el.ch <- packUDPConn(c, buffer[:n])
} else {
// Accept TCP socket.
tc, e := eng.ln.ln.Accept()
if e != nil {
err = e
if atomic.LoadInt32(&eng.beingShutdown) == 0 {
eng.opts.Logger.Errorf("Accept() fails due to error: %v", err)
} else if errors.Is(err, net.ErrClosed) {
err = errorx.ErrEngineShutdown
// TODO: ditto.
// err = errors.Join(err, errorx.ErrEngineShutdown)
}
return
func (eng *engine) ListenUDP(pc net.PacketConn) (err error) {
if eng.opts.LockOSThread {
runtime.LockOSThread()
defer runtime.UnlockOSThread()
}

defer func() { eng.shutdown(err) }()

var buffer [0x10000]byte
for {
// Read data from UDP socket.
n, addr, e := pc.ReadFrom(buffer[:])
if e != nil {
err = e
if atomic.LoadInt32(&eng.beingShutdown) == 0 {
eng.opts.Logger.Errorf("failed to receive data from UDP fd due to error:%v", err)
} else if errors.Is(err, net.ErrClosed) {
err = errorx.ErrEngineShutdown
// TODO: ditto.
// err = errors.Join(err, errorx.ErrEngineShutdown)
}
el := eng.eventLoops.next(tc.RemoteAddr())
c := newTCPConn(tc, el)
el.ch <- &openConn{c: c}
go func(c *conn, tc net.Conn, el *eventloop) {
var buffer [0x10000]byte
for {
n, err := tc.Read(buffer[:])
if err != nil {
el.ch <- &netErr{c, err}
return
}
el.ch <- packTCPConn(c, buffer[:n])
}
}(c, tc, el)
return
}
el := eng.eventLoops.next(addr)
c := newUDPConn(el, pc, pc.LocalAddr(), addr)
el.ch <- packUDPConn(c, buffer[:n])
}
}
8 changes: 4 additions & 4 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ func TestClient(t *testing.T) {
})
})

t.Run("poll-LT-reuseport", func(t *testing.T) {
t.Run("poll-reuseport-LT", func(t *testing.T) {
t.Run("tcp", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runClient(t, "tcp", ":9991", false, true, false, false, 10, RoundRobin)
Expand Down Expand Up @@ -247,7 +247,7 @@ func TestClient(t *testing.T) {
})
})

t.Run("poll-ET-reuseport", func(t *testing.T) {
t.Run("poll-reuseport-ET", func(t *testing.T) {
t.Run("tcp", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runClient(t, "tcp", ":9991", true, true, false, false, 10, RoundRobin)
Expand Down Expand Up @@ -405,7 +405,7 @@ func (s *testClient) OnTraffic(c Conn) (action Action) {
}

func (s *testClient) OnTick() (delay time.Duration, action Action) {
delay = time.Second / 5
delay = 100 * time.Millisecond
if atomic.CompareAndSwapInt32(&s.started, 0, 1) {
for i := 0; i < s.nclients; i++ {
atomic.AddInt32(&s.clientActive, 1)
Expand Down Expand Up @@ -484,7 +484,7 @@ func startGnetClient(t *testing.T, cli *Client, network, addr string, multicore,
require.NoError(t, err)
rspCh := handler.rspCh
duration := time.Duration((rand.Float64()*2+1)*float64(time.Second)) / 2
t.Logf("test duration: %dms", duration/time.Millisecond)
logging.Debugf("test duration: %v", duration)
start := time.Now()
for time.Since(start) < duration {
reqData := make([]byte, streamLen)
Expand Down
13 changes: 7 additions & 6 deletions client_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func NewClient(eh EventHandler, opts ...Option) (cli *Client, err error) {

shutdownCtx, shutdown := context.WithCancel(context.Background())
eng := engine{
ln: &listener{},
listeners: make(map[int]*listener),
opts: options,
eventHandler: eh,
workerPool: struct {
Expand All @@ -82,9 +82,9 @@ func NewClient(eh EventHandler, opts ...Option) (cli *Client, err error) {
eng.ticker.ctx, eng.ticker.cancel = context.WithCancel(context.Background())
}
el := eventloop{
ln: eng.ln,
engine: &eng,
poller: p,
listeners: eng.listeners,
engine: &eng,
poller: p,
}

rbc := options.ReadBufferCap
Expand Down Expand Up @@ -115,7 +115,8 @@ func NewClient(eh EventHandler, opts ...Option) (cli *Client, err error) {

// Start starts the client event-loop, handing IO events.
func (cli *Client) Start() error {
cli.el.eventHandler.OnBoot(Engine{})
logging.Infof("Starting gnet client with 1 event-loop")
cli.el.eventHandler.OnBoot(Engine{cli.el.engine})
cli.el.engine.workerPool.Go(cli.el.run)
// Start the ticker.
if cli.opts.Ticker {
Expand All @@ -134,7 +135,7 @@ func (cli *Client) Stop() (err error) {
}
_ = cli.el.engine.workerPool.Wait()
logging.Error(cli.el.poller.Close())
cli.el.eventHandler.OnShutdown(Engine{})
cli.el.eventHandler.OnShutdown(Engine{cli.el.engine})
logging.Cleanup()
return
}
Expand Down
12 changes: 6 additions & 6 deletions client_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ func NewClient(eh EventHandler, opts ...Option) (cli *Client, err error) {

shutdownCtx, shutdown := context.WithCancel(context.Background())
eng := &engine{
ln: &listener{},
opts: options,
listeners: []*listener{},
opts: options,
workerPool: struct {
*errgroup.Group
shutdownCtx context.Context
Expand All @@ -70,7 +70,7 @@ func NewClient(eh EventHandler, opts ...Option) (cli *Client, err error) {
}

func (cli *Client) Start() error {
cli.el.eventHandler.OnBoot(Engine{})
cli.el.eventHandler.OnBoot(Engine{cli.el.eng})
cli.el.eng.workerPool.Go(cli.el.run)
if cli.opts.Ticker {
cli.el.eng.ticker.ctx, cli.el.eng.ticker.cancel = context.WithCancel(context.Background())
Expand All @@ -89,7 +89,7 @@ func (cli *Client) Stop() (err error) {
cli.el.eng.ticker.cancel()
}
_ = cli.el.eng.workerPool.Wait()
cli.el.eventHandler.OnShutdown(Engine{})
cli.el.eventHandler.OnShutdown(Engine{cli.el.eng})
logging.Cleanup()
return
}
Expand Down Expand Up @@ -202,7 +202,7 @@ func (cli *Client) EnrollContext(nc net.Conn, ctx interface{}) (gc Conn, err err
}(c, nc, cli.el)
gc = c
case *net.UDPConn:
c := newUDPConn(cli.el, nc.LocalAddr(), nc.RemoteAddr())
c := newUDPConn(cli.el, nil, nc.LocalAddr(), nc.RemoteAddr())
c.SetContext(ctx)
c.rawConn = nc
cli.el.ch <- &openConn{c: c, isDatagram: true, cb: func() { close(connOpened) }}
Expand All @@ -213,7 +213,7 @@ func (cli *Client) EnrollContext(nc net.Conn, ctx interface{}) (gc Conn, err err
if err != nil {
return
}
c := newUDPConn(cli.el, uc.LocalAddr(), uc.RemoteAddr())
c := newUDPConn(cli.el, nil, uc.LocalAddr(), uc.RemoteAddr())
c.SetContext(ctx)
c.rawConn = uc
el.ch <- packUDPConn(c, buffer[:n])
Expand Down
6 changes: 3 additions & 3 deletions connection_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,13 +90,13 @@ func (c *conn) release() {
c.isEOF = false
c.ctx = nil
c.buffer = nil
if addr, ok := c.localAddr.(*net.TCPAddr); ok && c.localAddr != c.loop.ln.addr && len(addr.Zone) > 0 {
if addr, ok := c.localAddr.(*net.TCPAddr); ok && len(c.loop.listeners) == 0 && len(addr.Zone) > 0 {
bsPool.Put(bs.StringToBytes(addr.Zone))
}
if addr, ok := c.remoteAddr.(*net.TCPAddr); ok && len(addr.Zone) > 0 {
bsPool.Put(bs.StringToBytes(addr.Zone))
}
if addr, ok := c.localAddr.(*net.UDPAddr); ok && c.localAddr != c.loop.ln.addr && len(addr.Zone) > 0 {
if addr, ok := c.localAddr.(*net.UDPAddr); ok && len(c.loop.listeners) == 0 && len(addr.Zone) > 0 {
bsPool.Put(bs.StringToBytes(addr.Zone))
}
if addr, ok := c.remoteAddr.(*net.UDPAddr); ok && len(addr.Zone) > 0 {
Expand Down Expand Up @@ -451,7 +451,7 @@ func (c *conn) RemoteAddr() net.Addr { return c.remoteAddr }
// func (c *conn) Gfd() gfd.GFD { return c.gfd }

func (c *conn) Fd() int { return c.fd }
func (c *conn) Dup() (fd int, err error) { fd, _, err = socket.Dup(c.fd); return }
func (c *conn) Dup() (fd int, err error) { return socket.Dup(c.fd) }
func (c *conn) SetReadBuffer(bytes int) error { return socket.SetRecvBuffer(c.fd, bytes) }
func (c *conn) SetWriteBuffer(bytes int) error { return socket.SetSendBuffer(c.fd, bytes) }
func (c *conn) SetLinger(sec int) error { return socket.SetLinger(c.fd, sec) }
Expand Down
Loading

0 comments on commit bf7121d

Please sign in to comment.