Skip to content

Commit

Permalink
Add JoinRecordWithStationNameCrunchTest
Browse files Browse the repository at this point in the history
  • Loading branch information
tomwhite committed Feb 11, 2014
1 parent 0bc16cb commit 359ef3f
Show file tree
Hide file tree
Showing 4 changed files with 98 additions and 0 deletions.
Binary file removed ch05/output/._SUCCESS.crc
Binary file not shown.
@@ -0,0 +1,58 @@
package crunch;

import java.io.IOException;
import org.apache.crunch.DoFn;
import org.apache.crunch.Emitter;
import org.apache.crunch.PCollection;
import org.apache.crunch.PTable;
import org.apache.crunch.Pair;
import org.apache.crunch.Pipeline;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.lib.Join;
import org.junit.Test;

import static org.apache.crunch.types.writable.Writables.strings;
import static org.apache.crunch.types.writable.Writables.tableOf;

/**
 * Exercises a Crunch pipeline that joins weather records with station
 * metadata on station ID.
 *
 * <p>Both inputs are read as text, turned into tables keyed by the station ID
 * reported by their respective parsers, joined with {@link Join#join}, and the
 * result is written as text to the {@code output} directory.
 */
public class JoinRecordWithStationNameCrunchTest {

  /**
   * Builds and runs the join pipeline over the sample inputs.
   *
   * @throws IOException declared for I/O failures while reading the inputs or
   *     writing the output
   */
  @Test
  public void test() throws IOException {
    // Pass this test's own class to the pipeline rather than an unrelated
    // test class, so the pipeline does not depend on another test existing.
    Pipeline pipeline = new MRPipeline(JoinRecordWithStationNameCrunchTest.class);

    PCollection<String> records = pipeline.readTextFile("input/ncdc/sample.txt");
    PCollection<String> stations =
        pipeline.readTextFile("input/ncdc/metadata/stations-fixed-width.txt");

    PTable<String, String> stationIdToRecord =
        records.parallelDo(toStationIdRecordPairsFn(), tableOf(strings(), strings()));
    PTable<String, String> stationIdToName =
        stations.parallelDo(toStationIdNamePairsFn(), tableOf(strings(), strings()));

    PTable<String, Pair<String, String>> joined =
        Join.join(stationIdToRecord, stationIdToName);

    pipeline.writeTextFile(joined, "output");
    // done() runs the outstanding work and then cleans up the pipeline's
    // intermediate files; run() alone would leave them behind.
    pipeline.done();
  }

  /**
   * Returns a function that emits a (station ID, raw record line) pair for
   * every input line.
   */
  private static DoFn<String, Pair<String, String>> toStationIdRecordPairsFn() {
    return new DoFn<String, Pair<String, String>>() {
      private final NcdcRecordParser parser = new NcdcRecordParser();

      @Override
      public void process(String input, Emitter<Pair<String, String>> emitter) {
        parser.parse(input);
        emitter.emit(Pair.of(parser.getStationId(), input));
      }
    };
  }

  /**
   * Returns a function that emits a (station ID, station name) pair for every
   * metadata line the parser accepts; rejected lines produce no output.
   */
  private static DoFn<String, Pair<String, String>> toStationIdNamePairsFn() {
    return new DoFn<String, Pair<String, String>>() {
      private final NcdcStationMetadataParser parser = new NcdcStationMetadataParser();

      @Override
      public void process(String input, Emitter<Pair<String, String>> emitter) {
        if (parser.parse(input)) {
          emitter.emit(Pair.of(parser.getStationId(), parser.getStationName()));
        }
      }
    };
  }
}
39 changes: 39 additions & 0 deletions experimental/src/test/java/crunch/NcdcStationMetadataParser.java
@@ -0,0 +1,39 @@
package crunch;

import java.io.Serializable;
import org.apache.hadoop.io.Text;

/**
 * Parses one line of a fixed-width station metadata file.
 *
 * <p>A line is accepted when it is at least 42 characters long and its first
 * six characters parse as an integer (the USAF identifier). For an accepted
 * line the station ID is {@code <chars 0-5>-<chars 7-11>} and the station name
 * is characters 13-41, exactly as they appear in the line (not trimmed).
 *
 * <p>The getters return the values from the most recent <em>successful</em>
 * parse; a rejected line leaves them untouched. Instances hold mutable state.
 */
public class NcdcStationMetadataParser implements Serializable {

  /** Minimum line length needed to read the ID and name columns. */
  private static final int MIN_RECORD_LENGTH = 42;

  private String stationId;
  private String stationName;

  /**
   * Parses a metadata line.
   *
   * @param record the line to parse; {@code null} is rejected
   * @return {@code true} if the line was accepted and the station ID and name
   *     were updated, {@code false} otherwise
   */
  public boolean parse(String record) {
    if (record == null || record.length() < MIN_RECORD_LENGTH) { // header
      return false;
    }
    String usaf = record.substring(0, 6);
    try {
      Integer.parseInt(usaf); // USAF identifiers are numeric
    } catch (NumberFormatException ignored) {
      // Not a station line; reject it before touching any state so the
      // getters never expose values taken from an invalid record.
      return false;
    }
    String wban = record.substring(7, 12);
    stationId = usaf + "-" + wban;
    stationName = record.substring(13, 42);
    return true;
  }

  /**
   * Parses a metadata line held in a {@link Text}.
   *
   * @param record the line to parse; {@code null} is rejected
   * @return the result of {@link #parse(String)} on the text's string form
   */
  public boolean parse(Text record) {
    return record != null && parse(record.toString());
  }

  /**
   * Returns the station ID from the most recent successful parse, or
   * {@code null} if no line has been accepted yet.
   */
  public String getStationId() {
    return stationId;
  }

  /**
   * Returns the station name from the most recent successful parse, or
   * {@code null} if no line has been accepted yet.
   */
  public String getStationName() {
    return stationName;
  }

}
1 change: 1 addition & 0 deletions pom.xml
Expand Up @@ -26,5 +26,6 @@
<module>ch16</module>
<module>hadoop-examples</module>
<module>snippet</module>
<module>experimental</module>
</modules>
</project>

0 comments on commit 359ef3f

Please sign in to comment.