Skip to content
Browse files

Add more experimental code.

  • Loading branch information...
1 parent e54d627 commit 1ec832cd2c864ca3404b3eb38dc63fda9c8539f6 @tomwhite committed Jan 11, 2012
View
5 experimental/input/sample.txt
@@ -0,0 +1,5 @@
+0067011990999991950051507004+68750+023550FM-12+038299999V0203301N00671220001CN9999999N9+00001+99999999999
+0043011990999991950051512004+68750+023550FM-12+038299999V0203201N00671220001CN9999999N9+00221+99999999999
+0043011990999991950051518004+68750+023550FM-12+038299999V0203201N00261220001CN9999999N9-00111+99999999999
+0043012650999991949032412004+62300+010750FM-12+048599999V0202701N00461220001CN0500001N9+01111+99999999999
+0043012650999991949032418004+62300+010750FM-12+048599999V0202701N00461220001CN0500001N9+00781+99999999999
View
4 experimental/join/A
@@ -0,0 +1,4 @@
+2 Tie
+4 Coat
+3 Hat
+1 Scarf
View
5 experimental/join/B
@@ -0,0 +1,5 @@
+Joe 2
+Hank 4
+Ali 0
+Eve 3
+Hank 2
View
18 experimental/pom.xml
@@ -23,5 +23,23 @@
<artifactId>ch04</artifactId>
<version>3.0</version>
</dependency>
+ <dependency>
+ <groupId>com.cloudera.crunch</groupId>
+ <artifactId>crunch</artifactId>
+ <version>0.1.0</version>
+ </dependency>
</dependencies>
+ <build> <!-- Build customisation: configures test compilation via maven-compiler-plugin -->
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <testExcludes> <!-- Exclusion filters applied when compiling test sources -->
+ <exclude>SplitTest.java</exclude> <!-- NOTE(review): reason for excluding SplitTest.java is not recorded here; also confirm this pattern matches the file's location (a packaged class would presumably need **/SplitTest.java) -->
+ </testExcludes>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
</project>
View
3 experimental/sort/A
@@ -0,0 +1,3 @@
+2 3
+1 2
+2 4
View
55 experimental/src/test/java/crunch/CogroupCrunchTest.java
@@ -0,0 +1,55 @@
+package crunch;
+import static com.cloudera.crunch.type.writable.Writables.strings;
+import static com.cloudera.crunch.type.writable.Writables.tableOf;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Iterator;
+
+import org.junit.Test;
+
+import com.cloudera.crunch.DoFn;
+import com.cloudera.crunch.Emitter;
+import com.cloudera.crunch.PCollection;
+import com.cloudera.crunch.PTable;
+import com.cloudera.crunch.Pair;
+import com.cloudera.crunch.Pipeline;
+import com.cloudera.crunch.impl.mr.MRPipeline;
+import com.cloudera.crunch.lib.Cogroup;
+import com.cloudera.crunch.lib.Join;
+import com.google.common.base.Splitter;
+
+/**
+ * Experimental Crunch pipeline that cogroups the two tab-separated text inputs
+ * {@code join/A} and {@code join/B} and writes the result as text to
+ * {@code output-cogrouped}.
+ *
+ * NOTE(review): presumably Serializable because the anonymous DoFns created in
+ * the instance method keep a reference to this class - confirm.
+ */
+public class CogroupCrunchTest implements Serializable {
+
+  /**
+   * Builds and runs the cogroup pipeline. There are no assertions; the output
+   * directory has to be inspected by hand.
+   *
+   * @throws IOException declared for the pipeline set-up calls
+   */
+  @Test
+  public void test() throws IOException {
+    Pipeline pipeline = new MRPipeline(CogroupCrunchTest.class);
+    PCollection<String> linesOfA = pipeline.readTextFile("join/A");
+    PCollection<String> linesOfB = pipeline.readTextFile("join/B");
+
+    // A: the first field becomes the key, the second field the value.
+    DoFn<String, Pair<String, String>> firstFieldAsKey = new DoFn<String, Pair<String, String>>() {
+      @Override
+      public void process(String input, Emitter<Pair<String, String>> emitter) {
+        Iterator<String> fields = Splitter.on('\t').split(input).iterator();
+        String key = fields.next();
+        String value = fields.next();
+        emitter.emit(Pair.of(key, value));
+      }
+    };
+    PTable<String, String> aTable = linesOfA.parallelDo(firstFieldAsKey, tableOf(strings(), strings()));
+
+    // B: the fields are swapped, so the second field becomes the key.
+    DoFn<String, Pair<String, String>> secondFieldAsKey = new DoFn<String, Pair<String, String>>() {
+      @Override
+      public void process(String input, Emitter<Pair<String, String>> emitter) {
+        Iterator<String> fields = Splitter.on('\t').split(input).iterator();
+        String value = fields.next();
+        String key = fields.next();
+        emitter.emit(Pair.of(key, value));
+      }
+    };
+    PTable<String, String> bTable = linesOfB.parallelDo(secondFieldAsKey, tableOf(strings(), strings()));
+
+    PTable<String, Pair<Collection<String>, Collection<String>>> grouped =
+        Cogroup.cogroup(aTable, bTable);
+
+    pipeline.writeTextFile(grouped, "output-cogrouped");
+    pipeline.run();
+  }
+
+}
View
53 experimental/src/test/java/crunch/JoinCrunchTest.java
@@ -0,0 +1,53 @@
+package crunch;
+import static com.cloudera.crunch.type.writable.Writables.strings;
+import static com.cloudera.crunch.type.writable.Writables.tableOf;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Iterator;
+
+import org.junit.Test;
+
+import com.cloudera.crunch.DoFn;
+import com.cloudera.crunch.Emitter;
+import com.cloudera.crunch.PCollection;
+import com.cloudera.crunch.PTable;
+import com.cloudera.crunch.Pair;
+import com.cloudera.crunch.Pipeline;
+import com.cloudera.crunch.impl.mr.MRPipeline;
+import com.cloudera.crunch.lib.Join;
+import com.google.common.base.Splitter;
+
+/**
+ * Experimental Crunch pipeline that joins the two tab-separated text inputs
+ * {@code join/A} and {@code join/B} with {@code Join.join} and writes the
+ * result as text to {@code output-joined}.
+ *
+ * NOTE(review): presumably Serializable because the anonymous DoFns created in
+ * the instance method keep a reference to this class - confirm.
+ */
+public class JoinCrunchTest implements Serializable {
+
+  /**
+   * Builds and runs the join pipeline. There are no assertions; the output
+   * directory has to be inspected by hand.
+   *
+   * @throws IOException declared for the pipeline set-up calls
+   */
+  @Test
+  public void test() throws IOException {
+    Pipeline pipeline = new MRPipeline(JoinCrunchTest.class);
+    PCollection<String> linesOfA = pipeline.readTextFile("join/A");
+    PCollection<String> linesOfB = pipeline.readTextFile("join/B");
+
+    // A: the first field becomes the key, the second field the value.
+    DoFn<String, Pair<String, String>> firstFieldAsKey = new DoFn<String, Pair<String, String>>() {
+      @Override
+      public void process(String input, Emitter<Pair<String, String>> emitter) {
+        Iterator<String> fields = Splitter.on('\t').split(input).iterator();
+        String key = fields.next();
+        String value = fields.next();
+        emitter.emit(Pair.of(key, value));
+      }
+    };
+    PTable<String, String> aTable = linesOfA.parallelDo(firstFieldAsKey, tableOf(strings(), strings()));
+
+    // B: the fields are swapped, so the second field becomes the key.
+    DoFn<String, Pair<String, String>> secondFieldAsKey = new DoFn<String, Pair<String, String>>() {
+      @Override
+      public void process(String input, Emitter<Pair<String, String>> emitter) {
+        Iterator<String> fields = Splitter.on('\t').split(input).iterator();
+        String value = fields.next();
+        String key = fields.next();
+        emitter.emit(Pair.of(key, value));
+      }
+    };
+    PTable<String, String> bTable = linesOfB.parallelDo(secondFieldAsKey, tableOf(strings(), strings()));
+
+    PTable<String, Pair<String, String>> joined = Join.join(aTable, bTable);
+
+    pipeline.writeTextFile(joined, "output-joined");
+    pipeline.run();
+  }
+
+}
View
57 experimental/src/test/java/crunch/MaxTemperatureCrunchTest.java
@@ -0,0 +1,57 @@
+package crunch;
+import static com.cloudera.crunch.type.writable.Writables.ints;
+import static com.cloudera.crunch.type.writable.Writables.strings;
+import static com.cloudera.crunch.type.writable.Writables.tableOf;
+
+import java.io.IOException;
+
+import org.junit.Test;
+
+import com.cloudera.crunch.CombineFn;
+import com.cloudera.crunch.DoFn;
+import com.cloudera.crunch.Emitter;
+import com.cloudera.crunch.PCollection;
+import com.cloudera.crunch.PTable;
+import com.cloudera.crunch.Pair;
+import com.cloudera.crunch.Pipeline;
+import com.cloudera.crunch.impl.mr.MRPipeline;
+
+/**
+ * Experimental Crunch pipeline that reads fixed-width text records from
+ * {@code input}, extracts (year, air temperature) pairs, combines the values of
+ * each year with {@code CombineFn.MAX_INTS} and writes the result as text to
+ * {@code output}.
+ */
+public class MaxTemperatureCrunchTest {
+
+  /** Temperature value that marks a missing reading; such records are skipped. */
+  private static final int MISSING = 9999;
+
+  /** Shortest record that still contains the quality code at offset 92. */
+  private static final int MIN_RECORD_LENGTH = 93;
+
+  /** Quality codes whose readings are accepted. */
+  private static final String VALID_QUALITY_CODES = "01459";
+
+  /**
+   * Builds and runs the pipeline. There are no assertions; the output
+   * directory has to be inspected by hand.
+   *
+   * @throws IOException declared for the pipeline set-up calls
+   */
+  @Test
+  public void test() throws IOException {
+    Pipeline pipeline = new MRPipeline(MaxTemperatureCrunchTest.class);
+    PCollection<String> records = pipeline.readTextFile("input");
+
+    PTable<String, Integer> maxTemps = records
+      .parallelDo(toYearTempPairsFn(), tableOf(strings(), ints()))
+      .groupByKey()
+      .combineValues(CombineFn.<String> MAX_INTS());
+
+    pipeline.writeTextFile(maxTemps, "output");
+    pipeline.run();
+  }
+
+  /**
+   * Creates the function that turns one record into a (year, temperature) pair.
+   * The year is read from offsets 15-18, the signed temperature from offsets
+   * 87-91 and the quality code from offset 92. Nothing is emitted when the
+   * record is too short to hold those fields, when the temperature equals
+   * {@link #MISSING}, or when the quality code is not accepted.
+   *
+   * @return a new parsing function
+   */
+  private static DoFn<String, Pair<String, Integer>> toYearTempPairsFn() {
+    return new DoFn<String, Pair<String, Integer>>() {
+      @Override
+      public void process(String input, Emitter<Pair<String, Integer>> emitter) {
+        // Skip truncated or blank lines instead of failing the whole run with
+        // a StringIndexOutOfBoundsException.
+        if (input.length() < MIN_RECORD_LENGTH) {
+          return;
+        }
+        String year = input.substring(15, 19);
+        // parseInt doesn't like leading plus signs, so step over one if present
+        int numberStart = input.charAt(87) == '+' ? 88 : 87;
+        int airTemperature = Integer.parseInt(input.substring(numberStart, 92));
+        // Plain character lookup: avoids compiling a regex for every record.
+        char quality = input.charAt(92);
+        if (airTemperature != MISSING && VALID_QUALITY_CODES.indexOf(quality) >= 0) {
+          emitter.emit(Pair.of(year, airTemperature));
+        }
+      }
+    };
+  }
+
+}
View
46 experimental/src/test/java/crunch/SortCrunchTest.java
@@ -0,0 +1,46 @@
+package crunch;
+import static com.cloudera.crunch.lib.Sort.ColumnOrder.by;
+import static com.cloudera.crunch.lib.Sort.Order.ASCENDING;
+import static com.cloudera.crunch.lib.Sort.Order.DESCENDING;
+import static com.cloudera.crunch.type.writable.Writables.ints;
+import static com.cloudera.crunch.type.writable.Writables.pairs;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Iterator;
+
+import org.junit.Test;
+
+import com.cloudera.crunch.DoFn;
+import com.cloudera.crunch.Emitter;
+import com.cloudera.crunch.PCollection;
+import com.cloudera.crunch.Pair;
+import com.cloudera.crunch.Pipeline;
+import com.cloudera.crunch.impl.mr.MRPipeline;
+import com.cloudera.crunch.lib.Sort;
+import com.google.common.base.Splitter;
+
+/**
+ * Experimental Crunch pipeline that parses the tab-separated integer pairs in
+ * {@code sort/A}, sorts them with {@code Sort.sortPairs} (column 1 ascending,
+ * then column 2 descending) and writes the result as text to
+ * {@code output-sorted}.
+ *
+ * NOTE(review): presumably Serializable because the anonymous DoFn created in
+ * the instance method keeps a reference to this class - confirm.
+ */
+public class SortCrunchTest implements Serializable {
+
+  /**
+   * Builds and runs the sort pipeline. There are no assertions; the output
+   * directory has to be inspected by hand.
+   *
+   * @throws IOException declared for the pipeline set-up calls
+   */
+  @Test
+  public void test() throws IOException {
+    Pipeline pipeline = new MRPipeline(SortCrunchTest.class);
+    PCollection<String> lines = pipeline.readTextFile("sort/A");
+
+    DoFn<String, Pair<Integer, Integer>> toIntPair = new DoFn<String, Pair<Integer, Integer>>() {
+      @Override
+      public void process(String input, Emitter<Pair<Integer, Integer>> emitter) {
+        Iterator<String> fields = Splitter.on('\t').split(input).iterator();
+        // Read both fields before parsing either one.
+        String firstText = fields.next();
+        String secondText = fields.next();
+        Integer first = Integer.valueOf(firstText);
+        Integer second = Integer.valueOf(secondText);
+        emitter.emit(Pair.of(first, second));
+      }
+    };
+    PCollection<Pair<Integer, Integer>> parsed = lines.parallelDo(toIntPair, pairs(ints(), ints()));
+
+    PCollection<Pair<Integer, Integer>> sorted =
+        Sort.sortPairs(parsed, by(1, ASCENDING), by(2, DESCENDING));
+
+    pipeline.writeTextFile(sorted, "output-sorted");
+    pipeline.run();
+  }
+
+}
View
27 experimental/src/test/java/crunch/ToYearTempPairsFn.java
@@ -0,0 +1,27 @@
+package crunch;
+import com.cloudera.crunch.DoFn;
+import com.cloudera.crunch.Emitter;
+import com.cloudera.crunch.Pair;
+
+/**
+ * Crunch function that turns one fixed-width text record into a
+ * (year, air temperature) pair. The year is read from offsets 15-18, the
+ * signed temperature from offsets 87-91 and the quality code from offset 92.
+ */
+public class ToYearTempPairsFn extends DoFn<String, Pair<String, Integer>> {
+
+  /** Temperature value that marks a missing reading; such records are skipped. */
+  private static final int MISSING = 9999;
+
+  /** Shortest record that still contains the quality code at offset 92. */
+  private static final int MIN_RECORD_LENGTH = 93;
+
+  /** Quality codes whose readings are accepted. */
+  private static final String VALID_QUALITY_CODES = "01459";
+
+  /**
+   * Parses {@code input} and emits at most one pair. Nothing is emitted when
+   * the record is too short to hold the fields, when the temperature equals
+   * {@link #MISSING}, or when the quality code is not accepted.
+   *
+   * @param input one record line
+   * @param emitter receives the (year, temperature) pair
+   * @throws NumberFormatException if the temperature field is not an integer
+   */
+  @Override
+  public void process(String input, Emitter<Pair<String, Integer>> emitter) {
+    // Skip truncated or blank lines instead of failing the whole run with a
+    // StringIndexOutOfBoundsException.
+    if (input.length() < MIN_RECORD_LENGTH) {
+      return;
+    }
+    String year = input.substring(15, 19);
+    // parseInt doesn't like leading plus signs, so step over one if present
+    int numberStart = input.charAt(87) == '+' ? 88 : 87;
+    int airTemperature = Integer.parseInt(input.substring(numberStart, 92));
+    // Plain character lookup: avoids compiling a regex for every record.
+    char quality = input.charAt(92);
+    if (airTemperature != MISSING && VALID_QUALITY_CODES.indexOf(quality) >= 0) {
+      emitter.emit(Pair.of(year, airTemperature));
+    }
+  }
+
+}

0 comments on commit 1ec832c

Please sign in to comment.
Something went wrong with that request. Please try again.