Skip to content

Commit

Permalink
opt: Leverage system calls instead of net package to create listeners
Browse files Browse the repository at this point in the history
Also enable SO_REUSEPORT to avoid Thundering Herd, besides, Fixes #47 #44 #77
  • Loading branch information
panjf2000 committed Jul 3, 2020
1 parent 31c9dda commit ccc8c64
Show file tree
Hide file tree
Showing 24 changed files with 704 additions and 151 deletions.
4 changes: 3 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ os:
- windows

go:
- 1.11.x
# - 1.9.x
# - 1.10.x
# - 1.11.x
- 1.12.x
- 1.13.x
- 1.14.x
Expand Down
8 changes: 5 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -300,14 +300,15 @@ func (es *echoServer) React(frame []byte, c gnet.Conn) (out []byte, action gnet.

func main() {
var port int
var multicore bool
var multicore, reuseport bool

// Example command: go run echo.go --port 9000 --multicore=true
// Example command: go run echo.go --port 9000 --multicore=true --reuseport=true
flag.IntVar(&port, "port", 9000, "--port 9000")
flag.BoolVar(&multicore, "multicore", false, "--multicore true")
flag.BoolVar(&reuseport, "reuseport", false, "--reuseport true")
flag.Parse()
echo := new(echoServer)
log.Fatal(gnet.Serve(echo, fmt.Sprintf("tcp://:%d", port), gnet.WithMulticore(multicore)))
log.Fatal(gnet.Serve(echo, fmt.Sprintf("tcp://:%d", port), gnet.WithMulticore(multicore), gnet.WithReusePort(reuseport)))
}
```
</details>
Expand Down Expand Up @@ -1096,6 +1097,7 @@ Please read the [Contributing Guidelines](CONTRIBUTING.md) before opening a PR a
- [evio](https://github.com/tidwall/evio)
- [netty](https://github.com/netty/netty)
- [ants](https://github.com/panjf2000/ants)
- [go_reuseport](https://github.com/kavu/go_reuseport)
- [bytebufferpool](https://github.com/valyala/bytebufferpool)
- [goframe](https://github.com/smallnest/goframe)
- [ringbuffer](https://github.com/smallnest/ringbuffer)
Expand Down
12 changes: 7 additions & 5 deletions README_ZH.md
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ func main() {
}
```

正如你所见,上面的例子里 `gnet` 实例只注册了一个 `EventHandler.React` 事件。一般来说,主要的业务逻辑代码会写在这个事件方法里,这个方法会在服务器接收到客户端写过来的数据之时被调用,此时的输入参数: `frame` 已经是解码过后的一个完整的网络数据包,一般来说你需要实现 `gnet` 的[codec 接口](https://github.com/panjf2000/gnet/blob/master/codec.go#L18-L24)作为你自己的业务编解码器来处理 TCP 组包和分包的问题,如果你不实现那个接口的话,那么 `gnet` 将会使用[默认的 codec](https://github.com/panjf2000/gnet/blob/master/codec.go#L53-L63),这意味着在 `EventHandler.React` 被触发调用之时输入参数: `frame` 里存储的是所有网络数据:包括最新的以及还在 buffer 里的旧数据,然后处理输入数据(这里只是把数据 echo 回去)并且在处理完之后把需要输出的数据赋值给 `out` 变量并返回,接着输出的数据会经过编码,最后被写回客户端。
正如你所见,上面的例子里 `gnet` 实例只注册了一个 `EventHandler.React` 事件。一般来说,主要的业务逻辑代码会写在这个事件方法里,这个方法会在服务器接收到客户端写过来的数据之时被调用,此时的输入参数: `frame` 已经是解码过后的一个完整的网络数据包,一般来说你需要实现 `gnet` 的 [codec 接口](https://github.com/panjf2000/gnet/blob/master/codec.go#L18-L24)作为你自己的业务编解码器来处理 TCP 组包和分包的问题,如果你不实现那个接口的话,那么 `gnet` 将会使用[默认的 codec](https://github.com/panjf2000/gnet/blob/master/codec.go#L53-L63),这意味着在 `EventHandler.React` 被触发调用之时输入参数: `frame` 里存储的是所有网络数据:包括最新的以及还在 buffer 里的旧数据,然后处理输入数据(这里只是把数据 echo 回去)并且在处理完之后把需要输出的数据赋值给 `out` 变量并返回,接着输出的数据会经过编码,最后被写回客户端。

### 带阻塞逻辑的 echo 服务器

Expand Down Expand Up @@ -298,14 +298,15 @@ func (es *echoServer) React(frame []byte, c gnet.Conn) (out []byte, action gnet.

func main() {
var port int
var multicore bool
var multicore, reuseport bool

// Example command: go run echo.go --port 9000 --multicore=true
// Example command: go run echo.go --port 9000 --multicore=true --reuseport=true
flag.IntVar(&port, "port", 9000, "--port 9000")
flag.BoolVar(&multicore, "multicore", false, "--multicore true")
flag.BoolVar(&reuseport, "reuseport", false, "--reuseport true")
flag.Parse()
echo := new(echoServer)
log.Fatal(gnet.Serve(echo, fmt.Sprintf("tcp://:%d", port), gnet.WithMulticore(multicore)))
log.Fatal(gnet.Serve(echo, fmt.Sprintf("tcp://:%d", port), gnet.WithMulticore(multicore), gnet.WithReusePort(reuseport)))
}
```
</details>
Expand Down Expand Up @@ -989,7 +990,7 @@ events.Tick = func() (delay time.Duration, action Action){

服务器支持 [SO_REUSEPORT](https://lwn.net/Articles/542629/) 端口复用特性,允许多个 sockets 监听同一个端口,然后内核会帮你做好负载均衡,每次只唤醒一个 socket 来处理 `connect` 请求,避免惊群效应。

默认情况下,`gnet` 也不会有惊群效应,因为 `gnet` 默认的网络模型是主从多 Reactors,只会有一个主 reactor 在监听端口以及接受新连接。所以,开不开启 `SO_REUSEPORT` 选项是无关紧要的,只是开启了这个选项之后 `gnet` 的网络模型将会切换成 `evio` 的旧网络模型,这一点需要注意一下。
默认情况下,`gnet` 不会有惊群效应,因为 `gnet` 默认的网络模型是主从多 Reactors,只会有一个主 reactor 在监听端口以及接收新连接。所以,开不开启 `SO_REUSEPORT` 选项是无关紧要的,只是开启了这个选项之后 `gnet` 的网络模型将会切换成 `evio` 的旧网络模型,这一点需要注意一下。

开启这个功能也很简单,使用 functional options 设置一下即可:

Expand Down Expand Up @@ -1092,6 +1093,7 @@ GOMAXPROCS=4
- [evio](https://github.com/tidwall/evio)
- [netty](https://github.com/netty/netty)
- [ants](https://github.com/panjf2000/ants)
- [go_reuseport](https://github.com/kavu/go_reuseport)
- [bytebufferpool](https://github.com/valyala/bytebufferpool)
- [goframe](https://github.com/smallnest/goframe)
- [ringbuffer](https://github.com/smallnest/ringbuffer)
Expand Down
2 changes: 1 addition & 1 deletion connection_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func newUDPConn(fd int, el *eventloop, sa unix.Sockaddr) *conn {
return &conn{
fd: fd,
sa: sa,
localAddr: el.svr.ln.lnaddr,
localAddr: el.ln.lnaddr,
remoteAddr: netpoll.SockaddrToUDPAddr(sa),
}
}
Expand Down
11 changes: 6 additions & 5 deletions eventloop_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@
package gnet

import (
"net"
"time"

"github.com/panjf2000/gnet/internal/netpoll"
"golang.org/x/sys/unix"
)

type eventloop struct {
ln *listener // listener
idx int // loop index in the server loops list
svr *server // server in loop
codec ICodec // codec for TCP
Expand All @@ -37,6 +37,7 @@ func (el *eventloop) closeAllConns() {
func (el *eventloop) loopRun() {
defer func() {
el.closeAllConns()
el.ln.close()
if el.idx == 0 && el.svr.opts.Ticker {
close(el.svr.ticktock)
}
Expand All @@ -51,8 +52,8 @@ func (el *eventloop) loopRun() {
}

func (el *eventloop) loopAccept(fd int) error {
if fd == el.svr.ln.fd {
if el.svr.ln.pconn != nil {
if fd == el.ln.fd {
if el.ln.network == "udp" {
return el.loopReadUDP(fd)
}
nfd, sa, err := unix.Accept(fd)
Expand All @@ -78,11 +79,11 @@ func (el *eventloop) loopAccept(fd int) error {

func (el *eventloop) loopOpen(c *conn) error {
c.opened = true
c.localAddr = el.svr.ln.lnaddr
c.localAddr = el.ln.lnaddr
c.remoteAddr = netpoll.SockaddrToTCPOrUnixAddr(c.sa)
out, action := el.eventHandler.OnOpened(c)
if el.svr.opts.TCPKeepAlive > 0 {
if _, ok := el.svr.ln.ln.(*net.TCPListener); ok {
if proto := el.ln.network; proto == "tcp" || proto == "unix" {
_ = netpoll.SetKeepAlive(c.fd, int(el.svr.opts.TCPKeepAlive/time.Second))
}
}
Expand Down
7 changes: 4 additions & 3 deletions examples/echo_tcp/echo.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,13 @@ func (es *echoServer) React(frame []byte, c gnet.Conn) (out []byte, action gnet.

func main() {
var port int
var multicore bool
var multicore, reuseport bool

// Example command: go run echo.go --port 9000 --multicore=true
// Example command: go run echo.go --port 9000 --multicore=true --reuseport=true
flag.IntVar(&port, "port", 9000, "--port 9000")
flag.BoolVar(&multicore, "multicore", false, "--multicore true")
flag.BoolVar(&reuseport, "reuseport", false, "--reuseport true")
flag.Parse()
echo := new(echoServer)
log.Fatal(gnet.Serve(echo, fmt.Sprintf("tcp://:%d", port), gnet.WithMulticore(multicore)))
log.Fatal(gnet.Serve(echo, fmt.Sprintf("tcp://:%d", port), gnet.WithMulticore(multicore), gnet.WithReusePort(reuseport)))
}
55 changes: 7 additions & 48 deletions gnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,9 @@ import (
"log"
"net"
"os"
"runtime"
"strings"
"sync/atomic"
"time"

"github.com/panjf2000/gnet/internal/netpoll"
)

// Action is an action that occurs after the completion of an event.
Expand Down Expand Up @@ -224,63 +221,25 @@ func (es *EventServer) Tick() (delay time.Duration, action Action) {
// unix - Unix Domain Socket
//
// The "tcp" network scheme is assumed when one is not specified.
func Serve(eventHandler EventHandler, addr string, opts ...Option) (err error) {
var ln listener
defer func() {
ln.close()
if ln.network == "unix" {
sniffErrorAndLog(os.RemoveAll(ln.addr))
}
}()

func Serve(eventHandler EventHandler, protoAddr string, opts ...Option) (err error) {
options := loadOptions(opts...)

if options.Logger != nil {
defaultLogger = options.Logger
}

ln.network, ln.addr = parseAddr(addr)
switch ln.network {
case "udp", "udp4", "udp6":
if options.ReusePort {
ln.pconn, err = netpoll.ReusePortListenPacket(ln.network, ln.addr)
} else {
ln.pconn, err = net.ListenPacket(ln.network, ln.addr)
}
case "unix":
sniffErrorAndLog(os.RemoveAll(ln.addr))
if runtime.GOOS == "windows" {
err = ErrUnsupportedProtocol
break
}
fallthrough
case "tcp", "tcp4", "tcp6":
if options.ReusePort {
ln.ln, err = netpoll.ReusePortListen(ln.network, ln.addr)
} else {
ln.ln, err = net.Listen(ln.network, ln.addr)
}
default:
err = ErrUnsupportedProtocol
}
if err != nil {
return
}

if ln.pconn != nil {
ln.lnaddr = ln.pconn.LocalAddr()
} else {
ln.lnaddr = ln.ln.Addr()
}
network, addr := parseProtoAddr(protoAddr)

if err = ln.renormalize(); err != nil {
var ln *listener
if ln, err = initListener(network, addr, options.ReusePort); err != nil {
return
}
defer ln.close()

return serve(eventHandler, &ln, options)
return serve(eventHandler, ln, options)
}

func parseAddr(addr string) (network, address string) {
func parseProtoAddr(addr string) (network, address string) {
network = "tcp"
address = strings.ToLower(addr)
if strings.Contains(address, "://") {
Expand Down
6 changes: 2 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,8 @@ module github.com/panjf2000/gnet
go 1.13

require (
github.com/libp2p/go-reuseport v0.0.1
github.com/panjf2000/ants/v2 v2.4.0
github.com/pkg/errors v0.9.1 // indirect
github.com/panjf2000/ants/v2 v2.4.1
github.com/smallnest/goframe v1.0.0
github.com/valyala/bytebufferpool v1.0.0
golang.org/x/sys v0.0.0-20200509044756-6aff5f38e54f
golang.org/x/sys v0.0.0-20200625212154-ddb9806d33ae
)
17 changes: 4 additions & 13 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -2,28 +2,19 @@ github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/libp2p/go-reuseport v0.0.1 h1:7PhkfH73VXfPJYKQ6JwS5I/eVcoyYi9IMNGc6FWpFLw=
github.com/libp2p/go-reuseport v0.0.1/go.mod h1:jn6RmB1ufnQwl0Q1f+YxAj8isJgDCQzaaxIFYDhcYEA=
github.com/panjf2000/ants/v2 v2.4.0 h1:embKPQeNWMRbnrRKURv4TXJwjQRWMEAfqZT6Pe5hZNc=
github.com/panjf2000/ants/v2 v2.4.0/go.mod h1:f6F0NZVFsGCp5A7QW/Zj/m92atWwOkY0OIhFxRNFr4A=
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/panjf2000/ants/v2 v2.4.1 h1:7RtUqj5lGOw0WnZhSKDZ2zzJhaX5490ZW1sUolRXCxY=
github.com/panjf2000/ants/v2 v2.4.1/go.mod h1:f6F0NZVFsGCp5A7QW/Zj/m92atWwOkY0OIhFxRNFr4A=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/smallnest/goframe v1.0.0 h1:ywsSz9P5BFiqn39w8iFDENTdqN44v+B5bp1PbCH+PVw=
github.com/smallnest/goframe v1.0.0/go.mod h1:Dy8560GXrB6w5OJnVBU71dJtSyINdnqHHe6atDaZX00=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
golang.org/x/sys v0.0.0-20190228124157-a34e9553db1e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20200509044756-6aff5f38e54f h1:mOhmO9WsBaJCNmaZHPtHs9wOcdqdKCjF6OPJlmDM3KI=
golang.org/x/sys v0.0.0-20200509044756-6aff5f38e54f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200625212154-ddb9806d33ae h1:Ih9Yo4hSPImZOpfGuA4bR/ORKTAbhZo2AbWNRCnevdo=
golang.org/x/sys v0.0.0-20200625212154-ddb9806d33ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
Expand Down
23 changes: 0 additions & 23 deletions internal/netpoll/reuseport.go

This file was deleted.

2 changes: 1 addition & 1 deletion internal/netpoll/socktoaddr.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func SockaddrToTCPOrUnixAddr(sa unix.Sockaddr) net.Addr {

// SockaddrToUDPAddr converts a Sockaddr to a net.UDPAddr
// Returns nil if conversion fails.
func SockaddrToUDPAddr(sa unix.Sockaddr) *net.UDPAddr {
func SockaddrToUDPAddr(sa unix.Sockaddr) net.Addr {
switch sa := sa.(type) {
case *unix.SockaddrInet4:
ip := sockaddrInet4ToIP(sa)
Expand Down
44 changes: 44 additions & 0 deletions internal/reuseport/reuseport.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// copyright (c) 2020 andy pan
// Copyright (C) 2017 Max Riveiro
//
// Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
// documentation files (the "Software"), to deal in the Software without restriction,
// including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so,
// subject to the following conditions:
// The above copyright notice and this permission notice shall be included in all copies or substantial portions of
// the Software.
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE
// WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
// TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

// Package reuseport provides a function that returns fd and net.Listener powered
// by a net.FileListener with a SO_REUSEPORT option set to the socket.

// +build linux darwin netbsd freebsd openbsd dragonfly

package reuseport

import (
"errors"
"net"
)

var errUnsupportedProtocol = errors.New("only unix, tcp/tcp4/tcp6, udp/udp4/udp6 are supported")

// TCPSocket calls tcpReusablePort.
func TCPSocket(proto, addr string, reusePort bool) (int, net.Addr, error) {
return tcpReusablePort(proto, addr, reusePort)
}

// UDPSocket calls udpReusablePort.
func UDPSocket(proto, addr string, reusePort bool) (int, net.Addr, error) {
return udpReusablePort(proto, addr, reusePort)
}

// UnixSocket calls udsReusablePort.
func UnixSocket(proto, addr string, reusePort bool) (int, net.Addr, error) {
return udsReusablePort(proto, addr, reusePort)
}
Loading

0 comments on commit ccc8c64

Please sign in to comment.