diff --git a/src/java/voldemort/server/VoldemortConfig.java b/src/java/voldemort/server/VoldemortConfig.java index a1a0da62e2..a2d4a2a1f3 100644 --- a/src/java/voldemort/server/VoldemortConfig.java +++ b/src/java/voldemort/server/VoldemortConfig.java @@ -238,6 +238,8 @@ public class VoldemortConfig implements Serializable { private int restServiceStorageThreadPoolQueueSize; private int maxHttpAggregatedContentLength; + private int repairJobMaxKeysScannedPerSec; + public VoldemortConfig(Properties props) { this(new Props(props)); } @@ -521,6 +523,10 @@ public VoldemortConfig(Props props) { numRestServiceStorageThreads); this.maxHttpAggregatedContentLength = props.getInt("max.http.aggregated.content.length", 1048576); + + this.repairJobMaxKeysScannedPerSec = props.getInt("repairjob.max.keys.scanned.per.sec", + Integer.MAX_VALUE); + validateParams(); } @@ -2900,4 +2906,19 @@ public void setMaxHttpAggregatedContentLength(int maxHttpAggregatedContentLength this.maxHttpAggregatedContentLength = maxHttpAggregatedContentLength; } + public int getRepairJobMaxKeysScannedPerSec() { + return repairJobMaxKeysScannedPerSec; + } + + /** + * Global throttle limit for repair jobs + * + * + */ + public void setRepairJobMaxKeysScannedPerSec(int maxKeysPerSecond) { + this.repairJobMaxKeysScannedPerSec = maxKeysPerSecond; + } } diff --git a/src/java/voldemort/server/storage/DataMaintenanceJob.java b/src/java/voldemort/server/storage/DataMaintenanceJob.java index 40d3a413ae..be5784452f 100644 --- a/src/java/voldemort/server/storage/DataMaintenanceJob.java +++ b/src/java/voldemort/server/storage/DataMaintenanceJob.java @@ -29,6 +29,7 @@ import voldemort.store.readonly.ReadOnlyStorageConfiguration; import voldemort.utils.ByteArray; import voldemort.utils.ClosableIterator; +import voldemort.utils.EventThrottler; import voldemort.utils.Utils; /** @@ -49,16 +50,25 @@ public abstract class DataMaintenanceJob implements Runnable { protected long totalKeysScanned = 0; protected long totalKeysUpdated = 0; protected AtomicBoolean isRunning; + protected final EventThrottler throttler; public DataMaintenanceJob(StoreRepository storeRepo, MetadataStore metadataStore, - ScanPermitWrapper scanPermits) { + ScanPermitWrapper scanPermits, + int maxRatePerSecond) { this.storeRepo = storeRepo; this.metadataStore = metadataStore; this.scanPermits = Utils.notNull(scanPermits); this.numKeysScannedThisRun = new AtomicLong(0); this.numKeysUpdatedThisRun = new AtomicLong(0); this.isRunning = new AtomicBoolean(false); + this.throttler = new EventThrottler(maxRatePerSecond); + } + + public DataMaintenanceJob(StoreRepository storeRepo, + MetadataStore metadataStore, + ScanPermitWrapper scanPermits) { + this(storeRepo, metadataStore, scanPermits, Integer.MAX_VALUE); } @Override diff --git a/src/java/voldemort/server/storage/RepairJob.java b/src/java/voldemort/server/storage/RepairJob.java index a2dc2793be..e3b962033a 100644 --- a/src/java/voldemort/server/storage/RepairJob.java +++ b/src/java/voldemort/server/storage/RepairJob.java @@ -31,6 +31,8 @@ import voldemort.utils.ByteArray; import voldemort.utils.ClosableIterator; +import com.google.common.primitives.Ints; + /** * This is a background job that should be run after successful rebalancing. The * job deletes all data that does not belong to the server. @@ -43,8 +45,9 @@ public class RepairJob extends DataMaintenanceJob { public RepairJob(StoreRepository storeRepo, MetadataStore metadataStore, - ScanPermitWrapper repairPermits) { - super(storeRepo, metadataStore, repairPermits); + ScanPermitWrapper repairPermits, + int maxKeysScannedPerSecond) { + super(storeRepo, metadataStore, repairPermits, maxKeysScannedPerSecond); } @JmxOperation(description = "Start the Repair Job thread", impact = MBeanOperationInfo.ACTION) @@ -98,6 +101,8 @@ public void run() { numDeletedKeys = this.numKeysUpdatedThisRun.incrementAndGet(); } itemsScanned = this.numKeysScannedThisRun.incrementAndGet(); + // Throttle the itemsScanned + throttler.maybeThrottle(Ints.checkedCast(itemsScanned)); if(itemsScanned % STAT_RECORDS_INTERVAL == 0) { logger.info("#Scanned:" + itemsScanned + " #Deleted:" + numDeletedKeys); } diff --git a/src/java/voldemort/server/storage/StorageService.java b/src/java/voldemort/server/storage/StorageService.java index 3597248f87..957f63738e 100644 --- a/src/java/voldemort/server/storage/StorageService.java +++ b/src/java/voldemort/server/storage/StorageService.java @@ -345,7 +345,7 @@ protected void startInner() { // Create a repair job object and register it with Store repository if(voldemortConfig.isRepairEnabled()) { logger.info("Initializing repair job."); - RepairJob job = new RepairJob(storeRepository, metadata, scanPermitWrapper); + RepairJob job = new RepairJob(storeRepository, metadata, scanPermitWrapper, voldemortConfig.getRepairJobMaxKeysScannedPerSec()); JmxUtils.registerMbean(job, JmxUtils.createObjectName(job.getClass())); storeRepository.registerRepairJob(job); }