Skip to content

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also compare across forks.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also compare across forks.
...
  • 7 commits
  • 13 files changed
  • 0 commit comments
  • 1 contributor
View
9 pom.xml
@@ -40,6 +40,13 @@
</dependency>
<dependency>
+ <groupId>org.codehaus.jackson</groupId>
+ <artifactId>jackson-smile</artifactId>
+ <version>1.7.3</version>
+ </dependency>
+
+
+ <dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.8.2</version>
@@ -65,7 +72,7 @@
<artifactId>commons-logging</artifactId>
<version>1.1.1</version>
</dependency>
-
+
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
View
6 scrunch/src/main/scala/com/cloudera/scrunch/Conversions.scala
@@ -15,12 +15,10 @@
package com.cloudera.scrunch
import com.cloudera.crunch.{PCollection => JCollection, PGroupedTable => JGroupedTable, PTable => JTable, DoFn, Emitter}
-import com.cloudera.crunch.{Pair => CPair, Tuple3 => CTuple3, Tuple4 => CTuple4, Tuple => CTuple, TupleN => CTupleN}
+import com.cloudera.crunch.{Pair => CPair}
import com.cloudera.crunch.`type`.PType
-import java.lang.{Boolean => JBoolean, Double => JDouble, Float => JFloat, Integer => JInteger, Long => JLong}
-import java.lang.{Iterable => JIterable}
import java.nio.ByteBuffer
-import scala.collection.{Iterable, Iterator, JavaConversions}
+import scala.collection.Iterable
trait CanParallelTransform[El, To] {
def apply[A](c: PCollectionLike[A, _, JCollection[A]], fn: A => Traversable[El], ptype: PType[El]): To
View
8 src/main/java/com/cloudera/crunch/io/text/TextFileSourceTarget.java
@@ -29,6 +29,8 @@
import com.cloudera.crunch.io.ReadableSourceTarget;
import com.cloudera.crunch.io.SourceTargetHelper;
import com.cloudera.crunch.type.PType;
+import com.cloudera.crunch.type.avro.AvroTypeFamily;
+import com.cloudera.crunch.type.avro.AvroUtf8InputFormat;
public class TextFileSourceTarget<T> extends TextFileTarget implements ReadableSourceTarget<T> {
@@ -71,7 +73,11 @@ public String toString() {
@Override
public void configureSource(Job job, int inputId) throws IOException {
- SourceTargetHelper.configureSource(job, inputId, TextInputFormat.class, path);
+ if (ptype.getFamily().equals(AvroTypeFamily.getInstance())) {
+ SourceTargetHelper.configureSource(job, inputId, AvroUtf8InputFormat.class, path);
+ } else {
+ SourceTargetHelper.configureSource(job, inputId, TextInputFormat.class, path);
+ }
}
@Override
View
1 src/main/java/com/cloudera/crunch/io/text/TextFileTarget.java
@@ -26,6 +26,7 @@
import com.cloudera.crunch.io.SourceTargetHelper;
import com.cloudera.crunch.type.PTableType;
import com.cloudera.crunch.type.PType;
+import com.cloudera.crunch.type.avro.AvroTypeFamily;
public class TextFileTarget implements PathTarget, MapReduceTarget {
View
16 src/main/java/com/cloudera/crunch/lib/Aggregate.java
@@ -14,14 +14,19 @@
*/
package com.cloudera.crunch.lib;
+import java.util.Collection;
+
import com.cloudera.crunch.CombineFn;
import com.cloudera.crunch.DoFn;
import com.cloudera.crunch.Emitter;
import com.cloudera.crunch.MapFn;
import com.cloudera.crunch.PCollection;
+import com.cloudera.crunch.PGroupedTable;
import com.cloudera.crunch.PTable;
import com.cloudera.crunch.Pair;
+import com.cloudera.crunch.fn.MapValuesFn;
import com.cloudera.crunch.type.PTypeFamily;
+import com.google.common.collect.Lists;
/**
* Methods for performing various types of aggregations over {@link PCollection}
@@ -119,4 +124,15 @@ public void process(Pair<Boolean, Iterable<S>> input,
emitter.emit(Pair.of(input.first(), min));
} }));
}
+
+ public static <K, V> PTable<K, Collection<V>> collectValues(PTable<K, V> collect) {
+ PTypeFamily tf = collect.getTypeFamily();
+ return collect.groupByKey().parallelDo(new MapValuesFn<K, Iterable<V>, Collection<V>>() {
+ @Override
+ public Collection<V> map(Iterable<V> v) {
+ return Lists.newArrayList(v);
+ }
+ }, tf.tableOf(collect.getKeyType(), tf.collections(collect.getValueType())));
+
+ }
}
View
2 src/main/java/com/cloudera/crunch/type/avro/AvroUtf8InputFormat.java
@@ -61,7 +61,7 @@ public float getProgress() throws IOException {
public AvroWrapper<Utf8> getCurrentKey() throws IOException,
InterruptedException {
Text txt = lineRecordReader.getCurrentValue();
- currentKey.datum(new Utf8(txt.getBytes()));
+ currentKey.datum(new Utf8(txt.toString()));
return currentKey;
}
View
5 src/main/java/com/cloudera/crunch/type/avro/Avros.java
@@ -36,6 +36,7 @@
import com.cloudera.crunch.fn.CompositeMapFn;
import com.cloudera.crunch.type.PType;
import com.cloudera.crunch.type.TupleFactory;
+import com.cloudera.crunch.util.PTypes;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
@@ -400,6 +401,10 @@ private static Schema createTupleSchema(PType... ptypes) {
base.getSubTypes().toArray(new PType[0]));
}
+ public static <T> PType<T> jsons(Class<T> clazz) {
+ return PTypes.jsonString(clazz, AvroTypeFamily.getInstance());
+ }
+
public static final <K, V> AvroTableType<K, V> tableOf(PType<K> key, PType<V> value) {
AvroType<K> avroKey = (AvroType<K>) key;
AvroType<V> avroValue = (AvroType<V>) value;
View
5 src/main/java/com/cloudera/crunch/type/writable/Writables.java
@@ -42,6 +42,7 @@
import com.cloudera.crunch.type.DataBridge;
import com.cloudera.crunch.type.PType;
import com.cloudera.crunch.type.TupleFactory;
+import com.cloudera.crunch.util.PTypes;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -520,6 +521,10 @@ public void initialize() {
new MapOutputMapFn(wt.getSerializationClass(), handler.getOutputMapFn()), ptype);
}
+ public static <T> PType<T> jsons(Class<T> clazz) {
+ return PTypes.jsonString(clazz, WritableTypeFamily.getInstance());
+ }
+
// Not instantiable
private Writables() {
}
View
60 src/main/java/com/cloudera/crunch/util/PTypes.java
@@ -14,7 +14,10 @@
*/
package com.cloudera.crunch.util;
+import java.nio.ByteBuffer;
+
import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.smile.SmileFactory;
import com.cloudera.crunch.MapFn;
import com.cloudera.crunch.impl.mr.run.CrunchRuntimeException;
@@ -28,15 +31,63 @@
public class PTypes {
public static <T> PType<T> jsonString(Class<T> clazz, PTypeFamily typeFamily) {
- return typeFamily.derived(clazz, new JSONInputManFn<T>(clazz), new JSONOutputMapFn<T>(), typeFamily.strings());
+ return typeFamily.derived(clazz, new JacksonInputMapFn<T>(clazz),
+ new JacksonOutputMapFn<T>(), typeFamily.strings());
}
- public static class JSONInputManFn<T> extends MapFn<String, T> {
+ public static <T> PType<T> smile(Class<T> clazz, PTypeFamily typeFamily) {
+ return typeFamily.derived(clazz, new SmileInputMapFn<T>(clazz),
+ new SmileOutputMapFn<T>(), typeFamily.bytes());
+ }
+
+ public static class SmileInputMapFn<T> extends MapFn<ByteBuffer, T> {
private final Class<T> clazz;
private transient ObjectMapper mapper;
- public JSONInputManFn(Class<T> clazz) {
+ public SmileInputMapFn(Class<T> clazz) {
+ this.clazz = clazz;
+ }
+
+ @Override
+ public void initialize() {
+ this.mapper = new ObjectMapper(new SmileFactory());
+ }
+
+ @Override
+ public T map(ByteBuffer input) {
+ try {
+ return mapper.readValue(input.array(), input.position(), input.limit(), clazz);
+ } catch (Exception e) {
+ throw new CrunchRuntimeException(e);
+ }
+ }
+ }
+
+ public static class SmileOutputMapFn<T> extends MapFn<T, ByteBuffer> {
+ private transient ObjectMapper mapper;
+
+ @Override
+ public void initialize() {
+ this.mapper = new ObjectMapper(new SmileFactory());
+ }
+
+ @Override
+ public ByteBuffer map(T input) {
+ try {
+ return ByteBuffer.wrap(mapper.writeValueAsBytes(input));
+ } catch (Exception e) {
+ throw new CrunchRuntimeException(e);
+ }
+ }
+ }
+
+ public static class JacksonInputMapFn<T> extends MapFn<String, T> {
+
+ private final Class<T> clazz;
+ private transient ObjectMapper mapper;
+
+ public JacksonInputMapFn(Class<T> clazz) {
this.clazz = clazz;
}
@@ -55,8 +106,7 @@ public T map(String input) {
}
}
- public static class JSONOutputMapFn<T> extends MapFn<T, String> {
-
+ public static class JacksonOutputMapFn<T> extends MapFn<T, String> {
private transient ObjectMapper mapper;
@Override
View
1 src/test/java/com/cloudera/crunch/MapsTest.java
@@ -5,7 +5,6 @@
import org.junit.Test;
import com.cloudera.crunch.impl.mr.MRPipeline;
-import com.cloudera.crunch.lib.Aggregate;
import com.cloudera.crunch.test.FileHelper;
import com.cloudera.crunch.type.PTypeFamily;
import com.cloudera.crunch.type.avro.AvroTypeFamily;
View
58 src/test/java/com/cloudera/crunch/PageRankTest.java
@@ -24,37 +24,63 @@
import com.cloudera.crunch.type.PTypeFamily;
import com.cloudera.crunch.type.avro.AvroTypeFamily;
import com.cloudera.crunch.type.writable.WritableTypeFamily;
-import com.cloudera.crunch.util.Collects;
+import com.cloudera.crunch.util.PTypes;
import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
import java.util.Collection;
+import java.util.List;
import org.junit.Test;
public class PageRankTest {
- public static class PageRankData extends Tuple3<Float, Float, Collection<String>> {
- public PageRankData(Float first, Float second, Collection<String> third) {
- super(first, second, third);
- }
+ public static class PageRankData {
+ public float score;
+ public float lastScore;
+ public List<String> urls;
+
+ public PageRankData() { }
+
+ public PageRankData(float score, float lastScore, Iterable<String> urls) {
+ this.score = score;
+ this.lastScore = lastScore;
+ this.urls = Lists.newArrayList(urls);
+ }
}
- @Test public void testAvro() throws Exception {
- run(new MRPipeline(PageRankTest.class), AvroTypeFamily.getInstance());
+ @Test public void testAvroJSON() throws Exception {
+ PTypeFamily tf = AvroTypeFamily.getInstance();
+ PType<PageRankData> prType = PTypes.jsonString(PageRankData.class, tf);
+ run(new MRPipeline(PageRankTest.class), prType, tf);
}
- @Test public void testWritables() throws Exception {
- run(new MRPipeline(PageRankTest.class), WritableTypeFamily.getInstance());
+ @Test public void testAvroBSON() throws Exception {
+ PTypeFamily tf = AvroTypeFamily.getInstance();
+ PType<PageRankData> prType = PTypes.smile(PageRankData.class, tf);
+ run(new MRPipeline(PageRankTest.class), prType, tf);
+ }
+
+ @Test public void testWritablesJSON() throws Exception {
+ PTypeFamily tf = WritableTypeFamily.getInstance();
+ PType<PageRankData> prType = PTypes.jsonString(PageRankData.class, tf);
+ run(new MRPipeline(PageRankTest.class), prType, tf);
}
+ @Test public void testWritablesBSON() throws Exception {
+ PTypeFamily tf = WritableTypeFamily.getInstance();
+ PType<PageRankData> prType = PTypes.smile(PageRankData.class, tf);
+ run(new MRPipeline(PageRankTest.class), prType, tf);
+ }
+
public static PTable<String, PageRankData> pageRank(PTable<String, PageRankData> input) {
PTypeFamily ptf = input.getTypeFamily();
PTable<String, Float> outbound = input.parallelDo(
new DoFn<Pair<String, PageRankData>, Pair<String, Float>>() {
@Override
public void process(Pair<String, PageRankData> input, Emitter<Pair<String, Float>> emitter) {
- float pr = input.second().first() / input.second().third().size();
- for (String link : input.second().third()) {
+ float pr = input.second().score / input.second().urls.size();
+ for (String link : input.second().urls) {
emitter.emit(Pair.of(link, pr));
}
}
@@ -71,15 +97,13 @@ public void process(Pair<String, PageRankData> input, Emitter<Pair<String, Float
for (Float s : input.second().second()) {
sum += s;
}
- return Pair.of(input.first(), new PageRankData(0.5f + 0.5f*sum, prd.first(), prd.third()));
+ return Pair.of(input.first(), new PageRankData(0.5f + 0.5f*sum, prd.score, prd.urls));
}
}, input.getPTableType());
}
- public static void run(Pipeline pipeline, PTypeFamily ptf) throws Exception {
+ public static void run(Pipeline pipeline, PType<PageRankData> prType, PTypeFamily ptf) throws Exception {
String urlInput = FileHelper.createTempCopyOf("urls.txt");
- PType<PageRankData> prType = ptf.tuples(PageRankData.class, ptf.floats(), ptf.floats(),
- ptf.collections(ptf.strings()));
PTable<String, PageRankData> scores = pipeline.readTextFile(urlInput)
.parallelDo(new MapFn<String, Pair<String, String>>() {
@Override
@@ -93,7 +117,7 @@ public static void run(Pipeline pipeline, PTypeFamily ptf) throws Exception {
@Override
public Pair<String, PageRankData> map(
Pair<String, Iterable<String>> input) {
- return Pair.of(input.first(), new PageRankData(1.0f, 0.0f, Collects.newArrayList(input.second())));
+ return Pair.of(input.first(), new PageRankData(1.0f, 0.0f, input.second()));
}
}, ptf.tableOf(ptf.strings(), prType));
@@ -106,7 +130,7 @@ public static void run(Pipeline pipeline, PTypeFamily ptf) throws Exception {
@Override
public Float map(Pair<String, PageRankData> input) {
PageRankData prd = input.second();
- return Math.abs(prd.first() - prd.second());
+ return Math.abs(prd.score - prd.lastScore);
}
}, ptf.floats())).materialize(), null);
}
View
6 src/test/java/com/cloudera/crunch/WordCountTest.java
@@ -22,6 +22,7 @@
import com.cloudera.crunch.PTable;
import com.cloudera.crunch.Pipeline;
import com.cloudera.crunch.impl.mr.MRPipeline;
+import com.cloudera.crunch.io.At;
import com.cloudera.crunch.io.To;
import com.cloudera.crunch.lib.Aggregate;
import com.cloudera.crunch.test.FileHelper;
@@ -98,7 +99,8 @@ public void run(Pipeline pipeline, PTypeFamily typeFamily) throws IOException {
File output = FileHelper.createOutputPath();
String outputPath = output.getAbsolutePath();
- PCollection<String> shakespeare = pipeline.readTextFile(inputPath);
+ PCollection<String> shakespeare = pipeline.read(
+ At.textFile(inputPath, typeFamily.strings()));
PTable<String, Long> wordCount = wordCount(shakespeare, typeFamily);
if (useToOutput) {
wordCount.write(To.textFile(outputPath));
@@ -120,7 +122,7 @@ public void run(Pipeline pipeline, PTypeFamily typeFamily) throws IOException {
List<String> lines = Files.readLines(outputFile, Charset.defaultCharset());
boolean passed = false;
for (String line : lines) {
- if (line.equals("Macbeth\t28")) {
+ if (line.startsWith("Macbeth\t28")) {
passed = true;
break;
}
View
35 src/test/java/com/cloudera/crunch/lib/AggregateTest.java
@@ -14,13 +14,18 @@
*/
package com.cloudera.crunch.lib;
+import static com.cloudera.crunch.type.writable.Writables.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+import java.util.Collection;
+
import org.junit.Test;
import com.cloudera.crunch.MapFn;
import com.cloudera.crunch.PCollection;
+import com.cloudera.crunch.PTable;
+import com.cloudera.crunch.Pair;
import com.cloudera.crunch.Pipeline;
import com.cloudera.crunch.impl.mr.MRPipeline;
import com.cloudera.crunch.test.FileHelper;
@@ -61,4 +66,34 @@ public Integer map(Integer input) {
assertEquals(maxLengths.intValue(), -minLengths.intValue());
pipeline.done();
}
+
+ private static class SplitFn extends MapFn<String, Pair<String, String>> {
+
+ @Override
+ public Pair<String, String> map(String input) {
+ String[] p = input.split("\\s+");
+ return Pair.of(p[0], p[1]);
+ }
+
+ }
+ @Test public void testCollectUrls() throws Exception {
+ Pipeline p = new MRPipeline(AggregateTest.class);
+ String urlsInputPath = FileHelper.createTempCopyOf("urls.txt");
+ PTable<String, Collection<String>> urls = Aggregate.collectValues(
+ p.readTextFile(urlsInputPath)
+ .parallelDo(new SplitFn(), tableOf(strings(), strings())));
+ for (Pair<String, Collection<String>> e : urls.materialize()) {
+ String key = e.first();
+ int expectedSize = 0;
+ if ("www.A.com".equals(key)) {
+ expectedSize = 4;
+ } else if ("www.B.com".equals(key) || "www.F.com".equals(key)) {
+ expectedSize = 2;
+ } else if ("www.C.com".equals(key) || "www.D.com".equals(key) || "www.E.com".equals(key)) {
+ expectedSize = 1;
+ }
+ assertEquals("Checking key = " + key, expectedSize, e.second().size());
+ p.done();
+ }
+ }
}

No commit comments for this range

Something went wrong with that request. Please try again.