Skip to content

Commit

Permalink
Remove pooling of FileChannel and use only non-stateful operations.
Browse files Browse the repository at this point in the history
  • Loading branch information
jkreps committed Jun 15, 2009
1 parent d28dc46 commit 62f6228
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 33 deletions.
40 changes: 24 additions & 16 deletions src/java/voldemort/store/readonly/ChunkedFileSet.java
Expand Up @@ -18,6 +18,12 @@
import voldemort.store.PersistenceFailureException;
import voldemort.utils.Utils;

/**
* A set of chunked data and index files for a read-only store
*
* @author jay
*
*/
public class ChunkedFileSet {

private static Logger logger = Logger.getLogger(ChunkedFileSet.class);
Expand All @@ -29,7 +35,7 @@ public class ChunkedFileSet {
private final List<Integer> indexFileSizes;
private final List<Integer> dataFileSizes;
private final List<BlockingQueue<MappedByteBuffer>> indexFiles;
private final List<BlockingQueue<FileChannel>> dataFiles;
private final List<FileChannel> dataFiles;

public ChunkedFileSet(File directory, int numBuffersPerChunk, long bufferWaitTimeoutMs) {
this.baseDir = directory;
Expand All @@ -41,7 +47,7 @@ public ChunkedFileSet(File directory, int numBuffersPerChunk, long bufferWaitTim
this.indexFileSizes = new ArrayList<Integer>();
this.dataFileSizes = new ArrayList<Integer>();
this.indexFiles = new ArrayList<BlockingQueue<MappedByteBuffer>>();
this.dataFiles = new ArrayList<BlockingQueue<FileChannel>>();
this.dataFiles = new ArrayList<FileChannel>();

// if the directory is empty create empty files
if(baseDir.list() != null && baseDir.list().length == 0) {
Expand All @@ -64,19 +70,25 @@ public ChunkedFileSet(File directory, int numBuffersPerChunk, long bufferWaitTim
else if(index.exists() ^ data.exists())
throw new VoldemortException("One of the following does not exist: "
+ index.toString() + " and " + data.toString() + ".");

/* Deal with file sizes */
long indexLength = index.length();
long dataLength = data.length();
validateFileSizes(indexLength, dataLength);
indexFileSizes.add((int) indexLength);
dataFileSizes.add((int) dataLength);

/* Add the file channel for data */
dataFiles.add(openChannel(data));

/*
* Add multiple MappedByteBuffers for the index since we cannot
* share handles
*/
BlockingQueue<MappedByteBuffer> indexFds = new ArrayBlockingQueue<MappedByteBuffer>(numBuffersPerChunk);
BlockingQueue<FileChannel> dataFds = new ArrayBlockingQueue<FileChannel>(numBuffersPerChunk);
for(int i = 0; i < numBuffersPerChunk; i++) {
for(int i = 0; i < numBuffersPerChunk; i++)
indexFds.add(mapFile(index));
dataFds.add(openChannel(data));
}
indexFiles.add(indexFds);
dataFiles.add(dataFds);
chunkId++;
}
if(chunkId == 0)
Expand All @@ -103,7 +115,7 @@ public void close() {
for(int chunk = 0; chunk < this.numChunks; chunk++) {
for(int i = 0; i < this.numBuffersPerChunk; i++) {
checkoutIndexFile(chunk);
FileChannel channel = checkoutDataFile(chunk);
FileChannel channel = getDataFile(chunk);
try {
channel.close();
} catch(IOException e) {
Expand Down Expand Up @@ -144,16 +156,12 @@ public MappedByteBuffer checkoutIndexFile(int chunk) {
return checkout(indexFiles.get(chunk));
}

public void checkinIndexFile(MappedByteBuffer mmap, int chunk) {
checkin(mmap, indexFiles.get(chunk));
}

public FileChannel checkoutDataFile(int chunk) {
return checkout(dataFiles.get(chunk));
public void checkinIndexFile(MappedByteBuffer file, int chunk) {
checkin(file, indexFiles.get(chunk));
}

public void checkinDataFile(FileChannel channel, int chunk) {
checkin(channel, dataFiles.get(chunk));
public FileChannel getDataFile(int chunk) {
return dataFiles.get(chunk);
}

public int getIndexFileSize(int chunk) {
Expand Down
23 changes: 6 additions & 17 deletions src/java/voldemort/store/readonly/ReadOnlyStorageEngine.java
Expand Up @@ -332,19 +332,16 @@ public Map<ByteArray, List<Versioned<byte[]>>> getAll(Iterable<ByteArray> keys)
}

private byte[] readValue(int chunk, int valueLocation) {
FileChannel data = fileSet.checkoutDataFile(chunk);
FileChannel dataFile = fileSet.getDataFile(chunk);
try {
ByteBuffer sizeBuffer = ByteBuffer.allocate(4);
data.read(sizeBuffer, valueLocation);
sizeBuffer.position(0);
int size = sizeBuffer.getInt();
dataFile.read(sizeBuffer, valueLocation);
int size = sizeBuffer.getInt(0);
ByteBuffer valueBuffer = ByteBuffer.allocate(size);
data.read(valueBuffer, valueLocation + 4);
dataFile.read(valueBuffer, valueLocation + 4);
return valueBuffer.array();
} catch(IOException e) {
throw new VoldemortException(e);
} finally {
fileSet.checkinDataFile(data, chunk);
}
}

Expand Down Expand Up @@ -390,19 +387,11 @@ private int getValueLocation(int chunk, byte[] keyMd5) {
* Read the key, potentially from the cache
*/
private byte[] readKey(MappedByteBuffer index, int indexByteOffset, byte[] foundKey) {
readFrom(index, indexByteOffset, foundKey);
index.position(indexByteOffset);
index.get(foundKey);
return foundKey;
}

/*
* Seek to the given object and read into the buffer exactly buffer.length
* bytes
*/
private static void readFrom(MappedByteBuffer file, int indexByteOffset, byte[] buffer) {
file.position(indexByteOffset);
file.get(buffer);
}

/**
* Not supported, throws UnsupportedOperationException if called
*/
Expand Down

0 comments on commit 62f6228

Please sign in to comment.