Skip to content

Commit

Permalink
Added check for schema backwards compatibility for Avro
Browse files Browse the repository at this point in the history
in Admin Client tool and server startup
  • Loading branch information
abh1nay committed Oct 11, 2012
1 parent 9656114 commit 46f42fe
Show file tree
Hide file tree
Showing 6 changed files with 988 additions and 38 deletions.
2 changes: 1 addition & 1 deletion META-INF/MANIFEST.MF
@@ -1,6 +1,6 @@
Manifest-Version: 1.0
Ant-Version: Apache Ant 1.7.1
Created-By: 20.2-b06 (Sun Microsystems Inc.)
Created-By: 20.1-b02 (Sun Microsystems Inc.)
Implementation-Title: Voldemort
Implementation-Version: 0.96
Implementation-Vendor: LinkedIn
Expand Down
75 changes: 38 additions & 37 deletions config/single_node_cluster/config/stores.xml
@@ -1,37 +1,38 @@
<stores>
<store>
<name>test</name>
<persistence>bdb</persistence>
<description>Test store</description>
<owners> harry@hogwarts.edu, hermoine@hogwarts.edu </owners>
<routing>client</routing>
<replication-factor>1</replication-factor>
<required-reads>1</required-reads>
<required-writes>1</required-writes>
<key-serializer>
<type>string</type>
</key-serializer>
<value-serializer>
<type>string</type>
</value-serializer>
</store>
<store>
<name>test-evolution</name>
<persistence>bdb</persistence>
<description>Test store</description>
<owners> harry@hogwarts.edu, hermoine@hogwarts.edu </owners>
<routing>client</routing>
<replication-factor>1</replication-factor>
<required-reads>1</required-reads>
<required-writes>1</required-writes>
<key-serializer>
<type>string</type>
</key-serializer>
<value-serializer>
<type>avro-generic-versioned</type>
<schema-info version="0">{"type": "record", "name": "myrec","fields": [{ "name": "original", "type": "string" }]}</schema-info>
<schema-info version="1">{"type": "record", "name": "myrec","fields": [{ "name": "original", "type": "string" }, { "name": "new-field", "type": "string", "default":"" }]}
</schema-info>
</value-serializer>
</store>
</stores>
<stores>
<store>
<name>test</name>
<persistence>bdb</persistence>
<description>Test store</description>
<owners>harry@hogwarts.edu, hermoine@hogwarts.edu</owners>
<routing-strategy>consistent-routing</routing-strategy>
<routing>client</routing>
<replication-factor>1</replication-factor>
<required-reads>1</required-reads>
<required-writes>1</required-writes>
<key-serializer>
<type>string</type>
</key-serializer>
<value-serializer>
<type>string</type>
</value-serializer>
</store>
<store>
<name>test-evolution</name>
<persistence>bdb</persistence>
<description>Test store</description>
<owners>harry@hogwarts.edu, hermoine@hogwarts.edu</owners>
<routing-strategy>consistent-routing</routing-strategy>
<routing>client</routing>
<replication-factor>1</replication-factor>
<required-reads>1</required-reads>
<required-writes>1</required-writes>
<key-serializer>
<type>string</type>
</key-serializer>
<value-serializer>
<type>avro-generic-versioned</type>
<schema-info version="0">{"type": "record", "name": "myrec","fields": [{ "name": "original", "type": "string" }]}</schema-info>
<schema-info version="1">{"type": "record", "name": "myrec","fields": [{ "name": "original", "type": "string" }, { "name": "new-field", "type": "string", "default":"" }]}</schema-info>
</value-serializer>
</store>
</stores>
79 changes: 79 additions & 0 deletions src/java/voldemort/VoldemortAdminTool.java
Expand Up @@ -49,7 +49,9 @@
import joptsimple.OptionParser;
import joptsimple.OptionSet;

import org.apache.avro.Schema;
import org.apache.commons.io.FileUtils;
import org.apache.log4j.Level;
import org.codehaus.jackson.JsonFactory;
import org.codehaus.jackson.JsonGenerator;
import org.codehaus.jackson.map.ObjectMapper;
Expand All @@ -63,6 +65,8 @@
import voldemort.serialization.SerializerDefinition;
import voldemort.serialization.SerializerFactory;
import voldemort.serialization.StringSerializer;
import voldemort.serialization.avro.versioned.SchemaEvolutionValidator;
import voldemort.serialization.avro.versioned.SchemaEvolutionValidator.Message;
import voldemort.server.rebalance.RebalancerState;
import voldemort.store.StoreDefinition;
import voldemort.store.compress.CompressionStrategy;
Expand Down Expand Up @@ -480,6 +484,25 @@ public static void main(String[] args) throws Exception {
throw new VoldemortException("Stores definition xml file path incorrect");
StoreDefinitionsMapper mapper = new StoreDefinitionsMapper();
List<StoreDefinition> storeDefs = mapper.readStoreList(new File(metadataValue));

String AVRO_GENERIC_VERSIONED_TYPE_NAME = "avro-generic-versioned";

for(StoreDefinition storeDef: storeDefs) {
SerializerDefinition keySerDef = storeDef.getKeySerializer();
SerializerDefinition valueSerDef = storeDef.getValueSerializer();

if(keySerDef.getName().equals(AVRO_GENERIC_VERSIONED_TYPE_NAME)) {

checkSchemaCompatibility(keySerDef);

}

if(valueSerDef.getName().equals(AVRO_GENERIC_VERSIONED_TYPE_NAME)) {

checkSchemaCompatibility(valueSerDef);

}
}
executeSetMetadata(nodeId,
adminClient,
MetadataStore.STORES_KEY,
Expand Down Expand Up @@ -635,6 +658,62 @@ private static void synchronizeMetadataVersion(AdminClient adminClient, int base
e.printStackTrace();
System.exit(-1);
}

}

public static void checkSchemaCompatibility(SerializerDefinition serDef) throws Exception {

    /*
     * Validates that every successive pair of Avro schema versions declared on
     * this serializer is backward compatible: a reader using the newer schema
     * must be able to read data written with the older one.
     *
     * @param serDef serializer definition carrying the versioned schema-info map
     * @throws VoldemortException if no schema is present, or if any adjacent
     *         pair of versions is found to be incompatible (ERROR) or only
     *         partially compatible (WARN)
     */
    Map<Integer, String> schemaVersions = serDef.getAllSchemaInfoVersions();

    // NOTE(review): pairs are compared in the map's iteration order, which is
    // assumed here to follow ascending schema-version order — confirm that
    // getAllSchemaInfoVersions() returns an order-preserving (sorted) map.
    Iterator<Map.Entry<Integer, String>> schemaIterator = schemaVersions.entrySet().iterator();

    if(!schemaIterator.hasNext())
        throw new VoldemortException("No schema specified");

    // The previous version's schema text; seeded with the first entry.
    String firstSchemaStr = schemaIterator.next().getValue();

    while(schemaIterator.hasNext()) {

        String secondSchemaStr = schemaIterator.next().getValue();

        Schema oldSchema = Schema.parse(firstSchemaStr);
        Schema newSchema = Schema.parse(secondSchemaStr);
        List<Message> messages = SchemaEvolutionValidator.checkBackwardCompatability(oldSchema,
                                                                                     newSchema,
                                                                                     oldSchema.getName());
        // Track the most severe level reported by the validator for this pair.
        Level maxLevel = Level.ALL;
        for(Message message: messages) {
            System.out.println(message.getLevel() + ": " + message.getMessage());
            if(message.getLevel().isGreaterOrEqual(maxLevel)) {
                maxLevel = message.getLevel();
            }
        }

        if(maxLevel.isGreaterOrEqual(Level.ERROR)) {
            System.out.println(Level.ERROR
                               + ": The schema is not backward compatible. New clients will not be able to read existing data.");
            throw new VoldemortException("The schema is not backward compatible. New clients will not be able to read existing data.");
        } else if(maxLevel.isGreaterOrEqual(Level.WARN)) {
            System.out.println(Level.WARN
                               + ": The schema is partially backward compatible, but old clients will not be able to read data serialized in the new format.");
            throw new VoldemortException("The schema is partially backward compatible, but old clients will not be able to read data serialized in the new format.");
        } else {
            System.out.println(Level.INFO
                               + ": The schema is backward compatible. Old and new clients will be able to read records serialized by one another.");
        }

        // Slide the window: the current schema becomes the "old" one for the
        // next comparison.
        firstSchemaStr = secondSchemaStr;

    }
}

private static void executeRollback(Integer nodeId,
Expand Down

0 comments on commit 46f42fe

Please sign in to comment.