-
Notifications
You must be signed in to change notification settings - Fork 77
/
Copy pathlinked_queue.go
157 lines (139 loc) · 3.39 KB
/
linked_queue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
/*
lock free linked queue.
ref:
1. http://ddrv.cn/a/591069
2. https://coolshell.cn/articles/8239.html
*/
package concurrent
import (
"sync"
"sync/atomic"
"unsafe"
)
type LinkedQueueNode struct {
Value interface{}
Next *LinkedQueueNode
}
func (node *LinkedQueueNode) casNext(oldV, newV *LinkedQueueNode) bool {
return atomic.CompareAndSwapPointer(
(*unsafe.Pointer)(unsafe.Pointer(&node.Next)),
unsafe.Pointer(oldV),
unsafe.Pointer(newV),
)
}
func (node *LinkedQueueNode) loadNext() *LinkedQueueNode {
return (*LinkedQueueNode)(atomic.LoadPointer(
(*unsafe.Pointer)(unsafe.Pointer(&node.Next)),
))
}
type LinkedQueue struct {
head *LinkedQueueNode
tail *LinkedQueueNode
size int64
m sync.Mutex
}
func NewLinkedQueue() *LinkedQueue {
dummy := &LinkedQueueNode{}
dummy.Value = nil
dummy.Next = nil
return &LinkedQueue{ // like container/list, use same node
head: dummy,
tail: dummy,
}
}
func (queue *LinkedQueue) casTail(oldV, newV *LinkedQueueNode) bool {
return atomic.CompareAndSwapPointer(
(*unsafe.Pointer)(unsafe.Pointer(&queue.tail)),
unsafe.Pointer(oldV),
unsafe.Pointer(newV),
)
}
func (queue *LinkedQueue) casHead(oldV, newV *LinkedQueueNode) bool {
return atomic.CompareAndSwapPointer(
(*unsafe.Pointer)(unsafe.Pointer(&queue.head)),
unsafe.Pointer(oldV),
unsafe.Pointer(newV),
)
}
func (queue *LinkedQueue) loadHead() *LinkedQueueNode {
return (*LinkedQueueNode)(atomic.LoadPointer(
(*unsafe.Pointer)(unsafe.Pointer(&queue.head)),
))
}
func (queue *LinkedQueue) loadTail() *LinkedQueueNode {
return (*LinkedQueueNode)(atomic.LoadPointer(
(*unsafe.Pointer)(unsafe.Pointer(&queue.tail)),
))
}
func (queue *LinkedQueue) Enqueue(v interface{}) bool {
newNode := &LinkedQueueNode{Value: v, Next: nil}
var tail, next *LinkedQueueNode
for {
// use atomic load and cas
tail = queue.loadTail()
next = tail.loadNext()
if tail == queue.loadTail() { // double check
if next == nil { // queue tail
if tail.casNext(next, newNode) { // link to queue
break
}
} else {
queue.casTail(tail, next) // move tail pointer to real tail
}
}
}
queue.casTail(tail, newNode) // failure is ok, another thread has update
atomic.AddInt64(&queue.size, 1)
return true
}
func (queue *LinkedQueue) Dequeue() interface{} {
var head, tail, first *LinkedQueueNode
for {
// use atomic load and cas
head = queue.loadHead() // dummy
tail = queue.loadTail() // dummy
first = head.loadNext() // nil
if head == queue.loadHead() { // double check
if first == nil { // empty list
return nil
}
if head == tail { // empty list
queue.casTail(tail, first) // move tail to real pointer
continue
}
if queue.casHead(head, first) {
break
}
}
}
atomic.AddInt64(&queue.size, -1)
return first.Value
}
func (queue *LinkedQueue) Size() int64 {
return atomic.LoadInt64(&queue.size)
}
func (queue *LinkedQueue) EnqueueWithLock(v interface{}) bool {
newNode := &LinkedQueueNode{Value: v, Next: nil}
queue.m.Lock()
defer queue.m.Unlock()
tail := queue.tail
tail.Next = newNode
queue.tail = newNode
queue.size += 1
return true
}
func (queue *LinkedQueue) DequeueWithLock() interface{} {
var head, tail, first *LinkedQueueNode
queue.m.Lock()
defer queue.m.Unlock()
head = queue.head
tail = queue.tail
first = head.Next
if head == tail {
return nil
}
queue.head = first
head.Next = nil
queue.size -= 1
return first.Value
}