Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Fix CRUNCH-7: allow text files to be read using Avro-based PTypes

  • Loading branch information...
commit 39a2cc4bbbb7123c049d19ad5d86f299ff8675e4 (1 parent: 0a9cdfb)
Authored by jwills (@jwills)
View
8 src/main/java/com/cloudera/crunch/io/text/TextFileSourceTarget.java
@@ -29,6 +29,8 @@
import com.cloudera.crunch.io.ReadableSourceTarget;
import com.cloudera.crunch.io.SourceTargetHelper;
import com.cloudera.crunch.type.PType;
+import com.cloudera.crunch.type.avro.AvroTypeFamily;
+import com.cloudera.crunch.type.avro.AvroUtf8InputFormat;
public class TextFileSourceTarget<T> extends TextFileTarget implements ReadableSourceTarget<T> {
@@ -71,7 +73,11 @@ public String toString() {
@Override
public void configureSource(Job job, int inputId) throws IOException {
- SourceTargetHelper.configureSource(job, inputId, TextInputFormat.class, path);
+ if (ptype.getFamily().equals(AvroTypeFamily.getInstance())) {
+ SourceTargetHelper.configureSource(job, inputId, AvroUtf8InputFormat.class, path);
+ } else {
+ SourceTargetHelper.configureSource(job, inputId, TextInputFormat.class, path);
+ }
}
@Override
View
1  src/main/java/com/cloudera/crunch/io/text/TextFileTarget.java
@@ -26,6 +26,7 @@
import com.cloudera.crunch.io.SourceTargetHelper;
import com.cloudera.crunch.type.PTableType;
import com.cloudera.crunch.type.PType;
+import com.cloudera.crunch.type.avro.AvroTypeFamily;
public class TextFileTarget implements PathTarget, MapReduceTarget {
View
2  src/main/java/com/cloudera/crunch/type/avro/AvroUtf8InputFormat.java
@@ -61,7 +61,7 @@ public float getProgress() throws IOException {
public AvroWrapper<Utf8> getCurrentKey() throws IOException,
InterruptedException {
Text txt = lineRecordReader.getCurrentValue();
- currentKey.datum(new Utf8(txt.getBytes()));
+ currentKey.datum(new Utf8(txt.toString()));
return currentKey;
}
View
6 src/test/java/com/cloudera/crunch/WordCountTest.java
@@ -22,6 +22,7 @@
import com.cloudera.crunch.PTable;
import com.cloudera.crunch.Pipeline;
import com.cloudera.crunch.impl.mr.MRPipeline;
+import com.cloudera.crunch.io.At;
import com.cloudera.crunch.io.To;
import com.cloudera.crunch.lib.Aggregate;
import com.cloudera.crunch.test.FileHelper;
@@ -98,7 +99,8 @@ public void run(Pipeline pipeline, PTypeFamily typeFamily) throws IOException {
File output = FileHelper.createOutputPath();
String outputPath = output.getAbsolutePath();
- PCollection<String> shakespeare = pipeline.readTextFile(inputPath);
+ PCollection<String> shakespeare = pipeline.read(
+ At.textFile(inputPath, typeFamily.strings()));
PTable<String, Long> wordCount = wordCount(shakespeare, typeFamily);
if (useToOutput) {
wordCount.write(To.textFile(outputPath));
@@ -120,7 +122,7 @@ public void run(Pipeline pipeline, PTypeFamily typeFamily) throws IOException {
List<String> lines = Files.readLines(outputFile, Charset.defaultCharset());
boolean passed = false;
for (String line : lines) {
- if (line.equals("Macbeth\t28")) {
+ if (line.startsWith("Macbeth\t28")) {
passed = true;
break;
}
Please sign in to comment.
Something went wrong with that request. Please try again.