Skip to content

Commit

Permalink
Merge pull request twitter#104 from billonahill/bg-analytics-1471
Browse files Browse the repository at this point in the history
Adding Pig schema-to-ProtoDescriptor functionality with tests
  • Loading branch information
rangadi committed Nov 18, 2011
2 parents d005ea3 + 539efc9 commit 5f86340
Show file tree
Hide file tree
Showing 3 changed files with 224 additions and 0 deletions.
113 changes: 113 additions & 0 deletions src/java/com/twitter/elephantbird/pig/util/PigToProtobuf.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,26 @@
import java.util.List;

import com.google.protobuf.ByteString;
import com.google.protobuf.Descriptors;
import com.google.protobuf.Descriptors.Descriptor;
import com.google.protobuf.Descriptors.FieldDescriptor;
import com.google.protobuf.Descriptors.FileDescriptor;
import com.google.protobuf.Descriptors.DescriptorValidationException;
import com.google.protobuf.Message;
import com.google.protobuf.Message.Builder;
import com.google.protobuf.DescriptorProtos.DescriptorProto;
import com.google.protobuf.DescriptorProtos.FileDescriptorProto;
import com.google.protobuf.DescriptorProtos.FieldDescriptorProto;
import com.google.protobuf.DescriptorProtos.FieldDescriptorProto.Type;

import org.apache.pig.ResourceSchema;
import org.apache.pig.ResourceSchema.ResourceFieldSchema;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.util.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -106,6 +118,70 @@ public static Message tupleToMessage(Builder builder, Tuple tuple) {
return builder.build();
}

/**
 * For a given <code>ResourceSchema</code>, generate a protobufs <code>Descriptor</code> with analogous
 * field names and types. Equivalent to calling the two-argument overload with no extra fields.
 *
 * @param schema Pig schema.
 * @return Protobufs Descriptor
 * @throws DescriptorValidationException if the generated descriptor fails protobuf validation
 */
public static Descriptor schemaToProtoDescriptor(ResourceSchema schema)
    throws DescriptorValidationException {
  return schemaToProtoDescriptor(schema, null);
}

/**
 * For a given <code>ResourceSchema</code>, generate a protobufs <code>Descriptor</code> with analogous
 * field names and types.
 *
 * @param schema Pig schema; must contain at least one field.
 * @param extraFields optionally pass a List of extra fields (Pairs of name:type) to be included
 *                    after the schema-derived fields; may be null.
 * @return Protobufs Descriptor
 * @throws DescriptorValidationException if the generated descriptor fails protobuf validation
 * @throws IllegalArgumentException if the schema has no fields or contains an unsupported Pig type
 */
public static Descriptor schemaToProtoDescriptor(ResourceSchema schema, List<Pair<String, Type>> extraFields)
    throws DescriptorValidationException {

  // Fail fast on an empty schema instead of discovering it after the loop.
  if (schema.getFields().length == 0) {
    throw new IllegalArgumentException("ResourceSchema does not have any fields");
  }

  // Single definition of the synthetic message name: it is both set on the builder
  // and used to look the message type up again after the file descriptor is built.
  final String messageName = "PigToProtobufDynamicBuilder";

  DescriptorProto.Builder desBuilder = DescriptorProto.newBuilder();

  int count = 0;
  for (ResourceFieldSchema fieldSchema : schema.getFields()) {
    // Protobuf field numbers are 1-based.
    int position = ++count;
    String fieldName = fieldSchema.getName();
    byte dataTypeId = fieldSchema.getType();

    // determine and add protobuf types
    Type protoType = pigTypeToProtoType(dataTypeId);
    LOG.info("Mapping Pig field " + fieldName + " of type " + dataTypeId + " to protobuf type: " + protoType);

    addField(desBuilder, fieldName, position, protoType);
  }

  // Extra fields are appended after the schema-derived ones, continuing the numbering.
  if (extraFields != null) {
    for (Pair<String, Type> extraField : extraFields) {
      addField(desBuilder, extraField.first, ++count, extraField.second);
    }
  }

  desBuilder.setName(messageName);

  // Wrap the message in a synthetic FileDescriptorProto with no dependencies so it can be
  // turned into a real Descriptor.
  FileDescriptorProto fileDescriptorProto =
      FileDescriptorProto.newBuilder().addMessageType(desBuilder.build()).build();
  FileDescriptor dynamicDescriptor =
      FileDescriptor.buildFrom(fileDescriptorProto, new FileDescriptor[0]);

  return dynamicDescriptor.findMessageTypeByName(messageName);
}

/**
* Converts a DataBag into a List of objects with the type in the given FieldDescriptor. DataBags
* don't map cleanly to repeated protobuf types, so each Tuple has to be unwrapped (by taking the
Expand Down Expand Up @@ -158,4 +234,41 @@ private static Object tupleFieldToSingleField(FieldDescriptor fieldDescriptor, O
return tupleField;
}
}

/**
 * Appends a single field definition to the message type under construction.
 *
 * @param builder message builder being assembled
 * @param name protobuf field name
 * @param fieldId 1-based protobuf field number
 * @param type protobuf scalar type for the field
 */
private static void addField(DescriptorProto.Builder builder, String name, int fieldId, Type type) {
  builder.addField(FieldDescriptorProto.newBuilder()
      .setName(name)
      .setNumber(fieldId)
      .setType(type)
      .build());
}

/**
 * For a given Pig type, return the protobufs type that maps to it.
 *
 * @param pigTypeId one of the simple-type constants from {@link DataType}
 * @return the corresponding protobuf scalar type
 * @throws IllegalArgumentException for complex Pig types (bag, map, tuple, ...)
 */
private static Type pigTypeToProtoType(byte pigTypeId) {
  switch (pigTypeId) {
    case DataType.CHARARRAY:
      return Type.TYPE_STRING;
    case DataType.BYTEARRAY:
      return Type.TYPE_BYTES;
    case DataType.BOOLEAN:
      return Type.TYPE_BOOL;
    case DataType.INTEGER:
      return Type.TYPE_INT32;
    case DataType.LONG:
      return Type.TYPE_INT64;
    case DataType.FLOAT:
      return Type.TYPE_FLOAT;
    case DataType.DOUBLE:
      return Type.TYPE_DOUBLE;
    default:
      throw new IllegalArgumentException("Unsupported Pig type passed (" + pigTypeId +
          ") where a simple type is expected while converting Pig to a dynamic Protobuf");
  }
}
}
8 changes: 8 additions & 0 deletions src/java/com/twitter/elephantbird/util/Protobufs.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,14 @@ private static Class<? extends Message> getProtobufClass(Configuration conf, Str
return protoClass.asSubclass(Message.class);
}

/**
 * For a configured protoClass, should the message be dynamic or is it a pre-generated Message class?
 * If protoClass is null or set to DynamicMessage.class, then the configurer intends for a dynamically
 * generated protobuf to be used.
 *
 * @param protoClass configured protobuf class, or null if none was configured
 * @return true if a dynamically generated message should be used
 */
public static boolean useDynamicProtoMessage(Class<?> protoClass) {
  // Compare with the constant's canonical name on the left: Class.getCanonicalName() can
  // return null (anonymous/local classes), which would NPE if dereferenced on protoClass.
  return protoClass == null
      || DynamicMessage.class.getCanonicalName().equals(protoClass.getCanonicalName());
}

public static Class<? extends Message> getInnerProtobufClass(String canonicalClassName) {
// is an inner class and is not visible from the outside. We have to instantiate
String parentClass = canonicalClassName.substring(0, canonicalClassName.lastIndexOf("."));
Expand Down
103 changes: 103 additions & 0 deletions src/test/com/twitter/elephantbird/pig/util/TestPigToProtobuf.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package com.twitter.elephantbird.pig.util;

import com.google.protobuf.DescriptorProtos;
import com.google.protobuf.Descriptors;
import org.apache.pig.ResourceSchema;
import org.apache.pig.data.DataType;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.util.Pair;
import org.junit.Assert;
import org.junit.Test;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/**
 * Verifies that <code>PigToProtobuf</code> converts from Pig schemas to Protobuf descriptors properly
 *
 * @author billg
 */
public class TestPigToProtobuf {

  @Test
  public void testConvertValidTypes() throws Descriptors.DescriptorValidationException {
    Schema schema = new Schema();

    schema.add(new Schema.FieldSchema("chararray", DataType.CHARARRAY));
    schema.add(new Schema.FieldSchema("bytearray", DataType.BYTEARRAY));
    schema.add(new Schema.FieldSchema("boolean", DataType.BOOLEAN));
    schema.add(new Schema.FieldSchema("integer", DataType.INTEGER));
    schema.add(new Schema.FieldSchema("long", DataType.LONG));
    schema.add(new Schema.FieldSchema("float", DataType.FLOAT));
    schema.add(new Schema.FieldSchema("double", DataType.DOUBLE));

    Descriptors.Descriptor descriptor = PigToProtobuf.schemaToProtoDescriptor(new ResourceSchema(schema));

    // Fields must come back in schema declaration order with the mapped protobuf types.
    Assert.assertEquals("Incorrect data size", 7, descriptor.getFields().size());
    Iterator<Descriptors.FieldDescriptor> fieldIterator = descriptor.getFields().iterator();
    assertFieldDescriptor(fieldIterator.next(), "chararray", Descriptors.FieldDescriptor.Type.STRING);
    assertFieldDescriptor(fieldIterator.next(), "bytearray", Descriptors.FieldDescriptor.Type.BYTES);
    assertFieldDescriptor(fieldIterator.next(), "boolean", Descriptors.FieldDescriptor.Type.BOOL);
    assertFieldDescriptor(fieldIterator.next(), "integer", Descriptors.FieldDescriptor.Type.INT32);
    assertFieldDescriptor(fieldIterator.next(), "long", Descriptors.FieldDescriptor.Type.INT64);
    assertFieldDescriptor(fieldIterator.next(), "float", Descriptors.FieldDescriptor.Type.FLOAT);
    assertFieldDescriptor(fieldIterator.next(), "double", Descriptors.FieldDescriptor.Type.DOUBLE);
  }

  @Test
  public void testConvertExtraFields() throws Descriptors.DescriptorValidationException {
    Schema schema = new Schema();

    schema.add(new Schema.FieldSchema("chararray", DataType.CHARARRAY));
    schema.add(new Schema.FieldSchema("bytearray", DataType.BYTEARRAY));

    List<Pair<String, DescriptorProtos.FieldDescriptorProto.Type>> extraFields =
        new ArrayList<Pair<String, DescriptorProtos.FieldDescriptorProto.Type>>();
    extraFields.add(new Pair<String, DescriptorProtos.FieldDescriptorProto.Type>(
        "extra_string", DescriptorProtos.FieldDescriptorProto.Type.TYPE_STRING));
    extraFields.add(new Pair<String, DescriptorProtos.FieldDescriptorProto.Type>(
        "extra_int", DescriptorProtos.FieldDescriptorProto.Type.TYPE_INT32));

    Descriptors.Descriptor descriptor = PigToProtobuf.schemaToProtoDescriptor(new ResourceSchema(schema), extraFields);

    // Extra fields are appended after the schema-derived fields.
    Assert.assertEquals("Incorrect data size", 4, descriptor.getFields().size());
    Iterator<Descriptors.FieldDescriptor> fieldIterator = descriptor.getFields().iterator();
    assertFieldDescriptor(fieldIterator.next(), "chararray", Descriptors.FieldDescriptor.Type.STRING);
    assertFieldDescriptor(fieldIterator.next(), "bytearray", Descriptors.FieldDescriptor.Type.BYTES);
    assertFieldDescriptor(fieldIterator.next(), "extra_string", Descriptors.FieldDescriptor.Type.STRING);
    assertFieldDescriptor(fieldIterator.next(), "extra_int", Descriptors.FieldDescriptor.Type.INT32);
  }

  @Test(expected=IllegalArgumentException.class)
  public void testConvertInvalidTypeBag() throws Descriptors.DescriptorValidationException {
    Schema schema = new Schema();
    schema.add(new Schema.FieldSchema("bag", DataType.BAG));
    PigToProtobuf.schemaToProtoDescriptor(new ResourceSchema(schema));
  }

  @Test(expected=IllegalArgumentException.class)
  public void testConvertInvalidTypeMap() throws Descriptors.DescriptorValidationException {
    Schema schema = new Schema();
    schema.add(new Schema.FieldSchema("map", DataType.MAP));
    PigToProtobuf.schemaToProtoDescriptor(new ResourceSchema(schema));
  }

  @Test(expected=IllegalArgumentException.class)
  public void testConvertInvalidTypeTuple() throws Descriptors.DescriptorValidationException {
    Schema schema = new Schema();
    schema.add(new Schema.FieldSchema("tuple", DataType.TUPLE));
    PigToProtobuf.schemaToProtoDescriptor(new ResourceSchema(schema));
  }

  @Test(expected=IllegalArgumentException.class)
  public void testConvertInvalidSchemaEmpty() throws Descriptors.DescriptorValidationException {
    PigToProtobuf.schemaToProtoDescriptor(new ResourceSchema(new Schema()));
  }

  /**
   * Asserts a single field descriptor has the expected name and type.
   * (Renamed from the misspelled "assetFieldDescriptor".)
   */
  private static void assertFieldDescriptor(Descriptors.FieldDescriptor fieldDescriptor,
                                            String name, Descriptors.FieldDescriptor.Type type) {
    Assert.assertEquals("Incorrect field name", name, fieldDescriptor.getName());
    Assert.assertEquals("Incorrect field type", type, fieldDescriptor.getType());
  }
}

0 comments on commit 5f86340

Please sign in to comment.