Permalink
Browse files

Switch to using FileChannel instead of MappedByteBuffer for .data files.

  • Loading branch information...
1 parent 2efb591 commit d28dc46352f608e6fcef190cf606ea2342042ceb @jkreps jkreps committed Jun 15, 2009
@@ -12,20 +12,24 @@
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
+import org.apache.log4j.Logger;
+
import voldemort.VoldemortException;
import voldemort.store.PersistenceFailureException;
import voldemort.utils.Utils;
public class ChunkedFileSet {
+ private static Logger logger = Logger.getLogger(ChunkedFileSet.class);
+
private final int numChunks;
private final int numBuffersPerChunk;
private final File baseDir;
private final long bufferWaitTimeoutMs;
private final List<Integer> indexFileSizes;
private final List<Integer> dataFileSizes;
private final List<BlockingQueue<MappedByteBuffer>> indexFiles;
- private final List<BlockingQueue<MappedByteBuffer>> dataFiles;
+ private final List<BlockingQueue<FileChannel>> dataFiles;
public ChunkedFileSet(File directory, int numBuffersPerChunk, long bufferWaitTimeoutMs) {
this.baseDir = directory;
@@ -37,13 +41,14 @@ public ChunkedFileSet(File directory, int numBuffersPerChunk, long bufferWaitTim
this.indexFileSizes = new ArrayList<Integer>();
this.dataFileSizes = new ArrayList<Integer>();
this.indexFiles = new ArrayList<BlockingQueue<MappedByteBuffer>>();
- this.dataFiles = new ArrayList<BlockingQueue<MappedByteBuffer>>();
+ this.dataFiles = new ArrayList<BlockingQueue<FileChannel>>();
// if the directory is empty create empty files
if(baseDir.list() != null && baseDir.list().length == 0) {
try {
new File(baseDir, "0.index").createNewFile();
new File(baseDir, "0.data").createNewFile();
+ logger.info("No index or data files found, creating empty files 0.index and 0.data.");
} catch(IOException e) {
throw new VoldemortException("Error creating empty read-only files.", e);
}
@@ -65,10 +70,10 @@ else if(index.exists() ^ data.exists())
indexFileSizes.add((int) indexLength);
dataFileSizes.add((int) dataLength);
BlockingQueue<MappedByteBuffer> indexFds = new ArrayBlockingQueue<MappedByteBuffer>(numBuffersPerChunk);
- BlockingQueue<MappedByteBuffer> dataFds = new ArrayBlockingQueue<MappedByteBuffer>(numBuffersPerChunk);
+ BlockingQueue<FileChannel> dataFds = new ArrayBlockingQueue<FileChannel>(numBuffersPerChunk);
for(int i = 0; i < numBuffersPerChunk; i++) {
indexFds.add(mapFile(index));
- dataFds.add(mapFile(data));
+ dataFds.add(openChannel(data));
}
indexFiles.add(indexFds);
dataFiles.add(dataFds);
@@ -98,11 +103,24 @@ public void close() {
for(int chunk = 0; chunk < this.numChunks; chunk++) {
for(int i = 0; i < this.numBuffersPerChunk; i++) {
checkoutIndexFile(chunk);
- checkoutDataFile(chunk);
+ FileChannel channel = checkoutDataFile(chunk);
+ try {
+ channel.close();
+ } catch(IOException e) {
+ logger.error("Error while closing file.", e);
+ }
}
}
}
+ private FileChannel openChannel(File file) {
+ try {
+ return new FileInputStream(file).getChannel();
+ } catch(IOException e) {
+ throw new VoldemortException(e);
+ }
+ }
+
private MappedByteBuffer mapFile(File file) {
try {
FileChannel channel = new FileInputStream(file).getChannel();
@@ -123,19 +141,19 @@ public int getChunkForKey(byte[] key) {
}
public MappedByteBuffer checkoutIndexFile(int chunk) {
- return checkoutFile(indexFiles.get(chunk));
+ return checkout(indexFiles.get(chunk));
}
public void checkinIndexFile(MappedByteBuffer mmap, int chunk) {
- checkinFile(mmap, indexFiles.get(chunk));
+ checkin(mmap, indexFiles.get(chunk));
}
- public MappedByteBuffer checkoutDataFile(int chunk) {
- return checkoutFile(dataFiles.get(chunk));
+ public FileChannel checkoutDataFile(int chunk) {
+ return checkout(dataFiles.get(chunk));
}
- public void checkinDataFile(MappedByteBuffer mmap, int chunk) {
- checkinFile(mmap, dataFiles.get(chunk));
+ public void checkinDataFile(FileChannel channel, int chunk) {
+ checkin(channel, dataFiles.get(chunk));
}
public int getIndexFileSize(int chunk) {
@@ -146,22 +164,22 @@ public int getDataFileSize(int chunk) {
return this.indexFileSizes.get(chunk);
}
- private void checkinFile(MappedByteBuffer map, BlockingQueue<MappedByteBuffer> mmaps) {
+ private <T> void checkin(T item, BlockingQueue<T> items) {
try {
- mmaps.put(map);
+ items.put(item);
} catch(InterruptedException e) {
throw new VoldemortException("Interrupted while waiting for file to checking.");
}
}
- private MappedByteBuffer checkoutFile(BlockingQueue<MappedByteBuffer> mmaps) {
+ private <T> T checkout(BlockingQueue<T> pool) {
try {
- MappedByteBuffer map = mmaps.poll(bufferWaitTimeoutMs, TimeUnit.MILLISECONDS);
- if(map == null)
+ T item = pool.poll(bufferWaitTimeoutMs, TimeUnit.MILLISECONDS);
+ if(item == null)
throw new VoldemortException("Timeout after waiting for " + bufferWaitTimeoutMs
+ " ms to acquire file descriptor");
else
- return map;
+ return item;
} catch(InterruptedException e) {
throw new PersistenceFailureException("Interrupted while waiting for file descriptor.",
e);
@@ -18,7 +18,9 @@
import java.io.File;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Collections;
@@ -330,13 +332,17 @@ private void shiftBackupsRight(int beginShift) {
}
private byte[] readValue(int chunk, int valueLocation) {
- MappedByteBuffer data = fileSet.checkoutDataFile(chunk);
+ FileChannel data = fileSet.checkoutDataFile(chunk);
try {
- data.position(valueLocation);
- int size = data.getInt();
- byte[] value = new byte[size];
- data.get(value);
- return value;
+ ByteBuffer sizeBuffer = ByteBuffer.allocate(4);
+ data.read(sizeBuffer, valueLocation);
+ sizeBuffer.position(0);
+ int size = sizeBuffer.getInt();
+ ByteBuffer valueBuffer = ByteBuffer.allocate(size);
+ data.read(valueBuffer, valueLocation + 4);
+ return valueBuffer.array();
+ } catch(IOException e) {
+ throw new VoldemortException(e);
} finally {
fileSet.checkinDataFile(data, chunk);
}
Oops, something went wrong.

0 comments on commit d28dc46

Please sign in to comment.