Skip to content

Commit

Permalink
Merge pull request #112 from f2prateek/object-queue-iterable
Browse files Browse the repository at this point in the history
Make ObjectQueue implement Iterable.
  • Loading branch information
f2prateek committed Sep 6, 2016
2 parents 14c0f9b + cdf41d9 commit 82699b2
Show file tree
Hide file tree
Showing 5 changed files with 225 additions and 74 deletions.
63 changes: 41 additions & 22 deletions tape/src/main/java/com/squareup/tape/FileObjectQueue.java
Expand Up @@ -6,14 +6,12 @@
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import static java.util.Collections.unmodifiableList;

/**
* Base queue class, implements common functionality for a QueueFile-backed
* queue manager. This class is not thread safe; instances should be kept
* queue manager. This class is not thread safe; instances should be kept
* thread-confined.
*
* @param <T> The type of elements in the queue.
Expand All @@ -25,7 +23,7 @@ public final class FileObjectQueue<T> extends ObjectQueue<T> implements Closeabl
private final DirectByteArrayOutputStream bytes = new DirectByteArrayOutputStream();
/** Keep file around for error reporting. */
private final File file;
final Converter<T> converter;
@Private final Converter<T> converter;
private Listener<T> listener;

public FileObjectQueue(File file, Converter<T> converter) throws IOException {
Expand Down Expand Up @@ -55,23 +53,6 @@ public File file() {
return converter.from(bytes);
}

/**
* Reads up to {@code max} entries from the head of the queue without removing the entries.
* If the queue's {@link #size()} is less than {@code max} then only {@link #size()} entries
* are read.
*/
@Override public List<T> peek(int max) throws IOException {
List<T> entries = new ArrayList<T>(queueFile.size() > max ? max : queueFile.size());
int count = 0;
for (byte[] data : queueFile) {
if (++count > max) {
break;
}
entries.add(converter.from(data));
}
return unmodifiableList(entries);
}

@Override public List<T> asList() throws IOException {
return peek(size());
}
Expand All @@ -98,6 +79,44 @@ public File file() {
this.listener = listener;
}

/**
* Returns an iterator over entries in this queue.
*
* <p>The iterator disallows modifications to the queue during iteration. Removing entries from
* the head of the queue is permitted during iteration using {@link Iterator#remove()}.
*
* <p>The iterator may throw an unchecked {@link RuntimeException} during {@link Iterator#next()}
* or {@link Iterator#remove()}.
*/
@Override public Iterator<T> iterator() {
return new QueueFileIterator(queueFile.iterator());
}

private final class QueueFileIterator implements Iterator<T> {
final Iterator<byte[]> iterator;

@Private QueueFileIterator(Iterator<byte[]> iterator) {
this.iterator = iterator;
}

@Override public boolean hasNext() {
return iterator.hasNext();
}

@Override public T next() {
byte[] data = iterator.next();
try {
return converter.from(data);
} catch (IOException e) {
throw new RuntimeException("todo: throw a proper error", e);
}
}

@Override public void remove() {
iterator.remove();
}
}

/**
* Convert a byte stream to and from a concrete type.
*
Expand Down
88 changes: 74 additions & 14 deletions tape/src/main/java/com/squareup/tape/InMemoryObjectQueue.java
Expand Up @@ -2,12 +2,10 @@
package com.squareup.tape;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.NoSuchElementException;

/**
* A queue for objects that are not serious enough to be written to disk. Objects in this queue
Expand All @@ -16,8 +14,14 @@
* @param <T> The type of elements in the queue.
*/
public final class InMemoryObjectQueue<T> extends ObjectQueue<T> {
// LinkedList can be used both as a List and Queue.
private final LinkedList<T> entries;
// LinkedList can be used both as a List (for get(n)) and Queue (for peek() and remove()).
@Private final LinkedList<T> entries;
/**
* The number of times this file has been structurally modified — it is incremented during {@link
* #remove(int)} and {@link #add(Object)}. Used by {@link InMemoryObjectQueue.EntryIterator} to
* guard against concurrent modification.
*/
@Private int modCount = 0;
private Listener<T> listener;

@SuppressWarnings("unchecked")
Expand All @@ -26,6 +30,7 @@ public InMemoryObjectQueue() {
}

@Override public void add(T entry) {
modCount++;
entries.add(entry);
if (listener != null) listener.onAdd(this, entry);
}
Expand All @@ -34,17 +39,12 @@ public InMemoryObjectQueue() {
return entries.peek();
}

@Override public List<T> peek(int max) throws IOException {
int end = Math.min(max, entries.size());
List<T> subList = entries.subList(0, end);
return Collections.unmodifiableList(subList);
}

@Override public int size() {
return entries.size();
}

@Override public void remove(int n) throws IOException {
modCount++;
for (int i = 0; i < n; i++) {
entries.remove();
if (listener != null) listener.onRemove(this);
Expand All @@ -59,4 +59,64 @@ public InMemoryObjectQueue() {
}
this.listener = listener;
}
}

/**
* Returns an iterator over entries in this queue.
*
* <p>The iterator disallows modifications to the queue during iteration. Removing entries from
* the head of the queue is permitted during iteration using{@link Iterator#remove()}.
*/
@Override public Iterator<T> iterator() {
return new EntryIterator();
}

private final class EntryIterator implements Iterator<T> {
/** Index of element to be returned by subsequent call to next. */
int nextElementIndex = 0;

/**
* The {@link #modCount} value that the iterator believes that the backing QueueFile should
* have. If this expectation is violated, the iterator has detected concurrent modification.
*/
int expectedModCount = modCount;

@Private EntryIterator() {

}

@Override public boolean hasNext() {
checkForComodification();

return nextElementIndex != size();
}

@Override public T next() {
checkForComodification();

if (size() == 0) throw new NoSuchElementException();
return entries.get(nextElementIndex++);
}

@Override public void remove() {
checkForComodification();

if (size() == 0) throw new NoSuchElementException();
if (nextElementIndex != 1) {
throw new UnsupportedOperationException("Removal is only permitted from the head.");
}

try {
InMemoryObjectQueue.this.remove();
} catch (IOException e) {
throw new RuntimeException("todo: throw a proper error", e);
}

expectedModCount = modCount;
nextElementIndex--;
}

private void checkForComodification() {
if (modCount != expectedModCount) throw new ConcurrentModificationException();
}
}
}
15 changes: 13 additions & 2 deletions tape/src/main/java/com/squareup/tape/ObjectQueue.java
Expand Up @@ -2,14 +2,17 @@
package com.squareup.tape;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

/**
* A queue of objects.
*
* @param <T> The type of queue for the elements.
*/
public abstract class ObjectQueue<T> {
public abstract class ObjectQueue<T> implements Iterable<T> {

/** Returns the number of entries in the queue. */
abstract int size();
Expand All @@ -28,7 +31,15 @@ public abstract class ObjectQueue<T> {
* If the queue's {@link #size()} is less than {@code max} then only {@link #size()} entries
* are read.
*/
abstract List<T> peek(int max) throws IOException;
List<T> peek(int max) throws IOException {
int end = Math.min(max, size());
List<T> subList = new ArrayList<T>(end);
Iterator<T> iterator = iterator();
for (int i = 0; i < end; i++) {
subList.add(iterator.next());
}
return Collections.unmodifiableList(subList);
}

/** Returns the entries in the queue as an unmodifiable {@link List}.*/
List<T> asList() throws IOException {
Expand Down
36 changes: 1 addition & 35 deletions tape/src/main/java/com/squareup/tape/QueueFile.java
Expand Up @@ -450,8 +450,7 @@ private final class ElementIterator implements Iterator<byte[]> {
*/
int expectedModCount = modCount;

ElementIterator() {
// Prevent synthetic accessor method from being generated.
@Private ElementIterator() {
}

private void checkForComodification() {
Expand Down Expand Up @@ -506,39 +505,6 @@ private void checkForComodification() {
}
}

private final class ElementInputStream extends InputStream {
private int position;
private int remaining;

ElementInputStream(Element element) {
position = wrapPosition(element.position + Element.HEADER_LENGTH);
remaining = element.length;
}

@Override public int read(byte[] buffer, int offset, int length) throws IOException {
if ((offset | length) < 0 || length > buffer.length - offset) {
throw new ArrayIndexOutOfBoundsException();
}
if (remaining == 0) {
return -1;
}
if (length > remaining) length = remaining;
ringRead(position, buffer, offset, length);
position = wrapPosition(position + length);
remaining -= length;
return length;
}

@Override public int read() throws IOException {
if (remaining == 0) return -1;
raf.seek(position);
int b = raf.read();
position = wrapPosition(position + 1);
remaining--;
return b;
}
}

/** Returns the number of elements in this queue. */
public int size() {
return elementCount;
Expand Down

0 comments on commit 82699b2

Please sign in to comment.