diff --git a/component/components/lockstep.go b/component/components/lockstep.go index 4a98f697..eb21c6f2 100644 --- a/component/components/lockstep.go +++ b/component/components/lockstep.go @@ -39,8 +39,6 @@ func NewLockstep[ClientID comparable, Command any](options ...LockstepOption[Cli // - 自定帧序列化方式 WithLockstepSerialization // - 从特定帧开始追帧 // - 兼容各种基于TCP/UDP/Unix的网络类型,可通过客户端实现其他网络类型同步 -// -// 可在 examples 目录下找到示例,示例项目:simple-server-lockstep type Lockstep[ClientID comparable, Command any] struct { clients *synchronization.Map[ClientID, component.LockstepClient[ClientID]] // 接受广播的客户端 frames *synchronization.Map[int, []Command] // 所有帧指令 diff --git a/game/builtin/player.go b/game/builtin/player.go index c66c2a99..3fd686ec 100644 --- a/game/builtin/player.go +++ b/game/builtin/player.go @@ -30,8 +30,8 @@ func (slf *Player[ID]) UseConn(conn *server.Conn) { } // Send 向该玩家发送数据 -func (slf *Player[ID]) Send(packet []byte, messageType ...int) { - slf.conn.Write(packet, messageType...) +func (slf *Player[ID]) Send(packet server.Packet) { + slf.conn.Write(packet) } // Close 关闭玩家 diff --git a/server/event.go b/server/event.go index 3c2080d6..1d1ba105 100644 --- a/server/event.go +++ b/server/event.go @@ -73,8 +73,8 @@ func (slf *event) OnConsoleCommandEvent(command string) { if !exist { switch command { case "exit", "quit", "close", "shutdown", "EXIT", "QUIT", "CLOSE", "SHUTDOWN": - log.Info("Console", zap.String("Receive", command), zap.String("Action", "Shutdown")) - slf.Server.Shutdown(nil) + log.Info("Console", zap.String("Receive", command), zap.String("Action", "shutdown")) + slf.Server.shutdown(nil) return } log.Warn("Server", zap.String("Command", "unregistered")) diff --git a/server/gnet.go b/server/gnet.go index 73f9d57b..4e717932 100644 --- a/server/gnet.go +++ b/server/gnet.go @@ -23,7 +23,7 @@ func (slf *gNet) OnShutdown(server gnet.Server) { ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) defer cancel() if err := gnet.Stop(ctx, fmt.Sprintf("%s://%s", slf.network, slf.addr)); err != nil { - log.Error("Server", zap.String("Minotaur GNet Server", "Shutdown"), zap.Error(err)) + log.Error("Server", zap.String("Minotaur GNet Server", "shutdown"), zap.Error(err)) } } diff --git a/server/message.go b/server/message.go index 3222a6cb..f93e5315 100644 --- a/server/message.go +++ b/server/message.go @@ -31,12 +31,12 @@ var messageNames = map[MessageType]string{ const ( MessageErrorActionNone MessageErrorAction = iota // 错误消息类型操作:将不会被进行任何特殊处理,仅进行日志输出 - MessageErrorActionShutdown // 错误消息类型操作:当接收到该类型的操作时,服务器将执行 Server.Shutdown 函数 + MessageErrorActionShutdown // 错误消息类型操作:当接收到该类型的操作时,服务器将执行 Server.shutdown 函数 ) var messageErrorActionNames = map[MessageErrorAction]string{ MessageErrorActionNone: "None", - MessageErrorActionShutdown: "Shutdown", + MessageErrorActionShutdown: "shutdown", } type ( @@ -60,7 +60,7 @@ func (slf MessageType) String() string { return messageNames[slf] } -// PushPacketMessage 向特定服务器中推送 Packet 消息 +// PushPacketMessage 向特定服务器中推送 MessageTypePacket 消息 func PushPacketMessage(srv *Server, conn *Conn, packet []byte) { msg := srv.messagePool.Get() msg.t = MessageTypePacket @@ -68,7 +68,7 @@ func PushPacketMessage(srv *Server, conn *Conn, packet []byte) { srv.pushMessage(msg) } -// PushErrorMessage 向特定服务器中推送 Error 消息 +// PushErrorMessage 向特定服务器中推送 MessageTypeError 消息 func PushErrorMessage(srv *Server, err error, action MessageErrorAction) { msg := srv.messagePool.Get() msg.t = MessageTypeError @@ -76,7 +76,7 @@ func PushErrorMessage(srv *Server, err error, action MessageErrorAction) { srv.pushMessage(msg) } -// PushCrossMessage 向特定服务器中推送 Cross 消息 +// PushCrossMessage 向特定服务器中推送 MessageTypeCross 消息 func PushCrossMessage(srv *Server, crossName string, serverId int64, packet []byte) { if serverId == srv.id { msg := srv.messagePool.Get() @@ -95,10 +95,18 @@ func PushCrossMessage(srv *Server, crossName string, serverId int64, packet []by } } -// PushTickerMessage 向特定服务器中推送 Ticker 消息 +// PushTickerMessage 向特定服务器中推送 MessageTypeTicker 消息 func PushTickerMessage(srv *Server, caller func()) { msg := srv.messagePool.Get() msg.t = MessageTypeTicker msg.attrs = []any{caller} srv.pushMessage(msg) } + +// PushAsyncMessage 向特定服务器中推送 MessageTypeAsync 消息 +func PushAsyncMessage(srv *Server, caller func() error, callback ...func(err error)) { + msg := srv.messagePool.Get() + msg.t = MessageTypeAsync + msg.attrs = []any{caller, callback} + srv.pushMessage(msg) +} diff --git a/server/multiple.go b/server/multiple.go index c9fdf0aa..0f29a470 100644 --- a/server/multiple.go +++ b/server/multiple.go @@ -60,7 +60,7 @@ func (slf *MultipleServer) Run() { case err := <-exceptionChannel: for len(slf.servers) > 0 { server := slf.servers[0] - server.Shutdown(err) + server.shutdown(err) slf.servers = slf.servers[1:] } break @@ -68,7 +68,7 @@ func (slf *MultipleServer) Run() { for len(slf.servers) > 0 { server := slf.servers[0] server.multipleRuntimeErrorChan = nil - server.Shutdown(nil) + server.shutdown(nil) slf.servers = slf.servers[1:] } break @@ -76,7 +76,7 @@ func (slf *MultipleServer) Run() { for len(slf.servers) > 0 { server := slf.servers[0] server.multipleRuntimeErrorChan = nil - server.Shutdown(nil) + server.shutdown(nil) slf.servers = slf.servers[1:] } break diff --git a/server/options.go b/server/options.go index 49d3b240..ed31f7f8 100644 --- a/server/options.go +++ b/server/options.go @@ -23,6 +23,41 @@ const ( ) type Option func(srv *Server) +type option struct { + disableAnts bool // 是否禁用协程池 + antsPoolSize int // 协程池大小 +} + +type runtime struct { + deadlockDetect time.Duration // 是否开启死锁检测 +} + +// WithDeadlockDetect 通过死锁、死循环、永久阻塞检测的方式创建服务器 +// - 当检测到死锁、死循环、永久阻塞时,服务器将会生成 WARN 类型的日志,关键字为 "SuspectedDeadlock" +func WithDeadlockDetect(t time.Duration) Option { + return func(srv *Server) { + if t > 0 { + srv.deadlockDetect = t + log.Info("DeadlockDetect", zap.Any("Time", t)) + } + } +} + +// WithDisableAsyncMessage 通过禁用异步消息的方式创建服务器 +func WithDisableAsyncMessage() Option { + return func(srv *Server) { + srv.disableAnts = true + } +} + +// WithAsyncPoolSize 通过指定异步消息池大小的方式创建服务器 +// - 当通过 WithDisableAsyncMessage 禁用异步消息时,此选项无效 +// - 默认值为 256 +func WithAsyncPoolSize(size int) Option { + return func(srv *Server) { + srv.antsPoolSize = size + } +} // WithWebsocketReadDeadline 设置 Websocket 读取超时时间 // - 默认: 30 * time.Second diff --git a/server/server.go b/server/server.go index 8a97e1d0..09eccea3 100644 --- a/server/server.go +++ b/server/server.go @@ -10,6 +10,7 @@ import ( "github.com/kercylan98/minotaur/utils/synchronization" "github.com/kercylan98/minotaur/utils/timer" "github.com/kercylan98/minotaur/utils/times" + "github.com/panjf2000/ants/v2" "github.com/panjf2000/gnet" "github.com/panjf2000/gnet/pkg/logging" "github.com/xtaci/kcp-go/v5" @@ -29,9 +30,11 @@ import ( func New(network Network, options ...Option) *Server { server := &Server{ event: &event{}, + runtime: &runtime{}, + option: &option{}, network: network, - options: options, closeChannel: make(chan struct{}, 1), + systemSignal: make(chan os.Signal, 1), } server.event.Server = server @@ -50,17 +53,32 @@ func New(network Network, options ...Option) *Server { for _, option := range options { option(server) } + + if !server.disableAnts { + if server.antsPoolSize <= 0 { + server.antsPoolSize = 256 + } + var err error + server.ants, err = ants.NewPool(server.antsPoolSize, ants.WithLogger(log.Logger())) + if err != nil { + panic(err) + } + } + + server.option = nil return server } // Server 网络服务器 type Server struct { *event // 事件 + *runtime // 运行时 + *option // 可选项 + systemSignal chan os.Signal // 系统信号 cross map[string]Cross // 跨服 id int64 // 服务器id network Network // 网络类型 addr string // 侦听地址 - options []Option // 选项 ginServer *gin.Engine // HTTP模式下的路由器 httpServer *http.Server // HTTP模式下的服务器 grpcServer *grpc.Server // GRPC模式下的服务器 @@ -69,6 +87,7 @@ type Server struct { isRunning bool // 是否正在运行 isShutdown atomic.Bool // 是否已关闭 closeChannel chan struct{} // 关闭信号 + ants *ants.Pool // 协程池 gServer *gNet // TCP或UDP模式下的服务器 messagePool *synchronization.Pool[*Message] // 消息池 @@ -304,11 +323,11 @@ func (slf *Server) Run(addr string) error { ) log.Info("Server", zap.String("Minotaur Server", "====================================================================")) slf.OnStartFinishEvent() - systemSignal := make(chan os.Signal, 1) - signal.Notify(systemSignal, syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT) + + signal.Notify(slf.systemSignal, syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT) select { - case <-systemSignal: - slf.Shutdown(nil) + case <-slf.systemSignal: + slf.shutdown(nil) } select { @@ -348,8 +367,13 @@ func (slf *Server) Ticker() *timer.Ticker { return slf.ticker } -// Shutdown 停止运行服务器 -func (slf *Server) Shutdown(err error, stack ...string) { +// Shutdown 主动停止运行服务器 +func (slf *Server) Shutdown() { + slf.systemSignal <- syscall.SIGQUIT +} + +// shutdown 停止运行服务器 +func (slf *Server) shutdown(err error, stack ...string) { slf.OnStopEvent() defer func() { if slf.multipleRuntimeErrorChan != nil { @@ -432,8 +456,32 @@ func (slf *Server) pushMessage(message *Message) { slf.messageChannel <- message } +func (slf *Server) low(message *Message, present time.Time) { + cost := time.Since(present) + if cost > time.Millisecond*100 { + log.Warn("Server", zap.String("MessageType", messageNames[message.t]), zap.String("LowExecCost", cost.String())) + slf.OnMessageLowExecEvent(message, cost) + } +} + // dispatchMessage 消息分发 func (slf *Server) dispatchMessage(msg *Message) { + var ( + ctx context.Context + cancel context.CancelFunc + ) + if slf.deadlockDetect > 0 { + ctx, cancel = context.WithTimeout(context.Background(), slf.deadlockDetect) + go func() { + select { + case <-ctx.Done(): + if err := ctx.Err(); err == context.DeadlineExceeded { + log.Warn("Server", zap.String("MessageType", messageNames[msg.t]), zap.Any("SuspectedDeadlock", msg.attrs)) + } + } + }() + } + present := time.Now() defer func() { if err := recover(); err != nil { @@ -443,15 +491,14 @@ func (slf *Server) dispatchMessage(msg *Message) { } } - cost := time.Since(present) - if cost > time.Millisecond*100 { - log.Warn("Server", zap.String("MessageType", messageNames[msg.t]), zap.String("LowExecCost", cost.String()), zap.Any("MessageAttrs", msg.attrs)) - slf.OnMessageLowExecEvent(msg, cost) + if msg.t != MessageTypeAsync { + super.Handle(cancel) + slf.low(msg, present) + if !slf.isShutdown.Load() { + slf.messagePool.Release(msg) + } } - if !slf.isShutdown.Load() { - slf.messagePool.Release(msg) - } }() var attrs = msg.attrs switch msg.t { @@ -465,7 +512,7 @@ func (slf *Server) dispatchMessage(msg *Message) { case MessageErrorActionNone: log.ErrorWithStack("Server", stack, zap.Error(err)) case MessageErrorActionShutdown: - slf.Shutdown(err, stack) + slf.shutdown(err, stack) default: log.Warn("Server", zap.String("not support message error action", action.String())) } @@ -474,7 +521,29 @@ func (slf *Server) dispatchMessage(msg *Message) { case MessageTypeTicker: attrs[0].(func())() case MessageTypeAsync: - + handle := attrs[0].(func() error) + callbacks := attrs[1].([]func(err error)) + if err := slf.ants.Submit(func() { + defer func() { + if err := recover(); err != nil { + log.Error("Server", zap.String("MessageType", messageNames[msg.t]), zap.Any("MessageAttrs", msg.attrs), zap.Any("error", err)) + if e, ok := err.(error); ok { + slf.OnMessageErrorEvent(msg, e) + } + } + super.Handle(cancel) + if !slf.isShutdown.Load() { + slf.messagePool.Release(msg) + } + }() + if err := handle(); err != nil { + for _, callback := range callbacks { + callback(err) + } + } + }); err != nil { + panic(err) + } default: log.Warn("Server", zap.String("not support message type", msg.t.String())) } diff --git a/server/server_example_test.go b/server/server_example_test.go new file mode 100644 index 00000000..7fd49afb --- /dev/null +++ b/server/server_example_test.go @@ -0,0 +1,23 @@ +package server_test + +import ( + "github.com/kercylan98/minotaur/server" + "time" +) + +func ExampleNew() { + srv := server.New(server.NetworkWebsocket, + server.WithDeadlockDetect(time.Second*5), + ) + + srv.RegConnectionReceivePacketEvent(func(srv *server.Server, conn *server.Conn, packet server.Packet) { + conn.Write(packet) + }) + + go func() { time.Sleep(1 * time.Second); srv.Shutdown() }() + if err := srv.Run(":9999"); err != nil { + panic(err) + } + + // Output: +} diff --git a/utils/log/log.go b/utils/log/log.go index 832a04e4..cfab1178 100644 --- a/utils/log/log.go +++ b/utils/log/log.go @@ -101,8 +101,16 @@ func getWriter(filename string, times int) io.Writer { return hook } -func Logger() *zap.Logger { - return logger +type MLogger struct { + *zap.Logger +} + +func (slf *MLogger) Printf(format string, args ...interface{}) { + slf.Info(fmt.Sprintf(format, args...)) +} + +func Logger() *MLogger { + return &MLogger{logger} } func Info(msg string, fields ...zap.Field) {