Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
tiechui1994 committed May 1, 2024
1 parent 76f0328 commit 881591d
Showing 1 changed file with 154 additions and 14 deletions.
168 changes: 154 additions & 14 deletions tcp/net/gnet.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,8 @@ Reactor 实现:
初始化时 eventLoop.accept 方法注册到 Epoll 中 (Read Event), 用于接收请求. (polling 当中用于处理 accept,
read, write)

eventLoop

create N eventLoop:
```
```cgo
func (eng *engine) activateEventLoops(numEventLoop int) (err error)
...
ln := eng.ln
Expand Down Expand Up @@ -81,7 +79,7 @@ func (eng *engine) activateEventLoops(numEventLoop int) (err error)
```

run: 后台执行, Epoll 轮询(核心)
```
```cgo
func (el *eventloop) run() error {
if el.engine.opts.LockOSThread {
runtime.LockOSThread()
Expand Down Expand Up @@ -119,7 +117,7 @@ func (el *eventloop) run() error {
return nil
}
// 新的连接事件, 与前面的 activateEventLoops 当中设置回调对应
// 新的连接事件, 与前面的 activateEventLoops 当中设置回调对应.
return el.accept(fd, ev)
})
Expand All @@ -137,9 +135,9 @@ func (el *eventloop) run() error {
}
```

Polling 逻辑的实现: blocking waiting + infinite loop
Polling 逻辑的实现: blocking waiting + infinite loop

```
```cgo
// Polling blocks the current goroutine, waiting for network-events.
func (p *Poller) Polling(callback func(fd int, ev uint32) error) error {
el := newEventList(InitPollEventsCap)
Expand Down Expand Up @@ -234,8 +232,8 @@ func (p *Poller) Polling(callback func(fd int, ev uint32) error) error {
}
```

accept: 接收到 Read Event 的回调函数.
```
accept: 针对 listener fd 的 Read Event 被调用的函数
```cgo
func (el *eventloop) accept(fd int, ev netpoll.IOEvent) error {
if el.ln.network == "udp" {
return el.readUDP(fd, ev)
Expand Down Expand Up @@ -265,7 +263,7 @@ func (el *eventloop) accept(fd int, ev netpoll.IOEvent) error {
logging.Error(err)
}
// 注: 将 conn 的 Handler 注册到 poller 的 Read Event
// 注册 conn 的 Read Event 到 poller
c := newTCPConn(nfd, el, sa, el.ln.addr, remoteAddr)
if err = el.poller.AddRead(&c.pollAttachment); err != nil {
return err
Expand All @@ -275,9 +273,9 @@ func (el *eventloop) accept(fd int, ev netpoll.IOEvent) error {
}
```

open: 回调 OnOpen 函数
open: TCP 刚建立, 需要触发 OnOpen 回调, 如果用户需要在 TCP 连接刚建立发送数据, 注册 conn 的 Write Event 到 poller

```
```cgo
func (el *eventloop) open(c *conn) error {
c.opened = true
Expand All @@ -301,7 +299,7 @@ func (el *eventloop) open(c *conn) error {

### Reactor 模型(Linux)

(1 Main)EventLoop( polling 只处理 accept ) + (N Sub)EventLoop ( polling 只处理 read, write)
(1 Main)EventLoop( polling 只处理 accept ) + (N Sub)EventLoop ( polling 处理 read, write)

在这种模型下, 会创建 N (取决于 CPU 核心数, 配置的 NumEventLoop, 最大值不超过 256) 个 Sub eventLoop 对象. 每个
eventLoop 都会与一个 Epoll 与之绑定 (只处理 read, write).
Expand All @@ -311,7 +309,82 @@ eventLoop 都会与一个 Epoll 与之绑定 (只处理 read, write).
accept: 绝大多数逻辑与 eventLoop 的 accept 类似. 只是在连接建立后, 通过 LSB 选择一个 EventLoop, 将这个请求交给
该 EventLoop (触发了一个 UrgentTrigger 事件, 添加 Read Event), 后续由该 EventLoop 处理.

// create N Sub EventLoop, 1 Main EventLoop
```cgo
func (eng *engine) activateReactors(numEventLoop int) error {
// N 个 Sub EventLoop, 每个 EventLoop 都绑定一个 Poller
// 注: OpenPoller() 当中会监听一个用于处理优先级任务的 Read Event (efd)
for i := 0; i < numEventLoop; i++ {
if p, err := netpoll.OpenPoller(); err == nil {
el := new(eventloop)
el.ln = eng.ln
el.engine = eng
el.poller = p
el.buffer = make([]byte, eng.opts.ReadBufferCap)
el.connections.init()
el.eventHandler = eng.eventHandler
eng.eventLoops.register(el)
} else {
return err
}
}
// 后台运行 N 个 Sub EventLoop 的 Polling, 主要用于处理 Read, Write Event
eng.startSubReactors()
// 启动 Main EventLoop, 添加 listenr 的 fd 的 Read Event 到该 poller 中
if p, err := netpoll.OpenPoller(); err == nil {
el := new(eventloop)
el.ln = eng.ln
el.idx = -1
el.engine = eng
el.poller = p
el.eventHandler = eng.eventHandler
if err = el.poller.AddRead(eng.ln.packPollAttachment(eng.accept)); err != nil {
return err
}
eng.acceptor = el
// 后台运行 Main EventLoop, Polling, 主要用于处理 Accept
eng.workerPool.Go(el.activateMainReactor)
} else {
return err
}
// Start the ticker.
if eng.opts.Ticker {
eng.workerPool.Go(func() error {
eng.acceptor.ticker(eng.ticker.ctx)
return nil
})
}
return nil
}
```

Main EventLoop 后台任务:
```cgo
func (el *eventloop) activateMainReactor() error {
if el.engine.opts.LockOSThread {
runtime.LockOSThread()
defer runtime.UnlockOSThread()
}
// Polling 回调 engine.accept()
// 相较于 EventLoop.run, 这里只处理了 Accept
err := el.poller.Polling(func(fd int, ev uint32) error { return el.engine.accept(fd, ev) })
if err == errors.ErrEngineShutdown {
el.engine.opts.Logger.Debugf("main reactor is exiting in terms of the demand from user, %v", err)
err = nil
} else if err != nil {
el.engine.opts.Logger.Errorf("main reactor is exiting due to error: %v", err)
}
el.engine.shutdown(err)
return err
}
func (eng *engine) accept(fd int, _ netpoll.IOEvent) error {
nfd, sa, err := unix.Accept(fd)
if err != nil {
Expand All @@ -336,8 +409,11 @@ func (eng *engine) accept(fd int, _ netpoll.IOEvent) error {
logging.Error(err)
}
// 选择一个 EventLoop, 将当前的 conn 添加到其中, 后续由该 EventLoop 的 Poller 处理该 conn
// 的 Read, Write Event
el := eng.eventLoops.next(remoteAddr)
c := newTCPConn(nfd, el, sa, el.ln.addr, remoteAddr)
// 这里会触发选择的 EventLoop 的 Poller 的 efd 的 Read Event
err = el.poller.UrgentTrigger(el.register, c)
if err != nil {
eng.opts.Logger.Errorf("UrgentTrigger() failed due to error: %v", err)
Expand All @@ -346,6 +422,70 @@ func (eng *engine) accept(fd int, _ netpoll.IOEvent) error {
}
return nil
}
// 添加 conn 到 EventLoop 当中
func (el *eventloop) register(itf interface{}) error {
c := itf.(*conn)
if err := el.poller.AddRead(&c.pollAttachment); err != nil {
_ = unix.Close(c.fd)
c.release()
return err
}
el.connections.addConn(c, el.idx)
if c.isDatagram {
return nil
}
return el.open(c)
}
```

一旦 conn 到达 Sub EventLoop 后, 后续的处理方式与前面的逻辑是一样的.
Sub EventLoop 后台任务:
```cgo
func (el *eventloop) activateSubReactor() error {
if el.engine.opts.LockOSThread {
runtime.LockOSThread()
defer runtime.UnlockOSThread()
}
// 相较于 EventLoop.run, 这里只处理了 Read, Write Event
err := el.poller.Polling(func(fd int, ev uint32) error {
if c := el.connections.getConn(fd); c != nil {
// Don't change the ordering of processing EPOLLOUT | EPOLLRDHUP / EPOLLIN unless you're 100%
// sure what you're doing!
// Re-ordering can easily introduce bugs and bad side-effects, as I found out painfully in the past.
// We should always check for the EPOLLOUT event first, as we must try to send the leftover data back to
// the peer when any error occurs on a connection.
//
// Either an EPOLLOUT or EPOLLERR event may be fired when a connection is refused.
// In either case write() should take care of it properly:
// 1) writing data back,
// 2) closing the connection.
if ev&netpoll.OutEvents != 0 && !c.outboundBuffer.IsEmpty() {
if err := el.write(c); err != nil {
return err
}
}
if ev&netpoll.InEvents != 0 {
return el.read(c)
}
return nil
}
return nil
})
if err == errors.ErrEngineShutdown {
el.engine.opts.Logger.Debugf("event-loop(%d) is exiting in terms of the demand from user, %v", el.idx, err)
err = nil
} else if err != nil {
el.engine.opts.Logger.Errorf("event-loop(%d) is exiting due to error: %v", el.idx, err)
}
el.closeConns()
el.engine.shutdown(err)
return err
}
```

0 comments on commit 881591d

Please sign in to comment.