Skip to content

Commit

Permalink
Clean up Rebalance(Plan|Controller)CLI
Browse files Browse the repository at this point in the history
RebalancePlan(CLI)
- set batch size default to INTEGER.MAX_VALUE

RebalanceControllerCLI
- Cleaned up handling of all optional arguments
  • Loading branch information
jayjwylie committed Jun 20, 2013
1 parent 039b7fc commit 8a9a791
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 48 deletions.
9 changes: 7 additions & 2 deletions src/java/voldemort/client/rebalance/RebalancePlan.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,13 @@ public class RebalancePlan {

private static final Logger logger = Logger.getLogger(RebalancePlan.class);

// TODO: Rename this and set to (effectively) infinite.
public final static int PRIMARY_PARTITION_BATCH_SIZE = 1;
/**
* The number of "primary" partition IDs to move in each batch of the plan.
* Moving a primary partition ID between nodes results in between zero and
* (# of zones) * (2) * (# stores) partition-stores being moved. The (2)
* comes from an upper bound of a single move affecting two-nodes per zone.
*/
public final static int BATCH_SIZE = Integer.MAX_VALUE;

private final Cluster currentCluster;
private final List<StoreDefinition> currentStores;
Expand Down
61 changes: 33 additions & 28 deletions src/java/voldemort/tools/RebalanceControllerCLI.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import java.io.IOException;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import joptsimple.OptionException;
import joptsimple.OptionParser;
Expand Down Expand Up @@ -53,22 +52,20 @@ private static void setupParser() {
parser.accepts("donor-based", "Execute donor-based rebalancing.");
parser.accepts("stealer-based", "Execute stealer-based rebalancing (default).");

// TODO: WTF
// TODO: Can this option be deprecated?
parser.accepts("tries",
"(1) Tries during rebalance [ Default: "
+ RebalanceController.MAX_TRIES_REBALANCING
+ " ] (2) Number of tries while generating new metadata")
"Tries during stealer-based rebalance [ Default: "
+ RebalanceController.MAX_TRIES_REBALANCING + " ]")
.withRequiredArg()
.ofType(Integer.class)
.describedAs("num-tries");
// TODO: WTF
// TODO: Can this option be deprecated?
parser.accepts("timeout",
"Time-out in seconds for rebalancing of a single task ( stealer - donor tuple ) [ Default : "
+ RebalanceController.REBALANCING_CLIENT_TIMEOUT_SEC + " ]")
.withRequiredArg()
.ofType(Long.class)
.describedAs("sec");
// TODO: WTF
parser.accepts("parallelism",
"Number of rebalances to run in parallel [ Default:"
+ RebalanceController.MAX_PARALLEL_REBALANCING + " ]")
Expand All @@ -84,15 +81,14 @@ private static void setupParser() {
.withRequiredArg()
.describedAs("stores.xml");

// TODO: These options are common with RebalancePlanCLI. How to share?
// TODO: Switch default for batch size to infinite.
parser.accepts("batch",
"Number of primary partitions to move together [ Default : "
+ RebalancePlan.PRIMARY_PARTITION_BATCH_SIZE + " ]")
"Number of primary partitions to move together [ RebalancePlan parameter; Default : "
+ RebalancePlan.BATCH_SIZE + " ]")
.withRequiredArg()
.ofType(Integer.class)
.describedAs("num-primary-partitions");
parser.accepts("output-dir", "Output directory in which to dump per-batch metadata")
parser.accepts("output-dir",
"RebalancePlan parameter; Output directory in which to dump per-batch metadata")
.withRequiredArg()
.ofType(String.class)
.describedAs("path");
Expand All @@ -108,11 +104,10 @@ private static void printUsage() {
help.append(" --final-cluster <clusterXML>\n");
help.append(" Optional:\n");
help.append(" --final-stores <storesXML> [ Needed for zone expansion ]\n");
help.append(" --output-dir [ Output directory is where we store the optimized cluster ]\n");
help.append(" --output-dir [ Output directory in which plan is stored ]\n");
help.append(" --batch <batch> [ Number of primary partitions to move in each rebalancing batch. ]\n");
help.append(" --output-dir <outputDir> [ Directory in which cluster metadata is dumped for each batch of the plan. ]\n");
help.append(" --stealer-based or --donor-based [ Defaults to stealer-based. ]\n");
// TODO: Add in WTF members: parallelism, tries, timeout, delete, other?

try {
parser.printHelpOn(System.out);
Expand Down Expand Up @@ -155,16 +150,31 @@ public static void main(String[] args) throws Exception {
// Bootstrap & fetch current cluster/stores
String bootstrapURL = (String) options.valueOf("url");

// Process optional "controller" arguments
boolean stealerBased = true;
if(options.has("donor-based")) {
stealerBased = false;
}
// TODO: Process other optional controller args

int parallelism = RebalanceController.MAX_PARALLEL_REBALANCING;
if(options.has("parallelism")) {
parallelism = (Integer) options.valueOf("parallelism");
}

int tries = RebalanceController.MAX_TRIES_REBALANCING;
if(options.has("tries")) {
tries = (Integer) options.valueOf("tries");
}

long timeout = RebalanceController.REBALANCING_CLIENT_TIMEOUT_SEC;
if(options.has("timeout")) {
timeout = (Integer) options.valueOf("timeout");
}

RebalanceController rebalanceController = new RebalanceController(bootstrapURL,
1,
2,
TimeUnit.DAYS.toSeconds(30),
parallelism,
tries,
timeout,
stealerBased);

Cluster currentCluster = rebalanceController.getCurrentCluster();
Expand All @@ -184,25 +194,20 @@ public static void main(String[] args) throws Exception {
RebalanceUtils.validateClusterStores(finalCluster, finalStoreDefs);
RebalanceUtils.validateCurrentFinalCluster(currentCluster, finalCluster);

// Process optional planning args
int batchSize = CmdUtils.valueOf(options,
"batch",
RebalancePlan.PRIMARY_PARTITION_BATCH_SIZE);
// Process optional "planning" arguments
int batchSize = CmdUtils.valueOf(options, "batch", RebalancePlan.BATCH_SIZE);

String outputDir = null;
if(options.has("output-dir")) {
outputDir = (String) options.valueOf("output-dir");
}

// Plan rebalancing
// TODO: Figure out when/how stealerBased flag should be used.
RebalancePlan rebalancePlan = new RebalancePlan(currentCluster,
// Plan & execute rebalancing.
rebalanceController.rebalance(new RebalancePlan(currentCluster,
currentStoreDefs,
finalCluster,
finalStoreDefs,
batchSize,
outputDir);
// Execute rebalancing plan.
rebalanceController.rebalance(rebalancePlan);
outputDir));
}
}
7 changes: 2 additions & 5 deletions src/java/voldemort/tools/RebalancePlanCLI.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,9 @@ private static void setupParser() {
"Path to target store definition xml. Needed for zone expansion.")
.withRequiredArg()
.describedAs("stores.xml");
// TODO: Switch default for batch size to infinite.
parser.accepts("batch",
"Number of primary partitions to move together [ Default : "
+ RebalancePlan.PRIMARY_PARTITION_BATCH_SIZE + " ]")
+ RebalancePlan.BATCH_SIZE + " ]")
.withRequiredArg()
.ofType(Integer.class)
.describedAs("num-primary-partitions");
Expand Down Expand Up @@ -150,9 +149,7 @@ public static void main(String[] args) throws Exception {
List<StoreDefinition> targetStoreDefs = new StoreDefinitionsMapper().readStoreList(new File(targetStoresXML));

// Optional args
int batchSize = CmdUtils.valueOf(options,
"batch",
RebalancePlan.PRIMARY_PARTITION_BATCH_SIZE);
int batchSize = CmdUtils.valueOf(options, "batch", RebalancePlan.BATCH_SIZE);

String outputDir = null;
if(options.has("output-dir")) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ public void testRORWRebalance() throws Exception {
maxTries,
timeout,
stealerBased);
int batchSize = RebalancePlan.PRIMARY_PARTITION_BATCH_SIZE;
int batchSize = RebalancePlan.BATCH_SIZE;
RebalancePlan rebalancePlan = rebalanceClient.getPlan(targetCluster, batchSize);
try {

Expand Down Expand Up @@ -313,7 +313,7 @@ public void testRORWRebalanceWithReplication() throws Exception {
maxTries,
timeout,
stealerBased);
int batchSize = RebalancePlan.PRIMARY_PARTITION_BATCH_SIZE;
int batchSize = RebalancePlan.BATCH_SIZE;
RebalancePlan rebalancePlan = rebalanceClient.getPlan(targetCluster, batchSize);

try {
Expand Down Expand Up @@ -385,7 +385,7 @@ public void testRORebalanceWithReplication() throws Exception {
maxTries,
timeout,
stealerBased);
int batchSize = RebalancePlan.PRIMARY_PARTITION_BATCH_SIZE;
int batchSize = RebalancePlan.BATCH_SIZE;
RebalancePlan rebalancePlan = rebalanceClient.getPlan(targetCluster, batchSize);

try {
Expand Down Expand Up @@ -435,7 +435,7 @@ public void testRWRebalanceWithReplication() throws Exception {
maxTries,
timeout,
stealerBased);
int batchSize = RebalancePlan.PRIMARY_PARTITION_BATCH_SIZE;
int batchSize = RebalancePlan.BATCH_SIZE;
RebalancePlan rebalancePlan = rebalanceClient.getPlan(targetCluster, batchSize);

try {
Expand Down Expand Up @@ -489,7 +489,7 @@ public void testRebalanceCleanPrimary() throws Exception {
maxTries,
timeout,
stealerBased);
int batchSize = RebalancePlan.PRIMARY_PARTITION_BATCH_SIZE;
int batchSize = RebalancePlan.BATCH_SIZE;
RebalancePlan rebalancePlan = rebalanceClient.getPlan(targetCluster, batchSize);

try {
Expand Down Expand Up @@ -567,7 +567,7 @@ public void testRebalanceCleanSecondary() throws Exception {
maxTries,
timeout,
stealerBased);
int batchSize = RebalancePlan.PRIMARY_PARTITION_BATCH_SIZE;
int batchSize = RebalancePlan.BATCH_SIZE;
RebalancePlan rebalancePlan = rebalanceClient.getPlan(targetCluster, batchSize);

try {
Expand Down Expand Up @@ -786,7 +786,7 @@ public void testProxyGetDuringRebalancing() throws Exception {
maxTries,
timeout,
stealerBased);
int batchSize = RebalancePlan.PRIMARY_PARTITION_BATCH_SIZE;
int batchSize = RebalancePlan.BATCH_SIZE;
final RebalancePlan rebalancePlan = rebalanceClient.getPlan(updatedTargetCluster,
batchSize);

Expand Down Expand Up @@ -955,7 +955,7 @@ public void testProxyPutDuringRebalancing() throws Exception {
maxTries,
timeout,
stealerBased);
int batchSize = RebalancePlan.PRIMARY_PARTITION_BATCH_SIZE;
int batchSize = RebalancePlan.BATCH_SIZE;
final RebalancePlan rebalancePlan = rebalanceClient.getPlan(updatedTargetCluster,
batchSize);

Expand Down Expand Up @@ -1172,7 +1172,7 @@ public void testServerSideRouting() throws Exception {
maxTries,
timeout,
stealerBased);
int batchSize = RebalancePlan.PRIMARY_PARTITION_BATCH_SIZE;
int batchSize = RebalancePlan.BATCH_SIZE;
final RebalancePlan rebalancePlan = rebalanceClient.getPlan(updatedTargetCluster,
batchSize);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ public void testRWRebalance() throws Exception {
maxTries,
timeout,
stealerBased);
int batchSize = RebalancePlan.PRIMARY_PARTITION_BATCH_SIZE;
int batchSize = RebalancePlan.BATCH_SIZE;
RebalancePlan rebalancePlan = rebalanceClient.getPlan(targetCluster, batchSize);

try {
Expand Down Expand Up @@ -369,7 +369,7 @@ public void testRebalanceCleanPrimarySecondary() throws Exception {
maxTries,
timeout,
stealerBased);
int batchSize = RebalancePlan.PRIMARY_PARTITION_BATCH_SIZE;
int batchSize = RebalancePlan.BATCH_SIZE;
RebalancePlan rebalancePlan = rebalanceClient.getPlan(targetCluster, batchSize);

try {
Expand Down Expand Up @@ -484,7 +484,7 @@ public void testProxyGetDuringRebalancing() throws Exception {
maxTries,
timeout,
stealerBased);
int batchSize = RebalancePlan.PRIMARY_PARTITION_BATCH_SIZE;
int batchSize = RebalancePlan.BATCH_SIZE;
final RebalancePlan rebalancePlan = rebalanceClient.getPlan(targetCluster, batchSize);

try {
Expand Down Expand Up @@ -646,7 +646,7 @@ public void testProxyPutDuringRebalancing() throws Exception {
maxTries,
timeout,
stealerBased);
int batchSize = RebalancePlan.PRIMARY_PARTITION_BATCH_SIZE;
int batchSize = RebalancePlan.BATCH_SIZE;
final RebalancePlan rebalancePlan = rebalanceClient.getPlan(updatedTargetCluster,
batchSize);

Expand Down

0 comments on commit 8a9a791

Please sign in to comment.