
Merge pull request #255 from rangadi/rcfile_proto_thrift_writables_2

Rcfile record readers should return proto or thrift writables
2 parents 9fcd367 + 28300d8 · commit 2c0f0fe7ed40b4253ca7b064641d02b0166900fc · @rangadi committed Oct 1, 2012
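
The net effect of this change: the RCFile record readers now hand back ready-to-use writables from getCurrentValue() instead of raw BytesRefArrayWritable columns (ProtobufWritable in the hunks shown; per the PR title, the thrift readers get the same treatment). A minimal usage sketch, assuming a hypothetical generated message class MyProto and example driver/mapper classes that are not part of this commit:

import com.google.protobuf.Message;
import com.twitter.elephantbird.mapreduce.input.RCFileProtobufInputFormat;
import com.twitter.elephantbird.mapreduce.io.ProtobufWritable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class RCFileProtoReadExample {

  // After this change the value arriving at the mapper is a ProtobufWritable,
  // not a BytesRefArrayWritable.
  public static class ProtoMapper extends Mapper<LongWritable, Writable, Text, NullWritable> {
    @Override
    protected void map(LongWritable key, Writable value, Context ctx)
        throws IOException, InterruptedException {
      @SuppressWarnings("unchecked")
      Message msg = ((ProtobufWritable<Message>) value).get();
      // ... use the deserialized message ...
    }
  }

  public static Job buildJob(Configuration conf) throws IOException {
    Job job = new Job(conf, "rcfile-proto-read");
    // MyProto stands in for a generated protobuf class (hypothetical, not in this commit).
    RCFileProtobufInputFormat.setClassConf(MyProto.class, job.getConfiguration());
    job.setInputFormatClass(RCFileProtobufInputFormat.class);
    job.setMapperClass(ProtoMapper.class);
    return job;
  }
}
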
RCFileBaseInputFormat.java (new file)
@@ -0,0 +1,34 @@
+package com.twitter.elephantbird.mapreduce.input;
+
+import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.IOException;
+
+/**
+ * Base input format for Thrift and Protobuf RCFile input formats. <br>
+ * Contains a few common utility methods.
+ */
+public abstract class RCFileBaseInputFormat extends MapReduceInputFormatWrapper<LongWritable, Writable> {
+
+ /** internal, for MR use only. */
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ public RCFileBaseInputFormat() {
+ super(new RCFileInputFormat());
+ }
+
+ /**
+ * Returns super.createRecordReader(split, taskAttempt). This is useful when
+ * a subclass has its own wrapper over the base record reader.
+ */
+ protected final RecordReader<LongWritable, Writable>
+ createUnwrappedRecordReader(InputSplit split, TaskAttemptContext taskAttempt)
+ throws IOException, InterruptedException {
+ return super.createRecordReader(split, taskAttempt);
+ }
+
+}
RCFileProtobufInputFormat.java
@@ -4,20 +4,19 @@
import java.util.ArrayList;
import java.util.List;
+import com.twitter.elephantbird.mapreduce.io.ProtobufWritable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
import org.apache.hadoop.hive.serde2.columnar.BytesRefWritable;
import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -29,24 +28,20 @@
import com.google.protobuf.Descriptors.FieldDescriptor;
import com.google.protobuf.Message.Builder;
import com.twitter.data.proto.Misc.ColumnarMetadata;
-import com.twitter.elephantbird.pig.util.ProtobufToPig;
-import com.twitter.elephantbird.pig.util.RCFileUtil;
+import com.twitter.elephantbird.util.RCFileUtil;
import com.twitter.elephantbird.util.Protobufs;
import com.twitter.elephantbird.util.TypeRef;
-public class RCFileProtobufInputFormat extends MapReduceInputFormatWrapper<LongWritable, BytesRefArrayWritable> {
+public class RCFileProtobufInputFormat extends RCFileBaseInputFormat {
private static final Logger LOG = LoggerFactory.getLogger(RCFileProtobufInputFormat.class);
private TypeRef<Message> typeRef;
- /** internal, for MR use only. */
- public RCFileProtobufInputFormat() {
- super(new RCFileInputFormat<LongWritable, BytesRefArrayWritable>());
- }
+ // for MR
+ public RCFileProtobufInputFormat() {}
public RCFileProtobufInputFormat(TypeRef<Message> typeRef) {
- this();
this.typeRef = typeRef;
}
@@ -58,19 +53,31 @@ public static void setClassConf(Class<? extends Message> protoClass, Configurati
Protobufs.setClassConf(conf, RCFileProtobufInputFormat.class, protoClass);
}
- public class ProtobufReader extends FilterRecordReader<LongWritable, BytesRefArrayWritable> {
+ @Override
+ public RecordReader<LongWritable, Writable>
+ createRecordReader(InputSplit split, TaskAttemptContext taskAttempt)
+ throws IOException, InterruptedException {
+ if (typeRef == null) {
+ typeRef = Protobufs.getTypeRef(taskAttempt.getConfiguration(), RCFileProtobufInputFormat.class);
+ }
+ return new ProtobufReader(createUnwrappedRecordReader(split, taskAttempt));
+ }
- private final TupleFactory tf = TupleFactory.getInstance();
- private final ProtobufToPig protoToPig = new ProtobufToPig();
+ public class ProtobufReader extends FilterRecordReader<LongWritable, Writable> {
- private Builder msgBuilder;
- private boolean readUnknownsColumn = false;
- private List<FieldDescriptor> knownRequiredFields = Lists.newArrayList();
- private ArrayList<Integer> columnsBeingRead = Lists.newArrayList();
+ protected Builder msgBuilder;
+ protected boolean readUnknownsColumn = false;
+ protected List<FieldDescriptor> knownRequiredFields = Lists.newArrayList();
+ protected ArrayList<Integer> columnsBeingRead = Lists.newArrayList();
- private Message currentValue;
+ protected ProtobufWritable<Message> protoWritable;
- public ProtobufReader(RecordReader<LongWritable, BytesRefArrayWritable> reader) {
+ /**
+ * The reader is expected to be a
+ * <code>RecordReader< LongWritable, BytesRefArrayWritable ></code>.
+ */
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ public ProtobufReader(RecordReader reader) {
super(reader);
}
@@ -87,6 +94,7 @@ public void initialize(InputSplit split, TaskAttemptContext ctx)
* read the "unknowns" column (the last column).
*/
msgBuilder = Protobufs.getMessageBuilder(typeRef.getRawClass());
+ protoWritable = ProtobufWritable.newInstance(typeRef.getRawClass());
Descriptor msgDesc = msgBuilder.getDescriptorForType();
final List<FieldDescriptor> msgFields = msgDesc.getFields();
@@ -131,20 +139,21 @@ public Integer apply(FieldDescriptor fd) {
}
@Override
- public boolean nextKeyValue() throws IOException, InterruptedException {
- currentValue = null;
- return super.nextKeyValue();
+ public Writable getCurrentValue() throws IOException, InterruptedException {
+ protoWritable.set(getCurrentProtobufValue());
+ return protoWritable;
+ }
+
+ /** Returns the raw <code>BytesRefArrayWritable</code> from <code>super.getCurrentValue()</code>. */
+ public BytesRefArrayWritable getCurrentBytesRefArrayWritable() throws IOException, InterruptedException {
+ return (BytesRefArrayWritable) super.getCurrentValue();
}
/**
* Builds protobuf message from the raw bytes returned by RCFile reader.
*/
public Message getCurrentProtobufValue() throws IOException, InterruptedException {
- if (currentValue != null) {
- return currentValue;
- }
-
- BytesRefArrayWritable byteRefs = getCurrentValue();
+ BytesRefArrayWritable byteRefs = getCurrentBytesRefArrayWritable();
if (byteRefs == null) {
return null;
}
@@ -170,54 +179,8 @@ public Message getCurrentProtobufValue() throws IOException, InterruptedExceptio
}
}
- currentValue = builder.build();
- return currentValue;
- }
-
- /**
- * Returns a Tuple consisting of required fields with out creating
- * a Protobuf message at the top level.
- */
- public Tuple getCurrentTupleValue() throws IOException, InterruptedException {
-
- BytesRefArrayWritable byteRefs = getCurrentValue();
- if (byteRefs == null) {
- return null;
- }
-
- Tuple tuple = tf.newTuple(knownRequiredFields.size());
-
- for (int i=0; i < knownRequiredFields.size(); i++) {
- BytesRefWritable buf = byteRefs.get(columnsBeingRead.get(i));
- FieldDescriptor fd = knownRequiredFields.get(i);
- Object value = null;
- if (buf.getLength() > 0) {
- value = Protobufs.readFieldNoTag(
- CodedInputStream.newInstance(buf.getData(), buf.getStart(), buf.getLength()),
- knownRequiredFields.get(i),
- msgBuilder);
- } else if (fd.getType() != FieldDescriptor.Type.MESSAGE) {
- value = fd.getDefaultValue();
- }
- tuple.set(i, protoToPig.fieldToPig(fd, value));
- }
-
- if (readUnknownsColumn) {
- // we can handle this if needed.
- throw new IOException("getCurrentTupleValue() is not supported when 'readUnknownColumns' is set");
- }
-
- return tuple;
+ return builder.build();
}
}
- @Override @SuppressWarnings("unchecked")
- public RecordReader createRecordReader(InputSplit split,
- TaskAttemptContext taskAttempt)
- throws IOException, InterruptedException {
- if (typeRef == null) {
- typeRef = Protobufs.getTypeRef(taskAttempt.getConfiguration(), RCFileProtobufInputFormat.class);
- }
- return new ProtobufReader(super.createRecordReader(split, taskAttempt));
- }
-}
+ }
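
For callers that need more than the wrapped value, the reworked ProtobufReader keeps the raw column bytes and the freshly built message reachable through separate accessors. A small sketch of draining a split with all three views; it assumes the reader came from createRecordReader() and was already initialized by the framework:

import com.google.protobuf.Message;
import com.twitter.elephantbird.mapreduce.input.RCFileProtobufInputFormat.ProtobufReader;
import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
import org.apache.hadoop.io.Writable;

import java.io.IOException;

class ProtobufReaderViews {
  static void dumpSplit(ProtobufReader reader) throws IOException, InterruptedException {
    while (reader.nextKeyValue()) {
      Writable wrapped = reader.getCurrentValue();               // ProtobufWritable<Message>, what MR jobs see
      Message message = reader.getCurrentProtobufValue();        // message built from the projected columns
      BytesRefArrayWritable raw = reader.getCurrentBytesRefArrayWritable();  // untouched RCFile column refs
      // All three views read the same current record; getCurrentValue() rebuilds
      // the message on each call now that the old caching field is gone.
    }
  }
}
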
RCFileProtobufTupleInputFormat.java (new file)
@@ -0,0 +1,94 @@
+package com.twitter.elephantbird.mapreduce.input;
+
+import com.google.protobuf.CodedInputStream;
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.Message;
+import com.twitter.elephantbird.pig.util.ProtobufToPig;
+import com.twitter.elephantbird.util.Protobufs;
+import com.twitter.elephantbird.util.TypeRef;
+import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
+import org.apache.hadoop.hive.serde2.columnar.BytesRefWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+
+import java.io.IOException;
+
+/**
+ * This is a wrapper over RCFileProtobufInputFormat and provides a method
+ * to create a Pig Tuple directly from RCFile bytes, without building a protobuf
+ * message first.
+ */
+public class RCFileProtobufTupleInputFormat extends RCFileProtobufInputFormat {
+
+ // for MR
+ public RCFileProtobufTupleInputFormat() {}
+
+ public RCFileProtobufTupleInputFormat(TypeRef<Message> typeRef) {
+ super(typeRef);
+ }
+
+ @Override
+ public RecordReader<LongWritable, Writable>
+ createRecordReader(InputSplit split, TaskAttemptContext taskAttempt)
+ throws IOException, InterruptedException {
+ return new TupleReader(createUnwrappedRecordReader(split, taskAttempt));
+ }
+
+ public class TupleReader extends ProtobufReader {
+
+ private final TupleFactory tf = TupleFactory.getInstance();
+ private final ProtobufToPig protoToPig = new ProtobufToPig();
+
+ /**
+ * The reader is expected to be a
+ * <code>RecordReader< LongWritable, BytesRefArrayWritable ></code>.
+ */
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ public TupleReader(RecordReader reader) {
+ super(reader);
+ }
+
+ /**
+ * Returns a Tuple consisting of required fields without creating
+ * a Protobuf message at the top level.
+ */
+ public Tuple getCurrentTupleValue() throws IOException, InterruptedException {
+
+ BytesRefArrayWritable byteRefs = getCurrentBytesRefArrayWritable();
+
+ if (byteRefs == null) {
+ return null;
+ }
+
+ Tuple tuple = tf.newTuple(knownRequiredFields.size());
+
+ for (int i=0; i < knownRequiredFields.size(); i++) {
+ BytesRefWritable buf = byteRefs.get(columnsBeingRead.get(i));
+ Descriptors.FieldDescriptor fd = knownRequiredFields.get(i);
+ Object value = null;
+ if (buf.getLength() > 0) {
+ value = Protobufs.readFieldNoTag(
+ CodedInputStream.newInstance(buf.getData(), buf.getStart(), buf.getLength()),
+ knownRequiredFields.get(i),
+ msgBuilder);
+ } else if (fd.getType() != Descriptors.FieldDescriptor.Type.MESSAGE) {
+ value = fd.getDefaultValue();
+ }
+ tuple.set(i, protoToPig.fieldToPig(fd, value));
+ }
+
+ if (isReadingUnknonwsColumn()) {
+ // we can handle this if needed.
+ throw new IOException("getCurrentTupleValue() is not supported when 'readUnknownColumns' is set");
+ }
+
+ return tuple;
+ }
+ }
+
+}
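
The tuple path is consumed the same way. A sketch of pulling Pig tuples straight out of the new TupleReader; the loader plumbing around it is omitted and not part of this commit:

import com.twitter.elephantbird.mapreduce.input.RCFileProtobufTupleInputFormat.TupleReader;
import org.apache.pig.data.Tuple;

import java.io.IOException;

class TupleReaderExample {
  // Returns the next row as a Pig Tuple, or null at the end of the split.
  static Tuple nextTuple(TupleReader reader) throws IOException, InterruptedException {
    if (!reader.nextKeyValue()) {
      return null;
    }
    // Builds the Tuple column by column, without materializing a protobuf Message first.
    return reader.getCurrentTupleValue();
  }
}

This is the route a Pig loader would take to skip the per-record cost of building a full protobuf message.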