Fixed cluster nomenclature: current, interim, final
Touched a lot of code to normalize variable names, method names, and
comments about clusters (illustrated in the sketch below):
- current : the cluster currently deployed in prod
- interim : the current cluster plus new nodes/zones with empty partitions
- final : the final cluster with shuffled partitions and/or populated new
  nodes/zones

Added a TODO about VoldemortServer throwing an exception on invalid cluster xml

Dropped TODOs that were unnecessary
jayjwylie committed Jun 20, 2013
1 parent 4c276b8 commit 89969de
Showing 19 changed files with 501 additions and 532 deletions.
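
To make the nomenclature concrete, here is a minimal, hypothetical sketch of how the three cluster stages relate (the six-partition layout, node ids, and class name are invented for illustration and are not taken from the repository); the interim layout corresponds to what RebalancePlan derives via RebalanceUtils.getInterimCluster(currentCluster, finalCluster) in the diff below.

import java.util.List;
import java.util.Map;

// Requires Java 9+ for Map.of / List.of.
public class ClusterNomenclatureSketch {
    public static void main(String[] args) {
        // current: the cluster as deployed in prod today (nodes 0 and 1 only).
        Map<Integer, List<Integer>> current = Map.of(0, List.of(0, 1, 2),
                                                     1, List.of(3, 4, 5));

        // interim: the current cluster plus a new node 2 that owns no partitions yet.
        Map<Integer, List<Integer>> interim = Map.of(0, List.of(0, 1, 2),
                                                     1, List.of(3, 4, 5),
                                                     2, List.of());

        // final: partitions shuffled so that the new node is populated.
        Map<Integer, List<Integer>> finalLayout = Map.of(0, List.of(0, 1),
                                                         1, List.of(3, 4),
                                                         2, List.of(2, 5));

        System.out.println("current = " + current);
        System.out.println("interim = " + interim);
        System.out.println("final   = " + finalLayout);
    }
}
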
36 changes: 17 additions & 19 deletions src/java/voldemort/client/rebalance/RebalanceBatchPlan.java
@@ -31,10 +31,6 @@

import com.google.common.collect.Maps;

// TODO: (refactor) Fix cluster nomenclature in general: make sure there are
// exactly three prefixes used to distinguish cluster xml: initial or current,
// target or spec or expanded, and final. 'target' has historically been
// overloaded to mean spec/expanded or final depending on context.
/**
* Constructs a batch plan that goes from currentCluster to finalCluster. The
* partition-stores included in the move are based on those listed in storeDefs.
@@ -258,10 +254,10 @@ public List<RebalancePartitionsInfo> buildRebalancePartitionsInfos() {
*/
private List<RebalancePartitionsInfo> constructBatchPlan() {
// Construct all store routing plans once.
HashMap<String, StoreRoutingPlan> targetStoreRoutingPlans = new HashMap<String, StoreRoutingPlan>();
HashMap<String, StoreRoutingPlan> currentStoreRoutingPlans = new HashMap<String, StoreRoutingPlan>();
for(StoreDefinition storeDef: currentStoreDefs) {
targetStoreRoutingPlans.put(storeDef.getName(), new StoreRoutingPlan(currentCluster,
storeDef));
currentStoreRoutingPlans.put(storeDef.getName(), new StoreRoutingPlan(currentCluster,
storeDef));
}
HashMap<String, StoreRoutingPlan> finalStoreRoutingPlans = new HashMap<String, StoreRoutingPlan>();
for(StoreDefinition storeDef: finalStoreDefs) {
@@ -277,28 +273,30 @@ private List<RebalancePartitionsInfo> constructBatchPlan() {

// Consider all store definitions ...
for(StoreDefinition storeDef: finalStoreDefs) {
StoreRoutingPlan targetSRP = targetStoreRoutingPlans.get(storeDef.getName());
StoreRoutingPlan currentSRP = currentStoreRoutingPlans.get(storeDef.getName());
StoreRoutingPlan finalSRP = finalStoreRoutingPlans.get(storeDef.getName());
for(int stealerPartitionId: finalSRP.getZoneNAryPartitionIds(stealerNodeId)) {
// ... and all nary partition-stores,
// now steal what is needed

// Optimization: Do not steal a partition-store you already
// host!
if(targetSRP.getReplicationNodeList(stealerPartitionId).contains(stealerNodeId)) {
if(currentSRP.getReplicationNodeList(stealerPartitionId)
.contains(stealerNodeId)) {
continue;
}

// Determine which node to steal from.
int donorNodeId = getDonorId(targetSRP,
int donorNodeId = getDonorId(currentSRP,
finalSRP,
stealerZoneId,
stealerNodeId,
stealerPartitionId);

// Add this specific partition-store steal to the overall
// plan
int donorReplicaType = targetSRP.getReplicaType(donorNodeId, stealerPartitionId);
int donorReplicaType = currentSRP.getReplicaType(donorNodeId,
stealerPartitionId);
rpiBuilder.addPartitionStoreMove(stealerNodeId,
donorNodeId,
storeDef.getName(),
@@ -320,12 +318,12 @@ private List<RebalancePartitionsInfo> constructBatchPlan() {
* Current policy:
*
* 1) If possible, a stealer node that is the zone n-ary in the finalCluster
* steals from the zone n-ary in the targetCluster in the same zone.
* steals from the zone n-ary in the currentCluster in the same zone.
*
* 2) If there are no partition-stores to steal in the same zone (i.e., this
* is the "zone expansion" use case), then a different policy must be used.
* The stealer node that is the zone n-ary in the finalCluster determines
* which pre-existing zone in the targetCluster hosts the primary partition
* which pre-existing zone in the currentCluster hosts the primary partition
* id for the partition-store. The stealer then steals the zone n-ary from
* that pre-existing zone.
*
@@ -352,14 +350,14 @@ private List<RebalancePartitionsInfo> constructBatchPlan() {
* n-aries in the new zone steal from the single cross-zone stealer in the
* zone. This would require apparatus in the RebalanceController to work.
*
* @param targetSRP
* @param currentSRP
* @param finalSRP
* @param stealerZoneId
* @param stealerNodeId
* @param stealerPartitionId
* @return the node id of the donor for this partition Id.
*/
protected int getDonorId(StoreRoutingPlan targetSRP,
protected int getDonorId(StoreRoutingPlan currentSRP,
StoreRoutingPlan finalSRP,
int stealerZoneId,
int stealerNodeId,
@@ -369,16 +367,16 @@ protected int getDonorId(StoreRoutingPlan targetSRP,
stealerPartitionId);

int donorZoneId;
if(targetSRP.zoneNAryExists(stealerZoneId, stealerZoneNAry, stealerPartitionId)) {
if(currentSRP.zoneNAryExists(stealerZoneId, stealerZoneNAry, stealerPartitionId)) {
// Steal from local n-ary (since one exists).
donorZoneId = stealerZoneId;
} else {
// Steal from zone that hosts primary partition Id.
int targetMasterNodeId = targetSRP.getNodeIdForPartitionId(stealerPartitionId);
donorZoneId = currentCluster.getNodeById(targetMasterNodeId).getZoneId();
int currentMasterNodeId = currentSRP.getNodeIdForPartitionId(stealerPartitionId);
donorZoneId = currentCluster.getNodeById(currentMasterNodeId).getZoneId();
}

return targetSRP.getNodeIdForZoneNary(donorZoneId, stealerZoneNAry, stealerPartitionId);
return currentSRP.getNodeIdForZoneNary(donorZoneId, stealerZoneNAry, stealerPartitionId);

}

9 changes: 4 additions & 5 deletions src/java/voldemort/client/rebalance/RebalanceController.java
@@ -125,12 +125,11 @@ public RebalancePlan getPlan(Cluster finalCluster,
List<StoreDefinition> finalStoreDefs,
int batchSize) {
RebalanceUtils.validateClusterStores(finalCluster, finalStoreDefs);
RebalanceUtils.validateCurrentFinalCluster(currentCluster, finalCluster);
// If an interim cluster is needed, then currentCluster should be an
// interim cluster! I.e., it should include new nodes/zones without any
// partitions assigned to them.
RebalanceUtils.validateInterimFinalCluster(currentCluster, finalCluster);

// TODO: (currentCluster vs interimCluster) Add more validation before
// constructing plan? Given that currentCluster was polled from prod
// cluster, should confirm that it is an "interim cluster" i.e., has
// same (superset?) of nodes as are in finalCluster.
String outputDir = null;
return new RebalancePlan(currentCluster,
currentStoreDefs,
8 changes: 2 additions & 6 deletions src/java/voldemort/client/rebalance/RebalancePlan.java
@@ -79,7 +79,7 @@ public class RebalancePlan {
* be used to transform deployed store definitions. In practice, this use
* case has not been tested.
*
* @param currentCluster current deployed cluster
* @param currentCluster current deployed cluster.
* @param currentStoreDefs current deployed store defs
* @param finalCluster desired deployed cluster
* @param finalStoreDefs desired deployed store defs
@@ -99,11 +99,7 @@ public RebalancePlan(final Cluster currentCluster,
this.batchSize = batchSize;
this.outputDir = outputDir;

// TODO: (currentCluster vs interimCluster) Instead of divining
// interimCluster from currentCluster and finalCluster, should we
// require that interimCluster be passed in?

// Derive the targetCluster from current & final cluster xml
// Derive the interimCluster from current & final cluster xml
RebalanceUtils.validateCurrentFinalCluster(this.currentCluster, this.finalCluster);
Cluster interimCluster = RebalanceUtils.getInterimCluster(this.currentCluster,
this.finalCluster);
19 changes: 18 additions & 1 deletion src/java/voldemort/server/VoldemortServer.java
@@ -123,8 +123,25 @@ public AsyncOperationService getAsyncRunner() {

/**
* Compare the configured hostname with all the ip addresses and hostnames
* for the server node, and log a warning if there is a mismatch
* for the server node, and log a warning if there is a mismatch.
*
*/
// TODO: VoldemortServer should throw exception if cluster xml, node id, and
// server's state are not all mutually consistent.
//
// "I attempted to do this in the past. In practice its hard since the
// hostname strings returned may not exactly match what's in cluster.xml
// (ela4-app0000.prod vs ela4-app0000.prod.linkedin.com). And for folks
// running with multiple interfaces and stuff in the open source world, not
// sure how it would fan out..
//
// I am in favour of doing this though.. May be implement a server config,
// "strict.hostname.check.on.startup" which is false by default and true for
// our environments and our SRE makes sure there is an exact match?" --
// VChandar
//
// "Strict host name doesn't work? We can always trim the rest before the comparison."
// -- LGao
private void checkHostName() {
try {
HashSet<String> ipAddrList = new HashSet<String>();
@@ -80,8 +80,8 @@ public class DonorBasedRebalanceAsyncOperation extends RebalanceAsyncOperation {
private final StoreRepository storeRepository;

private final AtomicBoolean running = new AtomicBoolean(true);
private final Cluster initialCluster;
private final Cluster targetCluster;
private final Cluster currentCluster;
private final Cluster finalCluster;
private final boolean usePartitionScan;

private final HashMultimap<String, Pair<Integer, HashMap<Integer, List<Integer>>>> storeToNodePartitionMapping;
@@ -121,8 +121,8 @@ public DonorBasedRebalanceAsyncOperation(Rebalancer rebalancer,
+ " partition-stores.");
this.storeRepository = storeRepository;
this.stealInfos = stealInfos;
this.targetCluster = metadataStore.getCluster();
this.initialCluster = stealInfos.get(0).getInitialCluster();
this.finalCluster = metadataStore.getCluster();
this.currentCluster = stealInfos.get(0).getInitialCluster();
this.usePartitionScan = usePartitionScan;
this.partitionStoreCount = RebalanceUtils.countPartitionStores(stealInfos);

@@ -275,7 +275,7 @@ public Thread newThread(Runnable r) {
if(voldemortConfig.getRebalancingOptimization() && !storageEngine.isPartitionAware()) {
for(Pair<Integer, HashMap<Integer, List<Integer>>> entry: stealerNodeToMappingTuples) {
HashMap<Integer, List<Integer>> optimizedReplicaToPartition = RebalanceUtils.getOptimizedReplicaToPartitionList(entry.getFirst(),
initialCluster,
currentCluster,
storeDef,
entry.getSecond());

@@ -329,7 +329,7 @@ private void fetchEntriesForStealers(StorageEngine<ByteArray, byte[], byte[]> st
HashMap<Integer, SynchronousQueue<Pair<ByteArray, Versioned<byte[]>>>> nodeToQueue,
String storeName) {
int scanned = 0;
int[] fetched = new int[targetCluster.getNumberOfNodes()];
int[] fetched = new int[finalCluster.getNumberOfNodes()];
long startTime = System.currentTimeMillis();

ClosableIterator<ByteArray> keys = storageEngine.keys();
Expand All @@ -340,7 +340,7 @@ private void fetchEntriesForStealers(StorageEngine<ByteArray, byte[], byte[]> st
scanned++;
List<Integer> nodeIds = StoreRoutingPlan.checkKeyBelongsToPartition(key.get(),
optimizedStealerNodeToMappingTuples,
targetCluster,
finalCluster,
storeDef);

if(nodeIds.size() > 0) {
@@ -368,7 +368,7 @@ private void fetchEntriesForStealersPartitionScan(StorageEngine<ByteArray, byte[
HashMap<Integer, SynchronousQueue<Pair<ByteArray, Versioned<byte[]>>>> nodeToQueue,
String storeName) {
int scanned = 0;
int[] fetched = new int[targetCluster.getNumberOfNodes()];
int[] fetched = new int[finalCluster.getNumberOfNodes()];
long startTime = System.currentTimeMillis();

// construct a set of all the partitions we will be fetching
@@ -388,7 +388,7 @@ private void fetchEntriesForStealersPartitionScan(StorageEngine<ByteArray, byte[
for(Integer partition: partitionsToDonate) {
if(!StoreRoutingPlan.checkPartitionBelongsToNode(partition,
voldemortConfig.getNodeId(),
initialCluster,
currentCluster,
storeDef)) {
logger.info("Node " + voldemortConfig.getNodeId()
+ " does not seem to contain partition " + partition
@@ -408,7 +408,7 @@ private void fetchEntriesForStealersPartitionScan(StorageEngine<ByteArray, byte[
scanned++;
List<Integer> nodeIds = StoreRoutingPlan.checkKeyBelongsToPartition(key.get(),
optimizedStealerNodeToMappingTuples,
targetCluster,
finalCluster,
storeDef);

if(nodeIds.size() > 0) {
4 changes: 2 additions & 2 deletions src/java/voldemort/tools/RebalanceControllerCLI.java
@@ -71,11 +71,11 @@ private static void setupParser() {
.ofType(Long.class)
.describedAs("proxy pause");

parser.accepts("final-cluster", "Path to target cluster xml")
parser.accepts("final-cluster", "Path to final cluster xml")
.withRequiredArg()
.describedAs("cluster.xml");
parser.accepts("final-stores",
"Path to target store definition xml. Needed for zone expansion.")
"Path to final store definition xml. Needed for zone expansion.")
.withRequiredArg()
.describedAs("stores.xml");

