Skip to content

Commit

Permalink
Added the ability to auto-bootstrap on store definition changes
Browse files Browse the repository at this point in the history
  • Loading branch information
Chinmay Soman committed Oct 12, 2012
1 parent 5e00c76 commit 8804f79
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 18 deletions.
11 changes: 11 additions & 0 deletions src/java/voldemort/VoldemortAdminTool.java
Expand Up @@ -504,6 +504,17 @@ public static void main(String[] args) throws Exception {
adminClient,
MetadataStore.STORES_KEY,
mapper.writeStoreList(storeDefs));
if(storeNames != null) {
System.out.println("Updating metadata version for the following stores: "
+ storeNames);
try {
for(String name: storeNames) {
adminClient.updateMetadataversion(name);
}
} catch(Exception e) {
System.err.println("Error while updating metadata version for the specified store.");
}
}
} else if(metadataKey.compareTo(MetadataStore.REBALANCING_STEAL_INFO) == 0) {
if(!Utils.isReadableFile(metadataValue))
throw new VoldemortException("Rebalancing steal info file path incorrect");
Expand Down
3 changes: 2 additions & 1 deletion src/java/voldemort/client/ZenStoreClient.java
Expand Up @@ -170,7 +170,8 @@ public Void call() throws Exception {
};

asyncMetadataManager = new AsyncMetadataVersionManager(this.sysRepository,
rebootstrapCallback);
rebootstrapCallback,
this.storeName);

// schedule the job to run every 'checkInterval' period, starting
// now
Expand Down
Expand Up @@ -44,37 +44,56 @@
public class AsyncMetadataVersionManager implements Runnable {

public static final String CLUSTER_VERSION_KEY = "cluster.xml";
public static String STORES_VERSION_KEY = "stores.xml";
public static final String VERSIONS_METADATA_STORE = "metadata-versions";

private final Logger logger = Logger.getLogger(this.getClass());
private Long currentClusterVersion;
private Long currentStoreVersion;
private final Callable<Void> storeClientThunk;
private final SystemStoreRepository systemStoreRepository;
public boolean isActive = false;

public AsyncMetadataVersionManager(SystemStoreRepository sysRepository,
Callable<Void> storeClientThunk) {
Callable<Void> storeClientThunk,
String storeName) {
this.systemStoreRepository = sysRepository;

if(storeName != null) {
STORES_VERSION_KEY = storeName;
}

// Get the properties object from the system store (containing versions)
Properties versionProps = MetadataVersionStoreUtils.getProperties(this.systemStoreRepository.getMetadataVersionStore());

try {
this.currentClusterVersion = getCurrentVersion(CLUSTER_VERSION_KEY, versionProps);
} catch(Exception e) {
logger.error("Exception while getting currentClusterVersion : " + e);
}
// Initialize base cluster version to do all subsequent comparisons
this.currentClusterVersion = initializeVersion(CLUSTER_VERSION_KEY, versionProps);

// Initialize base store version to do all subsequent comparisons
this.currentStoreVersion = initializeVersion(STORES_VERSION_KEY, versionProps);

// If the received version is null, assume version 0
if(currentClusterVersion == null) {
currentClusterVersion = new Long(0);
}
logger.debug("Initial cluster.xml version = " + this.currentClusterVersion);
logger.debug("Initial store '" + storeName + "' version = " + this.currentClusterVersion);

this.storeClientThunk = storeClientThunk;
this.isActive = true;
}

private Long initializeVersion(String versionKey, Properties versionProps) {
Long baseVersion = null;
try {
baseVersion = getCurrentVersion(versionKey, versionProps);
} catch(Exception e) {
logger.error("Exception while getting version for key : " + versionKey
+ " Exception : " + e);
}

if(baseVersion == null) {
baseVersion = new Long(0);
}
return baseVersion;
}

public Long getCurrentVersion(String versionKey, Properties versionProps) {
Long versionValue = null;

Expand Down Expand Up @@ -114,7 +133,7 @@ public Long fetchNewVersion(String versionKey, Long curVersion, Properties versi

// Swallow all exceptions here (we dont want to fail the client).
catch(Exception e) {
logger.debug("Could not retrieve Metadata Version.");
logger.debug("Could not retrieve Metadata Version. Exception : " + e);
}

return null;
Expand All @@ -128,18 +147,30 @@ public void run() {
* versions)
*/
Properties versionProps = MetadataVersionStoreUtils.getProperties(this.systemStoreRepository.getMetadataVersionStore());

Long newClusterVersion = fetchNewVersion(CLUSTER_VERSION_KEY,
currentClusterVersion,
this.currentClusterVersion,
versionProps);
Long newStoreVersion = fetchNewVersion(STORES_VERSION_KEY,
this.currentStoreVersion,
versionProps);

// If nothing has been updated, continue
if(newClusterVersion != null) {
// Check if something has been updated
if((newClusterVersion != null) || (newStoreVersion != null)) {
logger.info("Metadata version mismatch detected. Re-bootstrapping!");
try {
logger.info("Updating cluster version");
currentClusterVersion = newClusterVersion;
if(newClusterVersion != null) {
logger.info("Updating cluster version");
currentClusterVersion = newClusterVersion;
}

if(newStoreVersion != null) {
logger.info("Updating store : '" + STORES_VERSION_KEY + "' version");
this.currentStoreVersion = newStoreVersion;
}

this.storeClientThunk.call();

} catch(Exception e) {
if(logger.isDebugEnabled()) {
e.printStackTrace();
Expand All @@ -158,6 +189,10 @@ public Long getClusterMetadataVersion() {
return this.currentClusterVersion;
}

public Long getStoreMetadataVersion() {
return this.currentStoreVersion;
}

// Fetch the latest versions for cluster metadata
public void updateMetadataVersions() {
Properties versionProps = MetadataVersionStoreUtils.getProperties(this.systemStoreRepository.getMetadataVersionStore());
Expand Down
Expand Up @@ -141,7 +141,8 @@ public Void call() throws Exception {

// Starting the Version Metadata Manager
this.asyncCheckMetadata = new AsyncMetadataVersionManager(this.repository,
rebootstrapCallback);
rebootstrapCallback,
null);
scheduler.schedule(asyncCheckMetadata.getClass().getName(),
asyncCheckMetadata,
new Date(),
Expand Down

0 comments on commit 8804f79

Please sign in to comment.