-
Notifications
You must be signed in to change notification settings - Fork 7
/
server.go
319 lines (286 loc) · 9.23 KB
/
server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
/*
Package server implements the micserver component that manages this module's
connections to the other servers.
*/
package server
import (
"github.com/liasece/micserver/conf"
"github.com/liasece/micserver/connect"
"github.com/liasece/micserver/log"
"github.com/liasece/micserver/msg"
serverbase "github.com/liasece/micserver/server/base"
"github.com/liasece/micserver/server/gate"
gatebase "github.com/liasece/micserver/server/gate/base"
"github.com/liasece/micserver/server/subnet"
"github.com/liasece/micserver/servercomm"
"github.com/liasece/micserver/session"
"github.com/liasece/micserver/util"
)
// Server is the runtime object behind a module: one Module is exactly one
// Server. It embeds a logger and the ROC server, and owns the subnet manager,
// the optional client gate and the local session manager.
type Server struct {
// Embedded logger; replaced through SetLogger and may be nil.
*log.Logger
// event libs
// Embedded ROC server; it is handed this Server in Init.
ROCServer
// Receives subnet events; registered with subnetManager in InitSubnet.
serverCmdHandler serverCmdHandler
// Receives gate (client) events; registered with gateBase in InitGate.
clientEventHandler clientEventHandler
// Connections to the other servers; created lazily in InitSubnet.
subnetManager *subnet.Manager
// Client-facing gate; stays nil unless InitGate has been called.
gateBase *gate.Base
// Sessions maintained locally by this server.
sessionManager session.Manager
// server info
// ID of this module, set in Init.
moduleid string
// Configuration passed to InitSubnet.
moduleConfig *conf.ModuleConfig
// Set to true by Stop.
isStop bool
// Unbuffered channel created in Init; not otherwise used in this file.
stopChan chan bool
}
// Init initializes this server: it records moduleid as the server's module
// ID, creates the stop channel and initializes the embedded ROCServer with
// this Server.
func (s *Server) Init(moduleid string) {
s.moduleid = moduleid
s.stopChan = make(chan bool)
// The ROC server is initialized last, after the module ID has been stored.
s.ROCServer.Init(s)
}
// InitSubnet initializes the subnet manager of this server.
//
// It stores conf as the module configuration, creates the subnet manager if
// none exists yet, gives it a clone of the server's logger, initializes it
// with conf and registers the server command handler as its subnet hook.
func (s *Server) InitSubnet(conf *conf.ModuleConfig) {
	s.moduleConfig = conf
	s.serverCmdHandler.server = s

	// Reuse a manager that is already present; otherwise allocate a fresh one.
	if s.subnetManager == nil {
		s.subnetManager = new(subnet.Manager)
	}

	mgr := s.subnetManager
	mgr.Logger = s.Logger.Clone()
	mgr.Init(conf)
	mgr.HookSubnet(&s.serverCmdHandler)
}
// HookServer sets the listener for this server's server events by forwarding
// serverHook to the server command handler.
func (s *Server) HookServer(serverHook serverbase.ServerHook) {
s.serverCmdHandler.HookServer(serverHook)
}
// HookGate sets the listener for this server's gate events by forwarding
// gateHook to the client event handler. If this server has not enabled a gate,
// no events will be delivered (the handler is only attached to a gate in
// InitGate).
func (s *Server) HookGate(gateHook gatebase.GateHook) {
s.clientEventHandler.HookGate(gateHook)
}
// BindSubnet tries to connect to the other servers of this server's subnet.
//
// subnetAddrMap maps a module ID to its address. The entry whose key equals
// this server's own module ID is skipped; every other entry is handed to the
// subnet manager's TryConnectServer.
func (s *Server) BindSubnet(subnetAddrMap map[string]string) {
	for moduleID, addr := range subnetAddrMap {
		if moduleID == s.moduleid {
			// Never connect to ourselves.
			continue
		}
		s.subnetManager.TryConnectServer(moduleID, addr)
	}
}
// InitGate initializes the gate part of this server.
//
// It creates the gate with the server's own logger (the same pointer, not a
// clone), initializes it with the module ID, binds it to the outer TCP address
// gateaddr and finally registers the client event handler as the gate hook.
func (s *Server) InitGate(gateaddr string) {
	s.clientEventHandler.server = s

	gb := &gate.Base{Logger: s.Logger}
	s.gateBase = gb

	gb.Init(s.moduleid)
	gb.BindOuterTCP(gateaddr)
	// Event listening.
	gb.HookGate(&s.clientEventHandler)
}
// SetLogger sets the logger of this server. A non-nil source is cloned before
// it is stored; a nil source clears the server's logger.
func (s *Server) SetLogger(source *log.Logger) {
	var next *log.Logger
	if source != nil {
		next = source.Clone()
	}
	s.Logger = next
}
// GetClient looks up a client connection by tmpid in the gate. It returns nil
// when this server has no gate.
func (s *Server) GetClient(tmpid string) *connect.Client {
	if s.gateBase == nil {
		return nil
	}
	return s.gateBase.GetClient(tmpid)
}
// RangeClient passes f to the gate's Range so it can visit the gate's client
// connections. It does nothing when this server has no gate.
func (s *Server) RangeClient(f func(tmpid string, client *connect.Client) bool) {
	if s.gateBase == nil {
		return
	}
	s.gateBase.Range(f)
}
// onServerJoinSubnet is called when a server has successfully joined the
// subnet. It logs the joining server's module ID at debug level and forwards
// the event to the embedded ROCServer.
func (s *Server) onServerJoinSubnet(server *connect.Server) {
s.Debug("[Server.onServerJoinSubnet] Server joined the subnet successfully", log.String("ModuleID", server.ModuleInfo.ModuleID))
s.ROCServer.onServerJoinSubnet(server)
}
// SendModuleMsg sends a server message to another server.
//
// to is the ID of the target module and msgstr the message to deliver; the
// message is wrapped by getModuleMsgPack before being sent. When the subnet
// manager has no connection for to, the message is dropped and an error is
// logged, matching SInnerSendModuleMsg and ForwardClientMsgToModule.
func (s *Server) SendModuleMsg(to string, msgstr msg.IMsgStruct) {
	conn := s.subnetManager.GetServer(to)
	if conn == nil {
		// Previously this case was silent, which hid undeliverable messages.
		s.Error("[Server.SendModuleMsg] conn == nil", log.String("To", to))
		return
	}
	conn.SendCmd(s.getModuleMsgPack(msgstr, conn))
}
// SInnerCloseSessionConnect disconnects a client connection; for use inside
// the framework only. It simply delegates to ReqCloseConnect.
func (s *Server) SInnerCloseSessionConnect(gateid string, connectid string) {
s.ReqCloseConnect(gateid, connectid)
}
// ReqCloseConnect requests that the client connection connectid on the gate
// gateid be closed.
//
// When gateid is this server's own module ID the connection is closed locally
// through doCloseConnect. Otherwise an SReqCloseConnect command is sent to the
// gate; if no connection to that gate exists an error is logged instead.
func (s *Server) ReqCloseConnect(gateid string, connectid string) {
	if gateid == s.moduleid {
		s.doCloseConnect(connectid)
		return
	}

	// Ask the remote gate to do it.
	gateConn := s.subnetManager.GetServer(gateid)
	if gateConn == nil {
		s.Error("[Server.ReqCloseConnect] Target module does not exist", log.String("GateID", gateid))
		return
	}
	gateConn.SendCmd(&servercomm.SReqCloseConnect{
		FromModuleID: s.moduleid,
		ToModuleID:   gateid,
		ClientConnID: connectid,
	})
}
// doCloseConnect closes the client connection connectid held by this server's
// own gate. It logs an error and returns when this server has no gate or the
// gate does not know the connection.
func (s *Server) doCloseConnect(connectid string) {
	gb := s.gateBase
	if gb == nil {
		s.Error("Server.doCloseConnect s module isn't gate")
		return
	}

	if target := gb.GetClient(connectid); target != nil {
		target.Terminate()
		return
	}
	s.Error("[Server.doCloseConnect] client does not exist", log.String("ConnectID", connectid))
}
// SInnerSendModuleMsg sends a server message to another server; for use
// inside the framework only.
//
// Unlike SendModuleMsg, msgstr is passed to SendCmd as-is, without being
// wrapped by getModuleMsgPack. When the subnet manager has no connection for
// to, the message is dropped and an error is logged.
func (s *Server) SInnerSendModuleMsg(to string, msgstr msg.IMsgStruct) {
	conn := s.subnetManager.GetServer(to)
	if conn == nil {
		// The log tag names this method; it previously referred to a
		// non-existent "SInnerSendServerMsg".
		s.Error("[Server.SInnerSendModuleMsg] conn == nil", log.String("To", to))
		return
	}
	conn.SendCmd(msgstr)
}
// SInnerSendClientMsg sends a message to a client connected to the gate
// gateid; for use inside the framework only. It delegates to
// SendBytesToClient and discards the error that call returns.
func (s *Server) SInnerSendClientMsg(gateid string, connectid string, msgid uint16, data []byte) {
	// This method has no error return, so the result is dropped on purpose.
	_ = s.SendBytesToClient(gateid, connectid, msgid, data)
}
// ForwardClientMsgToModule forwards a client message to another server.
//
// fromconn is the originating client connection, to the ID of the target
// module, and msgid/data the message itself. The message is wrapped by
// getFarwardFromGateMsgPack before being sent. When the subnet manager has no
// connection for to, an error is logged and nothing is sent.
func (s *Server) ForwardClientMsgToModule(fromconn *connect.Client, to string, msgid uint16, data []byte) {
	target := s.subnetManager.GetServer(to)
	if target == nil {
		s.Error("[Server.ForwardClientMsgToModule] conn == nil", log.String("To", to))
		return
	}
	target.SendCmd(s.getFarwardFromGateMsgPack(msgid, data, fromconn, target))
}
// BroadcastModuleCmd broadcasts a message to all servers connected to this
// server. The message is wrapped by getModuleMsgPack with a nil target, so the
// pack carries no specific destination module ID.
func (s *Server) BroadcastModuleCmd(msgstr msg.IMsgStruct) {
s.subnetManager.BroadcastCmd(s.getModuleMsgPack(msgstr, nil))
}
// GetBalanceModuleID picks a server of the given module type for load
// balancing and returns its ID. The choice is made by the subnet manager's
// GetRandomServer; the empty string is returned when it yields no server.
func (s *Server) GetBalanceModuleID(moduletype string) string {
	if picked := s.subnetManager.GetRandomServer(moduletype); picked != nil {
		return picked.GetTempID()
	}
	return ""
}
// DeleteSession deletes the locally maintained session identified by uuid.
func (s *Server) DeleteSession(uuid string) {
s.sessionManager.DeleteSession(uuid)
}
// GetSession returns the locally maintained session identified by uuid, as
// reported by the session manager.
func (s *Server) GetSession(uuid string) *session.Session {
return s.sessionManager.GetSession(uuid)
}
// MustUpdateSessionFromMap updates the local session identified by uuid with
// data, registering a new session first when none exists yet.
//
// The session manager and logger are accessed directly on s, exactly as
// GetSession and DeleteSession do. The previous code went through s.server,
// a name that is not declared on Server itself (presumably a back-pointer
// promoted from the embedded ROCServer), which made this method fail with a
// nil dereference whenever that pointer had not been set.
func (s *Server) MustUpdateSessionFromMap(uuid string, data map[string]string) {
	se := s.sessionManager.GetSession(uuid)
	if se == nil {
		// Unknown session: create an empty one and register it under uuid.
		se = &session.Session{}
		s.sessionManager.UpdateSessionUUID(uuid, se)
	}
	s.sessionManager.MustUpdateFromMap(se, data)
	s.Syslog("[Server.MustUpdateSessionFromMap] Session Manager Update", log.Reflect("Data", data))
}
// UpdateSessionUUID updates the UUID of the target session by handing uuid
// and session to the local session manager.
//
// The session manager is accessed directly on s, as in GetSession and
// DeleteSession, instead of through s.server (a name not declared on Server
// itself), so the method no longer depends on that pointer being set.
func (s *Server) UpdateSessionUUID(uuid string, session *session.Session) {
	s.sessionManager.UpdateSessionUUID(uuid, session)
}
// SendBytesToClient sends a message to the client `to` connected to the gate
// gateid.
//
// When gateid is this server's own module ID the message is delivered locally
// through DoSendBytesToClient. Otherwise a copy of data is wrapped in an
// SForwardToClient command and sent to the gate.
//
// It returns ErrTargetClientDontExist when local delivery fails or when no
// connection to the gate exists (the latter is also logged). On the remote
// path nil only means the command was handed to SendCmd.
func (s *Server) SendBytesToClient(gateid string, to string, msgid uint16, data []byte) error {
	if gateid == s.moduleid {
		if err := s.DoSendBytesToClient(s.moduleid, gateid, to, msgid, data); err != nil {
			return ErrTargetClientDontExist
		}
		return nil
	}

	gateConn := s.subnetManager.GetServer(gateid)
	if gateConn == nil {
		s.Error("[Server.SendBytesToClient] Target server connection does not exist", log.String("GateID", gateid))
		return ErrTargetClientDontExist
	}

	// The command gets its own copy of the payload.
	payload := make([]byte, len(data))
	copy(payload, data)

	cmd := &servercomm.SForwardToClient{}
	cmd.FromModuleID = s.moduleid
	cmd.ToGateID = gateid
	cmd.ToClientID = to
	cmd.MsgID = msgid
	cmd.Data = payload
	gateConn.SendCmd(cmd)
	return nil
}
// DoSendBytesToClient sends a message to the client `to` connected to this
// server's own gate.
//
// fromserver is the module the message originates from. When it differs from
// gateid, the client's session is bound to fromserver under the module type
// derived by util.GetModuleIDType before the bytes are sent.
//
// It returns ErrTargetClientDontExist when this server has no gate or the gate
// does not know the client, and nil otherwise.
func (s *Server) DoSendBytesToClient(fromserver string, gateid string, to string, msgid uint16, data []byte) error {
	if s.gateBase == nil {
		return ErrTargetClientDontExist
	}

	client := s.gateBase.GetClient(to)
	if client == nil {
		return ErrTargetClientDontExist
	}

	if fromserver != gateid {
		client.Session.SetBind(util.GetModuleIDType(fromserver), fromserver)
	}
	client.SendBytes(msgid, data)
	return nil
}
// getModuleMsgPack wraps a server message in the inter-server forwarding
// command SForwardToModule.
//
// The pack carries this server's module ID as sender, the message ID of
// msgstr and its binary encoding. The destination module ID is only filled in
// when tarconn is non-nil.
func (s *Server) getModuleMsgPack(msgstr msg.IMsgStruct, tarconn *connect.Server) msg.IMsgStruct {
	pack := &servercomm.SForwardToModule{}
	pack.FromModuleID = s.moduleid
	if tarconn != nil {
		pack.ToModuleID = tarconn.ModuleInfo.ModuleID
	}
	pack.MsgID = msgstr.GetMsgId()

	// Serialize the message into a buffer of exactly the size it reports.
	pack.Data = make([]byte, msgstr.GetSize())
	msgstr.WriteBinary(pack.Data)
	return pack
}
// getFarwardFromGateMsgPack wraps a client message in the forwarding command
// SForwardFromGate that carries it from the gate to another server.
//
// The pack carries this server's module ID as sender, msgid and a copy of
// data. The destination module ID is only filled in when tarconn is non-nil;
// the client's connection ID and session map only when fromconn is non-nil.
func (s *Server) getFarwardFromGateMsgPack(msgid uint16, data []byte, fromconn *connect.Client, tarconn *connect.Server) msg.IMsgStruct {
	pack := &servercomm.SForwardFromGate{}
	pack.FromModuleID = s.moduleid
	if tarconn != nil {
		pack.ToModuleID = tarconn.ModuleInfo.ModuleID
	}
	if fromconn != nil {
		pack.ClientConnID = fromconn.GetConnectID()
		pack.Session = fromconn.ToMap()
	}
	pack.MsgID = msgid

	// The pack gets its own copy of the payload.
	payload := make([]byte, len(data))
	copy(payload, data)
	pack.Data = payload
	return pack
}
// Stop marks the server as stopped by setting its isStop flag. Within this
// file it does nothing else; in particular it does not touch stopChan.
func (s *Server) Stop() {
s.isStop = true
}