Navigation Menu

Skip to content

Commit

Permalink
Add throttle option to repair job
Browse files Browse the repository at this point in the history
  • Loading branch information
voldemort authored and vinothchandar committed Aug 29, 2013
1 parent 175ab04 commit 9aa3b0d
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 4 deletions.
21 changes: 21 additions & 0 deletions src/java/voldemort/server/VoldemortConfig.java
Expand Up @@ -238,6 +238,8 @@ public class VoldemortConfig implements Serializable {
private int restServiceStorageThreadPoolQueueSize;
private int maxHttpAggregatedContentLength;

private int repairJobMaxKeysScannedPerSec;

public VoldemortConfig(Properties props) {
this(new Props(props));
}
Expand Down Expand Up @@ -521,6 +523,10 @@ public VoldemortConfig(Props props) {
numRestServiceStorageThreads);
this.maxHttpAggregatedContentLength = props.getInt("max.http.aggregated.content.length",
1048576);

this.repairJobMaxKeysScannedPerSec = props.getInt("repairjob.max.keys.scanned.per.sec",
Integer.MAX_VALUE);

validateParams();
}

Expand Down Expand Up @@ -2900,4 +2906,19 @@ public void setMaxHttpAggregatedContentLength(int maxHttpAggregatedContentLength
this.maxHttpAggregatedContentLength = maxHttpAggregatedContentLength;
}

public int getRepairJobMaxKeysScannedPerSec() {
return repairJobMaxKeysScannedPerSec;
}

/**
* Global throttle limit for repair jobs
*
* <ul>
* <li>Property :"repairjob.max.keys.scanned.per.sec"</li>
* <li>Default : Integer.MAX_VALUE (unthrottled)</li>
* </ul>
*/
public void setRepairJobMaxKeysScannedPerSec(int maxKeysPerSecond) {
this.repairJobMaxKeysScannedPerSec = maxKeysPerSecond;
}
}
12 changes: 11 additions & 1 deletion src/java/voldemort/server/storage/DataMaintenanceJob.java
Expand Up @@ -29,6 +29,7 @@
import voldemort.store.readonly.ReadOnlyStorageConfiguration;
import voldemort.utils.ByteArray;
import voldemort.utils.ClosableIterator;
import voldemort.utils.EventThrottler;
import voldemort.utils.Utils;

/**
Expand All @@ -49,16 +50,25 @@ public abstract class DataMaintenanceJob implements Runnable {
protected long totalKeysScanned = 0;
protected long totalKeysUpdated = 0;
protected AtomicBoolean isRunning;
protected final EventThrottler throttler;

public DataMaintenanceJob(StoreRepository storeRepo,
MetadataStore metadataStore,
ScanPermitWrapper scanPermits) {
ScanPermitWrapper scanPermits,
int maxRatePerSecond) {
this.storeRepo = storeRepo;
this.metadataStore = metadataStore;
this.scanPermits = Utils.notNull(scanPermits);
this.numKeysScannedThisRun = new AtomicLong(0);
this.numKeysUpdatedThisRun = new AtomicLong(0);
this.isRunning = new AtomicBoolean(false);
this.throttler = new EventThrottler(maxRatePerSecond);
}

public DataMaintenanceJob(StoreRepository storeRepo,
MetadataStore metadataStore,
ScanPermitWrapper scanPermits) {
this(storeRepo, metadataStore, scanPermits, Integer.MAX_VALUE);
}

@Override
Expand Down
9 changes: 7 additions & 2 deletions src/java/voldemort/server/storage/RepairJob.java
Expand Up @@ -31,6 +31,8 @@
import voldemort.utils.ByteArray;
import voldemort.utils.ClosableIterator;

import com.google.common.primitives.Ints;

/**
* This is a background job that should be run after successful rebalancing. The
* job deletes all data that does not belong to the server.
Expand All @@ -43,8 +45,9 @@ public class RepairJob extends DataMaintenanceJob {

public RepairJob(StoreRepository storeRepo,
MetadataStore metadataStore,
ScanPermitWrapper repairPermits) {
super(storeRepo, metadataStore, repairPermits);
ScanPermitWrapper repairPermits,
int maxKeysScannedPerSecond) {
super(storeRepo, metadataStore, repairPermits, maxKeysScannedPerSecond);
}

@JmxOperation(description = "Start the Repair Job thread", impact = MBeanOperationInfo.ACTION)
Expand Down Expand Up @@ -98,6 +101,8 @@ public void run() {
numDeletedKeys = this.numKeysUpdatedThisRun.incrementAndGet();
}
itemsScanned = this.numKeysScannedThisRun.incrementAndGet();
// Throttle the itemsScanned
throttler.maybeThrottle(Ints.checkedCast(itemsScanned));
if(itemsScanned % STAT_RECORDS_INTERVAL == 0) {
logger.info("#Scanned:" + itemsScanned + " #Deleted:" + numDeletedKeys);
}
Expand Down
2 changes: 1 addition & 1 deletion src/java/voldemort/server/storage/StorageService.java
Expand Up @@ -345,7 +345,7 @@ protected void startInner() {
// Create a repair job object and register it with Store repository
if(voldemortConfig.isRepairEnabled()) {
logger.info("Initializing repair job.");
RepairJob job = new RepairJob(storeRepository, metadata, scanPermitWrapper);
RepairJob job = new RepairJob(storeRepository, metadata, scanPermitWrapper, voldemortConfig.getRepairJobMaxKeysScannedPerSec());
JmxUtils.registerMbean(job, JmxUtils.createObjectName(job.getClass()));
storeRepository.registerRepairJob(job);
}
Expand Down

0 comments on commit 9aa3b0d

Please sign in to comment.