Skip to content

Commit

Permalink
refactor: 服务器支持异步消息类型、死锁阻塞、异步慢消息检测
Browse files Browse the repository at this point in the history
  • Loading branch information
kercylan98 committed Jul 7, 2023
1 parent f0e3822 commit 1a2c1df
Show file tree
Hide file tree
Showing 10 changed files with 176 additions and 35 deletions.
2 changes: 0 additions & 2 deletions component/components/lockstep.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,6 @@ func NewLockstep[ClientID comparable, Command any](options ...LockstepOption[Cli
// - 自定帧序列化方式 WithLockstepSerialization
// - 从特定帧开始追帧
// - 兼容各种基于TCP/UDP/Unix的网络类型,可通过客户端实现其他网络类型同步
//
// 可在 examples 目录下找到示例,示例项目:simple-server-lockstep
type Lockstep[ClientID comparable, Command any] struct {
clients *synchronization.Map[ClientID, component.LockstepClient[ClientID]] // 接受广播的客户端
frames *synchronization.Map[int, []Command] // 所有帧指令
Expand Down
4 changes: 2 additions & 2 deletions game/builtin/player.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ func (slf *Player[ID]) UseConn(conn *server.Conn) {
}

// Send 向该玩家发送数据
func (slf *Player[ID]) Send(packet []byte, messageType ...int) {
slf.conn.Write(packet, messageType...)
func (slf *Player[ID]) Send(packet server.Packet) {
slf.conn.Write(packet)
}

// Close 关闭玩家
Expand Down
4 changes: 2 additions & 2 deletions server/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,8 @@ func (slf *event) OnConsoleCommandEvent(command string) {
if !exist {
switch command {
case "exit", "quit", "close", "shutdown", "EXIT", "QUIT", "CLOSE", "SHUTDOWN":
log.Info("Console", zap.String("Receive", command), zap.String("Action", "Shutdown"))
slf.Server.Shutdown(nil)
log.Info("Console", zap.String("Receive", command), zap.String("Action", "shutdown"))
slf.Server.shutdown(nil)
return
}
log.Warn("Server", zap.String("Command", "unregistered"))
Expand Down
2 changes: 1 addition & 1 deletion server/gnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func (slf *gNet) OnShutdown(server gnet.Server) {
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
if err := gnet.Stop(ctx, fmt.Sprintf("%s://%s", slf.network, slf.addr)); err != nil {
log.Error("Server", zap.String("Minotaur GNet Server", "Shutdown"), zap.Error(err))
log.Error("Server", zap.String("Minotaur GNet Server", "shutdown"), zap.Error(err))
}
}

Expand Down
20 changes: 14 additions & 6 deletions server/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,12 @@ var messageNames = map[MessageType]string{

const (
MessageErrorActionNone MessageErrorAction = iota // 错误消息类型操作:将不会被进行任何特殊处理,仅进行日志输出
MessageErrorActionShutdown // 错误消息类型操作:当接收到该类型的操作时,服务器将执行 Server.Shutdown 函数
MessageErrorActionShutdown // 错误消息类型操作:当接收到该类型的操作时,服务器将执行 Server.shutdown 函数
)

var messageErrorActionNames = map[MessageErrorAction]string{
MessageErrorActionNone: "None",
MessageErrorActionShutdown: "Shutdown",
MessageErrorActionShutdown: "shutdown",
}

type (
Expand All @@ -60,23 +60,23 @@ func (slf MessageType) String() string {
return messageNames[slf]
}

// PushPacketMessage 向特定服务器中推送 Packet 消息
// PushPacketMessage 向特定服务器中推送 MessageTypePacket 消息
func PushPacketMessage(srv *Server, conn *Conn, packet []byte) {
msg := srv.messagePool.Get()
msg.t = MessageTypePacket
msg.attrs = []any{conn, packet}
srv.pushMessage(msg)
}

// PushErrorMessage 向特定服务器中推送 Error 消息
// PushErrorMessage 向特定服务器中推送 MessageTypeError 消息
func PushErrorMessage(srv *Server, err error, action MessageErrorAction) {
msg := srv.messagePool.Get()
msg.t = MessageTypeError
msg.attrs = []any{err, action, string(debug.Stack())}
srv.pushMessage(msg)
}

// PushCrossMessage 向特定服务器中推送 Cross 消息
// PushCrossMessage 向特定服务器中推送 MessageTypeCross 消息
func PushCrossMessage(srv *Server, crossName string, serverId int64, packet []byte) {
if serverId == srv.id {
msg := srv.messagePool.Get()
Expand All @@ -95,10 +95,18 @@ func PushCrossMessage(srv *Server, crossName string, serverId int64, packet []by
}
}

// PushTickerMessage 向特定服务器中推送 Ticker 消息
// PushTickerMessage 向特定服务器中推送 MessageTypeTicker 消息
func PushTickerMessage(srv *Server, caller func()) {
msg := srv.messagePool.Get()
msg.t = MessageTypeTicker
msg.attrs = []any{caller}
srv.pushMessage(msg)
}

// PushAsyncMessage 向特定服务器中推送 MessageTypeAsync 消息
func PushAsyncMessage(srv *Server, caller func() error, callback ...func(err error)) {
msg := srv.messagePool.Get()
msg.t = MessageTypeAsync
msg.attrs = []any{caller, callback}
srv.pushMessage(msg)
}
6 changes: 3 additions & 3 deletions server/multiple.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,23 +60,23 @@ func (slf *MultipleServer) Run() {
case err := <-exceptionChannel:
for len(slf.servers) > 0 {
server := slf.servers[0]
server.Shutdown(err)
server.shutdown(err)
slf.servers = slf.servers[1:]
}
break
case <-runtimeExceptionChannel:
for len(slf.servers) > 0 {
server := slf.servers[0]
server.multipleRuntimeErrorChan = nil
server.Shutdown(nil)
server.shutdown(nil)
slf.servers = slf.servers[1:]
}
break
case <-systemSignal:
for len(slf.servers) > 0 {
server := slf.servers[0]
server.multipleRuntimeErrorChan = nil
server.Shutdown(nil)
server.shutdown(nil)
slf.servers = slf.servers[1:]
}
break
Expand Down
35 changes: 35 additions & 0 deletions server/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,41 @@ const (
)

type Option func(srv *Server)
type option struct {
disableAnts bool // 是否禁用协程池
antsPoolSize int // 协程池大小
}

type runtime struct {
deadlockDetect time.Duration // 是否开启死锁检测
}

// WithDeadlockDetect 通过死锁、死循环、永久阻塞检测的方式创建服务器
// - 当检测到死锁、死循环、永久阻塞时,服务器将会生成 WARN 类型的日志,关键字为 "SuspectedDeadlock"
func WithDeadlockDetect(t time.Duration) Option {
return func(srv *Server) {
if t > 0 {
srv.deadlockDetect = t
log.Info("DeadlockDetect", zap.Any("Time", t))
}
}
}

// WithDisableAsyncMessage 通过禁用异步消息的方式创建服务器
func WithDisableAsyncMessage() Option {
return func(srv *Server) {
srv.disableAnts = true
}
}

// WithAsyncPoolSize 通过指定异步消息池大小的方式创建服务器
// - 当通过 WithDisableAsyncMessage 禁用异步消息时,此选项无效
// - 默认值为 256
func WithAsyncPoolSize(size int) Option {
return func(srv *Server) {
srv.antsPoolSize = size
}
}

// WithWebsocketReadDeadline 设置 Websocket 读取超时时间
// - 默认: 30 * time.Second
Expand Down
103 changes: 86 additions & 17 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/kercylan98/minotaur/utils/synchronization"
"github.com/kercylan98/minotaur/utils/timer"
"github.com/kercylan98/minotaur/utils/times"
"github.com/panjf2000/ants/v2"
"github.com/panjf2000/gnet"
"github.com/panjf2000/gnet/pkg/logging"
"github.com/xtaci/kcp-go/v5"
Expand All @@ -29,9 +30,11 @@ import (
func New(network Network, options ...Option) *Server {
server := &Server{
event: &event{},
runtime: &runtime{},
option: &option{},
network: network,
options: options,
closeChannel: make(chan struct{}, 1),
systemSignal: make(chan os.Signal, 1),
}
server.event.Server = server

Expand All @@ -50,17 +53,32 @@ func New(network Network, options ...Option) *Server {
for _, option := range options {
option(server)
}

if !server.disableAnts {
if server.antsPoolSize <= 0 {
server.antsPoolSize = 256
}
var err error
server.ants, err = ants.NewPool(server.antsPoolSize, ants.WithLogger(log.Logger()))
if err != nil {
panic(err)
}
}

server.option = nil
return server
}

// Server 网络服务器
type Server struct {
*event // 事件
*runtime // 运行时
*option // 可选项
systemSignal chan os.Signal // 系统信号
cross map[string]Cross // 跨服
id int64 // 服务器id
network Network // 网络类型
addr string // 侦听地址
options []Option // 选项
ginServer *gin.Engine // HTTP模式下的路由器
httpServer *http.Server // HTTP模式下的服务器
grpcServer *grpc.Server // GRPC模式下的服务器
Expand All @@ -69,6 +87,7 @@ type Server struct {
isRunning bool // 是否正在运行
isShutdown atomic.Bool // 是否已关闭
closeChannel chan struct{} // 关闭信号
ants *ants.Pool // 协程池

gServer *gNet // TCP或UDP模式下的服务器
messagePool *synchronization.Pool[*Message] // 消息池
Expand Down Expand Up @@ -304,11 +323,11 @@ func (slf *Server) Run(addr string) error {
)
log.Info("Server", zap.String("Minotaur Server", "===================================================================="))
slf.OnStartFinishEvent()
systemSignal := make(chan os.Signal, 1)
signal.Notify(systemSignal, syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT)

signal.Notify(slf.systemSignal, syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT)
select {
case <-systemSignal:
slf.Shutdown(nil)
case <-slf.systemSignal:
slf.shutdown(nil)
}

select {
Expand Down Expand Up @@ -348,8 +367,13 @@ func (slf *Server) Ticker() *timer.Ticker {
return slf.ticker
}

// Shutdown 停止运行服务器
func (slf *Server) Shutdown(err error, stack ...string) {
// Shutdown 主动停止运行服务器
func (slf *Server) Shutdown() {
slf.systemSignal <- syscall.SIGQUIT
}

// shutdown 停止运行服务器
func (slf *Server) shutdown(err error, stack ...string) {
slf.OnStopEvent()
defer func() {
if slf.multipleRuntimeErrorChan != nil {
Expand Down Expand Up @@ -432,8 +456,32 @@ func (slf *Server) pushMessage(message *Message) {
slf.messageChannel <- message
}

func (slf *Server) low(message *Message, present time.Time) {
cost := time.Since(present)
if cost > time.Millisecond*100 {
log.Warn("Server", zap.String("MessageType", messageNames[message.t]), zap.String("LowExecCost", cost.String()))
slf.OnMessageLowExecEvent(message, cost)
}
}

// dispatchMessage 消息分发
func (slf *Server) dispatchMessage(msg *Message) {
var (
ctx context.Context
cancel context.CancelFunc
)
if slf.deadlockDetect > 0 {
ctx, cancel = context.WithTimeout(context.Background(), slf.deadlockDetect)
go func() {
select {
case <-ctx.Done():
if err := ctx.Err(); err == context.DeadlineExceeded {
log.Warn("Server", zap.String("MessageType", messageNames[msg.t]), zap.Any("SuspectedDeadlock", msg.attrs))
}
}
}()
}

present := time.Now()
defer func() {
if err := recover(); err != nil {
Expand All @@ -443,15 +491,14 @@ func (slf *Server) dispatchMessage(msg *Message) {
}
}

cost := time.Since(present)
if cost > time.Millisecond*100 {
log.Warn("Server", zap.String("MessageType", messageNames[msg.t]), zap.String("LowExecCost", cost.String()), zap.Any("MessageAttrs", msg.attrs))
slf.OnMessageLowExecEvent(msg, cost)
if msg.t != MessageTypeAsync {
super.Handle(cancel)
slf.low(msg, present)
if !slf.isShutdown.Load() {
slf.messagePool.Release(msg)
}
}

if !slf.isShutdown.Load() {
slf.messagePool.Release(msg)
}
}()
var attrs = msg.attrs
switch msg.t {
Expand All @@ -465,7 +512,7 @@ func (slf *Server) dispatchMessage(msg *Message) {
case MessageErrorActionNone:
log.ErrorWithStack("Server", stack, zap.Error(err))
case MessageErrorActionShutdown:
slf.Shutdown(err, stack)
slf.shutdown(err, stack)
default:
log.Warn("Server", zap.String("not support message error action", action.String()))
}
Expand All @@ -474,7 +521,29 @@ func (slf *Server) dispatchMessage(msg *Message) {
case MessageTypeTicker:
attrs[0].(func())()
case MessageTypeAsync:

handle := attrs[0].(func() error)
callbacks := attrs[1].([]func(err error))
if err := slf.ants.Submit(func() {
defer func() {
if err := recover(); err != nil {
log.Error("Server", zap.String("MessageType", messageNames[msg.t]), zap.Any("MessageAttrs", msg.attrs), zap.Any("error", err))
if e, ok := err.(error); ok {
slf.OnMessageErrorEvent(msg, e)
}
}
super.Handle(cancel)
if !slf.isShutdown.Load() {
slf.messagePool.Release(msg)
}
}()
if err := handle(); err != nil {
for _, callback := range callbacks {
callback(err)
}
}
}); err != nil {
panic(err)
}
default:
log.Warn("Server", zap.String("not support message type", msg.t.String()))
}
Expand Down
23 changes: 23 additions & 0 deletions server/server_example_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package server_test

import (
"github.com/kercylan98/minotaur/server"
"time"
)

func ExampleNew() {
srv := server.New(server.NetworkWebsocket,
server.WithDeadlockDetect(time.Second*5),
)

srv.RegConnectionReceivePacketEvent(func(srv *server.Server, conn *server.Conn, packet server.Packet) {
conn.Write(packet)
})

go func() { time.Sleep(1 * time.Second); srv.Shutdown() }()
if err := srv.Run(":9999"); err != nil {
panic(err)
}

// Output:
}
Loading

0 comments on commit 1a2c1df

Please sign in to comment.