Permalink
Browse files

Issue 133: Avro contribution

  • Loading branch information...
1 parent 0dfb70b commit f019092477fc79a904e8869c387eb02daae7d5f8 @atoulme atoulme committed Feb 10, 2010
View
@@ -44,5 +44,9 @@
<classpathentry kind="lib" path="lib/je-3.3.87.jar"/>
<classpathentry kind="lib" path="lib/protobuf-java-2.2.0.jar"/>
<classpathentry kind="lib" path="contrib/ec2-testing/lib/typica.jar"/>
+ <classpathentry kind="lib" path="lib/avro-1.3.0.jar" sourcepath="/Users/antoine/Downloads/avro-1.3.0-sources.jar"/>
+ <classpathentry kind="lib" path="lib/paranamer-2.1.jar"/>
+ <classpathentry kind="lib" path="lib/jackson-mapper-asl-1.4.0.jar"/>
+ <classpathentry kind="lib" path="lib/jackson-core-asl-1.4.0.jar"/>
<classpathentry kind="output" path="classes"/>
</classpath>
View
Binary file not shown.
Binary file not shown.
Binary file not shown.
View
Binary file not shown.
@@ -19,6 +19,9 @@
import java.util.HashMap;
import java.util.Map;
+import voldemort.serialization.avro.AvroGenericSerializer;
+import voldemort.serialization.avro.AvroReflectiveSerializer;
+import voldemort.serialization.avro.AvroSpecificSerializer;
import voldemort.serialization.json.JsonTypeDefinition;
import voldemort.serialization.json.JsonTypeSerializer;
import voldemort.serialization.protobuf.ProtoBufSerializer;
@@ -42,6 +45,9 @@
private static final String JSON_SERIALIZER_TYPE_NAME = "json";
private static final String PROTO_BUF_TYPE_NAME = "protobuf";
private static final String THRIFT_TYPE_NAME = "thrift";
+ private static final String AVRO_GENERIC_TYPE_NAME = "avro-generic";
+ private static final String AVRO_SPECIFIC_TYPE_NAME = "avro-specific";
+ private static final String AVRO_REFLECTIVE_TYPE_NAME = "avro-reflective";
public Serializer<?> getSerializer(SerializerDefinition serializerDef) {
String name = serializerDef.getName();
@@ -62,6 +68,12 @@
return new ProtoBufSerializer<Message>(serializerDef.getCurrentSchemaInfo());
} else if(name.equals(THRIFT_TYPE_NAME)) {
return new ThriftSerializer<TBase>(serializerDef.getCurrentSchemaInfo());
+ } else if(name.equals(AVRO_GENERIC_TYPE_NAME)) {
+ return new AvroGenericSerializer(serializerDef.getCurrentSchemaInfo());
+ } else if(name.equals(AVRO_SPECIFIC_TYPE_NAME)) {
+ return new AvroSpecificSerializer(serializerDef.getCurrentSchemaInfo());
+ } else if(name.equals(AVRO_REFLECTIVE_TYPE_NAME)) {
+ return new AvroReflectiveSerializer(serializerDef.getCurrentSchemaInfo());
} else {
throw new IllegalArgumentException("No known serializer type: "
+ serializerDef.getName());
@@ -0,0 +1,50 @@
+/*
+ * Copyright 2010 Antoine Toulme
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package voldemort.serialization;
+
+import org.apache.commons.lang.StringUtils;
+
+/**
+ * @author antoine
+ *
+ * A set of Utils methods that are reused in different serializers.
+ *
+ */
+public class SerializationUtils {
+
+ private static final String ONLY_JAVA_CLIENTS_SUPPORTED = "Only Java clients are supported currently, so the format of the schema-info should be: <schema-info>java=foo.Bar</schema-info> where foo.Bar is the fully qualified name of the message.";
+
+ /**
+ * Extracts the java class name from the schema info
+ *
+ * @param schemaInfo the schema info, a string like: java=java.lang.String
+ * @return the name of the class extracted from the schema info
+ */
+ public static String getJavaClassFromSchemaInfo(String schemaInfo) {
+ if(StringUtils.isEmpty(schemaInfo))
+ throw new IllegalArgumentException("This serializer requires a non-empty schema-info.");
+
+ String[] languagePairs = StringUtils.split(schemaInfo, ',');
+ if(languagePairs.length > 1)
+ throw new IllegalArgumentException(ONLY_JAVA_CLIENTS_SUPPORTED);
+
+ String[] javaPair = StringUtils.split(languagePairs[0], '=');
+ if(javaPair.length != 2 || !javaPair[0].trim().equals("java"))
+ throw new IllegalArgumentException(ONLY_JAVA_CLIENTS_SUPPORTED);
+
+ return javaPair[1].trim();
+ }
+}
@@ -0,0 +1,115 @@
+/*
+ * Copyright 2010 Antoine Toulme
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package voldemort.serialization.avro;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.file.SeekableInput;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DatumWriter;
+
+import voldemort.serialization.SerializationException;
+import voldemort.serialization.Serializer;
+
+/**
+ * AvroSerializer uses the avro protocol to serialize objects
+ *
+ */
+public class AvroGenericSerializer implements Serializer<Object> {
+
+ private final Schema typeDef;
+
+ /**
+ * Constructor accepting the schema definition as a JSON string.
+ *
+ * @param schema a serialized JSON object representing a Avro schema.
+ */
+ public AvroGenericSerializer(String schema) {
+ typeDef = Schema.parse(schema);
+ }
+
+ public byte[] toBytes(Object object) {
+ ByteArrayOutputStream output = new ByteArrayOutputStream();
+ DataFileWriter<Object> writer = null;
+ try {
+ DatumWriter<Object> datumWriter = new GenericDatumWriter<Object>(typeDef);
+
+ writer = new DataFileWriter<Object>(datumWriter).create(typeDef, output);
+ writer.append(object);
+ writer.flush();
+ return output.toByteArray();
+ } catch(IOException e) {
+ throw new SerializationException(e);
+ } finally {
+ if(writer != null) {
+ try {
+ writer.close();
+ } catch(IOException e) {}
+ }
+ }
+ }
+
+ public Object toObject(byte[] bytes) {
+ SeekableByteArrayInput input = new SeekableByteArrayInput(bytes);
+ DataFileReader<Object> reader = null;
+ try {
+ DatumReader<Object> datumReader = new GenericDatumReader<Object>(typeDef);
+ reader = new DataFileReader<Object>(input, datumReader);
+ return reader.next(null);
+ } catch(IOException e) {
+ throw new SerializationException(e);
+ } finally {
+ if(reader != null) {
+ try {
+ reader.close();
+ } catch(IOException e) {}
+ }
+ }
+ }
+
+ /**
+ * A simple implementation of the SeekableInput for a ByteArrayInputStream.
+ *
+ * @author antoine
+ */
+ static class SeekableByteArrayInput extends ByteArrayInputStream implements SeekableInput {
+
+ public SeekableByteArrayInput(byte[] buf) {
+ super(buf);
+ }
+
+ public long length() throws IOException {
+ return buf.length;
+ }
+
+ public void seek(long p) throws IOException {
+ pos = (int) p;
+ }
+
+ public long tell() throws IOException {
+ return pos;
+ }
+
+ }
+
+}
@@ -0,0 +1,101 @@
+/*
+ * Copyright 2010 Antoine Toulme
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package voldemort.serialization.avro;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.reflect.ReflectDatumReader;
+import org.apache.avro.reflect.ReflectDatumWriter;
+
+import voldemort.serialization.SerializationException;
+import voldemort.serialization.SerializationUtils;
+import voldemort.serialization.Serializer;
+import voldemort.serialization.avro.AvroGenericSerializer.SeekableByteArrayInput;
+
+/**
+ * Avro serializer uses the avro protocol to serialize objects of a particular
+ * class type using reflexivity.
+ *
+ * Reflexivity is supported from either the class, the schema or both.
+ *
+ * For now we only support the class case, the code should be stress-tested once
+ * Avro 1.3 is out.
+ */
+public class AvroReflectiveSerializer implements Serializer<Object> {
+
+ private final Class clazz;
+
+ /**
+ * Constructor accepting a Java class name under the convention
+ * java=classname.
+ *
+ * @param schemaInfo information on the schema for the serializer.
+ */
+ public AvroReflectiveSerializer(String schemaInfo) {
+ try {
+ clazz = Class.forName(SerializationUtils.getJavaClassFromSchemaInfo(schemaInfo));
+ } catch(ClassNotFoundException e) {
+ throw new SerializationException(e);
+ }
+ }
+
+ public byte[] toBytes(Object object) {
+ ByteArrayOutputStream output = new ByteArrayOutputStream();
+ DataFileWriter<Object> writer = null;
+ try {
+ DatumWriter<Object> datumWriter = new ReflectDatumWriter<Object>(clazz);
+
+ writer = new DataFileWriter<Object>(datumWriter).create(ReflectData.get()
+ .getSchema(clazz),
+ output);
+ writer.append(object);
+ writer.flush();
+ return output.toByteArray();
+ } catch(IOException e) {
+ throw new SerializationException(e);
+ } finally {
+ if(writer != null) {
+ try {
+ writer.close();
+ } catch(IOException e) {}
+ }
+ }
+ }
+
+ public Object toObject(byte[] bytes) {
+ SeekableByteArrayInput input = new SeekableByteArrayInput(bytes);
+ DataFileReader<Object> reader = null;
+ try {
+ DatumReader<Object> datumReader = new ReflectDatumReader(clazz);
+ reader = new DataFileReader<Object>(input, datumReader);
+ return reader.next(null);
+ } catch(IOException e) {
+ throw new SerializationException(e);
+ } finally {
+ if(reader != null) {
+ try {
+ reader.close();
+ } catch(IOException e) {}
+ }
+ }
+ }
+}
Oops, something went wrong.

0 comments on commit f019092

Please sign in to comment.