Skip to content

Commit

Permalink
Add throttle option to versioned put prune job
Browse files Browse the repository at this point in the history
  • Loading branch information
voldemort authored and vinothchandar committed Aug 29, 2013
1 parent 4642417 commit c6d9720
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 4 deletions.
19 changes: 19 additions & 0 deletions src/java/voldemort/server/VoldemortConfig.java
Expand Up @@ -239,6 +239,7 @@ public class VoldemortConfig implements Serializable {
private int maxHttpAggregatedContentLength;

private int repairJobMaxKeysScannedPerSec;
private int versionPruneJobMaxKeysScannedPerSec;

public VoldemortConfig(Properties props) {
this(new Props(props));
Expand Down Expand Up @@ -526,6 +527,8 @@ public VoldemortConfig(Props props) {

this.repairJobMaxKeysScannedPerSec = props.getInt("repairjob.max.keys.scanned.per.sec",
Integer.MAX_VALUE);
this.versionPruneJobMaxKeysScannedPerSec = props.getInt("versionprunejob.max.keys.scanned.per.sec",
Integer.MAX_VALUE);

validateParams();
}
Expand Down Expand Up @@ -2921,4 +2924,20 @@ public int getRepairJobMaxKeysScannedPerSec() {
public void setRepairJobMaxKeysScannedPerSec(int maxKeysPerSecond) {
this.repairJobMaxKeysScannedPerSec = maxKeysPerSecond;
}

public int getVersionPruneJobMaxKeysScannedPerSec() {
return versionPruneJobMaxKeysScannedPerSec;
}

/**
* Global throttle limit for versioned put prune jobs
*
* <ul>
* <li>Property :"versionprunejob.max.keys.scanned.per.sec"</li>
* <li>Default : Integer.MAX_VALUE (unthrottled)</li>
* </ul>
*/
public void setVersionPruneJobMaxKeysScannedPerSec(int maxKeysPerSecond) {
this.versionPruneJobMaxKeysScannedPerSec = maxKeysPerSecond;
}
}
8 changes: 6 additions & 2 deletions src/java/voldemort/server/storage/StorageService.java
Expand Up @@ -345,7 +345,10 @@ protected void startInner() {
// Create a repair job object and register it with Store repository
if(voldemortConfig.isRepairEnabled()) {
logger.info("Initializing repair job.");
RepairJob job = new RepairJob(storeRepository, metadata, scanPermitWrapper, voldemortConfig.getRepairJobMaxKeysScannedPerSec());
RepairJob job = new RepairJob(storeRepository,
metadata,
scanPermitWrapper,
voldemortConfig.getRepairJobMaxKeysScannedPerSec());
JmxUtils.registerMbean(job, JmxUtils.createObjectName(job.getClass()));
storeRepository.registerRepairJob(job);
}
Expand All @@ -355,7 +358,8 @@ protected void startInner() {
logger.info("Intializing prune job");
VersionedPutPruneJob job = new VersionedPutPruneJob(storeRepository,
metadata,
scanPermitWrapper);
scanPermitWrapper,
voldemortConfig.getVersionPruneJobMaxKeysScannedPerSec());
JmxUtils.registerMbean(job, JmxUtils.createObjectName(job.getClass()));
storeRepository.registerPruneJob(job);
}
Expand Down
8 changes: 6 additions & 2 deletions src/java/voldemort/server/storage/VersionedPutPruneJob.java
Expand Up @@ -37,6 +37,8 @@
import voldemort.versioning.VectorClockUtils;
import voldemort.versioning.Versioned;

import com.google.common.primitives.Ints;

/**
* Voldemort supports a "versioned" put interface, where the user can provide a
* vector clock, generated outside of Voldemort. A common practice is to create
Expand Down Expand Up @@ -96,8 +98,9 @@ public class VersionedPutPruneJob extends DataMaintenanceJob {

public VersionedPutPruneJob(StoreRepository storeRepo,
MetadataStore metadataStore,
ScanPermitWrapper repairPermits) {
super(storeRepo, metadataStore, repairPermits);
ScanPermitWrapper repairPermits,
int maxKeysScannedPerSecond) {
super(storeRepo, metadataStore, repairPermits, maxKeysScannedPerSecond);
}

public void setStoreName(String storeName) {
Expand Down Expand Up @@ -162,6 +165,7 @@ public void run() {
numPrunedKeys = this.numKeysUpdatedThisRun.incrementAndGet();
}
itemsScanned = this.numKeysScannedThisRun.incrementAndGet();
throttler.maybeThrottle(Ints.checkedCast(itemsScanned));
if(itemsScanned % STAT_RECORDS_INTERVAL == 0)
logger.info("#Scanned:" + itemsScanned + " #Pruned:" + numPrunedKeys);
} catch(Exception e) {
Expand Down

0 comments on commit c6d9720

Please sign in to comment.