Skip to content

Commit

Permalink
Minor changes to address second round of review feedback on rebalance…
Browse files Browse the repository at this point in the history
… plan.
  • Loading branch information
jayjwylie committed Jun 20, 2013
1 parent 5244216 commit 8bc5e17
Show file tree
Hide file tree
Showing 5 changed files with 36 additions and 35 deletions.
8 changes: 4 additions & 4 deletions src/java/voldemort/client/rebalance/RebalanceBatchPlan.java
Expand Up @@ -317,9 +317,12 @@ protected int getDonorId(StoreRoutingPlan targetSRP,
int stealerZoneId,
int stealerNodeId,
int stealerPartitionId) {
int stealerZoneNAry = finalSRP.getZoneNaryForNodesPartition(stealerZoneId,
stealerNodeId,
stealerPartitionId);

int donorZoneId;
if(targetSRP.zoneHasReplica(stealerZoneId, stealerPartitionId)) {
if(targetSRP.zoneNAryExists(stealerZoneId, stealerZoneNAry, stealerPartitionId)) {
// Steal from local n-ary (since one exists).
donorZoneId = stealerZoneId;
} else {
Expand All @@ -328,9 +331,6 @@ protected int getDonorId(StoreRoutingPlan targetSRP,
donorZoneId = targetCluster.getNodeById(targetMasterNodeId).getZoneId();
}

int stealerZoneNAry = finalSRP.getZoneNaryForNodesPartition(stealerZoneId,
stealerNodeId,
stealerPartitionId);
return targetSRP.getNodeIdForZoneNary(donorZoneId, stealerZoneNAry, stealerPartitionId);

}
Expand Down
20 changes: 3 additions & 17 deletions src/java/voldemort/client/rebalance/RebalancePlan.java
Expand Up @@ -16,7 +16,6 @@

package voldemort.client.rebalance;

import java.io.StringReader;
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.Iterator;
Expand All @@ -33,7 +32,6 @@
import voldemort.utils.MoveMap;
import voldemort.utils.RebalanceUtils;
import voldemort.utils.Utils;
import voldemort.xml.ClusterMapper;

import com.google.common.collect.Lists;
import com.google.common.collect.TreeMultimap;
Expand Down Expand Up @@ -156,11 +154,11 @@ private void plan() {

// Determine plan batch-by-batch
int batches = 0;
Cluster batchTargetCluster = cloneCluster(targetCluster);
Cluster batchTargetCluster = Cluster.cloneCluster(targetCluster);
while(!stealerToStolenPrimaryPartitions.isEmpty()) {

// Generate a batch partitions to move
Cluster batchFinalCluster = cloneCluster(batchTargetCluster);
Cluster batchFinalCluster = Cluster.cloneCluster(batchTargetCluster);
int partitions = 0;
List<Entry<Integer, Integer>> partitionsMoved = Lists.newArrayList();
for(Entry<Integer, Integer> stealerToPartition: stealerToStolenPrimaryPartitions.entries()) {
Expand Down Expand Up @@ -199,24 +197,12 @@ private void plan() {
zoneMoveMap.add(RebalanceBatchPlan.getZoneMoveMap());

batches++;
batchTargetCluster = cloneCluster(batchFinalCluster);
batchTargetCluster = Cluster.cloneCluster(batchFinalCluster);
}

logger.info(this);
}

/**
* In the absence of a Cluster.clone() operation, hack a clone method for
* local use.
*
* @param cluster
* @return clone of Cluster cluster.
*/
private Cluster cloneCluster(Cluster cluster) {
ClusterMapper mapper = new ClusterMapper();
return mapper.readCluster(new StringReader(mapper.writeCluster(cluster)));
}

/**
* Determines storage overhead and returns pretty printed summary.
*
Expand Down
17 changes: 14 additions & 3 deletions src/java/voldemort/cluster/Cluster.java
Expand Up @@ -17,6 +17,7 @@
package voldemort.cluster;

import java.io.Serializable;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
Expand All @@ -32,6 +33,7 @@
import voldemort.annotations.jmx.JmxGetter;
import voldemort.annotations.jmx.JmxManaged;
import voldemort.utils.Utils;
import voldemort.xml.ClusterMapper;

import com.google.common.collect.Sets;

Expand Down Expand Up @@ -259,9 +261,18 @@ public String toString(boolean isDetailed) {
return builder.toString();
}

// TODO: Add a .clone() implementation. See hacked method in
// RebalancePlan.cloneCluster for example of current approach to cloning
// (use ClusterMapper to serde via XML...)
// TODO: Add a proper .clone() implementation.
/**
* In the absence of a proper Cluster.clone() operation, this hack safely
* clones a Cluster object via serde to/from XML.
*
* @param cluster
* @return clone of Cluster cluster.
*/
public static Cluster cloneCluster(Cluster cluster) {
ClusterMapper mapper = new ClusterMapper();
return mapper.readCluster(new StringReader(mapper.writeCluster(cluster)));
}

@Override
public boolean equals(Object second) {
Expand Down
24 changes: 14 additions & 10 deletions src/java/voldemort/routing/StoreRoutingPlan.java
Expand Up @@ -258,20 +258,24 @@ public int getZoneNAry(int zoneId, int nodeId, byte[] key) {
}

/**
* checks if zone replicates partition Id. False should only be returned in
* zone expansion use cases.
* checks if zone has a zone n-ary for partition Id. False should only be
* returned in zone expansion use cases.
*
* @param zoneId
* @param zoneNAry zone n-ary replica to confirm
* @param partitionId
* @return true iff partitionId is replicated in zone id.
* @return true iff partitionId has zone-nary replica in zone id .
*/
// TODO: add unit test.
public boolean zoneHasReplica(int zoneId, int partitionId) {
List<Integer> replicatingNodeIds = getReplicationNodeList(partitionId);
for(int replicatingNodeId: replicatingNodeIds) {
public boolean zoneNAryExists(int zoneId, int zoneNAry, int partitionId) {
int currentZoneNAry = -1;
for(int replicatingNodeId: getReplicationNodeList(partitionId)) {
Node replicatingNode = cluster.getNodeById(replicatingNodeId);
if(replicatingNode.getZoneId() == zoneId) {
return true;
currentZoneNAry++;
if(currentZoneNAry == zoneNAry) {
return true;
}
}
}
return false;
Expand Down Expand Up @@ -303,9 +307,9 @@ public int getZoneNaryForNodesPartition(int zoneId, int nodeId, int partitionId)
// zone
if(replicatingNode.getZoneId() == zoneId) {
zoneNAry++;
}
if(replicatingNode.getId() == nodeId) {
return zoneNAry;
if(replicatingNode.getId() == nodeId) {
return zoneNAry;
}
}
}
if(zoneNAry > 0) {
Expand Down
Expand Up @@ -84,7 +84,7 @@ public void run() throws VoldemortException {
Thread.sleep(TimeUnit.SECONDS.toMillis(30));
needWait = false;
} catch(InterruptedException e) {
logger.info("sleep interrupted while waiting for remote node to recover:" + e);
logger.error("sleep interrupted while waiting for remote node to recover", e);
// continue
}
}
Expand Down

0 comments on commit 8bc5e17

Please sign in to comment.