Browse files

add doug lea's concurrent LinkedBlockingQueue to the test cases, and …

…make it selectable with "-7". presumably this will appear in java 7.
  • Loading branch information...
1 parent 3f62ae6 commit 9619fc46b88aafc518862e38659b084c97d08125 Robey Pointer committed Jan 13, 2012
View
876 src/main/java/java/util/concurrent/LinkedBlockingQueue.java
@@ -0,0 +1,876 @@
+/*
+ * Written by Doug Lea with assistance from members of JCP JSR-166
+ * Expert Group and released to the public domain, as explained at
+ * http://creativecommons.org/publicdomain/zero/1.0/
+ */
+
+package java.util.concurrent;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.AbstractQueue;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+/**
+ * An optionally-bounded {@linkplain BlockingQueue blocking queue} based on
+ * linked nodes.
+ * This queue orders elements FIFO (first-in-first-out).
+ * The <em>head</em> of the queue is that element that has been on the
+ * queue the longest time.
+ * The <em>tail</em> of the queue is that element that has been on the
+ * queue the shortest time. New elements
+ * are inserted at the tail of the queue, and the queue retrieval
+ * operations obtain elements at the head of the queue.
+ * Linked queues typically have higher throughput than array-based queues but
+ * less predictable performance in most concurrent applications.
+ *
+ * <p> The optional capacity bound constructor argument serves as a
+ * way to prevent excessive queue expansion. The capacity, if unspecified,
+ * is equal to {@link Integer#MAX_VALUE}. Linked nodes are
+ * dynamically created upon each insertion unless this would bring the
+ * queue above capacity.
+ *
+ * <p>This class and its iterator implement all of the
+ * <em>optional</em> methods of the {@link Collection} and {@link
+ * Iterator} interfaces.
+ *
+ * <p>This class is a member of the
+ * <a href="{@docRoot}/../technotes/guides/collections/index.html">
+ * Java Collections Framework</a>.
+ *
+ * @since 1.5
+ * @author Doug Lea
+ * @param <E> the type of elements held in this collection
+ *
+ */
+public class LinkedBlockingQueue<E> extends AbstractQueue<E>
+ implements BlockingQueue<E>, java.io.Serializable {
+ private static final long serialVersionUID = -6903933977591709194L;
+
+ /*
+ * A variant of the "two lock queue" algorithm. The putLock gates
+ * entry to put (and offer), and has an associated condition for
+ * waiting puts. Similarly for the takeLock. The "count" field
+ * that they both rely on is maintained as an atomic to avoid
+ * needing to get both locks in most cases. Also, to minimize need
+ * for puts to get takeLock and vice-versa, cascading notifies are
+ * used. When a put notices that it has enabled at least one take,
+ * it signals taker. That taker in turn signals others if more
+ * items have been entered since the signal. And symmetrically for
+ * takes signalling puts. Operations such as remove(Object) and
+ * iterators acquire both locks.
+ *
+ * Visibility between writers and readers is provided as follows:
+ *
+ * Whenever an element is enqueued, the putLock is acquired and
+ * count updated. A subsequent reader guarantees visibility to the
+ * enqueued Node by either acquiring the putLock (via fullyLock)
+ * or by acquiring the takeLock, and then reading n = count.get();
+ * this gives visibility to the first n items.
+ *
+ * To implement weakly consistent iterators, it appears we need to
+ * keep all Nodes GC-reachable from a predecessor dequeued Node.
+ * That would cause two problems:
+ * - allow a rogue Iterator to cause unbounded memory retention
+ * - cause cross-generational linking of old Nodes to new Nodes if
+ * a Node was tenured while live, which generational GCs have a
+ * hard time dealing with, causing repeated major collections.
+ * However, only non-deleted Nodes need to be reachable from
+ * dequeued Nodes, and reachability does not necessarily have to
+ * be of the kind understood by the GC. We use the trick of
+ * linking a Node that has just been dequeued to itself. Such a
+ * self-link implicitly means to advance to head.next.
+ */
+
+ /**
+ * Linked list node class
+ */
+ static class Node<E> {
+ E item;
+
+ /**
+ * One of:
+ * - the real successor Node
+ * - this Node, meaning the successor is head.next
+ * - null, meaning there is no successor (this is the last node)
+ */
+ Node<E> next;
+
+ Node(E x) { item = x; }
+ }
+
+ /** The capacity bound, or Integer.MAX_VALUE if none */
+ private final int capacity;
+
+ /** Current number of elements */
+ private final AtomicInteger count = new AtomicInteger();
+
+ /**
+ * Head of linked list.
+ * Invariant: head.item == null
+ */
+ transient Node<E> head;
+
+ /**
+ * Tail of linked list.
+ * Invariant: last.next == null
+ */
+ private transient Node<E> last;
+
+ /** Lock held by take, poll, etc */
+ private final ReentrantLock takeLock = new ReentrantLock();
+
+ /** Wait queue for waiting takes */
+ private final Condition notEmpty = takeLock.newCondition();
+
+ /** Lock held by put, offer, etc */
+ private final ReentrantLock putLock = new ReentrantLock();
+
+ /** Wait queue for waiting puts */
+ private final Condition notFull = putLock.newCondition();
+
+ /**
+ * Signals a waiting take. Called only from put/offer (which do not
+ * otherwise ordinarily lock takeLock.)
+ */
+ private void signalNotEmpty() {
+ final ReentrantLock takeLock = this.takeLock;
+ takeLock.lock();
+ try {
+ notEmpty.signal();
+ } finally {
+ takeLock.unlock();
+ }
+ }
+
+ /**
+ * Signals a waiting put. Called only from take/poll.
+ */
+ private void signalNotFull() {
+ final ReentrantLock putLock = this.putLock;
+ putLock.lock();
+ try {
+ notFull.signal();
+ } finally {
+ putLock.unlock();
+ }
+ }
+
+ /**
+ * Links node at end of queue.
+ *
+ * @param node the node
+ */
+ private void enqueue(Node<E> node) {
+ // assert putLock.isHeldByCurrentThread();
+ // assert last.next == null;
+ last = last.next = node;
+ }
+
+ /**
+ * Removes a node from head of queue.
+ *
+ * @return the node
+ */
+ private E dequeue() {
+ // assert takeLock.isHeldByCurrentThread();
+ // assert head.item == null;
+ Node<E> h = head;
+ Node<E> first = h.next;
+ h.next = h; // help GC
+ head = first;
+ E x = first.item;
+ first.item = null;
+ return x;
+ }
+
+ /**
+ * Lock to prevent both puts and takes.
+ */
+ void fullyLock() {
+ putLock.lock();
+ takeLock.lock();
+ }
+
+ /**
+ * Unlock to allow both puts and takes.
+ */
+ void fullyUnlock() {
+ takeLock.unlock();
+ putLock.unlock();
+ }
+
+// /**
+// * Tells whether both locks are held by current thread.
+// */
+// boolean isFullyLocked() {
+// return (putLock.isHeldByCurrentThread() &&
+// takeLock.isHeldByCurrentThread());
+// }
+
+ /**
+ * Creates a {@code LinkedBlockingQueue} with a capacity of
+ * {@link Integer#MAX_VALUE}.
+ */
+ public LinkedBlockingQueue() {
+ this(Integer.MAX_VALUE);
+ }
+
+ /**
+ * Creates a {@code LinkedBlockingQueue} with the given (fixed) capacity.
+ *
+ * @param capacity the capacity of this queue
+ * @throws IllegalArgumentException if {@code capacity} is not greater
+ * than zero
+ */
+ public LinkedBlockingQueue(int capacity) {
+ if (capacity <= 0) throw new IllegalArgumentException();
+ this.capacity = capacity;
+ last = head = new Node<E>(null);
+ }
+
+ /**
+ * Creates a {@code LinkedBlockingQueue} with a capacity of
+ * {@link Integer#MAX_VALUE}, initially containing the elements of the
+ * given collection,
+ * added in traversal order of the collection's iterator.
+ *
+ * @param c the collection of elements to initially contain
+ * @throws NullPointerException if the specified collection or any
+ * of its elements are null
+ */
+ public LinkedBlockingQueue(Collection<? extends E> c) {
+ this(Integer.MAX_VALUE);
+ final ReentrantLock putLock = this.putLock;
+ putLock.lock(); // Never contended, but necessary for visibility
+ try {
+ int n = 0;
+ for (E e : c) {
+ if (e == null)
+ throw new NullPointerException();
+ if (n == capacity)
+ throw new IllegalStateException("Queue full");
+ enqueue(new Node<E>(e));
+ ++n;
+ }
+ count.set(n);
+ } finally {
+ putLock.unlock();
+ }
+ }
+
+ // this doc comment is overridden to remove the reference to collections
+ // greater in size than Integer.MAX_VALUE
+ /**
+ * Returns the number of elements in this queue.
+ *
+ * @return the number of elements in this queue
+ */
+ public int size() {
+ return count.get();
+ }
+
+ // this doc comment is a modified copy of the inherited doc comment,
+ // without the reference to unlimited queues.
+ /**
+ * Returns the number of additional elements that this queue can ideally
+ * (in the absence of memory or resource constraints) accept without
+ * blocking. This is always equal to the initial capacity of this queue
+ * less the current {@code size} of this queue.
+ *
+ * <p>Note that you <em>cannot</em> always tell if an attempt to insert
+ * an element will succeed by inspecting {@code remainingCapacity}
+ * because it may be the case that another thread is about to
+ * insert or remove an element.
+ */
+ public int remainingCapacity() {
+ return capacity - count.get();
+ }
+
+ /**
+ * Inserts the specified element at the tail of this queue, waiting if
+ * necessary for space to become available.
+ *
+ * @throws InterruptedException {@inheritDoc}
+ * @throws NullPointerException {@inheritDoc}
+ */
+ public void put(E e) throws InterruptedException {
+ if (e == null) throw new NullPointerException();
+ // Note: convention in all put/take/etc is to preset local var
+ // holding count negative to indicate failure unless set.
+ int c = -1;
+ Node<E> node = new Node<E>(e);
+ final ReentrantLock putLock = this.putLock;
+ final AtomicInteger count = this.count;
+ putLock.lockInterruptibly();
+ try {
+ /*
+ * Note that count is used in wait guard even though it is
+ * not protected by lock. This works because count can
+ * only decrease at this point (all other puts are shut
+ * out by lock), and we (or some other waiting put) are
+ * signalled if it ever changes from capacity. Similarly
+ * for all other uses of count in other wait guards.
+ */
+ while (count.get() == capacity) {
+ notFull.await();
+ }
+ enqueue(node);
+ c = count.getAndIncrement();
+ if (c + 1 < capacity)
+ notFull.signal();
+ } finally {
+ putLock.unlock();
+ }
+ if (c == 0)
+ signalNotEmpty();
+ }
+
+ /**
+ * Inserts the specified element at the tail of this queue, waiting if
+ * necessary up to the specified wait time for space to become available.
+ *
+ * @return {@code true} if successful, or {@code false} if
+ * the specified waiting time elapses before space is available.
+ * @throws InterruptedException {@inheritDoc}
+ * @throws NullPointerException {@inheritDoc}
+ */
+ public boolean offer(E e, long timeout, TimeUnit unit)
+ throws InterruptedException {
+
+ if (e == null) throw new NullPointerException();
+ long nanos = unit.toNanos(timeout);
+ int c = -1;
+ final ReentrantLock putLock = this.putLock;
+ final AtomicInteger count = this.count;
+ putLock.lockInterruptibly();
+ try {
+ while (count.get() == capacity) {
+ if (nanos <= 0)
+ return false;
+ nanos = notFull.awaitNanos(nanos);
+ }
+ enqueue(new Node<E>(e));
+ c = count.getAndIncrement();
+ if (c + 1 < capacity)
+ notFull.signal();
+ } finally {
+ putLock.unlock();
+ }
+ if (c == 0)
+ signalNotEmpty();
+ return true;
+ }
+
+ /**
+ * Inserts the specified element at the tail of this queue if it is
+ * possible to do so immediately without exceeding the queue's capacity,
+ * returning {@code true} upon success and {@code false} if this queue
+ * is full.
+ * When using a capacity-restricted queue, this method is generally
+ * preferable to method {@link BlockingQueue#add add}, which can fail to
+ * insert an element only by throwing an exception.
+ *
+ * @throws NullPointerException if the specified element is null
+ */
+ public boolean offer(E e) {
+ if (e == null) throw new NullPointerException();
+ final AtomicInteger count = this.count;
+ if (count.get() == capacity)
+ return false;
+ int c = -1;
+ Node<E> node = new Node<E>(e);
+ final ReentrantLock putLock = this.putLock;
+ putLock.lock();
+ try {
+ if (count.get() < capacity) {
+ enqueue(node);
+ c = count.getAndIncrement();
+ if (c + 1 < capacity)
+ notFull.signal();
+ }
+ } finally {
+ putLock.unlock();
+ }
+ if (c == 0)
+ signalNotEmpty();
+ return c >= 0;
+ }
+
+ public E take() throws InterruptedException {
+ E x;
+ int c = -1;
+ final AtomicInteger count = this.count;
+ final ReentrantLock takeLock = this.takeLock;
+ takeLock.lockInterruptibly();
+ try {
+ while (count.get() == 0) {
+ notEmpty.await();
+ }
+ x = dequeue();
+ c = count.getAndDecrement();
+ if (c > 1)
+ notEmpty.signal();
+ } finally {
+ takeLock.unlock();
+ }
+ if (c == capacity)
+ signalNotFull();
+ return x;
+ }
+
+ public E poll(long timeout, TimeUnit unit) throws InterruptedException {
+ E x = null;
+ int c = -1;
+ long nanos = unit.toNanos(timeout);
+ final AtomicInteger count = this.count;
+ final ReentrantLock takeLock = this.takeLock;
+ takeLock.lockInterruptibly();
+ try {
+ while (count.get() == 0) {
+ if (nanos <= 0)
+ return null;
+ nanos = notEmpty.awaitNanos(nanos);
+ }
+ x = dequeue();
+ c = count.getAndDecrement();
+ if (c > 1)
+ notEmpty.signal();
+ } finally {
+ takeLock.unlock();
+ }
+ if (c == capacity)
+ signalNotFull();
+ return x;
+ }
+
+ public E poll() {
+ final AtomicInteger count = this.count;
+ if (count.get() == 0)
+ return null;
+ E x = null;
+ int c = -1;
+ final ReentrantLock takeLock = this.takeLock;
+ takeLock.lock();
+ try {
+ if (count.get() > 0) {
+ x = dequeue();
+ c = count.getAndDecrement();
+ if (c > 1)
+ notEmpty.signal();
+ }
+ } finally {
+ takeLock.unlock();
+ }
+ if (c == capacity)
+ signalNotFull();
+ return x;
+ }
+
+ public E peek() {
+ if (count.get() == 0)
+ return null;
+ final ReentrantLock takeLock = this.takeLock;
+ takeLock.lock();
+ try {
+ Node<E> first = head.next;
+ if (first == null)
+ return null;
+ else
+ return first.item;
+ } finally {
+ takeLock.unlock();
+ }
+ }
+
+ /**
+ * Unlinks interior Node p with predecessor trail.
+ */
+ void unlink(Node<E> p, Node<E> trail) {
+ // assert isFullyLocked();
+ // p.next is not changed, to allow iterators that are
+ // traversing p to maintain their weak-consistency guarantee.
+ p.item = null;
+ trail.next = p.next;
+ if (last == p)
+ last = trail;
+ if (count.getAndDecrement() == capacity)
+ notFull.signal();
+ }
+
+ /**
+ * Removes a single instance of the specified element from this queue,
+ * if it is present. More formally, removes an element {@code e} such
+ * that {@code o.equals(e)}, if this queue contains one or more such
+ * elements.
+ * Returns {@code true} if this queue contained the specified element
+ * (or equivalently, if this queue changed as a result of the call).
+ *
+ * @param o element to be removed from this queue, if present
+ * @return {@code true} if this queue changed as a result of the call
+ */
+ public boolean remove(Object o) {
+ if (o == null) return false;
+ fullyLock();
+ try {
+ for (Node<E> trail = head, p = trail.next;
+ p != null;
+ trail = p, p = p.next) {
+ if (o.equals(p.item)) {
+ unlink(p, trail);
+ return true;
+ }
+ }
+ return false;
+ } finally {
+ fullyUnlock();
+ }
+ }
+
+ /**
+ * Returns {@code true} if this queue contains the specified element.
+ * More formally, returns {@code true} if and only if this queue contains
+ * at least one element {@code e} such that {@code o.equals(e)}.
+ *
+ * @param o object to be checked for containment in this queue
+ * @return {@code true} if this queue contains the specified element
+ */
+ public boolean contains(Object o) {
+ if (o == null) return false;
+ fullyLock();
+ try {
+ for (Node<E> p = head.next; p != null; p = p.next)
+ if (o.equals(p.item))
+ return true;
+ return false;
+ } finally {
+ fullyUnlock();
+ }
+ }
+
+ /**
+ * Returns an array containing all of the elements in this queue, in
+ * proper sequence.
+ *
+ * <p>The returned array will be "safe" in that no references to it are
+ * maintained by this queue. (In other words, this method must allocate
+ * a new array). The caller is thus free to modify the returned array.
+ *
+ * <p>This method acts as bridge between array-based and collection-based
+ * APIs.
+ *
+ * @return an array containing all of the elements in this queue
+ */
+ public Object[] toArray() {
+ fullyLock();
+ try {
+ int size = count.get();
+ Object[] a = new Object[size];
+ int k = 0;
+ for (Node<E> p = head.next; p != null; p = p.next)
+ a[k++] = p.item;
+ return a;
+ } finally {
+ fullyUnlock();
+ }
+ }
+
+ /**
+ * Returns an array containing all of the elements in this queue, in
+ * proper sequence; the runtime type of the returned array is that of
+ * the specified array. If the queue fits in the specified array, it
+ * is returned therein. Otherwise, a new array is allocated with the
+ * runtime type of the specified array and the size of this queue.
+ *
+ * <p>If this queue fits in the specified array with room to spare
+ * (i.e., the array has more elements than this queue), the element in
+ * the array immediately following the end of the queue is set to
+ * {@code null}.
+ *
+ * <p>Like the {@link #toArray()} method, this method acts as bridge between
+ * array-based and collection-based APIs. Further, this method allows
+ * precise control over the runtime type of the output array, and may,
+ * under certain circumstances, be used to save allocation costs.
+ *
+ * <p>Suppose {@code x} is a queue known to contain only strings.
+ * The following code can be used to dump the queue into a newly
+ * allocated array of {@code String}:
+ *
+ * <pre> {@code String[] y = x.toArray(new String[0]);}</pre>
+ *
+ * Note that {@code toArray(new Object[0])} is identical in function to
+ * {@code toArray()}.
+ *
+ * @param a the array into which the elements of the queue are to
+ * be stored, if it is big enough; otherwise, a new array of the
+ * same runtime type is allocated for this purpose
+ * @return an array containing all of the elements in this queue
+ * @throws ArrayStoreException if the runtime type of the specified array
+ * is not a supertype of the runtime type of every element in
+ * this queue
+ * @throws NullPointerException if the specified array is null
+ */
+ @SuppressWarnings("unchecked")
+ public <T> T[] toArray(T[] a) {
+ fullyLock();
+ try {
+ int size = count.get();
+ if (a.length < size)
+ a = (T[])java.lang.reflect.Array.newInstance
+ (a.getClass().getComponentType(), size);
+
+ int k = 0;
+ for (Node<E> p = head.next; p != null; p = p.next)
+ a[k++] = (T)p.item;
+ if (a.length > k)
+ a[k] = null;
+ return a;
+ } finally {
+ fullyUnlock();
+ }
+ }
+
+ public String toString() {
+ fullyLock();
+ try {
+ Node<E> p = head.next;
+ if (p == null)
+ return "[]";
+
+ StringBuilder sb = new StringBuilder();
+ sb.append('[');
+ for (;;) {
+ E e = p.item;
+ sb.append(e == this ? "(this Collection)" : e);
+ p = p.next;
+ if (p == null)
+ return sb.append(']').toString();
+ sb.append(',').append(' ');
+ }
+ } finally {
+ fullyUnlock();
+ }
+ }
+
+ /**
+ * Atomically removes all of the elements from this queue.
+ * The queue will be empty after this call returns.
+ */
+ public void clear() {
+ fullyLock();
+ try {
+ for (Node<E> p, h = head; (p = h.next) != null; h = p) {
+ h.next = h;
+ p.item = null;
+ }
+ head = last;
+ // assert head.item == null && head.next == null;
+ if (count.getAndSet(0) == capacity)
+ notFull.signal();
+ } finally {
+ fullyUnlock();
+ }
+ }
+
+ /**
+ * @throws UnsupportedOperationException {@inheritDoc}
+ * @throws ClassCastException {@inheritDoc}
+ * @throws NullPointerException {@inheritDoc}
+ * @throws IllegalArgumentException {@inheritDoc}
+ */
+ public int drainTo(Collection<? super E> c) {
+ return drainTo(c, Integer.MAX_VALUE);
+ }
+
+ /**
+ * @throws UnsupportedOperationException {@inheritDoc}
+ * @throws ClassCastException {@inheritDoc}
+ * @throws NullPointerException {@inheritDoc}
+ * @throws IllegalArgumentException {@inheritDoc}
+ */
+ public int drainTo(Collection<? super E> c, int maxElements) {
+ if (c == null)
+ throw new NullPointerException();
+ if (c == this)
+ throw new IllegalArgumentException();
+ if (maxElements <= 0)
+ return 0;
+ boolean signalNotFull = false;
+ final ReentrantLock takeLock = this.takeLock;
+ takeLock.lock();
+ try {
+ int n = Math.min(maxElements, count.get());
+ // count.get provides visibility to first n Nodes
+ Node<E> h = head;
+ int i = 0;
+ try {
+ while (i < n) {
+ Node<E> p = h.next;
+ c.add(p.item);
+ p.item = null;
+ h.next = h;
+ h = p;
+ ++i;
+ }
+ return n;
+ } finally {
+ // Restore invariants even if c.add() threw
+ if (i > 0) {
+ // assert h.item == null;
+ head = h;
+ signalNotFull = (count.getAndAdd(-i) == capacity);
+ }
+ }
+ } finally {
+ takeLock.unlock();
+ if (signalNotFull)
+ signalNotFull();
+ }
+ }
+
+ /**
+ * Returns an iterator over the elements in this queue in proper sequence.
+ * The elements will be returned in order from first (head) to last (tail).
+ *
+ * <p>The returned iterator is a "weakly consistent" iterator that
+ * will never throw {@link java.util.ConcurrentModificationException
+ * ConcurrentModificationException}, and guarantees to traverse
+ * elements as they existed upon construction of the iterator, and
+ * may (but is not guaranteed to) reflect any modifications
+ * subsequent to construction.
+ *
+ * @return an iterator over the elements in this queue in proper sequence
+ */
+ public Iterator<E> iterator() {
+ return new Itr();
+ }
+
+ private class Itr implements Iterator<E> {
+ /*
+ * Basic weakly-consistent iterator. At all times hold the next
+ * item to hand out so that if hasNext() reports true, we will
+ * still have it to return even if lost race with a take etc.
+ */
+ private Node<E> current;
+ private Node<E> lastRet;
+ private E currentElement;
+
+ Itr() {
+ fullyLock();
+ try {
+ current = head.next;
+ if (current != null)
+ currentElement = current.item;
+ } finally {
+ fullyUnlock();
+ }
+ }
+
+ public boolean hasNext() {
+ return current != null;
+ }
+
+ /**
+ * Returns the next live successor of p, or null if no such.
+ *
+ * Unlike other traversal methods, iterators need to handle both:
+ * - dequeued nodes (p.next == p)
+ * - (possibly multiple) interior removed nodes (p.item == null)
+ */
+ private Node<E> nextNode(Node<E> p) {
+ for (;;) {
+ Node<E> s = p.next;
+ if (s == p)
+ return head.next;
+ if (s == null || s.item != null)
+ return s;
+ p = s;
+ }
+ }
+
+ public E next() {
+ fullyLock();
+ try {
+ if (current == null)
+ throw new NoSuchElementException();
+ E x = currentElement;
+ lastRet = current;
+ current = nextNode(current);
+ currentElement = (current == null) ? null : current.item;
+ return x;
+ } finally {
+ fullyUnlock();
+ }
+ }
+
+ public void remove() {
+ if (lastRet == null)
+ throw new IllegalStateException();
+ fullyLock();
+ try {
+ Node<E> node = lastRet;
+ lastRet = null;
+ for (Node<E> trail = head, p = trail.next;
+ p != null;
+ trail = p, p = p.next) {
+ if (p == node) {
+ unlink(p, trail);
+ break;
+ }
+ }
+ } finally {
+ fullyUnlock();
+ }
+ }
+ }
+
+ /**
+ * Saves this queue to a stream (that is, serializes it).
+ *
+ * @serialData The capacity is emitted (int), followed by all of
+ * its elements (each an {@code Object}) in the proper order,
+ * followed by a null
+ */
+ private void writeObject(java.io.ObjectOutputStream s)
+ throws java.io.IOException {
+
+ fullyLock();
+ try {
+ // Write out any hidden stuff, plus capacity
+ s.defaultWriteObject();
+
+ // Write out all elements in the proper order.
+ for (Node<E> p = head.next; p != null; p = p.next)
+ s.writeObject(p.item);
+
+ // Use trailing null as sentinel
+ s.writeObject(null);
+ } finally {
+ fullyUnlock();
+ }
+ }
+
+ /**
+ * Reconstitutes this queue from a stream (that is, deserializes it).
+ */
+ private void readObject(java.io.ObjectInputStream s)
+ throws java.io.IOException, ClassNotFoundException {
+ // Read in capacity, and any hidden stuff
+ s.defaultReadObject();
+
+ count.set(0);
+ last = head = new Node<E>(null);
+
+ // Read in all elements and place in queue
+ for (;;) {
+ @SuppressWarnings("unchecked")
+ E item = (E)s.readObject();
+ if (item == null)
+ break;
+ add(item);
+ }
+ }
+}
View
51 src/main/scala/com/twitter/libkestrel/JBlockingQueue.scala
@@ -0,0 +1,51 @@
+package com.twitter.libkestrel
+
+import com.twitter.util.{Future, Time}
+import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}
+
+final class JBlockingQueue[A <: AnyRef](maxItems: Long) extends BlockingQueue[A] {
+ val queue = new LinkedBlockingQueue[A](maxItems.toInt)
+
+ def put(item: A): Boolean = {
+ try {
+ queue.put(item)
+ } catch { case e => e.printStackTrace() }
+ true
+ }
+
+ def putHead(item: A) {
+ throw new RuntimeException("unsupported operation")
+ }
+
+ def size: Int = {
+ queue.size()
+ }
+
+ def get(): Future[Option[A]] = {
+ Future(Option(queue.poll()))
+ }
+
+ def get(deadline: Time): Future[Option[A]] = {
+ Future(Option(queue.poll((Time.now - deadline).inMilliseconds, TimeUnit.MILLISECONDS)))
+ }
+
+ def poll(): Future[Option[A]] = {
+ Future(Option(queue.peek()))
+ }
+
+ def pollIf(predicate: A => Boolean): Future[Option[A]] = {
+ throw new RuntimeException("unsupported operation")
+ }
+
+ def flush() {
+ queue.clear()
+ }
+
+ def toDebug: String = {
+ "<JBlockingQueue(%d) size %d>".format(maxItems, size)
+ }
+
+ def close() {
+ // nothing
+ }
+}
View
5 src/test/scala/com/twitter/libkestrel/load/LoadTesting.scala
@@ -37,6 +37,7 @@ trait LoadTesting {
case object Simple extends QueueType
case object Concurrent extends QueueType
case object Journaled extends QueueType
+ case object Java7 extends QueueType
}
var queueType: QueueType = QueueType.Concurrent
@@ -48,6 +49,7 @@ trait LoadTesting {
help(None, "help", "show this help screen")
opt("S", "simple", "use old simple synchronized-based queue", { queueType = QueueType.Simple; () })
opt("J", "journal", "use journaled queue in /tmp", { queueType = QueueType.Journaled; () })
+ opt("7", "java7", "use new java 7 LinkedBlockingQueue", { queueType = QueueType.Java7; () })
opt("L", "limit", "<items>", "limit total queue size (default: %d)".format(itemLimit), { x: String =>
itemLimit = x.toInt
})
@@ -74,6 +76,9 @@ trait LoadTesting {
)
), new File("/tmp"), javaTimer, scheduler).toBlockingQueue[String]
}
+ case QueueType.Java7 => {
+ new JBlockingQueue(itemLimit * 1000)
+ }
}
}

0 comments on commit 9619fc4

Please sign in to comment.