Skip to content

Commit

Permalink
Added support for schema evolution for the Avro generic serializer
Browse files Browse the repository at this point in the history
  • Loading branch information
abh1nay committed Oct 11, 2012
1 parent 79d9163 commit 4b83abd
Show file tree
Hide file tree
Showing 3 changed files with 140 additions and 18 deletions.
24 changes: 7 additions & 17 deletions config/single_node_cluster/config/stores.xml
Expand Up @@ -15,8 +15,8 @@
<type>string</type>
</value-serializer>
</store>
<store>
<name>avro-test</name>
<store>
<name>test-evolution</name>
<persistence>bdb</persistence>
<description>Test store</description>
<owners> harry@hogwarts.edu, hermoine@hogwarts.edu </owners>
Expand All @@ -25,23 +25,13 @@
<required-reads>1</required-reads>
<required-writes>1</required-writes>
<key-serializer>
<type>avro-generic</type>
<schema-info version="0">"int"</schema-info>
<type>string</type>
</key-serializer>
<value-serializer>
<type>avro-generic</type>
<schema-info version="0">
{
"type": "record",
"name": "myrec",

"fields": [
{ "name": "original", "type": "string" },
{ "name": "new-field", "type": [ "string" , "null"], "default":"null" }

]
}
</schema-info>
<type>avro-generic-versioned</type>
<schema-info version="0">{"type": "record", "name": "myrec","fields": [{ "name": "original", "type": "string" }]}</schema-info>
<schema-info version="1">{"type": "record", "name": "myrec","fields": [{ "name": "original", "type": "string" }, { "name": "new-field", "type": "string", "default":"" }]}
</schema-info>
</value-serializer>
</store>
</stores>
20 changes: 19 additions & 1 deletion src/java/voldemort/serialization/DefaultSerializerFactory.java
Expand Up @@ -25,6 +25,7 @@
import voldemort.serialization.avro.AvroGenericSerializer;
import voldemort.serialization.avro.AvroReflectiveSerializer;
import voldemort.serialization.avro.AvroSpecificSerializer;
import voldemort.serialization.avro.versioned.AvroVersionedGenericSerializer;
import voldemort.serialization.json.JsonTypeDefinition;
import voldemort.serialization.json.JsonTypeSerializer;
import voldemort.serialization.protobuf.ProtoBufSerializer;
Expand All @@ -50,6 +51,12 @@ public class DefaultSerializerFactory implements SerializerFactory {
private static final String AVRO_SPECIFIC_TYPE_NAME = "avro-specific";
private static final String AVRO_REFLECTIVE_TYPE_NAME = "avro-reflective";

// New serializer type that adds Avro schema-versioning support.
// We cannot change the existing serializer classes, since the versioned
// format looks for a leading version byte and that would break existing clients.

private static final String AVRO_GENERIC_VERSIONED_TYPE_NAME = "avro-generic-versioned";

public Serializer<?> getSerializer(SerializerDefinition serializerDef) {
String name = serializerDef.getName();
if(name.equals(JAVA_SERIALIZER_TYPE_NAME)) {
Expand All @@ -72,13 +79,24 @@ public Serializer<?> getSerializer(SerializerDefinition serializerDef) {
} else if(name.equals(PROTO_BUF_TYPE_NAME)) {
return new ProtoBufSerializer<Message>(serializerDef.getCurrentSchemaInfo());
} else if(name.equals(THRIFT_TYPE_NAME)) {
return new ThriftSerializer<TBase<?,?>>(serializerDef.getCurrentSchemaInfo());
return new ThriftSerializer<TBase<?, ?>>(serializerDef.getCurrentSchemaInfo());
} else if(name.equals(AVRO_GENERIC_TYPE_NAME)) {
return new AvroGenericSerializer(serializerDef.getCurrentSchemaInfo());
} else if(name.equals(AVRO_SPECIFIC_TYPE_NAME)) {
return new AvroSpecificSerializer<SpecificRecord>(serializerDef.getCurrentSchemaInfo());
} else if(name.equals(AVRO_REFLECTIVE_TYPE_NAME)) {
return new AvroReflectiveSerializer<Object>(serializerDef.getCurrentSchemaInfo());
} else if(name.equals(AVRO_GENERIC_VERSIONED_TYPE_NAME)) {
if(serializerDef.hasVersion()) {
Map<Integer, String> versions = new HashMap<Integer, String>();
for(Map.Entry<Integer, String> entry: serializerDef.getAllSchemaInfoVersions()
.entrySet())
versions.put(entry.getKey(), entry.getValue());
return new AvroVersionedGenericSerializer(versions);
} else {
return new AvroVersionedGenericSerializer(serializerDef.getCurrentSchemaInfo());
}

} else {
throw new IllegalArgumentException("No known serializer type: "
+ serializerDef.getName());
Expand Down
@@ -0,0 +1,114 @@
/*
* Copyright 2011 LinkedIn, Inc
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package voldemort.serialization.avro.versioned;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.Encoder;

import voldemort.serialization.SerializationException;
import voldemort.serialization.SerializationUtils;
import voldemort.serialization.Serializer;

/**
 * Avro serializer that uses the generic representation for Avro data and
 * supports schema evolution.
 *
 * Every serialized value is prefixed with a single byte holding the version of
 * the schema it was written with. Values are always written with the newest
 * known schema version; on read, the version byte selects the writer's schema,
 * and the data is resolved against the newest schema (the reader's schema).
 */
public class AvroVersionedGenericSerializer implements Serializer<Object> {

    // All known schema versions (version number -> schema JSON), sorted so
    // that the newest version is the last key.
    private final SortedMap<Integer, String> typeDefVersions;
    private final Integer newestVersion;

    // reader's schema (parsed from the newest version)
    private final Schema typeDef;

    /**
     * Constructor accepting a single schema definition as a JSON string. The
     * schema is registered as version 0.
     *
     * @param schema a serialized JSON object representing a Avro schema.
     */
    public AvroVersionedGenericSerializer(String schema) {
        this.typeDefVersions = new TreeMap<Integer, String>();
        this.typeDefVersions.put(0, schema);
        this.newestVersion = this.typeDefVersions.lastKey();
        this.typeDef = Schema.parse(schema);
    }

    /**
     * Constructor accepting every known schema version. The highest version
     * number is used as the schema for writing and as the reader's schema.
     *
     * @param typeDefVersions map from version number to the schema definition
     *        (JSON string) of that version; must contain at least one entry
     * @throws IllegalArgumentException if the map is null or empty, or if the
     *         newest version number cannot be stored in the single version
     *         byte
     */
    public AvroVersionedGenericSerializer(Map<Integer, String> typeDefVersions) {
        if(typeDefVersions == null || typeDefVersions.isEmpty()) {
            throw new IllegalArgumentException("At least one schema version must be provided");
        }

        this.typeDefVersions = new TreeMap<Integer, String>(typeDefVersions);
        this.newestVersion = this.typeDefVersions.lastKey();

        // The version is written as one signed byte; a larger number would be
        // silently truncated and the data tagged with the wrong version.
        if(newestVersion < Byte.MIN_VALUE || newestVersion > Byte.MAX_VALUE) {
            throw new IllegalArgumentException("Schema version " + newestVersion
                                               + " does not fit in the single version byte");
        }

        this.typeDef = Schema.parse(this.typeDefVersions.get(newestVersion));
    }

    /**
     * Serializes the object with the newest schema version.
     *
     * @param object the generic Avro datum to serialize
     * @return the version byte followed by the Avro binary encoding
     * @throws SerializationException if the Avro encoder fails
     */
    public byte[] toBytes(Object object) {
        ByteArrayOutputStream output = new ByteArrayOutputStream();
        Encoder encoder = new BinaryEncoder(output);

        // The version byte goes first so readers can pick the writer's schema.
        output.write(newestVersion.byteValue());
        try {
            GenericDatumWriter<Object> datumWriter = new GenericDatumWriter<Object>(typeDef);
            datumWriter.write(object, encoder);
            encoder.flush();
        } catch(IOException e) {
            throw new SerializationException(e);
        } finally {
            SerializationUtils.close(output);
        }

        return output.toByteArray();
    }

    /**
     * Deserializes a value produced by {@link #toBytes(Object)}, possibly with
     * an older schema version.
     *
     * @param bytes the version byte followed by the Avro binary encoding
     * @return the datum, resolved against the newest schema version
     * @throws SerializationException if the input is null or empty, if the
     *         version byte does not match a known schema version, or if the
     *         Avro decoder fails
     */
    public Object toObject(byte[] bytes) {
        if(bytes == null || bytes.length == 0) {
            throw new SerializationException("Cannot deserialize an empty value: the schema version byte is missing");
        }

        Integer version = Integer.valueOf(bytes[0]);
        String writerSchemaJson = typeDefVersions.get(version);
        if(writerSchemaJson == null) {
            throw new SerializationException("Unknown schema version " + version
                                             + "; known versions are "
                                             + typeDefVersions.keySet());
        }

        // writer's schema
        Schema typeDefWriter = Schema.parse(writerSchemaJson);

        // Strip the version byte; the remainder is the Avro payload.
        byte[] dataBytes = new byte[bytes.length - 1];
        System.arraycopy(bytes, 1, dataBytes, 0, bytes.length - 1);
        Decoder decoder = DecoderFactory.defaultFactory().createBinaryDecoder(dataBytes, null);
        try {
            // The two-schema constructor already sets the writer's (actual)
            // and reader's (expected) schemas.
            GenericDatumReader<Object> reader = new GenericDatumReader<Object>(typeDefWriter,
                                                                               typeDef);
            return reader.read(null, decoder);
        } catch(IOException e) {
            throw new SerializationException(e);
        }
    }
}

0 comments on commit 4b83abd

Please sign in to comment.