Skip to content

Commit

Permalink
Added support for copying replication mapping changes.
Browse files (browse the repository at this point in the history)
  • Loading branch information
bbansal committed Jan 22, 2010
1 parent 66c8323 commit b76bb67
Show file tree
Hide file tree
Showing 16 changed files with 391 additions and 170 deletions.
6 changes: 3 additions & 3 deletions clients/python/voldemort_admin_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

81 changes: 48 additions & 33 deletions contrib/ec2-testing/test/voldemort/utils/Ec2RebalanceTest.java
@@ -1,20 +1,32 @@
package voldemort.utils;

import static org.junit.Assert.fail;
import static voldemort.utils.Ec2RemoteTestUtils.createInstances;
import static voldemort.utils.Ec2RemoteTestUtils.destroyInstances;
import static voldemort.utils.RemoteTestUtils.cleanupCluster;
import static voldemort.utils.RemoteTestUtils.deploy;
import static voldemort.utils.RemoteTestUtils.generateClusterDescriptor;
import static voldemort.utils.RemoteTestUtils.startClusterAsync;
import static voldemort.utils.RemoteTestUtils.startClusterNode;
import static voldemort.utils.RemoteTestUtils.stopCluster;
import static voldemort.utils.RemoteTestUtils.stopClusterQuiet;
import static voldemort.utils.RemoteTestUtils.cleanupCluster;
import static voldemort.utils.RemoteTestUtils.toHostNames;
import static org.junit.Assert.fail;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.commons.io.FileUtils;
import org.apache.log4j.Logger;
import org.junit.*;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;

import voldemort.ServerTestUtils;
import voldemort.client.protocol.RequestFormatType;
Expand All @@ -31,10 +43,6 @@
import voldemort.store.socket.SocketStore;
import voldemort.versioning.Versioned;

import java.io.File;
import java.io.IOException;
import java.util.*;

/**
* @author afeinberg
*/
Expand All @@ -45,7 +53,7 @@ public class Ec2RebalanceTest extends AbstractRebalanceTest {
private static List<HostNamePair> hostNamePairs;
private static List<String> hostNames;

private Map<Integer,String> nodeIdsInv = new HashMap<Integer,String>();
private Map<Integer, String> nodeIdsInv = new HashMap<Integer, String>();
private List<String> activeHostNames = new ArrayList<String>();

@BeforeClass
Expand All @@ -62,13 +70,13 @@ public static void ec2Setup() throws Exception {

@AfterClass
public static void ec2TearDown() throws Exception {
if (hostNames != null)
if(hostNames != null)
destroyInstances(hostNames, ec2RebalanceTestConfig);
}

@After
public void ec2Cleanup() throws Exception {
if (activeHostNames.size() > 0) {
if(activeHostNames.size() > 0) {
stopClusterQuiet(activeHostNames, ec2RebalanceTestConfig);
cleanupCluster(activeHostNames, ec2RebalanceTestConfig);
}
Expand All @@ -77,7 +85,7 @@ public void ec2Cleanup() throws Exception {
@Override
protected Cluster updateCluster(Cluster template) {
List<Node> nodes = new ArrayList<Node>();
for (Map.Entry<Integer,String> entry: nodeIdsInv.entrySet()) {
for(Map.Entry<Integer, String> entry: nodeIdsInv.entrySet()) {
int nodeId = entry.getKey();
String hostName = entry.getValue();
Node tmplNode = template.getNodeById(nodeId);
Expand All @@ -103,13 +111,18 @@ protected SocketStore getSocketStore(String storeName, String host, int port, bo
}

@Override
protected Cluster startServers(Cluster template, String StoreDefXmlFile, List<Integer> nodeToStart, Map<String, String> configProps) throws Exception {
if (ec2RebalanceTestConfig.getInstanceCount() < template.getNumberOfNodes())
protected Cluster startServers(Cluster template,
String StoreDefXmlFile,
List<Integer> nodeToStart,
Map<String, String> configProps) throws Exception {
if(ec2RebalanceTestConfig.getInstanceCount() < template.getNumberOfNodes())
throw new IllegalStateException("instanceCount must be >= number of nodes in the cluster");

Map<String,Integer> nodeIds = generateClusterDescriptor(hostNamePairs, template, ec2RebalanceTestConfig);
Map<String, Integer> nodeIds = generateClusterDescriptor(hostNamePairs,
template,
ec2RebalanceTestConfig);
List<Node> nodes = new ArrayList<Node>();
for (Map.Entry<String,Integer> entry: nodeIds.entrySet()) {
for(Map.Entry<String, Integer> entry: nodeIds.entrySet()) {
String hostName = entry.getKey();
int nodeId = entry.getValue();
Node tmplNode = template.getNodeById(nodeId);
Expand Down Expand Up @@ -138,7 +151,7 @@ protected Cluster startServers(Cluster template, String StoreDefXmlFile, List<In
@Override
protected void stopServer(List<Integer> nodesToStop) throws Exception {
List<String> hostsToStop = new ArrayList<String>();
for (int nodeId: nodesToStop) {
for(int nodeId: nodesToStop) {
hostsToStop.add(nodeIdsInv.get(nodeId));
}
stopCluster(hostsToStop, ec2RebalanceTestConfig);
Expand All @@ -147,12 +160,12 @@ protected void stopServer(List<Integer> nodesToStop) throws Exception {
@Test
public void testGracefulRecovery() throws Exception {
Cluster currentCluster = ServerTestUtils.getLocalCluster(2, new int[][] {
{ 0, 1, 2, 3, 4, 5, 6, 7, 8 }, {} });
{ 0, 1, 2, 3, 4, 5, 6, 7, 8 }, {} });

Cluster targetCluster = ServerTestUtils.getLocalCluster(2, new int[][] {
{ 0, 1, 4, 5, 6, 7, 8 }, { 2, 3 } });
{ 0, 1, 4, 5, 6, 7, 8 }, { 2, 3 } });

List<Integer> serverList = Arrays.asList(0,1);
List<Integer> serverList = Arrays.asList(0, 1);
currentCluster = startServers(currentCluster, storeDefFile, serverList, null);
targetCluster = updateCluster(targetCluster);

Expand All @@ -162,9 +175,10 @@ public void testGracefulRecovery() throws Exception {
new AdminClientConfig());
RebalancePartitionsInfo rebalancePartitionsInfo = new RebalancePartitionsInfo(1,
0,
Arrays.asList(2, 3),
Arrays.asList(2,
3),
new ArrayList<Integer>(0),
Arrays.asList(testStoreName),
false,
0);
int requestId = adminClient.rebalanceNode(rebalancePartitionsInfo);
logger.info("started rebalanceNode, request id = " + requestId);
Expand All @@ -179,26 +193,26 @@ public void testGracefulRecovery() throws Exception {
startClusterNode(hostName, ec2RebalanceTestConfig, 1);

adminClient.stop();
adminClient = new AdminClient(getBootstrapUrl(currentCluster, 0),
new AdminClientConfig());
adminClient = new AdminClient(getBootstrapUrl(currentCluster, 0), new AdminClientConfig());
Versioned<MetadataStore.VoldemortState> serverState = adminClient.getRemoteServerState(1);

int delay = 250;
int maxDelay = 1000 * 30;
int timeout = 5 * 1000 * 60;
long start = System.currentTimeMillis();
while (System.currentTimeMillis() < start + timeout &&
serverState.getValue() != MetadataStore.VoldemortState.NORMAL_SERVER) {
while(System.currentTimeMillis() < start + timeout
&& serverState.getValue() != MetadataStore.VoldemortState.NORMAL_SERVER) {
Thread.sleep(delay);
if (delay < maxDelay)
if(delay < maxDelay)
delay *= 2;
serverState = adminClient.getRemoteServerState(1);
logger.info("serverState -> " + serverState.getValue());
}

if (serverState.getValue() == MetadataStore.VoldemortState.NORMAL_SERVER) {
for (int nodeId: Arrays.asList(1)) {
List<Integer> availablePartitions = targetCluster.getNodeById(nodeId).getPartitionIds();

if(serverState.getValue() == MetadataStore.VoldemortState.NORMAL_SERVER) {
for(int nodeId: Arrays.asList(1)) {
List<Integer> availablePartitions = targetCluster.getNodeById(nodeId)
.getPartitionIds();
List<Integer> unavailablePartitions = getUnavailablePartitions(targetCluster,
availablePartitions);

Expand All @@ -207,15 +221,16 @@ public void testGracefulRecovery() throws Exception {
targetCluster,
unavailablePartitions,
availablePartitions);
} catch (InvalidMetadataException e) {
} catch(InvalidMetadataException e) {
logger.warn(e);
}
}
} else
} else
fail("Server state never reached NORMAL_SERVER");
}

private static class Ec2RebalanceTestConfig extends Ec2RemoteTestConfig {

private String configDirName;
private int numKeys;

Expand All @@ -227,7 +242,7 @@ protected void init(Properties properties) {

try {
FileUtils.copyFile(new File(storeDefFile), new File(configDirName + "/stores.xml"));
} catch (IOException e) {
} catch(IOException e) {
throw new RuntimeException(e);
}
}
Expand Down
10 changes: 4 additions & 6 deletions src/java/voldemort/client/protocol/admin/AdminClient.java
Expand Up @@ -465,12 +465,10 @@ public Thread newThread(Runnable r) {
List<StoreDefinition> storeDefList = getRemoteStoreDefList(nodeId).getValue();
Cluster cluster = getRemoteCluster(nodeId).getValue();

List<String> writableStores = RebalanceUtils.getWritableStores(storeDefList);
List<StoreDefinition> writableStores = RebalanceUtils.getWritableStores(storeDefList);

for(StoreDefinition def: storeDefList) {
if(writableStores.contains(def.getName())) {
restoreStoreFromReplication(nodeId, cluster, def, executors);
}
for(StoreDefinition def: writableStores) {
restoreStoreFromReplication(nodeId, cluster, def, executors);
}
} finally {
executors.shutdown();
Expand Down Expand Up @@ -594,7 +592,7 @@ public int rebalanceNode(RebalancePartitionsInfo stealInfo) {
.setStealerId(stealInfo.getStealerId())
.addAllPartitions(stealInfo.getPartitionList())
.addAllUnbalancedStore(stealInfo.getUnbalancedStoreList())
.setDeleteDonorPartitions(stealInfo.isDeleteDonorPartitions())
.addAllDeletePartitions(stealInfo.getDeletePartitionsList())
.build();
VAdminProto.VoldemortAdminRequest adminRequest = VAdminProto.VoldemortAdminRequest.newBuilder()
.setType(VAdminProto.AdminRequestType.INITIATE_REBALANCE_NODE)
Expand Down

0 comments on commit b76bb67

Please sign in to comment.