Permalink
Browse files

- Refactored schema check method

- added fix to the verioned serializer to support writing of objects
  created using the old schema
  • Loading branch information...
abh1nay committed Oct 15, 2012
1 parent f2bfbc0 commit 20261b9ac9fe3c982c74057c8b280ab47250043e
@@ -154,9 +154,8 @@ public void configure(JobConf conf) {
this.saveKeys = conf.getBoolean("save.keys", true);
this.reducerPerBucket = conf.getBoolean("reducer.per.bucket", false);
- // TODO this breaks but why?
- // keySerializerDefinition = getStoreDef().getKeySerializer();
- // valueSerializerDefinition = getStoreDef().getValueSerializer();
+ keySerializerDefinition = getStoreDef().getKeySerializer();
+ valueSerializerDefinition = getStoreDef().getValueSerializer();
try {
SerializerFactory factory = new DefaultSerializerFactory();
@@ -90,6 +90,14 @@
private final boolean isAvroVersioned;
+ private static final String AVRO_GENERIC_TYPE_NAME = "avro-generic";
+
+ // New serialization types for avro versioning support
+ // We cannot change existing serializer classes since
+ // this will break existing clients while looking for the version byte
+
+ private static final String AVRO_GENERIC_VERSIONED_TYPE_NAME = "avro-generic-versioned";
+
/* Informed stuff */
private final String informedURL = "http://informed.corp.linkedin.com/_post";
private final List<Future> informedResults;
@@ -173,10 +181,13 @@ public void run() throws Exception {
log.info("Working on " + url);
try {
- if(isAvroJob && !isAvroVersioned)
- verifyAvroSchema(url);
- else if(isAvroJob && isAvroVersioned)
- verifyAvroSchemaandVersions(url);
+ /*
+ * if(isAvroJob && !isAvroVersioned) verifyAvroSchema(url); else
+ * if(isAvroJob && isAvroVersioned)
+ * verifyAvroSchemaandVersions(url, isAvroVersioned);
+ */
+ if(isAvroJob)
+ verifyAvroSchemaandVersions(url, isAvroVersioned);
else
verifySchema(url);
@@ -559,193 +570,11 @@ public String getValueSchema() throws IOException {
}
- // Verify if the new avro schema being pushed is the same one as the old one
- // Does not have logic to check for Avro schema evolution yet
- public void verifyAvroSchema(String url) throws Exception {
- // create new n store def with schema from the metadata in the input
- // path
- Schema schema = AvroUtils.getAvroSchemaFromPath(getInputPath());
- int replicationFactor = props.getInt("build.replication.factor", 2);
- int requiredReads = props.getInt("build.required.reads", 1);
- int requiredWrites = props.getInt("build.required.writes", 1);
- String description = props.getString("push.store.description", "");
- String owners = props.getString("push.store.owners", "");
-
- String keySchema = "\n\t\t<type>avro-generic</type>\n\t\t<schema-info version=\"0\">"
- + schema.getField(keyField).schema() + "</schema-info>\n\t";
- String valSchema = "\n\t\t<type>avro-generic</type>\n\t\t<schema-info version=\"0\">"
- + schema.getField(valueField).schema() + "</schema-info>\n\t";
-
- boolean hasCompression = false;
- if(props.containsKey("build.compress.value"))
- hasCompression = true;
-
- if(hasCompression) {
- valSchema += "\t<compression><type>gzip</type></compression>\n\t";
- }
-
- if(props.containsKey("build.force.schema.key")) {
- keySchema = props.get("build.force.schema.key");
- }
-
- if(props.containsKey("build.force.schema.value")) {
- valSchema = props.get("build.force.schema.value");
- }
-
- String newStoreDefXml = VoldemortUtils.getStoreDefXml(storeName,
- replicationFactor,
- requiredReads,
- requiredWrites,
- props.containsKey("build.preferred.reads") ? props.getInt("build.preferred.reads")
- : null,
- props.containsKey("build.preferred.writes") ? props.getInt("build.preferred.writes")
- : null,
- (props.containsKey("push.force.schema.key")) ? props.getString("push.force.schema.key")
- : keySchema,
- (props.containsKey("push.force.schema.value")) ? props.getString("push.force.schema.value")
- : valSchema,
- description,
- owners);
-
- log.info("Verifying store: \n" + newStoreDefXml.toString());
-
- StoreDefinition newStoreDef = VoldemortUtils.getStoreDef(newStoreDefXml);
-
- // get store def from cluster
- log.info("Getting store definition from: " + url + " (node id " + this.nodeId + ")");
-
- AdminClient adminClient = new AdminClient(url, new AdminClientConfig());
- try {
- List<StoreDefinition> remoteStoreDefs = adminClient.getRemoteStoreDefList(this.nodeId)
- .getValue();
- boolean foundStore = false;
-
- // go over all store defs and see if one has the same name as the
- // store we're trying
- // to build
- for(StoreDefinition remoteStoreDef: remoteStoreDefs) {
- if(remoteStoreDef.getName().equals(storeName)) {
- // if the store already exists, but doesn't match what we
- // want to push, we need
- // to worry
- if(!remoteStoreDef.equals(newStoreDef)) {
-
- // let's check to see if the key/value serializers are
- // REALLY equal.
- SerializerDefinition localKeySerializerDef = newStoreDef.getKeySerializer();
- SerializerDefinition localValueSerializerDef = newStoreDef.getValueSerializer();
- SerializerDefinition remoteKeySerializerDef = remoteStoreDef.getKeySerializer();
- SerializerDefinition remoteValueSerializerDef = remoteStoreDef.getValueSerializer();
-
- if(remoteKeySerializerDef.getName().equals("avro-generic")
- && remoteValueSerializerDef.getName().equals("avro-generic")
- && remoteKeySerializerDef.getAllSchemaInfoVersions().size() == 1
- && remoteValueSerializerDef.getAllSchemaInfoVersions().size() == 1) {
- Schema remoteKeyDef = Schema.parse(remoteKeySerializerDef.getCurrentSchemaInfo());
- Schema remoteValDef = Schema.parse(remoteValueSerializerDef.getCurrentSchemaInfo());
- Schema localKeyDef = Schema.parse(localKeySerializerDef.getCurrentSchemaInfo());
- Schema localValDef = Schema.parse(localValueSerializerDef.getCurrentSchemaInfo());
-
- if(remoteKeyDef.equals(localKeyDef) && remoteValDef.equals(localValDef)) {
- String compressionPolicy = "";
- if(hasCompression) {
- compressionPolicy = "\n\t\t<compression><type>gzip</type></compression>";
- }
-
- // if the key/value serializers are REALLY equal
- // (even though the strings may not match), then
- // just use the remote stores to GUARANTEE that
- // they
- // match, and try again.
- newStoreDefXml = VoldemortUtils.getStoreDefXml(storeName,
- replicationFactor,
- requiredReads,
- requiredWrites,
- props.containsKey("build.preferred.reads") ? props.getInt("build.preferred.reads")
- : null,
- props.containsKey("build.preferred.writes") ? props.getInt("build.preferred.writes")
- : null,
- "\n\t\t<type>avro-generic</type>\n\t\t<schema-info version=\"0\">"
- + remoteKeySerializerDef.getCurrentSchemaInfo()
- + "</schema-info>\n\t",
- "\n\t\t<type>avro-generic</type>\n\t\t<schema-info version=\"0\">"
- + remoteValueSerializerDef.getCurrentSchemaInfo()
- + "</schema-info>"
- + compressionPolicy
- + "\n\t");
-
- newStoreDef = VoldemortUtils.getStoreDef(newStoreDefXml);
-
- if(!remoteStoreDef.equals(newStoreDef)) {
- // if we still get a fail, then we know that
- // the
- // store defs don't match for reasons OTHER
- // than
- // the key/value serializer
- throw new RuntimeException("Your store schema is identical, but the store definition does not match. Have: "
- + newStoreDef
- + "\nBut expected: "
- + remoteStoreDef);
- }
- } else {
- // if the key/value serializers are not equal
- // (even
- // in java, not just json strings), then fail
- throw new RuntimeException("Your store definition does not match the store definition that is already in the cluster. Tried to resolve identical schemas between local and remote, but failed. Have: "
- + newStoreDef
- + "\nBut expected: "
- + remoteStoreDef);
- }
- }
- }
-
- foundStore = true;
- break;
- }
- }
-
- // if the store doesn't exist yet, create it
- if(!foundStore) {
- // New requirement - Make sure the user had description and
- // owner specified
- if(description.length() == 0) {
- throw new RuntimeException("Description field missing in store definition. "
- + "Please add \"push.store.description\" with a line describing your store");
- }
-
- if(owners.length() == 0) {
- throw new RuntimeException("Owner field missing in store definition. "
- + "Please add \"push.store.owners\" with value being comma-separated list of LinkedIn email ids");
-
- }
-
- log.info("Could not find store " + storeName
- + " on Voldemort. Adding it to all nodes ");
- adminClient.addStore(newStoreDef);
- }
-
- storeDefs = ImmutableList.of(VoldemortUtils.getStoreDef(VoldemortUtils.getStoreDefXml(storeName,
- replicationFactor,
- requiredReads,
- requiredWrites,
- props.containsKey("build.preferred.reads") ? props.getInt("build.preferred.reads")
- : null,
- props.containsKey("build.preferred.writes") ? props.getInt("build.preferred.writes")
- : null,
- keySchema,
- valSchema)));
- cluster = adminClient.getAdminClientCluster();
- } finally {
- adminClient.stop();
- }
- }
-
// Verify if the new avro schema being pushed is the same one as the last
// version present on the server
// supports schema evolution
- // can refactor 2 methods into one?
- public void verifyAvroSchemaandVersions(String url) throws Exception {
+ public void verifyAvroSchemaandVersions(String url, boolean isVersioned) throws Exception {
// create new n store def with schema from the metadata in the input
// path
Schema schema = AvroUtils.getAvroSchemaFromPath(getInputPath());
@@ -755,9 +584,18 @@ public void verifyAvroSchemaandVersions(String url) throws Exception {
String description = props.getString("push.store.description", "");
String owners = props.getString("push.store.owners", "");
- String keySchema = "\n\t\t<type>avro-generic-versioned</type>\n\t\t<schema-info version=\"0\">"
+ String serializerName;
+
+ if(isVersioned)
+ serializerName = AVRO_GENERIC_VERSIONED_TYPE_NAME;
+ else
+ serializerName = AVRO_GENERIC_TYPE_NAME;
+
+ String keySchema = "\n\t\t<type>" + serializerName
+ + "</type>\n\t\t<schema-info version=\"0\">"
+ schema.getField(keyField).schema() + "</schema-info>\n\t";
- String valSchema = "\n\t\t<type>avro-generic-versioned</type>\n\t\t<schema-info version=\"0\">"
+ String valSchema = "\n\t\t<type>" + serializerName
+ + "</type>\n\t\t<schema-info version=\"0\">"
+ schema.getField(valueField).schema() + "</schema-info>\n\t";
boolean hasCompression = false;
@@ -823,8 +661,8 @@ public void verifyAvroSchemaandVersions(String url) throws Exception {
SerializerDefinition remoteKeySerializerDef = remoteStoreDef.getKeySerializer();
SerializerDefinition remoteValueSerializerDef = remoteStoreDef.getValueSerializer();
- if(remoteKeySerializerDef.getName().equals("avro-generic-versioned")
- && remoteValueSerializerDef.getName().equals("avro-generic-versioned")) {
+ if(remoteKeySerializerDef.getName().equals(serializerName)
+ && remoteValueSerializerDef.getName().equals(serializerName)) {
Schema remoteKeyDef = Schema.parse(remoteKeySerializerDef.getCurrentSchemaInfo());
Schema remoteValDef = Schema.parse(remoteValueSerializerDef.getCurrentSchemaInfo());
@@ -860,7 +698,9 @@ public void verifyAvroSchemaandVersions(String url) throws Exception {
}
} else {
- keySerializerStr = "\n\t\t<type>avro-generic-versioned</type>\n\t\t<schema-info version=\"0\">"
+ keySerializerStr = "\n\t\t<type>"
+ + serializerName
+ + "</type>\n\t\t<schema-info version=\"0\">"
+ remoteKeySerializerDef.getCurrentSchemaInfo()
+ "</schema-info>\n\t";
}
@@ -884,10 +724,11 @@ public void verifyAvroSchemaandVersions(String url) throws Exception {
} else {
- valueSerializerStr = "\n\t\t<type>avro-generic-versioned</type>\n\t\t<schema-info version=\"0\">"
+ valueSerializerStr = "\n\t\t<type>"
+ + serializerName
+ + "</type>\n\t\t<schema-info version=\"0\">"
+ remoteValueSerializerDef.getCurrentSchemaInfo()
- + "</schema-info>"
- + compressionPolicy
+ + "</schema-info>" + compressionPolicy
+ "\n\t";
}
Oops, something went wrong.

0 comments on commit 20261b9

Please sign in to comment.