/
transmitter.go
165 lines (127 loc) · 3.2 KB
/
transmitter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
package frontend
import (
"core/codec"
"core/xlib"
// "core/xnet/tcp"
"github.com/gorilla/websocket"
"encoding/binary"
"io"
"net"
)
// socketOpt lists the socket-option methods used by the TCP transmitter in
// this file: the same three methods are called on the value returned by
// ses.GetPeer().Prop().
// NOTE(review): nothing visible here asserts that value to socketOpt —
// confirm where this interface is actually consumed.
type socketOpt interface {
// MaxPacketSize returns the size limit that is passed to RecvLTVPacketData.
MaxPacketSize() int
// ApplySocketReadTimeout is given the connection and a callback that performs
// the read. Presumably the callback runs synchronously with a read deadline
// in effect — confirm against the implementation.
ApplySocketReadTimeout(conn net.Conn, callback func())
// ApplySocketWriteTimeout is the write-side counterpart: it is given the
// connection and a callback that performs the write.
ApplySocketWriteTimeout(conn net.Conn, callback func())
}
// DirectTCPTransmitter is a field-less transmitter for TCP sessions. Its
// methods receive packets through RecvLTVPacketData and send them through
// lib.SendLTVPacket.
type DirectTCPTransmitter struct{}
// OnRecvMessage handles packets arriving from the client on a TCP session.
//
// It returns (nil, nil) when the session's raw value is not a usable
// io.Reader, and leaves both results at their zero values when the raw value
// is not a net.Conn. Otherwise it keeps reading: each packet is fetched with
// RecvLTVPacketData (given opt.MaxPacketSize() as its size argument) from the
// callback handed to opt.ApplySocketReadTimeout, and is then passed to
// ProcFrontendPacket, which either forwards it to the backend or decodes it.
// The loop stops only once one of those steps reports an error; that error is
// returned together with whatever msg ProcFrontendPacket last produced.
//
// NOTE(review): a non-nil msg does not end the loop, so it is overwritten by
// later packets — confirm that this is intended.
func (DirectTCPTransmitter) OnRecvMessage(ses lib.Session) (msg interface{}, err error) {
	reader, isReader := ses.Raw().(io.Reader)

	// Bail out on a failed conversion or an already closed connection.
	if !isReader || reader == nil {
		return nil, nil
	}

	opt := ses.GetPeer().Prop()

	conn, isConn := ses.Raw().(net.Conn)
	if !isConn {
		return msg, err
	}

	// readPacket receives one client packet and tries to forward or decode it.
	// Its outcome is published through the named results msg and err.
	readPacket := func() {
		var (
			msgId   int32
			msgData []byte
		)

		msgId, msgData, err = RecvLTVPacketData(reader, opt.MaxPacketSize())
		if err != nil {
			return
		}

		msg, err = ProcFrontendPacket(msgId, msgData, ses)
	}

	// Run every read under the configured read timeout; leave on first error.
	for err == nil {
		opt.ApplySocketReadTimeout(conn, readPacket)
	}

	return msg, err
}
// OnSendMessage writes a message from the gateway to the client on a TCP
// session.
//
// It returns nil without sending when the session's raw value is not a usable
// io.Writer (failed conversion or closed connection). When the raw value is a
// net.Conn the packet is written by lib.SendLTVPacket from the callback handed
// to opt.ApplySocketWriteTimeout. When it is only an io.Writer there is no
// connection to pass to the timeout helper, so the packet is written directly
// instead of panicking on a failed type assertion.
//
// The returned error is the one reported by lib.SendLTVPacket.
func (DirectTCPTransmitter) OnSendMessage(ses lib.Session, msg interface{}) (err error) {
	writer, ok := ses.Raw().(io.Writer)

	// Bail out on a failed conversion or an already closed connection.
	if !ok || writer == nil {
		return nil
	}

	conn, isConn := writer.(net.Conn)
	if !isConn {
		// Not a net.Conn: the write timeout cannot be applied, send as-is.
		return lib.SendLTVPacket(writer, msg)
	}

	opt := ses.GetPeer().Prop()

	// Perform the write under the configured write timeout.
	opt.ApplySocketWriteTimeout(conn, func() {
		err = lib.SendLTVPacket(writer, msg)
	})

	return err
}
// MsgIdSize is the number of bytes taken by the message ID at the start of a
// WebSocket binary frame; the ID is read and written as a big-endian uint16.
const MsgIdSize = 2 // uint16
// DirectWSMessageTransmitter is a field-less transmitter for WebSocket
// sessions. Its methods exchange binary frames made of a MsgIdSize-byte
// message ID followed by the message payload.
type DirectWSMessageTransmitter struct{}
// OnRecvMessage handles frames arriving from the client on a WebSocket
// session.
//
// It returns (nil, nil) when the session's raw value is not a non-nil
// *websocket.Conn. Otherwise it reads frames in a loop. Frames that are not
// binary are ignored. A binary frame is split into a big-endian uint16
// message ID (MsgIdSize bytes) and the remaining payload, which are passed to
// ProcFrontendPacket to be forwarded to the backend or decoded.
//
// The loop ends on the first error, which is returned together with whatever
// msg ProcFrontendPacket last produced. The error comes from
// conn.ReadMessage, from ProcFrontendPacket, or is io.ErrUnexpectedEOF when a
// binary frame is too short to hold the message ID.
//
// NOTE(review): a non-nil msg does not end the loop, so it is overwritten by
// later frames — confirm that this is intended.
func (DirectWSMessageTransmitter) OnRecvMessage(ses lib.Session) (msg interface{}, err error) {
	conn, ok := ses.Raw().(*websocket.Conn)

	// Bail out on a failed conversion or an already closed connection.
	if !ok || conn == nil {
		return nil, nil
	}

	var (
		messageType int
		raw         []byte
	)

	for {
		messageType, raw, err = conn.ReadMessage()
		if err != nil {
			break
		}

		// Only binary frames carry packets; everything else is skipped.
		if messageType != websocket.BinaryMessage {
			continue
		}

		// The frame comes from the client and may be truncated. Decoding the
		// ID from fewer than MsgIdSize bytes would panic, so reject it.
		if len(raw) < MsgIdSize {
			err = io.ErrUnexpectedEOF
			break
		}

		msgId := int32(binary.BigEndian.Uint16(raw))
		msgData := raw[MsgIdSize:]

		// Try to forward the packet to the backend or decode it locally.
		msg, err = ProcFrontendPacket(msgId, msgData, ses)
		if err != nil {
			break
		}
	}

	return msg, err
}
// OnSendMessage writes a message to the client on a WebSocket session as a
// single binary frame: a big-endian uint16 message ID (MsgIdSize bytes)
// followed by the payload.
//
// It returns nil without sending when the session's raw value is not a
// non-nil *websocket.Conn. A *codec.RawPacket is sent using its own MsgId and
// MsgData; any other value is first encoded with codec.EncodeMessage.
//
// The returned error is the one from codec.EncodeMessage or from
// conn.WriteMessage, so callers learn about failed writes.
//
// Note that the int32 message ID is converted to uint16 for the wire, so only
// its low 16 bits are transmitted.
func (DirectWSMessageTransmitter) OnSendMessage(ses lib.Session, msg interface{}) error {
	conn, ok := ses.Raw().(*websocket.Conn)

	// Bail out on a failed conversion or an already closed connection.
	if !ok || conn == nil {
		return nil
	}

	var (
		msgData []byte
		msgId   int32
	)

	switch m := msg.(type) {
	case *codec.RawPacket: // raw packet: already holds ID and encoded bytes
		msgData = m.MsgData
		msgId = m.MsgId
	default: // ordinary message: needs encoding
		var err error
		var meta *codec.MessageMeta

		// Turn the user message into a byte slice plus its message ID.
		msgData, meta, err = codec.EncodeMessage(msg)
		if err != nil {
			return err
		}

		msgId = meta.Id
	}

	pkt := make([]byte, MsgIdSize+len(msgData))
	binary.BigEndian.PutUint16(pkt, uint16(msgId))
	copy(pkt[MsgIdSize:], msgData)

	// Propagate write failures instead of silently discarding them.
	return conn.WriteMessage(websocket.BinaryMessage, pkt)
}