
Migrated RebalanceClusterPlan to MigratePartitionsPlan

1 parent 0c9fec9 commit b5d65b2652faadd14b34859f454afd6a77e4a8cd @rsumbaly rsumbaly committed with afeinberg Jan 7, 2011
10 src/java/voldemort/client/rebalance/MigratePartitions.java
@@ -93,12 +93,10 @@ public MigratePartitions(Cluster currentCluster,
this.stealerNodeIds = stealerNodeIds;
this.voldemortConfig = Utils.notNull(voldemortConfig);
this.transitionToNormal = transitionToNormal;
- RebalanceClusterPlan plan = new RebalanceClusterPlan(currentCluster,
- targetCluster,
- currentStoreDefs,
- targetStoreDefs,
- false,
- null);
+ MigratePartitionsPlan plan = new MigratePartitionsPlan(currentCluster,
+ targetCluster,
+ currentStoreDefs,
+ targetStoreDefs);
logger.info("Rebalance cluster plan => " + plan);
343 src/java/voldemort/client/rebalance/MigratePartitionsPlan.java
@@ -0,0 +1,343 @@
+package voldemort.client.rebalance;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import joptsimple.OptionParser;
+import joptsimple.OptionSet;
+import voldemort.VoldemortException;
+import voldemort.cluster.Cluster;
+import voldemort.cluster.Node;
+import voldemort.store.StoreDefinition;
+import voldemort.utils.CmdUtils;
+import voldemort.utils.RebalanceUtils;
+import voldemort.utils.Utils;
+import voldemort.xml.ClusterMapper;
+import voldemort.xml.StoreDefinitionsMapper;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
+
+/**
+ * Compares the currentCluster configuration with the desired targetCluster
+ * configuration and builds, for every stealer node, a map of donor node ids
+ * to the partitions to be stolen/fetched.<br>
+ * <b>The returned queue is threadsafe.</b>
+ * <p>
+ * The constructors take the current and target cluster definitions plus the
+ * store definitions to migrate (old and, optionally, new); the read-only
+ * store-version and delete-donor-partition arguments of RebalanceClusterPlan
+ * no longer apply here.
+ */
+public class MigratePartitionsPlan {
+
+ private Queue<RebalanceNodePlan> rebalanceTaskQueue;
+ private List<StoreDefinition> oldStoreDefList;
+ private List<StoreDefinition> newStoreDefList;
+
+ public MigratePartitionsPlan(Cluster currentCluster,
+ Cluster targetCluster,
+ List<StoreDefinition> oldStoreDefList,
+ List<StoreDefinition> newStoreDefList) {
+ initialize(currentCluster, targetCluster, oldStoreDefList, newStoreDefList);
+
+ }
+
+ public MigratePartitionsPlan(Cluster currentCluster,
+ Cluster targetCluster,
+ List<StoreDefinition> storeDefList) {
+ initialize(currentCluster, targetCluster, storeDefList, storeDefList);
+ }
+
+ private void initialize(Cluster currentCluster,
+ Cluster targetCluster,
+ List<StoreDefinition> oldStoreDefList,
+ List<StoreDefinition> newStoreDefList) {
+ this.rebalanceTaskQueue = new ConcurrentLinkedQueue<RebalanceNodePlan>();
+ this.oldStoreDefList = oldStoreDefList;
+ this.newStoreDefList = newStoreDefList;
+
+ if(currentCluster.getNumberOfPartitions() != targetCluster.getNumberOfPartitions())
+ throw new VoldemortException("Total number of partitions should not change !!");
+
+ if(!RebalanceUtils.hasSameStores(oldStoreDefList, newStoreDefList))
+ throw new VoldemortException("Either the number of stores has changed or some stores are missing");
+
+ for(Node node: targetCluster.getNodes()) {
+ List<RebalancePartitionsInfo> rebalanceNodeList = getRebalanceNodeTask(currentCluster,
+ targetCluster,
+ RebalanceUtils.getStoreNames(oldStoreDefList),
+ node.getId());
+ if(rebalanceNodeList.size() > 0) {
+ rebalanceTaskQueue.offer(new RebalanceNodePlan(node.getId(), rebalanceNodeList));
+ }
+ }
+ }
+
+ public Queue<RebalanceNodePlan> getRebalancingTaskQueue() {
+ return rebalanceTaskQueue;
+ }
+
+ /**
+ * Returns a map of each stealer node id to its corresponding node plan
+ *
+ * @return Map of stealer node id to plan
+ */
+ public HashMap<Integer, RebalanceNodePlan> getRebalancingTaskQueuePerNode() {
+ HashMap<Integer, RebalanceNodePlan> rebalanceMap = Maps.newHashMap();
+ Iterator<RebalanceNodePlan> iter = rebalanceTaskQueue.iterator();
+ while(iter.hasNext()) {
+ RebalanceNodePlan plan = iter.next();
+ rebalanceMap.put(plan.getStealerNode(), plan);
+ }
+ return rebalanceMap;
+ }
+
+ /**
+ * For a particular stealer node, retrieves a list of plans corresponding to
+ * each donor node.
+ *
+ * @param currentCluster The cluster definition of the current cluster
+ * @param targetCluster The cluster definition of the target cluster
+ * @param storeList The list of stores
+ * @param stealNodeId The node id of the stealer node
+ * @return List of plans per donor node
+ */
+ private List<RebalancePartitionsInfo> getRebalanceNodeTask(Cluster currentCluster,
+ Cluster targetCluster,
+ List<String> storeList,
+ int stealNodeId) {
+ Map<Integer, Integer> currentPartitionsToNodeMap = RebalanceUtils.getCurrentPartitionMapping(currentCluster);
+ List<Integer> stealList = getStealList(currentCluster, targetCluster, stealNodeId);
+
+ Map<Integer, List<Integer>> masterPartitionsMap = getStealMasterPartitions(stealList,
+ currentPartitionsToNodeMap);
+
+ Map<Integer, List<Integer>> replicationPartitionsMap = getReplicationChanges(currentCluster,
+ targetCluster,
+ stealNodeId,
+ currentPartitionsToNodeMap);
+
+ List<RebalancePartitionsInfo> stealInfoList = new ArrayList<RebalancePartitionsInfo>();
+ for(Node donorNode: currentCluster.getNodes()) {
+ Set<Integer> stealPartitions = new HashSet<Integer>();
+ Set<Integer> stealMasterPartitions = new HashSet<Integer>();
+
+ if(masterPartitionsMap.containsKey(donorNode.getId())) {
+ stealPartitions.addAll(masterPartitionsMap.get(donorNode.getId()));
+ stealMasterPartitions.addAll(masterPartitionsMap.get(donorNode.getId()));
+ }
+
+ if(replicationPartitionsMap.containsKey(donorNode.getId())) {
+ stealPartitions.addAll(replicationPartitionsMap.get(donorNode.getId()));
+ }
+
+ if(stealPartitions.size() > 0) {
+ stealInfoList.add(new RebalancePartitionsInfo(stealNodeId,
+ donorNode.getId(),
+ new ArrayList<Integer>(stealPartitions),
+ new ArrayList<Integer>(),
+ new ArrayList<Integer>(stealMasterPartitions),
+ storeList,
+ new HashMap<String, String>(),
+ new HashMap<String, String>(),
+ 0));
+ }
+ }
+
+ return stealInfoList;
+ }
+
+ /**
+ * For a particular stealer node, finds all the partitions it will steal
+ *
+ * @param currentCluster The cluster definition of the existing cluster
+ * @param targetCluster The target cluster definition
+ * @param stealNodeId The id of the stealer node
+ * @return Returns a list of partitions which this stealer node will get
+ */
+ private List<Integer> getStealList(Cluster currentCluster,
+ Cluster targetCluster,
+ int stealNodeId) {
+ List<Integer> targetList = new ArrayList<Integer>(targetCluster.getNodeById(stealNodeId)
+ .getPartitionIds());
+
+ List<Integer> currentList = new ArrayList<Integer>();
+ if(RebalanceUtils.containsNode(currentCluster, stealNodeId))
+ currentList = currentCluster.getNodeById(stealNodeId).getPartitionIds();
+
+ // remove all current partitions from targetList
+ targetList.removeAll(currentList);
+
+ return targetList;
+ }
+
+ /**
+ * For a particular stealer node id, returns a mapping of donor node ids to
+ * the partition ids that need to be stolen due to changes in replication
+ *
+ * @param currentCluster Current cluster definition
+ * @param targetCluster Cluster definition of the target
+ * @param stealNodeId The node id of the stealer node
+ * @param currentPartitionsToNodeMap The mapping of current partitions to
+ * their nodes
+ * @return Map of donor node ids to the partitions they will donate
+ */
+ private Map<Integer, List<Integer>> getReplicationChanges(Cluster currentCluster,
+ Cluster targetCluster,
+ int stealNodeId,
+ Map<Integer, Integer> currentPartitionsToNodeMap) {
+ Map<Integer, List<Integer>> replicationMapping = new HashMap<Integer, List<Integer>>();
+ List<Integer> targetList = targetCluster.getNodeById(stealNodeId).getPartitionIds();
+
+ // get changing replication mapping
+ Multimap<Integer, Integer> replicationChanges = HashMultimap.create();
+
+ // for every unique store
+ for(StoreDefinition storeDef: RebalanceUtils.getUniqueStoreDefinitions(this.oldStoreDefList)) {
+ RebalanceClusterTool clusterTool = new RebalanceClusterTool(currentCluster, storeDef);
+
+ Multimap<Integer, Integer> storeLevelReplicationChanges = clusterTool.getRemappedReplicas(targetCluster,
+ RebalanceUtils.getStore(this.newStoreDefList,
+ storeDef.getName()));
+ replicationChanges.putAll(storeLevelReplicationChanges);
+ }
+
+ for(final Entry<Integer, Integer> entry: replicationChanges.entries()) {
+ int newReplicationPartition = entry.getValue();
+ if(targetList.contains(newReplicationPartition)) {
+ // stealer node needs to replicate some new partitions now.
+ int donorNode = currentPartitionsToNodeMap.get(entry.getKey());
+ if(donorNode != stealNodeId)
+ createAndAdd(replicationMapping, donorNode, entry.getKey());
+ }
+ }
+
+ return replicationMapping;
+ }
+
+ /**
+ * Converts a list of partition ids which a stealer is going to receive
+ * into a map of donor node ids to the corresponding partitions
+ *
+ * @param stealList The partition ids to be stolen
+ * @param currentPartitionsToNodeMap Mapping of current partitions to their
+ * respective node ids
+ * @return Returns a mapping of donor node ids to the partitions being
+ * stolen
+ */
+ private Map<Integer, List<Integer>> getStealMasterPartitions(List<Integer> stealList,
+ Map<Integer, Integer> currentPartitionsToNodeMap) {
+ HashMap<Integer, List<Integer>> stealPartitionsMap = new HashMap<Integer, List<Integer>>();
+ for(int p: stealList) {
+ int donorNode = currentPartitionsToNodeMap.get(p);
+ createAndAdd(stealPartitionsMap, donorNode, p);
+ }
+
+ return stealPartitionsMap;
+ }
+
+ private void createAndAdd(Map<Integer, List<Integer>> map, int key, int value) {
+ // create array if needed
+ if(!map.containsKey(key)) {
+ map.put(key, new ArrayList<Integer>());
+ }
+
+ // add partition to list.
+ map.get(key).add(value);
+ }
+
+ @Override
+ public String toString() {
+
+ if(rebalanceTaskQueue.isEmpty()) {
+ return "Cluster is already balanced, No rebalancing needed";
+ }
+
+ StringBuilder builder = new StringBuilder();
+ builder.append("Cluster Rebalancing Plan:\n");
+ for(RebalanceNodePlan nodePlan: rebalanceTaskQueue) {
+ builder.append("StealerNode:" + nodePlan.getStealerNode() + "\n");
+ for(RebalancePartitionsInfo stealInfo: nodePlan.getRebalanceTaskList()) {
+ builder.append("\t" + stealInfo + "\n");
+ }
+ }
+
+ return builder.toString();
+ }
+
+ public static void main(String args[]) throws IOException {
+ OptionParser parser = new OptionParser();
+ parser.accepts("help", "print help information");
+ parser.accepts("cluster-xml", "[REQUIRED] old cluster xml file location")
+ .withRequiredArg()
+ .describedAs("path");
+ parser.accepts("stores-xml", "[REQUIRED] stores xml file location")
+ .withRequiredArg()
+ .describedAs("path");
+ parser.accepts("target-stores-xml", "new stores xml file location")
+ .withRequiredArg()
+ .describedAs("path");
+ parser.accepts("target-cluster-xml", "[REQUIRED] new cluster xml file location")
+ .withRequiredArg()
+ .describedAs("path");
+
+ OptionSet options = parser.parse(args);
+
+ if(options.has("help")) {
+ parser.printHelpOn(System.out);
+ System.exit(0);
+ }
+
+ Set<String> missing = CmdUtils.missing(options,
+ "cluster-xml",
+ "stores-xml",
+ "target-cluster-xml");
+ if(missing.size() > 0) {
+ System.err.println("Missing required arguments: " + Joiner.on(", ").join(missing));
+ parser.printHelpOn(System.err);
+ System.exit(1);
+ }
+
+ String newClusterXml = (String) options.valueOf("target-cluster-xml");
+ String oldClusterXml = (String) options.valueOf("cluster-xml");
+ String oldStoresXml = (String) options.valueOf("stores-xml");
+ String newStoresXml = oldStoresXml;
+ if(options.has("target-stores-xml")) {
+ newStoresXml = (String) options.valueOf("target-stores-xml");
+ }
+
+ if(!Utils.isReadableFile(newClusterXml) || !Utils.isReadableFile(oldClusterXml)
+ || !Utils.isReadableFile(oldStoresXml) || !Utils.isReadableFile(newStoresXml)) {
+ System.err.println("Could not read metadata files from path provided");
+ parser.printHelpOn(System.err);
+ System.exit(1);
+ }
+
+ ClusterMapper clusterMapper = new ClusterMapper();
+ StoreDefinitionsMapper storeDefMapper = new StoreDefinitionsMapper();
+
+ MigratePartitionsPlan plan = new MigratePartitionsPlan(clusterMapper.readCluster(new File(oldClusterXml)),
+ clusterMapper.readCluster(new File(newClusterXml)),
+ storeDefMapper.readStoreList(new File(oldStoresXml)),
+ storeDefMapper.readStoreList(new File(newStoresXml)));
+ System.out.println(plan);
+ }
+}
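
For orientation, a hedged sketch of how a caller might consume the plan built above; "plan" is assumed constructed as in main(), and every type and method used here is defined in this commit:

    Queue<RebalanceNodePlan> queue = plan.getRebalancingTaskQueue();
    RebalanceNodePlan nodePlan;
    while((nodePlan = queue.poll()) != null) {            // thread-safe drain
        System.out.println("Stealer node: " + nodePlan.getStealerNode());
        for(RebalancePartitionsInfo info: nodePlan.getRebalanceTaskList())
            System.out.println("\t" + info);              // one entry per donor node
    }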
120 src/java/voldemort/client/rebalance/RebalanceClusterPlan.java
@@ -5,7 +5,6 @@
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
@@ -20,13 +19,13 @@
import voldemort.cluster.Node;
import voldemort.store.StoreDefinition;
import voldemort.utils.CmdUtils;
+import voldemort.utils.Pair;
import voldemort.utils.RebalanceUtils;
import voldemort.utils.Utils;
import voldemort.xml.ClusterMapper;
import voldemort.xml.StoreDefinitionsMapper;
import com.google.common.base.Joiner;
-import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
/**
@@ -46,58 +45,24 @@
*/
public class RebalanceClusterPlan {
- private Queue<RebalanceNodePlan> rebalanceTaskQueue;
- private List<StoreDefinition> oldStoreDefList;
- private List<StoreDefinition> newStoreDefList;
-
- public RebalanceClusterPlan(Cluster currentCluster,
- Cluster targetCluster,
- List<StoreDefinition> oldStoreDefList,
- List<StoreDefinition> newStoreDefList,
- boolean deleteDonorPartition,
- Map<Integer, Map<String, String>> currentROStoreVersionsDirs) {
- initialize(currentCluster,
- targetCluster,
- oldStoreDefList,
- newStoreDefList,
- deleteDonorPartition,
- currentROStoreVersionsDirs);
-
- }
+ private final Queue<RebalanceNodePlan> rebalanceTaskQueue;
+ private final List<StoreDefinition> storeDefList;
public RebalanceClusterPlan(Cluster currentCluster,
Cluster targetCluster,
List<StoreDefinition> storeDefList,
boolean deleteDonorPartition,
Map<Integer, Map<String, String>> currentROStoreVersionsDirs) {
- initialize(currentCluster,
- targetCluster,
- storeDefList,
- storeDefList,
- deleteDonorPartition,
- currentROStoreVersionsDirs);
- }
-
- private void initialize(Cluster currentCluster,
- Cluster targetCluster,
- List<StoreDefinition> oldStoreDefList,
- List<StoreDefinition> newStoreDefList,
- boolean deleteDonorPartition,
- Map<Integer, Map<String, String>> currentROStoreVersionsDirs) {
this.rebalanceTaskQueue = new ConcurrentLinkedQueue<RebalanceNodePlan>();
- this.oldStoreDefList = oldStoreDefList;
- this.newStoreDefList = newStoreDefList;
+ this.storeDefList = storeDefList;
if(currentCluster.getNumberOfPartitions() != targetCluster.getNumberOfPartitions())
throw new VoldemortException("Total number of partitions should not change !!");
- if(!RebalanceUtils.hasSameStores(oldStoreDefList, newStoreDefList))
- throw new VoldemortException("Either the number of stores has changed or some stores are missing");
-
for(Node node: targetCluster.getNodes()) {
List<RebalancePartitionsInfo> rebalanceNodeList = getRebalanceNodeTask(currentCluster,
targetCluster,
- RebalanceUtils.getStoreNames(oldStoreDefList),
+ RebalanceUtils.getStoreNames(storeDefList),
node.getId(),
deleteDonorPartition);
if(rebalanceNodeList.size() > 0) {
@@ -117,21 +82,6 @@ private void initialize(Cluster currentCluster,
}
/**
- * Returns a map of stealer node to their corresponding node plan
- *
- * @return Map of stealer node to plan
- */
- public HashMap<Integer, RebalanceNodePlan> getRebalancingTaskQueuePerNode() {
- HashMap<Integer, RebalanceNodePlan> rebalanceMap = Maps.newHashMap();
- Iterator<RebalanceNodePlan> iter = rebalanceTaskQueue.iterator();
- while(iter.hasNext()) {
- RebalanceNodePlan plan = iter.next();
- rebalanceMap.put(plan.getStealerNode(), plan);
- }
- return rebalanceMap;
- }
-
- /**
* For a particular stealer node, retrieves a list of plans corresponding to
* each donor node.
*
@@ -142,11 +92,11 @@ private void initialize(Cluster currentCluster,
* @param deleteDonorPartition Delete the donor partitions after rebalance
* @return List of plans per donor node
*/
- private List<RebalancePartitionsInfo> getRebalanceNodeTask(Cluster currentCluster,
- Cluster targetCluster,
- List<String> storeList,
- int stealNodeId,
- boolean deleteDonorPartition) {
+ public List<RebalancePartitionsInfo> getRebalanceNodeTask(Cluster currentCluster,
+ Cluster targetCluster,
+ List<String> storeList,
+ int stealNodeId,
+ boolean deleteDonorPartition) {
Map<Integer, Integer> currentPartitionsToNodeMap = RebalanceUtils.getCurrentPartitionMapping(currentCluster);
List<Integer> stealList = getStealList(currentCluster, targetCluster, stealNodeId);
@@ -240,22 +190,11 @@ private void initialize(Cluster currentCluster,
// get changing replication mapping
RebalanceClusterTool clusterTool = new RebalanceClusterTool(currentCluster,
- RebalanceUtils.getMaxReplicationStore(this.oldStoreDefList));
-
- /**
- * Case 1: if newStoreDef = oldStoreDef, gives you replication mapping
- * changes only due to cluster geometry change
- *
- * Case 2: if newStoreDef != oldStoreDef, also takes into account change
- * in (a) routing strategy [ Assumption is that all stores change their
- * routing strategy at once ] (b) increase in replication factor
- *
- */
- Multimap<Integer, Integer> replicationChanges = clusterTool.getRemappedReplicas(targetCluster,
- RebalanceUtils.getMaxReplicationStore(this.newStoreDefList));
-
- for(final Entry<Integer, Integer> entry: replicationChanges.entries()) {
- int newReplicationPartition = entry.getValue();
+ RebalanceUtils.getMaxReplicationStore(this.storeDefList));
+ Multimap<Integer, Pair<Integer, Integer>> replicationChanges = clusterTool.getRemappedReplicas(targetCluster);
+
+ for(final Entry<Integer, Pair<Integer, Integer>> entry: replicationChanges.entries()) {
+ int newReplicationPartition = entry.getValue().getSecond();
if(targetList.contains(newReplicationPartition)) {
// stealer node needs to replicate some new partitions now.
int donorNode = currentPartitionsToNodeMap.get(entry.getKey());
@@ -320,16 +259,13 @@ public String toString() {
public static void main(String args[]) throws IOException {
OptionParser parser = new OptionParser();
parser.accepts("help", "print help information");
- parser.accepts("cluster-xml", "[REQUIRED] old cluster xml file location")
+ parser.accepts("cluster-xml", "[REQUIRED] cluster xml file location")
.withRequiredArg()
.describedAs("path");
parser.accepts("stores-xml", "[REQUIRED] stores xml file location")
.withRequiredArg()
.describedAs("path");
- parser.accepts("target-stores-xml", "new stores xml file location")
- .withRequiredArg()
- .describedAs("path");
- parser.accepts("target-cluster-xml", "[REQUIRED] new cluster xml file location")
+ parser.accepts("old-cluster-xml", "[REQUIRED] old cluster xml file location")
.withRequiredArg()
.describedAs("path");
@@ -343,25 +279,20 @@ public static void main(String args[]) throws IOException {
Set<String> missing = CmdUtils.missing(options,
"cluster-xml",
"stores-xml",
- "target-cluster-xml");
+ "old-cluster-xml");
if(missing.size() > 0) {
System.err.println("Missing required arguments: " + Joiner.on(", ").join(missing));
parser.printHelpOn(System.err);
System.exit(1);
}
- String newClusterXml = (String) options.valueOf("target-cluster-xml");
- String oldClusterXml = (String) options.valueOf("cluster-xml");
- String oldStoresXml = (String) options.valueOf("stores-xml");
- String newStoresXml = oldStoresXml;
- if(options.has("target-stores-xml")) {
- newStoresXml = (String) options.valueOf("target-stores-xml");
- }
+ String newClusterXml = (String) options.valueOf("cluster-xml");
+ String oldClusterXml = (String) options.valueOf("old-cluster-xml");
+ String storesXml = (String) options.valueOf("stores-xml");
if(!Utils.isReadableFile(newClusterXml) || !Utils.isReadableFile(oldClusterXml)
- || !Utils.isReadableFile(oldStoresXml) || !Utils.isReadableFile(newStoresXml)) {
- System.err.println("Could not read metadata files from path provided");
- parser.printHelpOn(System.err);
+ || !Utils.isReadableFile(storesXml)) {
+ System.err.println("Could not read files");
System.exit(1);
}
@@ -370,10 +301,9 @@ public static void main(String args[]) throws IOException {
RebalanceClusterPlan plan = new RebalanceClusterPlan(clusterMapper.readCluster(new File(oldClusterXml)),
clusterMapper.readCluster(new File(newClusterXml)),
- storeDefMapper.readStoreList(new File(oldStoresXml)),
- storeDefMapper.readStoreList(new File(newStoresXml)),
+ storeDefMapper.readStoreList(new File(storesXml)),
false,
null);
System.out.println(plan);
}
-}
+}
3 src/java/voldemort/client/rebalance/RebalancePartitionsInfo.java
@@ -155,7 +155,8 @@ public void setDonorNodeROStoreToDir(Map<String, String> donorNodeROStoreToDir)
public String toString() {
return "RebalancePartitionsInfo(" + getStealerId() + " <--- " + getDonorId()
+ " partitions:" + getPartitionList() + " steal master partitions:"
- + getStealMasterPartitions() + " stores:" + getUnbalancedStoreList() + ")";
+ + getStealMasterPartitions() + " delete: " + getDeletePartitionsList() + " stores:"
+ + getUnbalancedStoreList() + ")";
}
public String toJsonString() {
31 src/java/voldemort/server/VoldemortConfig.java
@@ -71,6 +71,7 @@
private int bdbCleanerMinUtilization;
private boolean bdbCursorPreload;
private int bdbCleanerThreads;
+ private long bdbLockTimeoutMs;
private String mysqlUsername;
private String mysqlPassword;
@@ -131,7 +132,7 @@
private String slopStoreType;
private String pusherType;
- private final long slopFrequencyMs;
+ private long slopFrequencyMs;
private final long repairFrequencyMs;
private long slopMaxWriteBytesPerSec;
private long slopMaxReadBytesPerSec;
@@ -195,6 +196,7 @@ public VoldemortConfig(Props props) {
this.bdbCleanerMinFileUtilization = props.getInt("bdb.cleaner.min.file.utilization", 5);
this.bdbCleanerMinUtilization = props.getInt("bdb.cleaner.minUtilization", 50);
this.bdbCleanerThreads = props.getInt("bdb.cleaner.threads", 1);
+ this.bdbLockTimeoutMs = props.getLong("bdb.lock.timeout.ms", 500);
// enabling preload make cursor slow for insufficient bdb cache size.
this.bdbCursorPreload = props.getBoolean("bdb.cursor.preload", false);
@@ -555,6 +557,29 @@ public final void setBdbCleanerThreads(int bdbCleanerThreads) {
/**
*
+ * The lock timeout for all transactional and non-transactional operations.
+ * A value of zero disables lock timeouts, i.e. a deadlock scenario will
+ * block forever.
+ *
+ * <ul>
+ * <li>property: "bdb.lock.timeout.ms"</li>
+ * <li>default: 500</li>
+ * <li>minimum: 0</li>
+ * <li>maximum: 75 * 60 * 1000</li>
+ * </ul>
+ */
+ public long getBdbLockTimeoutMs() {
+ return bdbLockTimeoutMs;
+ }
+
+ public final void setBdbLockTimeoutMs(long bdbLockTimeoutMs) {
+ if(bdbLockTimeoutMs < 0)
+ throw new IllegalArgumentException("bdbLockTimeoutMs should be greater than 0");
+ this.bdbLockTimeoutMs = bdbLockTimeoutMs;
+ }
+
+ /**
+ *
* The cleaner will keep the total disk space utilization percentage above
* this value.
*
@@ -817,6 +842,10 @@ public long getSlopFrequencyMs() {
return this.slopFrequencyMs;
}
+ public void setSlopFrequencyMs(long slopFrequencyMs) {
+ this.slopFrequencyMs = slopFrequencyMs;
+ }
+
public long getRepairFrequencyMs() {
return this.repairFrequencyMs;
}
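
A hedged sketch of the new BDB lock-timeout knob in use; the loader call and the home path are assumptions, while the getter/setter and the 500 ms default come from the hunk above:

    // Assumes the standard loader reads server.properties under the given
    // (hypothetical) Voldemort home.
    VoldemortConfig config = VoldemortConfig.loadFromVoldemortHome("/path/to/voldemort");
    // "bdb.lock.timeout.ms" defaults to 500; override programmatically:
    config.setBdbLockTimeoutMs(1000);
    long timeoutMs = config.getBdbLockTimeoutMs();   // now 1000
    // Negative values are rejected by the setter with an IllegalArgumentException.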
2 src/java/voldemort/store/bdb/BdbStorageConfiguration.java
@@ -18,6 +18,7 @@
import java.io.File;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
import org.apache.log4j.Logger;
@@ -89,6 +90,7 @@ public BdbStorageConfiguration(VoldemortConfig config) {
Integer.toString(config.getBdbCleanerMinUtilization()));
environmentConfig.setConfigParam(EnvironmentConfig.CLEANER_THREADS,
Integer.toString(config.getBdbCleanerThreads()));
+ environmentConfig.setLockTimeout(config.getBdbLockTimeoutMs(), TimeUnit.MILLISECONDS);
databaseConfig = new DatabaseConfig();
databaseConfig.setAllowCreate(true);
databaseConfig.setSortedDuplicates(config.isBdbSortedDuplicatesEnabled());
12 src/java/voldemort/store/grandfather/GrandfatheringStore.java
@@ -109,8 +109,10 @@ public void run() {
Versioned.value(slop, version),
null);
} catch(Exception e) {
- logger.warn("Failed to put DELETE operation on " + getName()
- + " to node " + futureNodeId + " to slop store", e);
+ if(logger.isDebugEnabled())
+ logger.debug("Failed to put DELETE operation on "
+ + getName() + " to node " + futureNodeId
+ + " to slop store", e);
}
}
} catch(Exception e) {
@@ -166,8 +168,10 @@ public void run() {
Versioned.value(slop, value.getVersion()),
null);
} catch(Exception e) {
- logger.warn("Failed to put PUT operation on " + getName()
- + " to node " + futureNodeId + " to slop store", e);
+ if(logger.isDebugEnabled())
+ logger.debug("Failed to put PUT operation on " + getName()
+ + " to node " + futureNodeId
+ + " to slop store", e);
}
}
} catch(Exception e) {
69 src/java/voldemort/utils/RebalanceUtils.java
@@ -34,12 +34,15 @@
import voldemort.client.protocol.admin.AdminClientConfig;
import voldemort.cluster.Cluster;
import voldemort.cluster.Node;
+import voldemort.routing.RoutingStrategyType;
import voldemort.server.VoldemortConfig;
import voldemort.store.StoreDefinition;
import voldemort.versioning.Occured;
import voldemort.versioning.VectorClock;
import voldemort.versioning.Versioned;
+import com.google.common.collect.Lists;
+
/**
* RebalanceUtils provide basic functionality for rebalancing. Some of these
* functions are not utils function but are forced move here to allow more
@@ -64,6 +67,54 @@ public static boolean containsNode(Cluster cluster, int nodeId) {
}
/**
+ * Returns the unique store definitions from a list of store definitions,
+ * where two definitions are considered duplicates if they share the same
+ * replication factor and routing strategy (and, for zone routing, the same
+ * per-zone replication factors)
+ *
+ * @param storeDefs List of store definitions
+ * @return Returns list of unique store definitions
+ */
+ public static List<StoreDefinition> getUniqueStoreDefinitions(List<StoreDefinition> storeDefs) {
+
+ List<StoreDefinition> uniqueStoreDefs = Lists.newArrayList();
+ for(StoreDefinition storeDef: storeDefs) {
+ if(uniqueStoreDefs.isEmpty()) {
+ uniqueStoreDefs.add(storeDef);
+ } else {
+ boolean unique = true;
+ for(StoreDefinition uniqueStoreDef: uniqueStoreDefs) {
+ if(uniqueStoreDef.getReplicationFactor() == storeDef.getReplicationFactor()
+ && uniqueStoreDef.getRoutingStrategyType()
+ .compareTo(storeDef.getRoutingStrategyType()) == 0) {
+ unique = false;
+ // Further check for the zone routing case
+ if(uniqueStoreDef.getRoutingStrategyType()
+ .compareTo(RoutingStrategyType.ZONE_STRATEGY) == 0) {
+ boolean zonesSame = true;
+ for(int zoneId: uniqueStoreDef.getZoneReplicationFactor().keySet()) {
+ if(storeDef.getZoneReplicationFactor().get(zoneId) == null
+ || storeDef.getZoneReplicationFactor().get(zoneId) != uniqueStoreDef.getZoneReplicationFactor()
+ .get(zoneId)) {
+ zonesSame = false;
+ break;
+ }
+ }
+ if(!zonesSame) {
+ unique = true;
+ }
+ }
+ }
+ }
+ if(unique) {
+ uniqueStoreDefs.add(storeDef);
+ }
+ }
+ }
+
+ return uniqueStoreDefs;
+ }
+
+ /**
* Checks if two lists of store definitions have the same number of stores
* and the same stores. This is not the same as
* {@link voldemort.store.StoreDefinition#equals(Object)} because we expect
@@ -370,6 +421,24 @@ public static StoreDefinition getMaxReplicationStore(List<StoreDefinition> store
return storeList;
}
+ /**
+ * Returns the store definition with the specified name from the list, or
+ * null if no such store exists
+ *
+ * @param storeDefs The list of store definitions
+ * @param storeName The name of the store which is required
+ * @return The store definition else null
+ */
+ public static StoreDefinition getStore(List<StoreDefinition> storeDefs, String storeName) {
+
+ for(StoreDefinition storeDef: storeDefs) {
+ if(storeDef.getName().compareTo(storeName) == 0) {
+ return storeDef;
+ }
+ }
+ return null;
+ }
+
public static void executorShutDown(ExecutorService executorService, int timeOutSec) {
try {
executorService.shutdown();
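
A hedged usage sketch for the two helpers added above; the stores.xml path is hypothetical and the mapper call mirrors the one in MigratePartitionsPlan.main:

    List<StoreDefinition> storeDefs =
            new StoreDefinitionsMapper().readStoreList(new File("stores.xml"));
    // One definition is kept per distinct replication factor + routing strategy
    // (plus per-zone replication factors for zone routing):
    List<StoreDefinition> unique = RebalanceUtils.getUniqueStoreDefinitions(storeDefs);
    // Name-based lookup; returns null when no store matches:
    StoreDefinition def = RebalanceUtils.getStore(storeDefs, "test");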
11 test/common/voldemort/ServerTestUtils.java
@@ -310,10 +310,13 @@ public static Cluster getLocalCluster(int numberOfNodes, int[] ports, int[][] pa
* <b>numberOfZones</b>
*
* @param numberOfNodes Number of nodes in the cluster
+ * @param partitionsPerNode Number of partitions in one node
* @param numberOfZones Number of zones
* @return Cluster
*/
- public static Cluster getLocalCluster(int numberOfNodes, int numberOfZones) {
+ public static Cluster getLocalCluster(int numberOfNodes,
+ int partitionsPerNode,
+ int numberOfZones) {
if(numberOfZones > 0 && numberOfNodes > 0 && numberOfNodes % numberOfZones != 0) {
throw new VoldemortException("The number of nodes (" + numberOfNodes
@@ -323,10 +326,9 @@ public static Cluster getLocalCluster(int numberOfNodes, int numberOfZones) {
int[] ports = findFreePorts(3 * numberOfNodes);
- // Generate partitions, 10 partitions per node
List<Integer> partitions = Lists.newArrayList();
- for(int i = 0; i < 10 * numberOfNodes; i++)
+ for(int i = 0; i < partitionsPerNode * numberOfNodes; i++)
partitions.add(i);
Collections.shuffle(partitions);
@@ -341,7 +343,8 @@ public static Cluster getLocalCluster(int numberOfNodes, int numberOfZones) {
ports[3 * i + 1],
ports[3 * i + 2],
i / numberOfNodesPerZone,
- partitions.subList(10 * i, 10 * i + 10)));
+ partitions.subList(partitionsPerNode * i, partitionsPerNode * i
+ + partitionsPerNode)));
}
// Generate zones
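
A one-line sketch of the widened test helper; the old overload hard-coded 10 partitions per node, whereas the count is now an explicit argument:

    // 4 nodes, 10 partitions per node, spread across 2 zones.
    Cluster cluster = ServerTestUtils.getLocalCluster(4, 10, 2);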
74 ...t/rebalance/RebalanceClusterPlanTest.java → .../rebalance/MigratePartitionsPlanTest.java
@@ -19,7 +19,7 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
-public class RebalanceClusterPlanTest {
+public class MigratePartitionsPlanTest {
private final int NUM_NODES = 4;
private final int NUM_ZONES = 2;
@@ -113,11 +113,9 @@ public void testConsistentToConsistent() {
1,
1,
RoutingStrategyType.CONSISTENT_STRATEGY);
- RebalanceClusterPlan plan = new RebalanceClusterPlan(consistentRoutingCluster,
- consistentRoutingClusterModified,
- Lists.newArrayList(storeDefRepFactor1),
- false,
- null);
+ MigratePartitionsPlan plan = new MigratePartitionsPlan(consistentRoutingCluster,
+ consistentRoutingClusterModified,
+ Lists.newArrayList(storeDefRepFactor1));
HashMap<Integer, RebalanceNodePlan> nodePlan = plan.getRebalancingTaskQueuePerNode();
Assert.assertEquals(nodePlan.size(), 1);
@@ -133,11 +131,9 @@ public void testConsistentToConsistent() {
1,
1,
RoutingStrategyType.CONSISTENT_STRATEGY);
- plan = new RebalanceClusterPlan(consistentRoutingCluster,
- consistentRoutingClusterModified,
- Lists.newArrayList(storeDefRepFactor2),
- false,
- null);
+ plan = new MigratePartitionsPlan(consistentRoutingCluster,
+ consistentRoutingClusterModified,
+ Lists.newArrayList(storeDefRepFactor2));
nodePlan = plan.getRebalancingTaskQueuePerNode();
Assert.assertEquals(nodePlan.size(), 2);
@@ -174,12 +170,10 @@ public void testConsistentToConsistentWithRepFactorChange() {
1,
RoutingStrategyType.CONSISTENT_STRATEGY);
- RebalanceClusterPlan plan = new RebalanceClusterPlan(consistentRoutingCluster,
- consistentRoutingClusterModified,
- Lists.newArrayList(beforeStoreDef),
- Lists.newArrayList(afterStoreDef),
- false,
- null);
+ MigratePartitionsPlan plan = new MigratePartitionsPlan(consistentRoutingCluster,
+ consistentRoutingClusterModified,
+ Lists.newArrayList(beforeStoreDef),
+ Lists.newArrayList(afterStoreDef));
HashMap<Integer, RebalanceNodePlan> nodePlan = plan.getRebalancingTaskQueuePerNode();
Assert.assertEquals(nodePlan.size(), 4);
@@ -224,12 +218,10 @@ public void testConsistentToConsistentWithRepFactorChange() {
1,
RoutingStrategyType.CONSISTENT_STRATEGY);
- plan = new RebalanceClusterPlan(consistentRoutingCluster,
- consistentRoutingClusterModified,
- Lists.newArrayList(beforeStoreDef),
- Lists.newArrayList(afterStoreDef),
- false,
- null);
+ plan = new MigratePartitionsPlan(consistentRoutingCluster,
+ consistentRoutingClusterModified,
+ Lists.newArrayList(beforeStoreDef),
+ Lists.newArrayList(afterStoreDef));
nodePlan = plan.getRebalancingTaskQueuePerNode();
Assert.assertEquals(nodePlan.size(), 4);
@@ -281,11 +273,9 @@ public void testZoneToZone() {
zoneReplicationFactors,
HintedHandoffStrategyType.PROXIMITY_STRATEGY,
RoutingStrategyType.ZONE_STRATEGY);
- RebalanceClusterPlan plan = new RebalanceClusterPlan(zoneRoutingCluster,
- zoneRoutingClusterModified,
- Lists.newArrayList(storeDefRepFactor1),
- false,
- null);
+ MigratePartitionsPlan plan = new MigratePartitionsPlan(zoneRoutingCluster,
+ zoneRoutingClusterModified,
+ Lists.newArrayList(storeDefRepFactor1));
HashMap<Integer, RebalanceNodePlan> nodePlan = plan.getRebalancingTaskQueuePerNode();
Assert.assertEquals(nodePlan.size(), 2);
@@ -318,11 +308,9 @@ public void testZoneToZone() {
zoneReplicationFactors,
HintedHandoffStrategyType.PROXIMITY_STRATEGY,
RoutingStrategyType.ZONE_STRATEGY);
- plan = new RebalanceClusterPlan(zoneRoutingCluster,
- zoneRoutingClusterModified,
- Lists.newArrayList(storeDefRepFactor2),
- false,
- null);
+ plan = new MigratePartitionsPlan(zoneRoutingCluster,
+ zoneRoutingClusterModified,
+ Lists.newArrayList(storeDefRepFactor2));
nodePlan = plan.getRebalancingTaskQueuePerNode();
Assert.assertEquals(nodePlan.size(), 2);
@@ -370,12 +358,10 @@ public void testConsistentToZoneWithRepFactorChange() {
HintedHandoffStrategyType.PROXIMITY_STRATEGY,
RoutingStrategyType.ZONE_STRATEGY);
- RebalanceClusterPlan plan = new RebalanceClusterPlan(consistentRoutingCluster,
- zoneRoutingClusterModified,
- Lists.newArrayList(beforeStoreDef),
- Lists.newArrayList(afterStoreDef),
- false,
- null);
+ MigratePartitionsPlan plan = new MigratePartitionsPlan(consistentRoutingCluster,
+ zoneRoutingClusterModified,
+ Lists.newArrayList(beforeStoreDef),
+ Lists.newArrayList(afterStoreDef));
HashMap<Integer, RebalanceNodePlan> nodePlan = plan.getRebalancingTaskQueuePerNode();
Assert.assertEquals(nodePlan.size(), 4);
@@ -432,12 +418,10 @@ public void testConsistentToZoneWithRepFactorChange() {
HintedHandoffStrategyType.PROXIMITY_STRATEGY,
RoutingStrategyType.ZONE_STRATEGY);
- plan = new RebalanceClusterPlan(consistentRoutingCluster,
- zoneRoutingClusterModified,
- Lists.newArrayList(beforeStoreDef),
- Lists.newArrayList(afterStoreDef),
- false,
- null);
+ plan = new MigratePartitionsPlan(consistentRoutingCluster,
+ zoneRoutingClusterModified,
+ Lists.newArrayList(beforeStoreDef),
+ Lists.newArrayList(afterStoreDef));
nodePlan = plan.getRebalancingTaskQueuePerNode();
Assert.assertEquals(nodePlan.size(), 4);
166 test/unit/voldemort/utils/RebalanceUtilsTest.java
@@ -31,9 +31,14 @@
import voldemort.client.rebalance.RebalancePartitionsInfo;
import voldemort.cluster.Cluster;
import voldemort.cluster.Node;
+import voldemort.routing.RoutingStrategyType;
import voldemort.store.StoreDefinition;
+import voldemort.store.slop.strategy.HintedHandoffStrategyType;
import voldemort.xml.StoreDefinitionsMapper;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
public class RebalanceUtilsTest extends TestCase {
private static String storeDefFile = "test/common/voldemort/config/stores.xml";
@@ -57,6 +62,157 @@ public void setUp() {
}
}
+ public void testUniqueStoreDefinitions() {
+ List<StoreDefinition> storeDefs = Lists.newArrayList();
+
+ // Put zero store
+ assertEquals(RebalanceUtils.getUniqueStoreDefinitions(storeDefs).size(), 0);
+
+ // Put one store
+ StoreDefinition consistentStore1 = ServerTestUtils.getStoreDef("a",
+ 1,
+ 1,
+ 1,
+ 1,
+ 1,
+ RoutingStrategyType.CONSISTENT_STRATEGY);
+ storeDefs.add(consistentStore1);
+ assertEquals(RebalanceUtils.getUniqueStoreDefinitions(storeDefs).size(), 1);
+ assertEquals(RebalanceUtils.getUniqueStoreDefinitions(storeDefs).get(0), consistentStore1);
+
+ // Put another store with same rep-factor + strategy but different name
+ StoreDefinition consistentStore2 = ServerTestUtils.getStoreDef("b",
+ 1,
+ 1,
+ 1,
+ 1,
+ 1,
+ RoutingStrategyType.CONSISTENT_STRATEGY);
+ storeDefs.add(consistentStore2);
+ assertEquals(RebalanceUtils.getUniqueStoreDefinitions(storeDefs).size(), 1);
+ assertEquals(RebalanceUtils.getUniqueStoreDefinitions(storeDefs).get(0), consistentStore1);
+
+ // Put another store with different rep-factor
+ StoreDefinition consistentStore3 = ServerTestUtils.getStoreDef("c",
+ 2,
+ 1,
+ 1,
+ 1,
+ 1,
+ RoutingStrategyType.CONSISTENT_STRATEGY);
+ storeDefs.add(consistentStore3);
+ assertEquals(RebalanceUtils.getUniqueStoreDefinitions(storeDefs).size(), 2);
+ assertEquals(RebalanceUtils.getUniqueStoreDefinitions(storeDefs).get(0), consistentStore1);
+ assertEquals(RebalanceUtils.getUniqueStoreDefinitions(storeDefs).get(1), consistentStore3);
+
+ // Put another store with same rep-factor but zone routing
+ HashMap<Integer, Integer> zoneRepFactor = Maps.newHashMap();
+ zoneRepFactor.put(0, 1);
+ zoneRepFactor.put(1, 1);
+ StoreDefinition zoneStore1 = ServerTestUtils.getStoreDef("d",
+ 1,
+ 1,
+ 1,
+ 1,
+ 0,
+ 0,
+ zoneRepFactor,
+ HintedHandoffStrategyType.PROXIMITY_STRATEGY,
+ RoutingStrategyType.ZONE_STRATEGY);
+ storeDefs.add(zoneStore1);
+ assertEquals(RebalanceUtils.getUniqueStoreDefinitions(storeDefs).size(), 3);
+ assertEquals(RebalanceUtils.getUniqueStoreDefinitions(storeDefs).get(0), consistentStore1);
+ assertEquals(RebalanceUtils.getUniqueStoreDefinitions(storeDefs).get(1), consistentStore3);
+ assertEquals(RebalanceUtils.getUniqueStoreDefinitions(storeDefs).get(2), zoneStore1);
+
+ // Put another store with different rep-factor but zone routing
+ zoneRepFactor = Maps.newHashMap();
+ zoneRepFactor.put(0, 2);
+ zoneRepFactor.put(1, 1);
+ StoreDefinition zoneStore2 = ServerTestUtils.getStoreDef("e",
+ 1,
+ 1,
+ 1,
+ 1,
+ 0,
+ 0,
+ zoneRepFactor,
+ HintedHandoffStrategyType.PROXIMITY_STRATEGY,
+ RoutingStrategyType.ZONE_STRATEGY);
+ storeDefs.add(zoneStore2);
+ assertEquals(RebalanceUtils.getUniqueStoreDefinitions(storeDefs).size(), 4);
+ assertEquals(RebalanceUtils.getUniqueStoreDefinitions(storeDefs).get(0), consistentStore1);
+ assertEquals(RebalanceUtils.getUniqueStoreDefinitions(storeDefs).get(1), consistentStore3);
+ assertEquals(RebalanceUtils.getUniqueStoreDefinitions(storeDefs).get(2), zoneStore1);
+ assertEquals(RebalanceUtils.getUniqueStoreDefinitions(storeDefs).get(3), zoneStore2);
+
+ // Put another zone store with same rep factor
+ zoneRepFactor = Maps.newHashMap();
+ zoneRepFactor.put(0, 1);
+ zoneRepFactor.put(1, 1);
+ StoreDefinition zoneStore3 = ServerTestUtils.getStoreDef("f",
+ 1,
+ 1,
+ 2,
+ 1,
+ 0,
+ 0,
+ zoneRepFactor,
+ HintedHandoffStrategyType.PROXIMITY_STRATEGY,
+ RoutingStrategyType.ZONE_STRATEGY);
+ storeDefs.add(zoneStore3);
+ assertEquals(RebalanceUtils.getUniqueStoreDefinitions(storeDefs).size(), 4);
+ assertEquals(RebalanceUtils.getUniqueStoreDefinitions(storeDefs).get(0), consistentStore1);
+ assertEquals(RebalanceUtils.getUniqueStoreDefinitions(storeDefs).get(1), consistentStore3);
+ assertEquals(RebalanceUtils.getUniqueStoreDefinitions(storeDefs).get(2), zoneStore1);
+ assertEquals(RebalanceUtils.getUniqueStoreDefinitions(storeDefs).get(3), zoneStore2);
+
+ // Put another zone store with same rep factor
+ zoneRepFactor = Maps.newHashMap();
+ zoneRepFactor.put(0, 2);
+ zoneRepFactor.put(1, 1);
+ StoreDefinition zoneStore4 = ServerTestUtils.getStoreDef("g",
+ 1,
+ 1,
+ 1,
+ 1,
+ 0,
+ 0,
+ zoneRepFactor,
+ HintedHandoffStrategyType.PROXIMITY_STRATEGY,
+ RoutingStrategyType.ZONE_STRATEGY);
+ storeDefs.add(zoneStore4);
+ assertEquals(RebalanceUtils.getUniqueStoreDefinitions(storeDefs).size(), 4);
+ assertEquals(RebalanceUtils.getUniqueStoreDefinitions(storeDefs).get(0), consistentStore1);
+ assertEquals(RebalanceUtils.getUniqueStoreDefinitions(storeDefs).get(1), consistentStore3);
+ assertEquals(RebalanceUtils.getUniqueStoreDefinitions(storeDefs).get(2), zoneStore1);
+ assertEquals(RebalanceUtils.getUniqueStoreDefinitions(storeDefs).get(3), zoneStore2);
+
+ // Put another store with same total rep-factor but different indiv
+ // rep-factor, zone routing
+ zoneRepFactor = Maps.newHashMap();
+ zoneRepFactor.put(0, 1);
+ zoneRepFactor.put(1, 2);
+ StoreDefinition zoneStore5 = ServerTestUtils.getStoreDef("h",
+ 1,
+ 1,
+ 1,
+ 1,
+ 0,
+ 0,
+ zoneRepFactor,
+ HintedHandoffStrategyType.PROXIMITY_STRATEGY,
+ RoutingStrategyType.ZONE_STRATEGY);
+ storeDefs.add(zoneStore5);
+ assertEquals(RebalanceUtils.getUniqueStoreDefinitions(storeDefs).size(), 5);
+ assertEquals(RebalanceUtils.getUniqueStoreDefinitions(storeDefs).get(0), consistentStore1);
+ assertEquals(RebalanceUtils.getUniqueStoreDefinitions(storeDefs).get(1), consistentStore3);
+ assertEquals(RebalanceUtils.getUniqueStoreDefinitions(storeDefs).get(2), zoneStore1);
+ assertEquals(RebalanceUtils.getUniqueStoreDefinitions(storeDefs).get(3), zoneStore2);
+ assertEquals(RebalanceUtils.getUniqueStoreDefinitions(storeDefs).get(4), zoneStore5);
+
+ }
+
public void testRebalancePlan() {
RebalanceClusterPlan rebalancePlan = new RebalanceClusterPlan(currentCluster,
targetCluster,
@@ -74,15 +230,7 @@ public void testRebalancePlan() {
rebalanceNodeInfo.getRebalanceTaskList().size());
RebalancePartitionsInfo expected = new RebalancePartitionsInfo(rebalanceNodeInfo.getStealerNode(),
0,
- Arrays.asList(0,
- 1,
- 2,
- 3,
- 4,
- 5,
- 6,
- 7,
- 8),
+ Arrays.asList(2, 3),
Arrays.asList(2, 3),
Arrays.asList(2, 3),
RebalanceUtils.getStoreNames(storeDefList),
