Permalink
Browse files

Implemented a jna mlock to map and ping index files for RO stores in

memory
  • Loading branch information...
1 parent 349f852 commit 4db8cc30a7f50479ced89992b2204c56fc77d5bf @abh1nay abh1nay committed Nov 19, 2012
View
@@ -53,9 +53,9 @@
<classpathentry kind="lib" path="lib/snappy-0.2.jar"/>
<classpathentry kind="lib" path="lib/httpclient-4.1.2.jar"/>
<classpathentry kind="lib" path="lib/httpcore-4.1.2.jar"/>
- <classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.6"/>
<classpathentry kind="lib" path="lib/joda-time-1.6.jar"/>
<classpathentry kind="lib" path="lib/mail-1.4.1.jar"/>
<classpathentry kind="lib" path="lib/azkaban-common-0.05.jar"/>
+ <classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER"/>
<classpathentry kind="output" path="classes"/>
</classpath>
View
@@ -2,6 +2,6 @@ Manifest-Version: 1.0
Ant-Version: Apache Ant 1.7.1
Created-By: 20.1-b02 (Sun Microsystems Inc.)
Implementation-Title: Voldemort
-Implementation-Version: 1.1.3
+Implementation-Version: 1.1.4
Implementation-Vendor: LinkedIn
@@ -35,5 +35,23 @@
<schema-info version="1">{"type": "record", "name": "myrec","fields": [{ "name": "original", "type": "string" }, { "name": "new-field", "type": "string", "default":"" }]}</schema-info>
</value-serializer>
</store>
-
+ <store>
+ <name>anagpal-test-old</name>
+ <persistence>read-only</persistence>
+ <description>"test store"</description>
+ <owners>anagpal@linkedin.com</owners>
+ <routing-strategy>consistent-routing</routing-strategy>
+ <routing>client</routing>
+ <replication-factor>1</replication-factor>
+ <required-reads>1</required-reads>
+ <required-writes>1</required-writes>
+ <key-serializer>
+ <type>json</type>
+ <schema-info version="0">"string"</schema-info>
+ </key-serializer>
+ <value-serializer>
+ <type>json</type>
+ <schema-info version="0">{"cnt":"int32", "country":"string"}</schema-info>
+ </value-serializer>
+ </store>
</stores>
@@ -22,6 +22,7 @@
import voldemort.store.readonly.ReadOnlyStorageFormat;
import voldemort.store.readonly.ReadOnlyStorageMetadata;
import voldemort.store.readonly.ReadOnlyUtils;
+import voldemort.store.readonly.io.MappedFileReader;
import voldemort.utils.ByteArray;
import voldemort.utils.ByteUtils;
import voldemort.utils.Pair;
@@ -45,6 +46,8 @@
private final List<Integer> indexFileSizes;
private final List<Integer> dataFileSizes;
private final List<MappedByteBuffer> indexFiles;
+
+ private List<MappedFileReader> mappedIndexFileReader;
private final List<FileChannel> dataFiles;
private final HashMap<Object, Integer> chunkIdToChunkStart;
private final HashMap<Object, Integer> chunkIdToNumChunks;
@@ -76,6 +79,8 @@ public ChunkedFileSet(File directory, RoutingStrategy routingStrategy, int nodeI
this.indexFileSizes = new ArrayList<Integer>();
this.dataFileSizes = new ArrayList<Integer>();
this.indexFiles = new ArrayList<MappedByteBuffer>();
+ this.mappedIndexFileReader = new ArrayList<MappedFileReader>();
+
this.dataFiles = new ArrayList<FileChannel>();
this.chunkIdToChunkStart = new HashMap<Object, Integer>();
this.chunkIdToNumChunks = new HashMap<Object, Integer>();
@@ -148,7 +153,18 @@ else if(index.exists() ^ data.exists())
/* Add the file channel for data */
dataFiles.add(openChannel(data));
- indexFiles.add(mapFile(index));
+
+ MappedFileReader idxFileReader = null;
+ try {
+ idxFileReader = new MappedFileReader(index);
+ mappedIndexFileReader.add(idxFileReader);
+ indexFiles.add(idxFileReader.map());
+ } catch(IOException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+
+ // indexFiles.add(mapFile(index));
chunkId++;
}
if(chunkId == 0)
@@ -200,7 +216,18 @@ public void initVersion1() {
/* Add the file channel for data */
dataFiles.add(openChannel(data));
- indexFiles.add(mapFile(index));
+
+ MappedFileReader idxFileReader = null;
+ try {
+ idxFileReader = new MappedFileReader(index);
+ mappedIndexFileReader.add(idxFileReader);
+ indexFiles.add(idxFileReader.map());
+ } catch(IOException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+
+ // indexFiles.add(mapFile(index));
chunkId++;
globalChunkId++;
}
@@ -282,7 +309,18 @@ public void initVersion2() {
/* Add the file channel for data */
dataFiles.add(openChannel(data));
- indexFiles.add(mapFile(index));
+
+ MappedFileReader idxFileReader = null;
+ try {
+ idxFileReader = new MappedFileReader(index);
+ mappedIndexFileReader.add(idxFileReader);
+ indexFiles.add(idxFileReader.map());
+ } catch(IOException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+
+ // indexFiles.add(mapFile(index));
chunkId++;
globalChunkId++;
}
@@ -348,6 +386,14 @@ public void close() {
} catch(IOException e) {
logger.error("Error while closing file.", e);
}
+
+ MappedFileReader idxFileReader = mappedIndexFileReader.get(chunk);
+ try {
+ idxFileReader.close();
+ } catch(IOException e) {
+
+ logger.error("Error while closing file.", e);
+ }
}
}
@@ -0,0 +1,83 @@
+package voldemort.store.readonly.io;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+public abstract class BaseCloser<T> {
+
+ protected List<T> delegates = new ArrayList();
+
+ private Throwable cause = null;
+
+ private boolean executed = false;
+
+ public BaseCloser() {}
+
+ public BaseCloser(List<T> delegates) {
+ this.delegates = delegates;
+ }
+
+ public BaseCloser(T... delegates) {
+ add(delegates);
+ }
+
+ public void add(T delegate) {
+ delegates.add(delegate);
+ }
+
+ public void add(T... delegates) {
+ for(T current: delegates) {
+ this.delegates.add(current);
+ }
+ }
+
+ public void setCause(Throwable cause) {
+ this.cause = cause;
+ }
+
+ protected boolean executed() {
+ return executed;
+ }
+
+ protected void exec() throws GroupIOException {
+
+ if(executed)
+ return;
+
+ GroupIOException exc = null;
+
+ if(cause != null)
+ exc = new GroupIOException(cause);
+
+ for(T current: delegates) {
+
+ if(current == null)
+ continue;
+
+ try {
+
+ onDelegate(current);
+
+ } catch(Throwable t) {
+
+ if(exc == null) {
+ exc = new GroupIOException(t);
+ } else {
+ exc.addSuppressed(t);
+ }
+
+ }
+
+ }
+
+ executed = true;
+
+ if(exc != null)
+ throw exc;
+
+ }
+
+ protected abstract void onDelegate(T delegate) throws IOException;
+
+}
@@ -0,0 +1,42 @@
+package voldemort.store.readonly.io;
+
+import java.io.File;
+import java.nio.channels.FileChannel;
+
+/**
+ *
+ */
+public class BaseMappedFile {
+
+ protected FileChannel channel;
+
+ protected long offset = 0;
+
+ protected long length = 0;
+
+ protected Closer closer = new Closer();
+
+ protected File file;
+
+ protected int fd;
+
+ protected boolean fadvise = true;
+
+ public File getFile() {
+ return file;
+ }
+
+ public int getFd() {
+ return fd;
+ }
+
+ public boolean isClosed() {
+ return closer.isClosed();
+ }
+
+ @Override
+ public String toString() {
+ return file.toString();
+ }
+
+}
@@ -0,0 +1,29 @@
+package voldemort.store.readonly.io;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ * A closeable which is smart enough to work on byte buffers.
+ */
+public class ByteBufferCloser implements Closeable {
+
+ private ByteBuffer buff;
+
+ public ByteBufferCloser(ByteBuffer buff) {
+ this.buff = buff;
+ }
+
+ @Override
+ public void close() throws IOException {
+
+ sun.misc.Cleaner cl = ((sun.nio.ch.DirectBuffer) buff).cleaner();
+
+ if(cl != null) {
+ cl.clean();
+ }
+
+ }
+
+}
@@ -0,0 +1,36 @@
+package voldemort.store.readonly.io;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+
+public class Closer extends BaseCloser<Closeable> implements Closeable {
+
+ public Closer() {}
+
+ public Closer(List delegates) {
+ this.delegates = (List<Closeable>) delegates;
+ }
+
+ public Closer(Closeable... delegates) {
+ add(delegates);
+ }
+
+ @Override
+ public void close() throws IOException {
+ exec();
+ }
+
+ public boolean closed() {
+ return executed();
+ }
+
+ public boolean isClosed() {
+ return executed();
+ }
+
+ protected void onDelegate(Closeable delegate) throws IOException {
+ delegate.close();
+ }
+
+}
@@ -0,0 +1,46 @@
+package voldemort.store.readonly.io;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.List;
+
+public class GroupIOException extends IOException {
+
+ private static final long serialVersionUID = 1L;
+ List<Throwable> suppressed = new ArrayList<Throwable>();
+
+ public GroupIOException(Throwable cause) {
+ suppressed.add(cause);
+ }
+
+ public void addSuppressed(Throwable t) {
+ suppressed.add(t);
+ }
+
+ @Override
+ public void printStackTrace(PrintStream out) {
+
+ for(Throwable current: suppressed) {
+ current.printStackTrace(out);
+ }
+
+ // this will print ourselves AND the cause...
+ super.printStackTrace(out);
+
+ }
+
+ @Override
+ public void printStackTrace(PrintWriter out) {
+
+ for(Throwable current: suppressed) {
+ current.printStackTrace(out);
+ }
+
+ // this will print ourselves AND the cause...
+ super.printStackTrace(out);
+
+ }
+
+}
Oops, something went wrong.

0 comments on commit 4db8cc3

Please sign in to comment.