-
Notifications
You must be signed in to change notification settings - Fork 0
/
server.go
280 lines (232 loc) · 7.23 KB
/
server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
// @Title
// @Description
// @Author Wangwengang 2021/8/17 下午5:08
// @Update Wangwengang 2021/8/17 下午5:08
package impl
import (
"context"
"fmt"
"net"
"net/http"
"time"
"github.com/gorilla/websocket"
"github.com/wwengg/proto/identity"
"go.uber.org/zap"
"github.com/wwengg/arsenal/anet"
"github.com/wwengg/arsenal/config"
"github.com/wwengg/arsenal/logger"
"github.com/wwengg/arsenal/sdk/rpcx"
)
//Server 接口实现,定义一个Server服务类
type Server struct {
//服务器的名称
Name string
//tcp4 or other
IPVersion string
//服务绑定的IP地址
IP string
//服务绑定的端口
Port int
// Websocket Addr
WsAddr string
//当前Server的消息管理模块,用来绑定MsgID和对应的处理方法
msgHandler anet.MsgHandle
//当前Server的链接管理器
ConnMgr anet.ConnManager
//该Server的连接创建时Hook函数
OnConnStart func(conn anet.Connection)
//该Server的连接断开时的Hook函数
OnConnStop func(conn anet.Connection)
packet anet.Packet
identityClient *identity.IdentityClient
}
//NewServer 创建一个服务器句柄
func NewServer(opts ...Option) anet.Server {
//printLogo()
rpcx.RpcxClientsObj.SetupServiceDiscovery()
xclient, err := rpcx.RpcxClientsObj.GetXClient("Identity")
if err != nil {
logger.ZapLog.Error("Identity service not found")
}
s := &Server{
Name: config.ConfigHub.TcpConfig.Name,
IPVersion: "tcp4",
IP: config.ConfigHub.TcpConfig.Ip,
Port: config.ConfigHub.TcpConfig.TcpPort,
WsAddr: config.ConfigHub.Websocket.Addr,
msgHandler: NewMsgHandle(),
ConnMgr: NewConnManager(),
packet: NewDataPack(),
identityClient: identity.NewIdentityClient(xclient),
}
for _, opt := range opts {
opt(s)
}
return s
}
func (s *Server) GenID() uint64 {
reply, err := s.identityClient.GetId(context.Background(), nil)
if err != nil {
return 0
}
return uint64(reply.Id)
}
//============== 实现 anet.Server 里的全部接口方法 ========
//Start 开启Tcp网络服务
func (s *Server) StartTcp() {
fmt.Printf("[START] Server name: %s,listenner at IP: %s, Port %d is starting\n", s.Name, s.IP, s.Port)
//开启一个go去做服务端Linster业务
go func() {
//0 启动worker工作池机制
s.msgHandler.StartWorkerPool()
//1 获取一个TCP的Addr
addr, err := net.ResolveTCPAddr(s.IPVersion, fmt.Sprintf("%s:%d", s.IP, s.Port))
if err != nil {
fmt.Println("resolve tcp addr err: ", err)
return
}
//2 监听服务器地址
listener, err := net.ListenTCP(s.IPVersion, addr)
if err != nil {
panic(err)
}
//已经监听成功
fmt.Println("start Zinx server ", s.Name, " succ, now listenning...")
//TODO server.go 应该有一个自动生成ID的方法
//3 启动server网络连接业务
for {
//3.1 阻塞等待客户端建立连接请求
conn, err := listener.AcceptTCP()
if err != nil {
fmt.Println("Accept err ", err)
continue
}
fmt.Println("Get conn remote addr = ", conn.RemoteAddr().String())
//3.2 设置服务器最大连接控制,如果超过最大连接,那么则关闭此新的连接
if s.ConnMgr.Len() >= 100 {
conn.Close()
continue
}
//3.3 处理该新连接请求的 业务 方法, 此时应该有 handler 和 conn是绑定的
dealConn := NewConnection(s, conn, s.GenID(), s.msgHandler)
//3.4 启动当前链接的处理业务
go dealConn.Start()
}
}()
}
//Start Websocket网络服务
func (s *Server) StartWebsocket() {
logger.ZapLog.Info("Start Websocket server", zap.String("addr", s.WsAddr))
httpServer := &http.Server{
Addr: s.WsAddr,
Handler: &WsHandler{upgrader: websocket.Upgrader{
HandshakeTimeout: 0,
ReadBufferSize: 0,
WriteBufferSize: 0,
WriteBufferPool: nil,
Subprotocols: nil,
Error: nil,
CheckOrigin: func(r *http.Request) bool {
return true
},
EnableCompression: false,
},
sv: s},
ReadTimeout: time.Second * time.Duration(config.ConfigHub.Websocket.ConnReadTimeout),
WriteTimeout: time.Second * time.Duration(config.ConfigHub.Websocket.ConnWriteTimeout),
MaxHeaderBytes: config.ConfigHub.Websocket.MaxHeaderLen,
}
httpServer.ListenAndServe()
}
type WsHandler struct {
sv *Server
upgrader websocket.Upgrader
}
func (h *WsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if r.Method != "GET" {
http.Error(w, "Method not allowed", 405)
return
}
conn, err := h.upgrader.Upgrade(w, r, nil)
if err != nil {
logger.ZapLog.Error("upgrade error", zap.Error(err))
return
}
//3.2 设置服务器最大连接控制,如果超过最大连接,那么则关闭此新的连接
if h.sv.ConnMgr.Len() >= 100 {
conn.Close()
}
//3.3 处理该新连接请求的 业务 方法, 此时应该有 handler 和 conn是绑定的
dealConn := NewWsConnection(h.sv, conn, h.sv.GenID(), h.sv.msgHandler)
//3.4 启动当前链接的处理业务
go dealConn.Start()
}
func (s *Server) httpServe() {
}
//Stop 停止服务
func (s *Server) Stop() {
fmt.Println("[STOP] server , name ", s.Name)
//将其他需要清理的连接信息或者其他信息 也要一并停止或者清理
s.ConnMgr.ClearConn()
}
//Serve 运行服务
func (s *Server) Serve() {
s.StartTcp() // 默认开启
if config.ConfigHub.Websocket.Enable {
s.StartWebsocket()
}
// TODO quic、kcp的支持
//TODO Server.Serve() 是否在启动服务的时候 还要处理其他的事情呢 可以在这里添加
//阻塞,否则主Go退出, listenner的go将会退出
select {}
}
//AddRouter 路由功能:给当前服务注册一个路由业务方法,供客户端链接处理使用
func (s *Server) AddRouter(msgID uint32, router anet.Router) {
s.msgHandler.AddRouter(msgID, router)
}
//AddRouter 路由功能:给当前服务注册一个路由业务方法,供客户端链接处理使用
func (s *Server) SetRpcxRouter(router anet.RpcxRouter) {
s.msgHandler.SetRpcxRouter(router)
}
//GetConnMgr 得到链接管理
func (s *Server) GetConnMgr() anet.ConnManager {
return s.ConnMgr
}
//SetOnConnStart 设置该Server的连接创建时Hook函数
func (s *Server) SetOnConnStart(hookFunc func(anet.Connection)) {
s.OnConnStart = hookFunc
}
//SetOnConnStop 设置该Server的连接断开时的Hook函数
func (s *Server) SetOnConnStop(hookFunc func(anet.Connection)) {
s.OnConnStop = hookFunc
}
//CallOnConnStart 调用连接OnConnStart Hook函数
func (s *Server) CallOnConnStart(conn anet.Connection) {
if s.OnConnStart != nil {
fmt.Println("---> CallOnConnStart....")
s.OnConnStart(conn)
}
}
//CallOnConnStop 调用连接OnConnStop Hook函数
func (s *Server) CallOnConnStop(conn anet.Connection) {
if s.OnConnStop != nil {
fmt.Println("---> CallOnConnStop....")
s.OnConnStop(conn)
}
}
func (s *Server) Packet() anet.Packet {
return s.packet
}
func printLogo() {
//fmt.Println(zinxLogo)
//fmt.Println(topLine)
//fmt.Println(fmt.Sprintf("%s [Github] https://github.com/aceld %s", borderLine, borderLine))
//fmt.Println(fmt.Sprintf("%s [tutorial] https://www.kancloud.cn/aceld/zinx %s", borderLine, borderLine))
//fmt.Println(bottomLine)
//fmt.Printf("[Zinx] Version: %s, MaxConn: %d, MaxPacketSize: %d\n",
// utils.GlobalObject.Version,
// utils.GlobalObject.MaxConn,
// utils.GlobalObject.MaxPacketSize)
}
func init() {
}