forked from yireyun/go-queue
-
Notifications
You must be signed in to change notification settings - Fork 0
/
esQueue.go
352 lines (308 loc) · 9.03 KB
/
esQueue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
package queue
/*
@File : lock_free_queue.go
@Description: a bounded lock-free queue that uses a slice as a circular buffer,
cloned from https://github.com/yireyun/go-queue and modified
@Author : gxl
@Time : 2021/3/29 14:16
@Update:
*/
import (
"fmt"
"runtime"
"time"
"go.uber.org/atomic"
)
const (
	// MinCap is the minimum queue length. It keeps the queue from being so
	// small that contention becomes excessive; in theory, the larger the
	// queue, the fewer the conflicts. NewQueue substitutes it for a requested
	// capacity that it rejects as too small.
	MinCap = 8

	// MaxWait is the maximum number of times a starved operation yields the
	// CPU before it resorts to its escape path.
	MaxWait = 100
)
// IQueue is the method set exposed by a bounded queue of arbitrary values.
//
// In every method the count result is the queue length as seen by that call;
// it may already be stale when the caller inspects it.
type IQueue interface {
	// Introspection.
	Info() string
	Capacity() uint32
	Count() uint32

	// Single-element operations. ok reports whether the element was
	// stored/fetched; the Retry variants take a maximum number of attempts.
	Put(val interface{}) (ok bool, count uint32)
	Get() (val interface{}, ok bool, count uint32)
	RetryPut(val interface{}, retry uint32) (ok bool, count uint32)
	RetryGet(retry uint32) (val interface{}, ok bool, count uint32)

	// Batch operations. puts/gets report how many elements were moved.
	Puts(values []interface{}) (puts, count uint32)
	Gets(values []interface{}) (gets, count uint32)
}
// slot is one cell of the queue. Each cell carries two sequence numbers:
//
//   - putID is advanced by the queue capacity each time a value is stored,
//     which marks the cell as holding a value;
//   - getID is advanced by the queue capacity each time the value is taken.
//
// A value can be taken from the cell only while getID+capacity == putID.
// When getID == putID the cell is empty.
type slot struct {
	putID, getID *atomic.Uint32
	value        interface{}
}
// EsQueue is a bounded lock-free queue backed by a fixed slice of slots.
//
// capacity and capMod are set once by NewQueue and never change afterwards:
// capacity is always a power of two and capMod is capacity-1, used as the
// mask that maps a position onto an index of carrier. putPos and getPos are
// the positions most recently claimed by producers and consumers.
type EsQueue struct {
	capacity, capMod uint32
	putPos, getPos   *atomic.Uint32
	carrier          []slot
}
// NewQueue allocates a queue with a fixed number of slots and initialises
// every slot's sequence numbers.
//
// The requested cap is first clamped and then rounded up to a power of two:
//
//   - Values below MinCap are raised to MinCap. Put and Puts treat the queue
//     as full once count >= capMod-1; with a capacity of 2 that threshold is
//     0 (every Put fails) and with a capacity of 1 it underflows (the full
//     check never fires), so such queues cannot work.
//   - Values above 1<<31 are lowered to 1<<31, because rounding them up
//     overflows uint32 to 0 and would leave the queue without any slot.
func NewQueue(cap uint32) *EsQueue {
	// Largest power of two that fits in a uint32.
	const maxCap = uint32(1) << 31

	switch {
	case cap < MinCap:
		cap = MinCap
	case cap > maxCap:
		cap = maxCap
	}

	size := minRoundNumBy2(cap)
	q := &EsQueue{
		capacity: size,
		capMod:   size - 1,
		putPos:   atomic.NewUint32(0),
		getPos:   atomic.NewUint32(0),
		carrier:  make([]slot, size),
	}

	// Pre-assign the sequence numbers: each slot starts at the first position
	// that maps onto it and advances by capacity on every use. Positions
	// start at 1 (Put claims putPos+1), so slot 0 is first reached at
	// position == capacity.
	for i := range q.carrier {
		id := uint32(i)
		if i == 0 {
			id = size
		}
		q.carrier[i].putID = atomic.NewUint32(id)
		q.carrier[i].getID = atomic.NewUint32(id)
	}
	return q
}
// Info returns a one-line summary of the queue: its capacity, its index mask
// and the current put/get positions.
func (q *EsQueue) Info() string {
	putPos := q.putPos.Load()
	getPos := q.getPos.Load()
	return fmt.Sprintf("Queue{capacity: %v, capMod: %v, putPos: %v, getPos: %v}",
		q.capacity, q.capMod, putPos, getPos)
}
// Capacity reports the number of slots the queue was created with.
func (q *EsQueue) Capacity() uint32 { return q.capacity }
// Count reports the current number of elements. The two positions are read
// one after the other (getPos first), so the result is only a snapshot and
// may change at any moment.
func (q *EsQueue) Count() uint32 {
	// Operands are evaluated left to right, so getPos is loaded before putPos.
	return q.posCount(q.getPos.Load(), q.putPos.Load())
}
// Put tries once to append val to the queue.
//
// It returns ok == false, together with the observed count, when the queue
// looks full or when another producer wins the race for the next position;
// the caller is expected to retry. On success it returns true and the
// observed count plus one. nil should not be stored in normal use, because
// Get reports a nil value as a failure.
//
// Once a position has been claimed the call does not give up: it spins,
// yielding the CPU and printing a diagnostic line to stdout on every failed
// attempt, until the claimed slot becomes writable.
func (q *EsQueue) Put(val interface{}) (ok bool, count uint32) {
	head := q.getPos.Load()
	tail := q.putPos.Load()
	size := q.posCount(head, tail)

	// Full: fail immediately.
	if size >= q.capMod-1 {
		runtime.Gosched()
		return false, size
	}

	// Reserve the next position; if the reservation fails, return at once.
	claimed := tail + 1
	if swapped := q.putPos.CAS(tail, claimed); !swapped {
		runtime.Gosched()
		return false, size
	}

	cell := &q.carrier[claimed&q.capMod]
	for spins := 1; ; spins++ {
		getID := cell.getID.Load()
		putID := cell.putID.Load()
		if claimed == putID && getID == putID {
			cell.value = val
			cell.putID.Add(q.capacity)
			return true, size + 1
		}

		// Too many competing producers for too small a queue: the claimed
		// position is already one round ahead of this slot, whose previous
		// value has not been taken yet. The position now belongs to this
		// call and no other producer will use it, so instead of returning,
		// yield the CPU and wait for a Get to free the slot.
		fmt.Printf("put too quick: getID %v, putID %v and putPosNext: %v, wait: %v\n", getID, putID, claimed, spins)
		if spins > MaxWait {
			// Still not writable and nothing is being consumed: discard one
			// element so the caller does not wait forever. In theory this
			// is reached only with very small probability.
			if dropped, dropOK, left := q.Get(); dropOK {
				fmt.Printf("throw val: %v away, cnt: %v\n", dropped, left)
				continue
			}
		}
		runtime.Gosched()
	}
}
// Get tries once to remove the oldest element from the queue.
//
// It returns (nil, false, observed count) when the queue looks empty or when
// another consumer wins the race for the next position; the caller is
// expected to retry. On success it returns the value, true and the observed
// count minus one. A nil value is also reported with ok == false, even
// though its slot has been consumed.
//
// Once a position has been claimed the call does not give up: it spins,
// yielding the CPU and printing a diagnostic line to stdout on every failed
// attempt, until the claimed slot holds a value.
func (q *EsQueue) Get() (val interface{}, ok bool, count uint32) {
	head := q.getPos.Load()
	tail := q.putPos.Load()
	size := q.posCount(head, tail)

	// Empty: nothing to take.
	if size == 0 {
		runtime.Gosched()
		return nil, false, size
	}

	// Reserve the next position; if the reservation fails, return at once.
	claimed := head + 1
	if swapped := q.getPos.CAS(head, claimed); !swapped {
		runtime.Gosched()
		return nil, false, size
	}

	cell := &q.carrier[claimed&q.capMod]
	for spins := 1; ; spins++ {
		getID := cell.getID.Load()
		putID := cell.putID.Load()
		if claimed == getID && getID+q.capacity == putID {
			val = cell.value
			cell.value = nil
			cell.getID.Add(q.capacity)
			return val, val != nil, size - 1
		}

		// Probably too many competing consumers for too small a queue: the
		// claimed position is already one round ahead of this slot, which
		// holds no data yet. The position belongs to this call, so yield
		// the CPU and wait for a Put to fill the slot.
		fmt.Printf("get too quick: getID %v, putID %v and getPosNext: %v, wait: %v\n", getID, putID, claimed, spins)
		if spins > MaxWait {
			// Still nothing to read and nothing is being produced: store a
			// placeholder nil so the caller does not wait forever. In
			// theory this is reached only with very small probability.
			if stored, _ := q.Put(nil); stored {
				fmt.Printf("put nil to escape\n")
				continue
			}
		}
		runtime.Gosched()
	}
}
// RetryPut calls Put up to retry times, sleeping 3 milliseconds between
// attempts, and stops at the first success.
//
// It returns the result of the last Put attempt. With retry == 0 no attempt
// is made and it returns false with the current Count.
func (q *EsQueue) RetryPut(val interface{}, retry uint32) (ok bool, count uint32) {
	if retry == 0 {
		return false, q.Count()
	}
	for left := retry; ; left-- {
		ok, count = q.Put(val)
		if ok || left == 1 {
			return ok, count
		}
		time.Sleep(time.Millisecond * 3)
	}
}
// RetryGet calls Get up to retry times, sleeping 3 milliseconds between
// attempts, and stops at the first success.
//
// It returns the result of the last Get attempt. With retry == 0 no attempt
// is made and it returns (nil, false, current Count).
func (q *EsQueue) RetryGet(retry uint32) (val interface{}, ok bool, count uint32) {
	if retry == 0 {
		return nil, false, q.Count()
	}
	for left := retry; ; left-- {
		val, ok, count = q.Get()
		if ok || left == 1 {
			return val, ok, count
		}
		time.Sleep(time.Millisecond * 3)
	}
}
// Gets removes up to len(values) elements in one call and stores them at the
// front of values, which the caller must have sized beforehand.
//
// It returns the number of elements taken and the observed count after the
// removal. When the queue looks empty, or another consumer wins the race for
// the positions, nothing is taken and it returns 0 with the observed count.
func (q *EsQueue) Gets(values []interface{}) (gets, count uint32) {
	head := q.getPos.Load()
	tail := q.putPos.Load()
	size := q.posCount(head, tail)
	if size == 0 {
		runtime.Gosched()
		return 0, size
	}

	// Take whichever is smaller: what is available or what fits in values.
	take := uint32(len(values))
	if size < take {
		take = size
	}

	// Claim the whole range of positions with a single CAS.
	if !q.getPos.CAS(head, head+take) {
		runtime.Gosched()
		return 0, size
	}

	for i := uint32(0); i < take; i++ {
		pos := head + 1 + i
		cell := &q.carrier[pos&q.capMod]
		// The position is ours; wait until its producer has stored the value.
		for !q.canGet(pos, cell) {
			runtime.Gosched()
		}
		values[i] = cell.value
		cell.value = nil
		cell.getID.Add(q.capacity)
	}
	return take, size - take
}
// Puts appends up to len(values) elements in one call, taken from the front
// of values. The batch is limited to capacity minus the observed count.
//
// It returns the number of elements stored and the observed count after the
// insertion. When the queue looks full, or another producer wins the race
// for the positions, nothing is stored and it returns 0 with the observed
// count.
func (q *EsQueue) Puts(values []interface{}) (puts, count uint32) {
	head := q.getPos.Load()
	tail := q.putPos.Load()
	size := q.posCount(head, tail)
	if size >= q.capMod-1 {
		runtime.Gosched()
		return 0, size
	}

	// Store whichever is smaller: the free room or the number of values.
	store := uint32(len(values))
	if room := q.capacity - size; room < store {
		store = room
	}

	// Claim the whole range of positions with a single CAS.
	if !q.putPos.CAS(tail, tail+store) {
		runtime.Gosched()
		return 0, size
	}

	for i := uint32(0); i < store; i++ {
		pos := tail + 1 + i
		cell := &q.carrier[pos&q.capMod]
		// The position is ours; wait until the previous value has been taken.
		for !q.canPut(pos, cell) {
			runtime.Gosched()
		}
		cell.value = values[i]
		cell.putID.Add(q.capacity)
	}
	return store, size + store
}
// canPut reports whether cache can accept the value for position posNew:
// the slot must be empty (putID == getID) and its sequence number must have
// reached posNew.
func (q *EsQueue) canPut(posNew uint32, cache *slot) bool {
	getID := cache.getID.Load()
	putID := cache.putID.Load()
	if getID != putID {
		// Not empty: the previous value has not been taken yet.
		return false
	}
	return putID == posNew
}
// canGet reports whether cache holds the value for position getPosNew: the
// slot's getID must have reached getPosNew and its putID must be exactly one
// capacity ahead of it.
func (q *EsQueue) canGet(getPosNew uint32, cache *slot) bool {
	getID := cache.getID.Load()
	putID := cache.putID.Load()
	if getID != getPosNew {
		return false
	}
	return putID == getID+q.capacity
}
// isFull reports whether the current count has reached the same threshold
// (capMod-1) that Put and Puts use to reject new elements.
func (q *EsQueue) isFull() bool {
	limit := q.capMod - 1
	return q.Count() >= limit
}
// posCount returns the number of elements between the two positions.
//
// putPos and getPos are free-running uint32 counters: Put/Puts and Get/Gets
// only ever add to them and apply capMod just when indexing carrier. putPos
// can therefore be numerically smaller than getPos only after it has wrapped
// past 2^32. Unsigned subtraction wraps modulo 2^32 as well, so
// putPos - getPos is the element count in both cases.
//
// The previous wrap branch returned capMod - getPos + putPos, i.e. the real
// count plus capMod, which made Put and Puts see the queue as full until
// getPos had wrapped too.
func (q *EsQueue) posCount(getPos, putPos uint32) uint32 {
	return putPos - getPos
}
// minRoundNumBy2 rounds v up to the nearest power of two that is >= v,
// e.g. f(3) = 4 and f(8) = 8.
//
// It returns 0 for v == 0 and for v > 1<<31, because the final increment
// overflows uint32 in those cases.
func minRoundNumBy2(v uint32) uint32 {
	v--
	// Smear the highest set bit into every lower bit (shifts 1, 2, 4, 8, 16).
	for shift := uint32(1); shift <= 16; shift <<= 1 {
		v |= v >> shift
	}
	return v + 1
}