Skip to content

Commit

Permalink
Code cleanup, bug fixes for system stores and auto-rebootstrapper
Browse files Browse the repository at this point in the history
  • Loading branch information
Chinmay Soman committed Sep 11, 2012
1 parent a86a9e5 commit 440832e
Show file tree
Hide file tree
Showing 24 changed files with 128 additions and 112 deletions.
3 changes: 2 additions & 1 deletion src/java/voldemort/client/ClientConfig.java
Expand Up @@ -626,8 +626,9 @@ public int getClientZoneId() {
return this.clientZoneId;
}

public void enableDefaultClient(boolean enableDefault) {
public ClientConfig enableDefaultClient(boolean enableDefault) {
this.useDefaultClient = enableDefault;
return this;
}

public boolean isDefaultClientEnabled() {
Expand Down
19 changes: 10 additions & 9 deletions src/java/voldemort/client/SystemStore.java
Expand Up @@ -46,7 +46,11 @@ public SystemStore(String storeName,
.setClientZoneId(clientZoneID);
this.systemStoreFactory = new SystemStoreClientFactory(config);
this.storeName = storeName;
this.sysStore = this.systemStoreFactory.getSystemStore(this.storeName, clusterXml, fd);
try {
this.sysStore = this.systemStoreFactory.getSystemStore(this.storeName, clusterXml, fd);
} catch(Exception e) {
logger.debug("Error while creating a system store client for store : " + this.storeName);
}
}

public Version putSysStore(K key, V value) {
Expand All @@ -61,9 +65,8 @@ public Version putSysStore(K key, V value) {
this.sysStore.put(key, versioned, null);
version = versioned.getVersion();
} catch(Exception e) {
logger.info("Exception caught during putSysStore: " + e);
if(logger.isDebugEnabled()) {
e.printStackTrace();
logger.debug("Exception caught during putSysStore: " + e);
}
}
return version;
Expand All @@ -76,9 +79,8 @@ public Version putSysStore(K key, Versioned<V> value) {
this.sysStore.put(key, value, null);
version = value.getVersion();
} catch(Exception e) {
logger.info("Exception caught during putSysStore: " + e);
if(logger.isDebugEnabled()) {
e.printStackTrace();
logger.debug("Exception caught during putSysStore: " + e);
}
}
return version;
Expand All @@ -89,6 +91,7 @@ public Versioned<V> getSysStore(K key) {
Versioned<V> versioned = null;
try {
List<Versioned<V>> items = this.sysStore.get(key, null);

if(items.size() == 1)
versioned = items.get(0);
else if(items.size() > 1)
Expand All @@ -100,9 +103,8 @@ else if(items.size() > 1)
else
logger.debug("Got null value");
} catch(Exception e) {
logger.info("Exception caught during getSysStore: " + e);
if(logger.isDebugEnabled()) {
e.printStackTrace();
logger.debug("Exception caught during getSysStore: " + e);
}
}
return versioned;
Expand All @@ -119,9 +121,8 @@ public V getValueSysStore(K key) {
value = versioned.getValue();
}
} catch(Exception e) {
logger.info("Exception caught during getSysStore: " + e);
if(logger.isDebugEnabled()) {
e.printStackTrace();
logger.debug("Exception caught during getSysStore: " + e);
}
}
return value;
Expand Down
21 changes: 16 additions & 5 deletions src/java/voldemort/client/protocol/admin/AdminClient.java
Expand Up @@ -213,9 +213,13 @@ public AdminClient(String bootstrapURL, AdminClientConfig adminClientConfig, int
private void initSystemStoreClient(String bootstrapURL, int zoneID) {
String[] bootstrapUrls = new String[1];
bootstrapUrls[0] = bootstrapURL;
this.sysStoreVersion = new SystemStore<String, String>(SystemStoreConstants.SystemStoreName.voldsys$_metadata_version_persistence.name(),
bootstrapUrls,
zoneID);
try {
this.sysStoreVersion = new SystemStore<String, String>(SystemStoreConstants.SystemStoreName.voldsys$_metadata_version_persistence.name(),
bootstrapUrls,
zoneID);
} catch(Exception e) {
logger.debug("Error while creating a system store client for metadata version store.");
}
}

/**
Expand All @@ -226,12 +230,15 @@ private void initSystemStoreClient(String bootstrapURL, int zoneID) {
*/
public void updateMetadataversion(String versionKey) {
Properties props = MetadataVersionStoreUtils.getProperties(this.sysStoreVersion);
if(props.getProperty(versionKey) != null) {
if(props != null && props.getProperty(versionKey) != null) {
logger.debug("Version obtained = " + props.getProperty(versionKey));
long newValue = Long.parseLong(props.getProperty(versionKey)) + 1;
props.setProperty(versionKey, Long.toString(newValue));
} else {
logger.debug("Current version is null. Assuming version 0.");
if(props == null) {
props = new Properties();
}
props.setProperty(versionKey, "0");
}
MetadataVersionStoreUtils.setProperties(this.sysStoreVersion, props);
Expand Down Expand Up @@ -2403,7 +2410,11 @@ public void rebalanceStateChange(Cluster existingCluster,
* metadata
*/
if(changeClusterMetadata) {
updateMetadataversion(CLUSTER_VERSION_KEY);
try {
updateMetadataversion(CLUSTER_VERSION_KEY);
} catch(Exception e) {
logger.info("Exception occurred while setting cluster metadata version during Rebalance state change !!!");
}
}
} catch(Exception e) {

Expand Down
Expand Up @@ -94,51 +94,59 @@ public Long fetchNewVersion(String versionKey, Long curVersion, Properties versi

// Swallow all exceptions here (we dont want to fail the client).
catch(Exception e) {
logger.info("Could not retrieve Metadata Version. Exception : " + e);
logger.debug("Could not retrieve Metadata Version.");
}

return null;
}

public void run() {

// Get the properties object from the system store (containing versions)
Properties versionProps = MetadataVersionStoreUtils.getProperties(this.sysRepository.getMetadataVersionStore());
Long newClusterVersion = fetchNewVersion(CLUSTER_VERSION_KEY,
currentClusterVersion,
versionProps);

// If nothing has been updated, continue
if(newClusterVersion != null) {

logger.info("Metadata version mismatch detected.");
try {
// Get the properties object from the system store (containing
// versions)
Properties versionProps = MetadataVersionStoreUtils.getProperties(this.sysRepository.getMetadataVersionStore());
Long newClusterVersion = fetchNewVersion(CLUSTER_VERSION_KEY,
currentClusterVersion,
versionProps);

// Determine a random delta delay between 0 to DELTA_MAX to sleep
int delta = randomGenerator.nextInt(DELTA_MAX);
// If nothing has been updated, continue
if(newClusterVersion != null) {

try {
logger.info("Sleeping for delta : " + delta + " (ms) before re-bootstrapping.");
Thread.sleep(delta);
} catch(InterruptedException e) {
// do nothing, continue.
}
logger.info("Metadata version mismatch detected.");

/*
* Do another check for mismatch here since the versions might have
* been updated while we were sleeping
*/
if(!newClusterVersion.equals(currentClusterVersion)) {
// Determine a random delta delay between 0 to DELTA_MAX to
// sleep
int delta = randomGenerator.nextInt(DELTA_MAX);

try {
logger.info("Updating cluster version");
currentClusterVersion = newClusterVersion;
logger.info("Sleeping for delta : " + delta + " (ms) before re-bootstrapping.");
Thread.sleep(delta);
} catch(InterruptedException e) {
// do nothing, continue.
}

this.storeClientThunk.call();
} catch(Exception e) {
e.printStackTrace();
logger.info(e.getMessage());
/*
* Do another check for mismatch here since the versions might
* have been updated while we were sleeping
*/
if(!newClusterVersion.equals(currentClusterVersion)) {

try {
logger.info("Updating cluster version");
currentClusterVersion = newClusterVersion;

this.storeClientThunk.call();
} catch(Exception e) {
if(logger.isDebugEnabled()) {
e.printStackTrace();
logger.debug(e.getMessage());
}
}
}
}
} catch(Exception e) {
logger.debug("Could not retrieve metadata versions from the server.");
}

}
Expand Down
Expand Up @@ -548,7 +548,6 @@ public FailureDetectorConfig setRequestLengthThreshold(long requestLengthThresho
*/

public synchronized Cluster getCluster() {
System.err.println("Cluster = " + cluster);
return this.cluster;
}

Expand Down Expand Up @@ -577,6 +576,7 @@ public synchronized FailureDetectorConfig setCluster(Cluster cluster) {
*/

public synchronized Collection<Node> getNodes() {
System.err.println("DEPRECATED !!! Please use getCluster().getNodes() method instead !");
return ImmutableSet.copyOf(this.cluster.getNodes());
}

Expand All @@ -593,6 +593,7 @@ public synchronized Collection<Node> getNodes() {
*/

public synchronized FailureDetectorConfig setNodes(Collection<Node> nodes) {
System.err.println("DEPRECATED !!! Please use setCluster method instead !");
Utils.notNull(nodes);
this.nodes = new HashSet<Node>(nodes);
return this;
Expand Down
28 changes: 0 additions & 28 deletions src/java/voldemort/server/storage/StorageService.java
Expand Up @@ -372,7 +372,6 @@ protected void initializeMetadataVersions(List<StoreDefinition> storeDefs) {
for(String propName: props.stringPropertyNames()) {
finalVersionList.append(propName + "=" + props.getProperty(propName) + "\n");
}
System.err.println(finalVersionList);
versionStore.put(metadataVersionsKey,
new Versioned<byte[]>(finalVersionList.toString().getBytes(), newClock),
null);
Expand Down Expand Up @@ -424,33 +423,6 @@ public void registerSystemEngine(StorageEngine<ByteArray, byte[], byte[]> engine
store = new LoggingStore<ByteArray, byte[], byte[]>(store,
cluster.getName(),
SystemTime.INSTANCE);
/* TODO: Do we really need rebalancing for system stores? */
if(voldemortConfig.isEnableRebalanceService()) {
store = new RedirectingStore(store,
metadata,
storeRepository,
failureDetector,
storeFactory);
if(voldemortConfig.isJmxEnabled()) {
MBeanServer mbeanServer = ManagementFactory.getPlatformMBeanServer();
ObjectName name = null;
if(this.voldemortConfig.isEnableJmxClusterName())
name = JmxUtils.createObjectName(cluster.getName()
+ "."
+ JmxUtils.getPackageName(RedirectingStore.class),
store.getName());
else
name = JmxUtils.createObjectName(JmxUtils.getPackageName(RedirectingStore.class),
store.getName());

synchronized(mbeanServer) {
if(mbeanServer.isRegistered(name))
JmxUtils.unregisterMbean(mbeanServer, name);
JmxUtils.registerMbean(mbeanServer, JmxUtils.createModelMBean(store), name);
}

}
}

if(voldemortConfig.isMetadataCheckingEnabled())
store = new InvalidMetadataCheckingStore(metadata.getNodeId(), store, metadata);
Expand Down
Expand Up @@ -12,7 +12,6 @@ public class FileBackedCachingStorageConfiguration implements StorageConfigurati

public FileBackedCachingStorageConfiguration(VoldemortConfig config) {
this.inputPath = config.getMetadataDirectory();
System.err.println("Created a new File backed caching engine config ...");
}

public StorageEngine<ByteArray, byte[], byte[]> getStore(String name) {
Expand Down
Expand Up @@ -54,7 +54,9 @@ public FileBackedCachingStorageEngine(String name, String inputDirectory) {
this.inputPath = inputDirectory + "/" + name;
this.metadataMap = new HashMap<String, String>();
this.loadData();
logger.debug("Created a new File backed caching engine. File location = " + inputPath);
if(logger.isDebugEnabled()) {
logger.debug("Created a new File backed caching engine. File location = " + inputPath);
}
}

private File getVersionFile() {
Expand Down
5 changes: 2 additions & 3 deletions src/java/voldemort/store/logging/LoggingStore.java
Expand Up @@ -128,9 +128,8 @@ public void put(K key, Versioned<V> value, T transform) throws VoldemortExceptio
private void printTimedMessage(String operation, boolean success, long startNs) {
if(logger.isDebugEnabled()) {
double elapsedMs = (time.getNanoseconds() - startNs) / (double) Time.NS_PER_MS;
logger.debug(instanceName + operation + " " + getName()
+ " " + (success ? "successful" : "unsuccessful") + " in "
+ elapsedMs + " ms");
logger.debug(instanceName + operation + " " + getName() + " "
+ (success ? "successful" : "unsuccessful") + " in " + elapsedMs + " ms");
}
}

Expand Down
Expand Up @@ -47,6 +47,10 @@ public List<Node> getNodes(ByteArray key) {

try {
nodes = super.getNodes(key);
if(nodes == null) {
return null;
}

String currentHost = InetAddress.getLocalHost().getHostName();
for(Node n: nodes) {
if(currentHost.contains(n.getHost()) || n.getHost().contains(currentHost)) {
Expand Down
22 changes: 15 additions & 7 deletions src/java/voldemort/utils/MetadataVersionStoreUtils.java
Expand Up @@ -44,14 +44,22 @@ public static Properties getProperties(SystemStore<String, String> versionStore)
* @param props The Properties object to write to the System store
*/
public static void setProperties(SystemStore<String, String> versionStore, Properties props) {
StringBuilder finalVersionList = new StringBuilder();
for(String propName: props.stringPropertyNames()) {
if(finalVersionList.length() == 0) {
finalVersionList.append(propName + "=" + props.getProperty(propName));
} else {
finalVersionList.append("\n" + propName + "=" + props.getProperty(propName));
if(props == null) {
return;
}

try {
StringBuilder finalVersionList = new StringBuilder();
for(String propName: props.stringPropertyNames()) {
if(finalVersionList.length() == 0) {
finalVersionList.append(propName + "=" + props.getProperty(propName));
} else {
finalVersionList.append("\n" + propName + "=" + props.getProperty(propName));
}
}
versionStore.putSysStore(VERSIONS_METADATA_KEY, finalVersionList.toString());
} catch(Exception e) {
logger.debug("Got exception in setting properties : " + e.getMessage());
}
versionStore.putSysStore(VERSIONS_METADATA_KEY, finalVersionList.toString());
}
}
Expand Up @@ -16,9 +16,9 @@

package voldemort.cluster.failuredetector;

import static voldemort.cluster.failuredetector.MutableStoreVerifier.create;
import static voldemort.VoldemortTestConstants.getNineNodeCluster;
import static voldemort.cluster.failuredetector.FailureDetectorUtils.create;
import static voldemort.cluster.failuredetector.MutableStoreVerifier.create;

import java.io.IOException;

Expand Down Expand Up @@ -71,7 +71,7 @@ protected FailureDetectorPerformanceTest(String[] args) {
failureDetectorConfig.getThresholdInterval());
Cluster cluster = getNineNodeCluster();

failureDetectorConfig.setNodes(cluster.getNodes())
failureDetectorConfig.setCluster(cluster)
.setStoreVerifier(create(cluster.getNodes()))
.setAsyncRecoveryInterval(asyncScanInterval)
.setBannagePeriod(bannagePeriod)
Expand Down
Expand Up @@ -195,7 +195,7 @@ public static void main(String[] args) throws Throwable {
}

FailureDetectorConfig failureDetectorConfig = new FailureDetectorConfig().setImplementationClassName(BannagePeriodFailureDetector.class.getName())
.setNodes(cluster.getNodes())
.setCluster(cluster)
.setStoreVerifier(MutableStoreVerifier.create(stores));
FailureDetector failureDetector = FailureDetectorUtils.create(failureDetectorConfig, false);

Expand Down

0 comments on commit 440832e

Please sign in to comment.