Skip to content

Commit

Permalink
Adding Admin API and test for Clean after rebalancing
Browse files Browse the repository at this point in the history
  • Loading branch information
Chinmay Soman committed Aug 12, 2011
1 parent 1cec22a commit 2c1c99e
Show file tree
Hide file tree
Showing 9 changed files with 1,193 additions and 157 deletions.
136 changes: 109 additions & 27 deletions clients/python/voldemort/protocol/voldemort_admin_pb2.py

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions config/test_config1/config/server.properties
Expand Up @@ -2,6 +2,7 @@
node.id=0 node.id=0


max.threads=100 max.threads=100
enable.repair=true


############### DB options ###################### ############### DB options ######################


Expand Down
16 changes: 13 additions & 3 deletions src/java/voldemort/VoldemortAdminTool.java
Expand Up @@ -190,6 +190,7 @@ public static void main(String[] args) throws Exception {
.describedAs("job-ids") .describedAs("job-ids")
.withValuesSeparatedBy(',') .withValuesSeparatedBy(',')
.ofType(Integer.class); .ofType(Integer.class);
parser.accepts("rebalance-clean", "Clean after rebalancing is done");


OptionSet options = parser.parse(args); OptionSet options = parser.parse(args);


Expand Down Expand Up @@ -264,10 +265,13 @@ public static void main(String[] args) throws Exception {
if(options.has("async")) { if(options.has("async")) {
ops += "b"; ops += "b";
} }
if(options.has("rebalance-clean")) {
ops += "l";
}
if(ops.length() < 1) { if(ops.length() < 1) {
Utils.croak("At least one of (delete-partitions, restore, add-node, fetch-entries, " Utils.croak("At least one of (delete-partitions, restore, add-node, fetch-entries, "
+ "fetch-keys, add-stores, delete-store, update-entries, get-metadata, ro-metadata, " + "fetch-keys, add-stores, delete-store, update-entries, get-metadata, ro-metadata, "
+ "set-metadata, check-metadata, key-distribution, clear-rebalancing-metadata, async) " + "set-metadata, check-metadata, key-distribution, clear-rebalancing-metadata, async, rebalance-clean) "
+ "must be specified"); + "must be specified");
} }


Expand Down Expand Up @@ -414,6 +418,9 @@ public static void main(String[] args) throws Exception {
asyncIds = (List<Integer>) options.valuesOf("async-id"); asyncIds = (List<Integer>) options.valuesOf("async-id");
executeAsync(nodeId, adminClient, asyncKey, asyncIds); executeAsync(nodeId, adminClient, asyncKey, asyncIds);
} }
if(ops.contains("l")) {
adminClient.rebalanceRepair(nodeId);
}
} catch(Exception e) { } catch(Exception e) {
e.printStackTrace(); e.printStackTrace();
Utils.croak(e.getMessage()); Utils.croak(e.getMessage());
Expand Down Expand Up @@ -508,6 +515,8 @@ public static void printHelp(PrintStream stream, OptionParser parser) throws IOE
stream.println("\t\t./bin/voldemort-admin-tool.sh --async get --url [url] --node [node-id]"); stream.println("\t\t./bin/voldemort-admin-tool.sh --async get --url [url] --node [node-id]");
stream.println("\t3) Stop a list of async jobs on a particular node"); stream.println("\t3) Stop a list of async jobs on a particular node");
stream.println("\t\t./bin/voldemort-admin-tool.sh --async stop --async-id [comma-separated list of async job id] --url [url] --node [node-id]"); stream.println("\t\t./bin/voldemort-admin-tool.sh --async stop --async-id [comma-separated list of async job id] --url [url] --node [node-id]");
stream.println("\t4) Clean a node after rebalancing is done");
stream.println("\t\t./bin/voldemort-admin-tool.sh --async rebalance-clean --url [url] --node [node-id]");
stream.println(); stream.println();
stream.println("OTHERS"); stream.println("OTHERS");
stream.println("\t1) Restore a particular node completely from its replicas"); stream.println("\t1) Restore a particular node completely from its replicas");
Expand Down Expand Up @@ -665,8 +674,9 @@ private static void executeSetMetadata(Integer nodeId,
+ adminClient.getAdminClientCluster() + adminClient.getAdminClientCluster()
.getNodeById(currentNodeId) .getNodeById(currentNodeId)
.getId()); .getId());
adminClient.updateRemoteMetadata(currentNodeId, key, Versioned.value(value.toString(), adminClient.updateRemoteMetadata(currentNodeId,
updatedVersion)); key,
Versioned.value(value.toString(), updatedVersion));
} }
} }


Expand Down
28 changes: 26 additions & 2 deletions src/java/voldemort/client/protocol/admin/AdminClient.java
Expand Up @@ -31,8 +31,8 @@
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
Expand All @@ -49,8 +49,8 @@
import voldemort.client.protocol.VoldemortFilter; import voldemort.client.protocol.VoldemortFilter;
import voldemort.client.protocol.pb.ProtoUtils; import voldemort.client.protocol.pb.ProtoUtils;
import voldemort.client.protocol.pb.VAdminProto; import voldemort.client.protocol.pb.VAdminProto;
import voldemort.client.protocol.pb.VProto;
import voldemort.client.protocol.pb.VAdminProto.RebalancePartitionInfoMap; import voldemort.client.protocol.pb.VAdminProto.RebalancePartitionInfoMap;
import voldemort.client.protocol.pb.VProto;
import voldemort.client.protocol.pb.VProto.RequestType; import voldemort.client.protocol.pb.VProto.RequestType;
import voldemort.client.rebalance.RebalancePartitionsInfo; import voldemort.client.rebalance.RebalancePartitionsInfo;
import voldemort.cluster.Cluster; import voldemort.cluster.Cluster;
Expand Down Expand Up @@ -1579,6 +1579,30 @@ public void rollbackStore(int nodeId, String storeName, long pushVersion) {
return; return;
} }


/**
* Repair the stores on a rebalanced node 'nodeId'
* <p>
*
* @param nodeId The id of the node on which to do the repair
*/
public void rebalanceRepair(int nodeId) {
VAdminProto.RebalanceRepairRequest.Builder rebalanceRepairRequest = VAdminProto.RebalanceRepairRequest.newBuilder();

VAdminProto.VoldemortAdminRequest adminRequest = VAdminProto.VoldemortAdminRequest.newBuilder()
.setRebalanceRepair(rebalanceRepairRequest)
.setType(VAdminProto.AdminRequestType.REBALANCE_REPAIR)
.build();
VAdminProto.AsyncOperationStatusResponse.Builder response = sendAndReceive(nodeId,
adminRequest,
VAdminProto.AsyncOperationStatusResponse.newBuilder());

if(response.hasError()) {
throwException(response.getError());
}

return;
}

/** /**
* Fetch data from directory 'storeDir' on node id * Fetch data from directory 'storeDir' on node id
* <p> * <p>
Expand Down

0 comments on commit 2c1c99e

Please sign in to comment.