-
Notifications
You must be signed in to change notification settings - Fork 0
/
chan.go
197 lines (173 loc) · 4.36 KB
/
chan.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
/*
* @Author: jinde.zgm
* @Date: 2020-08-20 20:11:32
* @Description: QueuedChan, a channel with a dynamic buffer size backed by a linked list.
*/
package concurrent
import (
"container/list"
"sync/atomic"
)
// QueuedChan is a channel with dynamic buffer size implemented using a list.
// Objects received on the push channel are appended to the list by the run
// goroutine started in NewQueuedChan, which also offers the front object of
// the list on the pop channel.
type QueuedChan struct {
*list.List // Buffered objects in FIFO order; accessed by run, and by flush once run has exited
len int32 // Copy of List.Len(); because List.Len() is not thread safe, this variable is accessed atomically instead
close chan struct{} // Close signal: closed by Close/CloseAndFlush to stop run and release blocked callers
done chan struct{} // Closed signal: closed by run when it returns
pushc chan interface{} // Push object channel, created unbuffered and received by run
popc chan interface{} // Pop object channel, created unbuffered; run and flush send on it
ctrlc chan interface{} // Control channel carrying commands such as queuedChanRemoveCmd
}
// NewQueuedChan builds an empty QueuedChan and starts the goroutine that
// services it. Every internal channel is unbuffered; all buffering is done by
// the list.
func NewQueuedChan() *QueuedChan {
	c := new(QueuedChan)
	c.List = list.New()
	c.close = make(chan struct{})
	c.done = make(chan struct{})
	c.pushc = make(chan interface{})
	c.popc = make(chan interface{})
	c.ctrlc = make(chan interface{})

	// The goroutine moves objects from the push channel into the list and
	// from the list out to the pop channel.
	go c.run()

	return c
}
// queuedChanRemoveCmd is the remove object command sent over the control channel.
// Remove builds it and waits on r; remove executes it and reports the result on r.
type queuedChanRemoveCmd struct {
f func(i interface{}) (ok bool, cont bool) // Object filter function: ok tells whether to delete the object, cont whether to continue with the next one
r chan int // Carries the number of objects removed back to the requester
}
// PushChan returns the send-only side of the push channel, so that callers can
// push objects from their own select statements.
func (c *QueuedChan) PushChan() chan<- interface{} {
	var in chan<- interface{} = c.pushc
	return in
}
// Push sends object i on the push channel. It blocks until the send succeeds
// or the close signal fires; in the latter case i is dropped silently.
func (c *QueuedChan) Push(i interface{}) {
	select {
	case <-c.close:
		// Closed: give up on the object.
		return
	case c.pushc <- i:
		// Object handed over.
		return
	}
}
// PopChan returns the receive-only side of the pop channel, so that callers can
// pop objects from their own select statements or range loops.
func (c *QueuedChan) PopChan() <-chan interface{} {
	var out <-chan interface{} = c.popc
	return out
}
// Pop receives one object from the pop channel. It blocks until an object is
// available or the close signal fires; nil is returned in the latter case.
func (c *QueuedChan) Pop() interface{} {
	// item keeps its zero value (nil) when the close signal wins.
	var item interface{}
	select {
	case item = <-c.popc:
	case <-c.close:
	}
	return item
}
// Len reports the number of buffered objects. It reads the atomically
// maintained copy of the list length, so it is safe to call from any goroutine.
func (c *QueuedChan) Len() int {
	n := atomic.LoadInt32(&c.len)
	return int(n)
}
// Remove deletes the buffered objects selected by the filter function f and
// returns how many objects were deleted.
//
// f is called for the buffered objects from front to back. The two return
// values of f: the first is whether to delete the object, the second is
// whether to continue with the next object.
//
// Remove returns 0 when f is nil, or when the channel is closed before the
// command could be submitted or before its result was received.
func (c *QueuedChan) Remove(f func(i interface{}) (ok bool, cont bool)) int {
	// A nil filter cannot select anything; calling it would panic.
	if f == nil {
		return 0
	}

	// The result channel is buffered so that whoever executes the command can
	// always deliver the result without blocking, even if this caller has
	// already given up because of the close signal. With an unbuffered
	// channel the executor would be stuck on the send forever and the
	// channel could never finish closing.
	r := make(chan int, 1)

	// Send remove command.
	select {
	case c.ctrlc <- queuedChanRemoveCmd{f: f, r: r}:
	case <-c.close:
		return 0
	}

	// Waiting for result.
	select {
	case n := <-r:
		return n
	case <-c.close:
		return 0
	}
}
// Close stops the channel. Objects that are still buffered are not delivered;
// use CloseAndFlush to hand them to readers first.
// Close fires the close signal by closing a Go channel, so it must not be
// called more than once, nor in addition to CloseAndFlush: closing an already
// closed channel panics.
func (c *QueuedChan) Close() {
// Fire the close signal observed by run and by Push/Pop/Remove callers.
close(c.close)
// Wait until run has returned; it closes done on exit.
<-c.done
// Close pop channel avoid reader blocked.
close(c.popc)
}
// CloseAndFlush close channel and flush buffered objects.
// Unlike Close, the objects still buffered are sent on the pop channel before
// that channel is closed, so this call blocks until readers have received all
// of them. Like Close it must be called at most once.
// NOTE(review): Pop also selects on the close signal and may therefore return
// nil instead of a flushed object; receiving from PopChan() does not have this
// problem — confirm which readers are expected during a flush.
func (c *QueuedChan) CloseAndFlush() {
// Fire the close signal observed by run and by Push/Pop/Remove callers.
close(c.close)
// Wait until run has returned, so the list is no longer touched by it.
<-c.done
// Flush buffered objects.
c.flush()
// Close pop channel avoid reader blocked.
close(c.popc)
}
// run is the goroutine body behind a QueuedChan. In a loop it appends objects
// received on the push channel to the back of the list, offers the front
// object of the list on the pop channel, executes control commands, and
// returns once the close signal fires.
func (c *QueuedChan) run() {
	// Tell whoever is closing the channel that this goroutine is gone.
	defer close(c.done)

	for {
		// out stays nil while the list is empty. A send on a nil channel is
		// never ready, which disables the pop case of the select below.
		var (
			front *list.Element = c.Front()
			out   chan<- interface{}
			head  interface{}
		)
		if front != nil {
			out = c.popc
			head = front.Value
		}

		select {
		case <-c.close:
			// Channel is closed.
			return
		case cmd := <-c.ctrlc:
			// Control command.
			c.control(cmd)
		case out <- head:
			// A reader took the front object, so drop it from the list.
			c.List.Remove(front)
		case obj := <-c.pushc:
			// New object goes to the end of the list.
			c.PushBack(obj)
		}

		// Publish the new list length for Len().
		atomic.StoreInt32(&c.len, int32(c.List.Len()))
	}
}
// flush sends every object still buffered in the list on the pop channel,
// front to back, and removes it from the list once the send has succeeded.
// Each send blocks until a reader receives the object.
//
// flush touches the list directly, so it must only run after the run
// goroutine has exited (CloseAndFlush waits for that before calling it).
func (c *QueuedChan) flush() {
	for elem := c.Front(); elem != nil; elem = c.Front() {
		c.popc <- elem.Value
		c.List.Remove(elem)
		// run no longer maintains the published length at this point, so
		// update it here; otherwise Len() would keep reporting the count
		// from before the flush.
		atomic.StoreInt32(&c.len, int32(c.List.Len()))
	}
}
// control executes one command received on the control channel. The only
// command understood is queuedChanRemoveCmd; any other value is a programming
// error and causes a panic carrying that value.
func (c *QueuedChan) control(ctrl interface{}) {
	// Remove object from channel.
	if rm, ok := ctrl.(queuedChanRemoveCmd); ok {
		c.remove(&rm)
		return
	}

	// Unknown command.
	panic(ctrl)
}
// remove executes a remove command: it walks the list from front to back,
// calls the command's filter on every object, deletes the objects the filter
// selects, and stops early when the filter asks not to continue. Afterwards it
// publishes the new list length and reports the number of deleted objects on
// the command's result channel.
func (c *QueuedChan) remove(cmd *queuedChanRemoveCmd) {
	removed := 0

	// Iterate list.
	var next *list.Element
	for e := c.Front(); e != nil; e = next {
		// Filter object.
		ok, cont := cmd.f(e.Value)

		// Fetch the successor before e is possibly removed: once an element
		// has left the list its Next() returns nil.
		next = e.Next()

		if ok {
			c.List.Remove(e)
			removed++
		}
		if !cont {
			break
		}
	}

	// Update channel length.
	atomic.StoreInt32(&c.len, int32(c.List.Len()))

	// Return removed object number. The requester stops waiting as soon as
	// the close signal fires, so an unconditional send could block forever
	// and keep the run goroutine from ever terminating.
	select {
	case cmd.r <- removed:
	case <-c.close:
	}
}