Skip to content

Commit

Permalink
Refactoring: cleaned up util methods and moved them to appropriate ut…
Browse files Browse the repository at this point in the history
…il classes

RebalanceUtils
- all dump cluster methods are now here with more consistent interfaces
- analyzeInvalidMetadataRate is here (for lack of better place to put it)

RepartitionUtils
- moved all helper/util methods out of this class

Utils
- added removeItemsToSplitListEvenly, distributeEvenlyIntoList, and distributeEvenlyIntoMap

Moved test methods around to match new locations of util methods.
  • Loading branch information
jayjwylie committed Jun 20, 2013
1 parent fdc0eb3 commit 57686e4
Show file tree
Hide file tree
Showing 7 changed files with 479 additions and 475 deletions.
15 changes: 7 additions & 8 deletions src/java/voldemort/client/rebalance/RebalanceController.java
Expand Up @@ -16,7 +16,6 @@

package voldemort.client.rebalance;

import java.io.File;
import java.io.StringReader;
import java.text.DecimalFormat;
import java.util.HashMap;
Expand Down Expand Up @@ -189,9 +188,9 @@ private void rebalancePerClusterTransition(Cluster currentCluster,

// Output initial and final cluster
if(rebalanceConfig.hasOutputDirectory())
RebalanceUtils.dumpCluster(currentCluster,
targetCluster,
new File(rebalanceConfig.getOutputDirectory()));
RebalanceUtils.dumpClusters(currentCluster,
targetCluster,
rebalanceConfig.getOutputDirectory());

// Start first dry run to compute the stolen partitions
for(Node stealerNode: targetCluster.getNodes()) {
Expand Down Expand Up @@ -310,10 +309,10 @@ private void rebalancePerClusterTransition(Cluster currentCluster,

// Output the transition plan to the output directory
if(rebalanceConfig.hasOutputDirectory())
RebalanceUtils.dumpCluster(currentCluster,
transitionCluster,
new File(rebalanceConfig.getOutputDirectory()),
"batch-" + Integer.toString(batchCounter) + ".");
RebalanceUtils.dumpClusters(currentCluster,
transitionCluster,
rebalanceConfig.getOutputDirectory(),
"batch-" + Integer.toString(batchCounter) + ".");

long startTimeMs = System.currentTimeMillis();
rebalancePerPartitionTransition(orderedClusterTransition);
Expand Down
14 changes: 6 additions & 8 deletions src/java/voldemort/client/rebalance/RebalancePlan.java
Expand Up @@ -16,7 +16,6 @@

package voldemort.client.rebalance;

import java.io.File;
import java.io.StringReader;
import java.text.DecimalFormat;
import java.util.ArrayList;
Expand All @@ -33,7 +32,6 @@
import voldemort.utils.MoveMap;
import voldemort.utils.PartitionBalance;
import voldemort.utils.RebalanceUtils;
import voldemort.utils.RepartitionUtils;
import voldemort.utils.Utils;
import voldemort.xml.ClusterMapper;

Expand Down Expand Up @@ -98,7 +96,7 @@ public RebalancePlan(final Cluster currentCluster,
logger.info("Final cluster : " + finalCluster);
logger.info("Batch size : " + batchSize);

logger.info(RepartitionUtils.dumpInvalidMetadataRate(currentCluster,
logger.info(RebalanceUtils.analyzeInvalidMetadataRate(currentCluster,
currentStores,
finalCluster,
finalStores));
Expand Down Expand Up @@ -149,7 +147,7 @@ private void plan() {

// Output initial and final cluster
if(outputDir != null)
RebalanceUtils.dumpCluster(targetCluster, finalCluster, new File(outputDir));
RebalanceUtils.dumpClusters(targetCluster, finalCluster, outputDir);

// Determine which partitions must be stolen
for(Node stealerNode: finalCluster.getNodes()) {
Expand Down Expand Up @@ -191,10 +189,10 @@ private void plan() {
// TODO: Change naming convention in dumpClusters to be current- &
// final- or target- & final-
if(outputDir != null)
RebalanceUtils.dumpCluster(batchTargetCluster,
batchFinalCluster,
new File(outputDir),
"batch-" + Integer.toString(batches) + ".");
RebalanceUtils.dumpClusters(batchTargetCluster,
batchFinalCluster,
outputDir,
"batch-" + Integer.toString(batches) + ".");

// Generate a plan to compute the tasks
// TODO: OK to remove option to "delete" from planning?
Expand Down
150 changes: 124 additions & 26 deletions src/java/voldemort/utils/RebalanceUtils.java
Expand Up @@ -49,9 +49,11 @@
import voldemort.cluster.Node;
import voldemort.routing.RoutingStrategy;
import voldemort.routing.RoutingStrategyFactory;
import voldemort.routing.StoreRoutingPlan;
import voldemort.server.VoldemortConfig;
import voldemort.server.rebalance.VoldemortRebalancingException;
import voldemort.store.StoreDefinition;
import voldemort.store.StoreUtils;
import voldemort.store.bdb.BdbStorageConfiguration;
import voldemort.store.metadata.MetadataStore.VoldemortState;
import voldemort.store.readonly.ReadOnlyStorageConfiguration;
Expand All @@ -76,7 +78,7 @@ public class RebalanceUtils {
public final static List<String> canRebalanceList = Arrays.asList(BdbStorageConfiguration.TYPE_NAME,
ReadOnlyStorageConfiguration.TYPE_NAME);

public final static String initialClusterFileName = "initial-cluster.xml";
public final static String currentClusterFileName = "current-cluster.xml";
public final static String finalClusterFileName = "final-cluster.xml";

/**
Expand Down Expand Up @@ -978,35 +980,19 @@ public static String printMap(final Map<Integer, Set<Pair<Integer, Integer>>> no
/**
* Given the current and final clusters, dumps both into the output directory
*
* @param initialCluster Initial cluster metadata
* @param currentCluster Current cluster metadata
* @param finalCluster Final cluster metadata
* @param outputDir Output directory where to dump this file
* @param filePrefix String to prepend to the initial & final cluster
* metadata files
* IOExceptions during writing are caught and logged, not thrown.
*/
public static void dumpCluster(Cluster initialCluster,
Cluster finalCluster,
File outputDir,
String filePrefix) {

// Create the output directory if it doesn't exist
if(!outputDir.exists()) {
Utils.mkdirs(outputDir);
}

// Get the file paths
File initialClusterFile = new File(outputDir, filePrefix + initialClusterFileName);
File finalClusterFile = new File(outputDir, filePrefix + finalClusterFileName);

// Write the output
ClusterMapper mapper = new ClusterMapper();
try {
FileUtils.writeStringToFile(initialClusterFile, mapper.writeCluster(initialCluster));
FileUtils.writeStringToFile(finalClusterFile, mapper.writeCluster(finalCluster));
} catch(IOException e) {
logger.error("Error writing cluster metadata to file");
}
public static void dumpClusters(Cluster currentCluster,
                                Cluster finalCluster,
                                String outputDirName,
                                String filePrefix) {
    // Writes filePrefix + "current-cluster.xml" and filePrefix + "final-cluster.xml"
    // into outputDirName. If outputDirName is null, dumpClusterToFile is a no-op,
    // so nothing is written.
    dumpClusterToFile(outputDirName, filePrefix + currentClusterFileName, currentCluster);
    dumpClusterToFile(outputDirName, filePrefix + finalClusterFileName, finalCluster);
}

/**
Expand All @@ -1017,8 +1003,59 @@ public static void dumpCluster(Cluster initialCluster,
* @param outputDir Output directory where to dump this file
* @throws IOException
*/
public static void dumpCluster(Cluster initialCluster, Cluster finalCluster, File outputDir) {
dumpCluster(initialCluster, finalCluster, outputDir, "");
public static void dumpClusters(Cluster initialCluster,
                                Cluster finalCluster,
                                String outputDirName) {
    // Convenience overload: dump both clusters with no file name prefix.
    dumpClusters(initialCluster, finalCluster, outputDirName, "");
}

/**
* Prints a cluster xml to a file.
*
* @param outputDirName
* @param fileName
* @param cluster
*/
/**
 * Prints a cluster xml to a file.
 *
 * @param outputDirName Directory to write into; created (including parents)
 *        if it does not exist. A {@code null} value makes this method a no-op.
 * @param fileName Name of the file to create within outputDirName.
 * @param cluster Cluster to serialize as xml.
 */
public static void dumpClusterToFile(String outputDirName, String fileName, Cluster cluster) {

    if(outputDirName != null) {
        File outputDir = new File(outputDirName);
        if(!outputDir.exists()) {
            Utils.mkdirs(outputDir);
        }

        try {
            FileUtils.writeStringToFile(new File(outputDirName, fileName),
                                        new ClusterMapper().writeCluster(cluster));
        } catch(IOException e) {
            // Pass the throwable so the full stack trace is logged, not just
            // the exception's toString().
            logger.error("IOException during dumpClusterToFile: " + e, e);
        }
    }
}

/**
* Prints a balance analysis to a file.
*
* @param outputDirName
* @param baseFileName suffix '.analysis' is appended to baseFileName.
* @param partitionBalance
*/
/**
 * Prints a balance analysis to a file.
 *
 * @param outputDirName Directory to write into; created (including parents)
 *        if it does not exist. A {@code null} value makes this method a no-op.
 * @param baseFileName The suffix '.analysis' is appended to baseFileName to
 *        form the output file name.
 * @param partitionBalance Analysis whose {@code toString()} is written out.
 */
public static void dumpAnalysisToFile(String outputDirName,
                                      String baseFileName,
                                      PartitionBalance partitionBalance) {
    if(outputDirName != null) {
        File outputDir = new File(outputDirName);
        if(!outputDir.exists()) {
            Utils.mkdirs(outputDir);
        }

        try {
            FileUtils.writeStringToFile(new File(outputDirName, baseFileName + ".analysis"),
                                        partitionBalance.toString());
        } catch(IOException e) {
            // Pass the throwable so the full stack trace is logged, not just
            // the exception's toString().
            logger.error("IOException during dumpAnalysisToFile: " + e, e);
        }
    }
}

/**
Expand Down Expand Up @@ -1188,4 +1225,65 @@ public Thread newThread(Runnable r) {
}
});
}

/**
* Compares current cluster with target cluster. Uses pertinent store defs
* for each cluster to determine if a node that hosts a zone-primary in the
* current cluster will no longer host any zone-nary in the target cluster.
* This check is the precondition for a server returning an invalid metadata
* exception to a client on a normal-case put or get. Normal-case being that
* the zone-primary receives the pseudo-master put or the get operation.
*
* @param currentCluster
* @param currentStoreDefs
* @param targetCluster
* @param targetStoreDefs
* @return pretty-printed string documenting invalid metadata rates for each
* zone.
*/
/**
 * Compares current cluster with target cluster. Uses pertinent store defs
 * for each cluster to determine if a node that hosts a zone-primary in the
 * current cluster will no longer host any zone-nary in the target cluster.
 * This check is the precondition for a server returning an invalid metadata
 * exception to a client on a normal-case put or get. Normal-case being that
 * the zone-primary receives the pseudo-master put or the get operation.
 *
 * @param currentCluster Current cluster metadata
 * @param currentStoreDefs Store definitions for the current cluster
 * @param targetCluster Target cluster metadata
 * @param targetStoreDefs Store definitions for the target cluster
 * @return pretty-printed string documenting invalid metadata rates for each
 *         zone.
 */
public static String analyzeInvalidMetadataRate(final Cluster currentCluster,
                                                List<StoreDefinition> currentStoreDefs,
                                                final Cluster targetCluster,
                                                List<StoreDefinition> targetStoreDefs) {
    StringBuilder sb = new StringBuilder();
    sb.append("Dump of invalid metadata rates per zone").append(Utils.NEWLINE);

    // Collapse stores with identical routing characteristics so each routing
    // "shape" is analyzed exactly once.
    HashMap<StoreDefinition, Integer> uniqueStores = KeyDistributionGenerator.getUniqueStoreDefinitionsWithCounts(currentStoreDefs);

    for(StoreDefinition currentStoreDef: uniqueStores.keySet()) {
        sb.append("Store exemplar: " + currentStoreDef.getName())
          .append(Utils.NEWLINE)
          .append("\tThere are " + uniqueStores.get(currentStoreDef) + " other similar stores.")
          .append(Utils.NEWLINE);

        StoreRoutingPlan currentSRP = new StoreRoutingPlan(currentCluster, currentStoreDef);
        StoreDefinition targetStoreDef = StoreUtils.getStoreDef(targetStoreDefs,
                                                                currentStoreDef.getName());
        StoreRoutingPlan targetSRP = new StoreRoutingPlan(targetCluster, targetStoreDef);

        // Only care about existing zones
        for(int zoneId: currentCluster.getZoneIds()) {
            int zoneLocalPrimaries = 0;
            int invalidMetadata = 0;
            // Examine nodes in current cluster in existing zone.
            for(int nodeId: currentCluster.getNodeIdsInZone(zoneId)) {
                for(int partitionId: targetSRP.getZonePrimaryPartitionIds(nodeId)) {
                    zoneLocalPrimaries++;
                    // A zone-primary in the target cluster that the node does
                    // not currently host as any n-ary triggers invalid
                    // metadata on a normal-case operation.
                    if(!currentSRP.getNaryPartitionIds(nodeId).contains(partitionId)) {
                        invalidMetadata++;
                    }
                }
            }
            // Guard against 0/0 producing NaN when a zone hosts no
            // zone-primaries for this store.
            float rate = (zoneLocalPrimaries == 0) ? 0.0f
                                                   : invalidMetadata / (float) zoneLocalPrimaries;
            sb.append("\tZone " + zoneId)
              .append(" : total zone primaries " + zoneLocalPrimaries)
              .append(", # that trigger invalid metadata " + invalidMetadata)
              .append(" => " + rate)
              .append(Utils.NEWLINE);
        }
    }

    return sb.toString();
}

}

0 comments on commit 57686e4

Please sign in to comment.