forked from davyxu/cellnet
-
Notifications
You must be signed in to change notification settings - Fork 0
/
session.go
162 lines (116 loc) · 2.7 KB
/
session.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
package tcp
import (
"github.com/davyxu/cellnet"
"github.com/davyxu/cellnet/peer"
"github.com/davyxu/cellnet/util"
"net"
"sync"
)
// Socket会话
type tcpSession struct {
peer.CoreContextSet
peer.CoreSessionIdentify
*peer.CoreProcBundle
pInterface cellnet.Peer
// Socket原始连接
conn net.Conn
// 退出同步器
exitSync sync.WaitGroup
// 发送队列
sendQueue *peer.MsgQueue
cleanupGuard sync.Mutex
endNotify func()
}
func (self *tcpSession) Peer() cellnet.Peer {
return self.pInterface
}
// 取原始连接
func (self *tcpSession) Raw() interface{} {
return self.conn
}
func (self *tcpSession) Close() {
self.Send(nil)
}
// 发送封包
func (self *tcpSession) Send(msg interface{}) {
self.sendQueue.Add(msg)
}
// 接收循环
func (self *tcpSession) recvLoop() {
for self.conn != nil {
msg, err := self.ReadMessage(self)
if err != nil {
if !util.IsEOFOrNetReadError(err) {
log.Errorf("session closed, sesid: %d, err: %s", self.ID(), err)
}
self.Close()
self.PostEvent(&cellnet.RecvMsgEvent{self, &cellnet.SessionClosed{}})
break
}
self.PostEvent(&cellnet.RecvMsgEvent{self, msg})
}
self.cleanup()
}
// 发送循环
func (self *tcpSession) sendLoop() {
var writeList []interface{}
for {
writeList = writeList[0:0]
exit := self.sendQueue.Pick(&writeList)
// 遍历要发送的数据
for _, msg := range writeList {
// TODO SendMsgEvent并不是很有意义
self.SendMessage(&cellnet.SendMsgEvent{self, msg})
}
if exit {
break
}
}
self.cleanup()
}
// 清理资源
func (self *tcpSession) cleanup() {
self.cleanupGuard.Lock()
defer self.cleanupGuard.Unlock()
// 关闭连接
if self.conn != nil {
self.conn.Close()
self.conn = nil
}
// 通知完成
self.exitSync.Done()
}
// 启动会话的各种资源
func (self *tcpSession) Start() {
// connector复用session时,上一次发送队列未释放可能造成问题
self.sendQueue.Reset()
// 将会话添加到管理器
self.Peer().(peer.SessionManager).Add(self)
// 需要接收和发送线程同时完成时才算真正的完成
self.exitSync.Add(2)
go func() {
// 等待2个任务结束
self.exitSync.Wait()
// 将会话从管理器移除
self.Peer().(peer.SessionManager).Remove(self)
if self.endNotify != nil {
self.endNotify()
}
}()
// 启动并发接收goroutine
go self.recvLoop()
// 启动并发发送goroutine
go self.sendLoop()
}
func newSession(conn net.Conn, p cellnet.Peer, endNotify func()) *tcpSession {
self := &tcpSession{
conn: conn,
endNotify: endNotify,
sendQueue: peer.NewMsgQueue(),
pInterface: p,
CoreProcBundle: p.(interface {
GetBundle() *peer.CoreProcBundle
}).GetBundle(),
}
return self
}