Permalink
Browse files

Moved ProtobufPigLoader to inherit from LzoBaseLoadFunc. This enables…

… a simplification of the code itself, and made it easier to use the Protobuf definition file to determine the returned schema instead of relying on error-prone manual schema defines. However, it causes a bit of a nomenclature screw-up, since a non-lzo class is now inheriting from a class with Lzo in the name. In this case, LzoBaseLoadFunc has no actual LZO-specific patterns contained in it, but it's still weird. To avoid breaking backwards compatibility, I did not rename LzoBaseLoadFunc.
  • Loading branch information...
1 parent 936919c commit 331057c3cc6e9c8bd74bcdec6afd016638451d2c @scottfines committed Aug 18, 2011
Showing with 11 additions and 21 deletions.
  1. +11 −21 src/java/com/twitter/elephantbird/pig/load/ProtobufPigLoader.java
@@ -4,13 +4,15 @@
import com.twitter.elephantbird.mapreduce.input.ProtobufFileInputFormat;
import com.twitter.elephantbird.mapreduce.io.ProtobufWritable;
import com.twitter.elephantbird.pig.util.PigUtil;
+import com.twitter.elephantbird.pig.util.ProtobufToPig;
import com.twitter.elephantbird.pig.util.ProtobufTuple;
+import com.twitter.elephantbird.util.Protobufs;
import com.twitter.elephantbird.util.TypeRef;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.pig.LoadFunc;
+import org.apache.pig.*;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.data.Tuple;
import org.slf4j.Logger;
@@ -43,12 +45,12 @@
* used.
*
* <p>Usage of this is identical to that of {@link LzoProtobufBlockPigLoader}.
+ *
*/
-public class ProtobufPigLoader<M extends Message> extends LoadFunc {
- private static final Logger LOG = LoggerFactory.getLogger(ProtobufPigLoader.class);
+public class ProtobufPigLoader<M extends Message> extends LzoBaseLoadFunc{
private TypeRef<M> typeRef_ = null;
- private RecordReader reader_;
+ private final ProtobufToPig protoToPig_ = new ProtobufToPig();
/**
* Default constructor. Do not use for actual loading.
*/
@@ -72,26 +74,14 @@ public InputFormat getInputFormat() throws IOException {
return new ProtobufFileInputFormat<M>(typeRef_);
}
-
@Override
- public void prepareToRead(RecordReader recordReader, PigSplit pigSplit) throws IOException {
- reader_ = recordReader;
+ public Tuple getNext() throws IOException {
+ M value = super.getNextBinaryValue(typeRef_);
+ return value!=null? new ProtobufTuple(value):null;
}
@Override
- public Tuple getNext() throws IOException {
- try {
- if(reader_ ==null)
- LOG.warn("Reader is null!");
- else if(reader_.nextKeyValue()){
- M value = ((ProtobufWritable<M>)reader_.getCurrentValue()).get();
-
- return value!=null? new ProtobufTuple(value):null;
- }
- } catch (InterruptedException e) {
- LOG.error("InterruptedException encountered, bailing.", e);
- throw new IOException(e);
- }
- return null; //should never happen
+ public ResourceSchema getSchema(String location, Job job) throws IOException {
+ return new ResourceSchema(protoToPig_.toSchema(Protobufs.getMessageDescriptor(typeRef_.getRawClass())));
}
}

0 comments on commit 331057c

Please sign in to comment.