
add zoned option for restore from replicas

commit 425d99d186fbfd179759f2ebac557f6348583b40 (parent: a78111d), Lei Gao, May 10, 2012
@@ -208,6 +208,10 @@ public static void main(String[] args) throws Exception {
parser.accepts("backup-incremental",
"Perform an incremental backup for point-in-time recovery."
+ " By default backup has latest consistent snapshot.");
+ parser.accepts("zone", "zone id")
+ .withRequiredArg()
+ .describedAs("zone-id")
+ .ofType(Integer.class);
OptionSet options = parser.parse(args);
@@ -234,6 +238,7 @@ public static void main(String[] args) throws Exception {
String url = (String) options.valueOf("url");
Integer nodeId = CmdUtils.valueOf(options, "node", -1);
int parallelism = CmdUtils.valueOf(options, "restore", 5);
+ Integer zoneId = CmdUtils.valueOf(options, "zone", -1);
AdminClient adminClient = new AdminClient(url, new AdminClientConfig());
@@ -324,7 +329,7 @@ public static void main(String[] args) throws Exception {
System.exit(1);
}
System.out.println("Starting restore");
- adminClient.restoreDataFromReplications(nodeId, parallelism);
+ adminClient.restoreDataFromReplications(nodeId, parallelism, zoneId);
System.out.println("Finished restore");
}
if(ops.contains("k")) {
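The new flag follows the jopt-simple pattern used by the rest of the tool: declare an integer-valued "zone" option, then read it with a default of -1 (no zone preference) when the flag is absent. A minimal, self-contained sketch of that pattern, with an illustrative class name standing in for the real tool:

    import joptsimple.OptionParser;
    import joptsimple.OptionSet;

    public class ZoneFlagSketch {
        public static void main(String[] args) {
            OptionParser parser = new OptionParser();
            // Same declaration as in the commit: --zone takes an integer argument
            parser.accepts("zone", "zone id")
                  .withRequiredArg()
                  .describedAs("zone-id")
                  .ofType(Integer.class);
            OptionSet options = parser.parse(args);
            // CmdUtils.valueOf(options, "zone", -1) reduces to this default logic
            int zoneId = options.has("zone") ? (Integer) options.valueOf("zone") : -1;
            System.out.println("restore will use zoneId = " + zoneId);
        }
    }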
@@ -647,6 +647,22 @@ public ByteArray computeNext() {
* @throws InterruptedException
*/
public void restoreDataFromReplications(int nodeId, int parallelTransfers) {
+ restoreDataFromReplications(nodeId, parallelTransfers, -1);
+ }
+
+ /**
+ * Restore data from copies on other machines for the given nodeId
+ * <p>
+ * Recovery mechanism to recover and restore data actively from replicated
+ * copies in the cluster.<br>
+ *
+ * @param nodeId Id of the node whose data is to be restored
+ * @param parallelTransfers number of parallel transfers to run
+ * @param zoneId zone from which donor nodes are chosen; -1 means no zone
+ * preference
+ * @throws InterruptedException
+ */
+ public void restoreDataFromReplications(int nodeId, int parallelTransfers, int zoneId) {
ExecutorService executors = Executors.newFixedThreadPool(parallelTransfers,
new ThreadFactory() {
@@ -676,7 +692,7 @@ public Thread newThread(Runnable r) {
}
}
for(StoreDefinition def: writableStores) {
- restoreStoreFromReplication(nodeId, cluster, def, executors);
+ restoreStoreFromReplication(nodeId, cluster, def, executors, zoneId);
}
} finally {
executors.shutdown();
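Taken together, a caller reaches the zoned restore through the new three-argument overload; the two-argument form keeps the old behaviour by delegating with -1. A usage sketch with a made-up URL and ids:

    import voldemort.client.protocol.admin.AdminClient;
    import voldemort.client.protocol.admin.AdminClientConfig;

    public class ZonedRestoreSketch {
        public static void main(String[] args) {
            AdminClient adminClient = new AdminClient("tcp://localhost:6666",
                                                      new AdminClientConfig());
            // Restore node 2 using 5 parallel transfers, preferring donors in zone 1;
            // pass -1 as the zone id to keep the original zone-agnostic behaviour.
            adminClient.restoreDataFromReplications(2, 5, 1);
        }
    }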
@@ -703,8 +719,26 @@ public Thread newThread(Runnable r) {
public Map<Integer, HashMap<Integer, List<Integer>>> getReplicationMapping(int restoringNode,
Cluster cluster,
StoreDefinition storeDef) {
+ return getReplicationMapping(restoringNode, cluster, storeDef, -1);
+ }
+
+ /**
+ * For a particular node, finds out all the [replica, partition] tuples it
+ * needs to steal in order to be brought back to normal state
+ *
+ * @param restoringNode The id of the node which needs to be restored
+ * @param cluster The cluster definition
+ * @param storeDef The store definition to use
+ * @param zoneId zone from which donor nodes are chosen; -1 means no
+ * zone preference
+ * @return Map of node id to map of replica type and corresponding partition
+ * list
+ */
+ public Map<Integer, HashMap<Integer, List<Integer>>> getReplicationMapping(int restoringNode,
+ Cluster cluster,
+ StoreDefinition storeDef,
+ int zoneId) {
- Map<Integer, Integer> partitionToNodeId = RebalanceUtils.getCurrentPartitionMapping(cluster);
Map<Integer, HashMap<Integer, List<Integer>>> returnMap = Maps.newHashMap();
RoutingStrategy strategy = new RoutingStrategyFactory().updateRoutingStrategy(storeDef,
@@ -731,28 +765,12 @@ public Thread newThread(Runnable r) {
+ "being left in replicating list");
}
- // Pick the first element and find its position in the
- // origin replicating list
- int replicaType = extraCopyReplicatingPartitions.indexOf(replicatingPartitions.get(0));
- int partition = extraCopyReplicatingPartitions.get(0);
- int nodeId = partitionToNodeId.get(replicatingPartitions.get(0));
-
- HashMap<Integer, List<Integer>> replicaToPartitionList = null;
- if(returnMap.containsKey(nodeId)) {
- replicaToPartitionList = returnMap.get(nodeId);
- } else {
- replicaToPartitionList = Maps.newHashMap();
- returnMap.put(nodeId, replicaToPartitionList);
- }
-
- List<Integer> partitions = null;
- if(replicaToPartitionList.containsKey(replicaType)) {
- partitions = replicaToPartitionList.get(replicaType);
- } else {
- partitions = Lists.newArrayList();
- replicaToPartitionList.put(replicaType, partitions);
- }
- partitions.add(partition);
+ addDonorWithZonePreference(replicatingPartitions,
+ extraCopyReplicatingPartitions,
+ returnMap,
+ zoneId,
+ cluster,
+ storeDef);
}
}
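The nested return type of getReplicationMapping is easier to read with concrete values. A sketch with made-up ids, in which donor node 3 supplies partitions 1 and 5 at replica type 0 and partition 7 at replica type 1:

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    import com.google.common.collect.Lists;
    import com.google.common.collect.Maps;

    public class MappingShapeSketch {
        public static void main(String[] args) {
            Map<Integer, HashMap<Integer, List<Integer>>> mapping = Maps.newHashMap();
            HashMap<Integer, List<Integer>> replicaToPartitions = Maps.newHashMap();
            replicaToPartitions.put(0, Lists.newArrayList(1, 5)); // replica type 0
            replicaToPartitions.put(1, Lists.newArrayList(7));    // replica type 1
            mapping.put(3, replicaToPartitions);                  // donor node id 3
            System.out.println(mapping); // prints {3={0=[1, 5], 1=[7]}}
        }
    }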
@@ -761,6 +779,70 @@ public Thread newThread(Runnable r) {
}
/**
+ * For each partition that needs to be restored, find a donor node that
+ * owns the partition AND lives in the requested zone. A zoneId of -1
+ * means no zone preference is applied when choosing the donor.
+ *
+ * @param remainderPartitions The replicating partitions without the one
+ * needed by the restore node
+ * @param originalPartitions The entire replicating partition list
+ * (including the one needed by the restore node)
+ * @param donorMap All donor nodes that will be fetched from
+ * @param zoneId The zone from which donor nodes will be chosen; -1
+ * means any zone is acceptable
+ * @param cluster The cluster metadata
+ * @param storeDef The store to be restored
+ */
+ private void addDonorWithZonePreference(List<Integer> remainderPartitions,
+ List<Integer> originalPartitions,
+ Map<Integer, HashMap<Integer, List<Integer>>> donorMap,
+ int zoneId,
+ Cluster cluster,
+ StoreDefinition storeDef) {
+ Map<Integer, Integer> partitionToNodeId = RebalanceUtils.getCurrentPartitionMapping(cluster);
+ int nodeId = -1;
+ int replicaType = -1;
+ int partition = -1;
+ boolean found = false;
+ int index = 0;
+
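+ // Walk the candidate donor partitions in order; accept the first one whose
+ // host node is in the requested zone, or the first one at all when zoneId == -1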
+ while(!found && index < remainderPartitions.size()) {
+ replicaType = originalPartitions.indexOf(remainderPartitions.get(index));
+ partition = originalPartitions.get(0);
+ nodeId = partitionToNodeId.get(remainderPartitions.get(index));
+ if(-1 == zoneId || cluster.getNodeById(nodeId).getZoneId() == zoneId) {
+ found = true;
+ } else {
+ index++;
+ }
+ }
+
+ if(!found) {
+ throw new VoldemortException("unable to find a node to fetch partition " + partition
+ + " of replica type " + replicaType + " for store "
+ + storeDef.getName());
+ }
+
+ HashMap<Integer, List<Integer>> replicaToPartitionList = null;
+ if(donorMap.containsKey(nodeId)) {
+ replicaToPartitionList = donorMap.get(nodeId);
+ } else {
+ replicaToPartitionList = Maps.newHashMap();
+ donorMap.put(nodeId, replicaToPartitionList);
+ }
+
+ List<Integer> partitions = null;
+ if(replicaToPartitionList.containsKey(replicaType)) {
+ partitions = replicaToPartitionList.get(replicaType);
+ } else {
+ partitions = Lists.newArrayList();
+ replicaToPartitionList.put(replicaType, partitions);
+ }
+ partitions.add(partition);
+ }
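The two get-or-create blocks above are the pre-Java-8 idiom the codebase uses elsewhere. On a newer JDK the same donor-map update would collapse to computeIfAbsent; a fragment, offered only as an editorial sketch and not part of this commit:

    donorMap.computeIfAbsent(nodeId, k -> Maps.newHashMap())
            .computeIfAbsent(replicaType, k -> Lists.newArrayList())
            .add(partition);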
+
+ /**
* For a particular store and node, runs the replication job. This works
* only for read-write stores
*
@@ -772,13 +854,15 @@ public Thread newThread(Runnable r) {
private void restoreStoreFromReplication(final int restoringNodeId,
final Cluster cluster,
final StoreDefinition storeDef,
- final ExecutorService executorService) {
+ final ExecutorService executorService,
+ final int zoneId) {
logger.info("Restoring data for store " + storeDef.getName() + " on node "
+ restoringNodeId);
Map<Integer, HashMap<Integer, List<Integer>>> restoreMapping = getReplicationMapping(restoringNodeId,
cluster,
- storeDef);
+ storeDef,
+ zoneId);
// migrate partition
for(final Entry<Integer, HashMap<Integer, List<Integer>>> replicationEntry: restoreMapping.entrySet()) {
final int donorNodeId = replicationEntry.getKey();
@@ -790,18 +874,10 @@ public void run() {
+ restoringNodeId + " from node " + replicationEntry.getKey()
+ " partitions:" + replicationEntry.getValue());
- int migrateAsyncId = migratePartitions(donorNodeId,
- restoringNodeId,
- storeDef.getName(),
- replicationEntry.getValue(),
- null,
- null,
- false);
- waitForCompletion(restoringNodeId,
- migrateAsyncId,
- adminClientConfig.getRestoreDataTimeoutSec(),
- TimeUnit.SECONDS);
-
+ int migrateAsyncId = migratePartitions(donorNodeId, restoringNodeId, storeDef.getName(), replicationEntry.getValue(), null, null, false);
+
+ waitForCompletion(restoringNodeId, migrateAsyncId, adminClientConfig.getRestoreDataTimeoutSec(), TimeUnit.SECONDS);
+
logger.info("Restoring data for store:" + storeDef.getName()
+ " from node " + donorNodeId + " completed.");
} catch(Exception e) {
@@ -718,6 +718,9 @@ private void rebalancePerTaskTransition(final int taskId,
HashMap<Integer, List<RebalancePartitionsInfo>> donorNodeBasedPartitionsInfo = RebalanceUtils.groupPartitionsInfoByNode(rebalancePartitionPlanList,
false);
for(Entry<Integer, List<RebalancePartitionsInfo>> entries: donorNodeBasedPartitionsInfo.entrySet()) {
+ try {
+ Thread.sleep(10000);
+ } catch (InterruptedException e) {}
DonorBasedRebalanceTask rebalanceTask = new DonorBasedRebalanceTask(taskId,
entries.getValue(),
rebalanceConfig,
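A note on the ten-second pause added in the last hunk: the empty catch block discards the thread's interrupt status. If the pause is kept, an interrupt-safe form would be:

    try {
        Thread.sleep(10000);
    } catch (InterruptedException e) {
        // restore the flag so callers further up can still observe the interrupt
        Thread.currentThread().interrupt();
    }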
