Skip to content

Commit c70529d

Browse files
committed
[Function add]
1.ArrayBlockingQueue. 2.LinkedBlockingQueue.
1 parent 7b5888b commit c70529d

File tree

2 files changed

+1448
-0
lines changed

2 files changed

+1448
-0
lines changed
Lines changed: 228 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,228 @@
1+
# Queue Conclusion
2+
3+
### Queue interface
4+
> `boolean add(E e);` Add and offer are used to add element into the queue.
5+
>
6+
> `boolean offer(E e);`
7+
>
8+
> `E remove();` Retrieve and remove the head of the queue.
9+
>
10+
> `E poll();`
11+
>
12+
> `E element();` Retrieves, but does not remove, the head of this queue.
13+
>
14+
> `E peek();`
15+
16+
### Blocking and non-blocking
17+
* Concurrent queue有两种实现方法,阻塞和非阻塞。
18+
* 阻塞队列是通过锁实现。
19+
* 非阻塞队列通过AQS实现。
20+
21+
#### ArrayBlockingQueue
22+
* ArrayBlockingQueue :一个由数组支持的有界队列。
23+
* 如果到达了上界,将无法添加新的元素进入。
24+
* FIFO
25+
```Java
26+
public boolean offer(E e) {
27+
checkNotNull(e);
28+
final ReentrantLock lock = this.lock;
29+
lock.lock(); //在写入的过程中获取锁
30+
try {
31+
if (count == items.length)
32+
return false;
33+
else {
34+
enqueue(e); //调用私有的enqueue方法
35+
return true;
36+
}
37+
} finally {
38+
lock.unlock(); //释放锁
39+
}
40+
}
41+
```
42+
```Java
43+
/**
44+
* Inserts element at current put position, advances, and signals.
45+
* Call only when holding lock.
46+
*/
47+
private void enqueue(E x) {
48+
// assert lock.getHoldCount() == 1;
49+
// assert items[putIndex] == null;
50+
final Object[] items = this.items;
51+
items[putIndex] = x;
52+
if (++putIndex == items.length)
53+
putIndex = 0;
54+
count++;
55+
notEmpty.signal(); //取消notEmpty的await.
56+
}
57+
```
58+
```Java
59+
public E poll() {
60+
final ReentrantLock lock = this.lock;
61+
lock.lock();
62+
try {
63+
return (count == 0) ? null : dequeue(); //判断当前队列有没有元素。有的话调用deqeueu方法。
64+
} finally {
65+
lock.unlock();
66+
}
67+
}
68+
```
69+
```Java
70+
private E dequeue() {
71+
// assert lock.getHoldCount() == 1;
72+
// assert items[takeIndex] != null;
73+
final Object[] items = this.items;
74+
@SuppressWarnings("unchecked")
75+
E x = (E) items[takeIndex];
76+
items[takeIndex] = null;
77+
if (++takeIndex == items.length)
78+
takeIndex = 0;
79+
count--;
80+
if (itrs != null)
81+
itrs.elementDequeued();
82+
notFull.signal();
83+
return x;
84+
}
85+
```
86+
87+
#### LinkedBlockingQueue
88+
* 一个由链接节点支持的可选有界队列。
89+
* 内部维护了一个Node类
90+
```Java
91+
static class Node<E> {
92+
E item;
93+
94+
/**
95+
* One of:
96+
* - the real successor Node
97+
* - this Node, meaning the successor is head.next
98+
* - null, meaning there is no successor (this is the last node)
99+
*/
100+
Node<E> next;
101+
102+
Node(E x) { item = x; }
103+
}
104+
```
105+
```Java
106+
public boolean offer(E e) {
107+
if (e == null) throw new NullPointerException();
108+
final AtomicInteger count = this.count; //此处的count为AtomicInteger,维护了原子性
109+
if (count.get() == capacity)
110+
return false;
111+
int c = -1;
112+
Node<E> node = new Node<E>(e);
113+
final ReentrantLock putLock = this.putLock;
114+
putLock.lock();
115+
try {
116+
if (count.get() < capacity) {
117+
enqueue(node);
118+
c = count.getAndIncrement();
119+
if (c + 1 < capacity)
120+
notFull.signal();
121+
}
122+
} finally {
123+
putLock.unlock();
124+
}
125+
if (c == 0)
126+
signalNotEmpty();
127+
return c >= 0;
128+
}
129+
130+
private void enqueue(Node<E> node) {
131+
// assert putLock.isHeldByCurrentThread();
132+
// assert last.next == null;
133+
last = last.next = node; //在链表的结尾,添加要插入的结点。
134+
}
135+
```
136+
```Java
137+
public E poll() {
138+
final AtomicInteger count = this.count;
139+
if (count.get() == 0)
140+
return null;
141+
E x = null;
142+
int c = -1;
143+
final ReentrantLock takeLock = this.takeLock;
144+
takeLock.lock();
145+
try {
146+
if (count.get() > 0) {
147+
x = dequeue();
148+
c = count.getAndDecrement();
149+
if (c > 1)
150+
notEmpty.signal();
151+
}
152+
} finally {
153+
takeLock.unlock();
154+
}
155+
if (c == capacity)
156+
signalNotFull();
157+
return x;
158+
}
159+
```
160+
161+
#### PriorityBlockingQueue
162+
* 一个由优先级堆支持的无界优先级队列。
163+
```Java
164+
public boolean offer(E e) {
165+
if (e == null)
166+
throw new NullPointerException();
167+
final ReentrantLock lock = this.lock;
168+
lock.lock();
169+
int n, cap;
170+
Object[] array;
171+
while ((n = size) >= (cap = (array = queue).length))
172+
tryGrow(array, cap);
173+
try {
174+
Comparator<? super E> cmp = comparator;
175+
if (cmp == null)
176+
siftUpComparable(n, e, array);
177+
else
178+
siftUpUsingComparator(n, e, array, cmp);
179+
size = n + 1;
180+
notEmpty.signal();
181+
} finally {
182+
lock.unlock();
183+
}
184+
return true;
185+
}
186+
```
187+
188+
189+
190+
191+
192+
193+
194+
195+
196+
197+
198+
199+
200+
201+
202+
203+
204+
205+
206+
207+
208+
209+
210+
211+
212+
213+
214+
215+
216+
217+
218+
219+
220+
221+
222+
223+
224+
225+
226+
227+
228+

0 commit comments

Comments
 (0)