Permalink
Browse files

Merge pull request #8 from square/atomic-clear

Write header before truncating QueueFile
  • Loading branch information...
2 parents 826721c + bcab3a2 commit 8143ef640adafc6dcb5e2d1f7380d7f2d3a4d24c @crazybob crazybob committed Sep 2, 2011
@@ -1,14 +1,17 @@
// Copyright 2010 Square, Inc.
package retrofit.io;
-import junit.framework.ComparisonFailure;
-import junit.framework.TestCase;
-
-import java.io.*;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.RandomAccessFile;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.Queue;
import java.util.logging.Logger;
+import junit.framework.ComparisonFailure;
+import junit.framework.TestCase;
/**
* Tests for QueueFile.
@@ -206,15 +209,15 @@ public void testFailedExpansion() throws IOException {
assertEquals(values[99], queueFile.peek());
}
- public void testPeakWithElementReader() throws IOException {
+ public void testPeekWithElementReader() throws IOException {
QueueFile queueFile = new QueueFile(file);
final byte[] a = {1, 2};
queueFile.add(a);
final byte[] b = {3, 4, 5};
queueFile.add(b);
queueFile.peek(new QueueFile.ElementReader() {
- public void read(InputStream in, int length) throws IOException {
+ @Override public void read(InputStream in, int length) throws IOException {
assertEquals(length, 2);
byte[] actual = new byte[length];
in.read(actual);
@@ -223,7 +226,7 @@ public void read(InputStream in, int length) throws IOException {
});
queueFile.peek(new QueueFile.ElementReader() {
- public void read(InputStream in, int length) throws IOException {
+ @Override public void read(InputStream in, int length) throws IOException {
assertEquals(length, 2);
assertEquals(1, in.read());
assertEquals(2, in.read());
@@ -234,7 +237,7 @@ public void read(InputStream in, int length) throws IOException {
queueFile.remove();
queueFile.peek(new QueueFile.ElementReader() {
- public void read(InputStream in, int length) throws IOException {
+ @Override public void read(InputStream in, int length) throws IOException {
assertEquals(length, 3);
byte[] actual = new byte[length];
in.read(actual);
@@ -256,7 +259,7 @@ public void testForEach() throws IOException {
final int[] iteration = new int[]{0};
QueueFile.ElementReader elementReader = new QueueFile.ElementReader() {
- public void read(InputStream in, int length) throws IOException {
+ @Override public void read(InputStream in, int length) throws IOException {
if (iteration[0] == 0) {
assertEquals(length, 2);
byte[] actual = new byte[length];
@@ -15,7 +15,11 @@
*/
package retrofit.io;
-import java.io.*;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
import java.util.NoSuchElementException;
import java.util.logging.Level;
@@ -358,17 +362,15 @@ private void expandIfNecessary(int dataLength) throws IOException {
previousLength = newLength;
} while (remainingBytes < elementLength);
- // Set new file length (considered metadata) and sync it to storage.
- raf.setLength(newLength);
- FileChannel channel = raf.getChannel();
- channel.force(true);
+ setLength(newLength);
// Calculate the position of the tail end of the data in the ring buffer
int endOfLastElement = wrapPosition(
last.position + Element.HEADER_LENGTH + last.length);
// If the buffer is split, we need to make it contiguous
if (endOfLastElement < first.position) {
+ FileChannel channel = raf.getChannel();
channel.position(fileLength); // destination position
int count = endOfLastElement - Element.HEADER_LENGTH;
if (channel.transferTo(HEADER_LENGTH, count, channel) != count) {
@@ -388,6 +390,13 @@ private void expandIfNecessary(int dataLength) throws IOException {
fileLength = newLength;
}
+ /** Sets the length of the file. */
+ private void setLength(int newLength) throws IOException {
+ // Set new file length (considered metadata) and sync it to storage.
+ raf.setLength(newLength);
+ raf.getChannel().force(true);
+ }
+
/** Reads the eldest element. Returns null if the queue is empty. */
public synchronized byte[] peek() throws IOException {
if (isEmpty()) return null;
@@ -428,11 +437,6 @@ public synchronized void forEach(ElementReader reader) throws IOException {
return t;
}
- /** Returns true if the two possibly objects are equal. */
- private static <T> boolean equal(T a, T b) {
- return a == b || a != null && a.equals(b);
- }
-
/** Reads a single element. */
private class ElementInputStream extends InputStream {
private int position;
@@ -495,11 +499,11 @@ public synchronized void remove() throws IOException {
/** Clears this queue. Truncates the file to the initial size. */
public synchronized void clear() throws IOException {
- if (fileLength > INITIAL_LENGTH) raf.setLength(INITIAL_LENGTH);
writeHeader(INITIAL_LENGTH, 0, 0, 0);
elementCount = 0;
first = last = Element.NULL;
fileLength = INITIAL_LENGTH;
+ if (fileLength > INITIAL_LENGTH) setLength(INITIAL_LENGTH);
}
/** Closes the underlying file. */
@@ -519,7 +523,7 @@ public synchronized void close() throws IOException {
forEach(new ElementReader() {
boolean first = true;
- public void read(InputStream in, int length) throws IOException {
+ @Override public void read(InputStream in, int length) throws IOException {
if (first) {
first = false;
} else {

0 comments on commit 8143ef6

Please sign in to comment.