Skip to content

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also compare across forks.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also compare across forks.
...
Checking mergeability… Don’t worry, you can still create the pull request.
  • 12 commits
  • 21 files changed
  • 2 commit comments
  • 2 contributors
View
6 .classpath
@@ -40,7 +40,6 @@
<classpathentry kind="lib" path="lib/protobuf-java-2.3.0.jar"/>
<classpathentry kind="lib" path="contrib/ec2-testing/lib/typica.jar"/>
<classpathentry kind="lib" path="lib/google-collect-1.0.jar"/>
- <classpathentry kind="lib" path="lib/je-4.0.92.jar"/>
<classpathentry kind="lib" path="lib/paranamer-2.1.jar"/>
<classpathentry kind="lib" path="lib/jackson-mapper-asl-1.4.0.jar"/>
<classpathentry kind="lib" path="lib/jackson-core-asl-1.4.0.jar"/>
@@ -51,8 +50,9 @@
<classpathentry kind="lib" path="lib/libthrift-0.5.0.jar"/>
<classpathentry kind="lib" path="lib/compress-lzf-0.9.1.jar"/>
<classpathentry kind="lib" path="lib/snappy-0.2.jar"/>
- <classpathentry kind="lib" path="lib/httpclient-4.1.2.jar" />
- <classpathentry kind="lib" path="lib/httpcore-4.1.2.jar" />
+ <classpathentry kind="lib" path="lib/httpclient-4.1.2.jar"/>
+ <classpathentry kind="lib" path="lib/httpcore-4.1.2.jar"/>
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.6"/>
+ <classpathentry kind="lib" path="lib/je-4.1.17.jar"/>
<classpathentry kind="output" path="classes"/>
</classpath>
View
BIN lib/je-4.0.92.jar
Binary file not shown.
View
BIN lib/je-4.1.17.jar
Binary file not shown.
View
77 src/java/voldemort/server/VoldemortConfig.java
@@ -74,6 +74,7 @@
private int bdbCleanerMinUtilization;
private int bdbCleanerLookAheadCacheSize;
private boolean bdbCheckpointerHighPriority;
+ private boolean bdbCleanerLazyMigration;
private int bdbCleanerMaxBatchFiles;
private boolean bdbReadUncommitted;
private int bdbCleanerThreads;
@@ -83,6 +84,9 @@
private int bdbLogIteratorReadSize;
private boolean bdbFairLatches;
private long bdbStatsCacheTtlMs;
+ private boolean bdbCacheModeEvictLN;
+ private boolean bdbMinimizeScanImpact;
+ private boolean bdbExposeSpaceUtilization;
private String mysqlUsername;
private String mysqlPassword;
@@ -220,9 +224,13 @@ public VoldemortConfig(Props props) {
this.bdbLogIteratorReadSize = props.getInt("bdb.log.iterator.read.size", 8192);
this.bdbFairLatches = props.getBoolean("bdb.fair.latches", false);
this.bdbCheckpointerHighPriority = props.getBoolean("bdb.checkpointer.high.priority", false);
+ this.bdbCleanerLazyMigration = props.getBoolean("bdb.cleaner.lazy.migration", true);
this.bdbCleanerMaxBatchFiles = props.getInt("bdb.cleaner.max.batch.files", 0);
this.bdbReadUncommitted = props.getBoolean("bdb.lock.read_uncommitted", true);
this.bdbStatsCacheTtlMs = props.getLong("bdb.stats.cache.ttl.ms", 5 * Time.MS_PER_SECOND);
+ this.bdbCacheModeEvictLN = props.getBoolean("bdb.cache.evictln", false);
+ this.bdbMinimizeScanImpact = props.getBoolean("bdb.minimize.scan.impact", false);
+ this.bdbExposeSpaceUtilization = props.getBoolean("bdb.expose.space.utilization", true);
this.readOnlyBackups = props.getInt("readonly.backups", 1);
this.readOnlySearchStrategy = props.getString("readonly.search.strategy",
@@ -600,6 +608,25 @@ public final void setBdbCheckpointerHighPriority(boolean bdbCheckpointerHighPrio
}
/**
+ * If true, Cleaner offloads some work to application threads, to keep up
+ * with the write rate.
+ *
+ * <ul>
+ * <li>property: "bdb.cleaner.lazy.migration"</li>
+ * <li>default : true</li>
+ * </ul>
+ *
+ * @return
+ */
+ public boolean getBdbCleanerLazyMigration() {
+ return bdbCleanerLazyMigration;
+ }
+
+ public final void setBdbCleanerLazyMigration(boolean bdbCleanerLazyMigration) {
+ this.bdbCleanerLazyMigration = bdbCleanerLazyMigration;
+ }
+
+ /**
* The maximum number of log files in the cleaner's backlog, or zero if
* there is no limit
*
@@ -739,6 +766,19 @@ public final void setBdbCleanerMinUtilization(int minUtilization) {
}
/**
+ * This parameter controls whether we expose space utilization via MBean. If
+ * set to false, stat will always return 0;
+ *
+ */
+ public boolean getBdbExposeSpaceUtilization() {
+ return bdbExposeSpaceUtilization;
+ }
+
+ public void setBdbExposeSpaceUtilization(boolean bdbExposeSpaceUtilization) {
+ this.bdbExposeSpaceUtilization = bdbExposeSpaceUtilization;
+ }
+
+ /**
*
* The btree node fanout. Given by "bdb.btree.fanout". default: 512
*/
@@ -751,6 +791,43 @@ public void setBdbBtreeFanout(int bdbBtreeFanout) {
}
/**
+ * If true, BDB will not cache data in the JVM.
+ *
+ * <ul>
+ * <li>Property : "bdb.cache.evictln"</li>
+ * <li>Default : false</li>
+ * </ul>
+ *
+ * @return
+ */
+ public boolean getBdbCacheModeEvictLN() {
+ return bdbCacheModeEvictLN;
+ }
+
+ public void setBdbCacheModeEvictLN(boolean bdbCacheModeEvictLN) {
+ this.bdbCacheModeEvictLN = bdbCacheModeEvictLN;
+ }
+
+ /**
+ * If true, attempts are made to minimize impact to BDB cache during scan
+ * jobs
+ *
+ * <ul>
+ * <li>Property : "bdb.minimize.scan.impact"</li>
+ * <li>Default : false</li>
+ * </ul>
+ *
+ * @return
+ */
+ public boolean getBdbMinimizeScanImpact() {
+ return bdbMinimizeScanImpact;
+ }
+
+ public void setBdbMinimizeScanImpact(boolean bdbMinimizeScanImpact) {
+ this.bdbMinimizeScanImpact = bdbMinimizeScanImpact;
+ }
+
+ /**
* The comfortable number of threads the threadpool will attempt to
* maintain. Specified by "core.threads" default: max(1, floor(0.5 *
* max.threads))
View
5 src/java/voldemort/server/VoldemortServer.java
@@ -47,6 +47,7 @@
import voldemort.server.storage.StorageService;
import voldemort.store.configuration.ConfigurationStorageEngine;
import voldemort.store.metadata.MetadataStore;
+import voldemort.utils.JNAUtils;
import voldemort.utils.RebalanceUtils;
import voldemort.utils.SystemTime;
import voldemort.utils.Utils;
@@ -246,6 +247,8 @@ private void checkHostName() {
@Override
protected void startInner() throws VoldemortException {
+ // lock down jvm heap
+ JNAUtils.tryMlockall();
logger.info("Starting " + services.size() + " services.");
long start = System.currentTimeMillis();
for(VoldemortService service: services)
@@ -278,6 +281,8 @@ protected void stopInner() throws VoldemortException {
if(exceptions.size() > 0)
throw exceptions.get(0);
+ // release lock of jvm heap
+ JNAUtils.tryMunlockall();
}
public static void main(String[] args) throws Exception {
View
28 src/java/voldemort/server/scheduler/DataCleanupJob.java
@@ -16,8 +16,11 @@
package voldemort.server.scheduler;
+import java.util.concurrent.atomic.AtomicLong;
+
import org.apache.log4j.Logger;
+import voldemort.annotations.jmx.JmxGetter;
import voldemort.server.storage.ScanPermitWrapper;
import voldemort.store.StorageEngine;
import voldemort.utils.ClosableIterator;
@@ -42,6 +45,8 @@
private final long maxAgeMs;
private final Time time;
private final EventThrottler throttler;
+ private long totalEntriesScanned = 0;
+ private AtomicLong progressThisRun;
public DataCleanupJob(StorageEngine<K, V, T> store,
ScanPermitWrapper cleanupPermits,
@@ -53,10 +58,12 @@ public DataCleanupJob(StorageEngine<K, V, T> store,
this.maxAgeMs = maxAgeMs;
this.time = time;
this.throttler = throttler;
+ this.progressThisRun = new AtomicLong(0);
}
public void run() {
- acquireCleanupPermit();
+ acquireCleanupPermit(progressThisRun);
+
ClosableIterator<Pair<K, Versioned<V>>> iterator = null;
try {
logger.info("Starting data cleanup on store \"" + store.getName() + "\"...");
@@ -70,7 +77,7 @@ public void run() {
logger.info("Datacleanup job halted.");
return;
}
-
+ progressThisRun.incrementAndGet();
Pair<K, Versioned<V>> keyAndVal = iterator.next();
VectorClock clock = (VectorClock) keyAndVal.getSecond().getVersion();
if(now - clock.getTimestamp() > maxAgeMs) {
@@ -83,14 +90,21 @@ public void run() {
// throttle on number of entries.
throttler.maybeThrottle(1);
}
+ // log the total items scanned, so we will get an idea of data
+ // growth in a cheap, periodic way
logger.info("Data cleanup on store \"" + store.getName() + "\" is complete; " + deleted
- + " items deleted.");
+ + " items deleted. " + progressThisRun.get() + " items scanned");
+
} catch(Exception e) {
logger.error("Error in data cleanup job for store " + store.getName() + ": ", e);
} finally {
closeIterator(iterator);
logger.info("Releasing lock after data cleanup on \"" + store.getName() + "\".");
this.cleanupPermits.release();
+ synchronized(this) {
+ totalEntriesScanned += progressThisRun.get();
+ progressThisRun.set(0);
+ }
}
}
@@ -103,14 +117,18 @@ private void closeIterator(ClosableIterator<Pair<K, Versioned<V>>> iterator) {
}
}
- private void acquireCleanupPermit() {
+ private void acquireCleanupPermit(AtomicLong progress) {
logger.info("Acquiring lock to perform data cleanup on \"" + store.getName() + "\".");
try {
- this.cleanupPermits.acquire();
+ this.cleanupPermits.acquire(progress);
} catch(InterruptedException e) {
throw new IllegalStateException("Datacleanup interrupted while waiting for cleanup permit.",
e);
}
}
+ @JmxGetter(name = "numEntriesScanned", description = "Returns number of entries scanned")
+ public synchronized long getEntriesScanned() {
+ return totalEntriesScanned + progressThisRun.get();
+ }
}
View
2 src/java/voldemort/server/scheduler/slop/BlockingSlopPusherJob.java
@@ -219,7 +219,7 @@ public void run() {
private void acquireRepairPermit() {
logger.info("Acquiring lock to perform blocking slop pusher job ");
try {
- this.repairPermits.acquire();
+ this.repairPermits.acquire(null);
logger.info("Acquired lock to perform blocking slop pusher job ");
} catch(InterruptedException e) {
throw new IllegalStateException("Blocking slop pusher job interrupted while waiting for permit.",
View
2 src/java/voldemort/server/scheduler/slop/StreamingSlopPusherJob.java
@@ -372,7 +372,7 @@ public boolean isComplete() {
private void acquireRepairPermit() {
logger.info("Acquiring lock to perform streaming slop pusher job ");
try {
- this.repairPermits.acquire();
+ this.repairPermits.acquire(null);
logger.info("Acquired lock to perform streaming slop pusher job ");
} catch(InterruptedException e) {
stopAdminClient();
View
17 src/java/voldemort/server/storage/RepairJob.java
@@ -4,6 +4,7 @@
import java.util.Date;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
import javax.management.MBeanOperationInfo;
@@ -75,8 +76,8 @@ public void run() {
for(StoreDefinition storeDef: metadataStore.getStoreDefList()) {
localStats.put(storeDef.getName(), 0L);
}
-
- if(!acquireRepairPermit())
+ AtomicLong progress = new AtomicLong(0);
+ if(!acquireRepairPermit(progress))
return;
try {
// Get routing factory
@@ -93,7 +94,6 @@ public void run() {
metadataStore.getCluster());
long repairSlops = 0L;
long numDeletedKeys = 0;
- long numScannedKeys = 0;
while(iterator.hasNext()) {
Pair<ByteArray, Versioned<byte[]>> keyAndVal;
keyAndVal = iterator.next();
@@ -103,10 +103,9 @@ public void run() {
engine.delete(keyAndVal.getFirst(), keyAndVal.getSecond().getVersion());
numDeletedKeys++;
}
- numScannedKeys++;
- if(numScannedKeys % deleteBatchSize == 0)
- logger.info("#Scanned:" + numScannedKeys + " #Deleted:"
- + numDeletedKeys);
+ long itemsScanned = progress.incrementAndGet();
+ if(itemsScanned % deleteBatchSize == 0)
+ logger.info("#Scanned:" + itemsScanned + " #Deleted:" + numDeletedKeys);
}
closeIterator(iterator);
localStats.put(storeDef.getName(), repairSlops);
@@ -149,9 +148,9 @@ private boolean isWritableStore(StoreDefinition storeDef) {
}
}
- private boolean acquireRepairPermit() {
+ private boolean acquireRepairPermit(AtomicLong progress) {
logger.info("Acquiring lock to perform repair job ");
- if(this.repairPermits.tryAcquire()) {
+ if(this.repairPermits.tryAcquire(progress)) {
logger.info("Acquired lock to perform repair job ");
return true;
} else {
View
48 src/java/voldemort/server/storage/ScanPermitWrapper.java
@@ -2,49 +2,63 @@
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicLong;
public class ScanPermitWrapper {
private final Semaphore scanPermits;
- private List<String> permitOwners;
+ private Map<String, AtomicLong> permitOwners;
+ private final int numPermits;
+
+ private long totalEntriesScanned;
public ScanPermitWrapper(final int numPermits) {
+ this.numPermits = numPermits;
scanPermits = new Semaphore(numPermits);
- permitOwners = Collections.synchronizedList(new ArrayList<String>());
+ permitOwners = Collections.synchronizedMap(new HashMap<String, AtomicLong>());
+ }
+
+ public static String getOwnerName() {
+ return Thread.currentThread().getStackTrace()[2].getClassName();
}
- public void acquire() throws InterruptedException {
+ public void acquire(AtomicLong progress) throws InterruptedException {
this.scanPermits.acquire();
synchronized(permitOwners) {
- permitOwners.add(Thread.currentThread().getStackTrace()[2].getClassName());
+ permitOwners.put(getOwnerName(), progress);
}
}
public void release() {
this.scanPermits.release();
synchronized(permitOwners) {
- permitOwners.remove(Thread.currentThread().getStackTrace()[2].getClassName());
+ AtomicLong scannedCount = permitOwners.get(getOwnerName());
+ if(scannedCount != null)
+ totalEntriesScanned += scannedCount.get();
+ permitOwners.remove(getOwnerName());
}
}
public List<String> getPermitOwners() {
List<String> ownerList = new ArrayList<String>();
synchronized(permitOwners) {
- Iterator<String> i = this.permitOwners.iterator();
+ Iterator<String> i = this.permitOwners.keySet().iterator();
while(i.hasNext())
ownerList.add(i.next());
}
return ownerList;
}
- public boolean tryAcquire() {
+ public boolean tryAcquire(AtomicLong progress) {
boolean gotPermit = this.scanPermits.tryAcquire();
if(gotPermit) {
synchronized(permitOwners) {
- permitOwners.add(Thread.currentThread().getStackTrace()[2].getClassName());
+ permitOwners.put(getOwnerName(), progress);
}
}
return gotPermit;
@@ -53,4 +67,22 @@ public boolean tryAcquire() {
public int availablePermits() {
return this.scanPermits.availablePermits();
}
+
+ public int getGrantedPermits() {
+ return numPermits - availablePermits();
+ }
+
+ public long getEntriesScanned() {
+ long itemsScanned = 0;
+ synchronized(permitOwners) {
+ for(Map.Entry<String, AtomicLong> progressEntry: permitOwners.entrySet()) {
+ AtomicLong progress = progressEntry.getValue();
+ // slops are not included since they are tracked separately
+ if(progress != null) {
+ itemsScanned += progress.get();
+ }
+ }
+ }
+ return totalEntriesScanned + itemsScanned;
+ }
}
View
14 src/java/voldemort/server/storage/StorageService.java
@@ -591,7 +591,9 @@ private void scheduleCleanupJob(StoreDefinition storeDef,
* Time.MS_PER_DAY,
SystemTime.INSTANCE,
throttler);
-
+ if(voldemortConfig.isJmxEnabled()) {
+ JmxUtils.registerMbean("DataCleanupJob-" + engine.getName(), cleanupJob);
+ }
this.scheduler.schedule("cleanup-" + storeDef.getName(),
cleanupJob,
startTime,
@@ -819,4 +821,14 @@ public DynamicThrottleLimit getDynThrottleLimit() {
public List<String> getPermitOwners() {
return this.scanPermitWrapper.getPermitOwners();
}
+
+ @JmxGetter(name = "numGrantedScanPermits", description = "Returns number of scan permits granted at the moment")
+ public long getGrantedPermits() {
+ return this.scanPermitWrapper.getGrantedPermits();
+ }
+
+ @JmxGetter(name = "numEntriesScanned", description = "Returns number of entries scanned since last call")
+ public long getEntriesScanned() {
+ return this.scanPermitWrapper.getEntriesScanned();
+ }
}
View
29 src/java/voldemort/store/bdb/BdbRuntimeConfig.java
@@ -1,20 +1,25 @@
package voldemort.store.bdb;
-import com.sleepycat.je.LockMode;
import voldemort.server.VoldemortConfig;
import voldemort.utils.Time;
+import com.sleepycat.je.LockMode;
+
/**
* Runtime (i.e., post Environment creation) configuration for BdbStorageEngine
- *
+ *
*/
public class BdbRuntimeConfig {
public static final long DEFAULT_STATS_CACHE_TTL_MS = 5 * Time.MS_PER_SECOND;
public static final LockMode DEFAULT_LOCK_MODE = LockMode.READ_UNCOMMITTED;
+ public static final boolean DEFAULT_MINIMIZE_SCAN_IMPACT = false;
+ public static final boolean DEFAULT_EXPOSE_SPACE_UTIL = true;
private long statsCacheTtlMs = DEFAULT_STATS_CACHE_TTL_MS;
private LockMode lockMode = DEFAULT_LOCK_MODE;
+ private boolean minimizeScanImpact = DEFAULT_MINIMIZE_SCAN_IMPACT;
+ private boolean exposeSpaceUtil = DEFAULT_EXPOSE_SPACE_UTIL;
public BdbRuntimeConfig() {
@@ -22,9 +27,11 @@ public BdbRuntimeConfig() {
public BdbRuntimeConfig(VoldemortConfig config) {
LockMode lockMode = config.getBdbReadUncommitted() ? LockMode.READ_UNCOMMITTED
- : LockMode.DEFAULT;
+ : LockMode.DEFAULT;
setLockMode(lockMode);
setStatsCacheTtlMs(config.getBdbStatsCacheTtlMs());
+ setMinimizeScanImpact(config.getBdbMinimizeScanImpact());
+ setExposeSpaceUtil(config.getBdbExposeSpaceUtilization());
}
public long getStatsCacheTtlMs() {
@@ -44,4 +51,20 @@ public BdbRuntimeConfig setLockMode(LockMode lockMode) {
this.lockMode = lockMode;
return this;
}
+
+ public boolean getMinimizeScanImpact() {
+ return minimizeScanImpact;
+ }
+
+ public void setMinimizeScanImpact(boolean minimizeScanImpact) {
+ this.minimizeScanImpact = minimizeScanImpact;
+ }
+
+ public void setExposeSpaceUtil(boolean expose) {
+ this.exposeSpaceUtil = expose;
+ }
+
+ public boolean getExposeSpaceUtil() {
+ return this.exposeSpaceUtil;
+ }
}
View
19 src/java/voldemort/store/bdb/BdbStorageConfiguration.java
@@ -31,17 +31,17 @@
import voldemort.utils.ByteArray;
import voldemort.utils.Time;
+import com.google.common.collect.Maps;
+import com.sleepycat.je.CacheMode;
import com.sleepycat.je.Database;
import com.sleepycat.je.DatabaseConfig;
import com.sleepycat.je.DatabaseException;
+import com.sleepycat.je.Durability;
import com.sleepycat.je.Environment;
import com.sleepycat.je.EnvironmentConfig;
import com.sleepycat.je.EnvironmentStats;
-import com.sleepycat.je.PreloadConfig;
import com.sleepycat.je.StatsConfig;
-import com.google.common.collect.Maps;
-
/**
* The configuration that is shared between berkeley db instances. This includes
* the db environment and the configuration
@@ -70,13 +70,11 @@ public BdbStorageConfiguration(VoldemortConfig config) {
environmentConfig.setTransactional(true);
environmentConfig.setCacheSize(config.getBdbCacheSize());
if(config.isBdbWriteTransactionsEnabled() && config.isBdbFlushTransactionsEnabled()) {
- environmentConfig.setTxnNoSync(false);
- environmentConfig.setTxnWriteNoSync(false);
+ environmentConfig.setDurability(Durability.COMMIT_SYNC);
} else if(config.isBdbWriteTransactionsEnabled() && !config.isBdbFlushTransactionsEnabled()) {
- environmentConfig.setTxnNoSync(false);
- environmentConfig.setTxnWriteNoSync(true);
+ environmentConfig.setDurability(Durability.COMMIT_WRITE_NO_SYNC);
} else {
- environmentConfig.setTxnNoSync(true);
+ environmentConfig.setDurability(Durability.COMMIT_NO_SYNC);
}
environmentConfig.setAllowCreate(true);
environmentConfig.setConfigParam(EnvironmentConfig.LOG_FILE_MAX,
@@ -105,8 +103,13 @@ public BdbStorageConfiguration(VoldemortConfig config) {
Integer.toString(config.getBdbLogFaultReadSize()));
environmentConfig.setConfigParam(EnvironmentConfig.LOG_ITERATOR_READ_SIZE,
Integer.toString(config.getBdbLogIteratorReadSize()));
+ environmentConfig.setConfigParam(EnvironmentConfig.CLEANER_LAZY_MIGRATION,
+ Boolean.toString(config.getBdbCleanerLazyMigration()));
environmentConfig.setLockTimeout(config.getBdbLockTimeoutMs(), TimeUnit.MILLISECONDS);
+ if(config.getBdbCacheModeEvictLN()) {
+ environmentConfig.setCacheMode(CacheMode.EVICT_LN);
+ }
databaseConfig = new DatabaseConfig();
databaseConfig.setAllowCreate(true);
databaseConfig.setSortedDuplicates(config.isBdbSortedDuplicatesEnabled());
View
13 src/java/voldemort/store/bdb/BdbStorageEngine.java
@@ -51,6 +51,7 @@
import voldemort.versioning.Versioned;
import com.google.common.collect.Lists;
+import com.sleepycat.je.CacheMode;
import com.sleepycat.je.Cursor;
import com.sleepycat.je.Database;
import com.sleepycat.je.DatabaseEntry;
@@ -81,6 +82,7 @@
private final Serializer<Version> versionSerializer;
private final BdbEnvironmentStats bdbEnvironmentStats;
private final AtomicBoolean isTruncating = new AtomicBoolean(false);
+ private final boolean minimizeScanImpact;
public BdbStorageEngine(String name,
Environment environment,
@@ -102,7 +104,10 @@ public Version toObject(byte[] bytes) {
};
this.isOpen = new AtomicBoolean(true);
this.readLockMode = config.getLockMode();
- this.bdbEnvironmentStats = new BdbEnvironmentStats(environment, config.getStatsCacheTtlMs());
+ this.bdbEnvironmentStats = new BdbEnvironmentStats(environment,
+ config.getStatsCacheTtlMs(),
+ config.getExposeSpaceUtil());
+ this.minimizeScanImpact = config.getMinimizeScanImpact();
}
public String getName() {
@@ -112,6 +117,9 @@ public String getName() {
public ClosableIterator<Pair<ByteArray, Versioned<byte[]>>> entries() {
try {
Cursor cursor = getBdbDatabase().openCursor(null, null);
+ // evict data brought in by the cursor walk right away
+ if(this.minimizeScanImpact)
+ cursor.setCacheMode(CacheMode.EVICT_BIN);
return new BdbEntriesIterator(cursor);
} catch(DatabaseException e) {
logger.error(e);
@@ -122,6 +130,9 @@ public String getName() {
public ClosableIterator<ByteArray> keys() {
try {
Cursor cursor = getBdbDatabase().openCursor(null, null);
+ // evict data brought in by the cursor walk right away
+ if(this.minimizeScanImpact)
+ cursor.setCacheMode(CacheMode.EVICT_BIN);
return new BdbKeysIterator(cursor);
} catch(DatabaseException e) {
logger.error(e);
View
235 src/java/voldemort/store/bdb/stats/BdbEnvironmentStats.java
@@ -1,24 +1,27 @@
package voldemort.store.bdb.stats;
+import java.util.concurrent.Callable;
-import com.sleepycat.je.Environment;
-import com.sleepycat.je.EnvironmentConfig;
-import com.sleepycat.je.EnvironmentStats;
-import com.sleepycat.je.StatsConfig;
import voldemort.VoldemortException;
import voldemort.annotations.Experimental;
import voldemort.annotations.jmx.JmxGetter;
import voldemort.utils.CachedCallable;
-import java.util.concurrent.Callable;
+import com.sleepycat.je.Environment;
+import com.sleepycat.je.EnvironmentConfig;
+import com.sleepycat.je.EnvironmentStats;
+import com.sleepycat.je.StatsConfig;
public class BdbEnvironmentStats {
private final Environment environment;
private final CachedCallable<EnvironmentStats> fastStats;
+ private final CachedCallable<SpaceUtilizationStats> fastSpaceStats;
+ private final boolean exposeSpaceStats;
- public BdbEnvironmentStats(Environment environment, long ttlMs) {
+ public BdbEnvironmentStats(Environment environment, long ttlMs, boolean exposeSpaceUtil) {
this.environment = environment;
+ this.exposeSpaceStats = exposeSpaceUtil;
Callable<EnvironmentStats> fastStatsCallable = new Callable<EnvironmentStats>() {
public EnvironmentStats call() throws Exception {
@@ -26,6 +29,14 @@ public EnvironmentStats call() throws Exception {
}
};
fastStats = new CachedCallable<EnvironmentStats>(fastStatsCallable, ttlMs);
+
+ Callable<SpaceUtilizationStats> fastDbStatsCallable = new Callable<SpaceUtilizationStats>() {
+
+ public SpaceUtilizationStats call() throws Exception {
+ return getSpaceUtilizationStats();
+ }
+ };
+ fastSpaceStats = new CachedCallable<SpaceUtilizationStats>(fastDbStatsCallable, ttlMs);
}
private EnvironmentStats getEnvironmentStats(boolean fast) {
@@ -34,6 +45,18 @@ private EnvironmentStats getEnvironmentStats(boolean fast) {
return environment.getStats(config);
}
+ private SpaceUtilizationStats getSpaceUtilizationStats() {
+ return new SpaceUtilizationStats(environment);
+ }
+
+ private SpaceUtilizationStats getFastSpaceUtilizationStats() {
+ try {
+ return fastSpaceStats.call();
+ } catch(Exception e) {
+ throw new VoldemortException(e);
+ }
+ }
+
private EnvironmentStats getFastStats() {
try {
return fastStats.call();
@@ -47,6 +70,8 @@ public String getFastStatsAsString() {
return getFastStats().toString();
}
+ // 1. Caching
+
@JmxGetter(name = "NumCacheMiss")
public long getNumCacheMiss() {
return getFastStats().getNCacheMiss();
@@ -57,6 +82,76 @@ public long getNumNotResident() {
return getFastStats().getNNotResident();
}
+ @JmxGetter(name = "TotalCacheSize")
+ public long getTotalCacheSize() {
+ return getFastStats().getSharedCacheTotalBytes();
+ }
+
+ @JmxGetter(name = "AllotedCacheSize")
+ public long getAllotedCacheSize() {
+ return getFastStats().getCacheTotalBytes();
+ }
+
+ @JmxGetter(name = "BINFetches")
+ public long getBINFetches() {
+ return getFastStats().getNBINsFetch();
+ }
+
+ @JmxGetter(name = "BINFetchMisses")
+ public long getBINFetchMisses() {
+ return getFastStats().getNBINsFetchMiss();
+ }
+
+ @JmxGetter(name = "INFetches")
+ public long getINFetches() {
+ return getFastStats().getNUpperINsFetch();
+ }
+
+ @JmxGetter(name = "INFetchMisses")
+ public long getINFetchMisses() {
+ return getFastStats().getNUpperINsFetchMiss();
+ }
+
+ @JmxGetter(name = "LNFetches")
+ public long getLNFetches() {
+ return getFastStats().getNLNsFetch();
+ }
+
+ @JmxGetter(name = "LNFetchMisses")
+ public long getLNFetchMisses() {
+ return getFastStats().getNLNsFetchMiss();
+ }
+
+ @JmxGetter(name = "CachedBINs")
+ public long getCachedBINs() {
+ return getFastStats().getNCachedBINs();
+ }
+
+ @JmxGetter(name = "CachedINs")
+ public long getCachedUpperINs() {
+ return getFastStats().getNCachedUpperINs();
+ }
+
+ @JmxGetter(name = "EvictedBINs")
+ public long getEvictedBINs() {
+ EnvironmentStats stats = getFastStats();
+ return stats.getNBINsEvictedCacheMode() + stats.getNBINsEvictedCritical()
+ + stats.getNBINsEvictedDaemon() + stats.getNBINsEvictedManual();
+ }
+
+ @JmxGetter(name = "EvictedINs")
+ public long getEvictedINs() {
+ EnvironmentStats stats = getFastStats();
+ return stats.getNUpperINsEvictedCacheMode() + stats.getNUpperINsEvictedCritical()
+ + stats.getNUpperINsEvictedDaemon() + stats.getNUpperINsEvictedManual();
+ }
+
+ @JmxGetter(name = "EvictionPasses")
+ public long getEvictedLNs() {
+ return getFastStats().getNEvictPasses();
+ }
+
+ // 2. IO
@JmxGetter(name = "NumRandomWrites")
public long getNumRandomWrites() {
return getFastStats().getNRandomWrites();
@@ -97,6 +192,18 @@ public long getNumSequentialReadBytes() {
return getFastStats().getNSequentialReadBytes();
}
+ @JmxGetter(name = "NumFSyncs")
+ public long getNumFSyncs() {
+ return getFastStats().getNFSyncs();
+ }
+
+ // 3. Cleaning & Checkpointing
+
+ @JmxGetter(name = "NumCleanerEntriesRead")
+ public long getNumCleanerEntriesRead() {
+ return getFastStats().getNCleanerEntriesRead();
+ }
+
@JmxGetter(name = "FileDeletionBacklog")
public long getFileDeletionBacklog() {
return getFastStats().getFileDeletionBacklog();
@@ -115,14 +222,14 @@ public long getCleanerBacklog() {
return getFastStats().getCleanerBacklog();
}
- @JmxGetter(name = "NumAcquiresWithContention")
- public long getNumAcquiresWithContention() {
- return getFastStats().getNAcquiresWithContention();
+ @JmxGetter(name = "NumCleanerRuns")
+ public long getNumCleanerRuns() {
+ return getFastStats().getNCleanerRuns();
}
- @JmxGetter(name = "NumAcquiresNoWaiters")
- public long getNumAcquiresNoWaiters() {
- return getFastStats().getNAcquiresNoWaiters();
+ @JmxGetter(name = "NumCleanerDeletions")
+ public long getNumCleanerDeletions() {
+ return getFastStats().getNCleanerRuns();
}
@JmxGetter(name = "NumCheckpoints")
@@ -130,42 +237,65 @@ public long getNumCheckpoints() {
return getFastStats().getNCheckpoints();
}
- @JmxGetter(name = "NumCleanerEntriesRead")
- public long getNumCleanerEntriesRead() {
- return getFastStats().getNCleanerEntriesRead();
+ @JmxGetter(name = "TotalSpace")
+ public long getTotalSpace() {
+ if(this.exposeSpaceStats)
+ return getFastSpaceUtilizationStats().getTotalSpaceUsed();
+ else
+ return 0;
}
- @JmxGetter(name = "NumFSyncs")
- public long getNumFSyncs() {
- return getFastStats().getNFSyncs();
+ @JmxGetter(name = "TotalSpaceUtilized")
+ public long getTotalSpaceUtilized() {
+ if(this.exposeSpaceStats)
+ return getFastSpaceUtilizationStats().getTotalSpaceUtilized();
+ else
+ return 0;
}
- @JmxGetter(name = "NumCleanerRuns")
- public long getNumCleanerRuns() {
- return getFastStats().getNCleanerRuns();
+ @JmxGetter(name = "UtilizationSummary", description = "Displays the disk space utilization for an environment.")
+ public String getUtilizationSummaryAsString() {
+ return getFastSpaceUtilizationStats().getSummariesAsString();
}
- @JmxGetter(name = "NumCleanerDeletions")
- public long getNumCleanerDeletions() {
- return getFastStats().getNCleanerRuns();
+ // 4. Latching/Locking
+
+ @JmxGetter(name = "BtreeLatches")
+ public long getBtreeLatches() {
+ return getFastStats().getRelatchesRequired();
+ }
+
+ @JmxGetter(name = "NumAcquiresWithContention")
+ public long getNumAcquiresWithContention() {
+ return getFastStats().getNAcquiresWithContention();
+ }
+
+ @JmxGetter(name = "NumAcquiresNoWaiters")
+ public long getNumAcquiresNoWaiters() {
+ return getFastStats().getNAcquiresNoWaiters();
}
- // Compound statistics
+ // Compound statistics derived from raw statistics
@JmxGetter(name = "NumWritesTotal")
public long getNumWritesTotal() {
return getNumRandomWrites() + getNumSequentialWrites();
}
+ @JmxGetter(name = "NumWriteBytesTotal")
+ public long getNumWriteBytesTotal() {
+ return getNumSequentialWriteBytes() + getNumRandomWriteBytes();
+ }
+
@JmxGetter(name = "PercentRandomWrites")
public double getPercentRandomWrites() {
- return safeGetPercentage(getNumRandomWrites(), getNumWritesTotal());
+ return safeGetPercentage(getNumRandomWrites(), getNumWritesTotal());
}
@JmxGetter(name = "PercentageRandomWriteBytes")
public double getPercentageRandomWriteBytes() {
- return safeGetPercentage(getNumRandomWriteBytes(), getNumRandomWriteBytes() +
- getNumSequentialWriteBytes());
+ return safeGetPercentage(getNumRandomWriteBytes(), getNumRandomWriteBytes()
+ + getNumSequentialWriteBytes());
}
@JmxGetter(name = "NumReadsTotal")
@@ -173,6 +303,11 @@ public long getNumReadsTotal() {
return getNumRandomReads() + getNumSequentialReads();
}
+ @JmxGetter(name = "NumReadBytesTotal")
+ public long getNumReadBytesTotal() {
+ return getNumRandomReadBytes() + getNumSequentialReadBytes();
+ }
+
@JmxGetter(name = "PercentageRandomReads")
public double getPercentageRandomReads() {
return safeGetPercentage(getNumRandomReads(), getNumReadsTotal());
@@ -180,8 +315,19 @@ public double getPercentageRandomReads() {
@JmxGetter(name = "PercentageRandomReadBytes")
public double getPercentageRandomReadBytes() {
- return safeGetPercentage(getNumRandomWriteBytes(), getNumRandomReadBytes() +
- getNumSequentialReadBytes());
+ return safeGetPercentage(getNumRandomWriteBytes(), getNumRandomReadBytes()
+ + getNumSequentialReadBytes());
+ }
+
+ @JmxGetter(name = "PercentageReads")
+ public double getPercentageReads() {
+ return safeGetPercentage(getNumReadsTotal(), getNumReadsTotal() + getNumWritesTotal());
+ }
+
+ @JmxGetter(name = "PercentageReadBytes")
+ public double getPercentageReadBytes() {
+ return safeGetPercentage(getNumReadBytesTotal(), getNumWriteBytesTotal()
+ + getNumReadBytesTotal());
}
@Experimental
@@ -193,17 +339,36 @@ public double getPercentageCacheHits() {
@Experimental
@JmxGetter(name = "PercentageCacheMisses")
public double getPercentageCacheMisses() {
- return safeGetPercentage(getNumCacheMiss(),
- getNumReadsTotal() + getNumWritesTotal());
+ return safeGetPercentage(getNumCacheMiss(), getNumReadsTotal() + getNumWritesTotal());
}
@JmxGetter(name = "PercentageContended")
public double getPercentageContended() {
- return safeGetPercentage(getNumAcquiresWithContention(),
- getNumAcquiresWithContention() + getNumAcquiresNoWaiters());
+ return safeGetPercentage(getNumAcquiresWithContention(), getNumAcquiresWithContention()
+ + getNumAcquiresNoWaiters());
+ }
+
+ @JmxGetter(name = "PercentageBINMiss")
+ public double getPercentageBINMiss() {
+ return safeGetPercentage(getBINFetchMisses(), getBINFetches());
+ }
+
+ @JmxGetter(name = "PercentageINMiss")
+ public double getPercentageINMiss() {
+ return safeGetPercentage(getINFetchMisses(), getINFetches());
+ }
+
+ @JmxGetter(name = "PercentageLNMiss")
+ public double getPercentageLNMiss() {
+ return safeGetPercentage(getLNFetchMisses(), getLNFetches());
+ }
+
+ @JmxGetter(name = "PercentageUtilization")
+ public double getPercentageUtilization() {
+ return safeGetPercentage(getTotalSpaceUtilized(), getTotalSpace());
}
public static double safeGetPercentage(long rawNum, long total) {
- return total == 0 ? 0.0d : rawNum / (float)total;
+ return total == 0 ? 0.0d : rawNum / (float) total;
}
}
View
70 src/java/voldemort/store/bdb/stats/SpaceUtilizationStats.java
@@ -0,0 +1,70 @@
+package voldemort.store.bdb.stats;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.SortedMap;
+
+import com.sleepycat.je.DbInternal;
+import com.sleepycat.je.Environment;
+import com.sleepycat.je.cleaner.FileSummary;
+import com.sleepycat.je.cleaner.UtilizationProfile;
+import com.sleepycat.je.dbi.EnvironmentImpl;
+
+/**
+ * Obtains the disk space utilization for the BDB environment
+ */
+final public class SpaceUtilizationStats {
+
+ private final EnvironmentImpl envImpl;
+
+ private SortedMap<Long, FileSummary> summaryMap;
+ private long totalSpaceUsed = 0;
+ private long totalSpaceUtilized = 0;
+
+ public SpaceUtilizationStats(Environment env) {
+ this(DbInternal.getEnvironmentImpl(env));
+ }
+
+ private SpaceUtilizationStats(EnvironmentImpl envImpl) {
+ this.envImpl = envImpl;
+ UtilizationProfile profile = this.envImpl.getUtilizationProfile();
+ summaryMap = profile.getFileSummaryMap(true);
+
+ Iterator<Map.Entry<Long, FileSummary>> fileItr = summaryMap.entrySet().iterator();
+ while(fileItr.hasNext()) {
+ Map.Entry<Long, FileSummary> entry = fileItr.next();
+ FileSummary fs = entry.getValue();
+ totalSpaceUsed += fs.totalSize;
+ totalSpaceUtilized += fs.totalSize - fs.getObsoleteSize();
+ }
+ }
+
+ public long getTotalSpaceUsed() {
+ return totalSpaceUsed;
+ }
+
+ public long getTotalSpaceUtilized() {
+ return totalSpaceUtilized;
+ }
+
+ public String getSummariesAsString() {
+ StringBuffer summaryDetails = new StringBuffer();
+ if(summaryMap != null) {
+ summaryDetails.append("file,util%\n");
+ Iterator<Map.Entry<Long, FileSummary>> fileItr = summaryMap.entrySet().iterator();
+ while(fileItr.hasNext()) {
+ Map.Entry<Long, FileSummary> entry = fileItr.next();
+ FileSummary fs = entry.getValue();
+ long bytesUsed = fs.totalSize - fs.getObsoleteSize();
+ summaryDetails.append(String.format("%s,%f\n",
+ Long.toHexString(entry.getKey().longValue()),
+ BdbEnvironmentStats.safeGetPercentage(bytesUsed,
+ fs.totalSize)));
+ }
+ summaryDetails.append(String.format("total,%f\n",
+ BdbEnvironmentStats.safeGetPercentage(totalSpaceUtilized,
+ totalSpaceUsed)));
+ }
+ return summaryDetails.toString();
+ }
+}
View
84 src/java/voldemort/utils/JNAUtils.java
@@ -0,0 +1,84 @@
+package voldemort.utils;
+
+import org.apache.log4j.Logger;
+
+import com.sun.jna.LastErrorException;
+import com.sun.jna.Native;
+
+/**
+ * Native functions used through JNA
+ *
+ */
+public class JNAUtils {
+
+ private static final Logger logger = Logger.getLogger(JNAUtils.class);
+
+ /* Flags for mlock_all */
+ private static final int MCL_CURRENT = 1;
+ private static final int MCL_FUTURE = 2;
+
+ private static final int ENOMEM = 12;
+
+ static {
+ try {
+ Native.register("c");
+ } catch(NoClassDefFoundError e) {
+ logger.info("Could not locate JNA classes");
+ } catch(UnsatisfiedLinkError e) {
+ logger.info("Failed to link to native library");
+ } catch(NoSuchMethodError e) {
+ logger.warn("Older version of JNA. Please upgrade to 3.2.7+");
+ }
+ }
+
+ private static native int mlockall(int flags) throws LastErrorException;
+
+ private static native int munlockall() throws LastErrorException;
+
+ private static boolean isOperatingSystem(String os) {
+ if(System.getProperty("os.name").toLowerCase().contains(os))
+ return true;
+ else
+ return false;
+ }
+
+ public static void tryMlockall() {
+ try {
+ if(isOperatingSystem("windows"))
+ return;
+ // Since we demand-zero every page of the heap while bringing up the
+ // jvm, MCL_FUTURE is not needed
+ mlockall(MCL_CURRENT);
+ logger.info("mlockall() on JVM Heap successful");
+ } catch(Exception e) {
+ if(!(e instanceof LastErrorException))
+ logger.error("Unexpected error during mlock of server heap", e);
+
+ LastErrorException le = (LastErrorException) e;
+ if(le.getErrorCode() == ENOMEM && isOperatingSystem("linux")) {
+ logger.warn("Unable to lock JVM memory (ENOMEM)."
+ + " This can result in part of the JVM being swapped out with higher Young gen stalls"
+ + " Increase RLIMIT_MEMLOCK or run Voldemort as root.");
+ } else if(!isOperatingSystem("mac")) {
+ // fixes a OS X oddity, where it still throws an error, even
+ // though mlockall succeeds
+ logger.warn("Unknown mlockall error " + le.getErrorCode());
+ }
+ }
+ }
+
+ public static void tryMunlockall() {
+ try {
+ if(isOperatingSystem("windows"))
+ return;
+ munlockall();
+ logger.info("munlockall() on JVM Heap successful");
+ } catch(Exception e) {
+ if(!(e instanceof LastErrorException))
+ logger.error("Unexpected error during mlock of server heap", e);
+ LastErrorException le = (LastErrorException) e;
+ logger.warn("Error unlocking JVM heap " + le.getErrorCode());
+ }
+ }
+
+}
View
4 test/integration/voldemort/CatBdbStore.java
@@ -33,9 +33,9 @@
import com.sleepycat.je.Database;
import com.sleepycat.je.DatabaseConfig;
+import com.sleepycat.je.Durability;
import com.sleepycat.je.Environment;
import com.sleepycat.je.EnvironmentConfig;
-import com.sleepycat.je.LockMode;
public class CatBdbStore {
@@ -51,7 +51,7 @@ public static void main(String[] args) throws Exception {
VoldemortConfig config = new VoldemortConfig(new Props(new File(serverProperties)));
EnvironmentConfig environmentConfig = new EnvironmentConfig();
- environmentConfig.setTxnNoSync(true);
+ environmentConfig.setDurability(Durability.COMMIT_NO_SYNC);
environmentConfig.setAllowCreate(true);
environmentConfig.setTransactional(config.isBdbWriteTransactionsEnabled());
Environment environment = new Environment(new File(bdbDir), environmentConfig);
View
3 test/integration/voldemort/performance/BdbGrowth.java
@@ -29,6 +29,7 @@
import com.sleepycat.je.Database;
import com.sleepycat.je.DatabaseConfig;
import com.sleepycat.je.DatabaseEntry;
+import com.sleepycat.je.Durability;
import com.sleepycat.je.Environment;
import com.sleepycat.je.EnvironmentConfig;
@@ -56,7 +57,7 @@ public static void main(String[] args) throws Exception {
environmentConfig = new EnvironmentConfig();
environmentConfig.setCacheSize(cacheSize);
- environmentConfig.setTxnNoSync(true);
+ environmentConfig.setDurability(Durability.COMMIT_NO_SYNC);
environmentConfig.setConfigParam(EnvironmentConfig.LOG_FILE_MAX, "1000000000");
environmentConfig.setConfigParam(EnvironmentConfig.CLEANER_MAX_BATCH_FILES, "100");
environmentConfig.setConfigParam(EnvironmentConfig.CLEANER_READ_SIZE, "52428800");
View
16 test/unit/voldemort/store/bdb/BdbSplitStorageEngineTest.java
@@ -30,10 +30,10 @@
import com.sleepycat.je.Database;
import com.sleepycat.je.DatabaseConfig;
import com.sleepycat.je.DatabaseException;
+import com.sleepycat.je.Durability;
import com.sleepycat.je.Environment;
import com.sleepycat.je.EnvironmentConfig;
import com.sleepycat.je.EnvironmentStats;
-import com.sleepycat.je.LockMode;
import com.sleepycat.je.StatsConfig;
/**
@@ -159,7 +159,7 @@ public void testMultipleEnvironment() {
public void testUnsharedCache() throws DatabaseException {
EnvironmentConfig environmentConfig = new EnvironmentConfig();
environmentConfig = new EnvironmentConfig();
- environmentConfig.setTxnNoSync(true);
+ environmentConfig.setDurability(Durability.COMMIT_NO_SYNC);
environmentConfig.setAllowCreate(true);
environmentConfig.setTransactional(true);
environmentConfig.setSharedCache(false);
@@ -178,7 +178,7 @@ public void testUnsharedCache() throws DatabaseException {
public void testSharedCache() throws DatabaseException {
EnvironmentConfig environmentConfig = new EnvironmentConfig();
- environmentConfig.setTxnNoSync(true);
+ environmentConfig.setDurability(Durability.COMMIT_NO_SYNC);
environmentConfig.setAllowCreate(true);
environmentConfig.setTransactional(true);
environmentConfig.setSharedCache(true);
@@ -201,7 +201,10 @@ private long getMaxCacheUsage(EnvironmentConfig environmentConfig, DatabaseConfi
}
Environment environmentA = new Environment(dirA, environmentConfig);
Database databaseA = environmentA.openDatabase(null, "storeA", databaseConfig);
- BdbStorageEngine storeA = new BdbStorageEngine("storeA", environmentA, databaseA, new BdbRuntimeConfig());
+ BdbStorageEngine storeA = new BdbStorageEngine("storeA",
+ environmentA,
+ databaseA,
+ new BdbRuntimeConfig());
File dirB = new File(bdbMasterDir + "/" + "storeB");
if(!dirB.exists()) {
@@ -209,7 +212,10 @@ private long getMaxCacheUsage(EnvironmentConfig environmentConfig, DatabaseConfi
}
Environment environmentB = new Environment(dirB, environmentConfig);
Database databaseB = environmentB.openDatabase(null, "storeB", databaseConfig);
- BdbStorageEngine storeB = new BdbStorageEngine("storeB", environmentB, databaseB, new BdbRuntimeConfig());
+ BdbStorageEngine storeB = new BdbStorageEngine("storeB",
+ environmentB,
+ databaseB,
+ new BdbRuntimeConfig());