diff --git a/src/java/voldemort/server/scheduler/DataCleanupJob.java b/src/java/voldemort/server/scheduler/DataCleanupJob.java index e5fdc67a40..185b8f82b2 100644 --- a/src/java/voldemort/server/scheduler/DataCleanupJob.java +++ b/src/java/voldemort/server/scheduler/DataCleanupJob.java @@ -65,30 +65,25 @@ public void run() { int deleted = 0; long now = time.getMilliseconds(); iterator = store.entries(); - try { - while(iterator.hasNext()) { - // check if we have been interrupted - if(Thread.currentThread().isInterrupted()) { - logger.info("Datacleanup job halted."); - return; - } - Pair> keyAndVal = iterator.next(); - VectorClock clock = (VectorClock) keyAndVal.getSecond().getVersion(); - if(now - clock.getTimestamp() > maxAgeMs) { - store.delete(keyAndVal.getFirst(), clock); - deleted++; - if(deleted % 10000 == 0) - logger.debug("Deleted item " + deleted); - } - throttler.maybeThrottle(clock.sizeInBytes()); + + while(iterator.hasNext()) { + // check if we have been interrupted + if(Thread.currentThread().isInterrupted()) { + logger.info("Datacleanup job halted."); + return; } - } catch(RuntimeException e) { - iterator.close(); - logger.error("Error during data cleanup", e); - throw e; - } finally { - if(iterator != null) - iterator.close(); + + Pair> keyAndVal = iterator.next(); + VectorClock clock = (VectorClock) keyAndVal.getSecond().getVersion(); + if(now - clock.getTimestamp() > maxAgeMs) { + store.delete(keyAndVal.getFirst(), clock); + deleted++; + if(deleted % 10000 == 0) + logger.debug("Deleted item " + deleted); + } + + // throttle on number of entries. + throttler.maybeThrottle(1); } logger.info("Data cleanup on store \"" + store.getName() + "\" is complete; " + deleted + " items deleted."); diff --git a/src/java/voldemort/server/storage/StorageService.java b/src/java/voldemort/server/storage/StorageService.java index 3953d9831c..4a1fc0100f 100644 --- a/src/java/voldemort/server/storage/StorageService.java +++ b/src/java/voldemort/server/storage/StorageService.java @@ -26,12 +26,17 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; +import javax.management.MBeanOperationInfo; + import org.apache.log4j.Logger; import voldemort.VoldemortException; import voldemort.annotations.jmx.JmxManaged; +import voldemort.annotations.jmx.JmxOperation; import voldemort.client.ClientThreadPool; import voldemort.cluster.Cluster; import voldemort.cluster.Node; @@ -187,6 +192,7 @@ public void registerEngine(StorageEngine engine) { if(voldemortConfig.isJmxEnabled()) JmxUtils.registerMbean(store.getName(), store); } + storeRepository.addLocalStore(store); } @@ -241,14 +247,13 @@ private void scheduleCleanupJob(StoreDefinition storeDef, // allow only one cleanup job at a time Date startTime = cal.getTime(); - Integer maxReadRate = storeDef.getRetentionThrottleRate(); + int maxReadRate = storeDef.hasRetentionScanThrottleRate() ? storeDef.getRetentionScanThrottleRate() + : Integer.MAX_VALUE; + logger.info("Scheduling data retention cleanup job for store '" + storeDef.getName() - + "' at " + startTime + "." - + (null == maxReadRate ? "" : " Max reading rate: " + maxReadRate + " Bytes/S")); - // If no throttle parameter was given, use MAX VALUE as the allowed MB/S - if(null == maxReadRate) { - maxReadRate = Integer.MAX_VALUE; - } + + "' at " + startTime + " with retention scan throttle rate:" + maxReadRate + + " Entries/second."); + IoThrottler throttler = new IoThrottler(maxReadRate); Runnable cleanupJob = new DataCleanupJob(engine, @@ -336,4 +341,51 @@ public StoreRepository getStoreRepository() { return this.storeRepository; } + @JmxOperation(description = "Force cleanup of old data based on retention policy, allows override of throttle-rate", impact = MBeanOperationInfo.ACTION) + public void forceCleanupOldData(String storeName) { + StoreDefinition storeDef = getMetadataStore().getStoreDef(storeName); + int throttleRate = storeDef.hasRetentionScanThrottleRate() ? storeDef.getRetentionScanThrottleRate() + : Integer.MAX_VALUE; + + forceCleanupOldDataThrottled(storeName, throttleRate); + } + + @JmxOperation(description = "Force cleanup of old data based on retention policy.", impact = MBeanOperationInfo.ACTION) + public void forceCleanupOldDataThrottled(String storeName, int entryScanThrottleRate) { + logger.info("forceCleanupOldData() called for store " + storeName + + " with retention scan throttle rate:" + entryScanThrottleRate + + " Entries/second."); + + try { + StoreDefinition storeDef = getMetadataStore().getStoreDef(storeName); + StorageEngine engine = storeRepository.getStorageEngine(storeName); + + if(null != engine) { + if(storeDef.hasRetentionPeriod()) { + ExecutorService executor = Executors.newFixedThreadPool(1); + try { + if(cleanupPermits.availablePermits() >= 1) { + + executor.execute(new DataCleanupJob(engine, + cleanupPermits, + storeDef.getRetentionDays() + * Time.MS_PER_DAY, + SystemTime.INSTANCE, + new IoThrottler(entryScanThrottleRate))); + } else { + logger.error("forceCleanupOldData() No permit available to run cleanJob already running multiple instance." + + engine.getName()); + } + } finally { + executor.shutdown(); + } + } else { + logger.error("forceCleanupOldData() No retention policy found for " + storeName); + } + } + } catch(Exception e) { + logger.error("Error while running forceCleanupOldData()", e); + throw new VoldemortException(e); + } + } } diff --git a/src/java/voldemort/store/StoreDefinition.java b/src/java/voldemort/store/StoreDefinition.java index 7c47d0cbb1..c7152ded77 100644 --- a/src/java/voldemort/store/StoreDefinition.java +++ b/src/java/voldemort/store/StoreDefinition.java @@ -45,7 +45,7 @@ public class StoreDefinition implements Serializable { private final Integer preferredReads; private final int requiredReads; private final Integer retentionPeriodDays; - private final Integer retentionThrottleRate; + private final Integer retentionScanThrottleRate; private final String routingStrategyType; public StoreDefinition(String name, @@ -72,7 +72,7 @@ public StoreDefinition(String name, this.keySerializer = Utils.notNull(keySerializer); this.valueSerializer = Utils.notNull(valueSerializer); this.retentionPeriodDays = retentionDays; - this.retentionThrottleRate = retentionThrottleRate; + this.retentionScanThrottleRate = retentionThrottleRate; this.routingStrategyType = routingStrategyType; checkParameterLegality(); } @@ -165,12 +165,12 @@ public Integer getRetentionDays() { return this.retentionPeriodDays; } - public boolean hasRetentionThrottleRate() { - return this.retentionThrottleRate != null; + public boolean hasRetentionScanThrottleRate() { + return this.retentionScanThrottleRate != null; } - public Integer getRetentionThrottleRate() { - return this.retentionThrottleRate; + public Integer getRetentionScanThrottleRate() { + return this.retentionScanThrottleRate; } @Override @@ -193,7 +193,7 @@ && getKeySerializer().equals(def.getKeySerializer()) && getValueSerializer().equals(def.getValueSerializer()) && getRoutingPolicy() == def.getRoutingPolicy() && Objects.equal(getRetentionDays(), def.getRetentionDays()) - && Objects.equal(getRetentionThrottleRate(), def.getRetentionThrottleRate()); + && Objects.equal(getRetentionScanThrottleRate(), def.getRetentionScanThrottleRate()); } @Override @@ -209,6 +209,6 @@ public int hashCode() { getPreferredReads(), getPreferredWrites(), getRetentionDays(), - getRetentionThrottleRate()); + getRetentionScanThrottleRate()); } } diff --git a/src/java/voldemort/xml/StoreDefinitionsMapper.java b/src/java/voldemort/xml/StoreDefinitionsMapper.java index 7f56713664..01ee6c0937 100644 --- a/src/java/voldemort/xml/StoreDefinitionsMapper.java +++ b/src/java/voldemort/xml/StoreDefinitionsMapper.java @@ -73,7 +73,7 @@ public class StoreDefinitionsMapper { public final static String STORE_REQUIRED_READS_ELMT = "required-reads"; public final static String STORE_PREFERRED_READS_ELMT = "preferred-reads"; public final static String STORE_RETENTION_POLICY_ELMT = "retention-days"; - public final static String STORE_RETENTION_THROTTLE_RATE_ELMT = "retention-throttle-rate"; + public final static String STORE_RETENTION_SCAN_THROTTLE_RATE_ELMT = "retention-scan-throttle-rate"; public final static String STORE_ROUTING_STRATEGY = "routing-strategy"; private final static String STORE_VERSION_ATTR = "version"; @@ -165,7 +165,7 @@ private StoreDefinition readStore(Element store) { Integer retentionThrottleRate = null; if(retention != null) { retentionPolicyDays = Integer.parseInt(retention.getText()); - Element throttleRate = store.getChild(STORE_RETENTION_THROTTLE_RATE_ELMT); + Element throttleRate = store.getChild(STORE_RETENTION_SCAN_THROTTLE_RATE_ELMT); if(throttleRate != null) retentionThrottleRate = Integer.parseInt(throttleRate.getText()); } @@ -256,8 +256,8 @@ private Element toElement(StoreDefinition storeDefinition) { if(storeDefinition.hasRetentionPeriod()) store.addContent(new Element(STORE_RETENTION_POLICY_ELMT).setText(Integer.toString(storeDefinition.getRetentionDays()))); - if(storeDefinition.hasRetentionThrottleRate()) - store.addContent(new Element(STORE_RETENTION_THROTTLE_RATE_ELMT).setText(Integer.toString(storeDefinition.getRetentionThrottleRate()))); + if(storeDefinition.hasRetentionScanThrottleRate()) + store.addContent(new Element(STORE_RETENTION_SCAN_THROTTLE_RATE_ELMT).setText(Integer.toString(storeDefinition.getRetentionScanThrottleRate()))); return store; } diff --git a/src/java/voldemort/xml/stores.xsd b/src/java/voldemort/xml/stores.xsd index 4ca52ae225..fa31444f3e 100644 --- a/src/java/voldemort/xml/stores.xsd +++ b/src/java/voldemort/xml/stores.xsd @@ -34,7 +34,7 @@ - diff --git a/test/integration/voldemort/CatBdbStore.java b/test/integration/voldemort/CatBdbStore.java index 694cfe5142..d609c77e35 100644 --- a/test/integration/voldemort/CatBdbStore.java +++ b/test/integration/voldemort/CatBdbStore.java @@ -20,11 +20,13 @@ import java.util.Iterator; import voldemort.serialization.StringSerializer; +import voldemort.server.VoldemortConfig; import voldemort.store.StorageEngine; import voldemort.store.bdb.BdbStorageEngine; import voldemort.store.serialized.SerializingStorageEngine; import voldemort.utils.ByteArray; import voldemort.utils.Pair; +import voldemort.utils.Props; import voldemort.utils.Utils; import voldemort.versioning.Versioned; @@ -36,20 +38,29 @@ public class CatBdbStore { public static void main(String[] args) throws Exception { - if(args.length != 1) - Utils.croak("USAGE: java " + CatBdbStore.class.getName() + " bdb_dir"); + if(args.length != 2) + Utils.croak("USAGE: java " + CatBdbStore.class.getName() + " bdb_dir" + " storeName" + + " server.properties.path"); + + String bdbDir = args[0]; + String storeName = args[1]; + String serverProperties = args[2]; + + VoldemortConfig config = new VoldemortConfig(new Props(new File(serverProperties))); EnvironmentConfig environmentConfig = new EnvironmentConfig(); environmentConfig.setTxnNoSync(true); environmentConfig.setAllowCreate(true); - environmentConfig.setTransactional(true); - Environment environment = new Environment(new File(args[0]), environmentConfig); + environmentConfig.setTransactional(config.isBdbWriteTransactionsEnabled()); + Environment environment = new Environment(new File(bdbDir), environmentConfig); DatabaseConfig databaseConfig = new DatabaseConfig(); databaseConfig.setAllowCreate(true); - databaseConfig.setTransactional(true); - databaseConfig.setSortedDuplicates(false); - Database database = environment.openDatabase(null, "test", databaseConfig); - StorageEngine store = new BdbStorageEngine("test", environment, database); + databaseConfig.setTransactional(config.isBdbWriteTransactionsEnabled()); + databaseConfig.setSortedDuplicates(config.isBdbSortedDuplicatesEnabled()); + Database database = environment.openDatabase(null, storeName, databaseConfig); + StorageEngine store = new BdbStorageEngine(storeName, + environment, + database); StorageEngine stringStore = new SerializingStorageEngine(store, new StringSerializer(), new StringSerializer());