Skip to content

Commit

Permalink
Merge branch 'li-r940'
Browse files Browse the repository at this point in the history
  • Loading branch information
bbansal committed Oct 6, 2009
2 parents 58df0dd + bfd86e8 commit 97ddd2b
Show file tree
Hide file tree
Showing 6 changed files with 109 additions and 51 deletions.
41 changes: 18 additions & 23 deletions src/java/voldemort/server/scheduler/DataCleanupJob.java
Expand Up @@ -65,30 +65,25 @@ public void run() {
int deleted = 0;
long now = time.getMilliseconds();
iterator = store.entries();
try {
while(iterator.hasNext()) {
// check if we have been interrupted
if(Thread.currentThread().isInterrupted()) {
logger.info("Datacleanup job halted.");
return;
}
Pair<K, Versioned<V>> keyAndVal = iterator.next();
VectorClock clock = (VectorClock) keyAndVal.getSecond().getVersion();
if(now - clock.getTimestamp() > maxAgeMs) {
store.delete(keyAndVal.getFirst(), clock);
deleted++;
if(deleted % 10000 == 0)
logger.debug("Deleted item " + deleted);
}
throttler.maybeThrottle(clock.sizeInBytes());

while(iterator.hasNext()) {
// check if we have been interrupted
if(Thread.currentThread().isInterrupted()) {
logger.info("Datacleanup job halted.");
return;
}
} catch(RuntimeException e) {
iterator.close();
logger.error("Error during data cleanup", e);
throw e;
} finally {
if(iterator != null)
iterator.close();

Pair<K, Versioned<V>> keyAndVal = iterator.next();
VectorClock clock = (VectorClock) keyAndVal.getSecond().getVersion();
if(now - clock.getTimestamp() > maxAgeMs) {
store.delete(keyAndVal.getFirst(), clock);
deleted++;
if(deleted % 10000 == 0)
logger.debug("Deleted item " + deleted);
}

// throttle on number of entries.
throttler.maybeThrottle(1);
}
logger.info("Data cleanup on store \"" + store.getName() + "\" is complete; " + deleted
+ " items deleted.");
Expand Down
66 changes: 59 additions & 7 deletions src/java/voldemort/server/storage/StorageService.java
Expand Up @@ -26,12 +26,17 @@
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

import javax.management.MBeanOperationInfo;

import org.apache.log4j.Logger;

import voldemort.VoldemortException;
import voldemort.annotations.jmx.JmxManaged;
import voldemort.annotations.jmx.JmxOperation;
import voldemort.client.ClientThreadPool;
import voldemort.cluster.Cluster;
import voldemort.cluster.Node;
Expand Down Expand Up @@ -187,6 +192,7 @@ public void registerEngine(StorageEngine<ByteArray, byte[]> engine) {
if(voldemortConfig.isJmxEnabled())
JmxUtils.registerMbean(store.getName(), store);
}

storeRepository.addLocalStore(store);
}

Expand Down Expand Up @@ -241,14 +247,13 @@ private void scheduleCleanupJob(StoreDefinition storeDef,
// allow only one cleanup job at a time
Date startTime = cal.getTime();

Integer maxReadRate = storeDef.getRetentionThrottleRate();
int maxReadRate = storeDef.hasRetentionScanThrottleRate() ? storeDef.getRetentionScanThrottleRate()
: Integer.MAX_VALUE;

logger.info("Scheduling data retention cleanup job for store '" + storeDef.getName()
+ "' at " + startTime + "."
+ (null == maxReadRate ? "" : " Max reading rate: " + maxReadRate + " Bytes/S"));
// If no throttle parameter was given, use MAX VALUE as the allowed MB/S
if(null == maxReadRate) {
maxReadRate = Integer.MAX_VALUE;
}
+ "' at " + startTime + " with retention scan throttle rate:" + maxReadRate
+ " Entries/second.");

IoThrottler throttler = new IoThrottler(maxReadRate);

Runnable cleanupJob = new DataCleanupJob<ByteArray, byte[]>(engine,
Expand Down Expand Up @@ -336,4 +341,51 @@ public StoreRepository getStoreRepository() {
return this.storeRepository;
}

@JmxOperation(description = "Force cleanup of old data based on retention policy, allows override of throttle-rate", impact = MBeanOperationInfo.ACTION)
public void forceCleanupOldData(String storeName) {
StoreDefinition storeDef = getMetadataStore().getStoreDef(storeName);
int throttleRate = storeDef.hasRetentionScanThrottleRate() ? storeDef.getRetentionScanThrottleRate()
: Integer.MAX_VALUE;

forceCleanupOldDataThrottled(storeName, throttleRate);
}

@JmxOperation(description = "Force cleanup of old data based on retention policy.", impact = MBeanOperationInfo.ACTION)
public void forceCleanupOldDataThrottled(String storeName, int entryScanThrottleRate) {
logger.info("forceCleanupOldData() called for store " + storeName
+ " with retention scan throttle rate:" + entryScanThrottleRate
+ " Entries/second.");

try {
StoreDefinition storeDef = getMetadataStore().getStoreDef(storeName);
StorageEngine<ByteArray, byte[]> engine = storeRepository.getStorageEngine(storeName);

if(null != engine) {
if(storeDef.hasRetentionPeriod()) {
ExecutorService executor = Executors.newFixedThreadPool(1);
try {
if(cleanupPermits.availablePermits() >= 1) {

executor.execute(new DataCleanupJob<ByteArray, byte[]>(engine,
cleanupPermits,
storeDef.getRetentionDays()
* Time.MS_PER_DAY,
SystemTime.INSTANCE,
new IoThrottler(entryScanThrottleRate)));
} else {
logger.error("forceCleanupOldData() No permit available to run cleanJob already running multiple instance."
+ engine.getName());
}
} finally {
executor.shutdown();
}
} else {
logger.error("forceCleanupOldData() No retention policy found for " + storeName);
}
}
} catch(Exception e) {
logger.error("Error while running forceCleanupOldData()", e);
throw new VoldemortException(e);
}
}
}
16 changes: 8 additions & 8 deletions src/java/voldemort/store/StoreDefinition.java
Expand Up @@ -45,7 +45,7 @@ public class StoreDefinition implements Serializable {
private final Integer preferredReads;
private final int requiredReads;
private final Integer retentionPeriodDays;
private final Integer retentionThrottleRate;
private final Integer retentionScanThrottleRate;
private final String routingStrategyType;

public StoreDefinition(String name,
Expand All @@ -72,7 +72,7 @@ public StoreDefinition(String name,
this.keySerializer = Utils.notNull(keySerializer);
this.valueSerializer = Utils.notNull(valueSerializer);
this.retentionPeriodDays = retentionDays;
this.retentionThrottleRate = retentionThrottleRate;
this.retentionScanThrottleRate = retentionThrottleRate;
this.routingStrategyType = routingStrategyType;
checkParameterLegality();
}
Expand Down Expand Up @@ -165,12 +165,12 @@ public Integer getRetentionDays() {
return this.retentionPeriodDays;
}

public boolean hasRetentionThrottleRate() {
return this.retentionThrottleRate != null;
public boolean hasRetentionScanThrottleRate() {
return this.retentionScanThrottleRate != null;
}

public Integer getRetentionThrottleRate() {
return this.retentionThrottleRate;
public Integer getRetentionScanThrottleRate() {
return this.retentionScanThrottleRate;
}

@Override
Expand All @@ -193,7 +193,7 @@ && getKeySerializer().equals(def.getKeySerializer())
&& getValueSerializer().equals(def.getValueSerializer())
&& getRoutingPolicy() == def.getRoutingPolicy()
&& Objects.equal(getRetentionDays(), def.getRetentionDays())
&& Objects.equal(getRetentionThrottleRate(), def.getRetentionThrottleRate());
&& Objects.equal(getRetentionScanThrottleRate(), def.getRetentionScanThrottleRate());
}

@Override
Expand All @@ -209,6 +209,6 @@ public int hashCode() {
getPreferredReads(),
getPreferredWrites(),
getRetentionDays(),
getRetentionThrottleRate());
getRetentionScanThrottleRate());
}
}
8 changes: 4 additions & 4 deletions src/java/voldemort/xml/StoreDefinitionsMapper.java
Expand Up @@ -73,7 +73,7 @@ public class StoreDefinitionsMapper {
public final static String STORE_REQUIRED_READS_ELMT = "required-reads";
public final static String STORE_PREFERRED_READS_ELMT = "preferred-reads";
public final static String STORE_RETENTION_POLICY_ELMT = "retention-days";
public final static String STORE_RETENTION_THROTTLE_RATE_ELMT = "retention-throttle-rate";
public final static String STORE_RETENTION_SCAN_THROTTLE_RATE_ELMT = "retention-scan-throttle-rate";
public final static String STORE_ROUTING_STRATEGY = "routing-strategy";
private final static String STORE_VERSION_ATTR = "version";

Expand Down Expand Up @@ -165,7 +165,7 @@ private StoreDefinition readStore(Element store) {
Integer retentionThrottleRate = null;
if(retention != null) {
retentionPolicyDays = Integer.parseInt(retention.getText());
Element throttleRate = store.getChild(STORE_RETENTION_THROTTLE_RATE_ELMT);
Element throttleRate = store.getChild(STORE_RETENTION_SCAN_THROTTLE_RATE_ELMT);
if(throttleRate != null)
retentionThrottleRate = Integer.parseInt(throttleRate.getText());
}
Expand Down Expand Up @@ -256,8 +256,8 @@ private Element toElement(StoreDefinition storeDefinition) {
if(storeDefinition.hasRetentionPeriod())
store.addContent(new Element(STORE_RETENTION_POLICY_ELMT).setText(Integer.toString(storeDefinition.getRetentionDays())));

if(storeDefinition.hasRetentionThrottleRate())
store.addContent(new Element(STORE_RETENTION_THROTTLE_RATE_ELMT).setText(Integer.toString(storeDefinition.getRetentionThrottleRate())));
if(storeDefinition.hasRetentionScanThrottleRate())
store.addContent(new Element(STORE_RETENTION_SCAN_THROTTLE_RATE_ELMT).setText(Integer.toString(storeDefinition.getRetentionScanThrottleRate())));

return store;
}
Expand Down
2 changes: 1 addition & 1 deletion src/java/voldemort/xml/stores.xsd
Expand Up @@ -34,7 +34,7 @@
<xs:element name="value-serializer" type="serializer" />
<xs:element name="retention-days" type="xs:nonNegativeInteger"
minOccurs="0" maxOccurs="1" />
<xs:element name="retention-throttle-rate" type="xs:nonNegativeInteger"
<xs:element name="retention-scan-throttle-rate" type="xs:nonNegativeInteger"
minOccurs="0" maxOccurs="1" />
</xs:all>
</xs:complexType>
Expand Down
27 changes: 19 additions & 8 deletions test/integration/voldemort/CatBdbStore.java
Expand Up @@ -20,11 +20,13 @@
import java.util.Iterator;

import voldemort.serialization.StringSerializer;
import voldemort.server.VoldemortConfig;
import voldemort.store.StorageEngine;
import voldemort.store.bdb.BdbStorageEngine;
import voldemort.store.serialized.SerializingStorageEngine;
import voldemort.utils.ByteArray;
import voldemort.utils.Pair;
import voldemort.utils.Props;
import voldemort.utils.Utils;
import voldemort.versioning.Versioned;

Expand All @@ -36,20 +38,29 @@
public class CatBdbStore {

public static void main(String[] args) throws Exception {
if(args.length != 1)
Utils.croak("USAGE: java " + CatBdbStore.class.getName() + " bdb_dir");
if(args.length != 2)
Utils.croak("USAGE: java " + CatBdbStore.class.getName() + " bdb_dir" + " storeName"
+ " server.properties.path");

String bdbDir = args[0];
String storeName = args[1];
String serverProperties = args[2];

VoldemortConfig config = new VoldemortConfig(new Props(new File(serverProperties)));

EnvironmentConfig environmentConfig = new EnvironmentConfig();
environmentConfig.setTxnNoSync(true);
environmentConfig.setAllowCreate(true);
environmentConfig.setTransactional(true);
Environment environment = new Environment(new File(args[0]), environmentConfig);
environmentConfig.setTransactional(config.isBdbWriteTransactionsEnabled());
Environment environment = new Environment(new File(bdbDir), environmentConfig);
DatabaseConfig databaseConfig = new DatabaseConfig();
databaseConfig.setAllowCreate(true);
databaseConfig.setTransactional(true);
databaseConfig.setSortedDuplicates(false);
Database database = environment.openDatabase(null, "test", databaseConfig);
StorageEngine<ByteArray, byte[]> store = new BdbStorageEngine("test", environment, database);
databaseConfig.setTransactional(config.isBdbWriteTransactionsEnabled());
databaseConfig.setSortedDuplicates(config.isBdbSortedDuplicatesEnabled());
Database database = environment.openDatabase(null, storeName, databaseConfig);
StorageEngine<ByteArray, byte[]> store = new BdbStorageEngine(storeName,
environment,
database);
StorageEngine<String, String> stringStore = new SerializingStorageEngine<String, String>(store,
new StringSerializer(),
new StringSerializer());
Expand Down

0 comments on commit 97ddd2b

Please sign in to comment.