Skip to content

Commit

Permalink
Added tests for auto-rebootstrap, Failure detector fix to track just …
Browse files Browse the repository at this point in the history
…one state of the topology (instead of immutable states), added ZenStoreClient
  • Loading branch information
Chinmay Soman committed Sep 6, 2012
2 parents fcffeb5 + e77ee0f commit a86a9e5
Show file tree
Hide file tree
Showing 41 changed files with 1,041 additions and 706 deletions.
Expand Up @@ -150,7 +150,7 @@ public void testAllNodesOffline() throws Exception {
test(store);
assertEquals(hostNamePairs.size(), failureDetector.getAvailableNodeCount());

for(Node n: failureDetector.getConfig().getNodes())
for(Node n: failureDetector.getConfig().getCluster().getNodes())
assertTrue(failureDetector.isAvailable(n));

// 2. Stop all the nodes, then test enough that we can cause the nodes
Expand All @@ -159,19 +159,19 @@ public void testAllNodesOffline() throws Exception {
test(store);
assertEquals(0, failureDetector.getAvailableNodeCount());

for(Node n: failureDetector.getConfig().getNodes())
for(Node n: failureDetector.getConfig().getCluster().getNodes())
assertFalse(failureDetector.isAvailable(n));

// 3. Now start the cluster up, test, and make sure everything's OK.
startClusterAsync(hostNames, ec2FailureDetectorTestConfig, nodeIds);

for(Node n: failureDetector.getConfig().getNodes())
for(Node n: failureDetector.getConfig().getCluster().getNodes())
failureDetector.waitForAvailability(n);

test(store);
assertEquals(hostNamePairs.size(), failureDetector.getAvailableNodeCount());

for(Node n: failureDetector.getConfig().getNodes())
for(Node n: failureDetector.getConfig().getCluster().getNodes())
assertTrue(failureDetector.isAvailable(n));
}

Expand Down Expand Up @@ -252,7 +252,7 @@ private Node getNodeByHostName(String hostName, FailureDetector failureDetector)
throws Exception {
Integer offlineNodeId = nodeIds.get(hostName);

for(Node n: failureDetector.getConfig().getNodes()) {
for(Node n: failureDetector.getConfig().getCluster().getNodes()) {
if(offlineNodeId.equals(n.getId()))
return n;
}
Expand Down
181 changes: 117 additions & 64 deletions src/java/voldemort/VoldemortAdminTool.java
Expand Up @@ -19,6 +19,7 @@
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.BufferedWriter;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
Expand All @@ -36,6 +37,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;

Expand All @@ -47,7 +49,6 @@
import org.codehaus.jackson.JsonGenerator;
import org.codehaus.jackson.map.ObjectMapper;

import voldemort.client.SystemStore;
import voldemort.client.protocol.admin.AdminClient;
import voldemort.client.protocol.admin.AdminClientConfig;
import voldemort.cluster.Cluster;
Expand All @@ -56,6 +57,7 @@
import voldemort.serialization.Serializer;
import voldemort.serialization.SerializerDefinition;
import voldemort.serialization.SerializerFactory;
import voldemort.serialization.StringSerializer;
import voldemort.server.rebalance.RebalancerState;
import voldemort.store.StoreDefinition;
import voldemort.store.compress.CompressionStrategy;
Expand Down Expand Up @@ -227,7 +229,10 @@ public static void main(String[] args) throws Exception {
.withRequiredArg()
.describedAs("version")
.ofType(Long.class);

parser.accepts("verify-metadata-version",
"Verify the version of Metadata on all the cluster nodes");
parser.accepts("synchronize-metadata-version",
"Synchronize the metadata versions across all the nodes.");
OptionSet options = parser.parse(args);

if(options.has("help")) {
Expand All @@ -243,7 +248,7 @@ public static void main(String[] args) throws Exception {
|| options.has("ro-metadata") || options.has("set-metadata")
|| options.has("get-metadata") || options.has("check-metadata") || options.has("key-distribution"))
|| options.has("truncate") || options.has("clear-rebalancing-metadata")
|| options.has("async") || options.has("native-backup") || options.has("rollback"))) {
|| options.has("async") || options.has("native-backup") || options.has("rollback") || options.has("verify-metadata-version"))) {
System.err.println("Missing required arguments: " + Joiner.on(", ").join(missing));
printHelp(System.err, parser);
System.exit(1);
Expand All @@ -255,14 +260,13 @@ public static void main(String[] args) throws Exception {
int parallelism = CmdUtils.valueOf(options, "restore", 5);
Integer zoneId = CmdUtils.valueOf(options, "zone", -1);

AdminClient adminClient = new AdminClient(url, new AdminClientConfig());
int zone = zoneId == -1 ? 0 : zoneId;
AdminClient adminClient = new AdminClient(url, new AdminClientConfig(), zone);

// Initialize the system store for stores.xml version
String[] bootstrapUrls = new String[1];
bootstrapUrls[0] = url;
SystemStore<String, String> sysStoreVersion = new SystemStore<String, String>(SystemStoreConstants.SystemStoreName.voldsys$_metadata_version_persistence.name(),
bootstrapUrls,
0);
if(options.has("verify-metadata-version")) {
checkMetadataVersion(adminClient);
return;
}

String ops = "";
if(options.has("delete-partitions")) {
Expand Down Expand Up @@ -325,11 +329,15 @@ public static void main(String[] args) throws Exception {
}
ops += "o";
}
if(options.has("synchronize-metadata-version")) {
ops += "z";
}

if(ops.length() < 1) {
Utils.croak("At least one of (delete-partitions, restore, add-node, fetch-entries, "
+ "fetch-keys, add-stores, delete-store, update-entries, get-metadata, ro-metadata, "
+ "set-metadata, check-metadata, key-distribution, clear-rebalancing-metadata, async, "
+ "repair-job, native-backup) must be specified");
+ "repair-job, native-backup, verify-metadata-version) must be specified");
}

List<String> storeNames = null;
Expand Down Expand Up @@ -432,9 +440,6 @@ public static void main(String[] args) throws Exception {
adminClient,
MetadataStore.CLUSTER_KEY,
mapper.writeCluster(newCluster));

// Update the cluster.xml version info
updateMetadataversion(CLUSTER_VERSION_KEY, sysStoreVersion);
} else if(metadataKey.compareTo(MetadataStore.SERVER_STATE_KEY) == 0) {
VoldemortState newState = VoldemortState.valueOf(metadataValue);
executeSetMetadata(nodeId,
Expand All @@ -450,16 +455,6 @@ public static void main(String[] args) throws Exception {
adminClient,
MetadataStore.STORES_KEY,
mapper.writeStoreList(storeDefs));

/*
* Update the store metadata version
*
* TODO: For now write one version for the entire
* stores.xml When we split the stores.xml, make this
* more granular
*/
updateMetadataversion(STORES_VERSION_KEY, sysStoreVersion);

} else if(metadataKey.compareTo(MetadataStore.REBALANCING_STEAL_INFO) == 0) {
if(!Utils.isReadableFile(metadataValue))
throw new VoldemortException("Rebalancing steal info file path incorrect");
Expand Down Expand Up @@ -507,12 +502,97 @@ public static void main(String[] args) throws Exception {
long pushVersion = (Long) options.valueOf("version");
executeRollback(nodeId, storeName, pushVersion, adminClient);
}
if(ops.contains("z")) {
synchronizeMetadataVersion(adminClient, nodeId);
}
} catch(Exception e) {
e.printStackTrace();
Utils.croak(e.getMessage());
}
}

private static String getMetadataVersionsForNode(AdminClient adminClient, int nodeId) {
List<Integer> partitionIdList = Lists.newArrayList();
for(Node node: adminClient.getAdminClientCluster().getNodes()) {
partitionIdList.addAll(node.getPartitionIds());
}

Iterator<Pair<ByteArray, Versioned<byte[]>>> entriesIterator = adminClient.fetchEntries(nodeId,
SystemStoreConstants.SystemStoreName.voldsys$_metadata_version_persistence.name(),
partitionIdList,
null,
true);
Serializer<String> serializer = new StringSerializer("UTF8");
String keyObject = null;
String valueObject = null;

while(entriesIterator.hasNext()) {
try {
Pair<ByteArray, Versioned<byte[]>> kvPair = entriesIterator.next();
byte[] keyBytes = kvPair.getFirst().get();
byte[] valueBytes = kvPair.getSecond().getValue();
keyObject = serializer.toObject(keyBytes);
if(!keyObject.equals(MetadataVersionStoreUtils.VERSIONS_METADATA_KEY)) {
continue;
}
valueObject = serializer.toObject(valueBytes);
} catch(Exception e) {
System.err.println("Error while retrieving Metadata versions from node : " + nodeId
+ ". Exception = \n");
e.printStackTrace();
System.exit(-1);
}
}

return valueObject;
}

private static void checkMetadataVersion(AdminClient adminClient) {
Map<Properties, Integer> versionsNodeMap = new HashMap<Properties, Integer>();

for(Node node: adminClient.getAdminClientCluster().getNodes()) {
String valueObject = getMetadataVersionsForNode(adminClient, node.getId());
Properties props = new Properties();
try {
props.load(new ByteArrayInputStream(valueObject.getBytes()));
} catch(IOException e) {
System.err.println("Error while parsing Metadata versions for node : "
+ node.getId() + ". Exception = \n");
e.printStackTrace();
System.exit(-1);
}

versionsNodeMap.put(props, node.getId());
}

if(versionsNodeMap.keySet().size() > 1) {
System.err.println("Mismatching versions detected !!!");
for(Entry<Properties, Integer> entry: versionsNodeMap.entrySet()) {
System.out.println("**************************** Node: " + entry.getValue()
+ " ****************************");
System.out.println(entry.getKey());
}
} else {
System.err.println("All the nodes have the same metadata versions .");
}

}

private static void synchronizeMetadataVersion(AdminClient adminClient, int baseNodeId) {
String valueObject = getMetadataVersionsForNode(adminClient, baseNodeId);
Properties props = new Properties();
try {
props.load(new ByteArrayInputStream(valueObject.getBytes()));
adminClient.setMetadataversion(props);
System.out.println("Metadata versions synchronized successfully.");
} catch(IOException e) {
System.err.println("Error while retrieving Metadata versions from node : " + baseNodeId
+ ". Exception = \n");
e.printStackTrace();
System.exit(-1);
}
}

private static void executeRollback(Integer nodeId,
String storeName,
long pushVersion,
Expand Down Expand Up @@ -749,21 +829,6 @@ private static void executeCheckMetadata(AdminClient adminClient, String metadat
}
}

// Get the metadata version for the given key (cluster or store)
public static void updateMetadataversion(String versionKey,
SystemStore<String, String> sysStoreVersion) {
Properties props = MetadataVersionStoreUtils.getProperties(sysStoreVersion);
if(props.getProperty(versionKey) != null) {
System.out.println("Version obtained = " + props.getProperty(versionKey));
long newValue = Long.parseLong(props.getProperty(versionKey)) + 1;
props.setProperty(versionKey, Long.toString(newValue));
} else {
System.err.println("Current version is null. Assuming version 0.");
props.setProperty(versionKey, "0");
}
MetadataVersionStoreUtils.setProperties(sysStoreVersion, props);
}

public static void executeSetMetadata(Integer nodeId,
AdminClient adminClient,
String key,
Expand Down Expand Up @@ -792,21 +857,9 @@ public static void executeSetMetadata(Integer nodeId,
System.currentTimeMillis());
nodeIds.add(nodeId);
}
for(Integer currentNodeId: nodeIds) {
System.out.println("Setting "
+ key
+ " for "
+ adminClient.getAdminClientCluster()
.getNodeById(currentNodeId)
.getHost()
+ ":"
+ adminClient.getAdminClientCluster()
.getNodeById(currentNodeId)
.getId());
adminClient.updateRemoteMetadata(currentNodeId,
key,
Versioned.value(value.toString(), updatedVersion));
}
adminClient.updateRemoteMetadata(nodeIds,
key,
Versioned.value(value.toString(), updatedVersion));
}

private static void executeROMetadata(Integer nodeId,
Expand Down Expand Up @@ -1002,9 +1055,9 @@ private static void executeFetchEntries(Integer nodeId,
stores = Lists.newArrayList();
stores.addAll(storeDefinitionMap.keySet());
} else {
// add system store to the map so they can be fetched when specified
// explicitly
storeDefinitionMap.putAll(getSystemStoreDef());
// add system stores to the map so they can be fetched when
// specified explicitly
storeDefinitionMap.putAll(getSystemStoreDefs());
}

// Pick up all the partitions
Expand Down Expand Up @@ -1049,13 +1102,13 @@ private static void executeFetchEntries(Integer nodeId,
}
}

private static Map<String, StoreDefinition> getSystemStoreDef() {
Map<String, StoreDefinition> sysStoreDef = Maps.newHashMap();
private static Map<String, StoreDefinition> getSystemStoreDefs() {
Map<String, StoreDefinition> sysStoreDefMap = Maps.newHashMap();
List<StoreDefinition> storesDefs = SystemStoreConstants.getAllSystemStoreDefs();
for(StoreDefinition def: storesDefs) {
sysStoreDef.put(def.getName(), def);
sysStoreDefMap.put(def.getName(), def);
}
return sysStoreDef;
return sysStoreDefMap;
}

private static void executeUpdateEntries(Integer nodeId,
Expand Down Expand Up @@ -1255,9 +1308,9 @@ private static void executeFetchKeys(Integer nodeId,
stores = Lists.newArrayList();
stores.addAll(storeDefinitionMap.keySet());
} else {
// add system store to the map so they can be fetched when specified
// explicitly
storeDefinitionMap.putAll(getSystemStoreDef());
// add system stores to the map so they can be fetched when
// specified explicitly
storeDefinitionMap.putAll(getSystemStoreDefs());
}

// Pick up all the partitions
Expand Down

0 comments on commit a86a9e5

Please sign in to comment.