
Hadoop + Pig integration

-> ./contrib/hadoop/bin/hadoop-word-count.sh <store_name> <adminclient_url>
-> ./contrib/hadoop/bin/voldemort-pig.sh
1 parent 8109199, commit 2c86009527261562ce02fca2665492f317af38aa, committed by @rsumbaly on Jun 11, 2010
Showing with 875 additions and 87 deletions.
  1. +4 −1 .classpath
  2. +7 −0 clients/python/voldemort_admin_pb2.py
  3. +1 −0 contrib/hadoop-store-builder/lib/README
  4. BIN contrib/hadoop-store-builder/lib/hadoop-0.18.1-core.jar
  5. BIN contrib/hadoop-store-builder/lib/hadoop-0.20.2-core.jar
  6. +48 −0 contrib/hadoop/bin/hadoop-word-count.sh
  7. +40 −0 contrib/hadoop/bin/voldemort-pig.sh
  8. BIN contrib/hadoop/lib/pig-0.7.0-core.jar
  9. +46 −0 contrib/hadoop/src/java/voldemort/hadoop/VoldemortHadoopConfig.java
  10. +81 −0 contrib/hadoop/src/java/voldemort/hadoop/VoldemortInputFormat.java
  11. +97 −0 contrib/hadoop/src/java/voldemort/hadoop/VoldemortInputSplit.java
  12. +85 −0 contrib/hadoop/src/java/voldemort/hadoop/VoldemortRecordReader.java
  13. +90 −0 contrib/hadoop/src/java/voldemort/hadoop/pig/VoldemortStore.java
  14. +186 −0 contrib/hadoop/test/voldemort/hadoop/VoldemortWordCount.java
  15. +3 −2 src/java/voldemort/VoldemortAdminTool.java
  16. +20 −5 src/java/voldemort/client/protocol/admin/AdminClient.java
  17. +29 −0 src/java/voldemort/client/protocol/admin/filter/MasterOnlyVoldemortFilter.java
  18. +106 −66 src/java/voldemort/client/protocol/pb/VAdminProto.java
  19. +2 −1 src/java/voldemort/server/protocol/admin/AdminServiceRequestHandler.java
  20. +12 −4 src/java/voldemort/server/protocol/admin/FetchStreamRequestHandler.java
  21. +2 −2 src/java/voldemort/utils/VoldemortAdminClientShell.java
  22. +1 −0 src/proto/voldemort-admin.proto
  23. +2 −1 test/integration/voldemort/performance/AdminTest.java
  24. +2 −1 test/unit/voldemort/client/AbstractAdminServiceFilterTest.java
  25. +4 −2 test/unit/voldemort/client/AdminServiceBasicTest.java
  26. +7 −2 test/unit/voldemort/client/AdminServiceFailureTest.java
@@ -15,6 +15,8 @@
<classpathentry kind="src" path="example/java"/>
<classpathentry kind="src" path="contrib/krati/src/java"/>
<classpathentry kind="src" path="contrib/krati/test"/>
+ <classpathentry kind="src" path="contrib/hadoop/test"/>
+ <classpathentry kind="src" path="contrib/hadoop/src/java"/>
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER"/>
<classpathentry kind="lib" path="lib/catalina-ant.jar"/>
<classpathentry kind="lib" path="lib/commons-codec-1.3.jar"/>
@@ -23,7 +25,7 @@
<classpathentry kind="lib" path="lib/colt-1.2.0.jar"/>
<classpathentry kind="lib" path="contrib/mongodb/lib/mongo-xjdm.jar"/>
<classpathentry kind="lib" path="contrib/hadoop-store-builder/lib/commons-cli-2.0-SNAPSHOT.jar"/>
- <classpathentry kind="lib" path="contrib/hadoop-store-builder/lib/hadoop-0.18.1-core.jar"/>
+ <classpathentry kind="lib" path="contrib/hadoop-store-builder/lib/hadoop-0.20.2-core.jar"/>
<classpathentry kind="lib" path="lib/junit-4.6.jar"/>
<classpathentry kind="lib" path="lib/log4j-1.2.15.jar"/>
<classpathentry kind="lib" path="lib/jetty-6.1.18.jar"/>
@@ -51,5 +53,6 @@
<classpathentry kind="lib" path="lib/jackson-core-asl-1.4.0.jar"/>
<classpathentry kind="lib" path="lib/avro-modified-jdk5-1.3.0.jar"/>
<classpathentry kind="lib" path="contrib/krati/lib/krati-0.3.3.jar"/>
+ <classpathentry kind="lib" path="contrib/hadoop/lib/pig-0.7.0-core.jar"/>
<classpathentry kind="output" path="classes"/>
</classpath>


@@ -0,0 +1 @@
+Java 5 users need to manually exclude Hadoop 0.20.2 and replace it with Hadoop 0.18.*
@@ -0,0 +1,48 @@
+#!/bin/bash
+
+#
+# Copyright 2010 LinkedIn, Inc
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+if [ $# -ne 2 ];
+then
+ echo 'USAGE: bin/hadoop-word-count.sh store_name admin_bootstrap_url'
+ exit 1
+fi
+
+base_dir=$(dirname $0)/../../..
+
+for file in $base_dir/dist/*.jar;
+do
+ HADOOP_CLASSPATH=$HADOOP_CLASSPATH:$file
+done
+
+for file in $base_dir/lib/*.jar;
+do
+ HADOOP_CLASSPATH=$HADOOP_CLASSPATH:$file
+done
+
+if [ -n "${HADOOP_HOME}" ];
+then
+ HADOOP_CLASSPATH=$HADOOP_CLASSPATH:$HADOOP_HOME
+else
+ echo 'Missing HADOOP_HOME'
+ exit 1
+fi
+
+HADOOP_CLASSPATH=$HADOOP_CLASSPATH:$base_dir/dist/resources:$VOLDEMORT_HOME/lib/
+export HADOOP_CLASSPATH
+
+$HADOOP_HOME/bin/hadoop voldemort.hadoop.VoldemortWordCount "$@"
@@ -0,0 +1,40 @@
+#!/bin/bash
+
+#
+# Copyright 2010 LinkedIn, Inc
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+base_dir=$(dirname $0)/../../..
+
+CLASSPATH=$home_dir/common/hadoop-conf/dev/conf:$home_dir/dist/jobs/
+
+for file in $base_dir/dist/*.jar;
+do
+ CLASSPATH=$CLASSPATH:$file
+done
+
+for file in $base_dir/lib/*.jar;
+do
+ CLASSPATH=$CLASSPATH:$file
+done
+
+for file in $base_dir/contrib/hadoop/lib/*.jar;
+do
+ CLASSPATH=$CLASSPATH:$file
+done
+
+export CLASSPATH
+
+java -Xmx4G $PIG_JAVA_OPTS -Dudf.import.list=voldemort.hadoop.pig -cp $CLASSPATH org.apache.pig.Main "$@"
@@ -0,0 +1,46 @@
+/*
+ * Copyright 2010 LinkedIn, Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package voldemort.hadoop;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Configuration helper shared by the Voldemort Hadoop InputFormat classes
+ *
+ */
+public class VoldemortHadoopConfig {
+
+ private static final String URL = "voldemort.url";
+ private static final String STORE_NAME = "voldemort.store.name";
+
+ public static void setVoldemortURL(Configuration conf, String url) {
+ conf.set(URL, url);
+ }
+
+ public static void setVoldemortStoreName(Configuration conf, String storeName) {
+ conf.set(STORE_NAME, storeName);
+ }
+
+ public static String getVoldemortURL(Configuration conf) {
+ return conf.get(URL);
+ }
+
+ public static String getVoldemortStoreName(Configuration conf) {
+ return conf.get(STORE_NAME);
+ }
+
+}
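
A minimal driver sketch showing how these configuration helpers and VoldemortInputFormat might be wired into a Hadoop 0.20 job. This is illustrative only, not the committed VoldemortWordCount.java; the class name, bootstrap URL, and store name are placeholders, and the mapper/output configuration is elided.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

import voldemort.hadoop.VoldemortHadoopConfig;
import voldemort.hadoop.VoldemortInputFormat;

public class VoldemortScanDriver {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Placeholder values; substitute the real admin bootstrap URL and store name.
        VoldemortHadoopConfig.setVoldemortURL(conf, "tcp://localhost:6666");
        VoldemortHadoopConfig.setVoldemortStoreName(conf, "test-store");

        Job job = new Job(conf, "voldemort-full-scan");
        job.setJarByClass(VoldemortScanDriver.class);
        // Keys arrive as ByteArray, values as Versioned<byte[]>, per VoldemortInputFormat.
        job.setInputFormatClass(VoldemortInputFormat.class);
        // Mapper, reducer, output format and output path would be configured here.

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
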
@@ -0,0 +1,81 @@
+/*
+ * Copyright 2010 LinkedIn, Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package voldemort.hadoop;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import voldemort.client.protocol.admin.AdminClient;
+import voldemort.client.protocol.admin.AdminClientConfig;
+import voldemort.cluster.Cluster;
+import voldemort.cluster.Node;
+import voldemort.utils.ByteArray;
+import voldemort.versioning.Versioned;
+
+public class VoldemortInputFormat extends InputFormat<ByteArray, Versioned<byte[]>> {
+
+ /**
+ * Create a RecordReader for the given split; the reader opens its own
+ * admin client connection when it is initialized. Called on the TaskTracker.
+ */
+ @Override
+ public RecordReader<ByteArray, Versioned<byte[]>> createRecordReader(InputSplit currentSplit,
+ TaskAttemptContext taskContext)
+ throws IOException, InterruptedException {
+ return new VoldemortRecordReader();
+ }
+
+ /**
+ * Creates one split (and hence one mapper) per node; each InputSplit then
+ * connects to its particular node. Called on the JobClient.
+ */
+ @Override
+ public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
+ Configuration conf = context.getConfiguration();
+ String bootstrapURL = VoldemortHadoopConfig.getVoldemortURL(conf);
+ String storeName = VoldemortHadoopConfig.getVoldemortStoreName(conf);
+
+ AdminClient adminClient = new AdminClient(bootstrapURL, new AdminClientConfig());
+
+ /**
+ * TODO: Add a check that the store exists
+ */
+ Cluster cluster = adminClient.getAdminClientCluster();
+ Collection<Node> nodes = cluster.getNodes();
+ Iterator<Node> nodeIter = nodes.iterator();
+
+ List<InputSplit> splits = new ArrayList<InputSplit>();
+ while(nodeIter.hasNext()) {
+ Node currentNode = nodeIter.next();
+ VoldemortInputSplit split = new VoldemortInputSplit(storeName, currentNode);
+ splits.add(split);
+ }
+
+ adminClient.stop();
+ return splits;
+ }
+}
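
The generic signature above fixes what every mapper over this input format receives: a ByteArray key and a Versioned<byte[]> value per store entry. A hypothetical mapper sketch (the class name and output types are illustrative, not part of this commit) that emits each key as text with a count of one:

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import voldemort.utils.ByteArray;
import voldemort.versioning.Versioned;

public class VoldemortEntryMapper extends Mapper<ByteArray, Versioned<byte[]>, Text, IntWritable> {

    private static final IntWritable ONE = new IntWritable(1);

    @Override
    protected void map(ByteArray key, Versioned<byte[]> value, Context context)
            throws IOException, InterruptedException {
        // Emit the raw key (decoded as UTF-8) with a count of one;
        // value.getValue() would give the raw bytes stored in Voldemort.
        context.write(new Text(new String(key.get(), "UTF-8")), ONE);
    }
}
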
@@ -0,0 +1,97 @@
+/*
+ * Copyright 2010 LinkedIn, Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package voldemort.hadoop;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputSplit;
+
+import voldemort.cluster.Node;
+
+public class VoldemortInputSplit extends InputSplit implements Writable {
+
+ private String storeName;
+ private String hostName;
+ private Integer nodeId;
+ private Integer adminPort;
+
+ public VoldemortInputSplit(String storeName, Node node) {
+ this.storeName = storeName;
+ this.hostName = node.getHost();
+ this.nodeId = node.getId();
+ this.adminPort = node.getAdminPort();
+ }
+
+ /**
+ * Used to order the splits so that the largest are processed first, in an
+ * attempt to minimize job runtime. Voldemort doesn't care, so we return 0.
+ */
+ @Override
+ public long getLength() throws IOException, InterruptedException {
+ return 0;
+ }
+
+ public String getStoreName() {
+ return this.storeName;
+ }
+
+ public String getHostName() {
+ return this.hostName;
+ }
+
+ public Integer getNodeId() {
+ return this.nodeId;
+ }
+
+ public Integer getAdminPort() {
+ return this.adminPort;
+ }
+
+ /**
+ * Returns the location of the split. Since the current scheme is one
+ * mapper per node, the array contains only that node's hostname.
+ */
+ @Override
+ public String[] getLocations() throws IOException, InterruptedException {
+ return new String[] { this.hostName };
+ }
+
+ public void readFields(DataInput inputStream) throws IOException {
+ this.storeName = inputStream.readUTF();
+ this.hostName = inputStream.readUTF();
+ this.nodeId = inputStream.readInt();
+ this.adminPort = inputStream.readInt();
+ }
+
+ public void write(DataOutput outputStream) throws IOException {
+ outputStream.writeUTF(storeName);
+ outputStream.writeUTF(hostName);
+ outputStream.writeInt(nodeId);
+ outputStream.writeInt(adminPort);
+ }
+
+ protected VoldemortInputSplit() {}
+
+ public static VoldemortInputSplit read(DataInput in) throws IOException {
+ VoldemortInputSplit split = new VoldemortInputSplit();
+ split.readFields(in);
+ return split;
+ }
+}
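
Because the split implements Writable, Hadoop serializes it on the JobClient and reconstructs it on the TaskTracker via write/readFields. A small hypothetical helper (not part of this commit) illustrating that round trip using only the methods defined above:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import voldemort.hadoop.VoldemortInputSplit;

public class SplitSerde {

    // Serialize the split the way Hadoop's serialization layer would.
    public static byte[] toBytes(VoldemortInputSplit split) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        split.write(new DataOutputStream(bytes));
        return bytes.toByteArray();
    }

    // Rebuild the split on the "remote" side using the static read factory.
    public static VoldemortInputSplit fromBytes(byte[] data) throws IOException {
        return VoldemortInputSplit.read(new DataInputStream(new ByteArrayInputStream(data)));
    }
}

Keeping the no-argument constructor protected and exposing a static read factory mirrors the pattern Hadoop itself uses for Writable types.
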