Permalink
Browse files

Get builds working and tests passing.

  • Loading branch information...
tomwhite committed Feb 2, 2012
1 parent f49e803 commit be9083c87732c410a4e8a1b27e19f97b6e36293f
View
@@ -152,6 +152,12 @@
<artifactId>mrunit</artifactId>
<version>0.8.0-incubating</version>
<scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-core</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
@@ -30,9 +30,8 @@ public static void main(String[] args) throws IOException {
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(IntWritable.class);
- conf.setBoolean("mapred.output.compress", true);
- conf.setClass("mapred.output.compression.codec", GzipCodec.class,
- CompressionCodec.class);
+ /*[*/FileOutputFormat.setCompressOutput(conf, true);
+ FileOutputFormat.setOutputCompressorClass(conf, GzipCodec.class);/*]*/
conf.setMapperClass(MaxTemperatureMapper.class);
conf.setCombinerClass(MaxTemperatureReducer.class);
@@ -1,3 +1,4 @@
+// == OldMaxTemperatureWithMapOutputCompression
package oldapi;
import java.io.IOException;
@@ -29,8 +30,10 @@ public static void main(String[] args) throws IOException {
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(IntWritable.class);
+ // vv OldMaxTemperatureWithMapOutputCompression
conf.setCompressMapOutput(true);
conf.setMapOutputCompressorClass(GzipCodec.class);
+ // ^^ OldMaxTemperatureWithMapOutputCompression
conf.setMapperClass(MaxTemperatureMapper.class);
conf.setCombinerClass(MaxTemperatureReducer.class);
@@ -1,10 +1,6 @@
// cc MissingTemperatureFields Application to calculate the proportion of records with missing temperature fields
import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.mapreduce.Cluster;
-import org.apache.hadoop.mapreduce.Counters;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobID;
-import org.apache.hadoop.mapreduce.TaskCounter;
+import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;
public class MissingTemperatureFields extends Configured implements Tool {
@@ -15,9 +11,9 @@ public int run(String[] args) throws Exception {
JobBuilder.printUsage(this, "<job ID>");
return -1;
}
- Cluster cluster = new Cluster(getConf());
String jobID = args[0];
- Job job = cluster.getJob(JobID.forName(jobID));
+ JobClient jobClient = new JobClient(new JobConf(getConf()));
+ RunningJob job = jobClient.getJob(JobID.forName(jobID));
if (job == null) {
System.err.printf("No job with ID %s found.\n", jobID);
return -1;
@@ -28,10 +24,10 @@ public int run(String[] args) throws Exception {
}
Counters counters = job.getCounters();
- long missing = counters.findCounter(
- MaxTemperatureWithCounters.Temperature.MISSING).getValue();
+ long missing = counters.getCounter(
+ MaxTemperatureWithCounters.Temperature.MISSING);
- long total = counters.findCounter(TaskCounter.MAP_INPUT_RECORDS).getValue();
+ long total = counters.getCounter(Task.Counter.MAP_INPUT_RECORDS);
System.out.printf("Records with missing temperature fields: %.2f%%\n",
100.0 * missing / total);
@@ -0,0 +1,48 @@
+// == NewMissingTemperatureFields Application to calculate the proportion of records with missing temperature fields
+
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.mapreduce.Cluster;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.TaskCounter;
+import org.apache.hadoop.util.*;
+
+public class NewMissingTemperatureFields extends Configured implements Tool {
+
+ @Override
+ public int run(String[] args) throws Exception {
+ if (args.length != 1) {
+ JobBuilder.printUsage(this, "<job ID>");
+ return -1;
+ }
+ String jobID = args[0];
+ // vv NewMissingTemperatureFields
+ Cluster cluster = new Cluster(getConf());
+ Job job = cluster.getJob(JobID.forName(jobID));
+ // ^^ NewMissingTemperatureFields
+ if (job == null) {
+ System.err.printf("No job with ID %s found.\n", jobID);
+ return -1;
+ }
+ if (!job.isComplete()) {
+ System.err.printf("Job %s is not complete.\n", jobID);
+ return -1;
+ }
+
+ // vv NewMissingTemperatureFields
+ Counters counters = job.getCounters();
+ long missing = counters.findCounter(
+ MaxTemperatureWithCounters.Temperature.MISSING).getValue();
+ long total = counters.findCounter(TaskCounter.MAP_INPUT_RECORDS).getValue();
+ // ^^ NewMissingTemperatureFields
+
+ System.out.printf("Records with missing temperature fields: %.2f%%\n",
+ 100.0 * missing / total);
+ return 0;
+ }
+ public static void main(String[] args) throws Exception {
+ int exitCode = ToolRunner.run(new NewMissingTemperatureFields(), args);
+ System.exit(exitCode);
+ }
+}
@@ -12,8 +12,8 @@ public int run(String[] args) throws Exception {
JobBuilder.printUsage(this, "<job ID>");
return -1;
}
- JobClient jobClient = new JobClient(new JobConf(getConf()));
String jobID = args[0];
+ JobClient jobClient = new JobClient(new JobConf(getConf()));
RunningJob job = jobClient.getJob(JobID.forName(jobID));
if (job == null) {
System.err.printf("No job with ID %s found.\n", jobID);
@@ -0,0 +1,88 @@
+package oldapi;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.*;
+import org.apache.hadoop.util.*;
+
+public class JobBuilder {
+
+ private final Class<?> driverClass;
+ private final JobConf conf;
+ private final int extraArgCount;
+ private final String extrArgsUsage;
+
+ private String[] extraArgs;
+
+ public JobBuilder(Class<?> driverClass) {
+ this(driverClass, 0, "");
+ }
+
+ public JobBuilder(Class<?> driverClass, int extraArgCount, String extrArgsUsage) {
+ this.driverClass = driverClass;
+ this.extraArgCount = extraArgCount;
+ this.conf = new JobConf(driverClass);
+ this.extrArgsUsage = extrArgsUsage;
+ }
+
+ public static JobConf parseInputAndOutput(Tool tool, Configuration conf,
+ String[] args) {
+
+ if (args.length != 2) {
+ printUsage(tool, "<input> <output>");
+ return null;
+ }
+ JobConf jobConf = new JobConf(conf, tool.getClass());
+ FileInputFormat.addInputPath(jobConf, new Path(args[0]));
+ FileOutputFormat.setOutputPath(jobConf, new Path(args[1]));
+ return jobConf;
+ }
+
+ public static void printUsage(Tool tool, String extraArgsUsage) {
+ System.err.printf("Usage: %s [genericOptions] %s\n\n",
+ tool.getClass().getSimpleName(), extraArgsUsage);
+ GenericOptionsParser.printGenericCommandUsage(System.err);
+ }
+
+ public JobBuilder withCommandLineArgs(String... args) throws IOException {
+ GenericOptionsParser parser = new GenericOptionsParser(conf, args);
+ String[] otherArgs = parser.getRemainingArgs();
+ if (otherArgs.length < 2 && otherArgs.length > 3 + extraArgCount) {
+ System.err.printf("Usage: %s [genericOptions] [-overwrite] <input path> <output path> %s\n\n",
+ driverClass.getSimpleName(), extrArgsUsage);
+ GenericOptionsParser.printGenericCommandUsage(System.err);
+ System.exit(-1);
+ }
+ int index = 0;
+ boolean overwrite = false;
+ if (otherArgs[index].equals("-overwrite")) {
+ overwrite = true;
+ index++;
+ }
+ Path input = new Path(otherArgs[index++]);
+ Path output = new Path(otherArgs[index++]);
+
+ if (index < otherArgs.length) {
+ extraArgs = new String[otherArgs.length - index];
+ System.arraycopy(otherArgs, index, extraArgs, 0, otherArgs.length - index);
+ }
+
+ if (overwrite) {
+ output.getFileSystem(conf).delete(output, true);
+ }
+
+ FileInputFormat.addInputPath(conf, input);
+ FileOutputFormat.setOutputPath(conf, output);
+ return this;
+ }
+
+ public JobConf build() {
+ return conf;
+ }
+
+ public String[] getExtraArgs() {
+ return extraArgs;
+ }
+}
@@ -0,0 +1,58 @@
+package oldapi;
+
+import java.math.*;
+import org.apache.hadoop.io.Text;
+
+public class MetOfficeRecordParser {
+
+ private String year;
+ private String airTemperatureString;
+ private int airTemperature;
+ private boolean airTemperatureValid;
+
+ public void parse(String record) {
+ if (record.length() < 18) {
+ return;
+ }
+ year = record.substring(3, 7);
+ if (isValidRecord(year)) {
+ airTemperatureString = record.substring(13, 18);
+ if (!airTemperatureString.trim().equals("---")) {
+ BigDecimal temp = new BigDecimal(airTemperatureString.trim());
+ temp = temp.multiply(new BigDecimal(BigInteger.TEN));
+ airTemperature = temp.intValueExact();
+ airTemperatureValid = true;
+ }
+ }
+ }
+
+ private boolean isValidRecord(String year) {
+ try {
+ Integer.parseInt(year);
+ return true;
+ } catch (NumberFormatException e) {
+ return false;
+ }
+ }
+
+ public void parse(Text record) {
+ parse(record.toString());
+ }
+
+ public String getYear() {
+ return year;
+ }
+
+ public int getAirTemperature() {
+ return airTemperature;
+ }
+
+ public String getAirTemperatureString() {
+ return airTemperatureString;
+ }
+
+ public boolean isValidTemperature() {
+ return airTemperatureValid;
+ }
+
+}
@@ -0,0 +1,92 @@
+package oldapi;
+
+import java.text.*;
+import java.util.Date;
+
+import org.apache.hadoop.io.Text;
+
+public class NcdcRecordParser {
+
+ private static final int MISSING_TEMPERATURE = 9999;
+
+ private static final DateFormat DATE_FORMAT =
+ new SimpleDateFormat("yyyyMMddHHmm");
+
+ private String stationId;
+ private String observationDateString;
+ private String year;
+ private String airTemperatureString;
+ private int airTemperature;
+ private boolean airTemperatureMalformed;
+ private String quality;
+
+ public void parse(String record) {
+ stationId = record.substring(4, 10) + "-" + record.substring(10, 15);
+ observationDateString = record.substring(15, 27);
+ year = record.substring(15, 19);
+ airTemperatureMalformed = false;
+ // Remove leading plus sign as parseInt doesn't like them
+ if (record.charAt(87) == '+') {
+ airTemperatureString = record.substring(88, 92);
+ airTemperature = Integer.parseInt(airTemperatureString);
+ } else if (record.charAt(87) == '-') {
+ airTemperatureString = record.substring(87, 92);
+ airTemperature = Integer.parseInt(airTemperatureString);
+ } else {
+ airTemperatureMalformed = true;
+ }
+ airTemperature = Integer.parseInt(airTemperatureString);
+ quality = record.substring(92, 93);
+ }
+
+ public void parse(Text record) {
+ parse(record.toString());
+ }
+
+ public boolean isValidTemperature() {
+ return !airTemperatureMalformed && airTemperature != MISSING_TEMPERATURE
+ && quality.matches("[01459]");
+ }
+
+ public boolean isMalformedTemperature() {
+ return airTemperatureMalformed;
+ }
+
+ public boolean isMissingTemperature() {
+ return airTemperature == MISSING_TEMPERATURE;
+ }
+
+ public String getStationId() {
+ return stationId;
+ }
+
+ public Date getObservationDate() {
+ try {
+ System.out.println(observationDateString);
+ return DATE_FORMAT.parse(observationDateString);
+ } catch (ParseException e) {
+ throw new IllegalArgumentException(e);
+ }
+ }
+
+ public String getYear() {
+ return year;
+ }
+
+ public int getYearInt() {
+ return Integer.parseInt(year);
+ }
+
+ public int getAirTemperature() {
+ return airTemperature;
+ }
+
+ public String getAirTemperatureString() {
+ return airTemperatureString;
+ }
+
+ public String getQuality() {
+ return quality;
+ }
+
+}
Oops, something went wrong.

0 comments on commit be9083c

Please sign in to comment.