forked from davyxu/cellnet
/
session.go
154 lines (112 loc) · 2.56 KB
/
session.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
package gorillaws
import (
"github.com/davyxu/cellnet"
"github.com/davyxu/cellnet/peer"
"github.com/davyxu/cellnet/util"
"github.com/gorilla/websocket"
"sync"
)
// wsSession is one session carried over a gorilla WebSocket connection.
// It embeds the peer package's context set, session identity and processor
// bundle, and runs a receive loop and a send loop once Start is called.
type wsSession struct {
peer.CoreContextSet
peer.CoreSessionIdentify
*peer.CoreProcBundle
// pInterface is the peer handed to newSession; Peer() returns it.
pInterface cellnet.Peer
// conn is the underlying WebSocket connection. sendLoop closes it and sets
// it to nil when the loop exits.
conn *websocket.Conn
// exitSync is the exit synchronizer: Start adds 2 to it, and recvLoop and
// sendLoop each call Done once when they finish.
exitSync sync.WaitGroup
// sendQueue is the outgoing queue: Send and Close add to it, sendLoop
// drains it.
sendQueue *cellnet.Pipe
// NOTE(review): cleanupGuard is not referenced anywhere in the visible part
// of this file — confirm whether it is still needed.
cleanupGuard sync.Mutex
// endNotify, when non-nil, is called by Start's watcher goroutine after both
// loops have finished and the session has been removed from the manager.
endNotify func()
}
// Peer returns the peer this session was created with.
func (self *wsSession) Peer() cellnet.Peer {
	owner := self.pInterface
	return owner
}
// Raw returns the underlying *websocket.Conn wrapped in an interface value,
// or an untyped nil when the session holds no connection.
func (self *wsSession) Raw() interface{} {
	if self.conn != nil {
		return self.conn
	}

	// Return a literal nil instead of a nil *websocket.Conn, so callers that
	// compare the result against nil get a true answer.
	return nil
}
// Close requests shutdown of the session by queueing a nil entry on the send
// queue. It does not close the connection itself; sendLoop does that when it
// exits.
// NOTE(review): presumably the nil entry is what makes Pipe.Pick report exit
// to sendLoop — confirm against the Pipe implementation.
func (self *wsSession) Close() {
	var shutdown interface{} // nil entry used as the shutdown marker
	self.sendQueue.Add(shutdown)
}
// Send queues msg for delivery; sendLoop picks it up from the send queue and
// writes it out asynchronously.
//
// A nil msg is ignored. Close requests shutdown by queueing a nil entry on the
// same queue, so forwarding a nil message here would be indistinguishable from
// a Close call and would shut the session down as a side effect of sending.
// The session can only be closed through Close.
func (self *wsSession) Send(msg interface{}) {
	if msg == nil {
		return
	}

	self.sendQueue.Add(msg)
}
// recvLoop is the receive loop. While the session still has a connection it
// reads one message at a time and posts it as a RecvMsgEvent. On a read error
// it logs the error, posts a SessionClosed event and stops. When the loop
// ends it calls Close and then marks its share of exitSync as done.
func (self *wsSession) recvLoop() {
	for {
		if self.conn == nil {
			break
		}

		msg, err := self.ReadMessage(self)
		if err == nil {
			// Normal path: deliver the message and keep reading.
			self.ProcEvent(&cellnet.RecvMsgEvent{Ses: self, Msg: msg})
			continue
		}

		log.Debugln(err)

		// Only errors other than EOF / network read errors are logged at
		// error level.
		if !util.IsEOFOrNetReadError(err) {
			log.Errorln("session closed:", err)
		}

		// Tell the processor the session has closed.
		self.ProcEvent(&cellnet.RecvMsgEvent{Ses: self, Msg: &cellnet.SessionClosed{}})
		break
	}

	// Ask the send side to shut down as well.
	self.Close()

	// Signal that the receive side has finished.
	self.exitSync.Done()
}
// sendLoop is the send loop. It repeatedly picks the queued messages from
// sendQueue and passes each one to SendMessage, until Pick reports exit. It
// then closes the connection, sets conn to nil and marks its share of
// exitSync as done.
func (self *wsSession) sendLoop() {
	var pending []interface{}

	for {
		// Empty the batch while keeping its backing array.
		pending = pending[:0]

		stop := self.sendQueue.Pick(&pending)

		// Write out everything picked in this round, including a round that
		// also reported exit.
		for i := 0; i < len(pending); i++ {
			// TODO: SendMsgEvent is not very meaningful here.
			self.SendMessage(&cellnet.SendMsgEvent{Ses: self, Msg: pending[i]})
		}

		if stop {
			break
		}
	}

	// Close the connection.
	if self.conn != nil {
		self.conn.Close()
		self.conn = nil
	}

	// Signal that the send side has finished.
	self.exitSync.Done()
}
// Start registers the session with the peer's session manager and launches
// its goroutines: the receive loop, the send loop, and a watcher that waits
// for both loops to finish, removes the session from the manager and then
// calls endNotify if one was supplied.
func (self *wsSession) Start() {
	// Register the session with the manager.
	manager := self.Peer().(peer.SessionManager)
	manager.Add(self)

	// The session only counts as finished once both the receive loop and the
	// send loop have completed.
	self.exitSync.Add(2)

	watchExit := func() {
		// Wait for both loops to end.
		self.exitSync.Wait()

		// Remove the session from the manager.
		self.Peer().(peer.SessionManager).Remove(self)

		if notify := self.endNotify; notify != nil {
			notify()
		}
	}
	go watchExit()

	// Start the concurrent receive goroutine.
	go self.recvLoop()

	// Start the concurrent send goroutine.
	go self.sendLoop()
}
// newSession builds a wsSession around conn for peer p. The session gets a
// fresh send queue and shares the processor bundle returned by p's GetBundle
// method. endNotify may be nil; when set, it is called once the started
// session has fully shut down.
//
// It panics if p does not provide GetBundle() *peer.CoreProcBundle.
func newSession(conn *websocket.Conn, p cellnet.Peer, endNotify func()) *wsSession {
	queue := cellnet.NewPipe()

	bundle := p.(interface {
		GetBundle() *peer.CoreProcBundle
	}).GetBundle()

	return &wsSession{
		CoreProcBundle: bundle,
		pInterface:     p,
		conn:           conn,
		sendQueue:      queue,
		endNotify:      endNotify,
	}
}