diff --git a/go.mod b/go.mod index c551cd6d..e73072f3 100644 --- a/go.mod +++ b/go.mod @@ -44,6 +44,7 @@ require ( github.com/nats-io/nats-server/v2 v2.9.16 // indirect github.com/nats-io/nkeys v0.4.4 // indirect github.com/nats-io/nuid v1.0.1 // indirect + github.com/panjf2000/ants/v2 v2.8.1 // indirect github.com/pelletier/go-toml/v2 v2.0.8 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/smartystreets/assertions v1.13.1 // indirect diff --git a/go.sum b/go.sum index 4f7cfe0a..51e59130 100644 --- a/go.sum +++ b/go.sum @@ -112,6 +112,8 @@ github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= github.com/panjf2000/ants/v2 v2.4.7 h1:MZnw2JRyTJxFwtaMtUJcwE618wKD04POWk2gwwP4E2M= github.com/panjf2000/ants/v2 v2.4.7/go.mod h1:f6F0NZVFsGCp5A7QW/Zj/m92atWwOkY0OIhFxRNFr4A= +github.com/panjf2000/ants/v2 v2.8.1 h1:C+n/f++aiW8kHCExKlpX6X+okmxKXP7DWLutxuAPuwQ= +github.com/panjf2000/ants/v2 v2.8.1/go.mod h1:KIBmYG9QQX5U2qzFP/yQJaq/nSb6rahS9iEHkrCMgM8= github.com/panjf2000/gnet v1.6.6 h1:P6bApc54hnVcJVgH+SMe41mn47ECCajB6E/dKq27Y0c= github.com/panjf2000/gnet v1.6.6/go.mod h1:KcOU7QsCaCBjeD5kyshBIamG3d9kAQtlob4Y0v0E+sc= github.com/pelletier/go-toml/v2 v2.0.8 h1:0ctb6s9mE31h0/lhu+J6OPmVeDxJn+kYnJc2jZR9tGQ= @@ -228,6 +230,7 @@ golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= diff --git a/server/conn.go b/server/conn.go index 3588b21d..1d0fd692 100644 --- a/server/conn.go +++ b/server/conn.go @@ -156,29 +156,15 @@ func (slf *Conn) IsWebsocket() bool { return slf.server.network == NetworkWebsocket } -// WriteString 向连接中写入字符串 -// - 通过转换为[]byte调用 *Conn.Write -func (slf *Conn) WriteString(data string, messageType ...int) { - slf.Write([]byte(data), messageType...) -} - -// WriteStringWithCallback 与 WriteString 相同,但是会在写入完成后调用 callback -// - 当 callback 为 nil 时,与 WriteString 相同 -func (slf *Conn) WriteStringWithCallback(data string, callback func(err error), messageType ...int) { - slf.WriteWithCallback([]byte(data), callback, messageType...) -} - // Write 向连接中写入数据 // - messageType: websocket模式中指定消息类型 -func (slf *Conn) Write(data []byte, messageType ...int) { +func (slf *Conn) Write(packet Packet) { if slf.packetPool == nil { return } cp := slf.packetPool.Get() - if len(messageType) > 0 { - cp.websocketMessageType = messageType[0] - } - cp.packet = data + cp.websocketMessageType = packet.WebsocketType + cp.packet = packet.Data slf.mutex.Lock() slf.packets = append(slf.packets, cp) slf.mutex.Unlock() @@ -186,15 +172,13 @@ func (slf *Conn) Write(data []byte, messageType ...int) { // WriteWithCallback 与 Write 相同,但是会在写入完成后调用 callback // - 当 callback 为 nil 时,与 Write 相同 -func (slf *Conn) WriteWithCallback(data []byte, callback func(err error), messageType ...int) { +func (slf *Conn) WriteWithCallback(packet Packet, callback func(err error), messageType ...int) { if slf.packetPool == nil { return } cp := slf.packetPool.Get() - if len(messageType) > 0 { - cp.websocketMessageType = messageType[0] - } - cp.packet = data + cp.websocketMessageType = packet.WebsocketType + cp.packet = packet.Data cp.callback = callback slf.mutex.Lock() slf.packets = append(slf.packets, cp) @@ -233,18 +217,8 @@ func (slf *Conn) writeLoop(wait *sync.WaitGroup) { slf.mutex.Unlock() for i := 0; i < len(packets); i++ { data := packets[i] - //if len(data.packet) == 0 { - // for _, packet := range packets { - // slf.packetPool.Release(packet) - // } - // slf.Close() - // return - //} var err error if slf.IsWebsocket() { - if data.websocketMessageType <= 0 { - data.websocketMessageType = slf.server.websocketWriteMessageType - } err = slf.ws.WriteMessage(data.websocketMessageType, data.packet) } else { if slf.gn != nil { diff --git a/server/errors.go b/server/errors.go index 855f40ea..5ddd56a1 100644 --- a/server/errors.go +++ b/server/errors.go @@ -3,22 +3,12 @@ package server import "errors" var ( - ErrConstructed = errors.New("the Server must be constructed using the server.New function") - ErrCanNotSupportNetwork = errors.New("can not support network") - ErrMessageTypePacketAttrs = errors.New("MessageTypePacket must contain *Conn and []byte") - ErrWebsocketMessageTypePacketAttrs = errors.New("MessageTypePacket must contain *Conn and []byte and int(MessageType)") - ErrMessageTypeErrorAttrs = errors.New("MessageTypePacket must contain error and MessageErrorAction") - ErrMessageTypeCrossErrorAttrs = errors.New("MessageTypeCross must contain int64(server id) and []byte") - ErrMessageTypeTickerErrorAttrs = errors.New("MessageTypeTicker must contain func()") - ErrNetworkOnlySupportHttp = errors.New("the current network mode is not compatible with HttpRouter, only NetworkHttp is supported") - ErrNetworkOnlySupportGRPC = errors.New("the current network mode is not compatible with RegGrpcServer, only NetworkGRPC is supported") - ErrNetworkIncompatibleHttp = errors.New("the current network mode is not compatible with NetworkHttp") - ErrWebsocketMessageTypeException = errors.New("unknown message type, will not work") - ErrNotWebsocketUseMessageType = errors.New("message type filtering only supports websocket and does not take effect") - ErrWebsocketIllegalMessageType = errors.New("illegal message type") - ErrPleaseUseWebsocketHandle = errors.New("in Websocket mode, please use the RegConnectionReceiveWebsocketPacketEvent function to register") - ErrPleaseUseOrdinaryPacketHandle = errors.New("non Websocket mode, please use the RegConnectionReceivePacketEvent function to register") - ErrNoSupportCross = errors.New("the server does not support GetID or PushCrossMessage, please use the WithCross option to create the server") - ErrNoSupportTicker = errors.New("the server does not support Ticker, please use the WithTicker option to create the server") - ErrUnregisteredCrossName = errors.New("unregistered cross name, please use the WithCross option to create the server") + ErrConstructed = errors.New("the Server must be constructed using the server.New function") + ErrCanNotSupportNetwork = errors.New("can not support network") + ErrNetworkOnlySupportHttp = errors.New("the current network mode is not compatible with HttpRouter, only NetworkHttp is supported") + ErrNetworkOnlySupportGRPC = errors.New("the current network mode is not compatible with RegGrpcServer, only NetworkGRPC is supported") + ErrNetworkIncompatibleHttp = errors.New("the current network mode is not compatible with NetworkHttp") + ErrWebsocketIllegalMessageType = errors.New("illegal message type") + ErrNoSupportCross = errors.New("the server does not support GetID or PushCrossMessage, please use the WithCross option to create the server") + ErrNoSupportTicker = errors.New("the server does not support Ticker, please use the WithTicker option to create the server") ) diff --git a/server/event.go b/server/event.go index 7d2bc1bb..3c2080d6 100644 --- a/server/event.go +++ b/server/event.go @@ -13,8 +13,7 @@ import ( type StartBeforeEventHandle func(srv *Server) type StartFinishEventHandle func(srv *Server) type StopEventHandle func(srv *Server) -type ConnectionReceivePacketEventHandle func(srv *Server, conn *Conn, packet []byte) -type ConnectionReceiveWebsocketPacketEventHandle func(srv *Server, conn *Conn, packet []byte, messageType int) +type ConnectionReceivePacketEventHandle func(srv *Server, conn *Conn, packet Packet) type ConnectionOpenedEventHandle func(srv *Server, conn *Conn) type ConnectionClosedEventHandle func(srv *Server, conn *Conn, err any) type ReceiveCrossPacketEventHandle func(srv *Server, senderServerId int64, packet []byte) @@ -24,16 +23,15 @@ type ConsoleCommandEventHandle func(srv *Server) type event struct { *Server - startBeforeEventHandles []StartBeforeEventHandle - startFinishEventHandles []StartFinishEventHandle - stopEventHandles []StopEventHandle - connectionReceivePacketEventHandles []ConnectionReceivePacketEventHandle - connectionReceiveWebsocketPacketEventHandles []ConnectionReceiveWebsocketPacketEventHandle - connectionOpenedEventHandles []ConnectionOpenedEventHandle - connectionClosedEventHandles []ConnectionClosedEventHandle - receiveCrossPacketEventHandles []ReceiveCrossPacketEventHandle - messageErrorEventHandles []MessageErrorEventHandle - messageLowExecEventHandles []MessageLowExecEventHandle + startBeforeEventHandles []StartBeforeEventHandle + startFinishEventHandles []StartFinishEventHandle + stopEventHandles []StopEventHandle + connectionReceivePacketEventHandles []ConnectionReceivePacketEventHandle + connectionOpenedEventHandles []ConnectionOpenedEventHandle + connectionClosedEventHandles []ConnectionClosedEventHandle + receiveCrossPacketEventHandles []ReceiveCrossPacketEventHandle + messageErrorEventHandles []MessageErrorEventHandle + messageLowExecEventHandles []MessageLowExecEventHandle consoleCommandEventHandles map[string][]ConsoleCommandEventHandle @@ -147,34 +145,16 @@ func (slf *event) RegConnectionReceivePacketEvent(handle ConnectionReceivePacket if slf.network == NetworkHttp { panic(ErrNetworkIncompatibleHttp) } - if slf.network == NetworkWebsocket { - panic(ErrPleaseUseWebsocketHandle) - } slf.connectionReceivePacketEventHandles = append(slf.connectionReceivePacketEventHandles, handle) log.Info("Server", zap.String("RegEvent", runtimes.CurrentRunningFuncName()), zap.String("handle", reflect.TypeOf(handle).String())) } -func (slf *event) OnConnectionReceivePacketEvent(conn *Conn, packet []byte) { +func (slf *event) OnConnectionReceivePacketEvent(conn *Conn, packet Packet) { for _, handle := range slf.connectionReceivePacketEventHandles { handle(slf.Server, conn, packet) } } -// RegConnectionReceiveWebsocketPacketEvent 在接收到Websocket数据包时将立刻执行被注册的事件处理函数 -func (slf *event) RegConnectionReceiveWebsocketPacketEvent(handle ConnectionReceiveWebsocketPacketEventHandle) { - if slf.network != NetworkWebsocket { - panic(ErrPleaseUseOrdinaryPacketHandle) - } - slf.connectionReceiveWebsocketPacketEventHandles = append(slf.connectionReceiveWebsocketPacketEventHandles, handle) - log.Info("Server", zap.String("RegEvent", runtimes.CurrentRunningFuncName()), zap.String("handle", reflect.TypeOf(handle).String())) -} - -func (slf *event) OnConnectionReceiveWebsocketPacketEvent(conn *Conn, packet []byte, messageType int) { - for _, handle := range slf.connectionReceiveWebsocketPacketEventHandles { - handle(slf.Server, conn, packet, messageType) - } -} - // RegReceiveCrossPacketEvent 在接收到跨服数据包时将立即执行被注册的事件处理函数 func (slf *event) RegReceiveCrossPacketEvent(handle ReceiveCrossPacketEventHandle) { slf.receiveCrossPacketEventHandles = append(slf.receiveCrossPacketEventHandles, handle) @@ -215,15 +195,8 @@ func (slf *event) check() { switch slf.network { case NetworkHttp, NetworkGRPC: default: - switch slf.network { - case NetworkWebsocket: - if len(slf.connectionReceiveWebsocketPacketEventHandles) == 0 { - log.Warn("Server", zap.String("ConnectionReceiveWebsocketPacketEvent", "invalid server, no packets processed")) - } - default: - if len(slf.connectionReceivePacketEventHandles) == 0 { - log.Warn("Server", zap.String("ConnectionReceivePacketEvent", "invalid server, no packets processed")) - } + if len(slf.connectionReceivePacketEventHandles) == 0 { + log.Warn("Server", zap.String("ConnectionReceivePacketEvent", "invalid server, no packets processed")) } } diff --git a/server/gnet.go b/server/gnet.go index d42d06b2..73f9d57b 100644 --- a/server/gnet.go +++ b/server/gnet.go @@ -48,7 +48,7 @@ func (slf *gNet) AfterWrite(c gnet.Conn, b []byte) { } func (slf *gNet) React(packet []byte, c gnet.Conn) (out []byte, action gnet.Action) { - slf.Server.pushMessage(MessageTypePacket, c.Context().(*Conn), bytes.Clone(packet)) + PushPacketMessage(slf.Server, c.Context().(*Conn), append(bytes.Clone(packet), 0)) return nil, gnet.None } diff --git a/server/message.go b/server/message.go index fb2122fc..3222a6cb 100644 --- a/server/message.go +++ b/server/message.go @@ -1,6 +1,8 @@ package server -import "runtime/debug" +import ( + "runtime/debug" +) const ( // MessageTypePacket 数据包消息类型:该类型的数据将被发送到 ConnectionReceivePacketEvent 进行处理 @@ -24,6 +26,7 @@ var messageNames = map[MessageType]string{ MessageTypeError: "MessageTypeError", MessageTypeCross: "MessageTypeCross", MessageTypeTicker: "MessageTypeTicker", + MessageTypeAsync: "MessageTypeAsync", } const ( @@ -57,85 +60,6 @@ func (slf MessageType) String() string { return messageNames[slf] } -func (slf MessageType) deconstructWebSocketPacket(attrs ...any) (conn *Conn, packet []byte, messageType int) { - if len(attrs) != 3 { - panic(ErrWebsocketMessageTypePacketAttrs) - } - var ok bool - if conn, ok = attrs[0].(*Conn); !ok { - panic(ErrWebsocketMessageTypePacketAttrs) - } - if packet, ok = attrs[1].([]byte); !ok { - panic(ErrWebsocketMessageTypePacketAttrs) - } - if messageType, ok = attrs[2].(int); !ok { - panic(ErrWebsocketMessageTypePacketAttrs) - } - return -} - -func (slf MessageType) deconstructPacket(attrs ...any) (conn *Conn, packet []byte) { - if len(attrs) != 2 { - panic(ErrMessageTypePacketAttrs) - } - var ok bool - if conn, ok = attrs[0].(*Conn); !ok { - panic(ErrMessageTypePacketAttrs) - } - if packet, ok = attrs[1].([]byte); !ok { - panic(ErrMessageTypePacketAttrs) - } - return -} - -func (slf MessageType) deconstructError(attrs ...any) (err error, action MessageErrorAction, stack string) { - if len(attrs) != 3 { - panic(ErrMessageTypeErrorAttrs) - } - var ok bool - if err, ok = attrs[0].(error); !ok { - panic(ErrMessageTypeErrorAttrs) - } - if action, ok = attrs[1].(MessageErrorAction); !ok { - panic(ErrMessageTypeErrorAttrs) - } - stack = attrs[2].(string) - return -} - -func (slf MessageType) deconstructCross(attrs ...any) (serverId int64, packet []byte) { - if len(attrs) != 2 { - panic(ErrMessageTypeCrossErrorAttrs) - } - var ok bool - if serverId, ok = attrs[0].(int64); !ok { - panic(ErrMessageTypeCrossErrorAttrs) - } - if packet, ok = attrs[1].([]byte); !ok { - panic(ErrMessageTypeCrossErrorAttrs) - } - return -} - -func (slf MessageType) deconstructTicker(attrs ...any) (caller func()) { - if len(attrs) != 1 { - panic(ErrMessageTypeTickerErrorAttrs) - } - var ok bool - if caller, ok = attrs[0].(func()); !ok { - panic(ErrMessageTypeTickerErrorAttrs) - } - return -} - -// PushWebsocketPacketMessage 向特定服务器中推送 WebsocketPacket 消息 -func PushWebsocketPacketMessage(srv *Server, conn *Conn, packet []byte, messageType int) { - msg := srv.messagePool.Get() - msg.t = MessageTypePacket - msg.attrs = []any{conn, packet, messageType} - srv.pushMessage(msg) -} - // PushPacketMessage 向特定服务器中推送 Packet 消息 func PushPacketMessage(srv *Server, conn *Conn, packet []byte) { msg := srv.messagePool.Get() @@ -154,15 +78,27 @@ func PushErrorMessage(srv *Server, err error, action MessageErrorAction) { // PushCrossMessage 向特定服务器中推送 Cross 消息 func PushCrossMessage(srv *Server, crossName string, serverId int64, packet []byte) { - if len(srv.cross) == 0 { - return - } - _, exist := srv.cross[crossName] - if !exist { - return + if serverId == srv.id { + msg := srv.messagePool.Get() + msg.t = MessageTypeCross + msg.attrs = []any{serverId, packet} + srv.pushMessage(msg) + } else { + if len(srv.cross) == 0 { + return + } + cross, exist := srv.cross[crossName] + if !exist { + return + } + _ = cross.PushMessage(serverId, packet) } +} + +// PushTickerMessage 向特定服务器中推送 Ticker 消息 +func PushTickerMessage(srv *Server, caller func()) { msg := srv.messagePool.Get() - msg.t = MessageTypeCross - msg.attrs = []any{serverId, packet} + msg.t = MessageTypeTicker + msg.attrs = []any{caller} srv.pushMessage(msg) } diff --git a/server/options.go b/server/options.go index c014c17e..49d3b240 100644 --- a/server/options.go +++ b/server/options.go @@ -45,7 +45,7 @@ func WithTicker(size int, autonomy bool) Option { srv.ticker = timer.GetTicker(size) } else { srv.ticker = timer.GetTicker(size, timer.WithCaller(func(name string, caller func()) { - srv.pushMessage(MessageTypeTicker, caller) + PushTickerMessage(srv, caller) })) } } @@ -64,7 +64,10 @@ func WithCross(crossName string, serverId int64, cross Cross) Option { } srv.cross[crossName] = cross err := cross.Init(srv, func(serverId int64, packet []byte) { - srv.pushMessage(MessageTypeCross, serverId, packet) + msg := srv.messagePool.Get() + msg.t = MessageTypeCross + msg.attrs = []any{serverId, packet} + srv.pushMessage(msg) }) if err != nil { log.Info("Cross", zap.Int64("ServerID", serverId), zap.String("Cross", reflect.TypeOf(cross).String()), zap.String("State", "WaitNatsRun")) @@ -105,24 +108,10 @@ func WithProd() Option { } } -// WithWebsocketWriteMessageType 设置客户端写入的Websocket消息类型 -// - 默认: WebsocketMessageTypeBinary -func WithWebsocketWriteMessageType(messageType int) Option { - return func(srv *Server) { - switch messageType { - case WebsocketMessageTypeText, WebsocketMessageTypeBinary, WebsocketMessageTypeClose, WebsocketMessageTypePing, WebsocketMessageTypePong: - srv.websocketWriteMessageType = messageType - default: - log.Warn("WithWebsocketWriteMessageType", zap.Int("MessageType", messageType), zap.Error(ErrWebsocketMessageTypeException)) - } - } -} - // WithWebsocketMessageType 设置仅支持特定类型的Websocket消息 func WithWebsocketMessageType(messageTypes ...int) Option { return func(srv *Server) { if srv.network != NetworkWebsocket { - log.Warn("WitchWebsocketMessageType", zap.String("Network", string(srv.network)), zap.Error(ErrNotWebsocketUseMessageType)) return } var supports = make(map[int]bool) @@ -130,8 +119,6 @@ func WithWebsocketMessageType(messageTypes ...int) Option { switch messageType { case WebsocketMessageTypeText, WebsocketMessageTypeBinary, WebsocketMessageTypeClose, WebsocketMessageTypePing, WebsocketMessageTypePong: supports[messageType] = true - default: - log.Warn("WitchWebsocketMessageType", zap.Int("MessageType", messageType), zap.Error(ErrWebsocketMessageTypeException)) } } srv.supportMessageTypes = supports diff --git a/server/packet.go b/server/packet.go new file mode 100644 index 00000000..b863c0ff --- /dev/null +++ b/server/packet.go @@ -0,0 +1,10 @@ +package server + +type Packet struct { + WebsocketType int + Data []byte +} + +func (slf Packet) String() string { + return string(slf.Data) +} diff --git a/server/server.go b/server/server.go index bfd934fd..8a97e1d0 100644 --- a/server/server.go +++ b/server/server.go @@ -28,11 +28,10 @@ import ( // New 根据特定网络类型创建一个服务器 func New(network Network, options ...Option) *Server { server := &Server{ - event: &event{}, - network: network, - options: options, - closeChannel: make(chan struct{}, 1), - websocketWriteMessageType: WebsocketMessageTypeBinary, + event: &event{}, + network: network, + options: options, + closeChannel: make(chan struct{}, 1), } server.event.Server = server @@ -71,14 +70,13 @@ type Server struct { isShutdown atomic.Bool // 是否已关闭 closeChannel chan struct{} // 关闭信号 - gServer *gNet // TCP或UDP模式下的服务器 - messagePool *synchronization.Pool[*Message] // 消息池 - messagePoolSize int // 消息池大小 - messageChannel chan *Message // 消息管道 - multiple *MultipleServer // 多服务器模式下的服务器 - prod bool // 是否为生产模式 - websocketWriteMessageType int // websocket写入的消息类型 - ticker *timer.Ticker // 定时器 + gServer *gNet // TCP或UDP模式下的服务器 + messagePool *synchronization.Pool[*Message] // 消息池 + messagePoolSize int // 消息池大小 + messageChannel chan *Message // 消息管道 + multiple *MultipleServer // 多服务器模式下的服务器 + prod bool // 是否为生产模式 + ticker *timer.Ticker // 定时器 multipleRuntimeErrorChan chan error // 多服务器模式下的运行时错误 @@ -274,7 +272,7 @@ func (slf *Server) Run(addr string) error { if len(slf.supportMessageTypes) > 0 && !slf.supportMessageTypes[messageType] { panic(ErrWebsocketIllegalMessageType) } - PushWebsocketPacketMessage(slf, conn, packet, messageType) + PushPacketMessage(slf, conn, append(packet, byte(messageType))) } }) go func() { @@ -455,17 +453,14 @@ func (slf *Server) dispatchMessage(msg *Message) { slf.messagePool.Release(msg) } }() + var attrs = msg.attrs switch msg.t { case MessageTypePacket: - if slf.network == NetworkWebsocket { - conn, packet, messageType := msg.t.deconstructWebSocketPacket(msg.attrs...) - slf.OnConnectionReceiveWebsocketPacketEvent(conn, packet, messageType) - } else { - conn, packet := msg.t.deconstructPacket(msg.attrs...) - slf.OnConnectionReceivePacketEvent(conn, packet) - } + var packet = attrs[1].([]byte) + var wst = int(packet[len(packet)-1]) + slf.OnConnectionReceivePacketEvent(attrs[0].(*Conn), Packet{Data: packet[:len(packet)-1], WebsocketType: wst}) case MessageTypeError: - err, action, stack := msg.t.deconstructError(msg.attrs...) + err, action, stack := attrs[0].(error), attrs[1].(MessageErrorAction), attrs[2].(string) switch action { case MessageErrorActionNone: log.ErrorWithStack("Server", stack, zap.Error(err)) @@ -475,11 +470,11 @@ func (slf *Server) dispatchMessage(msg *Message) { log.Warn("Server", zap.String("not support message error action", action.String())) } case MessageTypeCross: - serverId, packet := msg.t.deconstructCross(msg.attrs...) - slf.OnReceiveCrossPacketEvent(serverId, packet) + slf.OnReceiveCrossPacketEvent(attrs[0].(int64), attrs[1].([]byte)) case MessageTypeTicker: - caller := msg.t.deconstructTicker(msg.attrs...) - caller() + attrs[0].(func())() + case MessageTypeAsync: + default: log.Warn("Server", zap.String("not support message type", msg.t.String())) } diff --git a/utils/str/str.go b/utils/str/str.go index ddf83ee2..f4332683 100644 --- a/utils/str/str.go +++ b/utils/str/str.go @@ -1,5 +1,9 @@ package str +const ( + None = "" // 空字符串 +) + // FirstUpper 首字母大写 func FirstUpper(str string) string { var upperStr string