-
Notifications
You must be signed in to change notification settings - Fork 2
/
sync_queue.go
89 lines (71 loc) · 1.5 KB
/
sync_queue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
package synckit
import (
"sync"
)
func NewSyncQueue(locker sync.Locker) SyncQueue {
if locker == nil {
panic("illegal value")
}
return SyncQueue{locker: locker}
}
func NewSignalCondQueue(signal *sync.Cond) SyncQueue {
if signal == nil {
panic("illegal value")
}
return SyncQueue{locker: signal.L, signalFn: signal.Broadcast}
}
func NewSignalFuncQueue(locker sync.Locker, signalFn func()) SyncQueue {
if locker == nil {
panic("illegal value")
}
return SyncQueue{locker: locker, signalFn: signalFn}
}
func NewNoSyncQueue() SyncQueue {
return SyncQueue{locker: DummyLocker()}
}
type SyncFunc func(interface{})
type SyncFuncList []SyncFunc
type SyncQueue struct {
locker sync.Locker
signalFn func()
queue SyncFuncList
}
func (p *SyncQueue) Locker() sync.Locker {
return p.locker
}
func (p *SyncQueue) IsZero() bool {
return p.locker == nil
}
func (p *SyncQueue) Add(fn SyncFunc) {
if fn == nil {
panic("illegal value")
}
p.locker.Lock()
defer p.locker.Unlock()
p.queue = append(p.queue, fn)
if p.signalFn != nil {
p.signalFn()
}
}
func (p *SyncQueue) Flush() SyncFuncList {
p.locker.Lock()
defer p.locker.Unlock()
if len(p.queue) == 0 {
return nil
}
nextCap := cap(p.queue)
if nextCap > 128 && len(p.queue)<<1 < nextCap {
nextCap >>= 1
}
queue := p.queue
p.queue = make(SyncFuncList, 0, nextCap)
return queue
}
func (p *SyncQueue) AddAll(list SyncFuncList) {
if len(list) == 0 {
return
}
p.locker.Lock()
defer p.locker.Unlock()
p.queue = append(p.queue, list...)
}