Permalink
Browse files

Added support for schema evolution for the Avro generic serializer

  • Loading branch information...
1 parent 79d9163 commit 4b83abdc0940c7e3b3856be0b54bb4260fd4b54f @abh1nay abh1nay committed Sep 18, 2012
@@ -15,8 +15,8 @@
<type>string</type>
</value-serializer>
</store>
- <store>
- <name>avro-test</name>
+ <store>
+ <name>test-evolution</name>
<persistence>bdb</persistence>
<description>Test store</description>
<owners> harry@hogwarts.edu, hermoine@hogwarts.edu </owners>
@@ -25,23 +25,13 @@
<required-reads>1</required-reads>
<required-writes>1</required-writes>
<key-serializer>
- <type>avro-generic</type>
- <schema-info version="0">"int"</schema-info>
+ <type>string</type>
</key-serializer>
<value-serializer>
- <type>avro-generic</type>
- <schema-info version="0">
- {
- "type": "record",
- "name": "myrec",
-
- "fields": [
- { "name": "original", "type": "string" },
- { "name": "new-field", "type": [ "string" , "null"], "default":"null" }
-
- ]
- }
- </schema-info>
+ <type>avro-generic-versioned</type>
+ <schema-info version="0">{"type": "record", "name": "myrec","fields": [{ "name": "original", "type": "string" }]}</schema-info>
+ <schema-info version="1">{"type": "record", "name": "myrec","fields": [{ "name": "original", "type": "string" }, { "name": "new-field", "type": "string", "default":"" }]}
+ </schema-info>
</value-serializer>
</store>
</stores>
@@ -25,6 +25,7 @@
import voldemort.serialization.avro.AvroGenericSerializer;
import voldemort.serialization.avro.AvroReflectiveSerializer;
import voldemort.serialization.avro.AvroSpecificSerializer;
+import voldemort.serialization.avro.versioned.AvroVersionedGenericSerializer;
import voldemort.serialization.json.JsonTypeDefinition;
import voldemort.serialization.json.JsonTypeSerializer;
import voldemort.serialization.protobuf.ProtoBufSerializer;
@@ -50,6 +51,12 @@
private static final String AVRO_SPECIFIC_TYPE_NAME = "avro-specific";
private static final String AVRO_REFLECTIVE_TYPE_NAME = "avro-reflective";
+ // New serialization types for avro versioning support
+ // We cannot change existing serializer classes since
+ // this will break existing clients while looking for the version byte
+
+ private static final String AVRO_GENERIC_VERSIONED_TYPE_NAME = "avro-generic-versioned";
+
public Serializer<?> getSerializer(SerializerDefinition serializerDef) {
String name = serializerDef.getName();
if(name.equals(JAVA_SERIALIZER_TYPE_NAME)) {
@@ -72,13 +79,24 @@
} else if(name.equals(PROTO_BUF_TYPE_NAME)) {
return new ProtoBufSerializer<Message>(serializerDef.getCurrentSchemaInfo());
} else if(name.equals(THRIFT_TYPE_NAME)) {
- return new ThriftSerializer<TBase<?,?>>(serializerDef.getCurrentSchemaInfo());
+ return new ThriftSerializer<TBase<?, ?>>(serializerDef.getCurrentSchemaInfo());
} else if(name.equals(AVRO_GENERIC_TYPE_NAME)) {
return new AvroGenericSerializer(serializerDef.getCurrentSchemaInfo());
} else if(name.equals(AVRO_SPECIFIC_TYPE_NAME)) {
return new AvroSpecificSerializer<SpecificRecord>(serializerDef.getCurrentSchemaInfo());
} else if(name.equals(AVRO_REFLECTIVE_TYPE_NAME)) {
return new AvroReflectiveSerializer<Object>(serializerDef.getCurrentSchemaInfo());
+ } else if(name.equals(AVRO_GENERIC_VERSIONED_TYPE_NAME)) {
+ if(serializerDef.hasVersion()) {
+ Map<Integer, String> versions = new HashMap<Integer, String>();
+ for(Map.Entry<Integer, String> entry: serializerDef.getAllSchemaInfoVersions()
+ .entrySet())
+ versions.put(entry.getKey(), entry.getValue());
+ return new AvroVersionedGenericSerializer(versions);
+ } else {
+ return new AvroVersionedGenericSerializer(serializerDef.getCurrentSchemaInfo());
+ }
+
} else {
throw new IllegalArgumentException("No known serializer type: "
+ serializerDef.getName());
@@ -0,0 +1,114 @@
+/*
+ * Copyright 2011 LinkedIn, Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package voldemort.serialization.avro.versioned;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.Encoder;
+
+import voldemort.serialization.SerializationException;
+import voldemort.serialization.SerializationUtils;
+import voldemort.serialization.Serializer;
+
+/**
+ * Avro serializer that uses the generic representation for Avro data. This
+ * representation is best for applications which deal with dynamic data, whose
+ * schemas are not known until runtime.
+ *
+ */
+public class AvroVersionedGenericSerializer implements Serializer<Object> {
+
+ private final SortedMap<Integer, String> typeDefVersions;
+ private final Integer newestVersion;
+
+ // reader's schema
+ private final Schema typeDef;
+
+ /**
+ * Constructor accepting the schema definition as a JSON string.
+ *
+ * @param schema a serialized JSON object representing a Avro schema.
+ */
+ public AvroVersionedGenericSerializer(String schema) {
+
+ this.typeDefVersions = new TreeMap<Integer, String>();
+ this.typeDefVersions.put(0, schema);
+ newestVersion = typeDefVersions.lastKey();
+ typeDef = Schema.parse(typeDefVersions.get(newestVersion));
+ }
+
+ public AvroVersionedGenericSerializer(Map<Integer, String> typeDefVersions) {
+
+ this.typeDefVersions = new TreeMap<Integer, String>(typeDefVersions);
+ newestVersion = this.typeDefVersions.lastKey();
+ typeDef = Schema.parse(typeDefVersions.get(newestVersion));
+
+ }
+
+ public byte[] toBytes(Object object) {
+ ByteArrayOutputStream output = new ByteArrayOutputStream();
+ Encoder encoder = new BinaryEncoder(output);
+ GenericDatumWriter<Object> datumWriter = null;
+
+ output.write(newestVersion.byteValue());
+ try {
+ datumWriter = new GenericDatumWriter<Object>(typeDef);
+ datumWriter.write(object, encoder);
+ encoder.flush();
+ } catch(IOException e) {
+ throw new SerializationException(e);
+ } finally {
+ SerializationUtils.close(output);
+ }
+
+ byte version = newestVersion.byteValue();
+
+ return output.toByteArray();
+ }
+
+ public Object toObject(byte[] bytes) {
+
+ Integer version = Integer.valueOf(bytes[0]);
+
+ Schema typeDefWriter = Schema.parse(typeDefVersions.get(version));
+
+ byte[] dataBytes = new byte[bytes.length - 1];
+ System.arraycopy(bytes, 1, dataBytes, 0, bytes.length - 1);
+ Decoder decoder = DecoderFactory.defaultFactory().createBinaryDecoder(dataBytes, null);
+ GenericDatumReader<Object> reader = null;
+ try {
+ reader = new GenericDatumReader<Object>(typeDefWriter, typeDef);
+ // writer's schema
+ reader.setSchema(typeDefWriter);
+ // Reader's schema
+ reader.setExpected(typeDef);
+ return reader.read(null, decoder);
+ } catch(IOException e) {
+ throw new SerializationException(e);
+ }
+
+ }
+}

0 comments on commit 4b83abd

Please sign in to comment.