Permalink
Browse files

Moved ch05 to new MR API.

  • Loading branch information...
1 parent bbadb8e commit 545166b971983a4edf2d2b32bcfc3a3f3f28c13c @tomwhite committed Sep 18, 2011
Showing with 342 additions and 266 deletions.
  1. +1 −0 ch02/src/main/java/MaxTemperatureMapper.java
  2. +1 −0 ch02/src/main/java/MaxTemperatureReducer.java
  3. +5 −0 ch05/input/ncdc/micro/sample.txt
  4. +7 −7 ch05/src/main/java/v1/MaxTemperatureMapper.java
  5. +10 −13 ch05/src/main/java/v1/MaxTemperatureReducer.java
  6. +19 −16 ch05/src/main/java/v2/MaxTemperatureDriver.java
  7. +11 −8 ch05/src/main/java/v2/MaxTemperatureMapper.java
  8. +21 −17 ch05/src/main/java/v3/MaxTemperatureDriver.java
  9. +11 −8 ch05/src/main/java/v3/MaxTemperatureMapper.java
  10. +21 −17 ch05/src/main/java/v4/MaxTemperatureDriver.java
  11. +10 −12 ch05/src/main/java/v4/MaxTemperatureMapper.java
  12. +21 −17 ch05/src/main/java/v5/MaxTemperatureDriver.java
  13. +8 −11 ch05/src/main/java/v5/MaxTemperatureMapper.java
  14. +34 −22 ch05/src/main/java/v6/MaxTemperatureDriver.java
  15. +21 −17 ch05/src/main/java/v7/MaxTemperatureDriver.java
  16. +10 −12 ch05/src/main/java/v7/MaxTemperatureMapper.java
  17. +19 −15 ch05/src/{main → test}/java/v1/MaxTemperatureMapperTest.java
  18. +8 −7 ch05/src/{main → test}/java/v1/MaxTemperatureReducerTest.java
  19. +14 −12 ch05/src/{main → test}/java/v2/MaxTemperatureMapperTest.java
  20. +14 −4 ch05/src/{main → test}/java/v3/MaxTemperatureDriverMiniTest.java
  21. +15 −7 ch05/src/{main → test}/java/v3/MaxTemperatureDriverTest.java
  22. +28 −22 ch05/src/{main → test}/java/v3/MaxTemperatureMapperTest.java
  23. +26 −19 ch05/src/{main → test}/java/v5/MaxTemperatureMapperTest.java
  24. 0 ch05/src/{main/java/v3 → test/resources}/expected.txt
  25. +0 −1 common/src/main/java/JobBuilder.java
  26. +2 −0 snippet/bin/check_manuscript.sh
  27. +4 −1 snippet/bin/generate_listings.sh
  28. +1 −1 snippet/src/test/java/ExamplesIT.java
@@ -12,6 +12,7 @@
private static final int MISSING = 9999;
+ @Override
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
@@ -11,6 +11,7 @@
public class MaxTemperatureReducer
extends Reducer<Text, IntWritable, Text, IntWritable> {
+ @Override
public void reduce(Text key, Iterable<IntWritable> values,
Context context)
throws IOException, InterruptedException {
@@ -0,0 +1,5 @@
+0067011990999991950051507004+68750+023550FM-12+038299999V0203301N00671220001CN9999999N9+00001+99999999999
+0043011990999991950051512004+68750+023550FM-12+038299999V0203201N00671220001CN9999999N9+00221+99999999999
+0043011990999991950051518004+68750+023550FM-12+038299999V0203201N00261220001CN9999999N9-00111+99999999999
+0043012650999991949032412004+62300+010750FM-12+048599999V0202701N00461220001CN0500001N9+01111+99999999999
+0043012650999991949032418004+62300+010750FM-12+048599999V0202701N00461220001CN0500001N9+00781+99999999999
@@ -2,19 +2,19 @@
// cc MaxTemperatureMapperV1 First version of a Mapper that passes MaxTemperatureMapperTest
import java.io.IOException;
import org.apache.hadoop.io.*;
-import org.apache.hadoop.mapred.*;
+import org.apache.hadoop.mapreduce.*;
//vv MaxTemperatureMapperV1
-public class MaxTemperatureMapper extends MapReduceBase
- implements Mapper<LongWritable, Text, Text, IntWritable> {
+public class MaxTemperatureMapper
+ extends Mapper<LongWritable, Text, Text, IntWritable> {
- public void map(LongWritable key, Text value,
- OutputCollector<Text, IntWritable> output, Reporter reporter)
- throws IOException {
+ @Override
+ public void map(LongWritable key, Text value, Context context)
+ throws IOException, InterruptedException {
String line = value.toString();
String year = line.substring(15, 19);
int airTemperature = Integer.parseInt(line.substring(87, 92));
- output.collect(new Text(year), new IntWritable(airTemperature));
+ context.write(new Text(year), new IntWritable(airTemperature));
}
}
//^^ MaxTemperatureMapperV1
@@ -1,28 +1,25 @@
package v1;
// == MaxTemperatureReducerV1
import java.io.IOException;
-import java.util.Iterator;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.Reducer;
// vv MaxTemperatureReducerV1
-public class MaxTemperatureReducer extends MapReduceBase
- implements Reducer<Text, IntWritable, Text, IntWritable> {
+public class MaxTemperatureReducer
+ extends Reducer<Text, IntWritable, Text, IntWritable> {
- public void reduce(Text key, Iterator<IntWritable> values,
- OutputCollector<Text, IntWritable> output, Reporter reporter)
- throws IOException {
+ @Override
+ public void reduce(Text key, Iterable<IntWritable> values,
+ Context context)
+ throws IOException, InterruptedException {
int maxValue = Integer.MIN_VALUE;
- while (values.hasNext()) {
- maxValue = Math.max(maxValue, values.next().get());
+ for (IntWritable value : values) {
+ maxValue = Math.max(maxValue, value.get());
}
- output.collect(key, new IntWritable(maxValue));
+ context.write(key, new IntWritable(maxValue));
}
}
// ^^ MaxTemperatureReducerV1
@@ -1,11 +1,15 @@
package v2;
// cc MaxTemperatureDriverV2 Application to find the maximum temperature
-import org.apache.hadoop.conf.*;
+import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.*;
-import org.apache.hadoop.mapred.*;
-import org.apache.hadoop.util.*;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
import v1.MaxTemperatureReducer;
@@ -21,21 +25,20 @@ public int run(String[] args) throws Exception {
return -1;
}
- JobConf conf = new JobConf(getConf(), getClass());
- conf.setJobName("Max temperature");
+ Job job = new Job(getConf(), "Max temperature");
+ job.setJarByClass(getClass());
- FileInputFormat.addInputPath(conf, new Path(args[0]));
- FileOutputFormat.setOutputPath(conf, new Path(args[1]));
+ FileInputFormat.addInputPath(job, new Path(args[0]));
+ FileOutputFormat.setOutputPath(job, new Path(args[1]));
- conf.setOutputKeyClass(Text.class);
- conf.setOutputValueClass(IntWritable.class);
+ job.setMapperClass(MaxTemperatureMapper.class);
+ job.setCombinerClass(MaxTemperatureReducer.class);
+ job.setReducerClass(MaxTemperatureReducer.class);
- conf.setMapperClass(MaxTemperatureMapper.class);
- conf.setCombinerClass(MaxTemperatureReducer.class);
- conf.setReducerClass(MaxTemperatureReducer.class);
-
- JobClient.runJob(conf);
- return 0;
+ job.setOutputKeyClass(Text.class);
+ job.setOutputValueClass(IntWritable.class);
+
+ return job.waitForCompletion(true) ? 0 : 1;
}
public static void main(String[] args) throws Exception {
@@ -2,23 +2,26 @@
//== MaxTemperatureMapperV2
import java.io.IOException;
-import org.apache.hadoop.io.*;
-import org.apache.hadoop.mapred.*;
-public class MaxTemperatureMapper extends MapReduceBase
- implements Mapper<LongWritable, Text, Text, IntWritable> {
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+
+public class MaxTemperatureMapper
+ extends Mapper<LongWritable, Text, Text, IntWritable> {
//vv MaxTemperatureMapperV2
- public void map(LongWritable key, Text value,
- OutputCollector<Text, IntWritable> output, Reporter reporter)
- throws IOException {
+ @Override
+ public void map(LongWritable key, Text value, Context context)
+ throws IOException, InterruptedException {
String line = value.toString();
String year = line.substring(15, 19);
/*[*/String temp = line.substring(87, 92);
if (!missing(temp)) {/*]*/
int airTemperature = Integer.parseInt(temp);
- output.collect(new Text(year), new IntWritable(airTemperature));
+ context.write(new Text(year), new IntWritable(airTemperature));
/*[*/}/*]*/
}
@@ -2,14 +2,19 @@
// vv MaxTemperatureDriverV3
package v3;
-import org.apache.hadoop.conf.*;
+import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.*;
-import org.apache.hadoop.mapred.*;
-import org.apache.hadoop.util.*;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
import v1.MaxTemperatureReducer;
+// Identical to v2 except for v3 mapper
public class MaxTemperatureDriver extends Configured implements Tool {
@Override
@@ -21,21 +26,20 @@ public int run(String[] args) throws Exception {
return -1;
}
- JobConf conf = new JobConf(getConf(), getClass());
- conf.setJobName("Max temperature");
-
- FileInputFormat.addInputPath(conf, new Path(args[0]));
- FileOutputFormat.setOutputPath(conf, new Path(args[1]));
-
- conf.setOutputKeyClass(Text.class);
- conf.setOutputValueClass(IntWritable.class);
+ Job job = new Job(getConf(), "Max temperature");
+ job.setJarByClass(getClass());
- conf.setMapperClass(MaxTemperatureMapper.class);
- conf.setCombinerClass(MaxTemperatureReducer.class);
- conf.setReducerClass(MaxTemperatureReducer.class);
+ FileInputFormat.addInputPath(job, new Path(args[0]));
+ FileOutputFormat.setOutputPath(job, new Path(args[1]));
+
+ job.setMapperClass(MaxTemperatureMapper.class);
+ job.setCombinerClass(MaxTemperatureReducer.class);
+ job.setReducerClass(MaxTemperatureReducer.class);
- JobClient.runJob(conf);
- return 0;
+ job.setOutputKeyClass(Text.class);
+ job.setOutputValueClass(IntWritable.class);
+
+ return job.waitForCompletion(true) ? 0 : 1;
}
public static void main(String[] args) throws Exception {
@@ -2,22 +2,25 @@
// cc MaxTemperatureMapperV3 A Mapper that uses a utility class to parse records
import java.io.IOException;
-import org.apache.hadoop.io.*;
-import org.apache.hadoop.mapred.*;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
// vv MaxTemperatureMapperV3
-public class MaxTemperatureMapper extends MapReduceBase
- implements Mapper<LongWritable, Text, Text, IntWritable> {
+public class MaxTemperatureMapper
+ extends Mapper<LongWritable, Text, Text, IntWritable> {
/*[*/private NcdcRecordParser parser = new NcdcRecordParser();/*]*/
- public void map(LongWritable key, Text value,
- OutputCollector<Text, IntWritable> output, Reporter reporter)
- throws IOException {
+ @Override
+ public void map(LongWritable key, Text value, Context context)
+ throws IOException, InterruptedException {
/*[*/parser.parse(value);/*]*/
if (/*[*/parser.isValidTemperature()/*]*/) {
- output.collect(new Text(/*[*/parser.getYear()/*]*/),
+ context.write(new Text(/*[*/parser.getYear()/*]*/),
new IntWritable(/*[*/parser.getAirTemperature()/*]*/));
}
}
@@ -2,14 +2,19 @@
// vv MaxTemperatureDriverV4
package v4;
-import org.apache.hadoop.conf.*;
+import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.*;
-import org.apache.hadoop.mapred.*;
-import org.apache.hadoop.util.*;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
import v1.MaxTemperatureReducer;
+//Identical to v3 except for v4 mapper
public class MaxTemperatureDriver extends Configured implements Tool {
@Override
@@ -21,21 +26,20 @@ public int run(String[] args) throws Exception {
return -1;
}
- JobConf conf = new JobConf(getConf(), getClass());
- conf.setJobName("Max temperature");
-
- FileInputFormat.addInputPath(conf, new Path(args[0]));
- FileOutputFormat.setOutputPath(conf, new Path(args[1]));
-
- conf.setOutputKeyClass(Text.class);
- conf.setOutputValueClass(IntWritable.class);
+ Job job = new Job(getConf(), "Max temperature");
+ job.setJarByClass(getClass());
- conf.setMapperClass(MaxTemperatureMapper.class);
- conf.setCombinerClass(MaxTemperatureReducer.class);
- conf.setReducerClass(MaxTemperatureReducer.class);
+ FileInputFormat.addInputPath(job, new Path(args[0]));
+ FileOutputFormat.setOutputPath(job, new Path(args[1]));
+
+ job.setMapperClass(MaxTemperatureMapper.class);
+ job.setCombinerClass(MaxTemperatureReducer.class);
+ job.setReducerClass(MaxTemperatureReducer.class);
- JobClient.runJob(conf);
- return 0;
+ job.setOutputKeyClass(Text.class);
+ job.setOutputValueClass(IntWritable.class);
+
+ return job.waitForCompletion(true) ? 0 : 1;
}
public static void main(String[] args) throws Exception {
@@ -6,35 +6,33 @@
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.Mapper;
+
import v3.NcdcRecordParser;
//vv MaxTemperatureMapperV4
-public class MaxTemperatureMapper extends MapReduceBase
- implements Mapper<LongWritable, Text, Text, IntWritable> {
+public class MaxTemperatureMapper
+ extends Mapper<LongWritable, Text, Text, IntWritable> {
/*[*/enum Temperature {
OVER_100
}/*]*/
private NcdcRecordParser parser = new NcdcRecordParser();
- public void map(LongWritable key, Text value,
- OutputCollector<Text, IntWritable> output, Reporter reporter)
- throws IOException {
+ @Override
+ public void map(LongWritable key, Text value, Context context)
+ throws IOException, InterruptedException {
parser.parse(value);
if (parser.isValidTemperature()) {
int airTemperature = parser.getAirTemperature();
/*[*/if (airTemperature > 1000) {
System.err.println("Temperature over 100 degrees for input: " + value);
- reporter.setStatus("Detected possibly corrupt record: see logs.");
- reporter.incrCounter(Temperature.OVER_100, 1);
+ context.setStatus("Detected possibly corrupt record: see logs.");
+ context.getCounter(Temperature.OVER_100).increment(1);
}/*]*/
- output.collect(new Text(parser.getYear()), new IntWritable(airTemperature));
+ context.write(new Text(parser.getYear()), new IntWritable(airTemperature));
}
}
}
Oops, something went wrong.

0 comments on commit 545166b

Please sign in to comment.