Skip to content

Commit

Permalink
perf: control ws write buffer (#1451)
Browse files Browse the repository at this point in the history
Signed-off-by: rfyiamcool <rfyiamcool@163.com>
  • Loading branch information
rfyiamcool committed Nov 22, 2023
1 parent 7a13284 commit a9153af
Show file tree
Hide file tree
Showing 5 changed files with 23 additions and 4 deletions.
4 changes: 3 additions & 1 deletion internal/msggateway/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@ func RunWsAndServer(rpcPort, wsPort, prometheusPort int) error {
WithPort(wsPort),
WithMaxConnNum(int64(config.Config.LongConnSvr.WebsocketMaxConnNum)),
WithHandshakeTimeout(time.Duration(config.Config.LongConnSvr.WebsocketTimeout)*time.Second),
WithMessageMaxMsgLength(config.Config.LongConnSvr.WebsocketMaxMsgLen))
WithMessageMaxMsgLength(config.Config.LongConnSvr.WebsocketMaxMsgLen),
WithWriteBufferSize(config.Config.LongConnSvr.WebsocketWriteBufferSize),
)
if err != nil {
return err
}
Expand Down
9 changes: 7 additions & 2 deletions internal/msggateway/long_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,11 @@ type GWebSocket struct {
protocolType int
conn *websocket.Conn
handshakeTimeout time.Duration
writeBufferSize int
}

func newGWebSocket(protocolType int, handshakeTimeout time.Duration) *GWebSocket {
return &GWebSocket{protocolType: protocolType, handshakeTimeout: handshakeTimeout}
func newGWebSocket(protocolType int, handshakeTimeout time.Duration, wbs int) *GWebSocket {
return &GWebSocket{protocolType: protocolType, handshakeTimeout: handshakeTimeout, writeBufferSize: wbs}
}

func (d *GWebSocket) Close() error {
Expand All @@ -65,6 +66,10 @@ func (d *GWebSocket) GenerateLongConn(w http.ResponseWriter, r *http.Request) er
HandshakeTimeout: d.handshakeTimeout,
CheckOrigin: func(r *http.Request) bool { return true },
}
if d.writeBufferSize > 0 { // default is 4kb.
upgrader.WriteBufferSize = d.writeBufferSize
}

conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
return err
Expand Down
5 changes: 4 additions & 1 deletion internal/msggateway/n_ws_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ type WsServer struct {
onlineUserNum atomic.Int64
onlineUserConnNum atomic.Int64
handshakeTimeout time.Duration
writeBufferSize int
validate *validator.Validate
cache cache.MsgModel
userClient *rpcclient.UserRpcClient
Expand Down Expand Up @@ -137,6 +138,7 @@ func NewWsServer(opts ...Option) (*WsServer, error) {
return &WsServer{
port: config.port,
wsMaxConnNum: config.maxConnNum,
writeBufferSize: config.writeBufferSize,
handshakeTimeout: config.handshakeTimeout,
clientPool: sync.Pool{
New: func() interface{} {
Expand Down Expand Up @@ -430,7 +432,8 @@ func (ws *WsServer) wsHandler(w http.ResponseWriter, r *http.Request) {
httpError(connContext, errs.ErrTokenNotExist.Wrap())
return
}
wsLongConn := newGWebSocket(WebSocket, ws.handshakeTimeout)

wsLongConn := newGWebSocket(WebSocket, ws.handshakeTimeout, ws.writeBufferSize)
err = wsLongConn.GenerateLongConn(w, r)
if err != nil {
httpError(connContext, err)
Expand Down
8 changes: 8 additions & 0 deletions internal/msggateway/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ type (
handshakeTimeout time.Duration
// 允许消息最大长度
messageMaxMsgLength int
// websocket write buffer, default: 4096, 4kb.
writeBufferSize int
}
)

Expand All @@ -53,3 +55,9 @@ func WithMessageMaxMsgLength(length int) Option {
opt.messageMaxMsgLength = length
}
}

func WithWriteBufferSize(size int) Option {
return func(opt *configs) {
opt.writeBufferSize = size
}
}
1 change: 1 addition & 0 deletions pkg/common/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ type configStruct struct {
WebsocketMaxConnNum int `yaml:"websocketMaxConnNum"`
WebsocketMaxMsgLen int `yaml:"websocketMaxMsgLen"`
WebsocketTimeout int `yaml:"websocketTimeout"`
WebsocketWriteBufferSize int `yaml:"websocketWriteBufferSize"`
} `yaml:"longConnSvr"`

Push struct {
Expand Down

0 comments on commit a9153af

Please sign in to comment.