Skip to content
Browse files

Changes to invoke repair job via admin tool and validation via Entropy

  • Loading branch information...
1 parent c7a3f35 commit f3657b9542977bbb81ee914e87b91cb4a224dbe9 Chinmay Soman committed Aug 5, 2011
View
12 src/java/voldemort/VoldemortAdminTool.java
@@ -419,14 +419,24 @@ public static void main(String[] args) throws Exception {
executeAsync(nodeId, adminClient, asyncKey, asyncIds);
}
if(ops.contains("l")) {
- adminClient.rebalanceRepair(nodeId);
+ executeRebalanceRepair(nodeId, adminClient);
}
} catch(Exception e) {
e.printStackTrace();
Utils.croak(e.getMessage());
}
}
+ private static void executeRebalanceRepair(Integer nodeId, AdminClient adminClient) { // Calls rebalanceRepair on one node, or on every node when nodeId is negative.
+ if(nodeId < 0) { // negative id: fan out to each node of the admin client's cluster
+ for(Node node: adminClient.getAdminClientCluster().getNodes()) {
+ adminClient.rebalanceRepair(node.getId());
+ }
+ } else { // non-negative id: target only the requested node
+ adminClient.rebalanceRepair(nodeId);
+ }
+ }
+
public static void printHelp(PrintStream stream, OptionParser parser) throws IOException {
stream.println("Commands supported");
stream.println("------------------");
View
3 src/java/voldemort/server/scheduler/slop/RepairJob.java
@@ -30,7 +30,7 @@
public class RepairJob implements Runnable {
- private final int DELETE_BATCH = 50;
+ private final int DELETE_BATCH = 20;
private final static Logger logger = Logger.getLogger(RepairJob.class.getName());
public final static List<String> blackList = Arrays.asList("mysql", "krati", "read-only");
@@ -98,6 +98,7 @@ public void run() {
Date startTime = new Date();
boolean terminatedEarly = false;
logger.info("Started repair job at " + startTime);
+ System.out.println("Started repair job at " + startTime);
Map<String, Long> localStats = Maps.newHashMap();
for(StoreDefinition storeDef: metadataStore.getStoreDefList()) {
View
126 src/java/voldemort/utils/Entropy.java
@@ -7,6 +7,7 @@
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import joptsimple.OptionParser;
@@ -40,7 +41,8 @@
/**
* Entropy constructor. Uses DEFAULT_NUM_KEYS number of keys
*
- * @param nodeId Node id. If -1, goes over all of them
+ * @param nodeId Node id. If -1, goes over all of them. For a negative
+ * test, nodeId must be a valid node id (not -1).
*/
public Entropy(int nodeId) {
this.nodeId = nodeId;
@@ -50,7 +52,8 @@ public Entropy(int nodeId) {
/**
* Entropy constructor
*
- * @param nodeId Node id. If -1, goes over all of them
+ * @param nodeId Node id. If -1, goes over all of them. For a negative
+ * test, nodeId must be a valid node id (not -1).
* @param numKeys Number of keys
*/
public Entropy(int nodeId, long numKeys) {
@@ -87,9 +90,16 @@ public static void main(String args[]) throws IOException {
.withRequiredArg()
.describedAs("keys")
.ofType(Long.class);
+ parser.accepts("negative-test",
+ "Check for keys that dont belong on the given nodeId are not present");
OptionSet options = parser.parse(args);
+ boolean negativeTest = false;
+ if(options.has("negative-test")) {
+ negativeTest = true;
+ }
+
if(options.has("help")) {
parser.printHelpOn(System.out);
System.exit(0);
@@ -130,7 +140,8 @@ public static void main(String args[]) throws IOException {
List<StoreDefinition> storeDefs = new StoreDefinitionsMapper().readStoreList(new File(storesXml));
Entropy detector = new Entropy(nodeId, numKeys);
- detector.generateEntropy(cluster, storeDefs, outputDir, opType);
+
+ detector.generateEntropy(cluster, storeDefs, outputDir, opType, negativeTest);
}
/**
@@ -147,6 +158,26 @@ public void generateEntropy(Cluster cluster,
List<StoreDefinition> storeDefs,
File storeDir,
boolean opType) throws IOException {
+ generateEntropy(cluster, storeDefs, storeDir, opType, false);
+ }
+
+ /**
+ * Run the actual entropy calculation tool
+ *
+ * @param cluster The cluster metadata
+ * @param storeDefs The list of stores
+ * @param storeDir The store directory
+ * @param opType Operation type - true ( run entropy calculator ), false (
+ * save keys )
+ * @param negativeTest Validate that the rebalanced keys are deleted from
+ * the store
+ * @throws IOException
+ */
+ public void generateEntropy(Cluster cluster,
+ List<StoreDefinition> storeDefs,
+ File storeDir,
+ boolean opType,
+ boolean negativeTest) throws IOException {
AdminClient adminClient = null;
try {
adminClient = new AdminClient(cluster,
@@ -202,20 +233,43 @@ public void generateEntropy(Cluster cluster,
}
}
} else {
+ List<Integer> partitions = cluster.getNodeById(nodeId)
+ .getPartitionIds();
+ Map<Integer, Integer> partitionMap = new HashMap<Integer, Integer>();
+ int numKeysPerPartition = (int) Math.floor(numKeys / partitions.size());
+ int numKeysStored = 0;
+
+ for(int partitionId: partitions) {
+ partitionMap.put(partitionId, 0);
+ }
+
keys = adminClient.fetchKeys(nodeId,
storeDef.getName(),
- cluster.getNodeById(nodeId)
- .getPartitionIds(),
+ partitions,
null,
false);
- for(long keyId = 0; keyId < numKeys && keys.hasNext(); keyId++) {
+ while(keys.hasNext() && numKeysStored < numKeys) {
ByteArray key = keys.next();
if(RebalanceUtils.getNodeIds(strategy.routeRequest(key.get()))
.contains(nodeId)) {
+ int targetPartition = strategy.getPartitionList(key.get())
+ .get(0);
+ int partitionCount = partitionMap.get(targetPartition);
+ if(partitionCount == numKeysPerPartition)
+ continue;
writer.write(key.length());
writer.write(key.get());
+ partitionMap.put(targetPartition, partitionCount + 1);
+ numKeysStored++;
}
}
+
+ System.out.println("Total partitions filled : " + partitions.size());
+ for(int partitionId: partitions) {
+ System.out.println("Count in partition #" + partitionId + " = "
+ + partitionMap.get(partitionId));
+ }
+
}
} finally {
@@ -225,6 +279,9 @@ public void generateEntropy(Cluster cluster,
} else {
+ if(negativeTest && nodeId == -1)
+ return;
+
if(!(storesKeyFile.exists() && storesKeyFile.canRead())) {
System.err.println("Could not find " + storeDef.getName()
+ " file to check");
@@ -247,8 +304,10 @@ public void generateEntropy(Cluster cluster,
RequestRoutingType.IGNORE_CHECKS));
}
+ long deletedKeys = 0L;
long foundKeys = 0L;
long totalKeys = 0L;
+
try {
reader = new FileInputStream(storesKeyFile);
while(reader.available() != 0) {
@@ -263,26 +322,47 @@ public void generateEntropy(Cluster cluster,
reader.read(key);
List<Node> responsibleNodes = strategy.routeRequest(key);
- boolean missingKey = false;
- for(Node node: responsibleNodes) {
- List<Versioned<byte[]>> value = socketStoresPerNode.get(node.getId())
- .get(new ByteArray(key),
- null);
-
- if(value == null || value.size() == 0) {
- missingKey = true;
- }
- }
- if(!missingKey)
- foundKeys++;
- totalKeys++;
+ if(!negativeTest) {
+ boolean missingKey = false;
+ for(Node node: responsibleNodes) {
+ List<Versioned<byte[]>> value = socketStoresPerNode.get(node.getId())
+ .get(new ByteArray(key),
+ null);
+ if(value == null || value.size() == 0) {
+ missingKey = true;
+ }
+ }
+ if(!missingKey)
+ foundKeys++;
+ totalKeys++;
+ } else {
+ if(!responsibleNodes.contains(cluster.getNodeById(nodeId))) {
+ List<Versioned<byte[]>> value = socketStoresPerNode.get(nodeId)
+ .get(new ByteArray(key),
+ null);
+
+ if(value == null || value.size() == 0) {
+ deletedKeys++;
+ }
+ totalKeys++;
+ }
+ }
}
- System.out.println("Found = " + foundKeys + " Total = " + totalKeys);
- if(foundKeys > 0 && totalKeys > 0) {
- System.out.println("%age found - " + 100.0 * (double) foundKeys
- / totalKeys);
+
+ if(!negativeTest) {
+ System.out.println("Found = " + foundKeys + " Total = " + totalKeys);
+ if(foundKeys > 0 && totalKeys > 0) {
+ System.out.println("%age found - " + 100.0 * (double) foundKeys
+ / totalKeys);
+ }
+ } else {
+ System.out.println("Deleted = " + deletedKeys + " Total = " + totalKeys);
+ if(deletedKeys > 0 && totalKeys > 0) {
+ System.out.println("%age deleted - " + 100.0 * (double) deletedKeys
+ / totalKeys);
+ }
}
} finally {
if(reader != null)

0 comments on commit f3657b9

Please sign in to comment.
Something went wrong with that request. Please try again.