Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

ReadOnly store batch data indexer package as contrib.

  • Loading branch information...
commit 1b788672ed3daefc8b1ebb0684d354134907fd5c 1 parent 0b5a479
@bbansal bbansal authored
Showing with 1,132 additions and 177 deletions.
  1. +4 −0 .classpath
  2. +0 −2  TODO
  3. +4 −0 build.properties
  4. +237 −167 build.xml
  5. +2 −2 build_number.txt
  6. BIN  contrib/batch-indexer/lib/commons-cli-2.0-SNAPSHOT.jar
  7. BIN  contrib/batch-indexer/lib/hadoop-0.18.1-core.jar
  8. +137 −0 contrib/batch-indexer/src/java/test/voldemort/contrib/batchindexer/TestReadOnlyBatchIndexer.java
  9. +108 −0 contrib/batch-indexer/src/java/voldemort/contrib/batchindexer/ReadOnlyBatchIndexMapper.java
  10. +24 −0 contrib/batch-indexer/src/java/voldemort/contrib/batchindexer/ReadOnlyBatchIndexPartitioner.java
  11. +108 −0 contrib/batch-indexer/src/java/voldemort/contrib/batchindexer/ReadOnlyBatchIndexReducer.java
  12. +132 −0 contrib/batch-indexer/src/java/voldemort/contrib/batchindexer/ReadOnlyBatchIndexer.java
  13. +69 −0 contrib/test/common/config/nine-node-cluster.xml
  14. +35 −0 contrib/test/common/config/stores.xml
  15. +4 −0 contrib/test/common/test-data/usersCSV.txt
  16. +65 −0 contrib/utils/src/java/test/voldemort/contrib/utils/TestContribUtils.java
  17. +73 −0 contrib/utils/src/java/voldemort/contrib/utils/ContribUtils.java
  18. +6 −0 src/java/voldemort/server/VoldemortConfig.java
  19. +4 −1 src/java/voldemort/store/readonly/RandomAccessFileStorageConfiguration.java
  20. +41 −3 src/java/voldemort/store/readonly/RandomAccessFileStore.java
  21. +71 −0 test/integration/voldemort/performance/ReadOnlyStorePerformanceTest.java
  22. +2 −1  test/unit/voldemort/store/readonly/JsonStoreBuilderTest.java
  23. +6 −1 test/unit/voldemort/store/readonly/TestRandomAccessFileStore.java
View
4 .classpath
@@ -1,6 +1,8 @@
<?xml version="1.0" encoding="UTF-8"?>
<classpath>
<classpathentry kind="src" path="src/java"/>
+ <classpathentry kind="src" path="contrib/utils/src/java"/>
+ <classpathentry kind="src" path="contrib/batch-indexer/src/java"/>
<classpathentry kind="src" path="test/unit"/>
<classpathentry kind="src" path="test/integration"/>
<classpathentry kind="src" path="test/common"/>
@@ -28,6 +30,8 @@
<classpathentry kind="lib" path="lib/velocity-1.5.jar"/>
<classpathentry kind="lib" path="lib/xerces.jar"/>
<classpathentry kind="lib" path="lib/colt-1.2.0.jar"/>
+ <classpathentry kind="lib" path="contrib/batch-indexer/lib/hadoop-0.18.1-core.jar" sourcepath="/Users/bbansal/work/linkedin/downloads/hadoop/hadoop-0.18.1/src"/>
+ <classpathentry kind="lib" path="contrib/batch-indexer/lib/commons-cli-2.0-SNAPSHOT.jar"/>
<classpathentry kind="lib" path="lib/protobuf-java-2.0.3.jar"/>
<classpathentry kind="lib" path="lib/libthrift-20080411p1.jar"/>
<classpathentry kind="output" path="classes"/>
View
2  TODO
@@ -1,6 +1,4 @@
TODO HIGH
--- push readonly data to voldemort
-- JMX mechanism to swap table on the fly (HTTP in war, method in vold. server)
-- initial implementation is in the read-only store using read/write lock
-- rebalancing JMX control
View
4 build.properties
@@ -1,15 +1,19 @@
src.dir=src
java.dir=src/java
+contrib.java.dir=contrib
+contrib.src.dir=contrib
lib.dir=lib
docs.dir=docs
javadoc.dir=docs/javadoc
dist.dir=dist
war.dir=war
classes.dir=dist/classes
+contrib.classes.dir=dist/contrib-classes
commontestsrc.dir=test/common
unittestsrc.dir=test/unit
inttestsrc.dir=test/integration
testclasses.dir=dist/testclasses
+contrib.testclasses.dir=dist/contribtestclasses
testreport.dir=dist/junit-reports
testhtml.dir=dist/junit-reports/html
tomcat.manager.url=http://localhost:8080/manager
View
404 build.xml
@@ -1,185 +1,255 @@
<?xml version="1.0"?>
<project name="voldemort" basedir="." default="all">
- <property file="build.properties"/>
-
- <property name="name" value="voldemort"/>
- <property name="display.name" value="Voldemort"/>
- <property name="author" value="Jay Kreps"/>
-
- <path id="main-classpath">
- <fileset dir="${lib.dir}">
- <include name="*.jar"/>
- </fileset>
- <pathelement path="${classes.dir}"/>
- </path>
-
- <path id="test-classpath">
- <path refid="main-classpath"/>
- <pathelement path="${testclasses.dir}"/>
- </path>
-
- <macrodef name="replace-dir">
- <attribute name="dir"/>
- <sequential>
- <delete dir="@{dir}"/>
- <mkdir dir="@{dir}"/>
- </sequential>
- </macrodef>
-
- <target name="all" depends="clean, srcjar, jar, war, test" description="Build all artifacts." />
-
- <target name="clean" description="Delete generated files.">
- <delete dir="${dist.dir}" />
- <replace-dir dir="${javadoc.dir}"/>
- </target>
-
- <target name="build" description="Compile main source tree java files">
- <replace-dir dir="${classes.dir}"/>
- <!-- copy non-java files to classes dir to load from classpath -->
- <copy todir="${classes.dir}">
- <fileset dir="${java.dir}">
- <exclude name="**/*.java"/>
- <exclude name="**/*.html"/>
- </fileset>
- </copy>
- <javac destdir="${classes.dir}" target="1.5" debug="true" deprecation="true" failonerror="true">
- <src path="${java.dir}"/>
- <classpath refid="main-classpath"/>
- </javac>
- <buildnumber file="build_number.txt"/>
- </target>
-
- <target name="test" depends="build, buildtest" description="Build test jar file">
- <jar destfile="${dist.dir}/${name}-test-${curr.release}.jar">
- <fileset dir="${testclasses.dir}"/>
- </jar>
- </target>
-
- <target name="jar" depends="build" description="Build server jar file">
- <jar destfile="${dist.dir}/${name}-${curr.release}.jar">
- <fileset dir="${classes.dir}">
- <include name="**/*.*"/>
- </fileset>
- <!-- include xsds -->
- <fileset dir="${java.dir}">
- <include name="**/*.xsd"/>
- </fileset>
- </jar>
- </target>
-
- <target name="srcjar" description="Build source jar file">
- <mkdir dir="${dist.dir}"/>
- <jar destfile="${dist.dir}/${name}-${curr.release}-src.jar">
- <fileset dir="${java.dir}">
- <include name="**/*.java"/>
- </fileset>
- </jar>
- </target>
-
- <target name="utils" depends="build" description="Build a jar file with voldemort util classes.">
- <tstamp>
- <format property="TODAY" pattern="yyyy-MM-dd" locale="en,US"/>
- </tstamp>
- <jar destfile="${dist.dir}/${name}-utils-${TODAY}.jar">
- <fileset dir="${classes.dir}">
- <include name="voldemort/VoldemortException.class"/>
- <include name="voldemort/Serializer.class"/>
- <include name="voldemort/serialization/SerializationException.class"/>
- <include name="voldemort/serialization/json/*"/>
- <include name="voldemort/utils/*"/>
- </fileset>
- </jar>
- </target>
-
- <target name="buildtest" description="Compile test classes">
- <replace-dir dir="${testclasses.dir}"/>
- <javac destdir="${testclasses.dir}" target="1.5" debug="true" deprecation="true" failonerror="true">
- <src path="${unittestsrc.dir}"/>
- <src path="${inttestsrc.dir}"/>
- <src path="${commontestsrc.dir}"/>
- <classpath refid="main-classpath"/>
- </javac>
- </target>
-
- <target name="junit" depends="build, buildtest" description="Run junit tests.">
- <replace-dir dir="${testreport.dir}"/>
- <replace-dir dir="${testhtml.dir}"/>
- <junit printsummary="yes" showoutput="true">
- <classpath refid="test-classpath"/>
- <formatter type="xml"/>
- <batchtest fork="yes" todir="${testreport.dir}">
- <fileset dir="${unittestsrc.dir}">
- <include name="**/*Test.java"/>
- </fileset>
- </batchtest>
- </junit>
- <junitreport todir="${testhtml.dir}">
- <fileset dir="${testreport.dir}">
- <include name="TEST-*.xml"/>
- </fileset>
- <report todir="${testhtml.dir}" styledir="junitreport" format="frames" />
- </junitreport>
- </target>
-
- <target name="war" depends="build" description="Build server war file">
- <war destfile="${dist.dir}/${name}.war" webxml="web.xml" basedir="${classes.dir}">
- <classes dir="${classes.dir}"/>
- <lib dir="${lib.dir}">
- <exclude name="${lib.dir}/servlet-api.jar"/>
- <exclude name="${lib.dir}/src/**"/>
- </lib>
- </war>
- </target>
-
- <macrodef name="make-javadocs">
- <attribute name="dir"/>
- <attribute name="packagenames"/>
- <sequential>
- <replace-dir dir="@{dir}"/>
- <javadoc sourcepath="${java.dir}" destdir="@{dir}" windowtitle="${display.name}"
+ <property file="build.properties"/>
+
+ <property name="name" value="voldemort"/>
+ <property name="display.name" value="Voldemort"/>
+ <property name="author" value="Jay Kreps"/>
+
+ <path id="main-classpath">
+ <fileset dir="${lib.dir}">
+ <include name="*.jar"/>
+ </fileset>
+ <pathelement path="${classes.dir}"/>
+ </path>
+
+ <path id="contrib-classpath">
+ <fileset dir="${dist.dir}">
+ <include name="${name}-${curr.release}.jar" />
+ </fileset>
+ <fileset dir="${contrib.src.dir}">
+ <include name="**/*.jar" />
+ </fileset>
+ </path>
+
+ <path id="test-classpath">
+ <path refid="main-classpath" />
+ <pathelement path="${testclasses.dir}"/>
+ </path>
+
+ <path id="contrib-test-classpath">
+ <path refid="main-classpath" />
+ <path refid="contrib-classpath" />
+ <fileset dir="${dist.dir}">
+ <include name="${name}-contrib-${curr.release}.jar" />
+ </fileset>
+ </path>
+
+ <macrodef name="replace-dir">
+ <attribute name="dir"/>
+ <sequential>
+ <delete dir="@{dir}"/>
+ <mkdir dir="@{dir}"/>
+ </sequential>
+ </macrodef>
+
+ <target name="all" depends="clean, srcjar, jar, war, test, contrib-jar" description="Build all artifacts." />
+
+ <target name="clean" description="Delete generated files.">
+ <delete dir="${dist.dir}" />
+ <replace-dir dir="${javadoc.dir}"/>
+ </target>
+
+ <target name="build" description="Compile main source tree java files">
+ <replace-dir dir="${classes.dir}"/>
+ <!-- copy non-java files to classes dir to load from classpath -->
+ <copy todir="${classes.dir}">
+ <fileset dir="${java.dir}">
+ <exclude name="**/*.java"/>
+ <exclude name="**/*.html"/>
+ </fileset>
+ </copy>
+ <javac destdir="${classes.dir}" target="1.5" debug="true" deprecation="true" failonerror="true">
+ <src path="${java.dir}"/>
+ <classpath refid="main-classpath"/>
+ </javac>
+ <buildnumber file="build_number.txt"/>
+ </target>
+
+ <target name="test" depends="build, buildtest" description="Build test jar file">
+ <jar destfile="${dist.dir}/${name}-test-${curr.release}.jar">
+ <fileset dir="${testclasses.dir}"/>
+ </jar>
+ </target>
+
+ <target name="jar" depends="build" description="Build server jar file">
+ <jar destfile="${dist.dir}/${name}-${curr.release}.jar">
+ <fileset dir="${classes.dir}">
+ <include name="**/*.*"/>
+ </fileset>
+ <!-- include xsds -->
+ <fileset dir="${java.dir}">
+ <include name="**/*.xsd"/>
+ </fileset>
+ </jar>
+ </target>
+
+ <target name="srcjar" description="Build source jar file">
+ <mkdir dir="${dist.dir}"/>
+ <jar destfile="${dist.dir}/${name}-${curr.release}-src.jar">
+ <fileset dir="${java.dir}">
+ <include name="**/*.java"/>
+ </fileset>
+ </jar>
+ </target>
+
+ <target name="contrib-build" depends="build" description="Compile contrib packages (java and test) ">
+ <replace-dir dir="${contrib.classes.dir}"/>
+ <!-- copy non-java files to classes dir to load from classpath -->
+ <copy todir="${contrib.classes.dir}">
+ <fileset dir="${contrib.java.dir}">
+ <exclude name="**/*.java"/>
+ <exclude name="**/*.html"/>
+ </fileset>
+ </copy>
+ <javac destdir="${contrib.classes.dir}" target="1.5" debug="true" deprecation="true" failonerror="true">
+ <src path="${contrib.java.dir}"/>
+ <classpath refid="main-classpath"/>
+ <classpath refid="contrib-classpath"/>
+ </javac>
+ <buildnumber file="build_number.txt"/>
+ </target>
+
+
+
+ <target name="contrib-jar" depends="contrib-build" description="Build contrib jar file">
+ <jar destfile="${dist.dir}/${name}-contrib-${curr.release}.jar">
+ <fileset dir="${contrib.classes.dir}">
+ <include name="**/*.*"/>
+ </fileset>
+ <!-- include xsds -->
+ <fileset dir="${contrib.java.dir}">
+ <include name="**/*.xsd"/>
+ </fileset>
+ </jar>
+ </target>
+
+ <target name="contrib-srcjar" description="Build contrib source jar file">
+ <mkdir dir="${dist.dir}"/>
+ <jar destfile="${dist.dir}/${name}-contrib-${curr.release}-src.jar">
+ <fileset dir="${contrib.java.dir}">
+ <include name="**/*.java"/>
+ </fileset>
+ </jar>
+ </target>
+
+ <target name="utils" depends="build" description="Build a jar file with voldemort util classes.">
+ <tstamp>
+ <format property="TODAY" pattern="yyyy-MM-dd" locale="en,US"/>
+ </tstamp>
+ <jar destfile="${dist.dir}/${name}-utils-${TODAY}.jar">
+ <fileset dir="${classes.dir}">
+ <include name="voldemort/VoldemortException.class"/>
+ <include name="voldemort/Serializer.class"/>
+ <include name="voldemort/serialization/SerializationException.class"/>
+ <include name="voldemort/serialization/json/*"/>
+ <include name="voldemort/utils/*"/>
+ </fileset>
+ </jar>
+ </target>
+
+ <target name="buildtest" description="Compile test classes">
+ <replace-dir dir="${testclasses.dir}"/>
+ <javac destdir="${testclasses.dir}" target="1.5" debug="true" deprecation="true" failonerror="true">
+ <src path="${unittestsrc.dir}"/>
+ <src path="${inttestsrc.dir}"/>
+ <src path="${commontestsrc.dir}"/>
+ <classpath refid="main-classpath"/>
+ </javac>
+ </target>
+
+ <target name="contrib-junit" depends="contrib-jar" description="Run contrib junit tests.">
+ <junit printsummary="yes" maxmemory="1024m" showoutput="true" failureProperty="test.failure">
+ <classpath refid="contrib-test-classpath"/>
+ <formatter type="brief" usefile="false"/>
+ <batchtest fork="yes" todir="${testreport.dir}">
+ <fileset dir="${contrib.classes.dir}">
+ <include name="**/Test*.class"/>
+ </fileset>
+ </batchtest>
+ </junit>
+ </target>
+
+ <target name="junit" depends="build, buildtest" description="Run junit tests.">
+ <replace-dir dir="${testreport.dir}"/>
+ <replace-dir dir="${testhtml.dir}"/>
+ <junit printsummary="yes" showoutput="true">
+ <classpath refid="test-classpath"/>
+ <formatter type="xml"/>
+ <batchtest fork="yes" todir="${testreport.dir}">
+ <fileset dir="${unittestsrc.dir}">
+ <include name="**/*Test.java"/>
+ </fileset>
+ </batchtest>
+ </junit>
+ <junitreport todir="${testhtml.dir}">
+ <fileset dir="${testreport.dir}">
+ <include name="TEST-*.xml"/>
+ </fileset>
+ <report todir="${testhtml.dir}" styledir="junitreport" format="frames" />
+ </junitreport>
+ </target>
+
+ <target name="war" depends="build" description="Build server war file">
+ <war destfile="${dist.dir}/${name}.war" webxml="web.xml" basedir="${classes.dir}">
+ <classes dir="${classes.dir}"/>
+ <lib dir="${lib.dir}">
+ <exclude name="${lib.dir}/servlet-api.jar"/>
+ <exclude name="${lib.dir}/src/**"/>
+ </lib>
+ </war>
+ </target>
+
+ <macrodef name="make-javadocs">
+ <attribute name="dir"/>
+ <attribute name="packagenames"/>
+ <sequential>
+ <replace-dir dir="@{dir}"/>
+ <javadoc sourcepath="${java.dir}" destdir="@{dir}" windowtitle="${display.name}"
source="1.5" author="true" version="true" use="true" packagenames="@{packagenames}">
- <doctitle>${display.name}</doctitle>
- <bottom>${author}</bottom>
- <classpath refid="main-classpath"/>
- </javadoc>
- </sequential>
- </macrodef>
-
- <target name="docs" description="Create complete Javadoc documentation">
- <make-javadocs dir="${javadoc.dir}/all" packagenames="*"/>
- <make-javadocs dir="${javadoc.dir}/client" packagenames="voldemort.versioning.*, voldemort.client.*,voldemort.serialization.*"/>
- </target>
-
- <target name="redeploy" depends="war, deploy">
- </target>
-
- <taskdef file="tomcat-tasks.properties">
- <classpath>
- <path location="${lib.dir}/catalina-ant.jar"/>
- </classpath>
- </taskdef>
-
- <target name="list" description="List Tomcat applications">
- <list url="${tomcat.manager.url}"
+ <doctitle>${display.name}</doctitle>
+ <bottom>${author}</bottom>
+ <classpath refid="main-classpath"/>
+ </javadoc>
+ </sequential>
+ </macrodef>
+
+ <target name="docs" description="Create complete Javadoc documentation">
+ <make-javadocs dir="${javadoc.dir}/all" packagenames="*"/>
+ <make-javadocs dir="${javadoc.dir}/client" packagenames="voldemort.versioning.*, voldemort.client.*,voldemort.serialization.*"/>
+ <make-javadocs dir="${javadoc.dir}/contrib" packagenames="voldemort.contrib.*"/>
+ </target>
+
+ <target name="redeploy" depends="war, deploy">
+ </target>
+
+ <taskdef file="tomcat-tasks.properties">
+ <classpath>
+ <path location="${lib.dir}/catalina-ant.jar"/>
+ </classpath>
+ </taskdef>
+
+ <target name="list" description="List Tomcat applications">
+ <list url="${tomcat.manager.url}"
username="${tomcat.manager.username}"
password="${tomcat.manager.password}"/>
- </target>
+ </target>
- <target name="deploy" description="Deploy application" depends="war">
- <deploy url="${tomcat.manager.url}"
+ <target name="deploy" description="Deploy application" depends="war">
+ <deploy url="${tomcat.manager.url}"
username="${tomcat.manager.username}"
password="${tomcat.manager.password}"
path="${tomcat.context}"
update="true"
war="file:${dist.dir}/${name}.war"/>
- </target>
+ </target>
- <target name="undeploy" description="Undeploy application">
- <undeploy url="${tomcat.manager.url}"
+ <target name="undeploy" description="Undeploy application">
+ <undeploy url="${tomcat.manager.url}"
username="${tomcat.manager.username}"
password="${tomcat.manager.password}"
path="${tomcat.context}"/>
- </target>
+ </target>
</project>
View
4 build_number.txt
@@ -1,3 +1,3 @@
#Build Number for ANT. Do not edit!
-#Fri Jan 30 17:10:30 PST 2009
-build.number=191
+#Thu Feb 05 11:29:14 PST 2009
+build.number=300
View
BIN  contrib/batch-indexer/lib/commons-cli-2.0-SNAPSHOT.jar
Binary file not shown
View
BIN  contrib/batch-indexer/lib/hadoop-0.18.1-core.jar
Binary file not shown
View
137 contrib/batch-indexer/src/java/test/voldemort/contrib/batchindexer/TestReadOnlyBatchIndexer.java
@@ -0,0 +1,137 @@
+/*
+ * Copyright 2008-2009 LinkedIn, Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package test.voldemort.contrib.batchindexer;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.util.List;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.hadoop.util.ToolRunner;
+
+import voldemort.contrib.batchindexer.ReadOnlyBatchIndexMapper;
+import voldemort.contrib.batchindexer.ReadOnlyBatchIndexer;
+import voldemort.serialization.DefaultSerializerFactory;
+import voldemort.serialization.Serializer;
+import voldemort.serialization.SerializerDefinition;
+import voldemort.store.Store;
+import voldemort.store.readonly.RandomAccessFileStore;
+import voldemort.store.serialized.SerializingStore;
+import voldemort.versioning.Versioned;
+
+/**
+ * Unit test to check the Read-Only Batch Indexer. <strong>In local mode the
+ * number of reducers will be only one, hence we will see only one node's files
+ * irrespective of the cluster details.</strong>
+ *
+ * @author bbansal
+ *
+ */
+public class TestReadOnlyBatchIndexer extends TestCase {
+
+ @SuppressWarnings("unchecked")
+ public void testCSVFileBatchIndexer() throws Exception {
+
+ ToolRunner.run(new Configuration(), new TextBatchIndexer(), null);
+
+ // rename Files
+ File dataDir = new File("contrib/batch-indexer/temp-output/text");
+ new File(dataDir, "users.index_0").renameTo(new File(dataDir, "users.index"));
+ new File(dataDir, "users.data_0").renameTo(new File(dataDir, "users.data"));
+
+ // open Store
+ SerializerDefinition serDef = new SerializerDefinition("string", "UTF-8");
+ Serializer<Object> Keyserializer = (Serializer<Object>) new DefaultSerializerFactory().getSerializer(serDef);
+ Serializer<Object> Valueserializer = (Serializer<Object>) new DefaultSerializerFactory().getSerializer(new SerializerDefinition("java-serialization"));
+
+ Store<Object, Object> store = new SerializingStore<Object, Object>(new RandomAccessFileStore("users",
+ dataDir,
+ 1,
+ 3,
+ 1000,
+ 100 * 1000 * 1000),
+ Keyserializer,
+ Valueserializer);
+
+ // query all keys and check for value
+ BufferedReader reader = new BufferedReader(new FileReader(new File("contrib/test/common/test-data/usersCSV.txt")));
+ String line;
+ while(null != (line = reader.readLine())) {
+
+ // correct Query
+ String[] tokens = line.split("\\|");
+ List<Versioned<Object>> found = store.get(tokens[0]);
+ String result = (String) found.get(0).getValue();
+ assertEquals("Value for key should match for set value", tokens[1], result);
+
+ // wrong query
+ int changeIndex = (int) (Math.random() * tokens[0].length());
+ found = store.get(tokens[0].replace(tokens[0].charAt(changeIndex), '|'));
+ // found size should be 0 or not match the original value.
+ if(found.size() > 0) {
+ result = (String) found.get(0).getValue();
+ assertNotSame("Value for key should not match for set value", tokens[1], result);
+ }
+ }
+
+ }
+}
+
+class TextBatchMapper extends ReadOnlyBatchIndexMapper<LongWritable, Text> {
+
+ @Override
+ public Object getKeyBytes(LongWritable key, Text value) {
+ String[] tokens = value.toString().split("\\|");
+ return tokens[0];
+ }
+
+ @Override
+ public Object getValueBytes(LongWritable key, Text value) {
+ String[] tokens = value.toString().split("\\|");
+ return tokens[1];
+ }
+
+}
+
+class TextBatchIndexer extends ReadOnlyBatchIndexer {
+
+ public void configure(JobConf conf) {
+
+ conf.set("job.name", "testCSVBatchIndexer");
+ conf.set("voldemort.cluster.local.filePath",
+ "contrib/test/common/config/nine-node-cluster.xml");
+ conf.set("voldemort.store.local.filePath", "contrib/test/common/config/stores.xml");
+ conf.set("voldemort.store.name", "users");
+
+ // set input/output paths
+ FileInputFormat.addInputPaths(conf, "contrib/test/common/test-data/usersCSV.txt");
+ FileOutputFormat.setOutputPath(conf, new Path("contrib/batch-indexer/temp-output/text"));
+
+ conf.setMapperClass(TextBatchMapper.class);
+ conf.setInputFormat(TextInputFormat.class);
+ }
+}
View
108 contrib/batch-indexer/src/java/voldemort/contrib/batchindexer/ReadOnlyBatchIndexMapper.java
@@ -0,0 +1,108 @@
+package voldemort.contrib.batchindexer;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.log4j.Logger;
+
+import voldemort.cluster.Cluster;
+import voldemort.cluster.Node;
+import voldemort.contrib.utils.ContribUtils;
+import voldemort.routing.ConsistentRoutingStrategy;
+import voldemort.serialization.DefaultSerializerFactory;
+import voldemort.serialization.Serializer;
+import voldemort.serialization.SerializerDefinition;
+import voldemort.store.StoreDefinition;
+import voldemort.utils.ByteUtils;
+
+/**
+ * Mapper code for the Read-Only store batch indexer. Reads the following
+ * properties from JobConf:
+ * <p>
+ * <li><strong>voldemort.store.name </strong></li>
+ * <li><strong>voldemort.store.version </strong></li>
+ * <p>
+ * Assumes the DistributedCache has files with the names
+ * <li><strong>cluster.xml </strong></li>
+ * <li><strong>stores.xml</strong></li>
+ *
+ * @author bbansal
+ *
+ */
+public abstract class ReadOnlyBatchIndexMapper<K, V> implements
+ Mapper<K, V, BytesWritable, BytesWritable> {
+
+ private static Logger logger = Logger.getLogger(ReadOnlyBatchIndexMapper.class);
+ private Cluster _cluster = null;
+ private StoreDefinition _storeDef = null;
+ private ConsistentRoutingStrategy _routingStrategy = null;
+ private Serializer<Object> _keySerializer;
+ private Serializer<Object> _valueSerializer;
+
+ public abstract Object getKeyBytes(K key, V value);
+
+ public abstract Object getValueBytes(K key, V value);
+
+ public void map(K key,
+ V value,
+ OutputCollector<BytesWritable, BytesWritable> output,
+ Reporter reporter) throws IOException {
+ byte[] keyBytes = _keySerializer.toBytes(getKeyBytes(key, value));
+ byte[] valBytes = _valueSerializer.toBytes(getValueBytes(key, value));
+
+ List<Node> nodes = _routingStrategy.routeRequest(keyBytes);
+ for(Node node: nodes) {
+ ByteArrayOutputStream versionedValue = new ByteArrayOutputStream();
+ DataOutputStream valueDin = new DataOutputStream(versionedValue);
+ valueDin.writeInt(node.getId());
+ valueDin.write(valBytes);
+ valueDin.close();
+ BytesWritable outputKey = new BytesWritable(ByteUtils.md5(keyBytes));
+ BytesWritable outputVal = new BytesWritable(versionedValue.toByteArray());
+
+ output.collect(outputKey, outputVal);
+ }
+ }
+
+ public void configure(JobConf conf) {
+
+ try {
+
+ // get the voldemort cluster.xml and store.xml files.
+ String clusterFilePath = ContribUtils.getFileFromCache(conf, "cluster.xml");
+ String storeFilePath = ContribUtils.getFileFromCache(conf, "store.xml");
+
+ if(null == clusterFilePath || null == storeFilePath) {
+ throw new RuntimeException("Mapper expects cluster.xml / stores.xml passed through Distributed cache.");
+ }
+
+ // get Cluster and Store details
+ _cluster = ContribUtils.getVoldemortClusterDetails(clusterFilePath);
+ _storeDef = ContribUtils.getVoldemortStoreDetails(storeFilePath,
+ conf.get("voldemort.store.name"));
+
+ SerializerDefinition serDef = new SerializerDefinition("string", "UTF-8");
+
+ _keySerializer = (Serializer<Object>) new DefaultSerializerFactory().getSerializer(_storeDef.getKeySerializer());
+ _valueSerializer = (Serializer<Object>) new DefaultSerializerFactory().getSerializer(_storeDef.getValueSerializer());
+
+ _routingStrategy = new ConsistentRoutingStrategy(_cluster.getNodes(),
+ _storeDef.getReplicationFactor());
+
+ if(_routingStrategy == null) {
+ throw new RuntimeException("Failed to create routing strategy");
+ }
+ } catch(Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public void close() throws IOException {}
+}
View
24 contrib/batch-indexer/src/java/voldemort/contrib/batchindexer/ReadOnlyBatchIndexPartitioner.java
@@ -0,0 +1,24 @@
+package voldemort.contrib.batchindexer;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapred.lib.HashPartitioner;
+
+public class ReadOnlyBatchIndexPartitioner extends HashPartitioner<BytesWritable, BytesWritable> {
+
+ @Override
+ public int getPartition(BytesWritable key, BytesWritable value, int numReduceTasks) {
+ // The partition id is first 4 bytes in the value.
+ DataInputStream buffer = new DataInputStream(new ByteArrayInputStream(value.get()));
+ int nodeId = -2;
+ try {
+ nodeId = buffer.readInt();
+ } catch(IOException e) {
+ throw new RuntimeException("Failed to parse nodeId from buffer.", e);
+ }
+ return (nodeId) % numReduceTasks;
+ }
+}
View
108 contrib/batch-indexer/src/java/voldemort/contrib/batchindexer/ReadOnlyBatchIndexReducer.java
@@ -0,0 +1,108 @@
+package voldemort.contrib.batchindexer;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+
+import voldemort.utils.ByteUtils;
+
+public class ReadOnlyBatchIndexReducer implements Reducer<BytesWritable, BytesWritable, Text, Text> {
+
+ private DataOutputStream _indexFileStream = null;
+ private DataOutputStream _valueFileStream = null;
+
+ private long _position = 0;
+
+ private JobConf _conf = null;
+ private String _taskId = null;
+ private int _nodeId = -1;
+
+ String indexFileName;
+ String dataFileName;
+ String taskIndexFileName;
+ String taskValueFileName;
+
+ /**
+ * Reduce should get sorted MD5 keys here with a single value (prefixed at the
+ * beginning with 4 bytes of nodeId)
+ */
+ public void reduce(BytesWritable key,
+ Iterator<BytesWritable> values,
+ OutputCollector<Text, Text> output,
+ Reporter reporter) throws IOException {
+ byte[] keyBytes = ByteUtils.copy(key.get(), 0, key.getSize());
+
+ while(values.hasNext()) {
+ BytesWritable value = values.next();
+ byte[] valBytes = ByteUtils.copy(value.get(), 0, value.getSize());
+
+ if(_nodeId == -1) {
+ DataInputStream buffer = new DataInputStream(new ByteArrayInputStream(valBytes));
+ _nodeId = buffer.readInt();
+ }
+ // strip first 4 bytes as node_id
+ byte[] value1 = ByteUtils.copy(valBytes, 4, valBytes.length);
+
+ // Write Index Key/ position
+ _indexFileStream.write(keyBytes);
+ _indexFileStream.writeLong(_position);
+ _valueFileStream.writeInt(value1.length);
+ _valueFileStream.write(value1);
+ _position += value1.length + 4;
+
+ if(_position < 0) {
+ throw new RuntimeException("Position bigger than Integer size, split input files.");
+ }
+ }
+
+ }
+
+ public void configure(JobConf job) {
+ try {
+ _position = 0;
+ _conf = job;
+
+ _taskId = job.get("mapred.task.id");
+
+ indexFileName = new Path(_conf.get("voldemort.index.filename")).getName();
+ taskIndexFileName = _conf.getLocalPath(indexFileName + "_" + _taskId).toUri().getPath();
+
+ dataFileName = new Path(_conf.get("voldemort.data.filename")).getName();
+ taskValueFileName = _conf.getLocalPath(dataFileName + "_" + _taskId).toUri().getPath();
+
+ _indexFileStream = new DataOutputStream(new java.io.BufferedOutputStream(new FileOutputStream(new File(taskIndexFileName))));
+ _valueFileStream = new DataOutputStream(new java.io.BufferedOutputStream(new FileOutputStream(new File(taskValueFileName))));
+ } catch(IOException e) {
+ new RuntimeException("Failed to open Input/OutputStream", e);
+ }
+ }
+
+ public void close() throws IOException {
+ // close the local file stream and copy it to HDFS
+ _indexFileStream.close();
+ _valueFileStream.close();
+
+ Path hdfsIndexFile = new Path(FileOutputFormat.getOutputPath(_conf), indexFileName + "_"
+ + _nodeId);
+ Path hdfsValueFile = new Path(FileOutputFormat.getOutputPath(_conf), dataFileName + "_"
+ + _nodeId);
+
+ hdfsIndexFile.getFileSystem(_conf).copyFromLocalFile(new Path(taskIndexFileName),
+ hdfsIndexFile);
+ hdfsValueFile.getFileSystem(_conf).copyFromLocalFile(new Path(taskValueFileName),
+ hdfsValueFile);
+ }
+}
View
132 contrib/batch-indexer/src/java/voldemort/contrib/batchindexer/ReadOnlyBatchIndexer.java
@@ -0,0 +1,132 @@
+package voldemort.contrib.batchindexer;
+
+import java.net.URI;
+
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobConfigurable;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.log4j.Logger;
+
+import voldemort.cluster.Cluster;
+import voldemort.contrib.utils.ContribUtils;
+
+/**
+ * Creates a simple Read-Only Voldemort store for easy batch update.
+ * <p>
+ * Creates two files
+ * <ul>
+ * <li><strong>Index</strong> File: Keeps the position Index for each key
+ * sorted by MD5(key) Tuple: <KEY_HASH_SIZE(16 bytes)><POSITION_SIZE(8 bytes)>
+ * </li>
+ * <li><strong>Values</strong> file: Keeps the variable length values Tuple:
+ * <SIZE_OF_VALUE(4 bytes)><VALUE(byte[])> </li>
+ * <ul>
+ * <p>
+ * Required Properties
+ * <ul>
+ * <li>job.name</li>
+ * <li>voldemort.cluster.local.filePath</li>
+ * <li>voldemort.store.local.filePath</li>
+ * <li>voldemort.store.name</li>
+ * <li>voldemort.store.version</li>
+ * <li>input.data.check.percent</li>
+ * </ul>
+ *
+ * @author bbansal
+ */
+public abstract class ReadOnlyBatchIndexer extends Configured implements Tool, JobConfigurable {
+
+ private static Logger logger = Logger.getLogger(ReadOnlyBatchIndexer.class);
+
+ public int run(String[] args) throws Exception {
+
+ JobConf conf = new JobConf(getConf(), ReadOnlyBatchIndexer.class);
+ configure(conf);
+
+ try {
+ // get the voldemort cluster definition
+ Cluster cluster = ContribUtils.getVoldemortClusterDetails(conf.get("voldemort.cluster.local.filePath"));
+ conf.setNumReduceTasks(cluster.getNumberOfNodes());
+ } catch(Exception e) {
+ logger.error("Failed to read Voldemort cluster details", e);
+ throw new RuntimeException("", e);
+ }
+
+ // set the partitioner
+ conf.setPartitionerClass(ReadOnlyBatchIndexPartitioner.class);
+
+ // set mapper Outputclasses
+ conf.setMapOutputKeyClass(BytesWritable.class);
+ conf.setMapOutputValueClass(BytesWritable.class);
+
+ // set reducer classes
+ conf.setReducerClass(ReadOnlyBatchIndexReducer.class);
+ conf.setOutputFormat(SequenceFileOutputFormat.class);
+ conf.setOutputKeyClass(BytesWritable.class);
+ conf.setOutputValueClass(BytesWritable.class);
+
+ // get the store information
+ String storeName = conf.get("voldemort.store.name");
+ conf.setStrings("voldemort.index.filename", storeName + ".index");
+ conf.setStrings("voldemort.data.filename", storeName + ".data");
+
+ // get Local config files.
+ Path clusterFile = new Path(conf.get("voldemort.cluster.local.filePath"));
+ Path storeFile = new Path(conf.get("voldemort.store.local.filePath"));
+
+ // move files to HDFS if Hadoop run is not local
+ if(!conf.get("mapred.job.tracker").equals("local")) {
+
+ // set temp HDFS paths
+ Path clusterHdfs = new Path("/tmp/" + conf.getJobName() + "/" + "cluster.xml");
+ Path storeHdfs = new Path("/tmp/" + conf.getJobName() + "/" + "store.xml");
+
+ // get FileSystem & copy files to HDFS
+ // TODO LOW: Distributed cache should take care of this
+ FileSystem fs = clusterFile.getFileSystem(conf);
+ fs.copyFromLocalFile(clusterFile, clusterHdfs);
+ fs.copyFromLocalFile(storeFile, storeHdfs);
+
+ // add HDFS files to distributed cache
+ DistributedCache.addCacheFile(new URI(clusterHdfs.toString() + "#cluster.xml"), conf);
+ DistributedCache.addCacheFile(new URI(storeHdfs.toString() + "#store.xml"), conf);
+ } else {
+ // Add local files to distributed cache
+ DistributedCache.addCacheFile(new URI(clusterFile.toString() + "#cluster.xml"), conf);
+ DistributedCache.addCacheFile(new URI(storeFile.toString() + "#store.xml"), conf);
+ }
+
+ // run(conf);
+ JobClient.runJob(conf);
+ return 0;
+ }
+
+ /**
+ * <strong>configure must set:</strong>
+ * <ul>
+ * <li>Input Path List</li>
+ * <li>Output Path</li>
+ * <li>Mapper class <? extends {@link ReadOnlyBatchIndexMapper}></li>
+ * <li>Input Format class</li>
+ * </ul>
+ * <p>
+ * <strong>configure must set these properties.</strong>
+ * <ul>
+ * <li>job.name: String</li>
+ * <li>voldemort.cluster.local.filePath: String</li>
+ * <li>voldemort.store.local.filePath: String</li>
+ * <li>voldemort.store.name: String</li>
+ * </ul>
+ *
+ *
+ * @return
+ */
+ public abstract void configure(JobConf conf);
+}
View
69 contrib/test/common/config/nine-node-cluster.xml
@@ -0,0 +1,69 @@
+<?xml version="1.0"?>
+<cluster>
+ <name>mycluster</name>
+ <server>
+ <id>0</id>
+ <host>localhost</host>
+ <http-port>8080</http-port>
+ <socket-port>6666</socket-port>
+ <partitions>26, 3, 21</partitions>
+ </server>
+ <server>
+ <id>1</id>
+ <host>localhost</host>
+ <http-port>8081</http-port>
+ <socket-port>6667</socket-port>
+ <partitions>23, 18, 13</partitions>
+ </server>
+ <server>
+ <id>2</id>
+ <host>localhost</host>
+ <http-port>8082</http-port>
+ <socket-port>6668</socket-port>
+ <partitions>22, 16, 12</partitions>
+ </server>
+ <server>
+ <id>3</id>
+ <host>localhost</host>
+ <http-port>8083</http-port>
+ <socket-port>6669</socket-port>
+ <partitions>2, 17, 24</partitions>
+ </server>
+ <server>
+ <id>4</id>
+ <host>localhost</host>
+ <http-port>8084</http-port>
+ <socket-port>6670</socket-port>
+ <partitions>11, 25, 10</partitions>
+ </server>
+ <server>
+ <id>5</id>
+ <host>localhost</host>
+ <http-port>8085</http-port>
+ <socket-port>6671</socket-port>
+ <partitions>
+ 4, 19, 5
+ </partitions>
+ </server>
+ <server>
+ <id>6</id>
+ <host>localhost</host>
+ <http-port>8086</http-port>
+ <socket-port>6672</socket-port>
+ <partitions>20, 0, 14</partitions>
+ </server>
+ <server>
+ <id>7</id>
+ <host>localhost</host>
+ <http-port>8087</http-port>
+ <socket-port>6673</socket-port>
+ <partitions>1, 9, 6</partitions>
+ </server>
+ <server>
+ <id>8</id>
+ <host>localhost</host>
+ <http-port>8088</http-port>
+ <socket-port>6674</socket-port>
+ <partitions>8, 7, 15</partitions>
+ </server>
+</cluster>
View
35 contrib/test/common/config/stores.xml
@@ -0,0 +1,35 @@
+<?xml version="1.0"?>
+<stores>
+ <store>
+ <name>users</name>
+ <persistence>bdb</persistence>
+ <routing>client</routing>
+ <replication-factor>2</replication-factor>
+ <required-reads>1</required-reads>
+ <required-writes>2</required-writes>
+ <key-serializer>
+ <type>string</type>
+ <schema-info>UTF-8</schema-info>
+ </key-serializer>
+ <value-serializer>
+ <type>java-serialization</type>
+ </value-serializer>
+ </store>
+ <store>
+ <name>veggies</name>
+ <persistence>bdb</persistence>
+ <routing>client</routing>
+ <replication-factor>2</replication-factor>
+ <required-reads>1</required-reads>
+ <required-writes>2</required-writes>
+ <key-serializer>
+ <type>string</type>
+ <schema-info>UTF-8</schema-info>
+ </key-serializer>
+ <value-serializer>
+ <type>json</type>
+ <schema-info version="1">"int32"</schema-info>
+ <schema-info version="2">"int32"</schema-info>
+ </value-serializer>
+ </store>
+</stores>
View
4 contrib/test/common/test-data/usersCSV.txt
@@ -0,0 +1,4 @@
+member1|value1
+member2|-value2-
+member3 |value3
+member4|value4
View
65 contrib/utils/src/java/test/voldemort/contrib/utils/TestContribUtils.java
@@ -0,0 +1,65 @@
+/*
+ * Copyright 2008-2009 LinkedIn, Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package test.voldemort.contrib.utils;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.fs.Path;
+
+import voldemort.contrib.utils.ContribUtils;
+
+public class TestContribUtils extends TestCase {
+
+ public void testGetFileFromPathList() {
+ Path path1 = new Path("simpleFileName");
+ Path path2 = new Path("somePath/simpleFileName");
+ Path path3 = new Path("somePath/someOtherPath/");
+ Path path4 = new Path("file:////somepath/simpleFileName");
+
+ assertEquals("Returned Path should match",
+ path1.toString(),
+ ContribUtils.getFileFromPathList(new Path[] { path1 }, "simpleFileName"));
+ assertEquals("Returned Path should match",
+ path2.toString(),
+ ContribUtils.getFileFromPathList(new Path[] { path2 }, "simpleFileName"));
+ assertEquals("Returned Path should be null",
+ null,
+ ContribUtils.getFileFromPathList(new Path[] { path3 }, "simpleFileName"));
+
+ assertNotSame("Returned Path should Not match schema part",
+ path4.toString(),
+ ContribUtils.getFileFromPathList(new Path[] { path4 }, "simpleFileName"));
+
+ assertEquals("Returned path should match core path w/o schema",
+ path4.toUri().getPath(),
+ ContribUtils.getFileFromPathList(new Path[] { path4 }, "simpleFileName"));
+
+ assertEquals("Returned Path should match",
+ path1.toString(),
+ ContribUtils.getFileFromPathList(new Path[] { path1, path2 }, "simpleFileName"));
+
+ assertEquals("Returned Path should match",
+ path1.toString(),
+ ContribUtils.getFileFromPathList(new Path[] { path3, path1, path2 },
+ "simpleFileName"));
+
+ assertEquals("Returned Path should match",
+ path2.toString(),
+ ContribUtils.getFileFromPathList(new Path[] { path3, path2, path1 },
+ "simpleFileName"));
+ }
+}
View
73 contrib/utils/src/java/voldemort/contrib/utils/ContribUtils.java
@@ -0,0 +1,73 @@
+package voldemort.contrib.utils;
+
+import java.io.BufferedReader;
+import java.io.FileReader;
+import java.io.IOException;
+import java.net.URI;
+import java.util.List;
+
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.log4j.Logger;
+
+import voldemort.cluster.Cluster;
+import voldemort.store.StoreDefinition;
+import voldemort.xml.ClusterMapper;
+import voldemort.xml.StoreDefinitionsMapper;
+
+public class ContribUtils {
+
+ private static Logger logger = Logger.getLogger(ContribUtils.class);
+
+ public static String getFileFromPathList(Path[] pathList, String fileName) {
+ for(Path file: pathList) {
+ if(file.getName().equals(fileName)) {
+ return file.toUri().getPath();
+ }
+ }
+
+ logger.warn("File:" + fileName + " not found in given PathList");
+ return null;
+ }
+
+ public static Cluster getVoldemortClusterDetails(String clusterFile) throws IOException {
+ // create voldemort cluster definition using Distributed cache file
+ logger.info("Reading cluster details from file:" + clusterFile);
+ return new ClusterMapper().readCluster(new BufferedReader(new FileReader(clusterFile),
+ 1000000));
+ }
+
+ public static StoreDefinition getVoldemortStoreDetails(String storeFile, String storeName)
+ throws IOException {
+ logger.info("Reading Store details from file:" + storeFile + " for storeName:" + storeName);
+ List<StoreDefinition> stores = new StoreDefinitionsMapper().readStoreList(new BufferedReader(new FileReader(storeFile)));
+ for(StoreDefinition def: stores) {
+ if(def.getName().equals(storeName))
+ return def;
+ }
+ logger.info("Can't Find Store:" + storeName);
+ return null;
+ }
+
+ public static String getFileFromCache(JobConf conf, String fileName) throws IOException {
+ if("local".equals(conf.get("mapred.job.tracker"))) {
+ // For local mode Distributed cache is not set.
+ // try getting the raw file path.
+ URI[] uris = DistributedCache.getCacheFiles(conf);
+ Path[] paths = new Path[uris.length];
+ int index = 0;
+ for(URI uri: uris) {
+ logger.info("Adding uri:" + uri);
+ if(uri.getFragment().equals(fileName)) {
+ return uri.getPath();
+ }
+ }
+ return getFileFromPathList(paths, fileName);
+ } else {
+ // For Distributed filesystem.
+ Path[] pathList = DistributedCache.getLocalCacheFiles(conf);
+ return getFileFromPathList(pathList, fileName);
+ }
+ }
+}
View
6 src/java/voldemort/server/VoldemortConfig.java
@@ -67,6 +67,7 @@
private long readOnlyFileWaitTimeoutMs;
private int readOnlyBackups;
private String readOnlyStorageDir;
+ private long readOnlyCacheSize;
private int coreThreads;
private int maxThreads;
@@ -133,6 +134,7 @@ public VoldemortConfig(Props props) {
this.readOnlyStorageDir = props.getString("readonly.data.directory", this.dataDirectory
+ File.separator
+ "read-only");
+ this.readOnlyCacheSize = props.getInt("readonly.cache.size", 100 * 1000 * 1000);
this.slopStoreType = StorageEngineType.fromDisplay(props.getString("slop.store.engine",
StorageEngineType.BDB.toDisplay()));
@@ -557,6 +559,10 @@ public int getReadOnlyBackups() {
return readOnlyBackups;
}
+ public long getReadOnlyCacheSize() {
+ return readOnlyCacheSize;
+ }
+
public void setReadOnlyBackups(int readOnlyBackups) {
this.readOnlyBackups = readOnlyBackups;
}
View
5 src/java/voldemort/store/readonly/RandomAccessFileStorageConfiguration.java
@@ -38,6 +38,7 @@
private final long fileAccessWaitTimeoutMs;
private final File storageDir;
private final Set<ObjectName> registeredBeans;
+ private final int cacheSize;
public RandomAccessFileStorageConfiguration(VoldemortConfig config) {
this.numFileHandles = config.getReadOnlyStorageFileHandles();
@@ -45,6 +46,7 @@ public RandomAccessFileStorageConfiguration(VoldemortConfig config) {
this.fileAccessWaitTimeoutMs = config.getReadOnlyFileWaitTimeoutMs();
this.numBackups = config.getReadOnlyBackups();
this.registeredBeans = Collections.synchronizedSet(new HashSet<ObjectName>());
+        this.cacheSize = (int) config.getReadOnlyCacheSize();
}
public void close() {
@@ -58,7 +60,8 @@ public void close() {
storageDir,
numBackups,
numFileHandles,
- fileAccessWaitTimeoutMs);
+ fileAccessWaitTimeoutMs,
+ cacheSize);
ObjectName objName = JmxUtils.createObjectName(JmxUtils.getPackageName(store.getClass()),
name);
JmxUtils.registerMbean(ManagementFactory.getPlatformMBeanServer(),
View
44 src/java/voldemort/store/readonly/RandomAccessFileStore.java
@@ -24,6 +24,7 @@
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -67,6 +68,9 @@
private final File dataFile;
private final File indexFile;
private final ReadWriteLock fileModificationLock;
+ private final ConcurrentHashMap<Long, byte[]> _cache;
+ private final int _maxDepth;
+
private BlockingQueue<RandomAccessFile> indexFiles;
private BlockingQueue<RandomAccessFile> dataFiles;
@@ -74,7 +78,8 @@ public RandomAccessFileStore(String name,
File storageDir,
int numBackups,
int numFileHandles,
- long waitTimeoutMs) {
+ long waitTimeoutMs,
+ long cacheSize) {
this.storageDir = storageDir;
this.numBackups = numBackups;
this.indexFile = new File(storageDir, name + ".index");
@@ -85,6 +90,11 @@ public RandomAccessFileStore(String name,
this.indexFiles = new ArrayBlockingQueue<RandomAccessFile>(numFileHandles);
this.numFileHandles = numFileHandles;
this.fileModificationLock = new ReentrantReadWriteLock();
+
+ int cacheElements = (int) Math.floor(cacheSize / 30);
+ _maxDepth = (int) Math.floor(Math.log(cacheElements) / Math.log(2));
+ _cache = new ConcurrentHashMap<Long, byte[]>(cacheElements);
+ logger.info("Cache configuration entries:" + cacheElements + " nIters:" + _maxDepth);
open();
}
@@ -229,13 +239,28 @@ private long getValueLocation(RandomAccessFile index, byte[] key) throws IOExcep
int chunkSize = KEY_HASH_SIZE + POSITION_SIZE;
long low = 0;
long high = indexFileSize / chunkSize - 1;
+ int iteration = 0;
while(low <= high) {
+ iteration++;
long mid = (low + high) / 2;
- index.seek(mid * chunkSize);
- index.readFully(foundKey);
+ boolean cached = true;
+
+ if(iteration < _maxDepth) {
+ // do cached lookup
+ readCachedKey(index, mid * chunkSize, foundKey);
+ } else {
+ // do direct lookup
+ index.seek(mid * chunkSize);
+ index.readFully(foundKey);
+ cached = false;
+ }
int cmp = ByteUtils.compare(foundKey, keyMd5);
if(cmp == 0) {
// they are equal, return the location stored here
+ if(cached) {
+ index.seek(mid * chunkSize);
+ index.readFully(foundKey);
+ }
return index.readLong();
} else if(cmp > 0) {
// midVal is bigger
@@ -249,6 +274,19 @@ private long getValueLocation(RandomAccessFile index, byte[] key) throws IOExcep
return -1;
}
+ private void readCachedKey(RandomAccessFile index, long seekPoint, byte[] foundKey)
+ throws IOException {
+ Object keyValue = _cache.get(seekPoint);
+ if(keyValue != null) {
+ System.arraycopy((byte[]) keyValue, 0, foundKey, 0, foundKey.length);
+ } else {
+ index.seek(seekPoint);
+ index.readFully(foundKey);
+
+ _cache.put(seekPoint, ByteUtils.copy(foundKey, 0, foundKey.length));
+ }
+ }
+
/**
* Not supported, throws UnsupportedOperationException if called
*/
View
71 test/integration/voldemort/performance/ReadOnlyStorePerformanceTest.java
@@ -0,0 +1,71 @@
+/*
+ * Copyright 2008-2009 LinkedIn, Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package voldemort.performance;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import voldemort.server.VoldemortConfig;
+import voldemort.store.Store;
+import voldemort.store.readonly.RandomAccessFileStorageConfiguration;
+import voldemort.utils.Props;
+import voldemort.utils.Utils;
+import voldemort.versioning.ObsoleteVersionException;
+
+public class ReadOnlyStorePerformanceTest {
+
+ public static void main(String[] args) throws FileNotFoundException, IOException {
+ if(args.length != 4)
+ Utils.croak("USAGE: java " + ReadOnlyStorePerformanceTest.class.getName()
+ + " num-threads num-requests server-properties-file storeName");
+ int numThreads = Integer.parseInt(args[0]);
+ int numRequests = Integer.parseInt(args[1]);
+ String serverPropsFile = args[2];
+ String storeName = args[3];
+
+ final Store<byte[], byte[]> store = new RandomAccessFileStorageConfiguration(new VoldemortConfig(new Props(new File(serverPropsFile)))).getStore(storeName);
+
+ final AtomicInteger obsoletes = new AtomicInteger(0);
+ final AtomicInteger nullResults = new AtomicInteger(0);
+ final AtomicInteger totalResults = new AtomicInteger(0);
+
+ PerformanceTest readWriteTest = new PerformanceTest() {
+
+ private final int MaxMemberID = (int) (30 * 1000 * 1000);
+
+ public void doOperation(int index) throws Exception {
+ try {
+ byte[] bytes = (new Integer((int) (Math.random() * MaxMemberID))).toString()
+ .getBytes();
+ totalResults.incrementAndGet();
+ if(null == store.get(bytes)) {
+ nullResults.incrementAndGet();
+ }
+ } catch(ObsoleteVersionException e) {
+ obsoletes.incrementAndGet();
+ }
+ }
+ };
+ readWriteTest.run(numRequests, numThreads);
+ System.out.println("Random Access Read Only store Results:");
+ System.out.println("null Reads ratio:" + (nullResults.doubleValue())
+ / totalResults.doubleValue());
+ readWriteTest.printStats();
+ }
+}
View
3  test/unit/voldemort/store/readonly/JsonStoreBuilderTest.java
@@ -116,7 +116,8 @@ public void setUp() throws Exception {
this.dataDir,
1,
3,
- 1000),
+ 1000,
+ 100 * 1000 * 1000),
serializer,
serializer);
}
View
7 test/unit/voldemort/store/readonly/TestRandomAccessFileStore.java
@@ -46,7 +46,12 @@ public void testOpenInvalidStoreFails(int indexBytes, int dataBytes, boolean sho
indexOs.close();
try {
- RandomAccessFileStore store = new RandomAccessFileStore("test", testDir, 1, 1, 1000);
+ RandomAccessFileStore store = new RandomAccessFileStore("test",
+ testDir,
+ 1,
+ 1,
+ 1000,
+ 100 * 1000 * 1000);
if(!shouldWork)
fail("Able to open corrupt read-only store (index size = " + indexBytes
+ ", data bytes = " + dataBytes + ").");
Please sign in to comment.
Something went wrong with that request. Please try again.