Permalink
Browse files

sort RebalancePartitionInfo based on partition replica type

  • Loading branch information...
1 parent d1bed13 commit 2ddcd2f2254a68381a92489839178155ca1d087d Lei Gao committed Jul 29, 2011
@@ -11,6 +11,8 @@
import voldemort.utils.KeyDistributionGenerator;
import voldemort.utils.Utils;
+import com.google.common.collect.Lists;
+
/**
* Ordered representation of a cluster transition that guarantees that primary
* partition movements will take place before replicas.
@@ -23,7 +25,7 @@
private final Cluster currentCluster;
private final Cluster targetCluster;
private final RebalanceClusterPlan rebalanceClusterPlan;
- private final List<RebalanceNodePlan> orderedRebalanceNodePlanList;
+ private final List<RebalancePartitionsInfo> orderedRebalancePartitionsInfoList;
private final List<StoreDefinition> storeDefs;
private String printedContent;
private final int id;
@@ -37,7 +39,7 @@ public OrderedClusterTransition(final Cluster currentCluster,
this.targetCluster = targetCluster;
this.storeDefs = storeDefs;
this.rebalanceClusterPlan = rebalanceClusterPlan;
- this.orderedRebalanceNodePlanList = orderedClusterPlan(rebalanceClusterPlan);
+ this.orderedRebalancePartitionsInfoList = orderedClusterPlan(rebalanceClusterPlan);
}
public List<StoreDefinition> getStoreDefs() {
@@ -56,8 +58,8 @@ public Cluster getCurrentCluster() {
return currentCluster;
}
- public List<RebalanceNodePlan> getOrderedRebalanceNodePlanList() {
- return orderedRebalanceNodePlanList;
+ public List<RebalancePartitionsInfo> getOrderedRebalancePartitionsInfoList() {
+ return orderedRebalancePartitionsInfoList;
}
@Override
@@ -82,18 +84,16 @@ public String toString() {
.append(Utils.NEWLINE);
sb.append("- Ordered rebalance node plan : ")
.append(Utils.NEWLINE)
- .append(printRebalanceNodePlan(getOrderedRebalanceNodePlanList()));
+ .append(printRebalanceNodePlan(getOrderedRebalancePartitionsInfoList()));
printedContent = sb.toString();
}
return printedContent;
}
- private String printRebalanceNodePlan(List<RebalanceNodePlan> rebalanceNodePlanList) {
+ private String printRebalanceNodePlan(List<RebalancePartitionsInfo> rebalancePartitionInfoList) {
StringBuilder builder = new StringBuilder();
- for(RebalanceNodePlan plan: rebalanceNodePlanList) {
- for(RebalancePartitionsInfo partitionInfo: plan.getRebalanceTaskList()) {
- builder.append(partitionInfo).append(Utils.NEWLINE);
- }
+ for(RebalancePartitionsInfo partitionInfo: rebalancePartitionInfoList) {
+ builder.append(partitionInfo).append(Utils.NEWLINE);
}
return builder.toString();
}
@@ -109,68 +109,29 @@ private RebalanceClusterPlan getRebalanceClusterPlan() {
* @param rebalanceClusterPlan Rebalance cluster plan
* @return Returns a list of ordered rebalance node plan
*/
- private List<RebalanceNodePlan> orderedClusterPlan(final RebalanceClusterPlan rebalanceClusterPlan) {
+ private List<RebalancePartitionsInfo> orderedClusterPlan(final RebalanceClusterPlan rebalanceClusterPlan) {
Queue<RebalanceNodePlan> rebalancingTaskQueue = rebalanceClusterPlan.getRebalancingTaskQueue();
- List<RebalanceNodePlan> plans = new ArrayList<RebalanceNodePlan>();
+ List<RebalancePartitionsInfo> clusterRebalancePartitionsInfos = Lists.newArrayList();
for(RebalanceNodePlan rebalanceNodePlan: rebalancingTaskQueue) {
-
- // Order the individual partition plans first
- List<RebalancePartitionsInfo> orderedRebalancePartitionsInfos = orderedPartitionInfos(rebalanceNodePlan);
- plans.add(new RebalanceNodePlan(rebalanceNodePlan.getNodeId(),
- orderedRebalancePartitionsInfos,
- rebalanceNodePlan.isNodeStealer()));
-
+ clusterRebalancePartitionsInfos.addAll(rebalanceNodePlan.getRebalanceTaskList());
}
- return orderedNodePlans(plans);
+ return orderedPartitionInfos(clusterRebalancePartitionsInfos);
}
/**
- * Ordering the list of {@link RebalanceNodePlan} such that all plans with
+ * Ordering the list of {@link RebalancePartitionsInfo} such that all
* primary partition moves come first
*
- * @param rebalanceNodePlans List of Node plans
- * @return Returns a list of ordered {@link RebalanceNodePlan}
+ * @param clusterRebalancePartitionsInfos List of partition info
+ * @return Returns a list of ordered {@link RebalancePartitionsInfo}
*/
- private List<RebalanceNodePlan> orderedNodePlans(List<RebalanceNodePlan> rebalanceNodePlans) {
- List<RebalanceNodePlan> first = new ArrayList<RebalanceNodePlan>();
- List<RebalanceNodePlan> second = new ArrayList<RebalanceNodePlan>();
-
- for(RebalanceNodePlan plan: rebalanceNodePlans) {
- boolean found = false;
- for(RebalancePartitionsInfo partitionInfo: plan.getRebalanceTaskList()) {
- List<Integer> stealMasterPartitions = partitionInfo.getStealMasterPartitions();
- if(stealMasterPartitions != null && !stealMasterPartitions.isEmpty()) {
- found = true;
- break;
- }
- }
-
- if(found) {
- first.add(plan);
- } else {
- second.add(plan);
- }
- }
- first.addAll(second);
- return first;
- }
-
- /**
- * Ordering {@link RebalancePartitionsInfo} for a single node such that it
- * guarantees that primary partition movements will be before any replica
- * partition movements
- *
- * @param rebalanceNodePlan Node plan for a particular node ( either stealer
- * based or donor based )
- * @return List of ordered {@link RebalancePartitionsInfo}.
- */
- private List<RebalancePartitionsInfo> orderedPartitionInfos(final RebalanceNodePlan rebalanceNodePlan) {
+ private List<RebalancePartitionsInfo> orderedPartitionInfos(List<RebalancePartitionsInfo> clusterRebalancePartitionsInfo) {
List<RebalancePartitionsInfo> listPrimaries = new ArrayList<RebalancePartitionsInfo>();
List<RebalancePartitionsInfo> listReplicas = new ArrayList<RebalancePartitionsInfo>();
- for(RebalancePartitionsInfo partitionInfo: rebalanceNodePlan.getRebalanceTaskList()) {
+ for(RebalancePartitionsInfo partitionInfo: clusterRebalancePartitionsInfo) {
List<Integer> stealMasterPartitions = partitionInfo.getStealMasterPartitions();
if(stealMasterPartitions != null && !stealMasterPartitions.isEmpty()) {
listPrimaries.add(partitionInfo);
@@ -184,5 +145,4 @@ private RebalanceClusterPlan getRebalanceClusterPlan() {
return listPrimaries;
}
-
}
@@ -355,9 +355,9 @@ private void rebalancePerClusterTransition(Cluster currentCluster,
*/
private void rebalancePerPartitionTransition(final OrderedClusterTransition orderedClusterTransition) {
try {
- final List<RebalanceNodePlan> rebalanceNodePlanList = orderedClusterTransition.getOrderedRebalanceNodePlanList();
+ final List<RebalancePartitionsInfo> rebalancePartitionsInfoList = orderedClusterTransition.getOrderedRebalancePartitionsInfoList();
- if(rebalanceNodePlanList.isEmpty()) {
+ if(rebalancePartitionsInfoList.isEmpty()) {
RebalanceUtils.printLog(orderedClusterTransition.getId(),
logger,
"Skipping rebalance task id "
@@ -372,7 +372,7 @@ private void rebalancePerPartitionTransition(final OrderedClusterTransition orde
+ orderedClusterTransition.getId());
// Flatten the node plans to partition plans
- List<RebalancePartitionsInfo> rebalancePartitionPlanList = RebalanceUtils.flattenNodePlans(rebalanceNodePlanList);
+ List<RebalancePartitionsInfo> rebalancePartitionPlanList = rebalancePartitionsInfoList;
// Split the store definitions
List<StoreDefinition> readOnlyStoreDefs = RebalanceUtils.filterStores(orderedClusterTransition.getStoreDefs(),
@@ -254,7 +254,6 @@ public void testRORWRebalance() throws Exception {
RebalanceClientConfig config = new RebalanceClientConfig();
config.setDeleteAfterRebalancingEnabled(true);
- config.setStealerBasedRebalancing(!useDonorBased());
RebalanceController rebalanceClient = new RebalanceController(getBootstrapUrl(currentCluster,
0),
config);
@@ -300,7 +299,6 @@ public void testRORWRebalanceWithReplication() throws Exception {
RebalanceClientConfig config = new RebalanceClientConfig();
config.setDeleteAfterRebalancingEnabled(true);
- config.setStealerBasedRebalancing(!useDonorBased());
RebalanceController rebalanceClient = new RebalanceController(getBootstrapUrl(currentCluster,
0),
config);
@@ -348,7 +346,6 @@ public void testRORebalanceWithReplication() throws Exception {
RebalanceClientConfig config = new RebalanceClientConfig();
config.setDeleteAfterRebalancingEnabled(true);
- config.setStealerBasedRebalancing(!useDonorBased());
RebalanceController rebalanceClient = new RebalanceController(getBootstrapUrl(currentCluster,
0),
config);
@@ -434,7 +431,6 @@ public void testProxyGetDuringRebalancing() throws Exception {
RebalanceClientConfig rebalanceClientConfig = new RebalanceClientConfig();
rebalanceClientConfig.setMaxParallelRebalancing(2);
- rebalanceClientConfig.setStealerBasedRebalancing(!useDonorBased());
final RebalanceController rebalanceClient = new RebalanceController(getBootstrapUrl(updatedCurrentCluster,
0),
@@ -572,7 +568,6 @@ public void testServerSideRouting() throws Exception {
// populate data now.
RebalanceClientConfig rebalanceClientConfig = new RebalanceClientConfig();
rebalanceClientConfig.setMaxParallelRebalancing(2);
- rebalanceClientConfig.setStealerBasedRebalancing(!useDonorBased());
final RebalanceController rebalanceClient = new RebalanceController(getBootstrapUrl(updatedCurrentCluster,
0),
Oops, something went wrong.

0 comments on commit 2ddcd2f

Please sign in to comment.