Skip to content

Commit

Permalink
other: 服务器消息优化
Browse files Browse the repository at this point in the history
  • Loading branch information
kercylan98 committed Apr 7, 2024
1 parent 16704bf commit 35e13d9
Show file tree
Hide file tree
Showing 13 changed files with 388 additions and 234 deletions.
65 changes: 44 additions & 21 deletions server/internal/v2/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,11 @@ type Conn interface {
// SetActor 设置连接使用的 Actor 名称
SetActor(actor string)

// GetActor 获取连接使用的 Actor 名称
GetActor() string
// DelActor 删除连接使用的 Actor
DelActor()

// GetActor 获取连接使用的 Actor 名称及是否拥有 Actor 名称的状态
GetActor() (string, bool)

// WritePacket 写入一个 Packet
WritePacket(packet Packet) error
Expand All @@ -27,9 +30,14 @@ type Conn interface {
// WriteContext 写入数据
WriteContext(data []byte, context interface{}) error

// PushMessage 通过连接推送特定消息到队列中进行处理
PushMessage(message Message)

// PushSyncMessage 是 PushMessage 中对于 GenerateConnSyncMessage 的快捷方式
PushSyncMessage(handler func(srv Server, conn Conn))

PushAsyncMessage(handler func(srv Server, conn Conn) error, callbacks ...func(srv Server, conn Conn, err error))
// PushAsyncMessage 是 PushMessage 中对于 GenerateConnAsyncMessage 的快捷方式,当 callback 传入多个时,将仅有首个生效
PushAsyncMessage(handler func(srv Server, conn Conn) error, callback ...func(srv Server, conn Conn, err error))
}

func newConn(srv *server, c net.Conn, connWriter ConnWriter) *conn {
Expand All @@ -42,17 +50,25 @@ func newConn(srv *server, c net.Conn, connWriter ConnWriter) *conn {

type conn struct {
server *server
conn net.Conn // 连接
writer ConnWriter // 写入器
actor atomic.String // Actor 名称
conn net.Conn // 连接
writer ConnWriter // 写入器
actor atomic.Pointer[string] // Actor 名称
}

func (c *conn) SetActor(actor string) {
c.actor.Store(actor)
c.actor.Store(&actor)
}

func (c *conn) GetActor() string {
return c.actor.Load()
func (c *conn) DelActor() {
c.actor.Store(nil)
}

func (c *conn) GetActor() (string, bool) {
ident := c.actor.Load()
if ident == nil {
return "", false
}
return *ident, true
}

func (c *conn) WritePacket(packet Packet) error {
Expand All @@ -72,22 +88,29 @@ func (c *conn) WriteContext(data []byte, context interface{}) error {
return c.writer(NewPacket(data).SetContext(context))
}

func (c *conn) PushMessage(message Message) {
c.getDispatchHandler()(message)
}

func (c *conn) PushSyncMessage(handler func(srv Server, conn Conn)) {
if err := c.server.reactor.AutoDispatch(c.GetActor(), SyncMessage(c.server, func(srv *server) {
handler(srv, c)
})); err != nil {
panic(err)
c.PushMessage(GenerateConnSyncMessage(c, handler))
}

func (c *conn) PushAsyncMessage(handler func(srv Server, conn Conn) error, callback ...func(srv Server, conn Conn, err error)) {
var cb func(srv Server, conn Conn, err error)
if len(callback) > 0 {
cb = callback[0]
}
c.PushMessage(GenerateConnAsyncMessage(c, handler, cb))
}

func (c *conn) PushAsyncMessage(handler func(srv Server, conn Conn) error, callbacks ...func(srv Server, conn Conn, err error)) {
if err := c.server.reactor.AutoDispatch(c.GetActor(), AsyncMessage(c.server, c.GetActor(), func(srv *server) error {
return handler(srv, c)
}, func(srv *server, err error) {
for _, callback := range callbacks {
callback(srv, c, err)
func (c *conn) getDispatchHandler() func(message Message) {
var ident, exist = c.GetActor()
return func(message Message) {
if !exist {
c.server.PushSystemMessage(message)
} else {
c.server.PushIdentMessage(ident, message)
}
})); err != nil {
panic(err)
}
}
96 changes: 77 additions & 19 deletions server/internal/v2/controller.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,29 @@
package server

import "net"
import (
"github.com/kercylan98/minotaur/utils/log"
"github.com/panjf2000/ants/v2"
"net"
)

// Controller 控制器是暴露 Server 对用户非公开的接口信息,适用于功能性的拓展
type Controller interface {
// GetServer 获取服务器
GetServer() Server
// RegisterConnection 注册连接
RegisterConnection(conn net.Conn, writer ConnWriter)
// EliminateConnection 消除连接
EliminateConnection(conn net.Conn, err error)
// ReactPacket 反应连接数据包
ReactPacket(conn net.Conn, packet Packet)
// GetAnts 获取服务器异步池
GetAnts() *ants.Pool
// PushSystemMessage 推送系统消息
PushSystemMessage(message Message, errorHandlers ...func(err error))
// PushIdentMessage 推送标识消息
PushIdentMessage(ident string, message Message, errorHandlers ...func(err error))
// MessageErrProcess 消息错误处理
MessageErrProcess(message Message, err error)
}

type controller struct {
Expand All @@ -19,37 +37,77 @@ func (s *controller) init(srv *server) *controller {
return s
}

func (s *controller) GetServer() Server {
return s.server
}

func (s *controller) MessageErrProcess(message Message, err error) {
if err == nil {
return
}
if s.server.messageErrorHandler != nil {
s.server.messageErrorHandler(s.server, message, err)
} else {
s.server.GetLogger().Error("Server", log.Err(err))
}
}

func (s *controller) GetAnts() *ants.Pool {
return s.server.ants
}

func (s *controller) PushSystemMessage(message Message, errorHandlers ...func(err error)) {
if err := s.server.reactor.SystemDispatch(message); err != nil {
for _, f := range errorHandlers {
f(err)
}
s.MessageErrProcess(message, err)
}
}

func (s *controller) PushIdentMessage(ident string, message Message, errorHandlers ...func(err error)) {
if err := s.server.reactor.IdentDispatch(ident, message); err != nil {
for _, f := range errorHandlers {
f(err)
}
s.MessageErrProcess(message, err)
}
}

func (s *controller) RegisterConnection(conn net.Conn, writer ConnWriter) {
if err := s.server.reactor.DispatchWithSystem(SyncMessage(s.server, func(srv *server) {
s.PushSystemMessage(GenerateSystemSyncMessage(func(srv Server) {
c := newConn(s.server, conn, writer)
srv.connections[conn] = c
s.server.connections[conn] = c
s.events.onConnectionOpened(c)
})); err != nil {
panic(err)
}
}))
}

func (s *controller) EliminateConnection(conn net.Conn, err error) {
if err := s.server.reactor.DispatchWithSystem(SyncMessage(s.server, func(srv *server) {
c, exist := srv.connections[conn]
s.PushSystemMessage(GenerateSystemSyncMessage(func(srv Server) {
c, exist := s.server.connections[conn]
if !exist {
return
}
delete(srv.connections, conn)
srv.events.onConnectionClosed(c, err)
})); err != nil {
panic(err)
}
delete(s.server.connections, conn)
s.server.events.onConnectionClosed(c, err)
}))
}

func (s *controller) ReactPacket(conn net.Conn, packet Packet) {
if err := s.server.reactor.DispatchWithSystem(SyncMessage(s.server, func(srv *server) {
c, exist := srv.connections[conn]
s.PushSystemMessage(GenerateSystemSyncMessage(func(srv Server) {
c, exist := s.server.connections[conn]
if !exist {
return
}
srv.events.onConnectionReceivePacket(c, packet)
})); err != nil {
panic(err)
}
ident, exist := c.GetActor()
if !exist {
s.PushSystemMessage(GenerateSystemSyncMessage(func(srv Server) {
s.events.onConnectionReceivePacket(c, packet)
}))
} else {
s.PushIdentMessage(ident, GenerateSystemSyncMessage(func(srv Server) {
s.events.onConnectionReceivePacket(c, packet)
}))
}
}))
}
12 changes: 6 additions & 6 deletions server/internal/v2/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (s *events) onLaunched() {
opt.logger.Info("Minotaur Server", log.String("", "============================================================================"))
})

_ = s.server.reactor.DispatchWithSystem(SyncMessage(s.server, func(srv *server) {
s.PushMessage(GenerateSystemSyncMessage(func(srv Server) {
s.launchedEventHandlers.RangeValue(func(index int, value LaunchedEventHandler) bool {
value(s.server, s.server.state.Ip, s.server.state.LaunchedAt)
return true
Expand All @@ -79,7 +79,7 @@ func (s *events) RegisterConnectionOpenedEvent(handler ConnectionOpenedEventHand
}

func (s *events) onConnectionOpened(conn Conn) {
_ = s.server.reactor.DispatchWithSystem(SyncMessage(s.server, func(srv *server) {
s.PushMessage(GenerateSystemSyncMessage(func(srv Server) {
s.connectionOpenedEventHandlers.RangeValue(func(index int, value ConnectionOpenedEventHandler) bool {
value(s.server, conn)
return true
Expand All @@ -92,7 +92,7 @@ func (s *events) RegisterConnectionClosedEvent(handler ConnectionClosedEventHand
}

func (s *events) onConnectionClosed(conn Conn, err error) {
_ = s.server.reactor.DispatchWithSystem(SyncMessage(s.server, func(srv *server) {
s.PushMessage(GenerateSystemSyncMessage(func(srv Server) {
s.connectionClosedEventHandlers.RangeValue(func(index int, value ConnectionClosedEventHandler) bool {
value(s.server, conn, err)
return true
Expand All @@ -104,8 +104,8 @@ func (s *events) RegisterConnectionReceivePacketEvent(handler ConnectionReceiveP
s.connectionReceivePacketEventHandlers.AppendByOptionalPriority(handler, priority...)
}

func (s *events) onConnectionReceivePacket(conn Conn, packet Packet) {
_ = s.server.reactor.AutoDispatch(conn.GetActor(), SyncMessage(s.server, func(srv *server) {
func (s *events) onConnectionReceivePacket(conn *conn, packet Packet) {
conn.getDispatchHandler()(GenerateConnSyncMessage(conn, func(srv Server, conn Conn) {
s.connectionReceivePacketEventHandlers.RangeValue(func(index int, value ConnectionReceivePacketEventHandler) bool {
value(s.server, conn, packet)
return true
Expand All @@ -118,7 +118,7 @@ func (s *events) RegisterShutdownEvent(handler ShutdownEventHandler, priority ..
}

func (s *events) onShutdown() {
_ = s.server.reactor.DispatchWithSystem(SyncMessage(s.server, func(srv *server) {
s.PushSystemMessage(GenerateSystemSyncMessage(func(srv Server) {
s.shutdownEventHandlers.RangeValue(func(index int, value ShutdownEventHandler) bool {
value(s.server)
return true
Expand Down
Loading

0 comments on commit 35e13d9

Please sign in to comment.