Permalink
Browse files

Updated Avro to 1.4.0 and fixed all the serializers

  • Loading branch information...
1 parent 3f93110 commit 7f8473de4f714e8b111189ebf010b81e42b07c34 @rsumbaly rsumbaly committed Mar 2, 2011
View
@@ -47,10 +47,10 @@
<classpathentry kind="lib" path="lib/paranamer-2.1.jar"/>
<classpathentry kind="lib" path="lib/jackson-mapper-asl-1.4.0.jar"/>
<classpathentry kind="lib" path="lib/jackson-core-asl-1.4.0.jar"/>
- <classpathentry kind="lib" path="lib/avro-modified-jdk5-1.3.0.jar"/>
<classpathentry kind="lib" path="contrib/hadoop/lib/pig-0.7.1-dev-core.jar"/>
<classpathentry kind="lib" path="contrib/krati/lib/krati-0.3.6.jar"/>
<classpathentry kind="lib" path="lib/jna.jar"/>
- <classpathentry kind="lib" path="lib/mockito-all-1.8.5.jar" />
- <classpathentry kind="output" path="classes"/>
+ <classpathentry kind="lib" path="lib/mockito-all-1.8.5.jar"/>
+ <classpathentry kind="lib" path="lib/avro-1.4.0.jar"/>
+ <classpathentry kind="output" path="classes"/>
</classpath>
View
Binary file not shown.
Binary file not shown.
@@ -1,5 +1,5 @@
/*
- * Copyright 2010 Antoine Toulme
+ * Copyright 2011 LinkedIn, Inc
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
@@ -13,15 +13,19 @@
* License for the specific language governing permissions and limitations under
* the License.
*/
+
package voldemort.serialization;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+
import org.apache.commons.lang.StringUtils;
+import org.apache.log4j.Logger;
-/**
- * A set of Utils methods that are reused in different serializers.
- */
public class SerializationUtils {
+ private static final Logger logger = Logger.getLogger(SerializationUtils.class);
+
private static final String ONLY_JAVA_CLIENTS_SUPPORTED = "Only Java clients are supported currently, so the format of the schema-info should be: <schema-info>java=foo.Bar</schema-info> where foo.Bar is the fully qualified name of the message.";
/**
@@ -44,4 +48,15 @@ public static String getJavaClassFromSchemaInfo(String schemaInfo) {
return javaPair[1].trim();
}
+
+ public static void close(ByteArrayOutputStream stream) {
+ if(stream != null) {
+ try {
+ stream.close();
+ } catch(IOException e) {
+ logger.error("Failed to close stream", e);
+ }
+ }
+ }
+
}
@@ -1,5 +1,5 @@
/*
- * Copyright 2010 Antoine Toulme
+ * Copyright 2011 LinkedIn, Inc
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
@@ -15,27 +15,29 @@
*/
package voldemort.serialization.avro;
-import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import org.apache.avro.Schema;
-import org.apache.avro.file.DataFileStream;
-import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
-import org.apache.avro.io.DatumReader;
-import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.Encoder;
import voldemort.serialization.SerializationException;
+import voldemort.serialization.SerializationUtils;
import voldemort.serialization.Serializer;
/**
* Avro serializer that uses the generic representation for Avro data. This
* representation is best for applications which deal with dynamic data, whose
* schemas are not known until runtime.
*
- * @see http://hadoop.apache.org/avro/docs/current/api/java/org/apache/avro/generic/package-summary.html
+ * @see http
+ * ://hadoop.apache.org/avro/docs/current/api/java/org/apache/avro/generic
+ * /package-summary.html
*/
public class AvroGenericSerializer implements Serializer<Object> {
@@ -52,31 +54,28 @@ public AvroGenericSerializer(String schema) {
public byte[] toBytes(Object object) {
ByteArrayOutputStream output = new ByteArrayOutputStream();
- DataFileWriter<Object> writer = null;
+ Encoder encoder = new BinaryEncoder(output);
+ GenericDatumWriter<Object> datumWriter = null;
try {
- DatumWriter<Object> datumWriter = new GenericDatumWriter<Object>(typeDef);
-
- writer = new DataFileWriter<Object>(datumWriter).create(typeDef, output);
- writer.append(object);
+ datumWriter = new GenericDatumWriter<Object>(typeDef);
+ datumWriter.write(object, encoder);
+ encoder.flush();
} catch(IOException e) {
throw new SerializationException(e);
} finally {
- AvroUtils.close(writer);
+ SerializationUtils.close(output);
}
return output.toByteArray();
}
public Object toObject(byte[] bytes) {
- ByteArrayInputStream input = new ByteArrayInputStream(bytes);
- DataFileStream<Object> reader = null;
+ Decoder decoder = DecoderFactory.defaultFactory().createBinaryDecoder(bytes, null);
+ GenericDatumReader<Object> reader = null;
try {
- DatumReader<Object> datumReader = new GenericDatumReader<Object>(typeDef);
- reader = new DataFileStream<Object>(input, datumReader);
- return reader.next();
+ reader = new GenericDatumReader<Object>(typeDef);
+ return reader.read(null, decoder);
} catch(IOException e) {
throw new SerializationException(e);
- } finally {
- AvroUtils.close(reader);
}
}
}
@@ -1,5 +1,5 @@
/*
- * Copyright 2010 Antoine Toulme
+ * Copyright 2011 LinkedIn, Inc
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
@@ -15,15 +15,13 @@
*/
package voldemort.serialization.avro;
-import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
-import org.apache.avro.file.DataFileStream;
-import org.apache.avro.file.DataFileWriter;
-import org.apache.avro.io.DatumReader;
-import org.apache.avro.io.DatumWriter;
-import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.Encoder;
import org.apache.avro.reflect.ReflectDatumReader;
import org.apache.avro.reflect.ReflectDatumWriter;
@@ -49,7 +47,9 @@
* <strong>For now we only support the class case.</strong>
* </p>
*
- * @see http://hadoop.apache.org/avro/docs/current/api/java/org/apache/avro/reflect/package-summary.html
+ * @see http
+ * ://hadoop.apache.org/avro/docs/current/api/java/org/apache/avro/reflect
+ * /package-summary.html
*/
public class AvroReflectiveSerializer<T> implements Serializer<T> {
@@ -59,7 +59,7 @@
* Constructor accepting a Java class name under the convention
* java=classname.
*
- * @param schemaInfo information on the schema for the serializer.
+ * @param schemaInfo information on the schema for the serializer
*/
@SuppressWarnings("unchecked")
public AvroReflectiveSerializer(String schemaInfo) {
@@ -72,32 +72,28 @@ public AvroReflectiveSerializer(String schemaInfo) {
public byte[] toBytes(T object) {
ByteArrayOutputStream output = new ByteArrayOutputStream();
- DataFileWriter<T> writer = null;
+ Encoder encoder = new BinaryEncoder(output);
+ ReflectDatumWriter<T> datumWriter = null;
try {
- DatumWriter<T> datumWriter = new ReflectDatumWriter<T>(clazz);
-
- writer = new DataFileWriter<T>(datumWriter).create(ReflectData.get().getSchema(clazz),
- output);
- writer.append(object);
+ datumWriter = new ReflectDatumWriter<T>(clazz);
+ datumWriter.write(object, encoder);
+ encoder.flush();
} catch(IOException e) {
throw new SerializationException(e);
} finally {
- AvroUtils.close(writer);
+ SerializationUtils.close(output);
}
return output.toByteArray();
}
public T toObject(byte[] bytes) {
- ByteArrayInputStream input = new ByteArrayInputStream(bytes);
- DataFileStream<T> reader = null;
+ Decoder decoder = DecoderFactory.defaultFactory().createBinaryDecoder(bytes, null);
+ ReflectDatumReader<T> reader = null;
try {
- DatumReader<T> datumReader = new ReflectDatumReader<T>(clazz);
- reader = new DataFileStream<T>(input, datumReader);
- return reader.next();
+ reader = new ReflectDatumReader<T>(clazz);
+ return reader.read(null, decoder);
} catch(IOException e) {
throw new SerializationException(e);
- } finally {
- AvroUtils.close(reader);
}
}
}
@@ -1,5 +1,5 @@
/*
- * Copyright 2010 Antoine Toulme
+ * Copyright 2011 LinkedIn, Inc
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
@@ -15,15 +15,13 @@
*/
package voldemort.serialization.avro;
-import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
-import org.apache.avro.file.DataFileStream;
-import org.apache.avro.file.DataFileWriter;
-import org.apache.avro.io.DatumReader;
-import org.apache.avro.io.DatumWriter;
-import org.apache.avro.specific.SpecificData;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.Encoder;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.avro.specific.SpecificRecord;
@@ -45,7 +43,9 @@
* recommended.
* </p>
*
- * @see http://hadoop.apache.org/avro/docs/current/api/java/org/apache/avro/generic/package-summary.html
+ * @see http
+ * ://avro.apache.org/docs/1.4.1/api/java/org/apache/avro/specific/package
+ * -summary.html
*/
public class AvroSpecificSerializer<T extends SpecificRecord> implements Serializer<T> {
@@ -70,32 +70,28 @@ public AvroSpecificSerializer(String schemaInfo) {
public byte[] toBytes(T object) {
ByteArrayOutputStream output = new ByteArrayOutputStream();
- DataFileWriter<T> writer = null;
+ Encoder encoder = new BinaryEncoder(output);
+ SpecificDatumWriter<T> datumWriter = null;
try {
- DatumWriter<T> datumWriter = new SpecificDatumWriter<T>(clazz);
-
- writer = new DataFileWriter<T>(datumWriter).create(SpecificData.get().getSchema(clazz),
- output);
- writer.append(object);
+ datumWriter = new SpecificDatumWriter<T>(clazz);
+ datumWriter.write(object, encoder);
+ encoder.flush();
} catch(IOException e) {
throw new SerializationException(e);
} finally {
- AvroUtils.close(writer);
+ SerializationUtils.close(output);
}
return output.toByteArray();
}
public T toObject(byte[] bytes) {
- ByteArrayInputStream input = new ByteArrayInputStream(bytes);
- DataFileStream<T> reader = null;
+ Decoder decoder = DecoderFactory.defaultFactory().createBinaryDecoder(bytes, null);
+ SpecificDatumReader<T> reader = null;
try {
- DatumReader<T> datumReader = new SpecificDatumReader<T>(clazz);
- reader = new DataFileStream<T>(input, datumReader);
- return reader.next();
+ reader = new SpecificDatumReader<T>(clazz);
+ return reader.read(null, decoder);
} catch(IOException e) {
throw new SerializationException(e);
- } finally {
- AvroUtils.close(reader);
}
}
}
@@ -1,50 +0,0 @@
-/*
- * Copyright 2008-2009 LinkedIn, Inc
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package voldemort.serialization.avro;
-
-import java.io.IOException;
-
-import org.apache.avro.file.DataFileStream;
-import org.apache.avro.file.DataFileWriter;
-import org.apache.log4j.Logger;
-
-final class AvroUtils {
-
- private static final Logger logger = Logger.getLogger(AvroUtils.class);
-
- private AvroUtils() {}
-
- static void close(DataFileStream<?> stream) {
- if(stream != null) {
- try {
- stream.close();
- } catch(IOException e) {
- logger.error("Failed to close stream", e);
- }
- }
- }
-
- static void close(DataFileWriter<?> stream) {
- if(stream != null) {
- try {
- stream.close();
- } catch(IOException e) {
- logger.error("Failed to close stream", e);
- }
- }
- }
-}
Oops, something went wrong.

0 comments on commit 7f8473d

Please sign in to comment.