- ArrayBlockingQueue : 一个由数组支持的有界队列。
- 如果到达了上界,将无法添加新的元素进入。
- FIFO
ArrayBlockingQueue在构造时需要指定容量, 并可以选择是否需要公平性,如果公平参数被设置true,等待时间最长的线程会优先得到处理(其实就是通过将ReentrantLock设置为true来 达到这种公平性的:即等待时间最长的线程会先操作)。通常,公平性会使你在性能上付出代价,只有在的确非常需要的时候再使用它。它是基于数组的阻塞循环队 列,此队列按 FIFO(先进先出)原则对元素进行排序。
- 阻塞方法: put() <---> take()
- 非阻塞方法: offer() <---> poll()
- put() put是ArrayBlockingQueue的方法(不是从Queue接口中继承来),在该方法中获取全局锁,如果队列满,将会阻塞直到有空间可以插入元素。条件(队列空,满)是通过Condition接口实现的。
/**
* Inserts the specified element at the tail of this queue, waiting
* for space to become available if the queue is full.
*
* @throws InterruptedException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock; //获取全局的锁
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await(); //如果当前的队列满了,则一直阻塞
enqueue(e); //调用全局的私有入队列方法
} finally {
lock.unlock();
}
}
- offer() offer是Queue接口要求实现的方法,如果队列仍有位置允许插入,插入元素,如果队列已满,直接返回false,不会阻塞。
public boolean offer(E e) {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lock(); //在写入的过程中获取锁
try {
if (count == items.length) //如果queue中的元素已经到达了上限,直接返回false
return false;
else {
enqueue(e); //调用私有的enqueue方法
return true;
}
} finally {
lock.unlock(); //释放锁
}
}
- add() add是Queue接口要求实现的方法,内部调用了offer(),如果队列仍有位置允许插入,插入元素,如果队列已满,抛出异常,不会阻塞。
public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}
- enqueue() enqueue是插入队列的核心方法,维护了一个读指针,一个写指针。
/**
* Inserts element at current put position, advances, and signals.
* Call only when holding lock.
*/
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
notEmpty.signal(); //取消notEmpty的await(put方法).
}
- take() take和put对应,是ArrayBlockingQueue私有的阻塞方法,在读取的过程中,如果发现队列为空,则会阻塞直到有元素可以读取。
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
- poll() 和offer()对应,如果没有元素,不会阻塞,返回false
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return (count == 0) ? null : dequeue(); //判断当前队列有没有元素。有的话调用deqeueu方法。
} finally {
lock.unlock();
}
}
- dequeue()
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length) //循环读取数组
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
notFull.signal();
return x;
}
- peek() 并不会删除队列的第一个元素,单纯的读取值。调用的是itemAt方法
public E peek() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return itemAt(takeIndex); // null when queue is empty
} finally {
lock.unlock();
}
}
- Producer Producer continuously put data into queue.
public class ArrayBlockingQueueProducer implements Runnable {
private ArrayBlockingQueue<Integer> queue;
public ArrayBlockingQueueProducer(ArrayBlockingQueue<Integer> queue) {
super();
this.queue = queue;
}
private volatile AtomicInteger ai = new AtomicInteger(0);
@Override
public void run() {
while(true){
try {
queue.put(ai.getAndIncrement());
System.out.println("Producer: put " + ai.get() + " into queue...");
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
- Consumer Consumer get take data from queue 100 times
public class ArrayBlockingQueueConsumer implements Runnable {
private ArrayBlockingQueue<Integer> queue;
public ArrayBlockingQueueConsumer(ArrayBlockingQueue<Integer> queue) {
super();
this.queue = queue;
}
@Override
public void run() {
try {
for(int i = 0; i < 100; i++){
Thread.currentThread().join(10);
System.out.println("Consumer: get " + queue.take() + " from queue...");
Thread.sleep(10);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws InterruptedException {
ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);
new Thread(new ArrayBlockingQueueConsumer(queue)).start();
new Thread(new ArrayBlockingQueueProducer(queue)).start();
Thread.currentThread().join();
System.out.println("Finish...");
}
}
- Result
... Producer: put 108 into queue... Consumer: get 98 from queue... Producer: put 109 into queue... Consumer: get 99 from queue... Producer: put 110 into queue...
We can find that Producer is blocked.