Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Ec2 testing: Cluster object support in RemoteTestUtils, further work …

…on Ec2RebalancingTest.
  • Loading branch information...
commit 042c40177d5034ef8c9cab1dff6b76c2f5197aeb 1 parent ba7681c
@afeinberg afeinberg authored
View
2  contrib/ec2-testing/test/voldemort/utils/Ec2GossipTest.java
@@ -197,7 +197,7 @@ public void checkCondition() throws Exception, AssertionError {
List<String> newHostnames = toHostNames(newInstances);
if (logger.isInfoEnabled())
- logger.info("Sleeping for 15 to let new instances start up");
+ logger.info("Sleeping for 15 seconds to let new instances startup");
Thread.sleep(15000);
View
161 contrib/ec2-testing/test/voldemort/utils/Ec2RebalancingTest.java
@@ -1,21 +1,19 @@
package voldemort.utils;
+import com.google.common.base.Function;
+import com.google.common.collect.*;
import org.apache.commons.io.FileUtils;
import org.apache.log4j.Logger;
import org.junit.*;
import voldemort.ServerTestUtils;
-import voldemort.client.protocol.admin.AdminClient;
-import voldemort.client.protocol.admin.AdminClientConfig;
import voldemort.client.rebalance.RebalanceClient;
import voldemort.client.rebalance.RebalanceClientConfig;
import voldemort.cluster.Cluster;
-import voldemort.xml.ClusterMapper;
+import voldemort.cluster.Node;
import java.io.File;
import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
+import java.util.*;
import static voldemort.utils.Ec2RemoteTestUtils.createInstances;
import static voldemort.utils.Ec2RemoteTestUtils.destroyInstances;
@@ -33,20 +31,20 @@
private static Ec2RebalancingTestConfig ec2RebalancingTestConfig;
private static List<HostNamePair> hostNamePairs;
private static List<String> hostNames;
- private static Map<String, Integer> nodeIds;
-
-
private static final Logger logger = Logger.getLogger(Ec2RebalancingTest.class);
+ private Cluster originalCluster;
+
private Map<String, String> testEntries;
+ private Map<String, Integer> nodeIds;
+ private int[][] partitionMap;
@BeforeClass
public static void setUpClass() throws Exception {
ec2RebalancingTestConfig = new Ec2RebalancingTestConfig();
hostNamePairs = createInstances(ec2RebalancingTestConfig);
hostNames = toHostNames(hostNamePairs);
- nodeIds = generateClusterDescriptor(hostNamePairs, "test", ec2RebalancingTestConfig, true);
if (logger.isInfoEnabled())
logger.info("Sleeping for 30 seconds to give EC2 instances some time to complete startup");
@@ -59,16 +57,62 @@ public static void setUpClass() throws Exception {
public static void tearDownClass() throws Exception {
if (hostNames != null)
destroyInstances(hostNames, ec2RebalancingTestConfig);
+ }
+
+ private int[][] getPartitionMap(int nodes, int perNode) {
+ int[][] partitionMap = new int[nodes][perNode];
+ int i, k;
+
+ for (i=0, k=0; i<nodes; i++)
+ for (int j=0; j < perNode; j++)
+ partitionMap[i][j] = k++;
+
+ return partitionMap;
+ }
+
+ private int[][] insertNode(int[][] template, int pivot) {
+ int len = template.length;
+
+ int[][] layout = new int[len+1][];
+ System.arraycopy(template, 0, layout, 0, len-1);
+
+ layout[len-1] = new int[len-1];
+ System.arraycopy(template[len-1], 0, layout[len-1], 0, pivot);
+
+ layout[len] = new int[template[len-1].length - pivot];
+ System.arraycopy(template[len-1], pivot, layout[len], 0, template[len-1].length - pivot);
+
+ return layout;
+ }
+
+ private int[] getPorts(int count) {
+ int[] ports = new int[count*3];
+ for (int i = 0; i < count; i += 3) {
+ ports[i] = 6665;
+ ports[i+1] = 6666;
+ ports[i+2] = 6667;
+ }
+ return ports;
}
@Before
public void setUp() throws Exception {
+ int clusterSize = ec2RebalancingTestConfig.getInstanceCount();
+ partitionMap = getPartitionMap(clusterSize, ec2RebalancingTestConfig.partitionsPerNode);
+ originalCluster = ServerTestUtils.getLocalCluster(clusterSize,
+ getPorts(clusterSize),
+ partitionMap);
+
+
deploy(hostNames, ec2RebalancingTestConfig);
startClusterAsync(hostNames, ec2RebalancingTestConfig, nodeIds);
+ nodeIds = generateClusterDescriptor(hostNamePairs, originalCluster, ec2RebalancingTestConfig);
testEntries = ServerTestUtils.createRandomKeyValueString(ec2RebalancingTestConfig.numKeys);
+ originalCluster = updateCluster(originalCluster, nodeIds);
+
if (logger.isInfoEnabled())
logger.info("Sleeping for 15 seconds to let the Voldemort cluster start");
@@ -81,59 +125,87 @@ public void tearDown() throws Exception {
stopClusterQuiet(hostNames, ec2RebalancingTestConfig);
}
- @Test
- public void testSingleRebalancing() throws Exception {
- Cluster originalCluster = getOriginalCluster();
- Cluster targetCluster = getTargetCluster(1);
-
- try {
- RebalanceClient rebalanceClient = new RebalanceClient(getBootstrapUrl(hostNames),
- new RebalanceClientConfig());
-
-
- } finally {
- stopCluster(hostNames, ec2RebalancingTestConfig);
+
+ private Cluster updateCluster(Cluster templateCluster, Map<String, Integer> nodeIds) {
+ List<Node> nodes = new LinkedList<Node>();
+ for (Map.Entry<String,Integer> entry: nodeIds.entrySet()) {
+ String hostName = entry.getKey();
+ int nodeId = entry.getValue();
+ Node templateNode = templateCluster.getNodeById(nodeId);
+ Node node = new Node(nodeId,
+ hostName,
+ templateNode.getHttpPort(),
+ templateNode.getSocketPort(),
+ templateNode.getAdminPort(),
+ templateNode.getPartitionIds());
+ nodes.add(node);
}
+ return new Cluster(templateCluster.getName(), nodes);
}
- public Cluster getOriginalCluster() {
- AdminClient adminClient = new AdminClient(getBootstrapUrl(hostNames), new AdminClientConfig());
- return adminClient.getAdminClientCluster();
- }
+ private Cluster expandCluster(int newNodes, Cluster newCluster) throws Exception {
+ assert(newNodes > 0);
- public Cluster getTargetCluster(int toAdd) throws Exception {
- List<HostNamePair> addlInstances = createInstances(toAdd, ec2RebalancingTestConfig);
- List<String> addlHostNames = toHostNames(addlInstances);
+ List<HostNamePair> newInstances = createInstances(newNodes, ec2RebalancingTestConfig);
if (logger.isInfoEnabled())
- logger.info("Sleeping for 15 seconds to let the new instances startup");
+ logger.info("Sleeping for 15 seconds to let new instances startup");
- hostNamePairs.addAll(addlInstances);
+ Thread.sleep(15000);
+
+ hostNamePairs.addAll(newInstances);
hostNames = toHostNames(hostNamePairs);
- nodeIds = generateClusterDescriptor(hostNamePairs, "test", ec2RebalancingTestConfig, true);
-
- deploy(addlHostNames, ec2RebalancingTestConfig);
- startClusterAsync(addlHostNames, ec2RebalancingTestConfig, nodeIds);
- if (logger.isInfoEnabled())
- logger.info("Sleeping for 15 seconds to start voldemort on the new nodes");
+ nodeIds = generateClusterDescriptor(hostNamePairs, newCluster, ec2RebalancingTestConfig);
- Thread.sleep(15000);
+ logger.info("Expanded the cluster. New layout: " + nodeIds);
- AdminClient adminClient = new AdminClient(getBootstrapUrl(addlHostNames), new AdminClientConfig());
+ return updateCluster(newCluster, nodeIds);
+ }
- return adminClient.getAdminClientCluster();
-
+ @Test
+ public void testSingleRebalancing() throws Exception {
+ int clusterSize = ec2RebalancingTestConfig.getInstanceCount();
+ int[][] targetLayout = insertNode(partitionMap, partitionMap[clusterSize-1].length-1);
+ Cluster targetCluster = ServerTestUtils.getLocalCluster(clusterSize+1,
+ getPorts(clusterSize+1),
+ targetLayout);
+ List<Integer> originalNodes = Lists.transform(Lists.<Node>newLinkedList(originalCluster.getNodes()),
+ new Function<Node, Integer> () {
+ public Integer apply(Node node) {
+ return node.getId();
+ }
+ });
+ targetCluster = expandCluster(targetCluster.getNumberOfNodes() - clusterSize, targetCluster);
+ try {
+ RebalanceClient rebalanceClient = new RebalanceClient(getBootstrapUrl(hostNames),
+ new RebalanceClientConfig());
+ populateData(originalCluster, originalNodes);
+ rebalanceAndCheck(originalCluster, targetCluster, rebalanceClient, Arrays.asList(clusterSize));
+ } finally {
+ stopCluster(hostNames, ec2RebalancingTestConfig);
+ }
}
- public String getBootstrapUrl(List<String> hostnames) {
+ private void populateData(Cluster cluster, List<Integer> nodeList) {
+ // TODO: implement this
+ }
+
+ private void rebalanceAndCheck(Cluster currentCluster,
+ Cluster targetCluster,
+ RebalanceClient rebalanceClient,
+ List<Integer> nodeCheckList) {
+ // TODO: implement this
+ }
+
+ private String getBootstrapUrl(List<String> hostnames) {
return "tcp://" + hostnames.get(0) + ":6666";
}
@Test
public void testProxyGetDuringRebalancing() throws Exception {
try {
-
+ // TODO: implement this
} finally {
stopCluster(hostNames, ec2RebalancingTestConfig);
}
@@ -142,6 +214,7 @@ public void testProxyGetDuringRebalancing() throws Exception {
private static class Ec2RebalancingTestConfig extends Ec2RemoteTestConfig {
private int numKeys;
+ private int partitionsPerNode;
private static String testStoreName = "test-replication-memory";
private static String storeDefFile = "test/common/voldemort/config/stores.xml";
private String configDirName;
@@ -151,7 +224,7 @@ protected void init(Properties properties) {
super.init(properties);
configDirName = properties.getProperty("ec2ConfigDirName");
numKeys = Integer.valueOf(properties.getProperty("rebalancingNumKeys", "10000"));
-
+ partitionsPerNode = Integer.valueOf(properties.getProperty("partitionsPerNode", "3"));
try {
FileUtils.copyFileToDirectory(new File(storeDefFile), new File(configDirName));
} catch (IOException e) {
View
61 contrib/ec2-testing/test/voldemort/utils/RemoteTestUtils.java
@@ -22,6 +22,9 @@
import java.util.List;
import java.util.Map;
+import com.google.common.base.Function;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
import org.apache.commons.io.FileUtils;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
@@ -152,10 +155,20 @@ public static void stopClusterNode(String hostName, RemoteTestConfig remoteTestC
RemoteTestConfig remoteTestConfig,
boolean useExternal)
throws Exception {
- List<String> hostNames = new ArrayList<String>();
-
- for(HostNamePair hostNamePair: hostNamePairs)
- hostNames.add(useExternal ? hostNamePair.getExternalHostName() : hostNamePair.getInternalHostName());
+ // This isn't too elegant, but it works
+ List<String> hostNames = Lists.transform(hostNamePairs,
+ useExternal ?
+ new Function<HostNamePair, String> () {
+ public String apply(HostNamePair hostNamePair) {
+ return hostNamePair.getExternalHostName();
+ }
+ } :
+ new Function<HostNamePair, String> () {
+ public String apply(HostNamePair hostNamePair) {
+ return hostNamePair.getInternalHostName();
+ }
+ }
+ );
ClusterGenerator clusterGenerator = new ClusterGenerator();
List<ClusterNodeDescriptor> nodes = clusterGenerator.createClusterNodeDescriptors(hostNames,
@@ -165,17 +178,43 @@ public static void stopClusterNode(String hostName, RemoteTestConfig remoteTestC
Map<String, Integer> nodeIds = new HashMap<String, Integer>();
for(ClusterNodeDescriptor node: nodes) {
- // OK, yeah, this is super-inefficient...
- for(HostNamePair hostNamePair: hostNamePairs) {
- if(node.getHostName().equals(useExternal ?
- hostNamePair.getExternalHostName() :
- hostNamePair.getInternalHostName()))
- nodeIds.put(hostNamePair.getExternalHostName(), node.getId());
+ if (useExternal)
+ nodeIds.put(node.getHostName(), node.getId());
+ else {
+ // OK, yeah, this is super-inefficient...
+ for(HostNamePair hostNamePair: hostNamePairs)
+ if(node.getHostName().equals(hostNamePair.getInternalHostName()))
+ nodeIds.put(hostNamePair.getExternalHostName(), node.getId());
}
}
return nodeIds;
}
- // TODO: generate cluster descriptor given a template cluster
+ // TODO: move this out to a separate place
+
+ public static Map<String, Integer> generateClusterDescriptor(List<HostNamePair> hostNamePairs,
+ Cluster cluster,
+ RemoteTestConfig remoteTestConfig)
+ throws Exception {
+ List<String> hostNames = Lists.transform(hostNamePairs,
+ new Function<HostNamePair, String> () {
+ public String apply(HostNamePair hostNamePair) {
+ return hostNamePair.getExternalHostName();
+ }
+ });
+ ClusterGenerator clusterGenerator = new ClusterGenerator();
+ List<ClusterNodeDescriptor> nodes = clusterGenerator.createClusterNodeDescriptors(hostNames,
+ cluster);
+ String clusterXml = clusterGenerator.createClusterDescriptor(cluster.getName(), nodes);
+ FileUtils.writeStringToFile(remoteTestConfig.getClusterXmlFile(), clusterXml);
+ Map<String,Integer> nodeIds = new HashMap<String,Integer>();
+
+ for (ClusterNodeDescriptor node: nodes) {
+ nodeIds.put(node.getHostName(), node.getId());
+ }
+
+ return nodeIds;
+ }
+
}
Please sign in to comment.
Something went wrong with that request. Please try again.