Comparing changes

base fork: voldemort/voldemort (base: master)
head fork: voldemort/voldemort (compare: release-1.1.5)
  • 4 commits
  • 17 files changed
  • 0 commit comments
  • 2 contributors
6 .classpath
@@ -39,7 +39,6 @@
<classpathentry kind="lib" path="lib/commons-pool-1.5.2.jar"/>
<classpathentry kind="lib" path="lib/protobuf-java-2.3.0.jar"/>
<classpathentry kind="lib" path="contrib/ec2-testing/lib/typica.jar"/>
- <classpathentry kind="lib" path="lib/google-collect-1.0.jar"/>
<classpathentry kind="lib" path="lib/je-4.0.92.jar"/>
<classpathentry kind="lib" path="lib/paranamer-2.1.jar"/>
<classpathentry kind="lib" path="lib/jackson-mapper-asl-1.4.0.jar"/>
@@ -51,8 +50,9 @@
<classpathentry kind="lib" path="lib/libthrift-0.5.0.jar"/>
<classpathentry kind="lib" path="lib/compress-lzf-0.9.1.jar"/>
<classpathentry kind="lib" path="lib/snappy-0.2.jar"/>
- <classpathentry kind="lib" path="lib/httpclient-4.1.2.jar" />
- <classpathentry kind="lib" path="lib/httpcore-4.1.2.jar" />
+ <classpathentry kind="lib" path="lib/httpclient-4.1.2.jar"/>
+ <classpathentry kind="lib" path="lib/httpcore-4.1.2.jar"/>
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.6"/>
+ <classpathentry kind="lib" path="lib/google-collect-1.0-rc2.jar"/>
<classpathentry kind="output" path="classes"/>
</classpath>
4 META-INF/MANIFEST.MF
@@ -1,7 +1,7 @@
Manifest-Version: 1.0
Ant-Version: Apache Ant 1.7.1
-Created-By: 20.8-b03-424 (Apple Inc.)
+Created-By: 20.1-b02 (Sun Microsystems Inc.)
Implementation-Title: Voldemort
-Implementation-Version: 0.90.1
+Implementation-Version: 1.1.4
Implementation-Vendor: LinkedIn
2  build.properties
@@ -37,4 +37,4 @@ tomcat.manager.password=tomcat
tomcat.context=/voldemort
## Release
-curr.release=0.96
+curr.release=1.1.5
BIN  lib/google-collect-1.0-rc2.jar
Binary file not shown
BIN  lib/google-collect-1.0.jar
Binary file not shown
170 release_notes.txt
@@ -1,4 +1,172 @@
-Release 0.90.1 on 10/10/2011
+Release 1.1.5 on 12/10/2012
+
+This is a special release based on release 0.96.li7. Its purpose is to
+isolate the client-registry branch from the other changes made since 0.96.li7
+and to make it available to all Voldemort clients within LinkedIn.
+
+Changes made since 0.96.li7
+* Made DefaultStoreClient the default client
+* Removed stack trace logging in LazyStoreClient
+* Added the Avro schema evolution mechanism (from 1.0.0)
+* Added store tracking autobootstrapper mechanism (from 1.0.0)
+
+Release 1.1.4 on 11/29/2012
+
+Changes made since release 1.1.3
+* Added BDB parameters to control LRU behavior in cache & proactive cleaner migration
+* Added a mlock fix for pinning the indexes of RO stores in memory
+
+
+Release 1.1.3 on 11/28/2012
+
+Changes made since release 1.1.2
+* Fixed a bug in the build and push job, specifically the Mapper
+ that caused collisions
+* Added retry mechanism with the HDFS fetcher for hftp
+
+
+Release 1.1.2 on 10/31/2012
+
+Changes made since release 1.1.1
+* Reverted a change to voldemort.versioning.Versioned.getVersion() so
+ that a Version is returned as our clients expect.
+
+
+Release 1.1.1 on 10/30/2012
+
+Changes made since release 1.1.0
+* Fixed connection leak in ClientRequestExecutorFactory
+* Changed client to default to DefaultStoreClient
+
+
+Release 1.1.0 on 10/19/2012
+
+Changes made since release 1.0.0
+
+IMPORTANT NOTE : This release has significant changes to the BDB storage layer.
+Users are required to read the bin/PREUPGRADE_FOR_1_1_X_README file
+thoroughly before attempting to upgrade to 1.1.0. The necessary data
+conversion will be done through bin/voldemort-convert-bdb.sh
+
+* Upgraded to JE 4.1.17
+* New data format that handles conflicting updates in Voldemort more
+ efficiently
+* Moved data off heap, using the cache only for the index
+* When scanning, evict whatever you bring in right away
+* Partition-based scan API to dramatically speed up rebalancing and restore
+ using partition-aware scans (you scan exactly what you want to fetch)
+* Flexible knobs to control scheduling of DataCleanupJob
+
+
+Release 1.0.0 on 10/17/2012
+
+NOTE: The large version number jump from 0.96 to 1.0.0 is to
+standardize on a version number of the sort MAJOR.MINOR.PATCH. This
+change is part of our effort to treat internal and open source
+releases in a much more similar manner. Along these lines, release
+notes for internal releases (like this one) are committed on the
+master branch. We hope this improves transparency as we work towards
+the next open source release.
+
+Changes made since release 0.96
+
+* Auto bootstrapping: ZenStoreClient and System stores
+ * Added server side system stores for managing metadata
+ * ZenStoreClient interacts with system stores
+ * ZenStoreClient auto bootstraps whenever cluster.xml or stores.xml changes
+ * Added a new routing strategy to route to all with local preference
+ * Added a client-registry for publishing client info and config values
+ * Updated LazyStoreClient to try to bootstrap during init
+ * Modified Failure Detector to work on a shared cluster object reference
+* Avro: schema evolution and read only support
+ * Added new Avro serializer type that supports schema evolution
+ * Added Avro support to read only stores
+ * Added LinkedIn build-and-push Azkaban jobs to build read only stores to contrib
+ * Added a schema backwards compatibility check to VoldemortAdminTool and on server startup to prevent mishaps due to bad schemas
+* Non-blocking IO: Fixed design flaw that blocked in the face of slow servers
+ * Asynchronous operations no longer do a blocking checkout to get a SocketDestination
+ * Added additional stats collection for better visibility into request queues
+* Minor features
+ * Enhanced VoldemortAdminTool to update store metadata version
+ * Enhanced VoldemortAdminTool to work with the new system stores
+ * Added feature to voldemort-shell.sh to dump byte & object arrays
+ * Added a SlowStorageEngine for testing degraded mode performance
+ * Added mechanism to isolate BDB cache usage among stores
+ * Enhanced debug logging (for traffic analysis).
+ * Python client bug fixes (from pull request)
+ * Improved messages in request tracing
+ * Cleaned up help/usage messages within the client shell
+ * Added server config to control socket backlog
+ * Added "--query-keys" option to query multiple keys of multiple stores from specific node
+ * Added control to DataCleanupJob Frequency
+ * Unified jmxid as the factory across the board
+* Tools
+ * bin/generate_cluster_xml.py to generate cluster.xml
+ * bin/repeat-junit.sh and bin/repeat-junit-test.sh to repeatedly run tests
+* Bug fixes
+ * Changed getAll return behavior to comply with the javadoc
+ * Fixed a bug that caused unnecessary serial requests in getAll
+ * HFTP performance issue bug fix (fix in byte buffer and copy process)
+ * Fixed a bug that prevented "--fetch-keys" and "--fetch-entries" in admin tool from showing multiple store results
+ * Fixed problem in sample config that prevented the server from starting
+ * Fixed some intermittent BindException failures across many unit tests
+ * Fixed some intermittent rebalance test failures
+ * Wrapped long running tests with timeouts
+
+
+Release 0.96 on 09/05/2012
+
+Changes made since 0.90.1
+
+ * Monitoring:
+ * Append cluster name to various mbeans for better stats display
+ * Implement average throughput in bytes
+ * Add BDB JE stats
+ * Add 95th and 99th latency tracking
+ * Add stats for ClientRequestExecutorPool
+ * Add error/exception count and max getAll count
+ * BDB and data cleanup monitoring changes
+ * Rebalancing:
+ * Donor-based rebalancing and post cleanup (see https://github.com/voldemort/voldemort/wiki/Voldemort-Donor-Based-Rebalancing for more details)
+ * Rebalancing integration testing framework (under test/integration/voldemort/rebalance/)
+ * Generate multiple cluster.xml files based on the number specified when running the tool and choose the cluster with the smallest std dev as the final-cluster.xml
+ * Add status output to log for updateEntries (used by rebalancing)
+ * Read-only pipeline:
+ * Add hftp and webhdfs support
+ * Read-only bandwidth dynamic throttler
+ * Add minimum throttle limit per store
+ * Add rollback capability to the Admin tool
+ * Voldemort-backed stack and index linked list impl
+ * Change client requests to not process responses after timeout
+ * Modified client request executor timeout to not factor in the NIO selector timeout
+ * Added BDB native backup capabilities, checksum verification and incremental backups (well tested, but not yet used in production)
+ * Add additional client-side tracing for debugging and consistency analytics
+ * Clean up logging during exception at client-side
+ * Security exception handling
+ * Add snappy to CompressionStrategyFactory
+ * Add configurable option to interrupt service being unscheduled
+ * Add logging support for tracking ScanPermit owners (for debugging purposes)
+ * Add a jmx terminate operation for async jobs
+ * Add zone option for restore from replicas
+ * Change enable.nio.connector to true by default
+ * Better disconnection handling for python client
+ * Split junit tests into a long and a short test suites
+ * Add separate timeouts for different operations (put, get, delete, and getAll)
+ * Allow getAll to return partial results upon timeout
+ * Improved cluster generation tool
+ * Added log4j properties folder for junit test
+ * Bug fixes:
+ * Upgrade from httpclient 3.x to httpclient 4.x
+ * Fix NPE in listing read-only store versions
+ * Fixed 2 failure detector bugs during rebalancing or node swapping
+ * Fixed a thread leak issue in StreamingSlopPusher
+ * Fixed a NIO bug
+ * Fixed a bug in the TimeBasedInconsistencyResolver.
+ * Fixed race condition in client socket close
+ * Fixed a potential deadlock issue in ScanPermitWrapper
+ * Fixed a bug where a read returns null (on rare occasions) when concurrent with a write
+ * Fixed a performance bug in HdfsFetcher when hftp is used
+
Changes made since 0.90
41 src/java/voldemort/VoldemortAdminTool.java
@@ -63,6 +63,7 @@
import voldemort.serialization.SerializerDefinition;
import voldemort.serialization.SerializerFactory;
import voldemort.serialization.StringSerializer;
+import voldemort.serialization.avro.versioned.SchemaEvolutionValidator;
import voldemort.server.rebalance.RebalancerState;
import voldemort.store.StoreDefinition;
import voldemort.store.compress.CompressionStrategy;
@@ -480,10 +481,47 @@ public static void main(String[] args) throws Exception {
throw new VoldemortException("Stores definition xml file path incorrect");
StoreDefinitionsMapper mapper = new StoreDefinitionsMapper();
List<StoreDefinition> storeDefs = mapper.readStoreList(new File(metadataValue));
+
+ String AVRO_GENERIC_VERSIONED_TYPE_NAME = "avro-generic-versioned";
+
+ for(StoreDefinition storeDef: storeDefs) {
+ SerializerDefinition keySerDef = storeDef.getKeySerializer();
+ SerializerDefinition valueSerDef = storeDef.getValueSerializer();
+
+ if(keySerDef.getName().equals(AVRO_GENERIC_VERSIONED_TYPE_NAME)) {
+
+ SchemaEvolutionValidator.checkSchemaCompatibility(keySerDef);
+
+ }
+
+ if(valueSerDef.getName().equals(AVRO_GENERIC_VERSIONED_TYPE_NAME)) {
+
+ SchemaEvolutionValidator.checkSchemaCompatibility(valueSerDef);
+
+ }
+ }
executeSetMetadata(nodeId,
adminClient,
MetadataStore.STORES_KEY,
mapper.writeStoreList(storeDefs));
+
+ /*
+ * This is a hack to update the metadata version of the
+ * requested stores. TODO: Add the functionality to
+ * Admin Client and Server to update one individual
+ * store definition.
+ */
+ if(storeNames != null) {
+ System.out.println("Updating metadata version for the following stores: "
+ + storeNames);
+ try {
+ for(String name: storeNames) {
+ adminClient.updateMetadataversion(name);
+ }
+ } catch(Exception e) {
+ System.err.println("Error while updating metadata version for the specified store.");
+ }
+ }
} else if(metadataKey.compareTo(MetadataStore.REBALANCING_STEAL_INFO) == 0) {
if(!Utils.isReadableFile(metadataValue))
throw new VoldemortException("Rebalancing steal info file path incorrect");
@@ -635,6 +673,7 @@ private static void synchronizeMetadataVersion(AdminClient adminClient, int base
e.printStackTrace();
System.exit(-1);
}
+
}
private static void executeRollback(Integer nodeId,
@@ -1604,4 +1643,4 @@ public void writeTo(BufferedWriter out) throws IOException {
});
}
}
-}
+}
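
A note on the version-bump hack above: until AdminClient and the server support updating a single store definition, bumping the per-store metadata version is what makes watching clients re-bootstrap and pick up the edited definition. Below is a minimal sketch of invoking the same call outside the tool; the bootstrap URL and store name are illustrative, and the AdminClient(bootstrapUrl, config) constructor is assumed from the usual admin-protocol pattern.

import java.util.Arrays;
import java.util.List;

import voldemort.client.protocol.admin.AdminClient;
import voldemort.client.protocol.admin.AdminClientConfig;

public class BumpMetadataVersionSketch {

    public static void main(String[] args) {
        // Illustrative bootstrap URL; constructor signature assumed.
        AdminClient adminClient = new AdminClient("tcp://localhost:6666",
                                                  new AdminClientConfig());
        List<String> storeNames = Arrays.asList("test-store");

        // Bumping the metadata version is the signal that makes watching
        // clients (see AsyncMetadataVersionManager below) re-bootstrap.
        for(String name: storeNames) {
            adminClient.updateMetadataversion(name);
        }
    }
}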
4 src/java/voldemort/client/ClientConfig.java
@@ -68,8 +68,8 @@
private volatile boolean enablePipelineRoutedStore = true;
private volatile int clientZoneId = Zone.DEFAULT_ZONE_ID;
- // Flag to control which store client to use. Default = Enhanced
- private volatile boolean useDefaultClient = false;
+ // Flag to control which store client to use. Default = old client
+ private volatile boolean useDefaultClient = true;
private volatile String failureDetectorImplementation = FailureDetectorConfig.DEFAULT_IMPLEMENTATION_CLASS_NAME;
private volatile long failureDetectorBannagePeriod = FailureDetectorConfig.DEFAULT_BANNAGE_PERIOD;
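
The flipped default above means factories now hand out DefaultStoreClient unless the flag is cleared. Here is a minimal sketch of building a client under the new default; the commented-out setter name is hypothetical, so check ClientConfig for the real accessor of useDefaultClient.

import voldemort.client.ClientConfig;
import voldemort.client.SocketStoreClientFactory;
import voldemort.client.StoreClient;
import voldemort.client.StoreClientFactory;

public class ClientSelectionSketch {

    public static void main(String[] args) {
        ClientConfig config = new ClientConfig().setBootstrapUrls("tcp://localhost:6666");

        // Hypothetical accessor for the useDefaultClient flag shown above;
        // clearing it would opt back into ZenStoreClient.
        // config.setUseDefaultClient(false);

        StoreClientFactory factory = new SocketStoreClientFactory(config);
        StoreClient<String, String> client = factory.getStoreClient("test");
        client.put("hello", "world");
    }
}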
1  src/java/voldemort/client/LazyStoreClient.java
@@ -61,7 +61,6 @@ public LazyStoreClient(Callable<StoreClient<K, V>> storeClientThunk, boolean ins
storeClient = initStoreClient();
} catch(Exception e) {
storeClient = null;
- e.printStackTrace();
logger.info("Could not bootstrap right away. Trying on the next call ... ");
}
}
5 src/java/voldemort/client/ZenStoreClient.java
@@ -170,7 +170,8 @@ public Void call() throws Exception {
};
asyncMetadataManager = new AsyncMetadataVersionManager(this.sysRepository,
- rebootstrapCallback);
+ rebootstrapCallback,
+ this.storeName);
// schedule the job to run every 'checkInterval' period, starting
// now
@@ -272,4 +273,4 @@ public String generateClientId(ClientInfo clientInfo) {
return context.toString();
}
-}
+}
69 src/java/voldemort/client/scheduler/AsyncMetadataVersionManager.java
@@ -44,37 +44,56 @@
public class AsyncMetadataVersionManager implements Runnable {
public static final String CLUSTER_VERSION_KEY = "cluster.xml";
+ public static String STORES_VERSION_KEY = "stores.xml";
public static final String VERSIONS_METADATA_STORE = "metadata-versions";
private final Logger logger = Logger.getLogger(this.getClass());
private Long currentClusterVersion;
+ private Long currentStoreVersion;
private final Callable<Void> storeClientThunk;
private final SystemStoreRepository systemStoreRepository;
public boolean isActive = false;
public AsyncMetadataVersionManager(SystemStoreRepository sysRepository,
- Callable<Void> storeClientThunk) {
+ Callable<Void> storeClientThunk,
+ String storeName) {
this.systemStoreRepository = sysRepository;
+ if(storeName != null) {
+ STORES_VERSION_KEY = storeName;
+ }
+
// Get the properties object from the system store (containing versions)
Properties versionProps = MetadataVersionStoreUtils.getProperties(this.systemStoreRepository.getMetadataVersionStore());
- try {
- this.currentClusterVersion = getCurrentVersion(CLUSTER_VERSION_KEY, versionProps);
- } catch(Exception e) {
- logger.error("Exception while getting currentClusterVersion : " + e);
- }
+ // Initialize base cluster version to do all subsequent comparisons
+ this.currentClusterVersion = initializeVersion(CLUSTER_VERSION_KEY, versionProps);
+
+ // Initialize base store version to do all subsequent comparisons
+ this.currentStoreVersion = initializeVersion(STORES_VERSION_KEY, versionProps);
- // If the received version is null, assume version 0
- if(currentClusterVersion == null) {
- currentClusterVersion = new Long(0);
- }
logger.debug("Initial cluster.xml version = " + this.currentClusterVersion);
+ logger.debug("Initial store '" + storeName + "' version = " + this.currentClusterVersion);
this.storeClientThunk = storeClientThunk;
this.isActive = true;
}
+ private Long initializeVersion(String versionKey, Properties versionProps) {
+ Long baseVersion = null;
+ try {
+ baseVersion = getCurrentVersion(versionKey, versionProps);
+ } catch(Exception e) {
+ logger.error("Exception while getting version for key : " + versionKey
+ + " Exception : " + e);
+ }
+
+ if(baseVersion == null) {
+ baseVersion = new Long(0);
+ }
+ return baseVersion;
+ }
+
public Long getCurrentVersion(String versionKey, Properties versionProps) {
Long versionValue = null;
@@ -114,7 +133,7 @@ public Long fetchNewVersion(String versionKey, Long curVersion, Properties versi
// Swallow all exceptions here (we don't want to fail the client).
catch(Exception e) {
- logger.debug("Could not retrieve Metadata Version.");
+ logger.debug("Could not retrieve Metadata Version. Exception : " + e);
}
return null;
@@ -128,18 +147,30 @@ public void run() {
* versions)
*/
Properties versionProps = MetadataVersionStoreUtils.getProperties(this.systemStoreRepository.getMetadataVersionStore());
+
Long newClusterVersion = fetchNewVersion(CLUSTER_VERSION_KEY,
- currentClusterVersion,
+ this.currentClusterVersion,
versionProps);
+ Long newStoreVersion = fetchNewVersion(STORES_VERSION_KEY,
+ this.currentStoreVersion,
+ versionProps);
- // If nothing has been updated, continue
- if(newClusterVersion != null) {
+ // Check if something has been updated
+ if((newClusterVersion != null) || (newStoreVersion != null)) {
logger.info("Metadata version mismatch detected. Re-bootstrapping!");
try {
- logger.info("Updating cluster version");
- currentClusterVersion = newClusterVersion;
+ if(newClusterVersion != null) {
+ logger.info("Updating cluster version");
+ currentClusterVersion = newClusterVersion;
+ }
+
+ if(newStoreVersion != null) {
+ logger.info("Updating store : '" + STORES_VERSION_KEY + "' version");
+ this.currentStoreVersion = newStoreVersion;
+ }
this.storeClientThunk.call();
+
} catch(Exception e) {
if(logger.isDebugEnabled()) {
e.printStackTrace();
@@ -158,6 +189,10 @@ public Long getClusterMetadataVersion() {
return this.currentClusterVersion;
}
+ public Long getStoreMetadataVersion() {
+ return this.currentStoreVersion;
+ }
+
// Fetch the latest versions for cluster metadata
public void updateMetadataVersions() {
Properties versionProps = MetadataVersionStoreUtils.getProperties(this.systemStoreRepository.getMetadataVersionStore());
@@ -166,4 +201,4 @@ public void updateMetadataVersions() {
this.currentClusterVersion = newVersion;
}
}
-}
+}
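
For orientation, here is a minimal sketch of how a client wires up the manager after this change, mirroring the ZenStoreClient diff above. The SystemStoreRepository is assumed to be already populated (its package is assumed from the client packages used in this diff), and real code would schedule run() at a fixed interval rather than calling it once.

import java.util.concurrent.Callable;

import voldemort.client.SystemStoreRepository;
import voldemort.client.scheduler.AsyncMetadataVersionManager;

public class MetadataWatchSketch {

    public void watch(SystemStoreRepository sysRepository, final String storeName) {
        Callable<Void> rebootstrapCallback = new Callable<Void>() {

            public Void call() throws Exception {
                // Re-bootstrap the store client here.
                System.out.println("Re-bootstrapping store " + storeName);
                return null;
            }
        };

        // The manager now tracks both cluster.xml and the per-store
        // version, invoking the callback when either changes.
        AsyncMetadataVersionManager manager =
                new AsyncMetadataVersionManager(sysRepository,
                                                rebootstrapCallback,
                                                storeName);

        manager.run(); // one polling pass; normally run on a schedule
    }
}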
22 src/java/voldemort/serialization/DefaultSerializerFactory.java
@@ -25,6 +25,7 @@
import voldemort.serialization.avro.AvroGenericSerializer;
import voldemort.serialization.avro.AvroReflectiveSerializer;
import voldemort.serialization.avro.AvroSpecificSerializer;
+import voldemort.serialization.avro.versioned.AvroVersionedGenericSerializer;
import voldemort.serialization.json.JsonTypeDefinition;
import voldemort.serialization.json.JsonTypeSerializer;
import voldemort.serialization.protobuf.ProtoBufSerializer;
@@ -50,6 +51,12 @@
private static final String AVRO_SPECIFIC_TYPE_NAME = "avro-specific";
private static final String AVRO_REFLECTIVE_TYPE_NAME = "avro-reflective";
+ // New serialization types for avro versioning support
+ // We cannot change existing serializer classes since
+ // this will break existing clients while looking for the version byte
+
+ private static final String AVRO_GENERIC_VERSIONED_TYPE_NAME = "avro-generic-versioned";
+
public Serializer<?> getSerializer(SerializerDefinition serializerDef) {
String name = serializerDef.getName();
if(name.equals(JAVA_SERIALIZER_TYPE_NAME)) {
@@ -72,16 +79,27 @@
} else if(name.equals(PROTO_BUF_TYPE_NAME)) {
return new ProtoBufSerializer<Message>(serializerDef.getCurrentSchemaInfo());
} else if(name.equals(THRIFT_TYPE_NAME)) {
- return new ThriftSerializer<TBase<?,?>>(serializerDef.getCurrentSchemaInfo());
+ return new ThriftSerializer<TBase<?, ?>>(serializerDef.getCurrentSchemaInfo());
} else if(name.equals(AVRO_GENERIC_TYPE_NAME)) {
return new AvroGenericSerializer(serializerDef.getCurrentSchemaInfo());
} else if(name.equals(AVRO_SPECIFIC_TYPE_NAME)) {
return new AvroSpecificSerializer<SpecificRecord>(serializerDef.getCurrentSchemaInfo());
} else if(name.equals(AVRO_REFLECTIVE_TYPE_NAME)) {
return new AvroReflectiveSerializer<Object>(serializerDef.getCurrentSchemaInfo());
+ } else if(name.equals(AVRO_GENERIC_VERSIONED_TYPE_NAME)) {
+ if(serializerDef.hasVersion()) {
+ Map<Integer, String> versions = new HashMap<Integer, String>();
+ for(Map.Entry<Integer, String> entry: serializerDef.getAllSchemaInfoVersions()
+ .entrySet())
+ versions.put(entry.getKey(), entry.getValue());
+ return new AvroVersionedGenericSerializer(versions);
+ } else {
+ return new AvroVersionedGenericSerializer(serializerDef.getCurrentSchemaInfo());
+ }
+
} else {
throw new IllegalArgumentException("No known serializer type: "
+ serializerDef.getName());
}
}
-}
+}
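
To make the new "avro-generic-versioned" path concrete, here is a minimal sketch that builds the serializer directly with two invented schema versions and round-trips a record. Through the factory, the same code path is reached when a store definition names this serializer type with versioned schema-info entries.

import java.util.HashMap;
import java.util.Map;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

import voldemort.serialization.avro.versioned.AvroVersionedGenericSerializer;

public class VersionedAvroRoundTrip {

    public static void main(String[] args) {
        // Two illustrative schema versions; v1 adds an optional field.
        String v0 = "{\"type\":\"record\",\"name\":\"Member\",\"fields\":"
                    + "[{\"name\":\"id\",\"type\":\"int\"}]}";
        String v1 = "{\"type\":\"record\",\"name\":\"Member\",\"fields\":"
                    + "[{\"name\":\"id\",\"type\":\"int\"},"
                    + "{\"name\":\"email\",\"type\":[\"null\",\"string\"],\"default\":null}]}";

        Map<Integer, String> versions = new HashMap<Integer, String>();
        versions.put(0, v0);
        versions.put(1, v1);

        // This is what the factory constructs when serializerDef.hasVersion().
        AvroVersionedGenericSerializer serializer =
                new AvroVersionedGenericSerializer(versions);

        GenericRecord record = new GenericData.Record(Schema.parse(v1));
        record.put("id", 42);

        // On the wire: one version byte (1 here) followed by Avro binary.
        byte[] bytes = serializer.toBytes(record);
        GenericRecord back = (GenericRecord) serializer.toObject(bytes);
        System.out.println(back.get("id")); // 42
    }
}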
158 src/java/voldemort/serialization/avro/versioned/AvroVersionedGenericSerializer.java
@@ -0,0 +1,158 @@
+/*
+ * Copyright 2011 LinkedIn, Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package voldemort.serialization.avro.versioned;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericContainer;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.Encoder;
+
+import voldemort.serialization.SerializationException;
+import voldemort.serialization.SerializationUtils;
+import voldemort.serialization.Serializer;
+
+/**
+ * Avro serializer that uses the generic representation for Avro data. This
+ * representation is best for applications which deal with dynamic data, whose
+ * schemas are not known until runtime.
+ *
+ */
+public class AvroVersionedGenericSerializer implements Serializer<Object> {
+
+ private final SortedMap<Integer, String> typeDefVersions;
+ private final Integer newestVersion;
+
+ // reader's schema
+ private final Schema typeDef;
+
+ /**
+ * Constructor accepting the schema definition as a JSON string.
+ *
+ * @param schema a serialized JSON object representing an Avro schema.
+ */
+ public AvroVersionedGenericSerializer(String schema) {
+
+ this.typeDefVersions = new TreeMap<Integer, String>();
+ this.typeDefVersions.put(0, schema);
+ newestVersion = typeDefVersions.lastKey();
+ typeDef = Schema.parse(typeDefVersions.get(newestVersion));
+ }
+
+ public AvroVersionedGenericSerializer(Map<Integer, String> typeDefVersions) {
+
+ this.typeDefVersions = new TreeMap<Integer, String>(typeDefVersions);
+ newestVersion = this.typeDefVersions.lastKey();
+ typeDef = Schema.parse(typeDefVersions.get(newestVersion));
+
+ }
+
+ public byte[] toBytes(Object object) {
+ ByteArrayOutputStream output = new ByteArrayOutputStream();
+ Encoder encoder = new BinaryEncoder(output);
+ GenericDatumWriter<Object> datumWriter = null;
+
+ output.write(newestVersion.byteValue());
+ try {
+ datumWriter = new GenericDatumWriter<Object>(typeDef);
+ datumWriter.write(object, encoder);
+ encoder.flush();
+ } catch(ArrayIndexOutOfBoundsException aIOBE) {
+ Schema writer = ((GenericContainer) object).getSchema();
+ Integer writerVersion = getSchemaVersion(writer);
+ return toBytes(object, writer, writerVersion);
+
+ } catch(IOException e) {
+ throw new SerializationException(e);
+ } catch(SerializationException sE) {
+ throw sE;
+ } finally {
+ SerializationUtils.close(output);
+ }
+
+ return output.toByteArray();
+ }
+
+ private byte[] toBytes(Object object, Schema writer, Integer writerVersion) {
+
+ ByteArrayOutputStream output = new ByteArrayOutputStream();
+ Encoder encoder = new BinaryEncoder(output);
+ GenericDatumWriter<Object> datumWriter = null;
+
+ output.write(writerVersion.byteValue());
+ try {
+ datumWriter = new GenericDatumWriter<Object>(writer);
+ datumWriter.write(object, encoder);
+ encoder.flush();
+ } catch(IOException e) {
+ throw new SerializationException(e);
+ } catch(SerializationException sE) {
+ throw sE;
+ } finally {
+ SerializationUtils.close(output);
+ }
+
+ return output.toByteArray();
+
+ }
+
+ private Integer getSchemaVersion(Schema s) throws SerializationException {
+ for(Entry<Integer, String> entry: typeDefVersions.entrySet()) {
+ Schema version = Schema.parse(entry.getValue());
+ if(s.equals(version))
+ return entry.getKey();
+
+ }
+
+ throw new SerializationException("Writer's schema invalid!");
+ }
+
+ public Object toObject(byte[] bytes) {
+
+ Integer version = Integer.valueOf(bytes[0]);
+
+ if(version > newestVersion)
+ throw new SerializationException("Client needs to rebootstrap! \n Writer's schema version greater than Reader");
+
+ Schema typeDefWriter = Schema.parse(typeDefVersions.get(version));
+
+ byte[] dataBytes = new byte[bytes.length - 1];
+ System.arraycopy(bytes, 1, dataBytes, 0, bytes.length - 1);
+ Decoder decoder = DecoderFactory.defaultFactory().createBinaryDecoder(dataBytes, null);
+ GenericDatumReader<Object> reader = null;
+ try {
+ reader = new GenericDatumReader<Object>(typeDefWriter, typeDef);
+ // writer's schema
+ reader.setSchema(typeDefWriter);
+ // Reader's schema
+ reader.setExpected(typeDef);
+ return reader.read(null, decoder);
+ } catch(IOException e) {
+ throw new SerializationException(e);
+ }
+
+ }
+}
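
One behavior worth flagging in toObject above: a version byte newer than anything in the local schema map raises a SerializationException telling the client to re-bootstrap. Below is a minimal sketch of that failure mode with an invented single-version schema; the byte array stands in for a value written by a client that already knows version 1.

import voldemort.serialization.SerializationException;
import voldemort.serialization.avro.versioned.AvroVersionedGenericSerializer;

public class StaleReaderSketch {

    public static void main(String[] args) {
        String v0 = "{\"type\":\"record\",\"name\":\"Member\",\"fields\":"
                    + "[{\"name\":\"id\",\"type\":\"int\"}]}";
        // A client bootstrapped before v1 existed knows only version 0.
        AvroVersionedGenericSerializer stale = new AvroVersionedGenericSerializer(v0);

        // First byte is the writer's schema version (1); the rest is the
        // Avro binary payload (zig-zag encoded 42).
        byte[] writtenByNewerClient = new byte[] { 1, 84 };
        try {
            stale.toObject(writtenByNewerClient);
        } catch(SerializationException expected) {
            // Version 1 > newest known version 0: the client must
            // re-bootstrap to fetch the new stores.xml before reading.
        }
    }
}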
874 src/java/voldemort/serialization/avro/versioned/SchemaEvolutionValidator.java
@@ -0,0 +1,874 @@
+/*
+ * Copyright 2011 LinkedIn, Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package voldemort.serialization.avro.versioned;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
+import org.apache.avro.Schema.Type;
+import org.apache.log4j.Level;
+import org.codehaus.jackson.JsonNode;
+
+import voldemort.VoldemortException;
+import voldemort.serialization.SerializerDefinition;
+
+/**
+ * Provides methods to compare schemas for schema evolution and indicate any
+ * potential problems.
+ *
+ * @author Jemiah Westerman <jwesterman@linkedin.com>
+ *
+ * @version $Revision$
+ */
+public class SchemaEvolutionValidator {
+
+ private static final Schema NULL_TYPE_SCHEMA = Schema.create(Schema.Type.NULL);
+ private final List<String> _recordStack = new ArrayList<String>();
+
+ /**
+ * This main method provides an easy command line tool to compare two
+ * schemas.
+ */
+ public static void main(String[] args) {
+ if(args.length != 2) {
+ System.out.println("Usage: SchemaEvolutionValidator pathToOldSchema pathToNewSchema");
+ return;
+ }
+
+ Schema oldSchema;
+ Schema newSchema;
+
+ try {
+ oldSchema = Schema.parse(new File(args[0]));
+ } catch(Exception ex) {
+ oldSchema = null;
+ System.out.println("Could not open or parse the old schema (" + args[0] + ") due to "
+ + ex);
+ }
+
+ try {
+ newSchema = Schema.parse(new File(args[1]));
+ } catch(Exception ex) {
+ newSchema = null;
+ System.out.println("Could not open or parse the new schema (" + args[1] + ") due to "
+ + ex);
+ }
+
+ if(oldSchema == null || newSchema == null) {
+ return;
+ }
+
+ System.out.println("Comparing: ");
+ System.out.println("\t" + args[0]);
+ System.out.println("\t" + args[1]);
+
+ List<Message> messages = SchemaEvolutionValidator.checkBackwardCompatability(oldSchema,
+ newSchema,
+ oldSchema.getName());
+ Level maxLevel = Level.ALL;
+ for(Message message: messages) {
+ System.out.println(message.getLevel() + ": " + message.getMessage());
+ if(message.getLevel().isGreaterOrEqual(maxLevel)) {
+ maxLevel = message.getLevel();
+ }
+ }
+
+ if(maxLevel.isGreaterOrEqual(Level.ERROR)) {
+ System.out.println(Level.ERROR
+ + ": The schema is not backward compatible. New clients will not be able to read existing data.");
+ } else if(maxLevel.isGreaterOrEqual(Level.WARN)) {
+ System.out.println(Level.WARN
+ + ": The schema is partially backward compatible, but old clients will not be able to read data serialized in the new format.");
+ } else {
+ System.out.println(Level.INFO
+ + ": The schema is backward compatible. Old and new clients will be able to read records serialized by one another.");
+ }
+ }
+
+ /**
+ * Compare two schemas to see if they are backward compatible. Returns a
+ * list of validation messages. <li>ERROR messages indicate the schemas are
+ * not backward compatible and the new schema should not be allowed. If an
+ * ERROR schema is uploaded, clients will not be able to read existing data.
+ * <li>WARN messages indicate that the new schemas may cause problems for
+ * existing clients. However, once all clients are updated to this version
+ * of the schema they should be able to read new and existing data. <li>INFO
+ * messages note changes to the schema, basically providing a friendly list
+ * of what has changed from one version to the next. This includes changes
+ * like the addition of fields, changes to default values, etc.
+ *
+ * @param oldSchema the original schema
+ * @param newSchema the new schema
+ * @param name the schema name
+ * @return list of messages about the schema evolution
+ */
+ public static List<Message> checkBackwardCompatability(Schema oldSchema,
+ Schema newSchema,
+ String name) {
+ SchemaEvolutionValidator validator = new SchemaEvolutionValidator();
+ List<Message> messages = new ArrayList<Message>();
+ validator.compareTypes(oldSchema, newSchema, messages, name);
+ return messages;
+ }
+
+ /* package private */void compareTypes(Schema oldSchema,
+ Schema newSchema,
+ List<Message> messages,
+ String name) {
+ oldSchema = stripOptionalTypeUnion(oldSchema);
+ newSchema = stripOptionalTypeUnion(newSchema);
+ Schema.Type oldType = oldSchema.getType();
+ Schema.Type newType = newSchema.getType();
+
+ if(oldType != Type.UNION && newType == Type.UNION) {
+ boolean compatibleTypeFound = false;
+ for(Schema s: newSchema.getTypes()) {
+ if((oldType != Type.RECORD && oldType == s.getType())
+ || (oldType == Type.RECORD && s.getType() == Type.RECORD && oldSchema.getName()
+ .equals(s.getName()))) {
+ compareTypes(oldSchema, s, messages, name);
+ compatibleTypeFound = true;
+ break;
+ }
+ }
+
+ if(compatibleTypeFound) {
+ messages.add(new Message(Level.INFO,
+ "Type change from " + oldType + " to " + newType
+ + " for field " + name
+ + ". The new union includes the original type."));
+ } else {
+ messages.add(new Message(Level.ERROR,
+ "Incompatible type change from "
+ + oldType
+ + " to "
+ + newType
+ + " for field "
+ + name
+ + ". The new union does not include the original type."));
+ }
+ } else if(oldType == Type.RECORD) {
+ if(!_recordStack.contains(oldSchema.getName())) {
+ _recordStack.add(oldSchema.getName());
+ compareRecordTypes(oldSchema, newSchema, messages, name);
+ _recordStack.remove(oldSchema.getName());
+ }
+ } else if(oldType == Type.ENUM) {
+ compareEnumTypes(oldSchema, newSchema, messages, name);
+ } else if(oldType == Type.ARRAY) {
+ compareArrayTypes(oldSchema, newSchema, messages, name);
+ } else if(oldType == Type.MAP) {
+ compareMapTypes(oldSchema, newSchema, messages, name);
+ } else if(oldType == Type.UNION) {
+ compareUnionTypes(oldSchema, newSchema, messages, name);
+ } else if(oldType == Type.FIXED) {
+ compareFixedTypes(oldSchema, newSchema, messages, name);
+ } else {
+ comparePrimitiveTypes(oldSchema, newSchema, messages, name);
+ }
+ }
+
+ /* package private */void compareRecordTypes(Schema oldSchema,
+ Schema newSchema,
+ List<Message> messages,
+ String name) {
+ if(oldSchema == null || newSchema == null || oldSchema.getType() != Schema.Type.RECORD) {
+ throw new IllegalArgumentException("Old schema must be RECORD type. Name=" + name
+ + ". Type=" + oldSchema);
+ }
+
+ if(newSchema.getType() != Schema.Type.RECORD) {
+ messages.add(new Message(Level.ERROR, "Illegal type change from " + oldSchema.getType()
+ + " to " + newSchema.getType() + " for field "
+ + name));
+ return;
+ }
+
+ // Check all of the fields in the new schema
+ for(Field newField: newSchema.getFields()) {
+ String fieldName = newField.name();
+ Field oldField = oldSchema.getField(fieldName);
+
+ if(oldField == null) {
+ // This is a new field that did not exist in the original
+ // schema.
+ // Check if it is optional or has a default value.
+ if(isOptional(newField)) {
+ if(newField.defaultValue() == null) {
+ messages.add(new Message(Level.INFO, "Added optional field " + name + "."
+ + fieldName
+ + " with no default value."));
+ } else {
+ messages.add(new Message(Level.INFO, "Added optional field " + name + "."
+ + fieldName + " with default value: "
+ + newField.defaultValue()));
+ }
+ } else {
+ if(newField.defaultValue() == null) {
+ messages.add(new Message(Level.ERROR, "Added required field " + name + "."
+ + fieldName
+ + " with no default value."));
+ } else {
+ messages.add(new Message(Level.INFO, "Added required field " + name + "."
+ + fieldName + " with default value: "
+ + newField.defaultValue()));
+ }
+ }
+ } else {
+ // This is a field that existed in the original schema.
+
+ // Check if the field was changed from optional to required or
+ // vice versa.
+ boolean newFieldIsOptional = isOptional(newField);
+ boolean oldFieldIsOptional = isOptional(oldField);
+
+ if(oldFieldIsOptional != newFieldIsOptional) {
+ if(oldFieldIsOptional) {
+ messages.add(new Message(Level.ERROR,
+ "Existing field " + name + "." + fieldName
+ + " was optional and is now required."));
+ } else {
+ messages.add(new Message(Level.WARN, "Existing field " + name + "."
+ + fieldName
+ + " was required and is now optional."));
+ }
+ }
+
+ // Recursively compare the nested field types
+ compareTypes(oldField.schema(), newField.schema(), messages, name + "." + fieldName);
+
+ // Check if the default value has been changed
+ if(newField.defaultValue() == null) {
+ if(oldField.defaultValue() != null) {
+ messages.add(new Message(Level.WARN,
+ "Removed default value for existing field " + name
+ + "." + fieldName
+ + ". The old default was: "
+ + oldField.defaultValue()));
+ }
+ } else // newField.defaultValue() != null
+ {
+ if(oldField.defaultValue() == null) {
+ messages.add(new Message(Level.WARN,
+ "Added a default value for existing field " + name
+ + "." + fieldName
+ + ". The new default is: "
+ + newField.defaultValue()));
+ } else if(!newField.defaultValue().equals(oldField.defaultValue())) {
+ messages.add(new Message(Level.INFO,
+ "Changed the default value for existing field "
+ + name + "." + fieldName
+ + ". The old default was: "
+ + oldField.defaultValue()
+ + ". The new default is: "
+ + newField.defaultValue()));
+ }
+ }
+ }
+
+ // For all fields in the new schema (whether or not it existed in
+ // the old schema), if there is a default value for this field, make
+ // sure it is legal.
+ if(newField.defaultValue() != null) {
+ checkDefaultValueIsLegal(newField, messages, name + "." + fieldName);
+ }
+ }
+
+ // Check if any fields were removed.
+ for(Field oldField: oldSchema.getFields()) {
+ String fieldName = oldField.name();
+ Field newField = newSchema.getField(fieldName);
+
+ if(newField == null) {
+ if(isOptional(oldField)) {
+ messages.add(new Message(Level.INFO, "Removed optional field " + name + "."
+ + fieldName));
+ } else {
+ messages.add(new Message(Level.WARN, "Removed required field " + name + "."
+ + fieldName));
+ }
+ }
+ }
+
+ // Check if indexing was modified or added to any old fields.
+ for(Field oldField: oldSchema.getFields()) {
+ if(newSchema.getField(oldField.name()) != null) {
+ String oldIndexType = oldField.getProp("indexType");
+ String newIndexType = newSchema.getField(oldField.name()).getProp("indexType");
+
+ // Check if added indexing.
+ if(oldIndexType == null && newIndexType != null) {
+ messages.add(new Message(Level.ERROR,
+ "Cannot add indexing to "
+ + oldField.name()
+ + ". Adding indexing to fields created without indexing is not supported."));
+ }
+
+ // Check if changed indexing
+ if(oldIndexType != null && !oldIndexType.equals(newIndexType)) {
+ messages.add(new Message(Level.ERROR,
+ "Cannot change indexing from "
+ + oldIndexType
+ + " to "
+ + newIndexType
+ + " for "
+ + oldField.name()
+ + ". Changing indexing method is not supported."));
+ }
+ }
+ }
+ }
+
+ /* package private */void compareUnionTypes(Schema oldSchema,
+ Schema newSchema,
+ List<Message> messages,
+ String name) {
+ if(oldSchema == null || newSchema == null || oldSchema.getType() != Schema.Type.UNION) {
+ throw new IllegalArgumentException("Old and new schema must both be UNION types. Name="
+ + name + ". Type=" + oldSchema);
+ }
+
+ // Build a list of type names, ignoring nulls which are checked
+ // separately as optional/required fields
+ List<Schema> newTypes = new ArrayList<Schema>();
+ List<String> newTypeNames = new ArrayList<String>();
+ List<Schema> oldTypes = new ArrayList<Schema>();
+ List<String> oldTypeNames = new ArrayList<String>();
+
+ if(newSchema.getType() == Type.UNION) {
+ for(Schema schema: newSchema.getTypes()) {
+ if(schema.getType() != Schema.Type.NULL) {
+ newTypes.add(schema);
+ newTypeNames.add(schema.getName());
+ }
+ }
+ } else {
+ newTypes.add(newSchema);
+ newTypeNames.add(newSchema.getName());
+ }
+
+ for(Schema schema: oldSchema.getTypes()) {
+ if(schema.getType() != Schema.Type.NULL) {
+ oldTypes.add(schema);
+ oldTypeNames.add(schema.getName());
+ }
+ }
+
+ if(!newTypeNames.containsAll(oldTypeNames)) {
+ messages.add(new Message(Level.ERROR,
+ "Existing UNION field " + name
+ + " had one or more types removed. The old types are:"
+ + oldTypeNames + ". The new types are: "
+ + newTypeNames));
+ }
+ if(!oldTypeNames.containsAll(newTypeNames)) {
+ messages.add(new Message(Level.INFO,
+ "Existing UNION field " + name
+ + " had one or more types added. The old types are:"
+ + oldTypeNames + ". The new types are: "
+ + newTypeNames));
+ }
+ if(newTypeNames.containsAll(oldTypeNames) && oldTypeNames.containsAll(newTypeNames)
+ && !newTypeNames.equals(oldTypeNames)) {
+ messages.add(new Message(Level.INFO,
+ "Existing UNION field "
+ + name
+ + " had one or more types reordered. The old types are:"
+ + oldTypeNames + ". The new types are: "
+ + newTypeNames));
+ }
+
+ for(int i = 0; i < newTypeNames.size(); i++) {
+ String typeName = newTypeNames.get(i);
+ int oldIndex = oldTypeNames.indexOf(typeName);
+ if(oldIndex != -1) {
+ compareTypes(oldTypes.get(oldIndex),
+ newTypes.get(i),
+ messages,
+ name + "." + oldTypes.get(oldIndex).getName());
+ }
+ }
+ }
+
+ /* package private */void compareEnumTypes(Schema oldSchema,
+ Schema newSchema,
+ List<Message> messages,
+ String name) {
+ if(oldSchema == null || newSchema == null || oldSchema.getType() != Schema.Type.ENUM) {
+ throw new IllegalArgumentException("Old schema must be ENUM type. Name=" + name
+ + ". Type=" + oldSchema);
+ }
+
+ if(newSchema.getType() != Schema.Type.ENUM) {
+ messages.add(new Message(Level.ERROR, "Illegal type change from " + oldSchema.getType()
+ + " to " + newSchema.getType() + " for field "
+ + name));
+ return;
+ }
+
+ List<String> newEnumSymbols = newSchema.getEnumSymbols();
+ List<String> oldEnumSymbols = oldSchema.getEnumSymbols();
+
+ // Check if enum types were added or removed
+ if(!newEnumSymbols.containsAll(oldEnumSymbols)) {
+ messages.add(new Message(Level.ERROR,
+ "Existing ENUM field "
+ + name
+ + " had one or more enum symbols removed. The old symbols are:"
+ + oldEnumSymbols + ". The new symbols are: "
+ + newEnumSymbols));
+ }
+ if(!oldEnumSymbols.containsAll(newEnumSymbols)) {
+ messages.add(new Message(Level.INFO,
+ "Existing ENUM field "
+ + name
+ + " had one or more enum symbols added. The old symbols are:"
+ + oldEnumSymbols + ". The new symbols are: "
+ + newEnumSymbols));
+ }
+
+ // Check if enum types were reordered.
+ if(newEnumSymbols.containsAll(oldEnumSymbols)) {
+ for(int i = 0; i < oldEnumSymbols.size(); i++) {
+ if(!oldEnumSymbols.get(i).equals(newEnumSymbols.get(i))) {
+ messages.add(new Message(Level.WARN,
+ "Existing ENUM field "
+ + name
+ + " had one or more enum symbols reordered. The old symbols are:"
+ + oldEnumSymbols + ". The new symbols are: "
+ + newEnumSymbols));
+ break;
+ }
+ }
+ }
+ }
+
+ /* package private */void compareArrayTypes(Schema oldSchema,
+ Schema newSchema,
+ List<Message> messages,
+ String name) {
+ if(oldSchema == null || newSchema == null || oldSchema.getType() != Schema.Type.ARRAY) {
+ throw new IllegalArgumentException("Old schema must be ARRAY type. Name=" + name
+ + ". Type=" + oldSchema);
+ }
+
+ if(newSchema.getType() != Schema.Type.ARRAY) {
+ messages.add(new Message(Level.ERROR, "Illegal type change from " + oldSchema.getType()
+ + " to " + newSchema.getType() + " for field "
+ + name));
+ return;
+ }
+
+ // Compare the array element types
+ compareTypes(oldSchema.getElementType(),
+ newSchema.getElementType(),
+ messages,
+ name + ".<array element>");
+ }
+
+ /* package private */void compareMapTypes(Schema oldSchema,
+ Schema newSchema,
+ List<Message> messages,
+ String name) {
+ if(oldSchema == null || newSchema == null || oldSchema.getType() != Schema.Type.MAP) {
+ throw new IllegalArgumentException("Old schema must be MAP type. Name=" + name
+ + ". Type=" + oldSchema);
+ }
+
+ if(newSchema.getType() != Schema.Type.MAP) {
+ messages.add(new Message(Level.ERROR, "Illegal type change from " + oldSchema.getType()
+ + " to " + newSchema.getType() + " for field "
+ + name));
+ return;
+ }
+
+ // Compare the array element types
+ compareTypes(oldSchema.getValueType(),
+ newSchema.getValueType(),
+ messages,
+ name + ".<map element>");
+ }
+
+ /* package private */void compareFixedTypes(Schema oldSchema,
+ Schema newSchema,
+ List<Message> messages,
+ String name) {
+ if(oldSchema == null || newSchema == null || oldSchema.getType() != Schema.Type.FIXED) {
+ throw new IllegalArgumentException("Old schema must be FIXED type. Name=" + name
+ + ". Type=" + oldSchema);
+ }
+
+ if(newSchema.getType() != Schema.Type.FIXED) {
+ messages.add(new Message(Level.ERROR, "Illegal type change from " + oldSchema.getType()
+ + " to " + newSchema.getType() + " for field "
+ + name));
+ return;
+ }
+
+ if(newSchema.getFixedSize() != oldSchema.getFixedSize()) {
+ messages.add(new Message(Level.ERROR, "Illegal size change for fixed type field "
+ + name));
+ }
+ }
+
+ /* package private */void comparePrimitiveTypes(Schema oldSchema,
+ Schema newSchema,
+ List<Message> messages,
+ String name) {
+ if(oldSchema == null || newSchema == null) {
+ throw new IllegalArgumentException("Old schema must both be a primitive type. Name="
+ + name + ". Type=" + oldSchema);
+ }
+
+ Schema.Type oldType = oldSchema.getType();
+ Schema.Type newType = newSchema.getType();
+
+ if(oldType != newType) {
+ if(((oldType == Schema.Type.INT && (newType == Schema.Type.LONG
+ || newType == Schema.Type.FLOAT || newType == Schema.Type.DOUBLE))
+ || (oldType == Schema.Type.LONG && (newType == Schema.Type.FLOAT || newType == Schema.Type.DOUBLE)) || (oldType == Schema.Type.FLOAT && (newType == Schema.Type.DOUBLE)))) {
+ messages.add(new Message(Level.INFO, "Type change from " + oldSchema.getType()
+ + " to " + newSchema.getType() + " for field "
+ + name));
+ } else {
+ messages.add(new Message(Level.ERROR, "Illegal type change from "
+ + oldSchema.getType() + " to "
+ + newSchema.getType() + " for field " + name));
+ }
+ }
+ }
+
+ /**
+ * Returns true if this field is optional. Optional fields are represented
+ * as a type union containing the null type.
+ *
+ * @param field the field to check
+ * @return true if the field's type union contains the null type
+ */
+ /* package private */boolean isOptional(Field field) {
+ if(field.schema().getType() == Type.UNION) {
+ for(Schema nestedType: field.schema().getTypes()) {
+ if(nestedType.getType() == Type.NULL) {
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+
+ /* package private */Schema stripOptionalTypeUnion(Schema schema) {
+ if(schema.getType() == Schema.Type.UNION && schema.getTypes().size() == 2
+ && schema.getTypes().contains(NULL_TYPE_SCHEMA)) {
+ return schema.getTypes().get(0).equals(NULL_TYPE_SCHEMA) ? schema.getTypes().get(1)
+ : schema.getTypes().get(0);
+ }
+ return schema;
+ }
+
+ public static class Message {
+
+ private final Level _level;
+ private final String _message;
+
+ public Message(Level level, String message) {
+ super();
+ _level = level;
+ _message = message;
+ }
+
+ public Level getLevel() {
+ return _level;
+ }
+
+ public String getMessage() {
+ return _message;
+ }
+
+ @Override
+ public String toString() {
+ return _level + ": " + _message;
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + ((_level == null) ? 0 : _level.toString().hashCode());
+ result = prime * result + ((_message == null) ? 0 : _message.hashCode());
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if(this == obj)
+ return true;
+ if(obj == null)
+ return false;
+ if(getClass() != obj.getClass())
+ return false;
+ Message other = (Message) obj;
+ if(_level == null) {
+ if(other._level != null)
+ return false;
+ } else if(!_level.equals(other._level))
+ return false;
+ if(_message == null) {
+ if(other._message != null)
+ return false;
+ } else if(!_message.equals(other._message))
+ return false;
+ return true;
+ }
+ }
+
+ /**
+ * Checks that the field's default value is legal for its declared type,
+ * adding an ERROR message when it is not.
+ *
+ * @param field the field whose default value is checked
+ * @param messages accumulator for validation messages
+ * @param name the fully qualified field name used in messages
+ */
+ /* package private */static void checkDefaultValueIsLegal(Field field,
+ List<Message> messages,
+ String name) {
+ if(field == null) {
+ throw new IllegalArgumentException("Field must be non-null. Name=" + name);
+ }
+
+ if(field.defaultValue() != null) {
+ // Get the type schema. If this is a UNION, the default must be of
+ // the leading type
+ Schema fieldSchema = field.schema();
+ if(fieldSchema.getType() == Schema.Type.UNION) {
+ fieldSchema = fieldSchema.getTypes().get(0);
+ }
+
+ // Get the default value
+ JsonNode defaultJson = field.defaultValue();
+
+ String expectedVal = checkDefaultJson(defaultJson, field.schema());
+
+ if(expectedVal != null) {
+ messages.add(new Message(Level.ERROR, "Illegal default value for field " + name
+ + ". The default must be of type "
+ + expectedVal + "."));
+ }
+ }
+ }
+
+ /**
+ * Check that the default json node is a valid default value. If not, return
+ * the expected type as a String.
+ */
+ /* package private */static String checkDefaultJson(JsonNode defaultJson, Schema schema) {
+ Schema.Type fieldType = schema.getType();
+ String expectedVal = null;
+ switch(fieldType) {
+ case NULL:
+ if(!defaultJson.isNull()) {
+ expectedVal = "null";
+ }
+
+ break;
+ case BOOLEAN:
+ if(!defaultJson.isBoolean()) {
+ expectedVal = "boolean";
+ }
+ break;
+ case INT:
+ if(!defaultJson.isInt()) {
+ expectedVal = "int";
+ }
+ break;
+ case LONG:
+ if(!defaultJson.isInt() && !defaultJson.isLong()) {
+ expectedVal = "long";
+ }
+ break;
+ case FLOAT:
+ case DOUBLE:
+ if(!defaultJson.isNumber()) {
+ expectedVal = "number";
+ }
+ break;
+ case BYTES:
+ if(defaultJson.isTextual()) {
+ break;
+ }
+ expectedVal = "bytes (ex. \"\\u00FF\")";
+ break;
+ case STRING:
+ if(!defaultJson.isTextual()) {
+ expectedVal = "string";
+ }
+ break;
+ case ENUM:
+ if(defaultJson.isTextual()) {
+ if(schema.hasEnumSymbol(defaultJson.getTextValue())) {
+ break;
+ }
+ }
+ expectedVal = "valid enum";
+ break;
+ case FIXED:
+ if(defaultJson.isTextual()) {
+ byte[] fixed = defaultJson.getValueAsText().getBytes();
+ if(fixed.length == schema.getFixedSize()) {
+ break;
+ }
+ expectedVal = "fixed size incorrect. Expected size: " + schema.getFixedSize()
+ + " got size " + fixed.length;
+ break;
+ }
+ expectedVal = "fixed (ex. \"\\u00FF\")";
+ break;
+ case ARRAY:
+ if(defaultJson.isArray()) {
+ // Check all array variables
+ boolean isGood = true;
+ for(JsonNode node: defaultJson) {
+ String val = checkDefaultJson(node, schema.getElementType());
+ if(val == null) {
+ continue;
+ } else {
+ isGood = false;
+ break;
+ }
+ }
+
+ if(isGood) {
+ break;
+ }
+ }
+ expectedVal = "array of type " + schema.getElementType().toString();
+ break;
+ case MAP:
+ if(defaultJson.isObject()) {
+ boolean isGood = true;
+ for(JsonNode node: defaultJson) {
+ String val = checkDefaultJson(node, schema.getValueType());
+ if(val == null) {
+ continue;
+ } else {
+ isGood = false;
+ break;
+ }
+ }
+
+ if(isGood) {
+ break;
+ }
+ }
+
+ expectedVal = "map of type " + schema.getValueType().toString();
+ break;
+ case RECORD:
+ if(defaultJson.isObject()) {
+ boolean isGood = true;
+ for(Field field: schema.getFields()) {
+ JsonNode jsonNode = defaultJson.get(field.name());
+
+ if(jsonNode == null) {
+ jsonNode = field.defaultValue();
+ if(jsonNode == null) {
+ isGood = false;
+ break;
+ }
+ }
+
+ String val = checkDefaultJson(jsonNode, field.schema());
+ if(val != null) {
+ isGood = false;
+ break;
+ }
+ }
+
+ if(isGood) {
+ break;
+ }
+ }
+
+ expectedVal = "record of type " + schema.toString();
+ break;
+ case UNION:
+ // Avro spec states we only need to match with the first item
+ expectedVal = checkDefaultJson(defaultJson, schema.getTypes().get(0));
+ break;
+ }
+
+ return expectedVal;
+ }
+
+ public static void checkSchemaCompatibility(SerializerDefinition serDef) {
+
+ Map<Integer, String> schemaVersions = serDef.getAllSchemaInfoVersions();
+
+ Iterator schemaIterator = schemaVersions.entrySet().iterator();
+
+ Schema firstSchema = null;
+ Schema secondSchema = null;
+
+ String firstSchemaStr;
+ String secondSchemaStr;
+
+ if(!schemaIterator.hasNext())
+ throw new VoldemortException("No schema specified");
+
+ Map.Entry schemaPair = (Map.Entry) schemaIterator.next();
+
+ firstSchemaStr = (String) schemaPair.getValue();
+
+ while(schemaIterator.hasNext()) {
+
+ schemaPair = (Map.Entry) schemaIterator.next();
+
+ secondSchemaStr = (String) schemaPair.getValue();
+ Schema oldSchema = Schema.parse(firstSchemaStr);
+ Schema newSchema = Schema.parse(secondSchemaStr);
+ List<Message> messages = SchemaEvolutionValidator.checkBackwardCompatability(oldSchema,
+ newSchema,
+ oldSchema.getName());
+ Level maxLevel = Level.ALL;
+ for(Message message: messages) {
+ System.out.println(message.getLevel() + ": " + message.getMessage());
+ if(message.getLevel().isGreaterOrEqual(maxLevel)) {
+ maxLevel = message.getLevel();
+ }
+ }
+
+ if(maxLevel.isGreaterOrEqual(Level.ERROR)) {
+ System.out.println(Level.ERROR
+ + ": The schema is not backward compatible. New clients will not be able to read existing data.");
+ throw new VoldemortException(" The schema is not backward compatible. New clients will not be able to read existing data.");
+ } else if(maxLevel.isGreaterOrEqual(Level.WARN)) {
+ System.out.println(Level.WARN
+ + ": The schema is partially backward compatible, but old clients will not be able to read data serialized in the new format.");
+ throw new VoldemortException("The schema is partially backward compatible, but old clients will not be able to read data serialized in the new format.");
+ } else {
+ System.out.println(Level.INFO
+ + ": The schema is backward compatible. Old and new clients will be able to read records serialized by one another.");
+ }
+
+ firstSchemaStr = secondSchemaStr;
+
+ }
+ }
+}
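
Finally, a quick usage sketch of the validator outside the server and admin tool, mirroring the class's own main method; the schemas are invented for illustration.

import java.util.List;

import org.apache.avro.Schema;
import org.apache.log4j.Level;

import voldemort.serialization.avro.versioned.SchemaEvolutionValidator;
import voldemort.serialization.avro.versioned.SchemaEvolutionValidator.Message;

public class EvolutionCheckSketch {

    public static void main(String[] args) {
        Schema oldSchema = Schema.parse("{\"type\":\"record\",\"name\":\"Member\",\"fields\":"
                                        + "[{\"name\":\"id\",\"type\":\"int\"}]}");
        // Adding a required field with no default is an ERROR per the rules above.
        Schema newSchema = Schema.parse("{\"type\":\"record\",\"name\":\"Member\",\"fields\":"
                                        + "[{\"name\":\"id\",\"type\":\"int\"},"
                                        + "{\"name\":\"age\",\"type\":\"int\"}]}");

        List<Message> messages =
                SchemaEvolutionValidator.checkBackwardCompatability(oldSchema,
                                                                    newSchema,
                                                                    oldSchema.getName());
        for(Message m: messages) {
            if(m.getLevel().isGreaterOrEqual(Level.ERROR))
                System.out.println("Not backward compatible: " + m.getMessage());
        }
    }
}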
23 src/java/voldemort/server/storage/StorageService.java
@@ -55,6 +55,8 @@
import voldemort.common.service.ServiceType;
import voldemort.routing.RoutingStrategy;
import voldemort.routing.RoutingStrategyFactory;
+import voldemort.serialization.SerializerDefinition;
+import voldemort.serialization.avro.versioned.SchemaEvolutionValidator;
import voldemort.server.RequestRoutingType;
import voldemort.server.StoreRepository;
import voldemort.server.VoldemortConfig;
@@ -329,6 +331,25 @@ protected void startInner() {
List<StoreDefinition> storeDefs = new ArrayList<StoreDefinition>(this.metadata.getStoreDefList());
logger.info("Initializing stores:");
+ logger.info("Validating schemas:");
+ String AVRO_GENERIC_VERSIONED_TYPE_NAME = "avro-generic-versioned";
+
+ for(StoreDefinition storeDef: storeDefs) {
+ SerializerDefinition keySerDef = storeDef.getKeySerializer();
+ SerializerDefinition valueSerDef = storeDef.getValueSerializer();
+
+ if(keySerDef.getName().equals(AVRO_GENERIC_VERSION