Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also compare across forks.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also compare across forks.
base fork: voldemort/voldemort
base: master
...
head fork: voldemort/voldemort
compare: li-r1124
Checking mergeability… Don’t worry, you can still create the pull request.
  • 9 commits
  • 8 files changed
  • 0 commit comments
  • 2 contributors
View
2  .classpath
@@ -39,7 +39,6 @@
<classpathentry kind="lib" path="lib/commons-pool-1.5.2.jar"/>
<classpathentry kind="lib" path="lib/protobuf-java-2.3.0.jar"/>
<classpathentry kind="lib" path="contrib/ec2-testing/lib/typica.jar"/>
- <classpathentry kind="lib" path="lib/google-collect-1.0.jar"/>
<classpathentry kind="lib" path="lib/je-4.0.92.jar"/>
<classpathentry kind="lib" path="lib/paranamer-2.1.jar"/>
<classpathentry kind="lib" path="lib/jackson-mapper-asl-1.4.0.jar"/>
@@ -49,5 +48,6 @@
<classpathentry kind="lib" path="lib/mockito-all-1.8.5.jar"/>
<classpathentry kind="lib" path="lib/avro-1.4.0.jar"/>
<classpathentry kind="lib" path="lib/libthrift-0.5.0.jar"/>
+ <classpathentry kind="lib" path="lib/google-collect-1.0-rc2.jar"/>
<classpathentry kind="output" path="classes"/>
</classpath>
View
2  build.properties
@@ -34,4 +34,4 @@ tomcat.manager.password=tomcat
tomcat.context=/voldemort
## Release
-curr.release=0.81
+curr.release=0.94.li5
View
BIN  lib/google-collect-1.0-rc2.jar
Binary file not shown
View
BIN  lib/google-collect-1.0.jar
Binary file not shown
View
78 src/java/voldemort/client/rebalance/OrderedClusterTransition.java
@@ -11,6 +11,8 @@
import voldemort.utils.KeyDistributionGenerator;
import voldemort.utils.Utils;
+import com.google.common.collect.Lists;
+
/**
* Ordered representation of a cluster transition that guarantees that primary
* partition movements will take place before replicas.
@@ -23,7 +25,7 @@
private final Cluster currentCluster;
private final Cluster targetCluster;
private final RebalanceClusterPlan rebalanceClusterPlan;
- private final List<RebalanceNodePlan> orderedRebalanceNodePlanList;
+ private final List<RebalancePartitionsInfo> orderedRebalancePartitionsInfoList;
private final List<StoreDefinition> storeDefs;
private String printedContent;
private final int id;
@@ -37,7 +39,7 @@ public OrderedClusterTransition(final Cluster currentCluster,
this.targetCluster = targetCluster;
this.storeDefs = storeDefs;
this.rebalanceClusterPlan = rebalanceClusterPlan;
- this.orderedRebalanceNodePlanList = orderedClusterPlan(rebalanceClusterPlan);
+ this.orderedRebalancePartitionsInfoList = orderedClusterPlan(rebalanceClusterPlan);
}
public List<StoreDefinition> getStoreDefs() {
@@ -56,8 +58,8 @@ public Cluster getCurrentCluster() {
return currentCluster;
}
- public List<RebalanceNodePlan> getOrderedRebalanceNodePlanList() {
- return orderedRebalanceNodePlanList;
+ public List<RebalancePartitionsInfo> getOrderedRebalancePartitionsInfoList() {
+ return orderedRebalancePartitionsInfoList;
}
@Override
@@ -82,18 +84,16 @@ public String toString() {
.append(Utils.NEWLINE);
sb.append("- Ordered rebalance node plan : ")
.append(Utils.NEWLINE)
- .append(printRebalanceNodePlan(getOrderedRebalanceNodePlanList()));
+ .append(printRebalanceNodePlan(getOrderedRebalancePartitionsInfoList()));
printedContent = sb.toString();
}
return printedContent;
}
- private String printRebalanceNodePlan(List<RebalanceNodePlan> rebalanceNodePlanList) {
+ private String printRebalanceNodePlan(List<RebalancePartitionsInfo> rebalancePartitionInfoList) {
StringBuilder builder = new StringBuilder();
- for(RebalanceNodePlan plan: rebalanceNodePlanList) {
- for(RebalancePartitionsInfo partitionInfo: plan.getRebalanceTaskList()) {
- builder.append(partitionInfo).append(Utils.NEWLINE);
- }
+ for(RebalancePartitionsInfo partitionInfo: rebalancePartitionInfoList) {
+ builder.append(partitionInfo).append(Utils.NEWLINE);
}
return builder.toString();
}
@@ -109,68 +109,29 @@ private RebalanceClusterPlan getRebalanceClusterPlan() {
* @param rebalanceClusterPlan Rebalance cluster plan
* @return Returns a list of ordered rebalance node plan
*/
- private List<RebalanceNodePlan> orderedClusterPlan(final RebalanceClusterPlan rebalanceClusterPlan) {
+ private List<RebalancePartitionsInfo> orderedClusterPlan(final RebalanceClusterPlan rebalanceClusterPlan) {
Queue<RebalanceNodePlan> rebalancingTaskQueue = rebalanceClusterPlan.getRebalancingTaskQueue();
- List<RebalanceNodePlan> plans = new ArrayList<RebalanceNodePlan>();
+ List<RebalancePartitionsInfo> clusterRebalancePartitionsInfos = Lists.newArrayList();
for(RebalanceNodePlan rebalanceNodePlan: rebalancingTaskQueue) {
-
- // Order the individual partition plans first
- List<RebalancePartitionsInfo> orderedRebalancePartitionsInfos = orderedPartitionInfos(rebalanceNodePlan);
- plans.add(new RebalanceNodePlan(rebalanceNodePlan.getNodeId(),
- orderedRebalancePartitionsInfos,
- rebalanceNodePlan.isNodeStealer()));
-
+ clusterRebalancePartitionsInfos.addAll(rebalanceNodePlan.getRebalanceTaskList());
}
- return orderedNodePlans(plans);
+ return orderedPartitionInfos(clusterRebalancePartitionsInfos);
}
/**
- * Ordering the list of {@link RebalanceNodePlan} such that all plans with
+ * Ordering the list of {@link RebalancePartitionsInfo} such that all
* primary partition moves come first
*
- * @param rebalanceNodePlans List of Node plans
- * @return Returns a list of ordered {@link RebalanceNodePlan}
+ * @param clusterRebalancePartitionsInfos List of partition info
+ * @return Returns a list of ordered {@link RebalancePartitionsInfo}
*/
- private List<RebalanceNodePlan> orderedNodePlans(List<RebalanceNodePlan> rebalanceNodePlans) {
- List<RebalanceNodePlan> first = new ArrayList<RebalanceNodePlan>();
- List<RebalanceNodePlan> second = new ArrayList<RebalanceNodePlan>();
-
- for(RebalanceNodePlan plan: rebalanceNodePlans) {
- boolean found = false;
- for(RebalancePartitionsInfo partitionInfo: plan.getRebalanceTaskList()) {
- List<Integer> stealMasterPartitions = partitionInfo.getStealMasterPartitions();
- if(stealMasterPartitions != null && !stealMasterPartitions.isEmpty()) {
- found = true;
- break;
- }
- }
-
- if(found) {
- first.add(plan);
- } else {
- second.add(plan);
- }
- }
- first.addAll(second);
- return first;
- }
-
- /**
- * Ordering {@link RebalancePartitionsInfo} for a single node such that it
- * guarantees that primary partition movements will be before any replica
- * partition movements
- *
- * @param rebalanceNodePlan Node plan for a particular node ( either stealer
- * based or donor based )
- * @return List of ordered {@link RebalancePartitionsInfo}.
- */
- private List<RebalancePartitionsInfo> orderedPartitionInfos(final RebalanceNodePlan rebalanceNodePlan) {
+ private List<RebalancePartitionsInfo> orderedPartitionInfos(List<RebalancePartitionsInfo> clusterRebalancePartitionsInfo) {
List<RebalancePartitionsInfo> listPrimaries = new ArrayList<RebalancePartitionsInfo>();
List<RebalancePartitionsInfo> listReplicas = new ArrayList<RebalancePartitionsInfo>();
- for(RebalancePartitionsInfo partitionInfo: rebalanceNodePlan.getRebalanceTaskList()) {
+ for(RebalancePartitionsInfo partitionInfo: clusterRebalancePartitionsInfo) {
List<Integer> stealMasterPartitions = partitionInfo.getStealMasterPartitions();
if(stealMasterPartitions != null && !stealMasterPartitions.isEmpty()) {
listPrimaries.add(partitionInfo);
@@ -184,5 +145,4 @@ private RebalanceClusterPlan getRebalanceClusterPlan() {
return listPrimaries;
}
-
}
View
6 src/java/voldemort/client/rebalance/RebalanceController.java
@@ -355,9 +355,9 @@ private void rebalancePerClusterTransition(Cluster currentCluster,
*/
private void rebalancePerPartitionTransition(final OrderedClusterTransition orderedClusterTransition) {
try {
- final List<RebalanceNodePlan> rebalanceNodePlanList = orderedClusterTransition.getOrderedRebalanceNodePlanList();
+ final List<RebalancePartitionsInfo> rebalancePartitionsInfoList = orderedClusterTransition.getOrderedRebalancePartitionsInfoList();
- if(rebalanceNodePlanList.isEmpty()) {
+ if(rebalancePartitionsInfoList.isEmpty()) {
RebalanceUtils.printLog(orderedClusterTransition.getId(),
logger,
"Skipping rebalance task id "
@@ -372,7 +372,7 @@ private void rebalancePerPartitionTransition(final OrderedClusterTransition orde
+ orderedClusterTransition.getId());
// Flatten the node plans to partition plans
- List<RebalancePartitionsInfo> rebalancePartitionPlanList = RebalanceUtils.flattenNodePlans(rebalanceNodePlanList);
+ List<RebalancePartitionsInfo> rebalancePartitionPlanList = rebalancePartitionsInfoList;
// Split the store definitions
List<StoreDefinition> readOnlyStoreDefs = RebalanceUtils.filterStores(orderedClusterTransition.getStoreDefs(),
View
5 test/unit/voldemort/client/rebalance/AbstractRebalanceTest.java
@@ -254,7 +254,6 @@ public void testRORWRebalance() throws Exception {
RebalanceClientConfig config = new RebalanceClientConfig();
config.setDeleteAfterRebalancingEnabled(true);
- config.setStealerBasedRebalancing(!useDonorBased());
RebalanceController rebalanceClient = new RebalanceController(getBootstrapUrl(currentCluster,
0),
config);
@@ -300,7 +299,6 @@ public void testRORWRebalanceWithReplication() throws Exception {
RebalanceClientConfig config = new RebalanceClientConfig();
config.setDeleteAfterRebalancingEnabled(true);
- config.setStealerBasedRebalancing(!useDonorBased());
RebalanceController rebalanceClient = new RebalanceController(getBootstrapUrl(currentCluster,
0),
config);
@@ -348,7 +346,6 @@ public void testRORebalanceWithReplication() throws Exception {
RebalanceClientConfig config = new RebalanceClientConfig();
config.setDeleteAfterRebalancingEnabled(true);
- config.setStealerBasedRebalancing(!useDonorBased());
RebalanceController rebalanceClient = new RebalanceController(getBootstrapUrl(currentCluster,
0),
config);
@@ -434,7 +431,6 @@ public void testProxyGetDuringRebalancing() throws Exception {
RebalanceClientConfig rebalanceClientConfig = new RebalanceClientConfig();
rebalanceClientConfig.setMaxParallelRebalancing(2);
- rebalanceClientConfig.setStealerBasedRebalancing(!useDonorBased());
final RebalanceController rebalanceClient = new RebalanceController(getBootstrapUrl(updatedCurrentCluster,
0),
@@ -572,7 +568,6 @@ public void testServerSideRouting() throws Exception {
// populate data now.
RebalanceClientConfig rebalanceClientConfig = new RebalanceClientConfig();
rebalanceClientConfig.setMaxParallelRebalancing(2);
- rebalanceClientConfig.setStealerBasedRebalancing(!useDonorBased());
final RebalanceController rebalanceClient = new RebalanceController(getBootstrapUrl(updatedCurrentCluster,
0),
View
334 test/unit/voldemort/client/rebalance/RebalanceClusterPlanTest.java
@@ -20,8 +20,10 @@
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.StringReader;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import junit.framework.TestCase;
@@ -35,6 +37,7 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
public class RebalanceClusterPlanTest extends TestCase {
@@ -79,21 +82,26 @@ public void testRebalancePlanDelete() {
targetCluster = ServerTestUtils.getLocalCluster(3, new int[][] { { 1, 2, 3 },
{ 4, 5, 6, 7 }, { 0 } });
- List<RebalanceNodePlan> orderedRebalanceNodePlanList = createOrderedClusterTransition(currentCluster,
- targetCluster,
- Lists.newArrayList(ServerTestUtils.getStoreDef("test",
- 2,
- 1,
- 1,
- 1,
- 1,
- RoutingStrategyType.CONSISTENT_STRATEGY))).getOrderedRebalanceNodePlanList();
- assertEquals("There should be exactly 2 rebalancing node",
+ List<RebalancePartitionsInfo> orderedRebalancePartitionInfoList = createOrderedClusterTransition(currentCluster,
+ targetCluster,
+ Lists.newArrayList(ServerTestUtils.getStoreDef("test",
+ 2,
+ 1,
+ 1,
+ 1,
+ 1,
+ RoutingStrategyType.CONSISTENT_STRATEGY))).getOrderedRebalancePartitionsInfoList();
+ assertEquals("There should have exactly 2 rebalancing node",
2,
- orderedRebalanceNodePlanList.size());
+ getUniqueNodeCount(orderedRebalancePartitionInfoList, false));
+ assertEquals("There should be exactly 2 rebalancing partition info",
+ 2,
+ orderedRebalancePartitionInfoList.size());
assertEquals("Stealer 2 should have 1 entry",
1,
- orderedRebalanceNodePlanList.get(0).getRebalanceTaskList().size());
+ getStealerNodePartitionInfoCount(2, orderedRebalancePartitionInfoList));
+ // make sure partitionInfo is ordered with primary ones first
+ checkOrderedPartitionInfo(orderedRebalancePartitionInfoList);
// Partitions to move
HashMap<Integer, List<Integer>> partitionsToMove = Maps.newHashMap();
@@ -108,7 +116,8 @@ public void testRebalancePlanDelete() {
HashMap<String, HashMap<Integer, List<Integer>>> storeToPartitionsToDelete = Maps.newHashMap();
storeToPartitionsToDelete.put("test", partitionsToDelete);
- checkAllRebalanceInfoPresent(orderedRebalanceNodePlanList.get(0),
+ checkAllRebalanceInfoPresent(getStealerNodePartitionInfoList(2,
+ orderedRebalancePartitionInfoList),
Arrays.asList(new RebalancePartitionsInfo(2,
0,
storeToPartitionsToMove,
@@ -118,7 +127,7 @@ public void testRebalancePlanDelete() {
assertEquals("Stealer 0 should have 1 entry",
1,
- orderedRebalanceNodePlanList.get(1).getRebalanceTaskList().size());
+ getStealerNodePartitionInfoCount(0, orderedRebalancePartitionInfoList));
partitionsToMove = Maps.newHashMap();
partitionsToMove.put(1, Lists.newArrayList(0));
storeToPartitionsToMove = Maps.newHashMap();
@@ -129,7 +138,8 @@ public void testRebalancePlanDelete() {
storeToPartitionsToDelete = Maps.newHashMap();
storeToPartitionsToDelete.put("test", partitionsToDelete);
- checkAllRebalanceInfoPresent(orderedRebalanceNodePlanList.get(1),
+ checkAllRebalanceInfoPresent(getStealerNodePartitionInfoList(0,
+ orderedRebalancePartitionInfoList),
Arrays.asList(new RebalancePartitionsInfo(0,
1,
storeToPartitionsToMove,
@@ -145,21 +155,27 @@ public void testRebalancePlanDelete() {
targetCluster = ServerTestUtils.getLocalCluster(2, new int[][] { { 1, 2, 3 },
{ 4, 5, 6, 7, 0 } });
- orderedRebalanceNodePlanList = createOrderedClusterTransition(currentCluster,
- targetCluster,
- Lists.newArrayList(ServerTestUtils.getStoreDef("test",
- 2,
- 1,
- 1,
- 1,
- 1,
- RoutingStrategyType.CONSISTENT_STRATEGY))).getOrderedRebalanceNodePlanList();
- assertEquals("There should be exactly 2 rebalancing node",
+ orderedRebalancePartitionInfoList = createOrderedClusterTransition(currentCluster,
+ targetCluster,
+ Lists.newArrayList(ServerTestUtils.getStoreDef("test",
+ 2,
+ 1,
+ 1,
+ 1,
+ 1,
+ RoutingStrategyType.CONSISTENT_STRATEGY))).getOrderedRebalancePartitionsInfoList();
+ assertEquals("There should have exactly 2 rebalancing node",
+ 2,
+ getUniqueNodeCount(orderedRebalancePartitionInfoList, false));
+ assertEquals("There should be exactly 2 rebalance partition info",
2,
- orderedRebalanceNodePlanList.size());
+ orderedRebalancePartitionInfoList.size());
assertEquals("Stealer 1 should have 1 entry",
1,
- orderedRebalanceNodePlanList.get(0).getRebalanceTaskList().size());
+ getStealerNodePartitionInfoCount(1, orderedRebalancePartitionInfoList));
+ // make sure partitionInfo is ordered with primary ones first
+ checkOrderedPartitionInfo(orderedRebalancePartitionInfoList);
+
partitionsToMove = Maps.newHashMap();
partitionsToMove.put(0, Lists.newArrayList(0));
storeToPartitionsToMove = Maps.newHashMap();
@@ -168,7 +184,8 @@ public void testRebalancePlanDelete() {
partitionsToDelete = Maps.newHashMap();
storeToPartitionsToDelete = Maps.newHashMap();
- checkAllRebalanceInfoPresent(orderedRebalanceNodePlanList.get(0),
+ checkAllRebalanceInfoPresent(getStealerNodePartitionInfoList(1,
+ orderedRebalancePartitionInfoList),
Arrays.asList(new RebalancePartitionsInfo(1,
0,
storeToPartitionsToMove,
@@ -176,9 +193,9 @@ public void testRebalancePlanDelete() {
currentCluster,
0)));
- assertEquals("Stealer 1 should have 1 entry",
+ assertEquals("Stealer 0 should have 1 entry",
1,
- orderedRebalanceNodePlanList.get(1).getRebalanceTaskList().size());
+ getStealerNodePartitionInfoCount(0, orderedRebalancePartitionInfoList));
partitionsToMove = Maps.newHashMap();
partitionsToMove.put(1, Lists.newArrayList(0));
storeToPartitionsToMove = Maps.newHashMap();
@@ -187,7 +204,8 @@ public void testRebalancePlanDelete() {
partitionsToDelete = Maps.newHashMap();
storeToPartitionsToDelete = Maps.newHashMap();
- checkAllRebalanceInfoPresent(orderedRebalanceNodePlanList.get(1),
+ checkAllRebalanceInfoPresent(getStealerNodePartitionInfoList(0,
+ orderedRebalancePartitionInfoList),
Arrays.asList(new RebalancePartitionsInfo(0,
1,
storeToPartitionsToMove,
@@ -208,16 +226,22 @@ public void testRebalancePlanDeleteLastNode() {
targetCluster = ServerTestUtils.getLocalCluster(4, new int[][] { { 0, 3, 6, 9, 12, 15 },
{ 1, 4, 7, 10, 13, 16 }, { 2, 5, 8, 11, 14, 17 }, {} });
- List<RebalanceNodePlan> orderedRebalanceNodePlanList = createOrderedClusterTransition(currentCluster,
- targetCluster,
- storeDefList2).getOrderedRebalanceNodePlanList();
- assertEquals("There should be exactly 1 rebalancing node",
+ List<RebalancePartitionsInfo> orderedRebalancePartitionInfoList = createOrderedClusterTransition(currentCluster,
+ targetCluster,
+ storeDefList2).getOrderedRebalancePartitionsInfoList();
+ assertEquals("There should have exactly 1 rebalancing node",
1,
- orderedRebalanceNodePlanList.size());
+ getUniqueNodeCount(orderedRebalancePartitionInfoList, false));
+ assertEquals("There should be exactly 1 rebalancing partition info",
+ 1,
+ orderedRebalancePartitionInfoList.size());
assertEquals("Stealer 0 should have 1 entry",
1,
- orderedRebalanceNodePlanList.get(0).getRebalanceTaskList().size());
+ getStealerNodePartitionInfoCount(0, orderedRebalancePartitionInfoList));
+ // make sure partitionInfo is ordered with primary ones first
+ checkOrderedPartitionInfo(orderedRebalancePartitionInfoList);
+
HashMap<Integer, List<Integer>> partitionsToMove = Maps.newHashMap();
partitionsToMove.clear();
partitionsToMove.put(0, Lists.newArrayList(0));
@@ -226,7 +250,8 @@ public void testRebalancePlanDeleteLastNode() {
HashMap<String, HashMap<Integer, List<Integer>>> storeToPartitionsToMove = Maps.newHashMap();
storeToPartitionsToMove.put("test", partitionsToMove);
- checkAllRebalanceInfoPresent(orderedRebalanceNodePlanList.get(0),
+ checkAllRebalanceInfoPresent(getStealerNodePartitionInfoList(0,
+ orderedRebalancePartitionInfoList),
Arrays.asList(new RebalancePartitionsInfo(0,
3,
storeToPartitionsToMove,
@@ -247,16 +272,20 @@ public void testRebalancePlanDeleteFirstNode() {
{ 2, 6 }, { 3, 7 } });
// PHASE 1
- List<RebalanceNodePlan> orderedRebalanceNodePlanList = createOrderedClusterTransition(currentCluster,
- targetCluster,
- storeDefList2).getOrderedRebalanceNodePlanList();
-
- assertEquals("There should be exactly 3 rebalancing node",
+ List<RebalancePartitionsInfo> orderedRebalancePartitionInfoList = createOrderedClusterTransition(currentCluster,
+ targetCluster,
+ storeDefList2).getOrderedRebalancePartitionsInfoList();
+ assertEquals("There should have exactly 3 rebalancing node",
+ 3,
+ getUniqueNodeCount(orderedRebalancePartitionInfoList, false));
+ assertEquals("There should be exactly 3 rebalancing partition info",
3,
- orderedRebalanceNodePlanList.size());
+ orderedRebalancePartitionInfoList.size());
assertEquals("Stealer 1 should have 1 entry",
1,
- orderedRebalanceNodePlanList.get(0).getRebalanceTaskList().size());
+ getStealerNodePartitionInfoCount(1, orderedRebalancePartitionInfoList));
+ // make sure partitionInfo is ordered with primary ones first
+ checkOrderedPartitionInfo(orderedRebalancePartitionInfoList);
HashMap<Integer, List<Integer>> partitionsToMove = Maps.newHashMap();
partitionsToMove.clear();
@@ -266,7 +295,8 @@ public void testRebalancePlanDeleteFirstNode() {
HashMap<String, HashMap<Integer, List<Integer>>> storeToPartitionsToMove = Maps.newHashMap();
storeToPartitionsToMove.put("test", partitionsToMove);
- checkAllRebalanceInfoPresent(orderedRebalanceNodePlanList.get(0),
+ checkAllRebalanceInfoPresent(getStealerNodePartitionInfoList(1,
+ orderedRebalancePartitionInfoList),
Arrays.asList(new RebalancePartitionsInfo(1,
0,
storeToPartitionsToMove,
@@ -275,13 +305,14 @@ public void testRebalancePlanDeleteFirstNode() {
0)));
assertEquals("Stealer 2 should have 1 entry",
1,
- orderedRebalanceNodePlanList.get(1).getRebalanceTaskList().size());
+ getStealerNodePartitionInfoCount(2, orderedRebalancePartitionInfoList));
partitionsToMove.clear();
partitionsToMove.put(1, Lists.newArrayList(0));
partitionsToMove.put(2, Lists.newArrayList(7));
storeToPartitionsToMove = Maps.newHashMap();
storeToPartitionsToMove.put("test", partitionsToMove);
- checkAllRebalanceInfoPresent(orderedRebalanceNodePlanList.get(1),
+ checkAllRebalanceInfoPresent(getStealerNodePartitionInfoList(2,
+ orderedRebalancePartitionInfoList),
Arrays.asList(new RebalancePartitionsInfo(2,
1,
storeToPartitionsToMove,
@@ -291,12 +322,13 @@ public void testRebalancePlanDeleteFirstNode() {
assertEquals("Stealer 3 should have 1 entry",
1,
- orderedRebalanceNodePlanList.get(2).getRebalanceTaskList().size());
+ getStealerNodePartitionInfoCount(3, orderedRebalancePartitionInfoList));
partitionsToMove.clear();
partitionsToMove.put(2, Lists.newArrayList(0));
storeToPartitionsToMove = Maps.newHashMap();
storeToPartitionsToMove.put("test", partitionsToMove);
- checkAllRebalanceInfoPresent(orderedRebalanceNodePlanList.get(2),
+ checkAllRebalanceInfoPresent(getStealerNodePartitionInfoList(3,
+ orderedRebalancePartitionInfoList),
Arrays.asList(new RebalancePartitionsInfo(3,
2,
storeToPartitionsToMove,
@@ -311,23 +343,30 @@ public void testRebalancePlanDeleteFirstNode() {
targetCluster = ServerTestUtils.getLocalCluster(4, new int[][] { {}, { 0, 1, 5 },
{ 4, 2, 6 }, { 3, 7 } });
- orderedRebalanceNodePlanList = createOrderedClusterTransition(currentCluster,
- targetCluster,
- storeDefList2).getOrderedRebalanceNodePlanList();
+ orderedRebalancePartitionInfoList = createOrderedClusterTransition(currentCluster,
+ targetCluster,
+ storeDefList2).getOrderedRebalancePartitionsInfoList();
assertEquals("There should have exactly 3 rebalancing node",
3,
- orderedRebalanceNodePlanList.size());
+ getUniqueNodeCount(orderedRebalancePartitionInfoList, false));
+ assertEquals("There should have exactly 3 rebalancing partition info",
+ 3,
+ orderedRebalancePartitionInfoList.size());
assertEquals("Stealer 2 should have 1 entry",
1,
- orderedRebalanceNodePlanList.get(0).getRebalanceTaskList().size());
+ getStealerNodePartitionInfoCount(2, orderedRebalancePartitionInfoList));
+ // make sure partitionInfo is ordered with primary ones first
+ checkOrderedPartitionInfo(orderedRebalancePartitionInfoList);
+
partitionsToMove.clear();
partitionsToMove.put(0, Lists.newArrayList(4));
partitionsToMove.put(1, Lists.newArrayList(3));
storeToPartitionsToMove = Maps.newHashMap();
storeToPartitionsToMove.put("test", partitionsToMove);
- checkAllRebalanceInfoPresent(orderedRebalanceNodePlanList.get(0),
+ checkAllRebalanceInfoPresent(getStealerNodePartitionInfoList(2,
+ orderedRebalancePartitionInfoList),
Arrays.asList(new RebalancePartitionsInfo(2,
0,
storeToPartitionsToMove,
@@ -336,12 +375,13 @@ public void testRebalancePlanDeleteFirstNode() {
0)));
assertEquals("Stealer 1 should have 1 entry",
1,
- orderedRebalanceNodePlanList.get(1).getRebalanceTaskList().size());
+ getStealerNodePartitionInfoCount(1, orderedRebalancePartitionInfoList));
partitionsToMove.clear();
partitionsToMove.put(2, Lists.newArrayList(2));
storeToPartitionsToMove = Maps.newHashMap();
storeToPartitionsToMove.put("test", partitionsToMove);
- checkAllRebalanceInfoPresent(orderedRebalanceNodePlanList.get(1),
+ checkAllRebalanceInfoPresent(getStealerNodePartitionInfoList(1,
+ orderedRebalancePartitionInfoList),
Arrays.asList(new RebalancePartitionsInfo(1,
0,
storeToPartitionsToMove,
@@ -351,12 +391,13 @@ public void testRebalancePlanDeleteFirstNode() {
assertEquals("Stealer 3 should have 1 entry",
1,
- orderedRebalanceNodePlanList.get(2).getRebalanceTaskList().size());
+ getStealerNodePartitionInfoCount(3, orderedRebalancePartitionInfoList));
partitionsToMove.clear();
partitionsToMove.put(2, Lists.newArrayList(4));
storeToPartitionsToMove = Maps.newHashMap();
storeToPartitionsToMove.put("test", partitionsToMove);
- checkAllRebalanceInfoPresent(orderedRebalanceNodePlanList.get(2),
+ checkAllRebalanceInfoPresent(getStealerNodePartitionInfoList(3,
+ orderedRebalancePartitionInfoList),
Arrays.asList(new RebalancePartitionsInfo(3,
2,
storeToPartitionsToMove,
@@ -373,23 +414,31 @@ public void testRebalanceDeletingMiddleNode() {
targetCluster = ServerTestUtils.getLocalCluster(4, new int[][] { { 0, 4 }, { 2, 1, 5 },
{ 6 }, { 3, 7 } });
- List<RebalanceNodePlan> orderedRebalanceNodePlanList = createOrderedClusterTransition(currentCluster,
- targetCluster,
- storeDefList2).getOrderedRebalanceNodePlanList();
+ List<RebalancePartitionsInfo> orderedRebalancePartitionInfoList = createOrderedClusterTransition(currentCluster,
+ targetCluster,
+ storeDefList2).getOrderedRebalancePartitionsInfoList();
assertEquals("There should have exactly 3 rebalancing node",
3,
- orderedRebalanceNodePlanList.size());
+ getUniqueNodeCount(orderedRebalancePartitionInfoList, false));
+
+ assertEquals("There should have exactly 3 rebalancing partition info",
+ 3,
+ orderedRebalancePartitionInfoList.size());
assertEquals("Stealer 1 should have 1 entry",
1,
- orderedRebalanceNodePlanList.get(0).getRebalanceTaskList().size());
+ getStealerNodePartitionInfoCount(1, orderedRebalancePartitionInfoList));
+ // make sure partitionInfo is ordered with primary ones first
+ checkOrderedPartitionInfo(orderedRebalancePartitionInfoList);
+
HashMap<Integer, List<Integer>> partitionsToMove = Maps.newHashMap();
partitionsToMove.clear();
partitionsToMove.put(0, Lists.newArrayList(2));
HashMap<String, HashMap<Integer, List<Integer>>> storeToPartitionsToMove = Maps.newHashMap();
storeToPartitionsToMove.put("test", partitionsToMove);
- checkAllRebalanceInfoPresent(orderedRebalanceNodePlanList.get(0),
+ checkAllRebalanceInfoPresent(getStealerNodePartitionInfoList(1,
+ orderedRebalancePartitionInfoList),
Arrays.asList(new RebalancePartitionsInfo(1,
2,
storeToPartitionsToMove,
@@ -398,12 +447,13 @@ public void testRebalanceDeletingMiddleNode() {
0)));
assertEquals("Stealer 0 should have 1 entry",
1,
- orderedRebalanceNodePlanList.get(1).getRebalanceTaskList().size());
+ getStealerNodePartitionInfoCount(0, orderedRebalancePartitionInfoList));
partitionsToMove.clear();
partitionsToMove.put(2, Lists.newArrayList(1));
storeToPartitionsToMove = Maps.newHashMap();
storeToPartitionsToMove.put("test", partitionsToMove);
- checkAllRebalanceInfoPresent(orderedRebalanceNodePlanList.get(1),
+ checkAllRebalanceInfoPresent(getStealerNodePartitionInfoList(0,
+ orderedRebalancePartitionInfoList),
Arrays.asList(new RebalancePartitionsInfo(0,
3,
storeToPartitionsToMove,
@@ -413,13 +463,14 @@ public void testRebalanceDeletingMiddleNode() {
assertEquals("Stealer 3 should have 1 entry",
1,
- orderedRebalanceNodePlanList.get(2).getRebalanceTaskList().size());
+ getStealerNodePartitionInfoCount(3, orderedRebalancePartitionInfoList));
partitionsToMove.clear();
partitionsToMove.put(1, Lists.newArrayList(1));
partitionsToMove.put(2, Lists.newArrayList(0));
storeToPartitionsToMove = Maps.newHashMap();
storeToPartitionsToMove.put("test", partitionsToMove);
- checkAllRebalanceInfoPresent(orderedRebalanceNodePlanList.get(2),
+ checkAllRebalanceInfoPresent(getStealerNodePartitionInfoList(3,
+ orderedRebalancePartitionInfoList),
Arrays.asList(new RebalancePartitionsInfo(3,
2,
storeToPartitionsToMove,
@@ -433,24 +484,31 @@ public void testRebalanceDeletingMiddleNode() {
targetCluster = ServerTestUtils.getLocalCluster(4, new int[][] { { 0, 4 }, { 2, 1, 5 }, {},
{ 6, 3, 7 } });
- orderedRebalanceNodePlanList = createOrderedClusterTransition(currentCluster,
- targetCluster,
- storeDefList2).getOrderedRebalanceNodePlanList();
+ orderedRebalancePartitionInfoList = createOrderedClusterTransition(currentCluster,
+ targetCluster,
+ storeDefList2).getOrderedRebalancePartitionsInfoList();
assertEquals("There should have exactly 3 rebalancing node",
3,
- orderedRebalanceNodePlanList.size());
+ getUniqueNodeCount(orderedRebalancePartitionInfoList, false));
+ assertEquals("There should have exactly 3 rebalancing partition info",
+ 3,
+ orderedRebalancePartitionInfoList.size());
assertEquals("Stealer 3 should have 1 entry",
1,
- orderedRebalanceNodePlanList.get(0).getRebalanceTaskList().size());
+ getStealerNodePartitionInfoCount(3, orderedRebalancePartitionInfoList));
+ // make sure partitionInfo is ordered with primary ones first
+ checkOrderedPartitionInfo(orderedRebalancePartitionInfoList);
+
partitionsToMove.clear();
partitionsToMove.put(0, Lists.newArrayList(6));
partitionsToMove.put(1, Lists.newArrayList(5));
partitionsToMove.put(2, Lists.newArrayList(4));
storeToPartitionsToMove = Maps.newHashMap();
storeToPartitionsToMove.put("test", partitionsToMove);
- checkAllRebalanceInfoPresent(orderedRebalanceNodePlanList.get(0),
+ checkAllRebalanceInfoPresent(getStealerNodePartitionInfoList(3,
+ orderedRebalancePartitionInfoList),
Arrays.asList(new RebalancePartitionsInfo(3,
2,
storeToPartitionsToMove,
@@ -459,13 +517,14 @@ public void testRebalanceDeletingMiddleNode() {
0)));
assertEquals("Stealer 0 should have 1 entry",
1,
- orderedRebalanceNodePlanList.get(1).getRebalanceTaskList().size());
+ getStealerNodePartitionInfoCount(0, orderedRebalancePartitionInfoList));
partitionsToMove.clear();
partitionsToMove.put(1, Lists.newArrayList(6));
partitionsToMove.put(2, Lists.newArrayList(5));
storeToPartitionsToMove = Maps.newHashMap();
storeToPartitionsToMove.put("test", partitionsToMove);
- checkAllRebalanceInfoPresent(orderedRebalanceNodePlanList.get(1),
+ checkAllRebalanceInfoPresent(getStealerNodePartitionInfoList(0,
+ orderedRebalancePartitionInfoList),
Arrays.asList(new RebalancePartitionsInfo(0,
3,
storeToPartitionsToMove,
@@ -475,12 +534,13 @@ public void testRebalanceDeletingMiddleNode() {
assertEquals("Stealer 1 should have 1 entry",
1,
- orderedRebalanceNodePlanList.get(2).getRebalanceTaskList().size());
+ getStealerNodePartitionInfoCount(1, orderedRebalancePartitionInfoList));
partitionsToMove.clear();
partitionsToMove.put(2, Lists.newArrayList(6));
storeToPartitionsToMove = Maps.newHashMap();
storeToPartitionsToMove.put("test", partitionsToMove);
- checkAllRebalanceInfoPresent(orderedRebalanceNodePlanList.get(2),
+ checkAllRebalanceInfoPresent(getStealerNodePartitionInfoList(1,
+ orderedRebalancePartitionInfoList),
Arrays.asList(new RebalancePartitionsInfo(1,
0,
storeToPartitionsToMove,
@@ -497,22 +557,27 @@ public void testRebalancePlanWithReplicationChanges() {
targetCluster = ServerTestUtils.getLocalCluster(4, new int[][] { { 0, 2, 3 }, { 4, 6 },
{ 7, 8, 9 }, { 1, 5 } });
- List<RebalanceNodePlan> orderedRebalanceNodePlanList = createOrderedClusterTransition(currentCluster,
- targetCluster,
- storeDefList).getOrderedRebalanceNodePlanList();
+ List<RebalancePartitionsInfo> orderedRebalancePartitionInfoList = createOrderedClusterTransition(currentCluster,
+ targetCluster,
+ storeDefList).getOrderedRebalancePartitionsInfoList();
assertEquals("There should have exactly 3 rebalancing node",
3,
- orderedRebalanceNodePlanList.size());
+ this.getUniqueNodeCount(orderedRebalancePartitionInfoList, false));
+ assertEquals("There should have exactly 5 rebalancing partition info",
+ 5,
+ orderedRebalancePartitionInfoList.size());
assertEquals("Stealer 3 should have 3 entry",
3,
- orderedRebalanceNodePlanList.get(0).getRebalanceTaskList().size());
+ this.getStealerNodePartitionInfoCount(3, orderedRebalancePartitionInfoList));
assertEquals("Stealer 0 should have 1 entry",
1,
- orderedRebalanceNodePlanList.get(1).getRebalanceTaskList().size());
+ this.getStealerNodePartitionInfoCount(0, orderedRebalancePartitionInfoList));
assertEquals("Stealer 1 should have 1 entry",
1,
- orderedRebalanceNodePlanList.get(2).getRebalanceTaskList().size());
+ this.getStealerNodePartitionInfoCount(1, orderedRebalancePartitionInfoList));
+ // make sure partitionInfo is ordered with primary ones first
+ checkOrderedPartitionInfo(orderedRebalancePartitionInfoList);
HashMap<String, HashMap<Integer, List<Integer>>> storeToPartitionsToMove[] = new HashMap[5];
HashMap<String, HashMap<Integer, List<Integer>>> storeToPartitionsToDelete[] = new HashMap[5];
@@ -588,7 +653,8 @@ public void testRebalancePlanWithReplicationChanges() {
}
}
- checkAllRebalanceInfoPresent(orderedRebalanceNodePlanList.get(0),
+ checkAllRebalanceInfoPresent(getStealerNodePartitionInfoList(3,
+ orderedRebalancePartitionInfoList),
Arrays.asList(new RebalancePartitionsInfo(3,
0,
storeToPartitionsToMove[0],
@@ -607,14 +673,16 @@ public void testRebalancePlanWithReplicationChanges() {
storeToPartitionsToDelete[2],
currentCluster,
0)));
- checkAllRebalanceInfoPresent(orderedRebalanceNodePlanList.get(1),
+ checkAllRebalanceInfoPresent(getStealerNodePartitionInfoList(0,
+ orderedRebalancePartitionInfoList),
Arrays.asList(new RebalancePartitionsInfo(0,
1,
storeToPartitionsToMove[3],
storeToPartitionsToDelete[3],
currentCluster,
0)));
- checkAllRebalanceInfoPresent(orderedRebalanceNodePlanList.get(2),
+ checkAllRebalanceInfoPresent(getStealerNodePartitionInfoList(1,
+ orderedRebalancePartitionInfoList),
Arrays.asList(new RebalancePartitionsInfo(1,
2,
storeToPartitionsToMove[4],
@@ -634,17 +702,22 @@ public void testRebalanceAllReplicasBeingMigrated() {
targetCluster = ServerTestUtils.getLocalCluster(4, new int[][] { { 4 }, { 2, 3 }, { 1, 5 },
{ 0 } });
- List<RebalanceNodePlan> orderedRebalanceNodePlanList = createOrderedClusterTransition(currentCluster,
- targetCluster,
- storeDefList2).getOrderedRebalanceNodePlanList();
+ List<RebalancePartitionsInfo> orderedRebalancePartitionInfoList = createOrderedClusterTransition(currentCluster,
+ targetCluster,
+ storeDefList2).getOrderedRebalancePartitionsInfoList();
- assertEquals("There should have exactly 3 rebalancing node",
+ assertEquals("There should have exactly 1 rebalancing node",
1,
- orderedRebalanceNodePlanList.size());
-
+ this.getUniqueNodeCount(orderedRebalancePartitionInfoList, false));
+ assertEquals("There should have exactly 2 rebalancing partition info",
+ 2,
+ orderedRebalancePartitionInfoList.size());
assertEquals("Stealer 3 should have 2 entry",
2,
- orderedRebalanceNodePlanList.get(0).getRebalanceTaskList().size());
+ this.getStealerNodePartitionInfoCount(3, orderedRebalancePartitionInfoList));
+ // make sure partitionInfo is ordered with primary ones first
+ checkOrderedPartitionInfo(orderedRebalancePartitionInfoList);
+
HashMap<Integer, List<Integer>> partitionsToMove1 = Maps.newHashMap(), partitionsToMove2 = Maps.newHashMap();
partitionsToMove1.put(0, Lists.newArrayList(0));
partitionsToMove1.put(1, Lists.newArrayList(5));
@@ -655,7 +728,8 @@ public void testRebalanceAllReplicasBeingMigrated() {
HashMap<String, HashMap<Integer, List<Integer>>> storeToPartitionsToMove2 = Maps.newHashMap();
storeToPartitionsToMove2.put("test", partitionsToMove2);
- checkAllRebalanceInfoPresent(orderedRebalanceNodePlanList.get(0),
+ checkAllRebalanceInfoPresent(this.getStealerNodePartitionInfoList(3,
+ orderedRebalancePartitionInfoList),
Arrays.asList(new RebalancePartitionsInfo(3,
0,
storeToPartitionsToMove1,
@@ -670,11 +744,69 @@ public void testRebalanceAllReplicasBeingMigrated() {
0)));
}
- private void checkAllRebalanceInfoPresent(RebalanceNodePlan nodePlan,
+ private int getUniqueNodeCount(List<RebalancePartitionsInfo> rebalanceInfoList,
+ boolean isDonorBased) {
+ HashSet<Integer> uniqueNodeSet = Sets.newHashSet();
+ for(RebalancePartitionsInfo partitionInfo: rebalanceInfoList) {
+ int nodeId;
+ if(isDonorBased) {
+ nodeId = partitionInfo.getDonorId();
+ } else {
+ nodeId = partitionInfo.getStealerId();
+ }
+ if(!uniqueNodeSet.contains(nodeId)) {
+ uniqueNodeSet.add(nodeId);
+ }
+ }
+ return uniqueNodeSet.size();
+ }
+
+ private int getStealerNodePartitionInfoCount(int stealerId,
+ List<RebalancePartitionsInfo> rebalanceInfoList) {
+ int count = 0;
+
+ for(RebalancePartitionsInfo partitionInfo: rebalanceInfoList) {
+ if(partitionInfo.getStealerId() == stealerId) {
+ count++;
+ }
+ }
+ return count;
+ }
+
+ private List<RebalancePartitionsInfo> getStealerNodePartitionInfoList(int stealerId,
+ List<RebalancePartitionsInfo> rebalanceInfoList) {
+ ArrayList<RebalancePartitionsInfo> partitionList = Lists.newArrayList();
+
+ for(RebalancePartitionsInfo partitionInfo: rebalanceInfoList) {
+ if(partitionInfo.getStealerId() == stealerId) {
+ partitionList.add(partitionInfo);
+ }
+ }
+ return partitionList;
+ }
+
+ private void checkOrderedPartitionInfo(List<RebalancePartitionsInfo> rebalancePartitionInfoList) {
+ boolean primaryMovesCompleted = false;
+ for(RebalancePartitionsInfo partitionInfo: rebalancePartitionInfoList) {
+ List<Integer> stealMasterPartitions = partitionInfo.getStealMasterPartitions();
+ if(stealMasterPartitions == null || stealMasterPartitions.isEmpty()) {
+ primaryMovesCompleted = true;
+ } else if(primaryMovesCompleted) {
+ // if partitionInfo contains primary movement and we have
+ // previously concluded that all partitionInfo with primary
+ // movements are processed, we are looking at an out-of-ordered
+ // partitionInfo
+ fail("OrderedPartitionInfo has primary partition move after non-primary partition moves: "
+ + rebalancePartitionInfoList);
+ }
+ }
+ }
+
+ private void checkAllRebalanceInfoPresent(List<RebalancePartitionsInfo> toCheckRebalanceInfoList,
List<RebalancePartitionsInfo> rebalanceInfoList) {
for(RebalancePartitionsInfo rebalanceInfo: rebalanceInfoList) {
boolean match = false;
- for(RebalancePartitionsInfo nodeRebalanceInfo: nodePlan.getRebalanceTaskList()) {
+ for(RebalancePartitionsInfo nodeRebalanceInfo: toCheckRebalanceInfoList) {
if(rebalanceInfo.getDonorId() == nodeRebalanceInfo.getDonorId()) {
assertEquals("Store lists should match",
rebalanceInfo.getUnbalancedStoreList(),
@@ -700,7 +832,7 @@ private void checkAllRebalanceInfoPresent(RebalanceNodePlan nodePlan,
assertNotSame("rebalancePartition Info " + rebalanceInfo
+ " should be present in the nodePlan "
- + nodePlan.getRebalanceTaskList(),
+ + toCheckRebalanceInfoList,
false,
match);
}

No commit comments for this range

Something went wrong with that request. Please try again.