Skip to content
Browse files

BDB data conversion utility

  • Loading branch information...
1 parent 7c9e2b0 commit 5e48136648c12bcd11ff7557a439edf43f52cb23 @vinothchandar vinothchandar committed
View
39 bin/voldemort-convert-bdb.sh
@@ -0,0 +1,39 @@
#!/bin/bash
#
# Copyright 2008-2012 LinkedIn, Inc
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# Launcher for the BDB data conversion utility. Resolve the distribution
# root relative to this script's location so it works from any CWD.
base_dir=$(dirname "$0")/..

# Build the classpath from the dist, lib and contrib jars.
for file in "$base_dir"/dist/*.jar; do
  CLASSPATH=$CLASSPATH:$file
done

for file in "$base_dir"/lib/*.jar; do
  CLASSPATH=$CLASSPATH:$file
done

for file in "$base_dir"/contrib/*/lib/*.jar; do
  CLASSPATH=$CLASSPATH:$file
done

CLASSPATH=$CLASSPATH:$base_dir/dist/resources

# JVM sizing for a bulk conversion job: large fixed heap, CMS collector.
JVM_OPTS="-server -Xms5g -Xmx5g -XX:NewSize=1024m -XX:MaxNewSize=1024m -XX:+AlwaysPreTouch -XX:+UseCompressedOops -XX:+UseConcMarkSweepGC -XX:+UseParNewGC -XX:CMSInitiatingOccupancyFraction=70 -XX:SurvivorRatio=2"

# Quote "$@" and "$CLASSPATH" so arguments and paths containing spaces are
# passed through intact (unquoted $@ word-splits every argument).
java -Dlog4j.configuration=src/java/log4j.properties $JVM_OPTS -cp "$CLASSPATH" voldemort.store.bdb.dataconversion.BdbConvertData "$@"
View
100 src/java/voldemort/store/bdb/dataconversion/AbstractBdbConversion.java
@@ -0,0 +1,100 @@
+package voldemort.store.bdb.dataconversion;
+
+import java.io.File;
+
+import org.apache.log4j.Logger;
+
+import voldemort.cluster.Cluster;
+import voldemort.xml.ClusterMapper;
+
+import com.sleepycat.je.Cursor;
+import com.sleepycat.je.Database;
+import com.sleepycat.je.DatabaseConfig;
+import com.sleepycat.je.Durability;
+import com.sleepycat.je.Environment;
+import com.sleepycat.je.EnvironmentConfig;
+
+public abstract class AbstractBdbConversion {
+
+ String storeName;
+ Database srcDB;
+ Environment srcEnv;
+
+ Database dstDB;
+ Environment dstEnv;
+ Cluster cluster;
+
+ Cursor cursor;
+
+ Logger logger = Logger.getLogger(BdbConvertData.class);
+
+ AbstractBdbConversion(String storeName,
+ String clusterXmlPath,
+ String sourceEnvPath,
+ String destEnvPath,
+ int logFileSize,
+ int nodeMax) throws Exception {
+ this.cluster = new ClusterMapper().readCluster(new File(clusterXmlPath));
+ this.storeName = storeName;
+
+ // Configure src environment handle
+ EnvironmentConfig envConfig = new EnvironmentConfig();
+ envConfig.setTransactional(true);
+ envConfig.setAllowCreate(false);
+ envConfig.setReadOnly(true);
+ envConfig.setCacheSize(1024 * 1024 * 1024);
+
+ DatabaseConfig dbConfig = new DatabaseConfig();
+ dbConfig.setTransactional(true);
+ dbConfig.setAllowCreate(false);
+ dbConfig.setSortedDuplicates(areDuplicatesNeededForSrc());
+ dbConfig.setReadOnly(true);
+
+ srcEnv = new Environment(new File(sourceEnvPath), envConfig);
+ srcDB = srcEnv.openDatabase(null, storeName, dbConfig);
+
+ // Configure dest environment handle
+ File newEnvDir = new File(destEnvPath);
+ if(!newEnvDir.exists()) {
+ newEnvDir.mkdirs();
+ }
+
+ envConfig = new EnvironmentConfig();
+ envConfig.setTransactional(false);
+ envConfig.setAllowCreate(true);
+ envConfig.setReadOnly(false);
+ envConfig.setCacheSize(1024 * 1024 * 1024);
+ envConfig.setConfigParam(EnvironmentConfig.LOG_FILE_MAX,
+ Long.toString(logFileSize * 1024L * 1024L));
+ envConfig.setDurability(Durability.COMMIT_NO_SYNC);
+
+ dbConfig = new DatabaseConfig();
+ dbConfig.setTransactional(false);
+ dbConfig.setAllowCreate(true);
+ dbConfig.setSortedDuplicates(areDuplicatesNeededForDest());
+ dbConfig.setDeferredWrite(true);
+ dbConfig.setNodeMaxEntries(nodeMax);
+
+ dstEnv = new Environment(newEnvDir, envConfig);
+ dstDB = dstEnv.openDatabase(null, storeName, dbConfig);
+
+ }
+
+ public void close() {
+ if(cursor != null)
+ cursor.close();
+
+ srcDB.close();
+ srcEnv.close();
+
+ dstDB.sync();
+ dstDB.close();
+ dstEnv.close();
+ }
+
+ public abstract void transfer() throws Exception;
+
+ public abstract boolean areDuplicatesNeededForSrc();
+
+ public abstract boolean areDuplicatesNeededForDest();
+}
View
96 src/java/voldemort/store/bdb/dataconversion/BdbConvertBaseToNewDup.java
@@ -0,0 +1,96 @@
+package voldemort.store.bdb.dataconversion;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import voldemort.store.StoreBinaryFormat;
+import voldemort.utils.ByteUtils;
+import voldemort.versioning.VectorClock;
+import voldemort.versioning.Versioned;
+
+import com.sleepycat.je.DatabaseEntry;
+import com.sleepycat.je.LockMode;
+import com.sleepycat.je.OperationStatus;
+
+public class BdbConvertBaseToNewDup extends AbstractBdbConversion {
+
+ BdbConvertBaseToNewDup(String storeName,
+ String clusterXmlPath,
+ String sourceEnvPath,
+ String destEnvPath,
+ int logFileSize,
+ int nodeMax) throws Exception {
+ super(storeName, clusterXmlPath, sourceEnvPath, destEnvPath, logFileSize, nodeMax);
+ }
+
+ @Override
+ public void transfer() throws Exception {
+ cursor = srcDB.openCursor(null, null);
+ DatabaseEntry keyEntry = new DatabaseEntry();
+ DatabaseEntry valueEntry = new DatabaseEntry();
+
+ byte[] prevKey = null;
+ List<Versioned<byte[]>> vals = new ArrayList<Versioned<byte[]>>();
+
+ long startTime = System.currentTimeMillis();
+ int scanCount = 0;
+ int keyCount = 0;
+ while(cursor.getNext(keyEntry, valueEntry, LockMode.READ_UNCOMMITTED) == OperationStatus.SUCCESS) {
+ scanCount++;
+ if(scanCount % 1000000 == 0)
+ logger.info("Converted " + scanCount + "entries in "
+ + (System.currentTimeMillis() - startTime) / 1000 + " secs");
+
+ // read the value as a versioned Object
+ VectorClock clock = new VectorClock(valueEntry.getData());
+ byte[] bytes = ByteUtils.copy(valueEntry.getData(),
+ clock.sizeInBytes(),
+ valueEntry.getData().length);
+ Versioned<byte[]> value = new Versioned<byte[]>(bytes, clock);
+ byte[] key = keyEntry.getData();
+
+ if(prevKey != null && (ByteUtils.compare(prevKey, key) != 0)) {
+ // there is a new key; write out the buffered values and
+ // previous key
+ OperationStatus putStatus = dstDB.put(null,
+ new DatabaseEntry(prevKey),
+ new DatabaseEntry(StoreBinaryFormat.toByteArray(vals)));
+ if(OperationStatus.SUCCESS != putStatus) {
+ String errorStr = "Put failed with " + putStatus + " for key"
+ + BdbConvertData.writeAsciiString(prevKey);
+ logger.error(errorStr);
+ throw new Exception(errorStr);
+ }
+ vals = new ArrayList<Versioned<byte[]>>();
+ keyCount++;
+ }
+
+ vals.add(value);
+ prevKey = key;
+ }
+ if(vals.size() > 0) {
+ OperationStatus putStatus = dstDB.put(null,
+ new DatabaseEntry(prevKey),
+ new DatabaseEntry(StoreBinaryFormat.toByteArray(vals)));
+ if(OperationStatus.SUCCESS != putStatus) {
+ String errorStr = "Put failed with " + putStatus + " for key"
+ + BdbConvertData.writeAsciiString(prevKey);
+ logger.error(errorStr);
+ throw new Exception(errorStr);
+ }
+ keyCount++;
+ }
+ logger.info("Completed " + scanCount + "entries and " + keyCount + " keys in "
@ctasada
ctasada added a note

Cosmetic thing. Should be

logger.info("Completed " + scanCount + " entries and " + keyCount + " keys in "

Note the extra space.

@vinothchandar Collaborator

will change....

@vinothchandar Collaborator

this is pushed to master now..

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
+ + (System.currentTimeMillis() - startTime) / 1000 + " secs");
+ }
+
+ @Override
+ public boolean areDuplicatesNeededForSrc() {
+ return true;
+ }
+
+ @Override
+ public boolean areDuplicatesNeededForDest() {
+ return false;
+ }
+}
View
106 src/java/voldemort/store/bdb/dataconversion/BdbConvertBaseToPidScan.java
@@ -0,0 +1,106 @@
+package voldemort.store.bdb.dataconversion;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import voldemort.store.StoreBinaryFormat;
+import voldemort.utils.ByteUtils;
+import voldemort.utils.FnvHashFunction;
+import voldemort.utils.HashFunction;
+import voldemort.versioning.VectorClock;
+import voldemort.versioning.Versioned;
+
+import com.sleepycat.je.DatabaseEntry;
+import com.sleepycat.je.LockMode;
+import com.sleepycat.je.OperationStatus;
+
+public class BdbConvertBaseToPidScan extends AbstractBdbConversion {
+
+ BdbConvertBaseToPidScan(String storeName,
+ String clusterXmlPath,
+ String sourceEnvPath,
+ String destEnvPath,
+ int logFileSize,
+ int nodeMax) throws Exception {
+ super(storeName, clusterXmlPath, sourceEnvPath, destEnvPath, logFileSize, nodeMax);
+ }
+
+ @Override
+ public void transfer() throws Exception {
+ cursor = srcDB.openCursor(null, null);
+ DatabaseEntry keyEntry = new DatabaseEntry();
+ DatabaseEntry valueEntry = new DatabaseEntry();
+
+ byte[] prevKey = null;
+ List<Versioned<byte[]>> vals = new ArrayList<Versioned<byte[]>>();
+ HashFunction hash = new FnvHashFunction();
+ int totalPartitions = cluster.getNumberOfPartitions();
+
+ long startTime = System.currentTimeMillis();
+ int scanCount = 0;
+ int keyCount = 0;
+ while(cursor.getNext(keyEntry, valueEntry, LockMode.READ_UNCOMMITTED) == OperationStatus.SUCCESS) {
+ scanCount++;
+ if(scanCount % 1000000 == 0)
+ logger.info("Converted " + scanCount + "entries in "
+ + (System.currentTimeMillis() - startTime) / 1000 + " secs");
+
+ // read the value as a versioned Object
+ VectorClock clock = new VectorClock(valueEntry.getData());
+ byte[] bytes = ByteUtils.copy(valueEntry.getData(),
+ clock.sizeInBytes(),
+ valueEntry.getData().length);
+ Versioned<byte[]> value = new Versioned<byte[]>(bytes, clock);
+ byte[] key = keyEntry.getData();
+
+ if(prevKey != null && (ByteUtils.compare(prevKey, key) != 0)) {
+ // there is a new key; write out the buffered values and
+ // previous key
+ int partition = BdbConvertData.abs(hash.hash(prevKey))
+ % (Math.max(1, totalPartitions));
+
+ OperationStatus putStatus = dstDB.put(null,
+ new DatabaseEntry(StoreBinaryFormat.makePrefixedKey(prevKey,
+ partition)),
+ new DatabaseEntry(StoreBinaryFormat.toByteArray(vals)));
+ if(OperationStatus.SUCCESS != putStatus) {
+ String errorStr = "Put failed with " + putStatus + " for key"
+ + BdbConvertData.writeAsciiString(prevKey);
+ logger.error(errorStr);
+ throw new Exception(errorStr);
+ }
+ vals = new ArrayList<Versioned<byte[]>>();
+ keyCount++;
+ }
+
+ vals.add(value);
+ prevKey = key;
+ }
+ if(vals.size() > 0) {
+ int partition = BdbConvertData.abs(hash.hash(prevKey)) % (Math.max(1, totalPartitions));
+ OperationStatus putStatus = dstDB.put(null,
+ new DatabaseEntry(StoreBinaryFormat.makePrefixedKey(prevKey,
+ partition)),
+ new DatabaseEntry(StoreBinaryFormat.toByteArray(vals)));
+ if(OperationStatus.SUCCESS != putStatus) {
+ String errorStr = "Put failed with " + putStatus + " for key"
+ + BdbConvertData.writeAsciiString(prevKey);
+ logger.error(errorStr);
+ throw new Exception(errorStr);
+ }
+ keyCount++;
+ }
+ logger.info("Completed " + scanCount + "entries and " + keyCount + " keys in "
+ + (System.currentTimeMillis() - startTime) / 1000 + " secs");
+ }
+
+ @Override
+ public boolean areDuplicatesNeededForSrc() {
+ return true;
+ }
+
+ @Override
+ public boolean areDuplicatesNeededForDest() {
+ return false;
+ }
+}
View
167 src/java/voldemort/store/bdb/dataconversion/BdbConvertData.java
@@ -0,0 +1,167 @@
+package voldemort.store.bdb.dataconversion;
+
+import joptsimple.OptionParser;
+import joptsimple.OptionSet;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.log4j.Logger;
+
+import voldemort.utils.CmdUtils;
+
+/**
+ *
+ * Conversion Utility to convert to-fro between 0.96 format and release 1.x+ BDB
+ * data formats
+ *
+ */
+public class BdbConvertData {
+
+ static Logger logger = Logger.getLogger(BdbConvertData.class);
+
+ /**
+ * @param args
+ */
+ public static void main(String[] args) throws Exception {
+ OptionParser parser = new OptionParser();
+ parser.accepts("cluster-xml", "[REQUIRED] path to cluster.xml file for the server")
+ .withRequiredArg()
+ .describedAs("cluster-xml")
+ .ofType(String.class);
+ parser.accepts("src", "[REQUIRED] Source environment to be converted")
+ .withRequiredArg()
+ .describedAs("source-env")
+ .ofType(String.class);
+ parser.accepts("dest", "[REQUIRED] Destination environment to place converted data into")
+ .withRequiredArg()
+ .describedAs("destination-env")
+ .ofType(String.class);
+ parser.accepts("store", "[REQUIRED] Store/BDB database to convert")
+ .withRequiredArg()
+ .describedAs("store")
+ .ofType(String.class);
+ parser.accepts("from-format", "[REQUIRED] source format")
+ .withRequiredArg()
+ .describedAs("from-format")
+ .ofType(String.class);
+ parser.accepts("to-format", "[REQUIRED] destination format")
+ .withRequiredArg()
+ .describedAs("to-format")
+ .ofType(String.class);
+ parser.accepts("je-log-size", "[Optional] Size of the converted JE log files")
+ .withRequiredArg()
+ .describedAs("je-log-size")
+ .ofType(Integer.class);
+ parser.accepts("btree-nodemax", "[Optional] Fanout of converted Btree nodes")
+ .withRequiredArg()
+ .describedAs("btree-nodemax")
+ .ofType(Integer.class);
+
+ OptionSet options = parser.parse(args);
+
+ if(!options.has("cluster-xml") || !options.has("src") || !options.has("dest")
+ || !options.has("store") || !options.has("from-format") || !options.has("to-format")) {
+ parser.printHelpOn(System.err);
+ System.exit(0);
+ }
+
+ String clusterXmlPath = CmdUtils.valueOf(options, "cluster-xml", null);
+ String sourceEnvPath = CmdUtils.valueOf(options, "src", null);
+ String destEnvPath = CmdUtils.valueOf(options, "dest", null);
+ String storeName = CmdUtils.valueOf(options, "store", null);
+
+ String fromFormat = CmdUtils.valueOf(options, "from-format", null);
+ String toFormat = CmdUtils.valueOf(options, "to-format", null);
+
+ if(!isValidFormat(fromFormat) || !isValidFormat(toFormat)) {
+ parser.printHelpOn(System.err);
+ System.exit(0);
+ }
+
+ Integer logFileSize = CmdUtils.valueOf(options, "je-log-size", 60);
+ Integer nodeMax = CmdUtils.valueOf(options, "btree-nodemax", 512);
+
+ AbstractBdbConversion conversion = null;
+ try {
+ if(fromFormat.equals("Base") && toFormat.equals("NewDup")) {
+ conversion = new BdbConvertBaseToNewDup(storeName,
+ clusterXmlPath,
+ sourceEnvPath,
+ destEnvPath,
+ logFileSize,
+ nodeMax);
+ } else if(fromFormat.equals("Base") && toFormat.equals("PidScan")) {
+ conversion = new BdbConvertBaseToPidScan(storeName,
+ clusterXmlPath,
+ sourceEnvPath,
+ destEnvPath,
+ logFileSize,
+ nodeMax);
+
+ } else if(fromFormat.equals("NewDup") && toFormat.equals("PidScan")) {
+ conversion = new BdbConvertNewDupToPidScan(storeName,
+ clusterXmlPath,
+ sourceEnvPath,
+ destEnvPath,
+ logFileSize,
+ nodeMax);
+
+ } else if(fromFormat.equals("PidScan") && toFormat.equals("NewDup")) {
+ conversion = new BdbRevertPidScanToNewDup(storeName,
+ clusterXmlPath,
+ sourceEnvPath,
+ destEnvPath,
+ logFileSize,
+ nodeMax);
+
+ } else if(fromFormat.equals("PidScan") && toFormat.equals("Base")) {
+ conversion = new BdbRevertPidScanToBase(storeName,
+ clusterXmlPath,
+ sourceEnvPath,
+ destEnvPath,
+ logFileSize,
+ nodeMax);
+
+ } else if(fromFormat.equals("NewDup") && toFormat.equals("Base")) {
+ conversion = new BdbRevertNewDupToBase(storeName,
+ clusterXmlPath,
+ sourceEnvPath,
+ destEnvPath,
+ logFileSize,
+ nodeMax);
+ } else {
+ throw new Exception("Invalid conversion. Please check READMEFIRST file");
+ }
+ // start the actual data conversion
+ conversion.transfer();
+ } catch(Exception e) {
+ logger.error("Error converting data", e);
+ } finally {
+ if(conversion != null)
+ conversion.close();
+ }
+ }
+
+ static boolean isValidFormat(String format) {
+ if(format == null)
+ return false;
+ return format.equals("Base") || format.equals("NewDup") || format.equals("PidScan");
+ }
+
+ /**
+ * Returns a Base64 encoded version of the byte array
+ *
+ * @param key
+ * @return
+ */
+ static String writeAsciiString(byte[] bytes) {
+ return new String(Base64.encodeBase64(bytes));
+ }
+
+ static int abs(int a) {
+ if(a >= 0)
+ return a;
+ else if(a != Integer.MIN_VALUE)
+ return -a;
+ return Integer.MAX_VALUE;
+ }
+}
View
74 src/java/voldemort/store/bdb/dataconversion/BdbConvertNewDupToPidScan.java
@@ -0,0 +1,74 @@
+package voldemort.store.bdb.dataconversion;
+
+import java.util.List;
+
+import voldemort.store.StoreBinaryFormat;
+import voldemort.utils.FnvHashFunction;
+import voldemort.utils.HashFunction;
+import voldemort.versioning.Versioned;
+
+import com.sleepycat.je.DatabaseEntry;
+import com.sleepycat.je.LockMode;
+import com.sleepycat.je.OperationStatus;
+
+public class BdbConvertNewDupToPidScan extends AbstractBdbConversion {
+
+ BdbConvertNewDupToPidScan(String storeName,
+ String clusterXmlPath,
+ String sourceEnvPath,
+ String destEnvPath,
+ int logFileSize,
+ int nodeMax) throws Exception {
+ super(storeName, clusterXmlPath, sourceEnvPath, destEnvPath, logFileSize, nodeMax);
+ }
+
+ @Override
+ public void transfer() throws Exception {
+ cursor = srcDB.openCursor(null, null);
+ DatabaseEntry keyEntry = new DatabaseEntry();
+ DatabaseEntry valueEntry = new DatabaseEntry();
+ HashFunction hash = new FnvHashFunction();
+ int totalPartitions = cluster.getNumberOfPartitions();
+
+ List<Versioned<byte[]>> vals;
+ long startTime = System.currentTimeMillis();
+ int scanCount = 0;
+ int keyCount = 0;
+ while(cursor.getNext(keyEntry, valueEntry, LockMode.READ_UNCOMMITTED) == OperationStatus.SUCCESS) {
+ keyCount++;
+
+ vals = StoreBinaryFormat.fromByteArray(valueEntry.getData());
+ scanCount += vals.size();
+
+ int partition = BdbConvertData.abs(hash.hash(keyEntry.getData()))
+ % (Math.max(1, totalPartitions));
+
+ OperationStatus putStatus = dstDB.put(null,
+ new DatabaseEntry(StoreBinaryFormat.makePrefixedKey(keyEntry.getData(),
+ partition)),
+ valueEntry);
+ if(OperationStatus.SUCCESS != putStatus) {
+ String errorStr = "Put failed with " + putStatus + " for key"
+ + BdbConvertData.writeAsciiString(keyEntry.getData());
+ logger.error(errorStr);
+ throw new Exception(errorStr);
+ }
+
+ if(scanCount % 1000000 == 0)
+ logger.info("Reverted " + scanCount + "entries in "
+ + (System.currentTimeMillis() - startTime) / 1000 + " secs");
+ }
+ logger.info("Converted " + scanCount + "entries and " + keyCount + " keys in "
+ + (System.currentTimeMillis() - startTime) / 1000 + " secs");
+ }
+
+ @Override
+ public boolean areDuplicatesNeededForSrc() {
+ return false;
+ }
+
+ @Override
+ public boolean areDuplicatesNeededForDest() {
+ return false;
+ }
+}
View
71 src/java/voldemort/store/bdb/dataconversion/BdbRevertNewDupToBase.java
@@ -0,0 +1,71 @@
+package voldemort.store.bdb.dataconversion;
+
+import java.util.List;
+
+import voldemort.serialization.IdentitySerializer;
+import voldemort.serialization.VersionedSerializer;
+import voldemort.store.StoreBinaryFormat;
+import voldemort.versioning.Versioned;
+
+import com.sleepycat.je.DatabaseEntry;
+import com.sleepycat.je.LockMode;
+import com.sleepycat.je.OperationStatus;
+
+public class BdbRevertNewDupToBase extends AbstractBdbConversion {
+
+ BdbRevertNewDupToBase(String storeName,
+ String clusterXmlPath,
+ String sourceEnvPath,
+ String destEnvPath,
+ int logFileSize,
+ int nodeMax) throws Exception {
+ super(storeName, clusterXmlPath, sourceEnvPath, destEnvPath, logFileSize, nodeMax);
+ }
+
+ @Override
+ public void transfer() throws Exception {
+ cursor = srcDB.openCursor(null, null);
+ DatabaseEntry keyEntry = new DatabaseEntry();
+ DatabaseEntry valueEntry = new DatabaseEntry();
+ VersionedSerializer<byte[]> versionedSerializer = new VersionedSerializer<byte[]>(new IdentitySerializer());
+
+ List<Versioned<byte[]>> vals;
+ long startTime = System.currentTimeMillis();
+
+ int scanCount = 0;
+ int keyCount = 0;
+ while(cursor.getNext(keyEntry, valueEntry, LockMode.READ_UNCOMMITTED) == OperationStatus.SUCCESS) {
+ keyCount++;
+
+ vals = StoreBinaryFormat.fromByteArray(valueEntry.getData());
+
+ for(Versioned<byte[]> val: vals) {
+ OperationStatus putStatus = dstDB.put(null,
+ keyEntry,
+ new DatabaseEntry(versionedSerializer.toBytes(val)));
+ if(OperationStatus.SUCCESS != putStatus) {
+ String errorStr = "Put failed with " + putStatus + " for key"
+ + BdbConvertData.writeAsciiString(keyEntry.getData());
+ logger.error(errorStr);
+ throw new Exception(errorStr);
+ }
+ scanCount++;
+ }
+ if(scanCount % 1000000 == 0)
+ logger.info("Reverted " + scanCount + "entries in "
+ + (System.currentTimeMillis() - startTime) / 1000 + " secs");
+ }
+ logger.info("Reverted " + scanCount + "entries and " + keyCount + " keys in "
+ + (System.currentTimeMillis() - startTime) / 1000 + " secs");
+ }
+
+ @Override
+ public boolean areDuplicatesNeededForSrc() {
+ return false;
+ }
+
+ @Override
+ public boolean areDuplicatesNeededForDest() {
+ return true;
+ }
+}
View
73 src/java/voldemort/store/bdb/dataconversion/BdbRevertPidScanToBase.java
@@ -0,0 +1,73 @@
+package voldemort.store.bdb.dataconversion;
+
+import java.util.List;
+
+import voldemort.serialization.IdentitySerializer;
+import voldemort.serialization.VersionedSerializer;
+import voldemort.store.StoreBinaryFormat;
+import voldemort.versioning.Versioned;
+
+import com.sleepycat.je.DatabaseEntry;
+import com.sleepycat.je.LockMode;
+import com.sleepycat.je.OperationStatus;
+
+public class BdbRevertPidScanToBase extends AbstractBdbConversion {
+
+ BdbRevertPidScanToBase(String storeName,
+ String clusterXmlPath,
+ String sourceEnvPath,
+ String destEnvPath,
+ int logFileSize,
+ int nodeMax) throws Exception {
+ super(storeName, clusterXmlPath, sourceEnvPath, destEnvPath, logFileSize, nodeMax);
+ }
+
+ @Override
+ public void transfer() throws Exception {
+ cursor = srcDB.openCursor(null, null);
+ DatabaseEntry keyEntry = new DatabaseEntry();
+ DatabaseEntry valueEntry = new DatabaseEntry();
+ VersionedSerializer<byte[]> versionedSerializer = new VersionedSerializer<byte[]>(new IdentitySerializer());
+
+ List<Versioned<byte[]>> vals;
+ long startTime = System.currentTimeMillis();
+
+ int scanCount = 0;
+ int keyCount = 0;
+ while(cursor.getNext(keyEntry, valueEntry, LockMode.READ_UNCOMMITTED) == OperationStatus.SUCCESS) {
+ keyCount++;
+
+ vals = StoreBinaryFormat.fromByteArray(valueEntry.getData());
+ // pull out the real key
+ byte[] stripedKey = StoreBinaryFormat.extractKey(keyEntry.getData());
+
+ for(Versioned<byte[]> val: vals) {
+ OperationStatus putStatus = dstDB.put(null,
+ new DatabaseEntry(stripedKey),
+ new DatabaseEntry(versionedSerializer.toBytes(val)));
+ if(OperationStatus.SUCCESS != putStatus) {
+ String errorStr = "Put failed with " + putStatus + " for key"
+ + BdbConvertData.writeAsciiString(stripedKey);
+ logger.error(errorStr);
+ throw new Exception(errorStr);
+ }
+ scanCount++;
+ }
+ if(scanCount % 1000000 == 0)
+ logger.info("Reverted " + scanCount + "entries in "
+ + (System.currentTimeMillis() - startTime) / 1000 + " secs");
+ }
+ logger.info("Reverted " + scanCount + "entries and " + keyCount + " keys in "
+ + (System.currentTimeMillis() - startTime) / 1000 + " secs");
+ }
+
+ @Override
+ public boolean areDuplicatesNeededForSrc() {
+ return false;
+ }
+
+ @Override
+ public boolean areDuplicatesNeededForDest() {
+ return true;
+ }
+}
View
66 src/java/voldemort/store/bdb/dataconversion/BdbRevertPidScanToNewDup.java
@@ -0,0 +1,66 @@
+package voldemort.store.bdb.dataconversion;
+
+import java.util.List;
+
+import voldemort.store.StoreBinaryFormat;
+import voldemort.versioning.Versioned;
+
+import com.sleepycat.je.DatabaseEntry;
+import com.sleepycat.je.LockMode;
+import com.sleepycat.je.OperationStatus;
+
+public class BdbRevertPidScanToNewDup extends AbstractBdbConversion {
+
+ BdbRevertPidScanToNewDup(String storeName,
+ String clusterXmlPath,
+ String sourceEnvPath,
+ String destEnvPath,
+ int logFileSize,
+ int nodeMax) throws Exception {
+ super(storeName, clusterXmlPath, sourceEnvPath, destEnvPath, logFileSize, nodeMax);
+ }
+
+ @Override
+ public void transfer() throws Exception {
+ cursor = srcDB.openCursor(null, null);
+ DatabaseEntry keyEntry = new DatabaseEntry();
+ DatabaseEntry valueEntry = new DatabaseEntry();
+
+ List<Versioned<byte[]>> vals;
+ long startTime = System.currentTimeMillis();
+ int scanCount = 0;
+ int keyCount = 0;
+ while(cursor.getNext(keyEntry, valueEntry, LockMode.READ_UNCOMMITTED) == OperationStatus.SUCCESS) {
+ keyCount++;
+
+ vals = StoreBinaryFormat.fromByteArray(valueEntry.getData());
+ scanCount += vals.size();
+
+ // pull out the real key
+ byte[] stripedKey = StoreBinaryFormat.extractKey(keyEntry.getData());
+ OperationStatus putStatus = dstDB.put(null, new DatabaseEntry(stripedKey), valueEntry);
+ if(OperationStatus.SUCCESS != putStatus) {
+ String errorStr = "Put failed with " + putStatus + " for key"
+ + BdbConvertData.writeAsciiString(keyEntry.getData());
+ logger.error(errorStr);
+ throw new Exception(errorStr);
+ }
+
+ if(scanCount % 1000000 == 0)
+ logger.info("Reverted " + scanCount + "entries in "
+ + (System.currentTimeMillis() - startTime) / 1000 + " secs");
+ }
+ logger.info("Reverted " + scanCount + "entries and " + keyCount + " keys in "
+ + (System.currentTimeMillis() - startTime) / 1000 + " secs");
+ }
+
+ @Override
+ public boolean areDuplicatesNeededForSrc() {
+ return false;
+ }
+
+ @Override
+ public boolean areDuplicatesNeededForDest() {
+ return false;
+ }
+}
View
110 src/java/voldemort/store/bdb/dataconversion/READMEFIRST
@@ -0,0 +1,110 @@
+This directory contains a utility to convert BDB JE data between different versions of Voldemort.
+
+Need for Conversion
+-------------------
+Voldemort has been using "sorted duplicates" feature of BDB JE to handle
+conflicting writes to the same key. At the very minimum, the conversion gets
+rid of BDB sorted duplicates support and handles duplicates in the Voldemort
+storage layer itself. The decision was made after months of closely working
+with Oracle JE team, to understand the factors affecting performance.
+
+Data Formats
+------------
+This section describes the data formats themselves.
+
+1) Base Format (Base)
+---------------------
+This is the format used by Voldemort up until 1.x, relying on BDB JE for
+duplicate handling
+
+Disadvantages:
+-- The manner in which BDB JE handles duplicates is not suitable for an
+   application like Voldemort, where only a small percentage (2-3%) of keys
+   have duplicates.
+-- Data bloat issue that prevented us from migrating to any higher 4.x version
+ to be able to control cache eviction
+-- Incompatible with how duplicates are handled in JE5.
+-- May incur additional locking costs for the "duplicates" subtree
+
+2) New duplicate format (NewDup)
+--------------------------------
+This format is supported from release 1.x, where Voldemort storage layer
+handles duplicates and BDB JE version is bumped up to JE 4.1.17
+
+Advantages:
+-- Ability to move data off disk. This is very GC friendly, relying on OS page
+ cache for the data and using the JVM heap only for index. This is achieved
+ by setting "bdb.cache.evictln" server parameter to "true"
+-- Ability to evict data brought into the cache during scans, minimize impact
+ on online traffic (Restore, Rebalance, Retention). This is achieved by
+ setting "bdb.minimize.scan.impact" to "true"
+-- Thinner storage layer. eg: BdbStorageEngine.put() does not incur the cost
+ of an additional delete()
+-- General speed up due to elimination of duplicates
+
+This format is the minimum requirement to be able to upgrade to 1.x & higher
+
+3) Partition Scan format (PidScan)
+----------------------------------
+This is a super set of 'NewDup' format, supported 1.x upwards. In addition to
+eliminating duplicates and upgrading to JE 4.1.17, it adds a 2 byte prefix
+representing the partition id to each key.
+
+Advantages:
+-- Speed up Restore and Rebalancing linearly to the number of partitions
+ actually fetched. (which means much shorter degraded mode performance)
+
+This is an optional format. You can turn it off, by setting
+bdb.prefix.keys.with.partitionid=false, if you don't like for some reason
+
+Note : We have not seen the extra 2 bytes cause any overhead to online
+performance
+
+IMPORTANT: IT IS REQUIRED TO CONVERT TO EITHER 'NewDup' or 'PidScan' TO RUN
+VOLDEMORT WITH BDB, STARTING RELEASE 1.x
+
+Running the Conversion Utility
+------------------------------
+The tool provides the ability to convert one database from a source environment
+to a destination environment. You need to run the tool for each of the databases
+or voldemort store you have. You can bring down one Voldemort server at a time,
+perform the conversion, and bring it back up on the appropriate release.
+
+Note: For users running with "bdb.one.env.per.store=false", it means you will
+have to run the tool with the same --src --dest options for each database
+contained.
+
+In addition to BDB environment locations, the tool needs the cluster.xml to generate
+the partition prefix.
+
+$./voldemort-convert-bdb.sh --src <Required: Path to source bdb environment>
+ --dest <Required: Path to place converted new environment>
+ --store <Required: BDB database (voldemort store) name>
+ --cluster-xml <Required: Path to cluster.xml>
+ --from-format <Required: Format to convert FROM, one of the 3
+ strings 'Base','NewDup','PidScan'>
+ --to-format <Required: Format to convert TO, one of the 3
+ strings 'Base','NewDup','PidScan'>
+ --je-log-size <Optional: Size in MB for the new JE log files,
+ Default:60>
+ --btree-nodemax <Optional: Btree fan out, Default: 512>
+
+We recommend you run the following to move to release 1.x & up.
+
+$./voldemort-convert-bdb.sh --src /path/to/src/env
+ --dest /path/to/dest/env
+ --store teststore
+ --cluster-xml /path/to/cluster/xml
+ --from-format Base
+ --to-format PidScan
+
+
+
+
+
+
+
+
+
+
+
+

0 comments on commit 5e48136

Please sign in to comment.
Something went wrong with that request. Please try again.