Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP

Comparing changes

Choose two branches to see what's changed or to start a new pull request. If you need to, you can also compare across forks.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also compare across forks.
base fork: voldemort/voldemort
base: master
...
head fork: voldemort/voldemort
compare: release-096li4
Checking mergeability… Don't worry, you can still create the pull request.
  • 4 commits
  • 21 files changed
  • 0 commit comments
  • 1 contributor
View
2  .classpath
@@ -40,7 +40,7 @@
<classpathentry kind="lib" path="lib/protobuf-java-2.3.0.jar"/>
<classpathentry kind="lib" path="contrib/ec2-testing/lib/typica.jar"/>
<classpathentry kind="lib" path="lib/google-collect-1.0.jar"/>
- <classpathentry kind="lib" path="lib/je-4.0.92.jar"/>
+ <classpathentry kind="lib" path="lib/je-4.1.17.jar"/>
<classpathentry kind="lib" path="lib/paranamer-2.1.jar"/>
<classpathentry kind="lib" path="lib/jackson-mapper-asl-1.4.0.jar"/>
<classpathentry kind="lib" path="lib/jackson-core-asl-1.4.0.jar"/>
View
2  build.properties
@@ -37,4 +37,4 @@ tomcat.manager.password=tomcat
tomcat.context=/voldemort
## Release
-curr.release=0.90.1
+curr.release=0.96.li4
View
BIN  lib/google-collect-1.0-rc2.jar
Binary file not shown
View
BIN  lib/google-collect-1.0.jar
Binary file not shown
View
BIN  lib/je-4.0.92.jar
Binary file not shown
View
BIN  lib/je-4.1.17.jar
Binary file not shown
View
65 release_notes.txt
@@ -1,3 +1,68 @@
+Release 0.96.li4 (LinkedIn release) on 08/02/2012
+* Upgrading BDB JE version to 4.1.17 on top of 0.96.li3
+Note: This is experimental. Once verified, it will be merged to master.
+
+Release 0.96.li3 (LinkedIn release) on 08/02/2012
+
+* Server storage conflict handling done in Voldemort code
+* Turning off duplicates in BDB, which has several side effects and a
+  performance penalty
+* --mirror-from-url option added to admin tool to copy data from another
+  voldemort server
+
+Note: This is an experimental release, and will be merged back to master
+after production verification.
+
+Release 0.96.li2 (LinkedIn release) on 08/01/2012
+
+Changes made since 0.96.li1
+* HFTP performance issue bug fix (fix in byte buffer and copy process)
+* Added mechanism to isolate BDB cache usage among stores
+* Enhanced debug logging (for traffic analysis).
+* Python client bug fixes (from pull request)
+* Tooling improvement (generate_cluster_xml.py)
+
+Release 0.96.li1 (LinkedIn release) on 07/12/2012
+
+Changes made since 0.95.li9
+* Reuploaded the jar
+
+Changes made since 0.95.li4
+* Added configurable option to interrupt service being unscheduled
+* Added rollback capability to the Admin tool
+* Added tracking functionality for scan permit owner
+* Added zoned option for restore from replicas
+* Enabled NIO connector by default
+* Added client side MBeans for multiple socket pool metrics
+* Shortened some long tests and created new target called "junit-long" that
+  includes them
+* Finer timeouts and partial getalls
+* Refactored to share VoldemortOpCode between classes
+* Fixed a donor-based rebalancing bug and added a unit test
+* Fixed a bug that will return null during concurrent read and writes
+* Added jmxId to Mbean name for failureDetector and storefactory
+* Prevent preferred reads from crossing zones
+* Added additional debug messages
+
+Release 0.95.li4 (LinkedIn release) on 04/13/2012
+
+Changes made since 0.95.li3
+* Added error count Mbeans
+* Concurrency bug fix in Donor based rebalancing
+
+Release 0.95.li3 (LinkedIn release) on 03/22/2012
+
+Changes made since 0.95.li2
+ * HOTFIX - DDS-2536: Fixed FailureDetector to handle host swap
+
+Release 0.95.li2 (LinkedIn release) on 03/21/2012
+
+Changes made since 0.95.li1
+ * Hot fix for SNA-10109. Replaced exception by log message in Histogram
+ * Replaced int by long to avoid overflow in Histogram and RequestCounter
+
+Release 0.95.li1 (LinkedIn release) on 03/07/2012
+
Release 0.90.1 on 10/10/2011
Changes made since 0.90
View
37 src/java/voldemort/VoldemortAdminTool.java
@@ -224,6 +224,14 @@ public static void main(String[] args) throws Exception {
.withRequiredArg()
.describedAs("size-in-mb")
.ofType(Long.class);
+ parser.accepts("mirror-from-url", "Cluster url to mirror data from")
+ .withRequiredArg()
+ .describedAs("mirror-cluster-bootstrap-url")
+ .ofType(String.class);
+ parser.accepts("mirror-node", "Node id in the mirror cluster to mirror from")
+ .withRequiredArg()
+ .describedAs("id-of-mirror-node")
+ .ofType(Integer.class);
OptionSet options = parser.parse(args);
@@ -235,6 +243,8 @@ public static void main(String[] args) throws Exception {
Set<String> missing = CmdUtils.missing(options, "url", "node");
if(missing.size() > 0) {
// Not the most elegant way to do this
+ // basically check if only "node" is missing for these set of
+ // options; all these can live without explicit node ids
if(!(missing.equals(ImmutableSet.of("node"))
&& (options.has("add-stores") || options.has("delete-store")
|| options.has("ro-metadata") || options.has("set-metadata")
@@ -321,11 +331,17 @@ public static void main(String[] args) throws Exception {
}
ops += "f";
}
+ if(options.has("mirror-from-url")) {
+ if(!options.has("mirror-node")) {
+ Utils.croak("Specify the mirror node to fetch from");
+ }
+ ops += "h";
+ }
if(ops.length() < 1) {
Utils.croak("At least one of (delete-partitions, restore, add-node, fetch-entries, "
+ "fetch-keys, add-stores, delete-store, update-entries, get-metadata, ro-metadata, "
+ "set-metadata, check-metadata, key-distribution, clear-rebalancing-metadata, async, "
- + "repair-job, native-backup, rollback, reserve-memory) must be specified");
+ + "repair-job, native-backup, rollback, reserve-memory, mirror-from-url) must be specified");
}
List<String> storeNames = null;
@@ -494,6 +510,21 @@ public static void main(String[] args) throws Exception {
long reserveMB = (Long) options.valueOf("reserve-memory");
adminClient.reserveMemory(nodeId, storeNames, reserveMB);
}
+ if(ops.contains("h")) {
+ if(nodeId == -1) {
+ System.err.println("Cannot run mirroring without node id");
+ System.exit(1);
+ }
+ Integer mirrorNodeId = CmdUtils.valueOf(options, "mirror-node", -1);
+ if(mirrorNodeId == -1) {
+ System.err.println("Cannot run mirroring without mirror node id");
+ System.exit(1);
+ }
+ adminClient.mirrorData(nodeId,
+ mirrorNodeId,
+ (String) options.valueOf("mirror-from-url"),
+ storeNames);
+ }
} catch(Exception e) {
e.printStackTrace();
Utils.croak(e.getMessage());
@@ -597,6 +628,10 @@ public static void printHelp(PrintStream stream, OptionParser parser) throws IOE
stream.println("\t\t./bin/voldemort-admin-tool.sh --fetch-entries --url [url] --node [node-id]");
stream.println("\t9) Update entries for a set of stores using the output from a binary dump fetch entries");
stream.println("\t\t./bin/voldemort-admin-tool.sh --update-entries [folder path from output of --fetch-entries --outdir] --url [url] --node [node-id] --stores [comma-separated list of store names]");
+ stream.println("\t10) Mirror data from another voldemort server (possibly in another cluster) for specified stores");
+ stream.println("\t\t./bin/voldemort-admin-tool.sh --mirror-from-url [bootstrap url to mirror from] --mirror-node [node to mirror from] --url [url] --node [node-id] --stores [comma-separated-list-of-store-names]");
+ stream.println("\t11) Mirror data from another voldemort server (possibly in another cluster) for all stores in current cluster");
+ stream.println("\t\t./bin/voldemort-admin-tool.sh --mirror-from-url [bootstrap url to mirror from] --mirror-node [node to mirror from] --url [url] --node [node-id]");
stream.println();
stream.println("READ-ONLY OPERATIONS");
stream.println("\t1) Retrieve metadata information of read-only data for a particular node and all stores");
View
102 src/java/voldemort/client/protocol/admin/AdminClient.java
@@ -34,6 +34,7 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
@@ -63,6 +64,7 @@
import voldemort.server.rebalance.VoldemortRebalancingException;
import voldemort.store.ErrorCodeMapper;
import voldemort.store.StoreDefinition;
+import voldemort.store.StoreUtils;
import voldemort.store.metadata.MetadataStore;
import voldemort.store.metadata.MetadataStore.VoldemortState;
import voldemort.store.mysql.MysqlStorageConfiguration;
@@ -2487,4 +2489,104 @@ public void reserveMemory(int nodeId, List<String> stores, long sizeInMB) {
logger.info("Finished reserving memory for store : " + storeName);
}
}
+
+ /**
+ * Mirror data from another voldemort server
+ *
+ * @param nodeId node in the current cluster to mirror to
+ * @param nodeIdToMirrorFrom node from which to mirror data
+ * @param urlToMirrorFrom cluster bootstrap url to mirror from
+ * @param stores set of stores to be mirrored
+ *
+ */
+ public void mirrorData(final int nodeId,
+ final int nodeIdToMirrorFrom,
+ final String urlToMirrorFrom,
+ List<String> stores) {
+ final AdminClient mirrorAdminClient = new AdminClient(urlToMirrorFrom,
+ new AdminClientConfig());
+ final AdminClient currentAdminClient = this;
+
+ // determine the partitions residing on the mirror node
+ Node mirrorNode = mirrorAdminClient.getAdminClientCluster().getNodeById(nodeIdToMirrorFrom);
+ Node currentNode = currentAdminClient.getAdminClientCluster().getNodeById(nodeId);
+
+ if(mirrorNode == null) {
+ logger.error("Mirror node specified does not exist in the mirror cluster");
+ return;
+ }
+
+ if(currentNode == null) {
+ logger.error("node specified does not exist in the current cluster");
+ return;
+ }
+
+ // compare the mirror-from and mirrored-to nodes have same set of stores
+ List<String> currentStoreList = StoreUtils.getStoreNames(currentAdminClient.getRemoteStoreDefList(nodeId)
+ .getValue(),
+ true);
+ List<String> mirrorStoreList = StoreUtils.getStoreNames(mirrorAdminClient.getRemoteStoreDefList(nodeIdToMirrorFrom)
+ .getValue(),
+ true);
+ if(stores == null)
+ stores = currentStoreList;
+
+ if(!currentStoreList.containsAll(stores) || !mirrorStoreList.containsAll(stores)) {
+ logger.error("Make sure the set of stores match on both sides");
+ return;
+ }
+
+ // check if the partitions are same on both the nodes
+ if(!currentNode.getPartitionIds().equals(mirrorNode.getPartitionIds())) {
+ logger.error("Make sure the same set of partitions exist on both sides");
+ return;
+ }
+
+ ExecutorService executors = Executors.newFixedThreadPool(stores.size(),
+ new ThreadFactory() {
+
+ public Thread newThread(Runnable r) {
+ Thread thread = new Thread(r);
+ thread.setName("mirror-data-thread");
+ return thread;
+ }
+ });
+
+ final List<Integer> partitionIdList = mirrorNode.getPartitionIds();
+ final CountDownLatch waitLatch = new CountDownLatch(stores.size());
+ try {
+ for(final String storeName: stores)
+ executors.submit(new Runnable() {
+
+ public void run() {
+ try {
+ logger.info("Mirroring data for store " + storeName + " from node "
+ + nodeIdToMirrorFrom + "(" + urlToMirrorFrom + ") to node "
+ + nodeId + " partitions:" + partitionIdList);
+
+ Iterator<Pair<ByteArray, Versioned<byte[]>>> iterator = mirrorAdminClient.fetchEntries(nodeIdToMirrorFrom,
+ storeName,
+ partitionIdList,
+ null,
+ false);
+ currentAdminClient.updateEntries(nodeId, storeName, iterator, null);
+
+ logger.info("Mirroring data for store:" + storeName + " from node "
+ + nodeIdToMirrorFrom + " completed.");
+ } catch(Exception e) {
+ logger.error("Mirroring operation for store " + storeName
+ + "from node " + nodeIdToMirrorFrom + " failed.", e);
+ } finally {
+ waitLatch.countDown();
+ }
+ }
+ });
+ waitLatch.await();
+ } catch(Exception e) {
+ logger.error("Mirroring operation failed.", e);
+ } finally {
+ executors.shutdown();
+ logger.info("Finished mirroring data.");
+ }
+ }
}
View
72 src/java/voldemort/server/VoldemortConfig.java
@@ -65,7 +65,6 @@
private long bdbCacheSize;
private boolean bdbWriteTransactions;
private boolean bdbFlushTransactions;
- private boolean bdbSortedDuplicates;
private String bdbDataDirectory;
private long bdbMaxLogFileSize;
private int bdbBtreeFanout;
@@ -87,6 +86,9 @@
private long bdbStatsCacheTtlMs;
private boolean bdbExposeSpaceUtilization;
private long bdbMinimumSharedCache;
+ private boolean bdbCleanerLazyMigration;
+ private boolean bdbCacheModeEvictLN;
+ private boolean bdbMinimizeScanImpact;
private String mysqlUsername;
private String mysqlPassword;
@@ -214,7 +216,6 @@ public VoldemortConfig(Props props) {
this.bdbBtreeFanout = props.getInt("bdb.btree.fanout", 512);
this.bdbCheckpointBytes = props.getLong("bdb.checkpoint.interval.bytes", 20 * 1024 * 1024);
this.bdbCheckpointMs = props.getLong("bdb.checkpoint.interval.ms", 30 * Time.MS_PER_SECOND);
- this.bdbSortedDuplicates = props.getBoolean("bdb.enable.sorted.duplicates", true);
this.bdbOneEnvPerStore = props.getBoolean("bdb.one.env.per.store", false);
this.bdbCleanerMinFileUtilization = props.getInt("bdb.cleaner.min.file.utilization", 5);
this.bdbCleanerMinUtilization = props.getInt("bdb.cleaner.minUtilization", 50);
@@ -231,6 +232,9 @@ public VoldemortConfig(Props props) {
this.bdbStatsCacheTtlMs = props.getLong("bdb.stats.cache.ttl.ms", 5 * Time.MS_PER_SECOND);
this.bdbExposeSpaceUtilization = props.getBoolean("bdb.expose.space.utilization", true);
this.bdbMinimumSharedCache = props.getLong("bdb.minimum.shared.cache", 0);
+ this.bdbCleanerLazyMigration = props.getBoolean("bdb.cleaner.lazy.migration", true);
+ this.bdbCacheModeEvictLN = props.getBoolean("bdb.cache.evictln", false);
+ this.bdbMinimizeScanImpact = props.getBoolean("bdb.minimize.scan.impact", false);
this.readOnlyBackups = props.getInt("readonly.backups", 1);
this.readOnlySearchStrategy = props.getString("readonly.search.strategy",
@@ -792,6 +796,62 @@ public void setBdbBtreeFanout(int bdbBtreeFanout) {
}
/**
+ * If true, Cleaner offloads some work to application threads, to keep up
+ * with the write rate.
+ *
+ * <ul>
+ * <li>property: "bdb.cleaner.lazy.migration"</li>
+ * <li>default : true</li>
+ * </ul>
+ *
+ * @return
+ */
+ public boolean getBdbCleanerLazyMigration() {
+ return bdbCleanerLazyMigration;
+ }
+
+ public final void setBdbCleanerLazyMigration(boolean bdbCleanerLazyMigration) {
+ this.bdbCleanerLazyMigration = bdbCleanerLazyMigration;
+ }
+
+ /**
+ * If true, BDB will not cache data in the JVM.
+ *
+ * <ul>
+ * <li>Property : "bdb.cache.evictln"</li>
+ * <li>Default : false</li>
+ * </ul>
+ *
+ * @return
+ */
+ public boolean getBdbCacheModeEvictLN() {
+ return bdbCacheModeEvictLN;
+ }
+
+ public void setBdbCacheModeEvictLN(boolean bdbCacheModeEvictLN) {
+ this.bdbCacheModeEvictLN = bdbCacheModeEvictLN;
+ }
+
+ /**
+ * If true, attempts are made to minimize impact to BDB cache during scan
+ * jobs
+ *
+ * <ul>
+ * <li>Property : "bdb.minimize.scan.impact"</li>
+ * <li>Default : false</li>
+ * </ul>
+ *
+ * @return
+ */
+ public boolean getBdbMinimizeScanImpact() {
+ return bdbMinimizeScanImpact;
+ }
+
+ public void setBdbMinimizeScanImpact(boolean bdbMinimizeScanImpact) {
+ this.bdbMinimizeScanImpact = bdbMinimizeScanImpact;
+ }
+
+ /**
* The comfortable number of threads the threadpool will attempt to
* maintain. Specified by "core.threads" default: max(1, floor(0.5 *
* max.threads))
@@ -1235,14 +1295,6 @@ public void setBdbWriteTransactions(boolean bdbWriteTransactions) {
this.bdbWriteTransactions = bdbWriteTransactions;
}
- public boolean isBdbSortedDuplicatesEnabled() {
- return this.bdbSortedDuplicates;
- }
-
- public void setBdbSortedDuplicates(boolean enable) {
- this.bdbSortedDuplicates = enable;
- }
-
public void setBdbOneEnvPerStore(boolean bdbOneEnvPerStore) {
this.bdbOneEnvPerStore = bdbOneEnvPerStore;
}
View
3  src/java/voldemort/server/scheduler/slop/StreamingSlopPusherJob.java
@@ -356,7 +356,8 @@ public boolean isComplete() {
writeThrottler.maybeThrottle(writtenLast);
writtenLast = slopSize(head);
- deleteBatch.add(Pair.create(head.getValue().makeKey(), head.getVersion()));
+ deleteBatch.add(Pair.create(head.getValue().makeKey(),
+ (Version) head.getVersion()));
return head;
}
}
View
80 src/java/voldemort/store/StoreBinaryFormat.java
@@ -0,0 +1,80 @@
+package voldemort.store;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import voldemort.VoldemortException;
+import voldemort.utils.ByteUtils;
+import voldemort.versioning.VectorClock;
+import voldemort.versioning.Versioned;
+
+/**
+ * Defines a generic on-disk data format for versioned voldemort data
+ *
+ * The format of the values stored on disk. The format is
+ * VERSION - 1 byte
+ * ------------repeating-------------------
+ * CLOCK - variable length, self delimiting
+ * NUM_CLOCK_ENTRIES - 2 bytes (short)
+ * VERSION_SIZE - 1 byte
+ * --------------- repeating ----------
+ * NODE_ID - 2 bytes (short)
+ * VERSION - VERSION_SIZE bytes
+ * ------------------------------------
+ * VALUE - variable length
+ * VALUE_SIZE - 4 bytes
+ * VALUE_BYTES - VALUE_SIZE bytes
+ * ----------------------------------------
+ */
+public class StoreBinaryFormat {
+
+ /* In the future we can use this to handle format changes */
+ private static final byte VERSION = 0;
+
+ public static byte[] toByteArray(List<Versioned<byte[]>> values) {
+ int size = 1;
+ for(Versioned<byte[]> v: values) {
+ size += v.getVersion().sizeInBytes();
+ size += ByteUtils.SIZE_OF_INT;
+ size += v.getValue().length;
+ }
+ byte[] bytes = new byte[size];
+ int pos = 1;
+ bytes[0] = VERSION;
+ for(Versioned<byte[]> v: values) {
+ pos += v.getVersion().toBytes(bytes, pos);
+ int len = v.getValue().length;
+ ByteUtils.writeInt(bytes, len, pos);
+ pos += ByteUtils.SIZE_OF_INT;
+ System.arraycopy(v.getValue(), 0, bytes, pos, len);
+ pos += len;
+ }
+ if(pos != bytes.length)
+ throw new VoldemortException((bytes.length - pos)
+ + " straggling bytes found in value (this should not be possible)!");
+ return bytes;
+ }
+
+ public static List<Versioned<byte[]>> fromByteArray(byte[] bytes) {
+ if(bytes.length < 1)
+ throw new VoldemortException("Invalid value length: " + bytes.length);
+ if(bytes[0] != VERSION)
+ throw new VoldemortException("Unexpected version number in value: " + bytes[0]);
+ int pos = 1;
+ List<Versioned<byte[]>> vals = new ArrayList<Versioned<byte[]>>(2);
+ while(pos < bytes.length) {
+ VectorClock clock = new VectorClock(bytes, pos);
+ pos += clock.sizeInBytes();
+ int valueSize = ByteUtils.readInt(bytes, pos);
+ pos += ByteUtils.SIZE_OF_INT;
+ byte[] val = new byte[valueSize];
+ System.arraycopy(bytes, pos, val, 0, valueSize);
+ pos += valueSize;
+ vals.add(Versioned.value(val, clock));
+ }
+ if(pos != bytes.length)
+ throw new VoldemortException((bytes.length - pos)
+ + " straggling bytes found in value (this should not be possible)!");
+ return vals;
+ }
+}
View
16 src/java/voldemort/store/StoreUtils.java
@@ -18,6 +18,7 @@
import java.io.Closeable;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -199,4 +200,19 @@ public static StoreDefinition getStoreDef(List<StoreDefinition> list, String nam
return def;
return null;
}
+
+ /**
+ * Get the list of store names from a list of store definitions
+ *
+ * @param list
+ * @param ignoreViews
+ * @return list of store names
+ */
+ public static List<String> getStoreNames(List<StoreDefinition> list, boolean ignoreViews) {
+ List<String> storeNameSet = new ArrayList<String>();
+ for(StoreDefinition def: list)
+ if(!def.isView() || !ignoreViews)
+ storeNameSet.add(def.getName());
+ return storeNameSet;
+ }
}
View
11 src/java/voldemort/store/bdb/BdbRuntimeConfig.java
@@ -14,10 +14,12 @@
public static final long DEFAULT_STATS_CACHE_TTL_MS = 5 * Time.MS_PER_SECOND;
public static final LockMode DEFAULT_LOCK_MODE = LockMode.READ_UNCOMMITTED;
public static final boolean DEFAULT_EXPOSE_SPACE_UTIL = true;
+ public static final boolean DEFAULT_MINIMIZE_SCAN_IMPACT = false;
private long statsCacheTtlMs = DEFAULT_STATS_CACHE_TTL_MS;
private LockMode lockMode = DEFAULT_LOCK_MODE;
private boolean exposeSpaceUtil = DEFAULT_EXPOSE_SPACE_UTIL;
+ private boolean minimizeScanImpact = DEFAULT_MINIMIZE_SCAN_IMPACT;
public BdbRuntimeConfig() {
@@ -29,6 +31,7 @@ public BdbRuntimeConfig(VoldemortConfig config) {
setLockMode(lockMode);
setStatsCacheTtlMs(config.getBdbStatsCacheTtlMs());
setExposeSpaceUtil(config.getBdbExposeSpaceUtilization());
+ setMinimizeScanImpact(config.getBdbMinimizeScanImpact());
}
public long getStatsCacheTtlMs() {
@@ -56,4 +59,12 @@ public void setExposeSpaceUtil(boolean expose) {
public boolean getExposeSpaceUtil() {
return this.exposeSpaceUtil;
}
+
+ public boolean getMinimizeScanImpact() {
+ return minimizeScanImpact;
+ }
+
+ public void setMinimizeScanImpact(boolean minimizeScanImpact) {
+ this.minimizeScanImpact = minimizeScanImpact;
+ }
}
View
9 src/java/voldemort/store/bdb/BdbStorageConfiguration.java
@@ -37,6 +37,7 @@
import voldemort.utils.Time;
import com.google.common.collect.Maps;
+import com.sleepycat.je.CacheMode;
import com.sleepycat.je.Database;
import com.sleepycat.je.DatabaseConfig;
import com.sleepycat.je.DatabaseException;
@@ -107,11 +108,17 @@ public BdbStorageConfiguration(VoldemortConfig config) {
Integer.toString(config.getBdbLogFaultReadSize()));
environmentConfig.setConfigParam(EnvironmentConfig.LOG_ITERATOR_READ_SIZE,
Integer.toString(config.getBdbLogIteratorReadSize()));
+ environmentConfig.setConfigParam(EnvironmentConfig.CLEANER_LAZY_MIGRATION,
+ Boolean.toString(config.getBdbCleanerLazyMigration()));
environmentConfig.setLockTimeout(config.getBdbLockTimeoutMs(), TimeUnit.MILLISECONDS);
+ if(config.getBdbCacheModeEvictLN()) {
+ environmentConfig.setCacheMode(CacheMode.EVICT_LN);
+ }
+
databaseConfig = new DatabaseConfig();
databaseConfig.setAllowCreate(true);
- databaseConfig.setSortedDuplicates(config.isBdbSortedDuplicatesEnabled());
+ databaseConfig.setSortedDuplicates(false);
databaseConfig.setNodeMaxEntries(config.getBdbBtreeFanout());
databaseConfig.setTransactional(true);
bdbMasterDir = config.getBdbDataDirectory();
View
493 src/java/voldemort/store/bdb/BdbStorageEngine.java
@@ -17,8 +17,12 @@
package voldemort.store.bdb;
import java.io.File;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.NoSuchElementException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.codec.binary.Hex;
@@ -26,31 +30,27 @@
import voldemort.VoldemortException;
import voldemort.annotations.jmx.JmxOperation;
-import voldemort.serialization.IdentitySerializer;
-import voldemort.serialization.Serializer;
-import voldemort.serialization.VersionedSerializer;
import voldemort.server.protocol.admin.AsyncOperationStatus;
import voldemort.store.NoSuchCapabilityException;
import voldemort.store.PersistenceFailureException;
import voldemort.store.StorageEngine;
import voldemort.store.StorageInitializationException;
import voldemort.store.Store;
+import voldemort.store.StoreBinaryFormat;
import voldemort.store.StoreCapabilityType;
import voldemort.store.StoreUtils;
import voldemort.store.backup.NativeBackupable;
import voldemort.store.bdb.stats.BdbEnvironmentStats;
import voldemort.utils.ByteArray;
-import voldemort.utils.ByteUtils;
import voldemort.utils.ClosableIterator;
import voldemort.utils.Pair;
import voldemort.utils.Utils;
import voldemort.versioning.ObsoleteVersionException;
import voldemort.versioning.Occurred;
-import voldemort.versioning.VectorClock;
import voldemort.versioning.Version;
import voldemort.versioning.Versioned;
-import com.google.common.collect.Lists;
+import com.sleepycat.je.CacheMode;
import com.sleepycat.je.Cursor;
import com.sleepycat.je.Database;
import com.sleepycat.je.DatabaseEntry;
@@ -65,7 +65,6 @@
/**
* A store that uses BDB for persistence
*
- *
*/
public class BdbStorageEngine implements StorageEngine<ByteArray, byte[], byte[]>, NativeBackupable {
@@ -75,12 +74,11 @@
private final String name;
private Database bdbDatabase;
private final Environment environment;
- private final VersionedSerializer<byte[]> versionedSerializer;
private final AtomicBoolean isOpen;
private final LockMode readLockMode;
- private final Serializer<Version> versionSerializer;
private final BdbEnvironmentStats bdbEnvironmentStats;
private final AtomicBoolean isTruncating = new AtomicBoolean(false);
+ private final boolean minimizeScanImpact;
public BdbStorageEngine(String name,
Environment environment,
@@ -89,22 +87,12 @@ public BdbStorageEngine(String name,
this.name = Utils.notNull(name);
this.bdbDatabase = Utils.notNull(database);
this.environment = Utils.notNull(environment);
- this.versionedSerializer = new VersionedSerializer<byte[]>(new IdentitySerializer());
- this.versionSerializer = new Serializer<Version>() {
-
- public byte[] toBytes(Version object) {
- return ((VectorClock) object).toBytes();
- }
-
- public Version toObject(byte[] bytes) {
- return versionedSerializer.getVersion(bytes);
- }
- };
this.isOpen = new AtomicBoolean(true);
this.readLockMode = config.getLockMode();
this.bdbEnvironmentStats = new BdbEnvironmentStats(environment,
config.getStatsCacheTtlMs(),
config.getExposeSpaceUtil());
+ this.minimizeScanImpact = config.getMinimizeScanImpact();
}
public String getName() {
@@ -114,6 +102,9 @@ public String getName() {
public ClosableIterator<Pair<ByteArray, Versioned<byte[]>>> entries() {
try {
Cursor cursor = getBdbDatabase().openCursor(null, null);
+ // evict data brought in by the cursor walk right away
+ if(this.minimizeScanImpact)
+ cursor.setCacheMode(CacheMode.EVICT_BIN);
return new BdbEntriesIterator(cursor);
} catch(DatabaseException e) {
logger.error(e);
@@ -124,6 +115,9 @@ public String getName() {
public ClosableIterator<ByteArray> keys() {
try {
Cursor cursor = getBdbDatabase().openCursor(null, null);
+ // evict data brought in by the cursor walk right away
+ if(this.minimizeScanImpact)
+ cursor.setCacheMode(CacheMode.EVICT_BIN);
return new BdbKeysIterator(cursor);
} catch(DatabaseException e) {
logger.error(e);
@@ -151,9 +145,7 @@ public void truncate() {
throw new VoldemortException("Failed to truncate Bdb store " + getName(), e);
} finally {
-
commitOrAbort(succeeded, transaction);
-
// reopen the bdb database for future queries.
if(reopenBdbDatabase()) {
isTruncating.compareAndSet(true, false);
@@ -196,37 +188,46 @@ private boolean reopenBdbDatabase() {
}
}
+ /**
+ * truncate() operation mandates that all opened Database be closed before
+ * attempting truncation.
+ * <p>
+ * This method throws an exception while truncation is happening to any
+ * request attempting in parallel with store truncation.
+ *
+ * @return
+ */
+ private Database getBdbDatabase() {
+ if(isTruncating.get()) {
+ throw new VoldemortException("Bdb Store " + getName()
+ + " is currently truncating cannot serve any request.");
+ }
+ return bdbDatabase;
+ }
+
public List<Version> getVersions(ByteArray key) {
- return get(key, null, readLockMode, versionSerializer);
+ return StoreUtils.getVersions(get(key, null));
}
public List<Versioned<byte[]>> get(ByteArray key, byte[] transforms)
throws PersistenceFailureException {
- return get(key, transforms, readLockMode, versionedSerializer);
- }
-
- private <T> List<T> get(ByteArray key,
- @SuppressWarnings("unused") byte[] transforms,
- LockMode lockMode,
- Serializer<T> serializer) throws PersistenceFailureException {
StoreUtils.assertValidKey(key);
+ DatabaseEntry keyEntry = new DatabaseEntry(key.get());
+ DatabaseEntry valueEntry = new DatabaseEntry();
long startTimeNs = -1;
if(logger.isTraceEnabled())
startTimeNs = System.nanoTime();
- Cursor cursor = null;
try {
- cursor = getBdbDatabase().openCursor(null, null);
- List<T> result = get(cursor, key, lockMode, serializer);
-
- // If null, try again in different locking mode to
- // avoid null result due to gap between delete and new write
- if(result.size() == 0 && lockMode != LockMode.DEFAULT) {
- return get(cursor, key, LockMode.DEFAULT, serializer);
+ // uncommitted reads are perfectly fine now, since we have no
+ // je-delete() in put()
+ OperationStatus status = getBdbDatabase().get(null, keyEntry, valueEntry, readLockMode);
+ if(OperationStatus.SUCCESS == status) {
+ return StoreBinaryFormat.fromByteArray(valueEntry.getData());
} else {
- return result;
+ return Collections.emptyList();
}
} catch(DatabaseException e) {
logger.error(e);
@@ -238,95 +239,31 @@ private boolean reopenBdbDatabase() {
+ (System.nanoTime() - startTimeNs) + " ns at "
+ System.currentTimeMillis());
}
-
- attemptClose(cursor);
}
}
- /**
- * truncate() operation mandates that all opened Database be closed before
- * attempting truncation.
- * <p>
- * This method throws an exception while truncation is happening to any
- * request attempting in parallel with store truncation.
- *
- * @return
- */
- private Database getBdbDatabase() {
- if(isTruncating.get()) {
- throw new VoldemortException("Bdb Store " + getName()
- + " is currently truncating cannot serve any request.");
- }
-
- return bdbDatabase;
- }
-
public Map<ByteArray, List<Versioned<byte[]>>> getAll(Iterable<ByteArray> keys,
Map<ByteArray, byte[]> transforms)
throws VoldemortException {
-
+ StoreUtils.assertValidKeys(keys);
+ Map<ByteArray, List<Versioned<byte[]>>> results = null;
long startTimeNs = -1;
if(logger.isTraceEnabled())
startTimeNs = System.nanoTime();
-
- StoreUtils.assertValidKeys(keys);
- Map<ByteArray, List<Versioned<byte[]>>> result = StoreUtils.newEmptyHashMap(keys);
- Cursor cursor = null;
-
- String keyStr = "";
-
try {
- cursor = getBdbDatabase().openCursor(null, null);
- for(ByteArray key: keys) {
-
- if(logger.isTraceEnabled())
+ results = StoreUtils.getAll(this, keys, transforms);
+ } catch(PersistenceFailureException pfe) {
+ throw pfe;
+ } finally {
+ if(logger.isTraceEnabled()) {
+ String keyStr = "";
+ for(ByteArray key: keys)
keyStr += key + " ";
-
- List<Versioned<byte[]>> values = get(cursor, key, readLockMode, versionedSerializer);
- if(!values.isEmpty())
- result.put(key, values);
+ logger.trace("Completed GETALL from keys " + keyStr + " in "
+ + (System.nanoTime() - startTimeNs) + " ns at "
+ + System.currentTimeMillis());
}
- } catch(DatabaseException e) {
- logger.error(e);
- throw new PersistenceFailureException(e);
- } finally {
- attemptClose(cursor);
- }
-
- if(logger.isTraceEnabled())
- logger.trace("Completed GETALL from keys " + keyStr + " in "
- + (System.nanoTime() - startTimeNs) + " ns at "
- + System.currentTimeMillis());
-
- return result;
- }
-
- private static <T> List<T> get(Cursor cursor,
- ByteArray key,
- LockMode lockMode,
- Serializer<T> serializer) throws DatabaseException {
- StoreUtils.assertValidKey(key);
-
- long startTimeNs = -1;
-
- if(logger.isTraceEnabled())
- startTimeNs = System.nanoTime();
-
- DatabaseEntry keyEntry = new DatabaseEntry(key.get());
- DatabaseEntry valueEntry = new DatabaseEntry();
- List<T> results = Lists.newArrayList();
-
- for(OperationStatus status = cursor.getSearchKey(keyEntry, valueEntry, lockMode); status == OperationStatus.SUCCESS; status = cursor.getNextDup(keyEntry,
- valueEntry,
- lockMode)) {
- results.add(serializer.toObject(valueEntry.getData()));
- }
-
- if(logger.isTraceEnabled()) {
- logger.trace("Completed GET from key " + key + " in "
- + (System.nanoTime() - startTimeNs) + " ns at "
- + System.currentTimeMillis());
}
return results;
@@ -334,46 +271,58 @@ private Database getBdbDatabase() {
public void put(ByteArray key, Versioned<byte[]> value, byte[] transforms)
throws PersistenceFailureException {
- StoreUtils.assertValidKey(key);
long startTimeNs = -1;
if(logger.isTraceEnabled())
startTimeNs = System.nanoTime();
+ StoreUtils.assertValidKey(key);
DatabaseEntry keyEntry = new DatabaseEntry(key.get());
+ DatabaseEntry valueEntry = new DatabaseEntry();
+
boolean succeeded = false;
Transaction transaction = null;
- Cursor cursor = null;
- try {
- transaction = this.environment.beginTransaction(null, null);
+ List<Versioned<byte[]>> vals = null;
- // Check existing values
- // if there is a version obsoleted by this value delete it
- // if there is a version later than this one, throw an exception
- DatabaseEntry valueEntry = new DatabaseEntry();
- cursor = getBdbDatabase().openCursor(transaction, null);
- for(OperationStatus status = cursor.getSearchKey(keyEntry, valueEntry, LockMode.RMW); status == OperationStatus.SUCCESS; status = cursor.getNextDup(keyEntry,
- valueEntry,
- LockMode.RMW)) {
- VectorClock clock = new VectorClock(valueEntry.getData());
- Occurred occurred = value.getVersion().compare(clock);
- if(occurred == Occurred.BEFORE)
- throw new ObsoleteVersionException("Key "
- + new String(hexCodec.encode(key.get()))
- + " "
- + value.getVersion().toString()
- + " is obsolete, it is no greater than the current version of "
- + clock + ".");
- else if(occurred == Occurred.AFTER)
- // best effort delete of obsolete previous value!
- cursor.delete();
+ try {
+ transaction = environment.beginTransaction(null, null);
+
+ // do a get for the existing values
+ OperationStatus status = getBdbDatabase().get(transaction,
+ keyEntry,
+ valueEntry,
+ LockMode.RMW);
+ if(OperationStatus.SUCCESS == status) {
+ // update
+ vals = StoreBinaryFormat.fromByteArray(valueEntry.getData());
+                // compare vector clocks and discard versions made obsolete by this update
+
+ Iterator<Versioned<byte[]>> iter = vals.iterator();
+ while(iter.hasNext()) {
+ Versioned<byte[]> curr = iter.next();
+ Occurred occurred = value.getVersion().compare(curr.getVersion());
+ if(occurred == Occurred.BEFORE)
+ throw new ObsoleteVersionException("Key "
+ + new String(hexCodec.encode(key.get()))
+ + " "
+ + value.getVersion().toString()
+ + " is obsolete, it is no greater than the current version of "
+ + curr.getVersion().toString() + ".");
+ else if(occurred == Occurred.AFTER)
+ iter.remove();
+ }
+ } else {
+ // insert
+ vals = new ArrayList<Versioned<byte[]>>();
}
- // Okay so we cleaned up all the prior stuff, so now we are good to
- // insert the new thing
- valueEntry = new DatabaseEntry(versionedSerializer.toBytes(value));
- OperationStatus status = cursor.put(keyEntry, valueEntry);
+            // append the new version to the surviving list of versions
+ vals.add(value);
+
+ valueEntry.setData(StoreBinaryFormat.toByteArray(vals));
+ status = getBdbDatabase().put(transaction, keyEntry, valueEntry);
+
if(status != OperationStatus.SUCCESS)
throw new PersistenceFailureException("Put operation failed with status: " + status);
succeeded = true;
@@ -382,22 +331,21 @@ else if(occurred == Occurred.AFTER)
logger.error(e);
throw new PersistenceFailureException(e);
} finally {
- attemptClose(cursor);
if(succeeded)
attemptCommit(transaction);
else
attemptAbort(transaction);
- }
-
- if(logger.isTraceEnabled()) {
- logger.trace("Completed PUT to key " + key + " (keyRef: "
- + System.identityHashCode(key) + " value " + value + " in "
- + (System.nanoTime() - startTimeNs) + " ns at "
- + System.currentTimeMillis());
+ if(logger.isTraceEnabled()) {
+ logger.trace("Completed PUT to key " + key + " (keyRef: "
+ + System.identityHashCode(key) + " value " + value + " in "
+ + (System.nanoTime() - startTimeNs) + " ns at "
+ + System.currentTimeMillis());
+ }
}
}
public boolean delete(ByteArray key, Version version) throws PersistenceFailureException {
+
StoreUtils.assertValidKey(key);
long startTimeNs = -1;
@@ -405,43 +353,69 @@ public boolean delete(ByteArray key, Version version) throws PersistenceFailureE
if(logger.isTraceEnabled())
startTimeNs = System.nanoTime();
- boolean deletedSomething = false;
- Cursor cursor = null;
Transaction transaction = null;
try {
transaction = this.environment.beginTransaction(null, null);
DatabaseEntry keyEntry = new DatabaseEntry(key.get());
- DatabaseEntry valueEntry = new DatabaseEntry();
- cursor = getBdbDatabase().openCursor(transaction, null);
- OperationStatus status = cursor.getSearchKey(keyEntry,
- valueEntry,
- LockMode.READ_UNCOMMITTED);
- while(status == OperationStatus.SUCCESS) {
- // if version is null no comparison is necessary
- if(new VectorClock(valueEntry.getData()).compare(version) == Occurred.BEFORE) {
- cursor.delete();
- deletedSomething = true;
+
+ if(version == null) {
+ // unversioned delete. Just blow away the whole thing
+ OperationStatus status = getBdbDatabase().delete(transaction, keyEntry);
+ if(OperationStatus.SUCCESS == status)
+ return true;
+ else
+ return false;
+ } else {
+ // versioned deletes; need to determine what to delete
+ DatabaseEntry valueEntry = new DatabaseEntry();
+
+ // do a get for the existing values
+ OperationStatus status = getBdbDatabase().get(transaction,
+ keyEntry,
+ valueEntry,
+ LockMode.RMW);
+ // key does not exist to begin with.
+ if(OperationStatus.NOTFOUND == status)
+ return false;
+
+ List<Versioned<byte[]>> vals = StoreBinaryFormat.fromByteArray(valueEntry.getData());
+ Iterator<Versioned<byte[]>> iter = vals.iterator();
+ int numVersions = vals.size();
+ int numDeletedVersions = 0;
+
+ // go over the versions and remove everything before the
+ // supplied version
+ while(iter.hasNext()) {
+ Versioned<byte[]> curr = iter.next();
+ Version currentVersion = curr.getVersion();
+ if(currentVersion.compare(version) == Occurred.BEFORE) {
+ iter.remove();
+ numDeletedVersions++;
+ }
+ }
+
+ if(numDeletedVersions < numVersions) {
+ // we still have some valid versions
+ valueEntry.setData(StoreBinaryFormat.toByteArray(vals));
+ getBdbDatabase().put(transaction, keyEntry, valueEntry);
+ } else {
+ // we have deleted all the versions; so get rid of the entry
+ // in the database
+ getBdbDatabase().delete(transaction, keyEntry);
}
- status = cursor.getNextDup(keyEntry, valueEntry, LockMode.READ_UNCOMMITTED);
+ return numDeletedVersions > 0;
}
- return deletedSomething;
} catch(DatabaseException e) {
logger.error(e);
throw new PersistenceFailureException(e);
} finally {
-
+ attemptCommit(transaction);
if(logger.isTraceEnabled()) {
logger.trace("Completed DELETE of key " + key + " (keyRef: "
+ System.identityHashCode(key) + ") in "
+ (System.nanoTime() - startTimeNs) + " ns at "
+ System.currentTimeMillis());
}
-
- try {
- attemptClose(cursor);
- } finally {
- attemptCommit(transaction);
- }
}
}
@@ -483,7 +457,8 @@ private void attemptAbort(Transaction transaction) {
private void attemptCommit(Transaction transaction) {
try {
- transaction.commit();
+ if(transaction != null)
+ transaction.commit();
} catch(DatabaseException e) {
logger.error("Transaction commit failed!", e);
attemptAbort(transaction);
@@ -491,16 +466,6 @@ private void attemptCommit(Transaction transaction) {
}
}
- private static void attemptClose(Cursor cursor) {
- try {
- if(cursor != null)
- cursor.close();
- } catch(DatabaseException e) {
- logger.error("Error closing cursor.", e);
- throw new PersistenceFailureException(e.getMessage(), e);
- }
- }
-
public DatabaseStats getStats(boolean setFast) {
try {
StatsConfig config = new StatsConfig();
@@ -530,121 +495,127 @@ public BdbEnvironmentStats getBdbEnvironmentStats() {
private static abstract class BdbIterator<T> implements ClosableIterator<T> {
- private final boolean noValues;
- final Cursor cursor;
-
- private T current;
private volatile boolean isOpen;
+ final Cursor cursor;
- public BdbIterator(Cursor cursor, boolean noValues) {
+ BdbIterator(Cursor cursor) {
this.cursor = cursor;
isOpen = true;
- this.noValues = noValues;
- DatabaseEntry keyEntry = new DatabaseEntry();
- DatabaseEntry valueEntry = new DatabaseEntry();
- if(noValues)
- valueEntry.setPartial(true);
- try {
- cursor.getFirst(keyEntry, valueEntry, LockMode.READ_UNCOMMITTED);
- } catch(DatabaseException e) {
- logger.error(e);
- throw new PersistenceFailureException(e);
- }
- if(keyEntry.getData() != null)
- current = get(keyEntry, valueEntry);
}
- protected abstract T get(DatabaseEntry key, DatabaseEntry value);
-
- protected abstract void moveCursor(DatabaseEntry key, DatabaseEntry value)
- throws DatabaseException;
-
- public final boolean hasNext() {
- return current != null;
- }
-
- public final T next() {
- if(!isOpen)
- throw new PersistenceFailureException("Call to next() on a closed iterator.");
-
- DatabaseEntry keyEntry = new DatabaseEntry();
- DatabaseEntry valueEntry = new DatabaseEntry();
- if(noValues)
- valueEntry.setPartial(true);
+ public final void close() {
try {
- moveCursor(keyEntry, valueEntry);
+ if(isOpen) {
+ cursor.close();
+ isOpen = false;
+ }
} catch(DatabaseException e) {
logger.error(e);
- throw new PersistenceFailureException(e);
}
- T previous = current;
- if(keyEntry.getData() == null)
- current = null;
- else
- current = get(keyEntry, valueEntry);
-
- return previous;
}
public final void remove() {
- throw new UnsupportedOperationException("No removal y'all.");
- }
-
- public final void close() {
- try {
- cursor.close();
- isOpen = false;
- } catch(DatabaseException e) {
- logger.error(e);
- }
+ throw new UnsupportedOperationException("No removal");
}
@Override
protected final void finalize() {
if(isOpen) {
- logger.error("Failure to close cursor, will be forcably closed.");
+ logger.error("Failure to close cursor, will be forcibly closed.");
close();
}
-
}
}
- private static class BdbKeysIterator extends BdbIterator<ByteArray> {
+ private static class BdbEntriesIterator extends BdbIterator<Pair<ByteArray, Versioned<byte[]>>> {
- public BdbKeysIterator(Cursor cursor) {
- super(cursor, true);
+ private List<Pair<ByteArray, Versioned<byte[]>>> cache;
+
+ public BdbEntriesIterator(Cursor cursor) {
+ super(cursor);
+ this.cache = new ArrayList<Pair<ByteArray, Versioned<byte[]>>>();
}
- @Override
- protected ByteArray get(DatabaseEntry key, DatabaseEntry value) {
- return new ByteArray(key.getData());
+ public boolean hasNext() {
+ // we have a next element if there is at least one cached
+ // element or we can make more
+ return cache.size() > 0 || makeMore();
}
- @Override
- protected void moveCursor(DatabaseEntry key, DatabaseEntry value) throws DatabaseException {
- cursor.getNextNoDup(key, value, LockMode.READ_UNCOMMITTED);
+ public Pair<ByteArray, Versioned<byte[]>> next() {
+ if(cache.size() == 0) {
+ if(!makeMore())
+ throw new NoSuchElementException("Iterated to end.");
+ }
+ // must now have at least one thing in the cache
+ return cache.remove(cache.size() - 1);
}
+ protected boolean makeMore() {
+ DatabaseEntry keyEntry = new DatabaseEntry();
+ DatabaseEntry valueEntry = new DatabaseEntry();
+ try {
+ OperationStatus status = cursor.getNext(keyEntry,
+ valueEntry,
+ LockMode.READ_UNCOMMITTED);
+
+ if(OperationStatus.NOTFOUND == status) {
+ // we have reached the end of the cursor
+ return false;
+ }
+
+ ByteArray key = new ByteArray(keyEntry.getData());
+ for(Versioned<byte[]> val: StoreBinaryFormat.fromByteArray(valueEntry.getData()))
+ this.cache.add(Pair.create(key, val));
+ return true;
+ } catch(DatabaseException e) {
+ logger.error(e);
+ throw new PersistenceFailureException(e);
+ }
+ }
}
- private static class BdbEntriesIterator extends BdbIterator<Pair<ByteArray, Versioned<byte[]>>> {
+ private static class BdbKeysIterator extends BdbIterator<ByteArray> {
- public BdbEntriesIterator(Cursor cursor) {
- super(cursor, false);
+ ByteArray current = null;
+
+ public BdbKeysIterator(Cursor cursor) {
+ super(cursor);
}
- @Override
- protected Pair<ByteArray, Versioned<byte[]>> get(DatabaseEntry key, DatabaseEntry value) {
- VectorClock clock = new VectorClock(value.getData());
- byte[] bytes = ByteUtils.copy(value.getData(),
- clock.sizeInBytes(),
- value.getData().length);
- return Pair.create(new ByteArray(key.getData()), new Versioned<byte[]>(bytes, clock));
+ public boolean hasNext() {
+ return current != null || fetchNextKey();
}
- @Override
- protected void moveCursor(DatabaseEntry key, DatabaseEntry value) throws DatabaseException {
- cursor.getNext(key, value, LockMode.READ_UNCOMMITTED);
+ public ByteArray next() {
+ ByteArray result = null;
+ if(current == null) {
+ if(!fetchNextKey())
+ throw new NoSuchElementException("Iterated to end.");
+ }
+ result = current;
+ current = null;
+ return result;
+ }
+
+ private boolean fetchNextKey() {
+ DatabaseEntry keyEntry = new DatabaseEntry();
+ DatabaseEntry valueEntry = new DatabaseEntry();
+ valueEntry.setPartial(true);
+ try {
+ OperationStatus status = cursor.getNext(keyEntry,
+ valueEntry,
+ LockMode.READ_UNCOMMITTED);
+ if(OperationStatus.NOTFOUND == status) {
+ // we have reached the end of the cursor
+ return false;
+ }
+ current = new ByteArray(keyEntry.getData());
+ return true;
+ } catch(DatabaseException e) {
+ logger.error(e);
+ throw new PersistenceFailureException(e);
+ }
}
}
View
69 src/java/voldemort/store/bdb/stats/BdbEnvironmentStats.java
@@ -97,6 +97,60 @@ public long getEvictedLNs() {
return getFastStats().getNEvictPasses();
}
+ @JmxGetter(name = "BINFetches")
+ public long getBINFetches() {
+ return getFastStats().getNBINsFetch();
+ }
+
+ @JmxGetter(name = "BINFetchMisses")
+ public long getBINFetchMisses() {
+ return getFastStats().getNBINsFetchMiss();
+ }
+
+ @JmxGetter(name = "INFetches")
+ public long getINFetches() {
+ return getFastStats().getNUpperINsFetch();
+ }
+
+ @JmxGetter(name = "INFetchMisses")
+ public long getINFetchMisses() {
+ return getFastStats().getNUpperINsFetchMiss();
+ }
+
+ @JmxGetter(name = "LNFetches")
+ public long getLNFetches() {
+ return getFastStats().getNLNsFetch();
+ }
+
+ @JmxGetter(name = "LNFetchMisses")
+ public long getLNFetchMisses() {
+ return getFastStats().getNLNsFetchMiss();
+ }
+
+ @JmxGetter(name = "CachedBINs")
+ public long getCachedBINs() {
+ return getFastStats().getNCachedBINs();
+ }
+
+ @JmxGetter(name = "CachedINs")
+ public long getCachedUpperINs() {
+ return getFastStats().getNCachedUpperINs();
+ }
+
+ @JmxGetter(name = "EvictedBINs")
+ public long getEvictedBINs() {
+ EnvironmentStats stats = getFastStats();
+ return stats.getNBINsEvictedCacheMode() + stats.getNBINsEvictedCritical()
+ + stats.getNBINsEvictedDaemon() + stats.getNBINsEvictedManual();
+ }
+
+ @JmxGetter(name = "EvictedINs")
+ public long getEvictedINs() {
+ EnvironmentStats stats = getFastStats();
+ return stats.getNUpperINsEvictedCacheMode() + stats.getNUpperINsEvictedCritical()
+ + stats.getNUpperINsEvictedDaemon() + stats.getNUpperINsEvictedManual();
+ }
+
// 2. IO
@JmxGetter(name = "NumRandomWrites")
public long getNumRandomWrites() {
@@ -299,6 +353,21 @@ public double getPercentageUtilization() {
return safeGetPercentage(getTotalSpaceUtilized(), getTotalSpace());
}
+ @JmxGetter(name = "PercentageBINMiss")
+ public double getPercentageBINMiss() {
+ return safeGetPercentage(getBINFetchMisses(), getBINFetches());
+ }
+
+ @JmxGetter(name = "PercentageINMiss")
+ public double getPercentageINMiss() {
+ return safeGetPercentage(getINFetchMisses(), getINFetches());
+ }
+
+ @JmxGetter(name = "PercentageLNMiss")
+ public double getPercentageLNMiss() {
+ return safeGetPercentage(getLNFetchMisses(), getLNFetches());
+ }
+
public static double safeGetPercentage(long rawNum, long total) {
return total == 0 ? 0.0d : rawNum / (float) total;
}
View
26 src/java/voldemort/versioning/VectorClock.java
@@ -114,24 +114,27 @@ public VectorClock(byte[] bytes, int offset) {
public byte[] toBytes() {
byte[] serialized = new byte[sizeInBytes()];
+ toBytes(serialized, 0);
+ return serialized;
+ }
+
+ public int toBytes(byte[] buf, int offset) {
// write the number of versions
- ByteUtils.writeShort(serialized, (short) versions.size(), 0);
+ ByteUtils.writeShort(buf, (short) versions.size(), offset);
+ offset += ByteUtils.SIZE_OF_SHORT;
// write the size of each version in bytes
byte versionSize = ByteUtils.numberOfBytesRequired(getMaxVersion());
- serialized[2] = versionSize;
+ buf[offset] = versionSize;
+ offset++;
int clockEntrySize = ByteUtils.SIZE_OF_SHORT + versionSize;
- int start = 3;
for(ClockEntry v: versions) {
- ByteUtils.writeShort(serialized, v.getNodeId(), start);
- ByteUtils.writeBytes(serialized,
- v.getVersion(),
- start + ByteUtils.SIZE_OF_SHORT,
- versionSize);
- start += clockEntrySize;
+ ByteUtils.writeShort(buf, v.getNodeId(), offset);
+ ByteUtils.writeBytes(buf, v.getVersion(), offset + ByteUtils.SIZE_OF_SHORT, versionSize);
+ offset += clockEntrySize;
}
- ByteUtils.writeLong(serialized, this.timestamp, start);
- return serialized;
+ ByteUtils.writeLong(buf, this.timestamp, offset);
+ return sizeInBytes();
}
public int sizeInBytes() {
@@ -225,6 +228,7 @@ public String toString() {
builder.append(this.versions.get(this.versions.size() - 1));
}
builder.append(")");
+ builder.append(" ts:" + timestamp);
return builder.toString();
}
View
2  src/java/voldemort/versioning/Versioned.java
@@ -44,7 +44,7 @@ public Versioned(T object, Version version) {
this.object = object;
}
- public Version getVersion() {
+ public VectorClock getVersion() {
return version;
}
View
2  test/integration/voldemort/CatBdbStore.java
@@ -58,7 +58,7 @@ public static void main(String[] args) throws Exception {
DatabaseConfig databaseConfig = new DatabaseConfig();
databaseConfig.setAllowCreate(true);
databaseConfig.setTransactional(config.isBdbWriteTransactionsEnabled());
- databaseConfig.setSortedDuplicates(config.isBdbSortedDuplicatesEnabled());
+ databaseConfig.setSortedDuplicates(false);
Database database = environment.openDatabase(null, storeName, databaseConfig);
StorageEngine<ByteArray, byte[], byte[]> store = new BdbStorageEngine(storeName,