From 35e13d9cd5e055746f6f8da3a966876a67b7415d Mon Sep 17 00:00:00 2001 From: kercylan98 Date: Sun, 7 Apr 2024 18:48:11 +0800 Subject: [PATCH] =?UTF-8?q?other:=20=E6=9C=8D=E5=8A=A1=E5=99=A8=E6=B6=88?= =?UTF-8?q?=E6=81=AF=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- server/internal/v2/conn.go | 65 +++-- server/internal/v2/controller.go | 96 ++++++-- server/internal/v2/events.go | 12 +- server/internal/v2/message.go | 251 ++++++++++++++------ server/internal/v2/messages/async.go | 27 --- server/internal/v2/messages/sync.go | 26 -- server/internal/v2/options.go | 7 +- server/internal/v2/queue/message_wrapper.go | 21 +- server/internal/v2/queue/queue.go | 7 +- server/internal/v2/reactor/reactor.go | 35 +-- server/internal/v2/reactor/reactor_test.go | 4 +- server/internal/v2/server.go | 47 ++-- server/internal/v2/server_test.go | 24 +- 13 files changed, 388 insertions(+), 234 deletions(-) delete mode 100644 server/internal/v2/messages/async.go delete mode 100644 server/internal/v2/messages/sync.go diff --git a/server/internal/v2/conn.go b/server/internal/v2/conn.go index b1c44c70..39bb0959 100644 --- a/server/internal/v2/conn.go +++ b/server/internal/v2/conn.go @@ -12,8 +12,11 @@ type Conn interface { // SetActor 设置连接使用的 Actor 名称 SetActor(actor string) - // GetActor 获取连接使用的 Actor 名称 - GetActor() string + // DelActor 删除连接使用的 Actor + DelActor() + + // GetActor 获取连接使用的 Actor 名称及是否拥有 Actor 名称的状态 + GetActor() (string, bool) // WritePacket 写入一个 Packet WritePacket(packet Packet) error @@ -27,9 +30,14 @@ type Conn interface { // WriteContext 写入数据 WriteContext(data []byte, context interface{}) error + // PushMessage 通过连接推送特定消息到队列中进行处理 + PushMessage(message Message) + + // PushSyncMessage 是 PushMessage 中对于 GenerateConnSyncMessage 的快捷方式 PushSyncMessage(handler func(srv Server, conn Conn)) - PushAsyncMessage(handler func(srv Server, conn Conn) error, callbacks ...func(srv Server, conn Conn, err error)) + // PushAsyncMessage 是 PushMessage 中对于 GenerateConnAsyncMessage 的快捷方式,当 callback 传入多个时,将仅有首个生效 + PushAsyncMessage(handler func(srv Server, conn Conn) error, callback ...func(srv Server, conn Conn, err error)) } func newConn(srv *server, c net.Conn, connWriter ConnWriter) *conn { @@ -42,17 +50,25 @@ func newConn(srv *server, c net.Conn, connWriter ConnWriter) *conn { type conn struct { server *server - conn net.Conn // 连接 - writer ConnWriter // 写入器 - actor atomic.String // Actor 名称 + conn net.Conn // 连接 + writer ConnWriter // 写入器 + actor atomic.Pointer[string] // Actor 名称 } func (c *conn) SetActor(actor string) { - c.actor.Store(actor) + c.actor.Store(&actor) } -func (c *conn) GetActor() string { - return c.actor.Load() +func (c *conn) DelActor() { + c.actor.Store(nil) +} + +func (c *conn) GetActor() (string, bool) { + ident := c.actor.Load() + if ident == nil { + return "", false + } + return *ident, true } func (c *conn) WritePacket(packet Packet) error { @@ -72,22 +88,29 @@ func (c *conn) WriteContext(data []byte, context interface{}) error { return c.writer(NewPacket(data).SetContext(context)) } +func (c *conn) PushMessage(message Message) { + c.getDispatchHandler()(message) +} + func (c *conn) PushSyncMessage(handler func(srv Server, conn Conn)) { - if err := c.server.reactor.AutoDispatch(c.GetActor(), SyncMessage(c.server, func(srv *server) { - handler(srv, c) - })); err != nil { - panic(err) + c.PushMessage(GenerateConnSyncMessage(c, handler)) +} + +func (c *conn) PushAsyncMessage(handler func(srv Server, conn Conn) error, callback ...func(srv Server, conn Conn, err error)) { + var cb func(srv Server, conn Conn, err error) + if len(callback) > 0 { + cb = callback[0] } + c.PushMessage(GenerateConnAsyncMessage(c, handler, cb)) } -func (c *conn) PushAsyncMessage(handler func(srv Server, conn Conn) error, callbacks ...func(srv Server, conn Conn, err error)) { - if err := c.server.reactor.AutoDispatch(c.GetActor(), AsyncMessage(c.server, c.GetActor(), func(srv *server) error { - return handler(srv, c) - }, func(srv *server, err error) { - for _, callback := range callbacks { - callback(srv, c, err) +func (c *conn) getDispatchHandler() func(message Message) { + var ident, exist = c.GetActor() + return func(message Message) { + if !exist { + c.server.PushSystemMessage(message) + } else { + c.server.PushIdentMessage(ident, message) } - })); err != nil { - panic(err) } } diff --git a/server/internal/v2/controller.go b/server/internal/v2/controller.go index f22c772b..608eb43d 100644 --- a/server/internal/v2/controller.go +++ b/server/internal/v2/controller.go @@ -1,11 +1,29 @@ package server -import "net" +import ( + "github.com/kercylan98/minotaur/utils/log" + "github.com/panjf2000/ants/v2" + "net" +) +// Controller 控制器是暴露 Server 对用户非公开的接口信息,适用于功能性的拓展 type Controller interface { + // GetServer 获取服务器 + GetServer() Server + // RegisterConnection 注册连接 RegisterConnection(conn net.Conn, writer ConnWriter) + // EliminateConnection 消除连接 EliminateConnection(conn net.Conn, err error) + // ReactPacket 反应连接数据包 ReactPacket(conn net.Conn, packet Packet) + // GetAnts 获取服务器异步池 + GetAnts() *ants.Pool + // PushSystemMessage 推送系统消息 + PushSystemMessage(message Message, errorHandlers ...func(err error)) + // PushIdentMessage 推送标识消息 + PushIdentMessage(ident string, message Message, errorHandlers ...func(err error)) + // MessageErrProcess 消息错误处理 + MessageErrProcess(message Message, err error) } type controller struct { @@ -19,37 +37,77 @@ func (s *controller) init(srv *server) *controller { return s } +func (s *controller) GetServer() Server { + return s.server +} + +func (s *controller) MessageErrProcess(message Message, err error) { + if err == nil { + return + } + if s.server.messageErrorHandler != nil { + s.server.messageErrorHandler(s.server, message, err) + } else { + s.server.GetLogger().Error("Server", log.Err(err)) + } +} + +func (s *controller) GetAnts() *ants.Pool { + return s.server.ants +} + +func (s *controller) PushSystemMessage(message Message, errorHandlers ...func(err error)) { + if err := s.server.reactor.SystemDispatch(message); err != nil { + for _, f := range errorHandlers { + f(err) + } + s.MessageErrProcess(message, err) + } +} + +func (s *controller) PushIdentMessage(ident string, message Message, errorHandlers ...func(err error)) { + if err := s.server.reactor.IdentDispatch(ident, message); err != nil { + for _, f := range errorHandlers { + f(err) + } + s.MessageErrProcess(message, err) + } +} + func (s *controller) RegisterConnection(conn net.Conn, writer ConnWriter) { - if err := s.server.reactor.DispatchWithSystem(SyncMessage(s.server, func(srv *server) { + s.PushSystemMessage(GenerateSystemSyncMessage(func(srv Server) { c := newConn(s.server, conn, writer) - srv.connections[conn] = c + s.server.connections[conn] = c s.events.onConnectionOpened(c) - })); err != nil { - panic(err) - } + })) } func (s *controller) EliminateConnection(conn net.Conn, err error) { - if err := s.server.reactor.DispatchWithSystem(SyncMessage(s.server, func(srv *server) { - c, exist := srv.connections[conn] + s.PushSystemMessage(GenerateSystemSyncMessage(func(srv Server) { + c, exist := s.server.connections[conn] if !exist { return } - delete(srv.connections, conn) - srv.events.onConnectionClosed(c, err) - })); err != nil { - panic(err) - } + delete(s.server.connections, conn) + s.server.events.onConnectionClosed(c, err) + })) } func (s *controller) ReactPacket(conn net.Conn, packet Packet) { - if err := s.server.reactor.DispatchWithSystem(SyncMessage(s.server, func(srv *server) { - c, exist := srv.connections[conn] + s.PushSystemMessage(GenerateSystemSyncMessage(func(srv Server) { + c, exist := s.server.connections[conn] if !exist { return } - srv.events.onConnectionReceivePacket(c, packet) - })); err != nil { - panic(err) - } + ident, exist := c.GetActor() + if !exist { + s.PushSystemMessage(GenerateSystemSyncMessage(func(srv Server) { + s.events.onConnectionReceivePacket(c, packet) + })) + } else { + s.PushIdentMessage(ident, GenerateSystemSyncMessage(func(srv Server) { + s.events.onConnectionReceivePacket(c, packet) + })) + } + })) } diff --git a/server/internal/v2/events.go b/server/internal/v2/events.go index 0e8f0a1e..74a7ee40 100644 --- a/server/internal/v2/events.go +++ b/server/internal/v2/events.go @@ -66,7 +66,7 @@ func (s *events) onLaunched() { opt.logger.Info("Minotaur Server", log.String("", "============================================================================")) }) - _ = s.server.reactor.DispatchWithSystem(SyncMessage(s.server, func(srv *server) { + s.PushMessage(GenerateSystemSyncMessage(func(srv Server) { s.launchedEventHandlers.RangeValue(func(index int, value LaunchedEventHandler) bool { value(s.server, s.server.state.Ip, s.server.state.LaunchedAt) return true @@ -79,7 +79,7 @@ func (s *events) RegisterConnectionOpenedEvent(handler ConnectionOpenedEventHand } func (s *events) onConnectionOpened(conn Conn) { - _ = s.server.reactor.DispatchWithSystem(SyncMessage(s.server, func(srv *server) { + s.PushMessage(GenerateSystemSyncMessage(func(srv Server) { s.connectionOpenedEventHandlers.RangeValue(func(index int, value ConnectionOpenedEventHandler) bool { value(s.server, conn) return true @@ -92,7 +92,7 @@ func (s *events) RegisterConnectionClosedEvent(handler ConnectionClosedEventHand } func (s *events) onConnectionClosed(conn Conn, err error) { - _ = s.server.reactor.DispatchWithSystem(SyncMessage(s.server, func(srv *server) { + s.PushMessage(GenerateSystemSyncMessage(func(srv Server) { s.connectionClosedEventHandlers.RangeValue(func(index int, value ConnectionClosedEventHandler) bool { value(s.server, conn, err) return true @@ -104,8 +104,8 @@ func (s *events) RegisterConnectionReceivePacketEvent(handler ConnectionReceiveP s.connectionReceivePacketEventHandlers.AppendByOptionalPriority(handler, priority...) } -func (s *events) onConnectionReceivePacket(conn Conn, packet Packet) { - _ = s.server.reactor.AutoDispatch(conn.GetActor(), SyncMessage(s.server, func(srv *server) { +func (s *events) onConnectionReceivePacket(conn *conn, packet Packet) { + conn.getDispatchHandler()(GenerateConnSyncMessage(conn, func(srv Server, conn Conn) { s.connectionReceivePacketEventHandlers.RangeValue(func(index int, value ConnectionReceivePacketEventHandler) bool { value(s.server, conn, packet) return true @@ -118,7 +118,7 @@ func (s *events) RegisterShutdownEvent(handler ShutdownEventHandler, priority .. } func (s *events) onShutdown() { - _ = s.server.reactor.DispatchWithSystem(SyncMessage(s.server, func(srv *server) { + s.PushSystemMessage(GenerateSystemSyncMessage(func(srv Server) { s.shutdownEventHandlers.RangeValue(func(index int, value ShutdownEventHandler) bool { value(s.server) return true diff --git a/server/internal/v2/message.go b/server/internal/v2/message.go index 128316d8..b466c320 100644 --- a/server/internal/v2/message.go +++ b/server/internal/v2/message.go @@ -3,98 +3,213 @@ package server import ( "github.com/kercylan98/minotaur/server/internal/v2/queue" "github.com/kercylan98/minotaur/server/internal/v2/reactor" - "github.com/kercylan98/minotaur/utils/log/v2" - "github.com/kercylan98/minotaur/utils/super" - "runtime/debug" ) -type MessageI interface { +type Message interface { // OnInitialize 消息初始化阶段将会被告知消息所在服务器、反应器、队列及标识信息 - OnInitialize(srv Server, reactor *reactor.Reactor[Message], queue *queue.Queue[int, string, Message], ident string) + OnInitialize(controller Controller, reactor *reactor.Reactor[Message], message queue.MessageWrapper[int, string, Message]) // OnProcess 消息处理阶段需要完成对消息的处理,并返回处理结果 - OnProcess(finish func(err error)) + OnProcess() } -type Message interface { - OnExecute() +// GenerateSystemSyncMessage 生成系统同步消息 +func GenerateSystemSyncMessage(handler func(srv Server)) Message { + return &systemSyncMessage{handler: handler} } -func SyncMessage(srv *server, handler func(srv *server)) Message { - return &syncMessage{srv: srv, handler: handler} +type systemSyncMessage struct { + controller Controller + handler func(srv Server) } -type syncMessage struct { - srv *server - handler func(srv *server) +func (m *systemSyncMessage) OnInitialize(controller Controller, reactor *reactor.Reactor[Message], message queue.MessageWrapper[int, string, Message]) { + m.controller = controller } -func (s *syncMessage) OnExecute() { - s.handler(s.srv) +func (m *systemSyncMessage) OnProcess() { + m.handler(m.controller.GetServer()) } -func AsyncMessage(srv *server, ident string, handler func(srv *server) error, callback func(srv *server, err error)) Message { - return &asyncMessage{ - ident: ident, - srv: srv, +// GenerateSystemAsyncMessage 生成系统异步消息 +func GenerateSystemAsyncMessage(handler func(srv Server) error, callback func(srv Server, err error)) Message { + return &systemAsyncMessage{ handler: handler, callback: callback, } } -type asyncMessage struct { - ident string - srv *server - handler func(srv *server) error - callback func(srv *server, err error) +type systemAsyncMessage struct { + controller Controller + queue *queue.Queue[int, string, Message] + handler func(srv Server) error + callback func(srv Server, err error) + hasIdent bool + ident string +} + +func (m *systemAsyncMessage) OnInitialize(controller Controller, reactor *reactor.Reactor[Message], message queue.MessageWrapper[int, string, Message]) { + m.controller = controller + m.queue = message.Queue() + m.ident = message.Ident() + m.hasIdent = message.HasIdent() +} + +func (m *systemAsyncMessage) OnProcess() { + var ident = m.ident + + m.queue.WaitAdd(ident, 1) + err := m.controller.GetAnts().Submit(func() { + err := m.handler(m.controller.GetServer()) + if !m.hasIdent { + m.controller.PushSystemMessage(GenerateSystemAsyncCallbackMessage(m.callback, err), func(err error) { + m.queue.WaitAdd(ident, -1) + }) + } else { + m.controller.PushIdentMessage(ident, GenerateSystemAsyncCallbackMessage(m.callback, err), func(err error) { + m.queue.WaitAdd(ident, -1) + }) + } + if err != nil { + m.queue.WaitAdd(ident, -1) + } + }) + if err != nil { + m.controller.MessageErrProcess(m, err) + m.queue.WaitAdd(ident, -1) + } +} + +// GenerateSystemAsyncCallbackMessage 生成系统异步回调消息 +func GenerateSystemAsyncCallbackMessage(handler func(srv Server, err error), err error) Message { + return &systemAsyncCallbackMessage{ + err: err, + handler: handler, + } +} + +type systemAsyncCallbackMessage struct { + controller Controller + err error + handler func(srv Server, err error) + queue *queue.Queue[int, string, Message] + ident string +} + +func (m *systemAsyncCallbackMessage) OnInitialize(controller Controller, reactor *reactor.Reactor[Message], message queue.MessageWrapper[int, string, Message]) { + m.controller = controller + m.queue = message.Queue() + m.ident = message.Ident() +} + +func (m *systemAsyncCallbackMessage) OnProcess() { + defer func(m *systemAsyncCallbackMessage) { + m.queue.WaitAdd(m.ident, -1) + }(m) + + if m.handler != nil { + m.handler(m.controller.GetServer(), m.err) + } +} + +// GenerateConnSyncMessage 生成连接同步消息 +func GenerateConnSyncMessage(conn Conn, handler func(srv Server, conn Conn)) Message { + return &connSyncMessage{handler: handler, conn: conn} +} + +type connSyncMessage struct { + controller Controller + conn Conn + handler func(srv Server, conn Conn) +} + +func (m *connSyncMessage) OnInitialize(controller Controller, reactor *reactor.Reactor[Message], message queue.MessageWrapper[int, string, Message]) { + m.controller = controller } -func (s *asyncMessage) OnExecute() { - var q *queue.Queue[int, string, Message] - var dispatch = func(ident string, message Message, beforeHandler ...func(queue *queue.Queue[int, string, Message], msg Message)) { - _ = s.srv.reactor.AutoDispatch(ident, message, beforeHandler...) +func (m *connSyncMessage) OnProcess() { + m.handler(m.controller.GetServer(), m.conn) +} + +// GenerateConnAsyncMessage 生成连接异步消息 +func GenerateConnAsyncMessage(conn Conn, handler func(srv Server, conn Conn) error, callback func(srv Server, conn Conn, err error)) Message { + return &connAsyncMessage{ + conn: conn, + handler: handler, + callback: callback, } +} + +type connAsyncMessage struct { + controller Controller + conn Conn + queue *queue.Queue[int, string, Message] + handler func(srv Server, conn Conn) error + callback func(srv Server, conn Conn, err error) + ident string + hasIdent bool +} - dispatch( - s.ident, - SyncMessage(s.srv, func(srv *server) { - _ = srv.ants.Submit(func() { - defer func(srv *server, msg *asyncMessage) { - if err := super.RecoverTransform(recover()); err != nil { - if errHandler := srv.GetMessageErrorHandler(); errHandler != nil { - errHandler(srv, msg, err) - } else { - srv.GetLogger().Error("Message", log.Err(err)) - debug.PrintStack() - } - } - }(s.srv, s) - - err := s.handler(srv) - var msg Message - msg = SyncMessage(srv, func(srv *server) { - defer func() { - q.WaitAdd(s.ident, -1) - if err := super.RecoverTransform(recover()); err != nil { - if errHandler := srv.GetMessageErrorHandler(); errHandler != nil { - errHandler(srv, msg, err) - } else { - srv.GetLogger().Error("Message", log.Err(err)) - debug.PrintStack() - } - } - }() - if s.callback != nil { - s.callback(srv, err) - } - }) - dispatch(s.ident, msg) +func (m *connAsyncMessage) OnInitialize(controller Controller, reactor *reactor.Reactor[Message], message queue.MessageWrapper[int, string, Message]) { + m.controller = controller + m.queue = message.Queue() + m.ident = message.Ident() + m.hasIdent = message.HasIdent() +} +func (m *connAsyncMessage) OnProcess() { + m.queue.WaitAdd(m.ident, 1) + err := m.controller.GetAnts().Submit(func() { + err := m.handler(m.controller.GetServer(), m.conn) + if !m.hasIdent { + m.controller.PushSystemMessage(GenerateConnAsyncCallbackMessage(m.conn, m.callback, err), func(err error) { + m.queue.WaitAdd(m.ident, -1) + }) + } else { + m.controller.PushIdentMessage(m.ident, GenerateConnAsyncCallbackMessage(m.conn, m.callback, err), func(err error) { + m.queue.WaitAdd(m.ident, -1) }) - }), - func(queue *queue.Queue[int, string, Message], msg Message) { - queue.WaitAdd(s.ident, 1) - q = queue - }, - ) + } + if err != nil { + m.queue.WaitAdd(m.ident, -1) + } + }) + if err != nil { + m.controller.MessageErrProcess(m, err) + m.queue.WaitAdd(m.ident, -1) + } +} + +// GenerateConnAsyncCallbackMessage 生成连接异步回调消息 +func GenerateConnAsyncCallbackMessage(conn Conn, handler func(srv Server, conn Conn, err error), err error) Message { + return &connAsyncCallbackMessage{ + conn: conn, + err: err, + handler: handler, + } +} + +type connAsyncCallbackMessage struct { + controller Controller + conn Conn + err error + handler func(srv Server, conn Conn, err error) + queue *queue.Queue[int, string, Message] + ident string +} + +func (m *connAsyncCallbackMessage) OnInitialize(controller Controller, reactor *reactor.Reactor[Message], message queue.MessageWrapper[int, string, Message]) { + m.controller = controller + m.queue = message.Queue() + m.ident = message.Ident() +} + +func (m *connAsyncCallbackMessage) OnProcess() { + defer func(m *connAsyncCallbackMessage) { + m.queue.WaitAdd(m.ident, -1) + }(m) + + if m.handler != nil { + m.handler(m.controller.GetServer(), m.conn, m.err) + } } diff --git a/server/internal/v2/messages/async.go b/server/internal/v2/messages/async.go deleted file mode 100644 index c1686557..00000000 --- a/server/internal/v2/messages/async.go +++ /dev/null @@ -1,27 +0,0 @@ -package messages - -import ( - "github.com/kercylan98/minotaur/server/internal/v2" - "github.com/kercylan98/minotaur/server/internal/v2/queue" - "github.com/kercylan98/minotaur/server/internal/v2/reactor" - "github.com/kercylan98/minotaur/utils/super" -) - -func NewAsync(handler func() error, callback func(err error)) server.MessageI { - return &Async{handler: handler} -} - -type Async struct { - handler func() error - callback func(err error) -} - -func (s *Async) OnInitialize(srv server.Server, reactor *reactor.Reactor[server.Message], queue *queue.Queue[int, string, server.Message], ident string) { - -} - -func (s *Async) OnProcess(finish func(err error)) { - defer finish(super.RecoverTransform(recover())) - - s.handler() -} diff --git a/server/internal/v2/messages/sync.go b/server/internal/v2/messages/sync.go deleted file mode 100644 index dac04dcb..00000000 --- a/server/internal/v2/messages/sync.go +++ /dev/null @@ -1,26 +0,0 @@ -package messages - -import ( - "github.com/kercylan98/minotaur/server/internal/v2" - "github.com/kercylan98/minotaur/server/internal/v2/queue" - "github.com/kercylan98/minotaur/server/internal/v2/reactor" - "github.com/kercylan98/minotaur/utils/super" -) - -func NewSync(handler func()) server.MessageI { - return &Sync{handler: handler} -} - -type Sync struct { - handler func() -} - -func (s *Sync) OnInitialize(srv server.Server, reactor *reactor.Reactor[server.Message], queue *queue.Queue[int, string, server.Message], ident string) { - -} - -func (s *Sync) OnProcess(finish func(err error)) { - defer finish(super.RecoverTransform(recover())) - - s.handler() -} diff --git a/server/internal/v2/options.go b/server/internal/v2/options.go index dfc8171c..e8862a0e 100644 --- a/server/internal/v2/options.go +++ b/server/internal/v2/options.go @@ -64,6 +64,9 @@ func (opt *Options) Apply(options ...*Options) { } if opt.server != nil && !opt.server.state.LaunchedAt.IsZero() { opt.active() + if opt.server.reactor != nil { + opt.server.reactor.SetLogger(opt.logger) + } } } @@ -117,7 +120,9 @@ func (opt *Options) IsDebug() bool { func (opt *Options) WithLogger(logger *log.Logger) *Options { return opt.modifyOptionsValue(func(opt *Options) { opt.logger = logger - opt.server.reactor.SetLogger(opt.logger) + if opt.server != nil && opt.server.reactor != nil { + opt.server.reactor.SetLogger(opt.logger) + } }) } diff --git a/server/internal/v2/queue/message_wrapper.go b/server/internal/v2/queue/message_wrapper.go index 0dc345cc..c8ef0016 100644 --- a/server/internal/v2/queue/message_wrapper.go +++ b/server/internal/v2/queue/message_wrapper.go @@ -1,10 +1,20 @@ package queue +func messageWrapper[Id, Ident comparable, M Message](queue *Queue[Id, Ident, M], hasIdent bool, ident Ident, msg M) MessageWrapper[Id, Ident, M] { + return MessageWrapper[Id, Ident, M]{ + queue: queue, + hasIdent: hasIdent, + ident: ident, + msg: msg, + } +} + // MessageWrapper 提供了对外部消息的包装,用于方便的获取消息信息 type MessageWrapper[Id, Ident comparable, M Message] struct { - queue *Queue[Id, Ident, M] // 处理消息的队列 - ident Ident // 消息所有人 - msg M // 消息信息 + queue *Queue[Id, Ident, M] // 处理消息的队列 + ident Ident // 消息所有人 + msg M // 消息信息 + hasIdent bool // 是否拥有所有人 } // Queue 返回处理该消息的队列 @@ -17,6 +27,11 @@ func (m MessageWrapper[Id, Ident, M]) Ident() Ident { return m.ident } +// HasIdent 返回消息是否拥有所有人 +func (m MessageWrapper[Id, Ident, M]) HasIdent() bool { + return m.hasIdent +} + // Message 返回消息的具体实例 func (m MessageWrapper[Id, Ident, M]) Message() M { return m.msg diff --git a/server/internal/v2/queue/queue.go b/server/internal/v2/queue/queue.go index c5806d6e..74815b31 100644 --- a/server/internal/v2/queue/queue.go +++ b/server/internal/v2/queue/queue.go @@ -103,17 +103,14 @@ func (q *Queue[Id, Ident, M]) Run() { } // Push 向队列中推送来自 ident 的消息 m,当队列已关闭时将会返回 ErrorQueueClosed -func (q *Queue[Id, Ident, M]) Push(ident Ident, m M) error { +func (q *Queue[Id, Ident, M]) Push(hasIdent bool, ident Ident, m M) error { if atomic.LoadInt32(&q.state.status) > StatusClosing { return ErrorQueueClosed } q.cond.L.Lock() q.identifiers[ident]++ q.state.total++ - q.buf.Write(MessageWrapper[Id, Ident, M]{ - ident: ident, - msg: m, - }) + q.buf.Write(messageWrapper(q, hasIdent, ident, m)) //log.Info("消息总计数", log.Int64("计数", q.state.total)) q.cond.Signal() q.cond.L.Unlock() diff --git a/server/internal/v2/reactor/reactor.go b/server/internal/v2/reactor/reactor.go index 72a7879a..55cd73dc 100644 --- a/server/internal/v2/reactor/reactor.go +++ b/server/internal/v2/reactor/reactor.go @@ -5,7 +5,6 @@ import ( "github.com/kercylan98/minotaur/server/internal/v2/loadbalancer" "github.com/kercylan98/minotaur/server/internal/v2/queue" "github.com/kercylan98/minotaur/utils/log/v2" - "github.com/kercylan98/minotaur/utils/str" "github.com/kercylan98/minotaur/utils/super" "runtime" "runtime/debug" @@ -20,8 +19,6 @@ const ( statusClosed // 事件循环已关闭 ) -const SysIdent = str.None - // NewReactor 创建一个新的 Reactor 实例,初始化系统级别的队列和多个 Socket 对应的队列 func NewReactor[M queue.Message](systemQueueSize, queueSize, systemBufferSize, queueBufferSize int, handler MessageHandler[M], errorHandler ErrorHandler[M]) *Reactor[M] { if handler == nil { @@ -84,7 +81,7 @@ func (r *Reactor[M]) process(msg queue.MessageWrapper[int, string, M]) { if r.errorHandler != nil { r.errorHandler(msg, err) } else { - r.GetLogger().Error("Reactor", log.String("action", "process"), log.String("ident", msg.Ident()), log.Int("queue", msg.Queue().Id()), log.Err(err)) + r.GetLogger().Error("Reactor", log.String("action", "process"), log.Any("ident", msg.Ident()), log.Int("queue", msg.Queue().Id()), log.Err(err)) debug.PrintStack() } } @@ -93,29 +90,18 @@ func (r *Reactor[M]) process(msg queue.MessageWrapper[int, string, M]) { r.handler(msg) } -// AutoDispatch 自动分发,当 ident 为空字符串时,分发到系统级别的队列,否则分发到 ident 使用的队列 -func (r *Reactor[M]) AutoDispatch(ident string, msg M, beforeHandler ...func(queue *queue.Queue[int, string, M], msg M)) error { - if ident == str.None { - return r.DispatchWithSystem(msg, beforeHandler...) - } - return r.Dispatch(ident, msg, beforeHandler...) -} - -// DispatchWithSystem 将消息分发到系统级别的队列 -func (r *Reactor[M]) DispatchWithSystem(msg M, beforeHandler ...func(queue *queue.Queue[int, string, M], msg M)) error { +// SystemDispatch 将消息分发到系统级别的队列 +func (r *Reactor[M]) SystemDispatch(msg M) error { if atomic.LoadInt32(&r.state) > statusClosing { r.queueRW.RUnlock() return fmt.Errorf("reactor closing or closed") } - for _, f := range beforeHandler { - f(r.systemQueue, msg) - } - return r.systemQueue.Push(SysIdent, msg) + return r.systemQueue.Push(false, "", msg) } -// Dispatch 将消息分发到 ident 使用的队列,当 ident 首次使用时,将会根据负载均衡策略选择一个队列 +// IdentDispatch 将消息分发到 ident 使用的队列,当 ident 首次使用时,将会根据负载均衡策略选择一个队列 // - 设置 count 会增加消息的外部计数,当 Reactor 关闭时会等待外部计数归零 -func (r *Reactor[M]) Dispatch(ident string, msg M, beforeHandler ...func(queue *queue.Queue[int, string, M], msg M)) error { +func (r *Reactor[M]) IdentDispatch(ident string, msg M) error { r.queueRW.RLock() if atomic.LoadInt32(&r.state) > statusClosing { r.queueRW.RUnlock() @@ -131,7 +117,7 @@ func (r *Reactor[M]) Dispatch(ident string, msg M, beforeHandler ...func(queue * if i, exist = r.location[ident]; !exist { next = r.lb.Next() r.location[ident] = next.Id() - r.logger.Load().Debug("Reactor", log.String("action", "bind"), log.String("ident", ident), log.Any("queue", next.Id())) + r.logger.Load().Debug("Reactor", log.String("action", "bind"), log.Any("ident", ident), log.Any("queue", next.Id())) } else { next = r.queues[i] } @@ -140,10 +126,7 @@ func (r *Reactor[M]) Dispatch(ident string, msg M, beforeHandler ...func(queue * next = r.queues[i] } r.queueRW.RUnlock() - for _, f := range beforeHandler { - f(next, msg) - } - return next.Push(ident, msg) + return next.Push(true, ident, msg) } // Run 启动 Reactor,运行系统级别的队列和多个 Socket 对应的队列 @@ -207,7 +190,7 @@ func (r *Reactor[M]) runQueue(q *queue.Queue[int, string, M]) { r.queueRW.RLock() mq := r.queues[mq] r.queueRW.RUnlock() - r.logger.Load().Debug("Reactor", log.String("action", "unbind"), log.String("ident", m.Ident()), log.Any("queue", mq.Id())) + r.logger.Load().Debug("Reactor", log.String("action", "unbind"), log.Any("ident", m.Ident()), log.Any("queue", mq.Id())) } } } diff --git a/server/internal/v2/reactor/reactor_test.go b/server/internal/v2/reactor/reactor_test.go index 91dc61af..e557d9f0 100644 --- a/server/internal/v2/reactor/reactor_test.go +++ b/server/internal/v2/reactor/reactor_test.go @@ -24,7 +24,7 @@ func BenchmarkReactor_Dispatch(b *testing.B) { b.RunParallel(func(pb *testing.PB) { for pb.Next() { - if err := r.Dispatch(random.HostName(), func() { + if err := r.IdentDispatch(random.HostName(), func() { }); err != nil { return } @@ -46,7 +46,7 @@ func TestReactor_Dispatch(t *testing.T) { id := random.HostName() for { time.Sleep(time.Millisecond * 20) - if err := r.Dispatch(id, func() { + if err := r.IdentDispatch(id, func() { }); err != nil { return diff --git a/server/internal/v2/server.go b/server/internal/v2/server.go index 3f5066df..3d701d39 100644 --- a/server/internal/v2/server.go +++ b/server/internal/v2/server.go @@ -23,11 +23,14 @@ type Server interface { // GetStatus 获取服务器状态 GetStatus() *State - // PushSyncMessage 推送同步消息 + // PushMessage 推送特定消息到系统队列中进行处理 + PushMessage(message Message) + + // PushSyncMessage 是 PushMessage 中对于 GenerateSystemSyncMessage 的快捷方式 PushSyncMessage(handler func(srv Server)) - // PushAsyncMessage 推送异步消息 - PushAsyncMessage(handler func(srv Server) error, callbacks ...func(srv Server, err error)) + // PushAsyncMessage 是 PushMessage 中对于 GenerateSystemAsyncMessage 的快捷方式,当 callback 传入多个时,将仅有首个生效 + PushAsyncMessage(handler func(srv Server) error, callback ...func(srv Server, err error)) } type server struct { @@ -56,9 +59,8 @@ func NewServer(network Network, options ...*Options) Server { srv.reactor = reactor.NewReactor[Message]( srv.GetServerMessageChannelSize(), srv.GetActorMessageChannelSize(), srv.GetServerMessageBufferInitialSize(), srv.GetActorMessageBufferInitialSize(), - func(message queue.MessageWrapper[int, string, Message]) { - message.Message().OnExecute() - }, func(message queue.MessageWrapper[int, string, Message], err error) { + srv.onProcessMessage, + func(message queue.MessageWrapper[int, string, Message], err error) { if handler := srv.GetMessageErrorHandler(); handler != nil { handler(srv, message.Message(), err) } @@ -121,26 +123,31 @@ func (s *server) Shutdown() (err error) { return } +func (s *server) PushMessage(message Message) { + s.controller.PushSystemMessage(message) +} + func (s *server) PushSyncMessage(handler func(srv Server)) { - if err := s.reactor.DispatchWithSystem(SyncMessage(s, func(srv *server) { - handler(srv) - })); err != nil { - panic(err) - } + s.PushMessage(GenerateSystemSyncMessage(handler)) } -func (s *server) PushAsyncMessage(handler func(srv Server) error, callbacks ...func(srv Server, err error)) { - if err := s.reactor.DispatchWithSystem(AsyncMessage(s, reactor.SysIdent, func(srv *server) error { - return handler(srv) - }, func(srv *server, err error) { - for _, callback := range callbacks { - callback(srv, err) - } - })); err != nil { - panic(err) +func (s *server) PushAsyncMessage(handler func(srv Server) error, callback ...func(srv Server, err error)) { + var cb func(srv Server, err error) + if len(callback) > 0 { + cb = callback[0] } + s.PushMessage(GenerateSystemAsyncMessage(handler, cb)) } func (s *server) GetStatus() *State { return s.state.Status() } + +func (s *server) onProcessMessage(message queue.MessageWrapper[int, string, Message]) { + s.getManyOptions(func(opt *Options) { + m := message.Message() + m.OnInitialize(s, s.reactor, message) + m.OnProcess() + }) + +} diff --git a/server/internal/v2/server_test.go b/server/internal/v2/server_test.go index b5d404e2..8eeda7b3 100644 --- a/server/internal/v2/server_test.go +++ b/server/internal/v2/server_test.go @@ -4,6 +4,7 @@ import ( "github.com/gobwas/ws" "github.com/kercylan98/minotaur/server/internal/v2" "github.com/kercylan98/minotaur/server/internal/v2/network" + "github.com/kercylan98/minotaur/utils/log/v2" "github.com/kercylan98/minotaur/utils/random" "github.com/kercylan98/minotaur/utils/times" "testing" @@ -24,7 +25,7 @@ func TestNewServer(t *testing.T) { }) }() - srv := server.NewServer(network.WebSocket(":9999"), server.NewOptions().WithLifeCycleLimit(times.Day*3)) + srv := server.NewServer(network.WebSocket(":9999"), server.NewOptions().WithLifeCycleLimit(times.Day*3).WithLogger(log.GetLogger())) var tm = make(map[string]bool) @@ -34,7 +35,10 @@ func TestNewServer(t *testing.T) { t.Error(err) } - conn.PushSyncMessage(func(srv server.Server, conn server.Conn) { + conn.PushAsyncMessage(func(srv server.Server, conn server.Conn) error { + time.Sleep(time.Second * 5) + return nil + }, func(srv server.Server, conn server.Conn, err error) { for i := 0; i < 10000000; i++ { _ = tm["1"] tm["1"] = random.Bool() @@ -46,14 +50,14 @@ func TestNewServer(t *testing.T) { if err := conn.WritePacket(packet); err != nil { panic(err) } - srv.PushAsyncMessage(func(srv server.Server) error { - for i := 0; i < 3; i++ { - time.Sleep(time.Second) - } - return nil - }, func(srv server.Server, err error) { - t.Log("callback") - }) + //srv.PushAsyncMessage(func(srv server.Server) error { + // for i := 0; i < 3; i++ { + // time.Sleep(time.Second) + // } + // return nil + //}, func(srv server.Server, err error) { + // t.Log("callback") + //}) }) if err := srv.Run(); err != nil {