-
Notifications
You must be signed in to change notification settings - Fork 2
/
queue.go
148 lines (112 loc) · 2.4 KB
/
queue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
package lib
import (
"core/logs"
"runtime/debug"
"sync"
)
// EventQueue is the public interface of a callback queue that is drained by
// an event loop. The implementation in this file is eventQueue, created by
// NewEventQueue. Note that SetCapturePanicNotify is defined on eventQueue but
// is not part of this interface.
type EventQueue interface {
// StartLoop starts the event loop and returns the queue so calls can be chained.
StartLoop() EventQueue
// StopLoop requests the event loop to stop and returns the queue.
StopLoop() EventQueue
// Wait blocks until the event loop has signalled that it is finished.
Wait()
// Post hands a callback to the queue for execution by the event loop.
Post(callback func())
// EnableCapturePanic switches recovery of panics raised by callbacks on or off.
EnableCapturePanic(v bool)
}
// CapturePanicNotifyFunc is the signature of the handler invoked when a panic
// raised by a queued callback has been recovered. It receives the recovered
// panic value and the queue on which the callback was running.
type CapturePanicNotifyFunc func(interface{}, EventQueue)
// eventQueue is the EventQueue implementation in this file. It embeds *Pipe,
// whose Add and Pick methods are used to store and fetch queued items.
type eventQueue struct {
*Pipe
// endSignal is incremented by StartLoop and released when the loop goroutine
// finishes; Wait blocks on it.
endSignal sync.WaitGroup
// capturePanic selects whether protectedCall recovers panics from callbacks.
capturePanic bool
// onPanic is the handler notified with the recovered panic value.
onPanic CapturePanicNotifyFunc
}
// EnableCapturePanic turns panic capturing on (true) or off (false).
// The flag is consulted each time a queued callback is executed.
func (q *eventQueue) EnableCapturePanic(v bool) {
	q.capturePanic = v
}
// SetCapturePanicNotify replaces the handler that is notified when a panic
// from a queued callback has been captured.
func (q *eventQueue) SetCapturePanicNotify(callback CapturePanicNotifyFunc) {
	q.onPanic = callback
}
// Post enqueues callback so that the event loop can run it later.
// A nil callback is silently dropped.
func (q *eventQueue) Post(callback func()) {
	if callback != nil {
		q.Add(callback)
	}
}
// protectedCall runs a user callback, optionally shielding the caller from
// panics raised inside it.
//
// When panic capturing is disabled the callback is invoked directly and any
// panic propagates to the caller. When it is enabled, a panic is recovered and
// its value is forwarded to the onPanic handler together with this queue.
//
// The handler may have been cleared with SetCapturePanicNotify(nil); in that
// case the panic is still captured but nobody is notified. Calling a nil
// handler from inside the deferred function would raise a second panic and
// defeat the purpose of capturing.
func (q *eventQueue) protectedCall(callback func()) {
	if !q.capturePanic {
		callback()
		return
	}

	defer func() {
		// recover must be called directly in the deferred function.
		err := recover()
		if err == nil {
			return
		}
		if q.onPanic != nil {
			q.onPanic(err, q)
		}
	}()

	callback()
}
// StartLoop launches the event loop in a new goroutine and returns the queue.
//
// Each round the loop empties its reusable batch slice, asks Pick to fill it,
// and then processes the picked items in order: func() values are run through
// protectedCall, nil entries are skipped, and anything else is reported via
// logs.Debug. The loop ends after the round in which Pick returned true, and
// the goroutine then releases endSignal so that Wait can return.
func (q *eventQueue) StartLoop() EventQueue {
	q.endSignal.Add(1)

	go func() {
		var batch []interface{}

		for done := false; !done; {
			// Reuse the backing storage of the previous round.
			batch = batch[:0]
			done = q.Pick(&batch)

			// Items picked together with the exit flag are still processed.
			for _, item := range batch {
				switch fn := item.(type) {
				case func():
					q.protectedCall(fn)
				case nil:
					// Nothing to execute for a nil entry (StopLoop adds one).
				default:
					logs.Debug("unexpected type %T", fn)
				}
			}
		}

		q.endSignal.Done()
	}()

	return q
}
// StopLoop asks the event loop to stop by adding a nil entry to the queue,
// and returns the queue so calls can be chained.
func (q *eventQueue) StopLoop() EventQueue {
	q.Add(nil)
	return q
}
// Wait blocks until every loop started with StartLoop has released endSignal.
func (q *eventQueue) Wait() {
	q.endSignal.Wait()
}
// NewEventQueue creates an event queue backed by a fresh Pipe from NewPipe.
//
// The queue starts with a default panic handler that logs the recovered value
// together with the current stack trace through logs.Error and additionally
// prints the stack via debug.PrintStack. Panic capturing itself stays off
// until EnableCapturePanic(true) is called.
func NewEventQueue() EventQueue {
	// Default reporting for captured panics.
	defaultNotify := func(raw interface{}, queue EventQueue) {
		logs.Error("%v \n%s\n", raw, string(debug.Stack()))
		debug.PrintStack()
	}

	q := &eventQueue{Pipe: NewPipe()}
	q.onPanic = defaultNotify
	return q
}
// SessionQueuedCall runs callback on the event queue of the peer that owns
// ses. If that peer has no queue, the callback is executed immediately (see
// QueuedCall). A nil session makes the call a no-op.
//
// An earlier variant, previously kept here as commented-out code, obtained
// the queue through a type assertion on ses.Peer(); the queue is now read
// directly via ses.GetPeer().Queue().
func SessionQueuedCall(ses Session, callback func()) {
	if ses != nil {
		QueuedCall(ses.GetPeer().Queue(), callback)
	}
}
// QueuedCall posts callback to queue when a queue is available and otherwise
// invokes it synchronously on the calling goroutine.
//
// A nil callback combined with a nil queue is a no-op. Previously this case
// panicked on the direct call, while the queued path in this file (Post)
// silently ignores nil callbacks. With a non-nil queue the callback is handed
// to Post unchanged, so the queue implementation keeps deciding how to treat it.
//
// NOTE: queue == nil is only true for a nil interface value; an interface
// holding a typed nil pointer is not detected here.
func QueuedCall(queue EventQueue, callback func()) {
	if queue != nil {
		queue.Post(callback)
		return
	}

	// No queue: run inline, but never call a nil function.
	if callback != nil {
		callback()
	}
}