
Added ability to keep retrying with a bumped-up chunk size (because that is generally the cause of the job failing)
rsumbaly committed Aug 16, 2011
1 parent 7582d55 commit 3c0a7117344d7e85210f879d509ebc589d93fd1a
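
In short, the change wraps the Hadoop job submission in a do/while loop: if JobClient.runJob(conf) throws, the builder bumps numChunks by one (more chunks means smaller files per reducer, the usual reason the job dies) and resubmits, giving up after three attempts. Below is a minimal, self-contained sketch of that retry pattern; the class name and the runJob/MAX_ATTEMPTS names are illustrative stand-ins, not code from the Voldemort tree:

    // Sketch of the retry-with-more-chunks pattern introduced by this commit.
    // runJob() stands in for JobClient.runJob(conf) and simply fails until
    // the chunk count is large enough.
    public class RetrySketch {

        static final int MAX_ATTEMPTS = 3;

        static void runJob(int numChunks) throws Exception {
            if(numChunks < 3)
                throw new Exception("reducer overwhelmed with " + numChunks + " chunk(s)");
            System.out.println("job succeeded with " + numChunks + " chunks");
        }

        public static void main(String[] args) throws Exception {
            int numChunks = 1;
            int attemptId = 1;
            boolean tryAgain = false;
            do {
                try {
                    runJob(numChunks);
                    tryAgain = false;
                } catch(Exception e) {
                    if(attemptId < MAX_ATTEMPTS) {
                        // More chunks -> smaller file per reducer, so the
                        // failure may disappear on the next submission.
                        attemptId++;
                        numChunks++;
                        tryAgain = true;
                    } else {
                        throw e; // exhausted all attempts
                    }
                }
            } while(tryAgain);
        }
    }
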
@@ -404,31 +404,26 @@ public HadoopStoreBuilder(Configuration conf,
*/
public void build() {
try {
+ RunningJob job = null;
+ boolean tryAgain = false;
+ int attemptId = 1;
+
JobConf conf = new JobConf(config);
- conf.setInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE);
- conf.set("cluster.xml", new ClusterMapper().writeCluster(cluster));
- conf.set("stores.xml",
- new StoreDefinitionsMapper().writeStoreList(Collections.singletonList(storeDef)));
- conf.setBoolean("save.keys", saveKeys);
- conf.setBoolean("reducer.per.bucket", reducerPerBucket);
- conf.setPartitionerClass(HadoopStoreBuilderPartitioner.class);
- conf.setMapperClass(mapperClass);
- conf.setMapOutputKeyClass(BytesWritable.class);
- conf.setMapOutputValueClass(BytesWritable.class);
- if(reducerPerBucket) {
- conf.setReducerClass(HadoopStoreBuilderReducerPerBucket.class);
- } else {
- conf.setReducerClass(HadoopStoreBuilderReducer.class);
+ FileSystem outputFs = outputDir.getFileSystem(conf);
+ if(outputFs.exists(outputDir)) {
+ throw new IOException("Final output directory already exists.");
}
CompressionStrategy strategy = null;
+ boolean compressionExists = true;
- // Do compression only if we haven't done a row level compression
+ // Do compression only if we haven't done a row level
+ // compression
if(compression != null && storeDef.getValueSerializer().getCompression() == null
&& storeDef.getKeySerializer().getCompression() == null) {
- boolean compressionExists = true;
- // Create a compression factory to check if the algorithm exists
+ // Create a compression factory to check if the algorithm
+ // exists
try {
strategy = new CompressionStrategyFactory().get(compression);
logger.info("Successful instantiated compression type " + compression.getType());
@@ -438,79 +433,107 @@ public void build() {
compressionExists = false;
}
- if(compressionExists && strategy != null) {
- conf.set("push.compress", compression.getType());
- }
}
- conf.setInputFormat(inputFormatClass);
- conf.setOutputFormat(SequenceFileOutputFormat.class);
- conf.setOutputKeyClass(BytesWritable.class);
- conf.setOutputValueClass(BytesWritable.class);
- conf.setJarByClass(getClass());
- conf.setReduceSpeculativeExecution(false);
- FileInputFormat.setInputPaths(conf, inputPath);
- conf.set("final.output.dir", outputDir.toString());
- conf.set("checksum.type", CheckSum.toString(checkSumType));
- FileOutputFormat.setOutputPath(conf, tempDir);
-
- FileSystem outputFs = outputDir.getFileSystem(conf);
- if(outputFs.exists(outputDir)) {
- throw new IOException("Final output directory already exists.");
- }
-
- // delete output dir if it already exists
- FileSystem tempFs = tempDir.getFileSystem(conf);
- tempFs.delete(tempDir, true);
-
- long size = sizeOfPath(tempFs, inputPath);
- logger.info("Data size = " + size + ", replication factor = "
- + storeDef.getReplicationFactor() + ", numNodes = "
- + cluster.getNumberOfNodes() + ", chunk size = " + chunkSizeBytes);
-
- // Derive "rough" number of chunks and reducers
- int numReducers;
- if(saveKeys) {
-
- if(this.numChunks == -1) {
- this.numChunks = Math.max((int) (storeDef.getReplicationFactor() * size
- / cluster.getNumberOfPartitions()
- / storeDef.getReplicationFactor() / chunkSizeBytes),
- 1);
- } else {
- logger.info("Overriding chunk size byte and taking num chunks ("
- + this.numChunks + ") directly");
- }
+ do {
+ conf.setInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE);
+ conf.set("cluster.xml", new ClusterMapper().writeCluster(cluster));
+ conf.set("stores.xml",
+ new StoreDefinitionsMapper().writeStoreList(Collections.singletonList(storeDef)));
+ conf.setBoolean("save.keys", saveKeys);
+ conf.setBoolean("reducer.per.bucket", reducerPerBucket);
+ conf.setPartitionerClass(HadoopStoreBuilderPartitioner.class);
+ conf.setMapperClass(mapperClass);
+ conf.setMapOutputKeyClass(BytesWritable.class);
+ conf.setMapOutputValueClass(BytesWritable.class);
if(reducerPerBucket) {
- numReducers = cluster.getNumberOfPartitions() * storeDef.getReplicationFactor();
+ conf.setReducerClass(HadoopStoreBuilderReducerPerBucket.class);
} else {
- numReducers = cluster.getNumberOfPartitions() * storeDef.getReplicationFactor()
- * numChunks;
+ conf.setReducerClass(HadoopStoreBuilderReducer.class);
}
- } else {
-
- if(this.numChunks == -1) {
- this.numChunks = Math.max((int) (storeDef.getReplicationFactor() * size
- / cluster.getNumberOfPartitions() / chunkSizeBytes),
- 1);
- } else {
- logger.info("Overriding chunk size byte and taking num chunks ("
- + this.numChunks + ") directly");
+ if(compressionExists && strategy != null) {
+ conf.set("push.compress", compression.getType());
}
- if(reducerPerBucket) {
- numReducers = cluster.getNumberOfPartitions();
+ conf.setInputFormat(inputFormatClass);
+ conf.setOutputFormat(SequenceFileOutputFormat.class);
+ conf.setOutputKeyClass(BytesWritable.class);
+ conf.setOutputValueClass(BytesWritable.class);
+ conf.setJarByClass(getClass());
+ conf.setReduceSpeculativeExecution(false);
+ conf.setMaxReduceAttempts(1);
+ FileInputFormat.setInputPaths(conf, inputPath);
+ conf.set("final.output.dir", outputDir.toString());
+ conf.set("checksum.type", CheckSum.toString(checkSumType));
+ FileOutputFormat.setOutputPath(conf, tempDir);
+
+ // delete output dir if it already exists
+ FileSystem tempFs = tempDir.getFileSystem(conf);
+ tempFs.delete(tempDir, true);
+
+ long size = sizeOfPath(tempFs, inputPath);
+ logger.info("Data size = " + size + ", replication factor = "
+ + storeDef.getReplicationFactor() + ", numNodes = "
+ + cluster.getNumberOfNodes() + ", chunk size = " + chunkSizeBytes);
+
+ // Derive "rough" number of chunks and reducers
+ int numReducers;
+ if(saveKeys) {
+ if(this.numChunks == -1) {
+ this.numChunks = Math.max((int) (storeDef.getReplicationFactor() * size
+ / cluster.getNumberOfPartitions()
+ / storeDef.getReplicationFactor() / chunkSizeBytes),
+ 1);
+ } else {
+ logger.info("Overriding chunk size byte and taking num chunks ("
+ + this.numChunks + ") directly");
+ }
+ if(reducerPerBucket) {
+ numReducers = cluster.getNumberOfPartitions()
+ * storeDef.getReplicationFactor();
+ } else {
+ numReducers = cluster.getNumberOfPartitions()
+ * storeDef.getReplicationFactor() * numChunks;
+ }
} else {
- numReducers = cluster.getNumberOfPartitions() * numChunks;
+
+ if(this.numChunks == -1) {
+ this.numChunks = Math.max((int) (storeDef.getReplicationFactor() * size
+ / cluster.getNumberOfPartitions() / chunkSizeBytes),
+ 1);
+ } else {
+ logger.info("Overriding chunk size byte and taking num chunks ("
+ + this.numChunks + ") directly");
+ }
+ if(reducerPerBucket) {
+ numReducers = cluster.getNumberOfPartitions();
+ } else {
+ numReducers = cluster.getNumberOfPartitions() * numChunks;
+ }
}
- }
- conf.setInt("num.chunks", numChunks);
- conf.setNumReduceTasks(numReducers);
+ conf.setInt("num.chunks", numChunks);
+ conf.setNumReduceTasks(numReducers);
- logger.info("Number of chunks: " + numChunks + ", number of reducers: " + numReducers
- + ", save keys: " + saveKeys + ", reducer per bucket: " + reducerPerBucket);
- logger.info("Building store...");
- RunningJob job = JobClient.runJob(conf);
+ logger.info("Attempt " + attemptId + "] Number of chunks: " + numChunks
+ + ", number of reducers: " + numReducers + ", save keys: " + saveKeys
+ + ", reducer per bucket: " + reducerPerBucket);
+ logger.info("Building store...");
+ try {
+ job = JobClient.runJob(conf);
+ tryAgain = false;
+ } catch(Exception e) {
+ if(attemptId < 3) {
+ logger.error("Got exception during attempt " + attemptId
+ + ". Trying again with chunks = " + (numChunks + 1), e);
+ attemptId++;
+ numChunks++;
+ tryAgain = true;
+ } else {
+ logger.error("Completed 3 attempts. Failing the full job", e);
+ throw e;
+ }
+ }
+ } while(tryAgain);
// Once the job has completed log the counter
Counters counters = job.getCounters();
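
For concreteness, a worked pass through the numChunks/numReducers math above (the saveKeys == false, reducerPerBucket == false branch), using made-up cluster numbers; note how the retry's numChunks bump translates into more, smaller reducer outputs:

    public class ChunkMath {
        public static void main(String[] args) {
            long size = 1_000_000_000_000L;       // 1 TB of input (hypothetical)
            int repFactor = 2;                    // storeDef.getReplicationFactor()
            int numPartitions = 100;              // cluster.getNumberOfPartitions()
            long chunkSizeBytes = 1_000_000_000L; // ~1 GB target chunk size

            int numChunks = Math.max((int) (repFactor * size / numPartitions / chunkSizeBytes), 1);
            int numReducers = numPartitions * numChunks;

            // Prints "20 chunks, 2000 reducers"; one retry bumps numChunks
            // to 21, i.e. 2100 reducers and proportionally smaller chunks.
            System.out.println(numChunks + " chunks, " + numReducers + " reducers");
        }
    }
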
