
Some more fixes - This version works! :)

commit fbaf2b1416a60d048ffff757ff59ec4c1820a85e (1 parent: c1f65ce)
authored by @rsumbaly
2  contrib/hadoop-store-builder/src/java/voldemort/store/btree/mr/AbstractBTreeStoreBuilderConfigurable.java
@@ -47,7 +47,7 @@ public void configure(JobConf conf) {
if(storeDefs.size() != 1)
throw new IllegalStateException("Expected to find only a single store, but found multiple!");
this.storeDef = storeDefs.get(0);
- this.numChunks = conf.getInt("num.chunks", 1);
+ this.numChunks = conf.getInt("num.chunks", -1);
}
@SuppressWarnings("unused")
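
The default for num.chunks flips from 1 to -1. A plausible reading (an inference, not stated in the commit): -1 is a sentinel, so a job whose configuration never set num.chunks fails fast instead of silently building a single chunk. A minimal sketch of the kind of guard that would pair with such a sentinel (the guard itself is an assumption, not part of this diff):

    // Hypothetical guard pairing with the -1 sentinel; not in the commit itself.
    this.numChunks = conf.getInt("num.chunks", -1);
    if(this.numChunks < 1)
        throw new IllegalStateException("num.chunks must be a positive value, but was "
                                        + this.numChunks);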
8 contrib/hadoop-store-builder/src/java/voldemort/store/btree/mr/AbstractHadoopBTreeStoreBuilderMapper.java
@@ -17,6 +17,7 @@
package voldemort.store.btree.mr;
import java.io.IOException;
+import java.security.MessageDigest;
import java.util.List;
import org.apache.hadoop.io.BytesWritable;
@@ -49,6 +50,7 @@
public abstract class AbstractHadoopBTreeStoreBuilderMapper<K, V> extends
AbstractBTreeStoreBuilderConfigurable implements Mapper<K, V, BytesWritable, BytesWritable> {
+ protected MessageDigest md5er;
protected ConsistentRoutingStrategy routingStrategy;
protected Serializer<Object> keySerializer;
protected Serializer<Object> valueSerializer;
@@ -117,7 +119,9 @@ public void map(K key,
System.arraycopy(valBytes, 0, outputValue, offsetTillNow, valBytes.length);
// Generate MR key - 16 byte md5
- outputKey = new BytesWritable(keyBytes);
+ outputKey = new BytesWritable(ByteUtils.copy(md5er.digest(keyBytes),
+                                              0,
+                                              2 * ByteUtils.SIZE_OF_INT));
// Generate partition and node list this key is destined for
List<Integer> partitionList = routingStrategy.getPartitionList(keyBytes);
@@ -144,6 +148,7 @@ public void map(K key,
output.collect(outputKey, outputVal);
}
+ md5er.reset();
}
@Override
@@ -151,6 +156,7 @@ public void map(K key,
public void configure(JobConf conf) {
super.configure(conf);
+ md5er = ByteUtils.getDigest("md5");
keySerializerDefinition = getStoreDef().getKeySerializer();
valueSerializerDefinition = getStoreDef().getValueSerializer();
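
The map-output key changes from the raw serialized key bytes to the first 2 * ByteUtils.SIZE_OF_INT = 8 bytes of their MD5 digest (note the in-code comment still says "16 byte md5" even though only 8 bytes are copied). A fixed-width hashed key gives the shuffle uniformly distributed, size-independent keys to sort and partition on. A self-contained sketch of the same idea using only the JDK (class and method names here are illustrative, not Voldemort's):

    import java.security.MessageDigest;
    import java.security.NoSuchAlgorithmException;
    import java.util.Arrays;

    public final class Md5KeyExample {

        // Derive a fixed-width 8-byte MR key from arbitrary key bytes.
        // MessageDigest.digest() resets the instance itself; the commit's extra
        // md5er.reset() at the end of map() is defensive.
        public static byte[] mrKey(MessageDigest md5er, byte[] keyBytes) {
            byte[] digest = md5er.digest(keyBytes); // 16-byte MD5
            return Arrays.copyOf(digest, 8);        // keep the first 8 bytes, as the diff does
        }

        public static void main(String[] args) throws NoSuchAlgorithmException {
            MessageDigest md5er = MessageDigest.getInstance("MD5");
            System.out.println(mrKey(md5er, "hello".getBytes()).length); // prints 8
        }
    }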
25 contrib/hadoop-store-builder/src/java/voldemort/store/btree/mr/HadoopBTreeStoreBuilder.java
@@ -65,6 +65,7 @@
private final Path inputPath;
private final Path outputDir;
private final Path tempDir;
+ private final int numChunks;
/**
* Create the store builder
@@ -81,13 +82,14 @@
*/
@SuppressWarnings("unchecked")
public HadoopBTreeStoreBuilder(Configuration conf,
- Class<? extends AbstractHadoopBTreeStoreBuilderMapper<?, ?>> mapperClass,
- Class<? extends InputFormat> inputFormatClass,
- Cluster cluster,
- StoreDefinition storeDef,
- Path tempDir,
- Path outputDir,
- Path inputPath) {
+ Class<? extends AbstractHadoopBTreeStoreBuilderMapper<?, ?>> mapperClass,
+ Class<? extends InputFormat> inputFormatClass,
+ Cluster cluster,
+ StoreDefinition storeDef,
+ Path tempDir,
+ Path outputDir,
+ Path inputPath,
+ int numChunks) {
super();
this.config = conf;
this.mapperClass = Utils.notNull(mapperClass);
@@ -97,6 +99,7 @@ public HadoopBTreeStoreBuilder(Configuration conf,
this.storeDef = Utils.notNull(storeDef);
this.tempDir = tempDir;
this.outputDir = Utils.notNull(outputDir);
+ this.numChunks = numChunks;
}
/**
@@ -123,6 +126,7 @@ public void build() {
FileInputFormat.setInputPaths(conf, inputPath);
conf.set("final.output.dir", outputDir.toString());
FileOutputFormat.setOutputPath(conf, tempDir);
+ conf.setInt("num.chunks", numChunks);
FileSystem outputFs = outputDir.getFileSystem(conf);
if(outputFs.exists(outputDir)) {
@@ -139,10 +143,11 @@ public void build() {
+ cluster.getNumberOfNodes());
// Derive "rough" number of chunks and reducers
- int numReducers = cluster.getNumberOfPartitions() * storeDef.getReplicationFactor();
+ int numReducers = cluster.getNumberOfPartitions() * storeDef.getReplicationFactor()
+                   * numChunks;
conf.setNumReduceTasks(numReducers);
- logger.info("Number of reducer: " + numReducers);
+ logger.info("Number of reducer: " + numReducers + " , chunks: " + numChunks);
logger.info("Building store...");
JobClient.runJob(conf);
@@ -151,7 +156,7 @@ public void build() {
ReadOnlyStorageMetadata metadata = new ReadOnlyStorageMetadata();
- metadata.add(ReadOnlyStorageMetadata.FORMAT, ReadOnlyStorageFormat.BDB.getCode());
+ metadata.add(ReadOnlyStorageMetadata.FORMAT, ReadOnlyStorageFormat.BTREE.getCode());
Path nodePath = new Path(outputDir.toString(), "node-" + node.getId());
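
With chunks folded in, the reducer count becomes partitions * replication factor * chunks, i.e. one reduce task per (partition, replica, chunk) triple, and the stored metadata now advertises the BTREE format code instead of BDB, matching the package this builder lives in. For concreteness (the numbers below are hypothetical):

    // Hypothetical sizing, for illustration only.
    int partitions = 8, replicationFactor = 2, numChunks = 4;
    int numReducers = partitions * replicationFactor * numChunks; // 8 * 2 * 4 = 64 reduce tasks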
3  contrib/hadoop-store-builder/src/java/voldemort/store/btree/mr/HadoopBTreeStoreBuilderReducer.java
@@ -222,6 +222,9 @@ public void configure(JobConf job) {
this.indexBlockSizePerLevel = Maps.newHashMap();
this.indexBlockSizePerLevel.put(maxLevel, 0L);
+ logger.info("Opening " + this.taskValueFileName + " and " + this.indexPaths.get(0L)
+ + " for writing.");
+
} catch(IOException e) {
throw new RuntimeException("Failed to open Input/OutputStream", e);
}
20 contrib/hadoop-store-builder/src/java/voldemort/store/btree/mr/HadoopBTreeStoreJobRunner.java
@@ -70,6 +70,7 @@ private static OptionParser configureParser() {
parser.accepts("storename", "store name from store definition.").withRequiredArg();
parser.accepts("inputformat", "JavaClassName (default=text).").withRequiredArg();
parser.accepts("jar", "mapper class jar if not in $HADOOP_CLASSPATH.").withRequiredArg();
+ parser.accepts("chunks", "number of chunks ( overrides the chunk size )");
parser.accepts("help", "print usage information");
return parser;
}
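
One detail worth flagging (an observation about the jopt-simple usage, not something this commit addresses): the other value-carrying options are declared with .withRequiredArg(), while "chunks" is not, even though run() below reads an integer value for it. For the parser to accept and hand back that argument, the declaration would presumably need to look something like:

    // Sketch of an argument-taking declaration; the committed line omits withRequiredArg().
    parser.accepts("chunks", "number of chunks ( overrides the chunk size )")
          .withRequiredArg()
          .ofType(Integer.class);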
@@ -102,7 +103,7 @@ public int run(String[] args) throws Exception {
File clusterFile = new File((String) options.valueOf("cluster"));
Cluster cluster = new ClusterMapper().readCluster(new BufferedReader(new FileReader(clusterFile)));
-
+ int chunks = CmdUtils.valueOf(options, "chunks", 1);
File storeDefFile = new File((String) options.valueOf("storedefinitions"));
String storeName = (String) options.valueOf("storename");
List<StoreDefinition> stores;
@@ -129,7 +130,7 @@ public int run(String[] args) throws Exception {
}
Class<? extends AbstractHadoopBTreeStoreBuilderMapper<?, ?>> mapperClass = (Class<? extends AbstractHadoopBTreeStoreBuilderMapper<?, ?>>) ReflectUtils.loadClass((String) options.valueOf("mapper"),
- cl);
+ cl);
Class<? extends InputFormat<?, ?>> inputFormatClass = TextInputFormat.class;
if(options.has("inputformat")) {
@@ -158,13 +159,14 @@ public int run(String[] args) throws Exception {
addDepJars(conf, deps, addJars);
HadoopBTreeStoreBuilder builder = new HadoopBTreeStoreBuilder(conf,
- mapperClass,
- inputFormatClass,
- cluster,
- storeDef,
- tempDir,
- outputDir,
- inputPath);
+ mapperClass,
+ inputFormatClass,
+ cluster,
+ storeDef,
+ tempDir,
+ outputDir,
+ inputPath,
+ chunks);
builder.build();
return 0;
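
Putting the runner changes together, an invocation might pass the new flag as below. Only --cluster, --storedefinitions, --storename, --mapper, and --chunks are visible in this diff; the jar name, paths, and any remaining flags are placeholders assumed for illustration:

    hadoop jar voldemort-contrib.jar voldemort.store.btree.mr.HadoopBTreeStoreJobRunner \
        --cluster cluster.xml \
        --storedefinitions stores.xml \
        --storename test-store \
        --mapper com.example.MyStoreBuilderMapper \
        --chunks 4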