Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP

Loading…

preserve unknown fields for Thrift + RCFile #181

Merged
merged 1 commit into from

2 participants

@rangadi
Collaborator

preserve unknown fields when user writes Thrift serialized bytes rather than a Thrift object.
Unit test is updated.

@rangadi rangadi preserve unknown fields when user writes Thrift
serialized bytes rather than a Thrift object.
Unit test is updated.
f68cc7f
@traviscrawford traviscrawford merged commit 7a33623 into from
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Commits on Apr 10, 2012
  1. @rangadi

    preserve unknown fields when user writes Thrift

    rangadi authored
    serialized bytes rather than a Thrift object.
    Unit test is updated.
This page is out of date. Refresh to see the latest.
View
134 src/java/com/twitter/elephantbird/mapreduce/output/RCFileThriftOutputFormat.java
@@ -2,11 +2,13 @@
import java.io.IOException;
import java.util.List;
+import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.serde2.ByteStream;
import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
import org.apache.hadoop.hive.serde2.columnar.BytesRefWritable;
+import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.RecordWriter;
@@ -14,8 +16,14 @@
import org.apache.thrift.TBase;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TField;
+import org.apache.thrift.protocol.TProtocolUtil;
+import org.apache.thrift.protocol.TType;
import org.apache.thrift.transport.TIOStreamTransport;
+import org.apache.thrift.transport.TMemoryInputTransport;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
import com.twitter.data.proto.Misc.ColumnarMetadata;
import com.twitter.elephantbird.mapreduce.io.ThriftWritable;
import com.twitter.elephantbird.thrift.TStructDescriptor;
@@ -27,18 +35,17 @@
* OutputFormat for storing Thrift objects in RCFile.<p>
*
* Each of the top level fields is stored in a separate column.
- * Thrift field ids are stored in RCFile metadata.
+ * Thrift field ids are stored in RCFile metadata.<p>
+ *
+ * The user can write either a {@link ThriftWritable} with the Thrift object
+ * or a {@link BytesWritable} with serialized Thrift bytes. The latter
+ * ensures that all the fields are preserved even if the current Thrift
+ * definition does not match the definition represented in the serialized bytes.
+ * Any fields not recognized by current Thrift class are stored in the last
+ * column.
*/
public class RCFileThriftOutputFormat extends RCFileOutputFormat {
- /*
- * TODO: handle unknown fields.
- * Thrift objects do not carry "unknown fields" (as described in javadoc
- * for {@link RCFileProtobufOutputFormat}) and as a result the last column
- * is empty. In order to handle such fields, the output format should
- * accept raw serialized bytes and deserialize it it self.
- */
-
// typeRef is only required for setting metadata for the RCFile
private TypeRef<? extends TBase<?, ?>> typeRef;
private TStructDescriptor tDesc;
@@ -47,9 +54,6 @@
private BytesRefArrayWritable rowWritable = new BytesRefArrayWritable();
private BytesRefWritable[] colValRefs;
- private ByteStream.Output byteStream = new ByteStream.Output();
- private TBinaryProtocol tProto = new TBinaryProtocol(
- new TIOStreamTransport(byteStream));
/** internal, for MR use only. */
public RCFileThriftOutputFormat() {
@@ -86,13 +90,39 @@ protected ColumnarMetadata makeColumnarMetadata() {
private class ProtobufWriter extends RCFileOutputFormat.Writer {
+ private ByteStream.Output byteStream = new ByteStream.Output();
+ private TBinaryProtocol tProto = new TBinaryProtocol(
+ new TIOStreamTransport(byteStream));
+
+ // used when deserializing thrift bytes
+ private Map<Short, Integer> idMap;
+ private TMemoryInputTransport mTransport;
+ private TBinaryProtocol skipProto;
+
ProtobufWriter(TaskAttemptContext job) throws IOException {
super(RCFileThriftOutputFormat.this, job, makeColumnarMetadata());
}
@Override @SuppressWarnings("unchecked")
public void write(NullWritable key, Writable value) throws IOException, InterruptedException {
- TBase tObj = ((ThriftWritable<TBase>)value).get();
+ try {
+ if (value instanceof BytesWritable) {
+        // TODO: handle errors
+ fromBytes((BytesWritable)value);
+ } else {
+ fromObject((TBase<?, ?>)((ThriftWritable)value).get());
+ }
+ } catch (TException e) {
+ // might need to tolerate a few errors.
+ throw new IOException(e);
+ }
+
+ super.write(null, rowWritable);
+ }
+
+ @SuppressWarnings("unchecked")
+ private void fromObject(TBase tObj)
+ throws IOException, InterruptedException, TException {
byteStream.reset(); // reinitialize the byteStream if buffer is too large?
int startPos = 0;
@@ -104,25 +134,83 @@ public void write(NullWritable key, Writable value) throws IOException, Interrup
Field fd = tFields.get(i);
if (tObj.isSet(fd.getFieldIdEnum())) {
- try {
- ThriftUtils.writeFieldNoTag(tProto, fd, tDesc.getFieldValue(i, tObj));
- } catch (TException e) {
- throw new IOException(e);
- }
+ ThriftUtils.writeFieldNoTag(tProto, fd, tDesc.getFieldValue(i, tObj));
}
- } else { // last column : write unknown fields
- // TODO: we need to deserialize thrift buffer ourselves to handle
- // unknown fields.
- }
+ } // else { } : no 'unknown fields' in thrift object
colValRefs[i].set(byteStream.getData(),
startPos,
byteStream.getCount() - startPos);
startPos = byteStream.getCount();
}
+ }
+
+    /**
+     * extract serialized bytes for each field, including unknown fields and
+     * store those bytes in columns.
+     */
+ private void fromBytes(BytesWritable bytesWritable)
+ throws IOException, InterruptedException, TException {
+
+ if (mTransport == null) {
+ initIdMap();
+ mTransport = new TMemoryInputTransport();
+ skipProto = new TBinaryProtocol(mTransport);
+ }
- super.write(null, rowWritable);
+ byte[] bytes = bytesWritable.getBytes();
+ mTransport.reset(bytes, 0, bytesWritable.getLength());
+ byteStream.reset();
+
+ // set all the fields to null
+ for(BytesRefWritable ref : colValRefs) {
+ ref.set(bytes, 0, 0);
+ }
+
+ skipProto.readStructBegin();
+
+ while (true) {
+ int start = mTransport.getBufferPosition();
+
+ TField field = skipProto.readFieldBegin();
+ if (field.type == TType.STOP) {
+ break;
+ }
+
+ int fieldStart = mTransport.getBufferPosition();
+
+ // skip still creates and copies primitive objects (String, buffer, etc)
+ // skipProto could override readString() and readBuffer() to avoid that.
+ TProtocolUtil.skip(skipProto, field.type);
+
+ int end = mTransport.getBufferPosition();
+
+ Integer idx = idMap.get(field.id);
+
+ if (idx != null && field.type == tFields.get(idx).getType()) {
+ // known field
+ colValRefs[idx].set(bytes, fieldStart, end-fieldStart);
+ } else {
+ // unknown field, copy the bytes to last column (with field id)
+ byteStream.write(bytes, start, end-start);
+ }
+ }
+
+ if (byteStream.getCount() > 0) {
+ byteStream.write(TType.STOP);
+ colValRefs[colValRefs.length-1].set(byteStream.getData(),
+ 0,
+ byteStream.getCount());
+ }
+ }
+
+ private void initIdMap() {
+ idMap = Maps.newHashMap();
+ for(int i=0; i<tFields.size(); i++) {
+ idMap.put(tFields.get(i).getFieldId(), i);
+ }
+ idMap = ImmutableMap.copyOf(idMap);
}
}
View
15 src/java/com/twitter/elephantbird/util/ThriftUtils.java
@@ -195,10 +195,13 @@ public static void writeFieldNoTag(TProtocol proto,
if (field.getType() == TType.MAP) {
+ Field valueField = field.getMapValueField();
Map<?, ?> map = (Map<?, ?>)value;
+
+ proto.writeByte(innerField.getType());
+ proto.writeByte(valueField.getType());
proto.writeI32(map.size());
- Field valueField = field.getMapValueField();
for(Entry<?, ?> entry : map.entrySet()) {
writeSingleFieldNoTag(proto, innerField, entry.getKey());
writeSingleFieldNoTag(proto, valueField, entry.getValue());
@@ -207,6 +210,8 @@ public static void writeFieldNoTag(TProtocol proto,
} else { // SET or LIST
Collection<?> coll = (Collection<?>)value;
+
+ proto.writeByte(innerField.getType());
proto.writeI32(coll.size());
for(Object v : coll) {
@@ -278,10 +283,13 @@ public static Object readFieldNoTag(TProtocol proto,
// collection or a map:
- int nEntries = proto.readI32();
if (field.getType() == TType.MAP) {
+ proto.readByte();
+ proto.readByte();
+ int nEntries = proto.readI32();
+
Map<Object, Object> map = Maps.newHashMap();
Field valueField = field.getMapValueField();
@@ -293,6 +301,9 @@ public static Object readFieldNoTag(TProtocol proto,
} else { // SET or LIST
+ proto.readByte();
+ int nEntries = proto.readI32();
+
for(int i=0; i<nEntries; i++) {
coll.add(readFieldNoTag(proto, innerField));
}
View
4 src/test/com/twitter/elephantbird/pig/load/TestRCFileProtobufStorage.java
@@ -90,11 +90,11 @@ public void setUp() throws Exception {
@Test
public void testRCFileStorage() throws Exception {
/* create a directory with two rcfiles :
- * - one created with normal Person objects using Pig storage.
+ * - one created with normal Person objects using RCFileProtobufPigStorage.
* - other with PersonWithoutEmail (for testing unknown fields)
* using the same objects as the first one.
*
- * Then load both files using Pig loader.
+ * Then load both files using RCFileProtobufPigLoader
*/
// write to rcFile using RCFileProtobufStorage
View
99 src/test/com/twitter/elephantbird/pig/load/TestRCFileThriftStorage.java
@@ -6,7 +6,15 @@
import java.util.Iterator;
import org.apache.commons.codec.binary.Base64;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.pig.ExecType;
import org.apache.pig.PigServer;
import org.apache.pig.data.DataByteArray;
@@ -17,13 +25,16 @@
import com.google.common.collect.ImmutableMap;
import com.twitter.elephantbird.mapreduce.io.ThriftConverter;
+import com.twitter.elephantbird.mapreduce.output.RCFileThriftOutputFormat;
import com.twitter.elephantbird.pig.piggybank.ThriftBytesToTuple;
import com.twitter.elephantbird.pig.store.RCFileThriftPigStorage;
import com.twitter.elephantbird.pig.util.ThriftToPig;
import com.twitter.elephantbird.thrift.test.TestName;
import com.twitter.elephantbird.thrift.test.TestPerson;
+import com.twitter.elephantbird.thrift.test.TestPersonExtended;
import com.twitter.elephantbird.thrift.test.TestPhoneType;
import com.twitter.elephantbird.util.Codecs;
+import com.twitter.elephantbird.util.ThriftUtils;
/**
* Test RCFile loader and storage with Thrift objects
@@ -36,16 +47,17 @@
private final File inputDir = new File(testDir, "in");
private final File rcfile_in = new File(testDir, "rcfile_in");
- private ThriftToPig<TestPerson> thriftToPig = ThriftToPig.newInstance(TestPerson.class);
- private ThriftConverter<TestPerson> thriftConverter = ThriftConverter.newInstance(TestPerson.class);
+ private ThriftToPig<TestPersonExtended> thriftToPig = ThriftToPig.newInstance(TestPersonExtended.class);
+ private ThriftConverter<TestPersonExtended> thriftConverter = ThriftConverter.newInstance(TestPersonExtended.class);
- private final TestPerson[] records = new TestPerson[]{ makePerson(0),
- makePerson(1),
- makePerson(2) };
+ private final TestPersonExtended[] records = new TestPersonExtended[]{
+ makePerson(0),
+ makePerson(1),
+ makePerson(2) };
private static final Base64 base64 = Codecs.createStandardBase64();
- public static class B64ToTuple extends ThriftBytesToTuple<TestPerson> {
+ public static class B64ToTuple extends ThriftBytesToTuple<TestPersonExtended> {
public B64ToTuple(String className) {
super(className);
}
@@ -73,7 +85,7 @@ public void setUp() throws Exception {
// create an text file with b64 encoded thrift objects.
FileOutputStream out = new FileOutputStream(new File(inputDir, "persons_b64.txt"));
- for (TestPerson rec : records) {
+ for (TestPersonExtended rec : records) {
out.write(base64.encode(thriftConverter.toBytes(rec)));
out.write('\n');
}
@@ -82,6 +94,13 @@ public void setUp() throws Exception {
@Test
public void testRCFileSThrifttorage() throws Exception {
+ /* Create a directory with two files:
+ * - one created with TestPersonExtended objects using RCFileThriftPigStorage
+ * - one created with TestPerson using serialized TestPersonExtended objects
+ * to test handling of unknown fields.
+ *
+ * Then load both files using RCFileThriftPigLoader.
+ */
// write to rcFile using RCFileThriftPigStorage
for(String line : String.format(
@@ -92,31 +111,48 @@ public void testRCFileSThrifttorage() throws Exception {
"STORE A into '%s' using %s('%s');\n"
, B64ToTuple.class.getName()
- , TestPerson.class.getName()
+ , TestPersonExtended.class.getName()
, inputDir.toURI().toString()
, rcfile_in.toURI().toString()
, RCFileThriftPigStorage.class.getName()
- , TestPerson.class.getName()
+ , TestPersonExtended.class.getName()
).split("\n")) {
pigServer.registerQuery(line + "\n");
}
-
- // unknown fields are not yet supported for Thrift.
+ // the RCFile created above has 5 columns : 4 fields in extended Person
+ // and one for unknown fields (this column is empty in this case).
+
+    // store another file with unknowns, by writing the person object with
+    // TestPerson rather than with TestPersonExtended, but using
+    // serialized TestPersonExtended objects.
+
+ RecordWriter<Writable, Writable> thriftWriter =
+ createThriftWriter(TestPerson.class, new File(rcfile_in, "persons_with_unknows.rc"));
+    for(TestPersonExtended person : records) {
+      // write the bytes from TestPersonExtended
+ thriftWriter.write(null, new BytesWritable(thriftConverter.toBytes(person)));
+ }
+ thriftWriter.close(null);
+    // the RCFile has 3 columns : 2 fields in TestPerson and one for unknown
+    // fields. This time the unknowns-column contains 2 fields from TestPersonExtended
+    // that were not understood by TestPerson.
// load using RCFileThriftPigLoader
pigServer.registerQuery(String.format(
"A = load '%s' using %s('%s');\n"
, rcfile_in.toURI().toString()
, RCFileThriftPigLoader.class.getName()
- , TestPerson.class.getName()));
+ , TestPersonExtended.class.getName()));
// verify the result:
Iterator<Tuple> rows = pigServer.openIterator("A");
- for(TestPerson person : records) {
- String expected = personToString(person);
- Assert.assertEquals(expected, rows.next().toString());
+ for (int i=0; i<2; i++) {
+ for(TestPersonExtended person : records) {
+ String expected = personToString(person);
+ Assert.assertEquals(expected, rows.next().toString());
+ }
}
// clean up on successful run
@@ -124,14 +160,39 @@ public void testRCFileSThrifttorage() throws Exception {
}
// return a Person thrift object
- private TestPerson makePerson(int index) {
- return new TestPerson(
+ private TestPersonExtended makePerson(int index) {
+ return new TestPersonExtended(
new TestName("bob " + index, "jenkins"),
ImmutableMap.of(TestPhoneType.HOME,
- "408-555-5555" + "ex" + index));
+ "408-555-5555" + "ex" + index),
+ "bob_" + index + "@examle.com",
+ new TestName("alice " + index, "smith"));
+ }
+
+ @SuppressWarnings("unchecked")
+ private static RecordWriter<Writable, Writable>
+ createThriftWriter(Class<?> thriftClass, final File file)
+ throws IOException, InterruptedException {
+
+ OutputFormat outputFormat = (
+ new RCFileThriftOutputFormat(ThriftUtils.getTypeRef(thriftClass.getName())) {
+ @Override
+ public Path getDefaultWorkFile(TaskAttemptContext context,
+ String extension) throws IOException {
+ return new Path(file.toURI().toString());
+ }
+ });
+
+ Configuration conf = new Configuration();
+ conf.setBoolean("mapred.output.compress", true);
+ // for some reason GzipCodec results in loader failure on Mac OS X
+ conf.set("mapred.output.compression.codec", "org.apache.hadoop.io.compress.BZip2Codec");
+
+ return outputFormat.getRecordWriter(
+ new TaskAttemptContext(conf, new TaskAttemptID()));
}
- private String personToString(TestPerson person) {
+ private String personToString(TestPersonExtended person) {
return thriftToPig.getPigTuple(person).toString();
}
View
8 src/thrift/test.thrift
@@ -20,6 +20,14 @@ struct TestPerson {
2: map<TestPhoneType, string> phones, // for testing enum keys in maps.
}
+/* TestPerson, plus a couple more traits */
+struct TestPersonExtended {
+ 1: TestName name,
+ 2: map<TestPhoneType, string> phones,
+ 3: string email,
+ 4: TestName friend
+}
+
struct TestIngredient {
1: string name,
2: string color,
Something went wrong with that request. Please try again.