Permalink
Browse files

Changed rebalancer API to only take one parameter.

  • Loading branch information...
1 parent a48ac00 commit 4316c3d93ab06b8f7515a404dbe17b9352550a44 @bbansal bbansal committed Jan 5, 2010
@@ -18,7 +18,7 @@
if [ $# -lt 3 ];
then
- echo 'USAGE: bin/voldemort-shell.sh bootstrapURL currentCluster.xml targetCluster.xml maxParallelRebalancing'
+ echo 'USAGE: bin/voldemort-shell.sh bootstrapURL targetCluster.xml maxParallelRebalancing'
exit 1
fi
@@ -17,6 +17,7 @@
import voldemort.store.rebalancing.RedirectingStore;
import voldemort.utils.RebalanceUtils;
import voldemort.versioning.VectorClock;
+import voldemort.versioning.Versioned;
import com.google.common.collect.ImmutableList;
@@ -56,19 +57,22 @@ public Thread newThread(Runnable r) {
* Voldemort dynamic cluster membership rebalancing mechanism. <br>
* Migrate partitions across nodes to managed changes in cluster
* memberships. <br>
- * Takes two cluster configurations currentCluster and targetCluster as
- * parameters compares and makes a list of partitions need to be
- * transferred.<br>
+ * Takes targetCluster as parameters, fetches the current cluster
+ * configuration from the cluster compares and makes a list of partitions
+ * need to be transferred.<br>
* The cluster is kept consistent during rebalancing using a proxy mechanism
* via {@link RedirectingStore}<br>
*
*
* @param targetCluster: target Cluster configuration
*/
- public void rebalance(final Cluster currentCluster, final Cluster targetCluster) {
- logger.info("Current Cluster configuration:" + currentCluster);
+ public void rebalance(final Cluster targetCluster) {
+ Versioned<Cluster> currentVersionedCluster = RebalanceUtils.getLatestCluster(new ArrayList<Integer>(),
+ adminClient);
+ logger.info("Current Cluster configuration:" + currentVersionedCluster);
logger.info("Target Cluster configuration:" + targetCluster);
+ Cluster currentCluster = currentVersionedCluster.getValue();
adminClient.setAdminClientCluster(currentCluster);
List<String> storeList = RebalanceUtils.getStoreNameList(currentCluster, adminClient);
@@ -78,8 +82,8 @@ public void rebalance(final Cluster currentCluster, final Cluster targetCluster)
}
final RebalanceClusterPlan rebalanceClusterPlan = new RebalanceClusterPlan(currentCluster,
- targetCluster,
- storeList);
+ targetCluster,
+ storeList);
logger.info(rebalanceClusterPlan);
// start all threads
@@ -89,14 +93,12 @@ public void rebalance(final Cluster currentCluster, final Cluster targetCluster)
public void run() {
// pick one node to rebalance from queue
while(!rebalanceClusterPlan.getRebalancingTaskQueue().isEmpty()) {
- logger.info("rebalanceTaskQueue size:"
- + rebalanceClusterPlan.getRebalancingTaskQueue().size());
- RebalanceNodePlan rebalanceTask = rebalanceClusterPlan.getRebalancingTaskQueue()
- .poll();
- if(null != rebalanceTask) {
- int stealerNodeId = rebalanceTask.getStealerNode();
- List<RebalancePartitionsInfo> rebalanceSubTaskList = rebalanceTask.getRebalanceTaskList();
+ RebalanceNodePlan rebalanceNodePlan = rebalanceClusterPlan.getRebalancingTaskQueue()
+ .poll();
+ if(null != rebalanceNodePlan) {
+ int stealerNodeId = rebalanceNodePlan.getStealerNode();
+ List<RebalancePartitionsInfo> rebalanceSubTaskList = rebalanceNodePlan.getRebalanceTaskList();
while(rebalanceSubTaskList.size() > 0) {
int index = (int) Math.random() * rebalanceSubTaskList.size();
@@ -106,12 +108,12 @@ public void run() {
try {
- // first commit cluster changes on
- // nodes.
+ // commit cluster changes to all nodes
commitClusterChanges(stealerNodeId,
targetCluster,
rebalanceSubTask);
- // attempt to rebalance for all stores.
+
+ // attempt rebalancing.
attemptRebalanceSubTask(rebalanceSubTask);
logger.info("Successfully finished RebalanceSubTask attempt:"
@@ -16,19 +16,18 @@
private static StoreDefinitionsMapper storesMapper = new StoreDefinitionsMapper();
public static void main(String[] args) throws Exception {
- if(args.length != 4)
- Utils.croak("USAGE: java RebalanceCommandShell bootstrapURL currentCluster.xml targetCluster.xml maxParallelRebalancing");
+ if(args.length != 3)
+ Utils.croak("USAGE: java RebalanceCommandShell bootstrapURL targetCluster.xml maxParallelRebalancing");
String bootstrapURL = args[0];
- Cluster currentCluster = clusterMapper.readCluster(new File(args[1]));
- Cluster targetCluster = clusterMapper.readCluster(new File(args[2]));
- int maxParallelRebalancing = Integer.parseInt(args[3]);
+ Cluster targetCluster = clusterMapper.readCluster(new File(args[1]));
+ int maxParallelRebalancing = Integer.parseInt(args[2]);
RebalanceClientConfig config = new RebalanceClientConfig();
config.setMaxParallelRebalancing(maxParallelRebalancing);
rebalanceClient = new RebalanceClient(bootstrapURL, config);
- rebalanceClient.rebalance(currentCluster, targetCluster);
+ rebalanceClient.rebalance(targetCluster);
}
}
@@ -107,7 +107,7 @@ private void attemptRebalance(RebalancePartitionsInfo stealInfo) {
metadataStore.getCluster());
try {
int rebalanceAsyncId = rebalanceLocalNode(storeName, stealInfo);
-
+
adminClient.waitForCompletion(stealInfo.getStealerId(),
rebalanceAsyncId,
voldemortConfig.getAdminSocketTimeout(),
@@ -159,10 +159,10 @@ public void operate() throws Exception {
try {
logger.info("Rebalancer: rebalance " + stealInfo + " starting.");
- // check and create redirectingSocketStore if needed.
- checkAndCreateRedirectingSocketStore(storeName,
- adminClient.getAdminClientCluster()
- .getNodeById(stealInfo.getDonorId()));
+ // create redirectingSocketStore for redirection.
+ createRedirectingSocketStore(storeName,
+ adminClient.getAdminClientCluster()
+ .getNodeById(stealInfo.getDonorId()));
checkCurrentState(metadataStore, stealInfo);
setRebalancingState(metadataStore, stealInfo);
@@ -232,20 +232,22 @@ private void checkCurrentState(MetadataStore metadataStore, RebalancePartitionsI
+ " rejecting rebalance request:" + stealInfo);
}
- private void checkAndCreateRedirectingSocketStore(String storeName, Node donorNode) {
- if(!storeRepository.hasRedirectingSocketStore(storeName, donorNode.getId())) {
+ /**
+ * Create redirectingSocketStore for the donorNode when rebalancing.
+ *
+ * @param storeName
+ * @param donorNode
+ */
+ private void createRedirectingSocketStore(String storeName, Node donorNode) {
+ if(voldemortConfig.isRedirectRoutingEnabled()
+ && !storeRepository.hasRedirectingSocketStore(storeName, donorNode.getId())) {
storeRepository.addRedirectingSocketStore(donorNode.getId(),
- createRedirectingSocketStore(storeName,
- donorNode));
+ new SocketStore(storeName,
+ new SocketDestination(donorNode.getHost(),
+ donorNode.getSocketPort(),
+ RequestFormatType.PROTOCOL_BUFFERS),
+ socketPool,
+ RequestRoutingType.IGNORE_CHECKS));
}
}
-
- private SocketStore createRedirectingSocketStore(String storeName, Node node) {
- return new SocketStore(storeName,
- new SocketDestination(node.getHost(),
- node.getSocketPort(),
- RequestFormatType.PROTOCOL_BUFFERS),
- socketPool,
- RequestRoutingType.IGNORE_CHECKS);
- }
}
@@ -40,13 +40,11 @@
import voldemort.annotations.jmx.JmxManaged;
import voldemort.annotations.jmx.JmxOperation;
import voldemort.client.ClientThreadPool;
-import voldemort.client.protocol.RequestFormatType;
import voldemort.cluster.Cluster;
import voldemort.cluster.Node;
import voldemort.serialization.ByteArraySerializer;
import voldemort.serialization.SlopSerializer;
import voldemort.server.AbstractService;
-import voldemort.server.RequestRoutingType;
import voldemort.server.ServiceType;
import voldemort.server.StoreRepository;
import voldemort.server.VoldemortConfig;
@@ -186,11 +184,6 @@ public void openStore(StoreDefinition storeDef) {
if(voldemortConfig.isServerRoutingEnabled())
registerNodeStores(storeDef, metadata.getCluster(), voldemortConfig.getNodeId());
- if(voldemortConfig.isRedirectRoutingEnabled())
- registerRedirectingSocketStores(storeDef,
- metadata.getCluster(),
- voldemortConfig.getNodeId());
-
if(storeDef.hasRetentionPeriod())
scheduleCleanupJob(storeDef, engine);
}
@@ -290,21 +283,6 @@ public void registerNodeStores(StoreDefinition def, Cluster cluster, int localNo
false);
}
- private void registerRedirectingSocketStores(StoreDefinition def, Cluster cluster, int localNode) {
- for(Node node: cluster.getNodes()) {
- Store<ByteArray, byte[]> store;
- if(node.getId() != localNode) {
- store = new SocketStore(def.getName(),
- new SocketDestination(node.getHost(),
- node.getSocketPort(),
- RequestFormatType.PROTOCOL_BUFFERS),
- socketPool,
- RequestRoutingType.IGNORE_CHECKS);
- this.storeRepository.addRedirectingSocketStore(node.getId(), store);
- }
- }
- }
-
/**
* Schedule a data retention cleanup job for the given store
*
@@ -47,9 +47,8 @@
* The RedirectingRoutedStore extends {@link RoutedStore}
*
* catch all InvalidMetadataException and updates the routed store with latest
- * cluster metadata.<br>
- * ServerSide Routing side of re-bootstrap implemented in
- * {@link DefaultStoreClient}
+ * cluster metadata, client rebootstrapping behavior same in
+ * {@link DefaultStoreClient} for server side routing<br>
*
*/
public class RebalancingRoutedStore extends RoutedStore {
@@ -105,7 +104,8 @@ private void reinit() {
/**
* Check that all nodes in the new cluster have a corrosponding entry in
- * storeRepositiry and innerStores. add a NodeStore if not present.
+ * storeRepositiry and innerStores. add a NodeStore if not present, is
+ * needed as with rebalancing we can add new nodes on the fly.
*
*/
private void checkAndAddNodeStore() {
@@ -663,7 +663,7 @@ public void put(final ByteArray key, final Versioned<byte[]> versioned)
} catch(ObsoleteVersionException e) {
// if this version is obsolete on the master, then bail out
// of this operation
- throw e;
+ failures.add(e);
} catch(Exception e) {
failures.add(e);
}
@@ -496,7 +496,7 @@ private void rebalanceAndCheck(Cluster currentCluster,
Cluster targetCluster,
RebalanceClient rebalanceClient,
List<Integer> nodeCheckList) {
- rebalanceClient.rebalance(currentCluster, targetCluster);
+ rebalanceClient.rebalance(targetCluster);
for(int nodeId: nodeCheckList) {
List<Integer> availablePartitions = targetCluster.getNodeById(nodeId).getPartitionIds();

0 comments on commit 4316c3d

Please sign in to comment.