Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

Already on GitHub? Sign in to your account

Avro Resolving Serialization #55

Closed
wants to merge 17 commits into the base branch from the contributor's branch (branch names not captured in this page snapshot)
View
@@ -86,6 +86,7 @@
</target>
<target name="buildtest" description="Compile test classes">
+ <java-avro-compiler src="${commontestsrc.dir}/voldemort/config" classpath="test-classpath"/>
<replace-dir dir="${testclasses.dir}" />
<copy todir="${testclasses.dir}">
<fileset dir="${commontestsrc.dir}">
@@ -130,7 +131,7 @@
<arg line="${proto.sources}"/>
</exec>
</target>
-
+
<target name="jar" depends="build" description="Build server jar file">
<jar destfile="${dist.dir}/${name}-${curr.release}.jar">
<fileset dir="${classes.dir}">
@@ -336,6 +337,23 @@
</tar>
</sequential>
</macrodef>
+
+ <macrodef name="java-avro-compiler">
+ <attribute name="src"/>
+ <attribute name="classpath"/>
+ <sequential>
+ <taskdef name="schema" classname="org.apache.avro.specific.SchemaTask">
+ <classpath refid="@{classpath}" />
+ </taskdef>
+
+ <schema destdir="${commontestsrc.dir}">
+ <fileset dir="@{src}">
+ <include name="**/*.avsc" />
+ </fileset>
+ </schema>
+ </sequential>
+ </macrodef>
+
<target name="snapshot" description="Create a release-snapshot zip file with everything pre-built.">
<create-release-artifacts version="${curr.release.snapshot}" />
@@ -24,6 +24,8 @@
import voldemort.serialization.avro.AvroGenericSerializer;
import voldemort.serialization.avro.AvroReflectiveSerializer;
+import voldemort.serialization.avro.AvroResolvingGenericSerializer;
+import voldemort.serialization.avro.AvroResolvingSpecificSerializer;
import voldemort.serialization.avro.AvroSpecificSerializer;
import voldemort.serialization.json.JsonTypeDefinition;
import voldemort.serialization.json.JsonTypeSerializer;
@@ -49,6 +51,8 @@
private static final String AVRO_GENERIC_TYPE_NAME = "avro-generic";
private static final String AVRO_SPECIFIC_TYPE_NAME = "avro-specific";
private static final String AVRO_REFLECTIVE_TYPE_NAME = "avro-reflective";
+ private static final String AVRO_RESOLVING_SPECIFIC_TYPE_NAME = "avro-specific-resolving";
+ private static final String AVRO_RESOLVING_GENERIC_TYPE_NAME = "avro-generic-resolving";
public Serializer<?> getSerializer(SerializerDefinition serializerDef) {
String name = serializerDef.getName();
@@ -72,13 +76,17 @@
} else if(name.equals(PROTO_BUF_TYPE_NAME)) {
return new ProtoBufSerializer<Message>(serializerDef.getCurrentSchemaInfo());
} else if(name.equals(THRIFT_TYPE_NAME)) {
- return new ThriftSerializer<TBase<?,?>>(serializerDef.getCurrentSchemaInfo());
+ return new ThriftSerializer<TBase<?, ?>>(serializerDef.getCurrentSchemaInfo());
} else if(name.equals(AVRO_GENERIC_TYPE_NAME)) {
return new AvroGenericSerializer(serializerDef.getCurrentSchemaInfo());
} else if(name.equals(AVRO_SPECIFIC_TYPE_NAME)) {
return new AvroSpecificSerializer<SpecificRecord>(serializerDef.getCurrentSchemaInfo());
} else if(name.equals(AVRO_REFLECTIVE_TYPE_NAME)) {
return new AvroReflectiveSerializer<Object>(serializerDef.getCurrentSchemaInfo());
+ } else if(name.equals(AVRO_RESOLVING_SPECIFIC_TYPE_NAME)) {
+ return new AvroResolvingSpecificSerializer<SpecificRecord>(serializerDef);
+ } else if(name.equals(AVRO_RESOLVING_GENERIC_TYPE_NAME)) {
+ return new AvroResolvingGenericSerializer(serializerDef);
} else {
throw new IllegalArgumentException("No known serializer type: "
+ serializerDef.getName());
@@ -0,0 +1,53 @@
package voldemort.serialization.avro;

import java.util.HashMap;
import java.util.Map;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;

import voldemort.serialization.SerializerDefinition;

/**
 * An {@link AvroResolvingSerializer} for Avro generic records: values are
 * read and written as generic data (no generated classes required). The
 * "current" schema is parsed directly from the serializer definition's
 * current schema-info string.
 */
public class AvroResolvingGenericSerializer<T> extends AvroResolvingSerializer<T> {

    public AvroResolvingGenericSerializer(SerializerDefinition serializerDef) {
        super(serializerDef);
    }

    @Override
    protected DatumWriter<T> createDatumWriter(Schema schema) {
        return new GenericDatumWriter<T>(schema);
    }

    @Override
    protected DatumReader<T> createDatumReader(Schema writerSchema, Schema readerSchema) {
        // Resolves values written with writerSchema into readerSchema
        return new GenericDatumReader<T>(writerSchema, readerSchema);
    }

    /**
     * Parses every schema-info entry and keys it by its version byte.
     *
     * @param allSchemaInfos all configured schema versions, keyed by version
     * @return the parsed schemas keyed by version byte
     * @throws IllegalArgumentException if a version does not fit in the
     *         single-byte wire prefix (0..Byte.MAX_VALUE)
     */
    @Override
    protected Map<Byte, Schema> loadSchemas(Map<Integer, String> allSchemaInfos) {
        Map<Byte, Schema> schemaVersions = new HashMap<Byte, Schema>();
        for(Map.Entry<Integer, String> entry: allSchemaInfos.entrySet()) {
            // Make sure we can parse the schema
            Schema schema = Schema.parse(entry.getValue());
            // The version is stored as the first byte of every serialized
            // value, so it must fit in [0, Byte.MAX_VALUE]. A negative
            // version would silently wrap in byteValue(), so reject it too.
            Integer version = entry.getKey();
            if(version < 0 || version > Byte.MAX_VALUE) {
                throw new IllegalArgumentException("Schema version must be between 0 and "
                                                   + Byte.MAX_VALUE);
            }
            schemaVersions.put(version.byteValue(), schema);
            LOG.info("Loaded schema version (" + version + ")");
        }
        return schemaVersions;
    }

    @Override
    protected Schema getCurrentSchema(SerializerDefinition serializerDef) {
        // For generic records the current schema is simply the parsed
        // current schema-info string.
        return Schema.parse(serializerDef.getCurrentSchemaInfo());
    }
}
@@ -0,0 +1,105 @@
package voldemort.serialization.avro;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

import org.apache.avro.Schema;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.Encoder;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import voldemort.serialization.SerializationException;
import voldemort.serialization.Serializer;
import voldemort.serialization.SerializerDefinition;

/**
 * Base class for Avro serializers that support multiple schema versions.
 * Every serialized value is prefixed with a one-byte schema version; on
 * read, the value is decoded with the schema it was written with and
 * resolved into the current schema via Avro schema resolution.
 */
public abstract class AvroResolvingSerializer<T> implements Serializer<T> {

    protected static final Log LOG = LogFactory.getLog(AvroResolvingSerializer.class);

    /** All known schemas, keyed by the version byte used on the wire. */
    private final Map<Byte, Schema> avroSchemaVersions;

    /** The schema used to write new values and to resolve reads into. */
    private final Schema currentSchema;

    /** Version byte written in front of every serialized value. */
    private Byte currentAvroSchemaVersion;

    private final DatumWriter<T> writer;
    private final Map<Byte, DatumReader<T>> readers = new HashMap<Byte, DatumReader<T>>();
    private final DecoderFactory decoderFactory = new DecoderFactory();

    /**
     * @param serializerDef definition holding all schema versions; the
     *        current schema must be among them
     * @throws IllegalArgumentException if the current schema is not present
     *         in the configured schema-info versions
     */
    public AvroResolvingSerializer(SerializerDefinition serializerDef) {
        Map<Integer, String> allSchemaInfos = serializerDef.getAllSchemaInfoVersions();

        // Parse the SerializerDefinition and load up the Schemas into a map
        avroSchemaVersions = loadSchemas(allSchemaInfos);

        // Make sure the "current" schema is one of the loaded versions, and
        // remember which version byte identifies it on the wire.
        currentSchema = getCurrentSchema(serializerDef);
        for(Map.Entry<Byte, Schema> entry: avroSchemaVersions.entrySet()) {
            if(entry.getValue().equals(currentSchema)) {
                currentAvroSchemaVersion = entry.getKey();
                break;
            }
        }
        if(currentAvroSchemaVersion == null) {
            throw new IllegalArgumentException("Most recent Schema is not included in the schema-info");
        }

        // Create a DatumReader for each known writer schema (resolving into
        // the current schema) and a DatumWriter for the current schema.
        for(Map.Entry<Byte, Schema> entry: avroSchemaVersions.entrySet()) {
            readers.put(entry.getKey(), createDatumReader(entry.getValue(), currentSchema));
        }
        writer = createDatumWriter(currentSchema);
    }

    /** Returns the schema that new values should be written with. */
    protected abstract Schema getCurrentSchema(SerializerDefinition serializerDef);

    /** Parses all configured schema-info strings, keyed by version byte. */
    protected abstract Map<Byte, Schema> loadSchemas(Map<Integer, String> allSchemaInfos);

    /** Creates the writer used to encode values with the given schema. */
    protected abstract DatumWriter<T> createDatumWriter(Schema schema);

    /** Creates a reader resolving writerSchema into readerSchema. */
    protected abstract DatumReader<T> createDatumReader(Schema writerSchema, Schema readerSchema);

    /**
     * Serializes the object as a one-byte schema version followed by the
     * Avro binary encoding of the object under the current schema.
     *
     * @throws SerializationException if encoding fails
     */
    public byte[] toBytes(T object) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try {
            // First byte on the wire is the current schema version
            out.write(currentAvroSchemaVersion);
            // Followed by the binary-encoded Avro object
            Encoder encoder = new BinaryEncoder(out);
            writer.write(object, encoder);
            encoder.flush();
            return out.toByteArray();
        } catch(IOException e) {
            // Wrap in the project's serialization exception rather than a
            // bare RuntimeException so callers can handle it uniformly.
            throw new SerializationException(e);
        }
    }

    /**
     * Deserializes a value produced by {@link #toBytes}, resolving from the
     * schema version named by the first byte into the current schema.
     *
     * @throws SerializationException if the version byte is unknown or
     *         decoding fails
     */
    public T toObject(byte[] bytes) {
        ByteBuffer bb = ByteBuffer.wrap(bytes);
        // First byte is the version the value was written with. This check
        // is outside the try block so the SerializationException is not
        // swallowed and re-wrapped.
        Byte version = bb.get();
        if(!avroSchemaVersions.containsKey(version)) {
            throw new SerializationException("Unknown Schema version (" + version
                                             + ") found in serialized value");
        }
        // The remaining bytes are the binary-encoded Avro object
        byte[] b = new byte[bb.remaining()];
        bb.get(b);

        try {
            // Resolve from the writer's schema into the current schema
            DatumReader<T> datumReader = readers.get(version);
            Decoder decoder = decoderFactory.createBinaryDecoder(b, null);
            return datumReader.read(null, decoder);
        } catch(IOException e) {
            throw new SerializationException(e);
        }
    }
}
@@ -0,0 +1,84 @@
package voldemort.serialization.avro;

import java.util.HashMap;
import java.util.Map;

import org.apache.avro.Schema;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.avro.specific.SpecificRecord;

import voldemort.serialization.SerializationException;
import voldemort.serialization.SerializerDefinition;

/**
 * An {@link AvroResolvingSerializer} for Avro generated (specific) record
 * classes. All configured schema versions must describe the same record
 * class, and the "current" schema is taken from the generated class itself
 * so it always matches the code on the classpath.
 */
public class AvroResolvingSpecificSerializer<T extends SpecificRecord> extends
        AvroResolvingSerializer<T> {

    public AvroResolvingSpecificSerializer(SerializerDefinition serializerDef) {
        super(serializerDef);
    }

    @Override
    protected DatumWriter<T> createDatumWriter(Schema schema) {
        return new SpecificDatumWriter<T>(schema);
    }

    @Override
    protected DatumReader<T> createDatumReader(Schema writerSchema, Schema readerSchema) {
        // Resolves values written with writerSchema into readerSchema
        return new SpecificDatumReader<T>(writerSchema, readerSchema);
    }

    /**
     * Parses every schema-info entry, keyed by its version byte, validating
     * that all versions are record schemas describing the same class.
     *
     * @throws IllegalArgumentException if the versions name different
     *         classes, a schema is not a record, or a version does not fit
     *         in the single-byte wire prefix (0..Byte.MAX_VALUE)
     */
    @Override
    protected Map<Byte, Schema> loadSchemas(Map<Integer, String> allSchemaInfos) {
        Map<Byte, Schema> schemaVersions = new HashMap<Byte, Schema>();
        String fullName = null;
        for(Map.Entry<Integer, String> entry: allSchemaInfos.entrySet()) {
            Schema schema = Schema.parse(entry.getValue());
            // Every version must describe the same generated class
            if(fullName == null) {
                fullName = schema.getFullName();
            } else if(!schema.getFullName().equals(fullName)) {
                throw new IllegalArgumentException("Avro schema must all reference the same class");
            }
            // Only record schemas map to generated SpecificRecord classes
            if(schema.getType() != Schema.Type.RECORD) {
                throw new IllegalArgumentException("Avro schema must be a \"record\" type schema");
            }
            // The version is stored in a single signed byte on the wire; a
            // negative version would silently wrap in byteValue().
            Integer version = entry.getKey();
            if(version < 0 || version > Byte.MAX_VALUE) {
                throw new IllegalArgumentException("Schema version must be between 0 and "
                                                   + Byte.MAX_VALUE);
            }
            schemaVersions.put(version.byteValue(), schema);
            LOG.info("Loaded schema version (" + version + ")");
        }
        return schemaVersions;
    }

    /**
     * Loads the generated class named by the current schema-info and returns
     * the schema embedded in it, guaranteeing the wire schema matches the
     * class actually on the classpath.
     *
     * @throws SerializationException if the class cannot be found or
     *         instantiated
     * @throws IllegalArgumentException if the class does not implement
     *         SpecificRecord
     */
    @Override
    protected Schema getCurrentSchema(SerializerDefinition serializerDef) {
        // The current schema is the one extracted from the generated class
        String schemaInfo = serializerDef.getCurrentSchemaInfo();
        Schema schema = Schema.parse(schemaInfo);
        String fullName = schema.getFullName();
        try {
            // Class.forName cannot prove the class is a subtype of T, so the
            // cast is unchecked; the isAssignableFrom check below guards it.
            @SuppressWarnings("unchecked")
            Class<T> clazz = (Class<T>) Class.forName(fullName);
            if(!SpecificRecord.class.isAssignableFrom(clazz)) {
                throw new IllegalArgumentException("Class provided should implement SpecificRecord");
            }
            // Instantiate the generated class and ask it for its schema
            return clazz.newInstance().getSchema();
        } catch(ClassNotFoundException e) {
            throw new SerializationException(e);
        } catch(IllegalAccessException e) {
            throw new SerializationException(e);
        } catch(InstantiationException e) {
            throw new SerializationException(e);
        }
    }
}
@@ -118,4 +118,27 @@ public static String getSingleStore322Xml() {
return readString("config/single-store-322.xml");
}
+ public static String getTestSpecificRecordSchemaWithNamespace1() {
+ return readString("config/TestSpecificRecordNS.avsc.v1");
+ }
+
+ public static String getTestSpecificRecordSchema1() {
+ return readString("config/TestSpecificRecord.avsc.v1");
+ }
+
+ public static String getTestSpecificRecordSchemaWithNamespace2() {
+ return readString("config/TestSpecificRecordNS.avsc.v2");
+ }
+
+ public static String getTestSpecificRecordSchema2() {
+ return readString("config/TestSpecificRecord.avsc.v2");
+ }
+
+ public static String getTestSpecificRecordSchemaActual() {
+ return readString("config/TestSpecificRecord.avsc");
+ }
+
+ public static String getAvroStoresXml() {
+ return readString("config/avro-store.xml");
+ }
}
@@ -0,0 +1,10 @@
+{
+ "type": "record",
+ "name": "TestRecord",
+ "namespace": "voldemort",
+ "fields" : [
+ {"name": "f1", "type": "string"},
+ {"name": "f2", "type": "string", "default": "d2"},
+ {"name": "f3", "type": "int", "default": 3}
+ ]
+}
@@ -0,0 +1,7 @@
+{
+ "type": "record",
+ "name": "TestRecord",
+ "fields" : [
+ {"name": "f1", "type": "string"}
+ ]
+}
@@ -0,0 +1,8 @@
+{
+ "type": "record",
+ "name": "TestRecord",
+ "fields" : [
+ {"name": "f1", "type": "string"},
+ {"name": "f2", "type": "string", "default": "d2"}
+ ]
+}
@@ -0,0 +1,8 @@
+{
+ "type": "record",
+ "name": "TestRecord",
+ "namespace": "voldemort",
+ "fields" : [
+ {"name": "f1", "type": "string"}
+ ]
+}
[Page truncated: GitHub failed to load the remainder of this pull request.]