
Hadoop Voldemort R/W Push

rsumbaly committed Jul 31, 2010
1 parent 246bcac commit ac255d25fde99ed258b7e6b12424fb4036a7c9b5
@@ -0,0 +1,34 @@
+#!/bin/bash
+
+#
+# Copyright 2010 LinkedIn, Inc
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+base_dir=$(dirname "$0")/..
+
+for file in "$base_dir"/dist/*.jar; do
+    HADOOP_CLASSPATH=$HADOOP_CLASSPATH:$file
+done
+
+for file in "$base_dir"/lib/*.jar; do
+    HADOOP_CLASSPATH=$HADOOP_CLASSPATH:$file
+done
+
+HADOOP_CLASSPATH=$HADOOP_CLASSPATH:$base_dir/dist/resources
+export HADOOP_CLASSPATH
+
+$HADOOP_HOME/bin/hadoop voldemort.store.readwrite.mr.HadoopRWStoreJobRunner "$@"
voldemort/store/readwrite/benchmark/BuildTestRWStore.java (new file)
@@ -0,0 +1,121 @@
+/*
+ * Copyright 2010 LinkedIn, Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package voldemort.store.readwrite.benchmark;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.jdom.JDOMException;
+
+import voldemort.VoldemortException;
+import voldemort.cluster.Cluster;
+import voldemort.server.VoldemortConfig;
+import voldemort.store.StoreDefinition;
+import voldemort.store.readwrite.mr.AbstractRWHadoopStoreBuilderMapper;
+import voldemort.store.readwrite.mr.HadoopRWStoreBuilder;
+import voldemort.store.readwrite.mr.HadoopRWStoreJobRunner;
+import voldemort.utils.ByteUtils;
+import voldemort.utils.Utils;
+import voldemort.xml.ClusterMapper;
+import voldemort.xml.StoreDefinitionsMapper;
+
+import com.google.common.collect.ImmutableCollection;
+
+/**
+ * Build a test read-write store from the generated data. The Voldemort
+ * server must be running. A hypothetical driver sketch follows this file.
+ *
+ * We can use the data generated by
+ * {@link voldemort.store.readonly.benchmark.BuildTestStore}.
+ */
+@SuppressWarnings("deprecation")
+public class BuildTestRWStore extends Configured implements Tool {
+
+    public static void main(String[] args) throws Exception {
+        int res = ToolRunner.run(new Configuration(), new BuildTestRWStore(), args);
+        System.exit(res);
+    }
+
+    @SuppressWarnings("unchecked")
+    public int run(String[] args) throws Exception {
+        if(args.length != 4)
+            Utils.croak("Expected arguments store_name config_dir input_path temp_path");
+        String storeName = args[0];
+        String configDir = args[1];
+        String inputDir = args[2];
+        String tempDir = args[3];
+
+        List<StoreDefinition> storeDefs = new StoreDefinitionsMapper().readStoreList(new File(configDir,
+                                                                                              "stores.xml"));
+        StoreDefinition def = null;
+        for(StoreDefinition d: storeDefs)
+            if(d.getName().equals(storeName))
+                def = d;
+        if(def == null)
+            Utils.croak("Store '" + storeName + "' not found in stores.xml");
+        Cluster cluster = new ClusterMapper().readCluster(new File(configDir, "cluster.xml"));
+
+        Configuration config = getConf();
+        config.set("mapred.job.name", "test-store-builder");
+
+        Class[] deps = new Class[] { ImmutableCollection.class, JDOMException.class,
+                VoldemortConfig.class, HadoopRWStoreJobRunner.class, VoldemortException.class };
+
+        HadoopRWStoreJobRunner.addDepJars(config, deps, new ArrayList<String>());
+
+        HadoopRWStoreBuilder builder = new HadoopRWStoreBuilder(config,
+                                                                BuildTestStoreMapper.class,
+                                                                SequenceFileInputFormat.class,
+                                                                cluster,
+                                                                def,
+                                                                (long) (1.5 * 1024 * 1024 * 1024),
+                                                                new Path(tempDir),
+                                                                new Path(inputDir));
+        builder.build();
+        return 0;
+    }
+
+    public static class BuildTestStoreMapper extends
+            AbstractRWHadoopStoreBuilderMapper<BytesWritable, BytesWritable> {
+
+        @Override
+        public Object makeKey(BytesWritable key, BytesWritable value) {
+            return getValid(key);
+        }
+
+        @Override
+        public Object makeValue(BytesWritable key, BytesWritable value) {
+            return getValid(value);
+        }
+
+        // BytesWritable reuses its backing buffer, so the array can be
+        // larger than the valid data; copy out only the valid bytes.
+        private byte[] getValid(BytesWritable writable) {
+            if(writable.getSize() == writable.getCapacity())
+                return writable.get();
+            else
+                return ByteUtils.copy(writable.get(), 0, writable.getSize());
+        }
+    }
+}
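
For reference, a minimal driver sketch (not part of this commit; the store name and paths are hypothetical placeholders) showing how the four positional arguments line up with what run() reads; main() above already does the same thing via ToolRunner:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.util.ToolRunner;

    import voldemort.store.readwrite.benchmark.BuildTestRWStore;

    // Hypothetical driver: runs BuildTestRWStore with its four expected arguments.
    public class BuildTestRWStoreDriver {

        public static void main(String[] args) throws Exception {
            String[] toolArgs = { "test-store",      // store_name: must match an entry in stores.xml
                                  "/path/to/config", // config_dir: contains stores.xml and cluster.xml
                                  "/path/to/input",  // input_path: SequenceFile input data
                                  "/path/to/temp" }; // temp_path: scratch space for the job
            System.exit(ToolRunner.run(new Configuration(), new BuildTestRWStore(), toolArgs));
        }
    }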
voldemort/store/readwrite/mr/AbstractRWHadoopStoreBuilderMapper.java (new file)
@@ -0,0 +1,147 @@
+/*
+ * Copyright 2010 LinkedIn, Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package voldemort.store.readwrite.mr;
+
+import java.io.IOException;
+import java.security.MessageDigest;
+import java.util.List;
+
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+
+import voldemort.cluster.Node;
+import voldemort.routing.RoutingStrategy;
+import voldemort.routing.RoutingStrategyFactory;
+import voldemort.serialization.DefaultSerializerFactory;
+import voldemort.serialization.Serializer;
+import voldemort.serialization.SerializerDefinition;
+import voldemort.serialization.SerializerFactory;
+import voldemort.store.compress.CompressionStrategy;
+import voldemort.store.compress.CompressionStrategyFactory;
+import voldemort.store.readonly.ReadOnlyUtils;
+import voldemort.store.readonly.mr.AbstractStoreBuilderConfigurable;
+import voldemort.utils.ByteUtils;
+
+/**
+ * A base class that can be used for building Voldemort read-write stores. To
+ * use it you need to override the makeKey and makeValue methods, which specify
+ * how to construct the key and value from the values given to map().
+ *
+ * The values returned by makeKey and makeValue will then be serialized with
+ * the appropriate Voldemort Serializer.
+ */
+@SuppressWarnings("deprecation")
+public abstract class AbstractRWHadoopStoreBuilderMapper<K, V> extends
+        AbstractStoreBuilderConfigurable implements Mapper<K, V, BytesWritable, BytesWritable> {
+
+    private MessageDigest md5er;
+    private RoutingStrategy routingStrategy;
+    private Serializer<Object> keySerializer;
+    private Serializer<Object> valueSerializer;
+    private CompressionStrategy valueCompressor;
+    private CompressionStrategy keyCompressor;
+    private SerializerDefinition keySerializerDefinition;
+    private SerializerDefinition valueSerializerDefinition;
+    private int sizeInt = ByteUtils.SIZE_OF_INT;
+
+    public abstract Object makeKey(K key, V value);
+
+    public abstract Object makeValue(K key, V value);
+
+    /**
+     * Create the Voldemort key and value from the input key and value and
+     * emit them for each of the responsible Voldemort nodes.
+     *
+     * The output key is the nodeId + chunkId. The output value is the
+     * Voldemort key + Voldemort value (see the decoding sketch after this
+     * file).
+     */
+    public void map(K key,
+                    V value,
+                    OutputCollector<BytesWritable, BytesWritable> output,
+                    Reporter reporter) throws IOException {
+        byte[] keyBytes = keySerializer.toBytes(makeKey(key, value));
+        byte[] valBytes = valueSerializer.toBytes(makeValue(key, value));
+
+        // Compress key and value if required
+        if(keySerializerDefinition.hasCompression()) {
+            keyBytes = keyCompressor.deflate(keyBytes);
+        }
+
+        if(valueSerializerDefinition.hasCompression()) {
+            valBytes = valueCompressor.deflate(valBytes);
+        }
+
+        // Generate value: [key length][key bytes][value length][value bytes]
+        byte[] outputValBytes = new byte[keyBytes.length + sizeInt + valBytes.length + sizeInt];
+        ByteUtils.writeInt(outputValBytes, keyBytes.length, 0);
+        System.arraycopy(keyBytes, 0, outputValBytes, sizeInt, keyBytes.length);
+        ByteUtils.writeInt(outputValBytes, valBytes.length, sizeInt + keyBytes.length);
+        System.arraycopy(valBytes,
+                         0,
+                         outputValBytes,
+                         sizeInt + sizeInt + keyBytes.length,
+                         valBytes.length);
+        BytesWritable outputVal = new BytesWritable(outputValBytes);
+
+        // Generate key: [node id][chunk id], one record per responsible node
+        int chunkId = ReadOnlyUtils.chunk(md5er.digest(keyBytes), getNumChunks());
+        List<Node> nodeList = routingStrategy.routeRequest(keyBytes);
+        for(Node node: nodeList) {
+            byte[] outputKeyBytes = new byte[sizeInt + sizeInt];
+            ByteUtils.writeInt(outputKeyBytes, node.getId(), 0);
+            ByteUtils.writeInt(outputKeyBytes, chunkId, sizeInt);
+            BytesWritable outputKey = new BytesWritable(outputKeyBytes);
+            output.collect(outputKey, outputVal);
+        }
+        md5er.reset();
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public void configure(JobConf conf) {
+        super.configure(conf);
+
+        md5er = ByteUtils.getDigest("md5");
+        keySerializerDefinition = getStoreDef().getKeySerializer();
+        valueSerializerDefinition = getStoreDef().getValueSerializer();
+
+        try {
+            SerializerFactory factory = new DefaultSerializerFactory();
+
+            if(conf.get("serializer.factory") != null) {
+                factory = (SerializerFactory) Class.forName(conf.get("serializer.factory"))
+                                                   .newInstance();
+            }
+
+            keySerializer = (Serializer<Object>) factory.getSerializer(keySerializerDefinition);
+            valueSerializer = (Serializer<Object>) factory.getSerializer(valueSerializerDefinition);
+        } catch(Exception e) {
+            throw new RuntimeException(e);
+        }
+
+        keyCompressor = new CompressionStrategyFactory().get(keySerializerDefinition.getCompression());
+        valueCompressor = new CompressionStrategyFactory().get(valueSerializerDefinition.getCompression());
+
+        RoutingStrategyFactory routingFactory = new RoutingStrategyFactory();
+        routingStrategy = routingFactory.updateRoutingStrategy(getStoreDef(), getCluster());
+    }
+}
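
For reference, a minimal sketch (not part of this commit; the class and variable names are illustrative) of how a downstream consumer, such as the job's reducer, could decode one record emitted by map() above. The byte layout follows the writeInt/arraycopy calls in this class:

    import org.apache.hadoop.io.BytesWritable;

    import voldemort.utils.ByteUtils;

    // Hypothetical decoder for the records produced by
    // AbstractRWHadoopStoreBuilderMapper.map().
    public class RecordLayoutSketch {

        // Key layout:   [node id int][chunk id int]
        // Value layout: [key length int][key bytes][value length int][value bytes]
        static void decode(BytesWritable outputKey, BytesWritable outputVal) {
            int sizeInt = ByteUtils.SIZE_OF_INT;

            byte[] k = outputKey.get();
            int nodeId = ByteUtils.readInt(k, 0);
            int chunkId = ByteUtils.readInt(k, sizeInt);

            byte[] v = outputVal.get();
            int keyLen = ByteUtils.readInt(v, 0);
            byte[] storeKey = ByteUtils.copy(v, sizeInt, sizeInt + keyLen);
            int valLen = ByteUtils.readInt(v, sizeInt + keyLen);
            byte[] storeValue = ByteUtils.copy(v,
                                               2 * sizeInt + keyLen,
                                               2 * sizeInt + keyLen + valLen);

            System.out.println("node " + nodeId + ", chunk " + chunkId + ": "
                               + storeKey.length + "-byte key, "
                               + storeValue.length + "-byte value");
        }
    }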