
Updated ConsistencyCheck.java for new feature

commit efa07700d66c63a2db3c0e799d541288f7cc9e20 1 parent 388cca1
@zhongjiewu authored
Showing with 254 additions and 106 deletions.
  1. +254 −106 src/java/voldemort/utils/ConsistencyCheck.java
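
For quick reference, this change replaces the single --partition flag with a comma-separated --partitions list and adds --primary-zone and --verbose (see the updated printUsage in the diff below). A minimal invocation sketch, assuming the usual Voldemort tcp:// bootstrap URL; the class ConsistencyCheckExample, the URL, the store name, and the partition ids are illustrative placeholders, not values from this commit:

    // Hypothetical driver for the updated tool; all argument values are illustrative.
    public class ConsistencyCheckExample {
        public static void main(String[] args) throws Exception {
            voldemort.utils.ConsistencyCheck.main(new String[] {
                    "--url", "tcp://bootstrap-host:6666", // bootstrap server (placeholder)
                    "--store", "my-store",                // store to scan (placeholder)
                    "--partitions", "0,1,2",              // comma-separated partition ids
                    "--primary-zone", "0",                // zone id used for the ANTI_DR check
                    "--verbose" });                       // also print CONFIG, progress, and BAD_KEY lines
        }
    }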
360 src/java/voldemort/utils/ConsistencyCheck.java
@@ -17,6 +17,7 @@
import voldemort.client.protocol.admin.AdminClientConfig;
import voldemort.cluster.Cluster;
import voldemort.cluster.Node;
+import voldemort.cluster.Zone;
import voldemort.routing.RoutingStrategyFactory;
import voldemort.store.StoreDefinition;
import voldemort.versioning.Occurred;
@@ -26,6 +27,46 @@
public class ConsistencyCheck {
+ private enum ConsistencyLevel {
+ FULL,
+ GREY,
+ ORANGE,
+ ANTI_DR
+ }
+
+ private static class ConsistencyCheckStats {
+
+ private long consistentKeys;
+ private long totalKeys;
+
+ public ConsistencyCheckStats() {
+ consistentKeys = 0;
+ totalKeys = 0;
+ }
+
+ public void setConsistentKeys(long count) {
+ consistentKeys = count;
+ }
+
+ public void setTotalKeys(long count) {
+ totalKeys = count;
+ }
+
+ public long getConsistentKeys() {
+ return consistentKeys;
+ }
+
+ public long getTotalKeys() {
+ return totalKeys;
+ }
+
+ public void append(ConsistencyCheckStats that) {
+ consistentKeys += that.getConsistentKeys();
+ totalKeys += that.getTotalKeys();
+ }
+ }
+
+ @SuppressWarnings("unchecked")
public static void main(String[] args) throws Exception {
/* parse options */
OptionParser parser = new OptionParser();
@@ -34,32 +75,94 @@ public static void main(String[] args) throws Exception {
.withRequiredArg()
.describedAs("bootstrap-url")
.ofType(String.class);
- parser.accepts("partition", "partition-id")
+ parser.accepts("partitions", "partition-id")
.withRequiredArg()
.describedAs("partition-id")
+ .withValuesSeparatedBy(',')
.ofType(Integer.class);
parser.accepts("store", "store name")
.withRequiredArg()
.describedAs("store-name")
.ofType(String.class);
+ parser.accepts("primary-zone", "primary zone id").withRequiredArg().ofType(Integer.class);
+ parser.accepts("verbose", "verbose");
OptionSet options = parser.parse(args);
/* validate options */
+ boolean verbose = false;
if(options.hasArgument("help")) {
printUsage();
+ return;
}
- if(!options.hasArgument("url") || !options.hasArgument("partition")
- || !options.hasArgument("store")) {
+ if(!options.hasArgument("url") || !options.hasArgument("partitions")
+ || !options.hasArgument("store") || !options.hasArgument("primary-zone")) {
printUsage();
+ return;
+ }
+ if(options.has("verbose")) {
+ verbose = true;
}
String url = (String) options.valueOf("url");
String storeName = (String) options.valueOf("store");
- Integer partitionId = (Integer) options.valueOf("partition");
+ List<Integer> partitionIds = (List<Integer>) options.valuesOf("partitions");
+ Integer primaryZoneId = (Integer) options.valueOf("primary-zone");
+
+ ConsistencyCheckStats globalStats = new ConsistencyCheckStats();
+ Map<Integer, ConsistencyCheckStats> partitionStatsMap = new HashMap<Integer, ConsistencyCheckStats>();
+ for(Integer partitionId: partitionIds) {
+ ConsistencyCheckStats partitionStats = doConsistencyCheck(storeName,
+ partitionId,
+ url,
+ primaryZoneId,
+ verbose);
+ partitionStatsMap.put(partitionId, partitionStats);
+ globalStats.append(partitionStats);
+ }
+
+ /* print stats */
+ // partition based
+ StringBuilder statsString = new StringBuilder();
+ // each partition
+ statsString.append("TYPE,Store,ParitionId,KeysConsistent,KeysTotal,Consistency\n");
+ for(Map.Entry<Integer, ConsistencyCheckStats> entry: partitionStatsMap.entrySet()) {
+ Integer partitionId = entry.getKey();
+ ConsistencyCheckStats partitionStats = entry.getValue();
+ statsString.append("STATS,");
+ statsString.append(storeName + ",");
+ statsString.append(partitionId + ",");
+ statsString.append(partitionStats.getConsistentKeys() + ",");
+ statsString.append(partitionStats.getTotalKeys() + ",");
+ statsString.append((double) (partitionStats.getConsistentKeys())
+ / (double) partitionStats.getTotalKeys());
+ statsString.append("\n");
+ }
+ // all partitions
+ statsString.append("STATS,");
+ statsString.append(storeName + ",");
+ statsString.append("aggregate,");
+ statsString.append(globalStats.getConsistentKeys() + ",");
+ statsString.append(globalStats.getTotalKeys() + ",");
+ statsString.append((double) (globalStats.getConsistentKeys())
+ / (double) globalStats.getTotalKeys());
+ statsString.append("\n");
+
+ System.out.println();
+ System.out.println(statsString.toString());
+ }
+
+ public static ConsistencyCheckStats doConsistencyCheck(String storeName,
+ Integer partitionId,
+ String url,
+ Integer primaryZoneId,
+ boolean verbose) throws Exception {
List<Integer> singlePartition = new ArrayList<Integer>();
singlePartition.add(partitionId);
/* connect to cluster */
+ if(verbose) {
+ System.out.println("Connecting to bootstrap server: " + url);
+ }
AdminClient adminClient = new AdminClient(url, new AdminClientConfig(), 0);
Cluster cluster = adminClient.getAdminClientCluster();
@@ -118,14 +221,18 @@ public static void main(String[] args) throws Exception {
}
/* print config info */
- StringBuilder configInfo = new StringBuilder();
- configInfo.append("Configuration\n");
- configInfo.append("=============\n");
- configInfo.append(" URL: " + url + "\n");
- configInfo.append("Store: " + storeName + "\n");
- configInfo.append("ParId: " + partitionId + "\n");
- configInfo.append("Nodes: " + nodeIdList.toString() + "\n");
- System.out.println(configInfo);
+ if(verbose) {
+ StringBuilder configInfo = new StringBuilder();
+ configInfo.append("TYPE,Store,PartitionId,Node,ZoneId\n");
+ for(Integer nodeId: nodeIdList) {
+ configInfo.append("CONFIG,");
+ configInfo.append(storeName + ",");
+ configInfo.append(partitionId + ",");
+ configInfo.append(nodeId + ",");
+ configInfo.append(cluster.getNodeById(nodeId).getZoneId() + "\n");
+ }
+ System.out.println(configInfo);
+ }
/* get entry Iterator from each node */
Map<Integer, Iterator<Pair<ByteArray, Versioned<byte[]>>>> nodeEntriesMap;
@@ -164,34 +271,36 @@ public static void main(String[] args) throws Exception {
ByteArray key = nodeEntry.getFirst();
Version version = nodeEntry.getSecond().getVersion();
- // try sweep last key fetched by this iterator
- if(lastFetchedKey.containsKey(nodeEntries)) {
- ByteArray lastKey = lastFetchedKey.get(nodeEntries);
- if(key != lastKey) {
- if(!fullyFetchedKeys.containsKey(lastKey)) {
- fullyFetchedKeys.put(lastKey,
- new HashSet<Iterator<Pair<ByteArray, Versioned<byte[]>>>>());
- }
- Set<Iterator<Pair<ByteArray, Versioned<byte[]>>>> lastKeyIterSet = fullyFetchedKeys.get(lastKey);
- lastKeyIterSet.add(nodeEntries);
- // sweep if fully fetched by all iterators
- if(lastKeyIterSet.size() == nodeIdList.size()) {
- // keyFetchComplete
- if(isConsistent(keyVersionNodeSetMap.get(lastKey),
- storeDefinition.getReplicationFactor())) {
- keyVersionNodeSetMap.remove(lastKey);
- preQualifiedKeys++;
- }
- fullyFetchedKeys.remove(lastKey);
- }
- }
- }
- lastFetchedKey.put(nodeEntries, key);
-
if(retentionChecker.isExpired(version)) {
expiredRecords++;
continue;
} else {
+ // try sweep last key fetched by this iterator
+ if(lastFetchedKey.containsKey(nodeEntries)) {
+ ByteArray lastKey = lastFetchedKey.get(nodeEntries);
+ if(key != lastKey) {
+ if(!fullyFetchedKeys.containsKey(lastKey)) {
+ fullyFetchedKeys.put(lastKey,
+ new HashSet<Iterator<Pair<ByteArray, Versioned<byte[]>>>>());
+ }
+ Set<Iterator<Pair<ByteArray, Versioned<byte[]>>>> lastKeyIterSet = fullyFetchedKeys.get(lastKey);
+ lastKeyIterSet.add(nodeEntries);
+ // sweep if fully fetched by all iterators
+ if(lastKeyIterSet.size() == nodeIdList.size()) {
+ // keyFetchComplete
+ ConsistencyLevel level = determineConsistency(keyVersionNodeSetMap.get(lastKey),
+ storeDefinition,
+ cluster,
+ primaryZoneId);
+ if(level == ConsistencyLevel.FULL) {
+ keyVersionNodeSetMap.remove(lastKey);
+ preQualifiedKeys++;
+ }
+ fullyFetchedKeys.remove(lastKey);
+ }
+ }
+ }
+ lastFetchedKey.put(nodeEntries, key);
// initialize key -> Map<Version, Set<nodeId>>
if(!keyVersionNodeSetMap.containsKey(key)) {
keyVersionNodeSetMap.put(key, new HashMap<Version, Set<Integer>>());
@@ -229,12 +338,11 @@ public static void main(String[] args) throws Exception {
}
}
// stats reporting
- if(System.currentTimeMillis() > lastReportTimeMs + reportPeriodMs) {
+ if(verbose && System.currentTimeMillis() > lastReportTimeMs + reportPeriodMs) {
long currentTimeMs = System.currentTimeMillis();
System.out.println("Progress Report");
System.out.println("===============");
System.out.println(" Number of records Scanned: " + numRecordsScanned);
- System.out.println(" Total number of unique keys: " + keyVersionNodeSetMap.size());
System.out.println(" Records Ignored(Retention): " + expiredRecords);
System.out.println("Recent fetch speed(records/s): "
+ (numRecordsScanned - numRecordsScannedLast)
@@ -246,91 +354,131 @@ public static void main(String[] args) throws Exception {
} while(anyNodeHasNext);
// analyzing
- System.out.println("Analyzing....");
+ if(verbose) {
+ System.out.println("Analyzing....");
+ }
cleanIneligibleKeys(keyVersionNodeSetMap, storeDefinition.getRequiredWrites());
+
+ // clean the rest of consistent keys
+ Set<ByteArray> keysToDelete = new HashSet<ByteArray>();
+ for(ByteArray key: keyVersionNodeSetMap.keySet()) {
+ ConsistencyLevel level = determineConsistency(keyVersionNodeSetMap.get(key),
+ storeDefinition,
+ cluster,
+ primaryZoneId);
+ if(level == ConsistencyLevel.FULL) {
+ keysToDelete.add(key);
+ }
+ }
+ for(ByteArray key: keysToDelete) {
+ keyVersionNodeSetMap.remove(key);
+ preQualifiedKeys++;
+ }
+
long totalKeys = keyVersionNodeSetMap.size() + preQualifiedKeys;
long totalKeysConsistent = preQualifiedKeys;
- // print some stats
- StringBuilder totalStats = new StringBuilder();
- totalStats.append("KeysTotal, KeysConsistent, Consistency\n");
- totalStats.append(totalKeys + ", " + totalKeysConsistent + ", ");
- totalStats.append((double) totalKeysConsistent / (double) totalKeys);
- totalStats.append("\n");
- System.out.println(totalStats.toString());
-
- // zone-wise consistency
- Map<Integer, Integer> zonePartialConsistentKeyCount = new HashMap<Integer, Integer>();
- for(Map.Entry<Integer, Set<Integer>> zoneToNodeSetEntry: zoneToNodeIds.entrySet()) {
- Integer zoneId = zoneToNodeSetEntry.getKey();
- Set<Integer> zoneNodeSet = zoneToNodeSetEntry.getValue();
- int partialConsistentKeyCount = 0;
+ // print inconsistent keys
+ if(verbose) {
+ StringBuilder record = new StringBuilder();
+ record.append("TYPE,Store,ParId,Key,ServerSet,VersionTS,VectorClock,ConsistentyLevel\n");
for(Map.Entry<ByteArray, Map<Version, Set<Integer>>> entry: keyVersionNodeSetMap.entrySet()) {
- boolean partialConsistent = true;
- for(Set<Integer> keyVersionNodeSet: entry.getValue().values()) {
- partialConsistent = partialConsistent
- && keyVersionNodeSet.containsAll(zoneNodeSet);
- }
- if(partialConsistent) {
- partialConsistentKeyCount++;
+ ByteArray key = entry.getKey();
+ Map<Version, Set<Integer>> versionMap = entry.getValue();
+ for(Map.Entry<Version, Set<Integer>> versionSet: versionMap.entrySet()) {
+ Version version = versionSet.getKey();
+ Set<Integer> nodeSet = versionSet.getValue();
+ record.append("BAD_KEY,");
+ record.append(storeName + ",");
+ record.append(partitionId + ",");
+ record.append(ByteUtils.toHexString(key.get()) + ",");
+ record.append(nodeSet.toString().replaceAll(", ", ";") + ",");
+ record.append(((VectorClock) version).getTimestamp() + ",");
+ record.append(version.toString()
+ .replaceAll(", ", ";")
+ .replaceAll(" ts:[0-9]*", "")
+ .replaceAll("version\\((.*)\\)", "[$1]")
+ + ",");
+ record.append(determineConsistency(versionMap,
+ storeDefinition,
+ cluster,
+ primaryZoneId).toString()
+ + "\n");
}
}
- zonePartialConsistentKeyCount.put(zoneId, partialConsistentKeyCount);
+ System.out.println(record.toString());
}
- // print zone-wise consistency result
- System.out.println("storeName,zoneId,consistency");
- for(Integer zoneId: zoneToNodeIds.keySet()) {
- long zoneConsistentKeys = totalKeysConsistent
- + zonePartialConsistentKeyCount.get(zoneId);
- System.out.println(storeName + "," + zoneId + "," + (double) (zoneConsistentKeys)
- / (double) totalKeys);
- }
+ ConsistencyCheckStats stats = new ConsistencyCheckStats();
+ stats.setConsistentKeys(totalKeysConsistent);
+ stats.setTotalKeys(totalKeys);
+
+ return stats;
}
public static void printUsage() {
- System.out.println("Usage: \n--partition <partitionId> --url <url> --store <storeName>");
+ System.out.println("Usage: \n--partitions <partitionId,partitionId..> --url <url> --store <storeName> --primary-zone <primary-zone-id>");
}
- // public static Map<Integer, Integer> cleanConsistentKeys(Map<ByteArray,
- // Map<Version, Set<Integer>>> keyVersionNodeSetMap,
- // int replicationFactor) {
- // Set<ByteArray> keysToDelete = new HashSet<ByteArray>();
- // Map<Integer, Integer> consistentVersionsCountMap = new HashMap<Integer,
- // Integer>();
- // for(Map.Entry<ByteArray, Map<Version, Set<Integer>>> entry:
- // keyVersionNodeSetMap.entrySet()) {
- // ByteArray key = entry.getKey();
- // Map<Version, Set<Integer>> versionNodeSetMap = entry.getValue();
- // Integer versionCount = versionNodeSetMap.size();
- // boolean consistent = isConsistent(versionNodeSetMap, replicationFactor);
- // if(consistent) {
- // if(consistentVersionsCountMap.containsKey(versionCount)) {
- // Integer count = consistentVersionsCountMap.get(versionCount);
- // consistentVersionsCountMap.put(versionCount, new Integer(count + 1));
- // } else {
- // consistentVersionsCountMap.put(versionCount, new Integer(1));
- // }
- // keyVersionNodeSetMap.put(key, null);
- // keysToDelete.add(key);
- // }
- // }
- // for(ByteArray deleteThisKey: keysToDelete) {
- // keyVersionNodeSetMap.remove(deleteThisKey);
- // }
- // keysToDelete = null;
- // System.gc();
- // return consistentVersionsCountMap;
- // }
-
- public static boolean isConsistent(Map<Version, Set<Integer>> versionNodeSetMap,
- int replicationFactor) {
- boolean consistent = true;
+ public static ConsistencyLevel determineConsistency(Map<Version, Set<Integer>> versionNodeSetMap,
+ StoreDefinition storeDef,
+ Cluster cluster,
+ int primaryZoneId) {
+ boolean fullyConsistent = true;
+ Version latestVersion = null;
for(Map.Entry<Version, Set<Integer>> versionNodeSetEntry: versionNodeSetMap.entrySet()) {
+ Version version = versionNodeSetEntry.getKey();
+ if(latestVersion == null
+ || ((VectorClock) latestVersion).getTimestamp() < ((VectorClock) version).getTimestamp()) {
+ latestVersion = version;
+ }
Set<Integer> nodeSet = versionNodeSetEntry.getValue();
- consistent = consistent && (nodeSet.size() == replicationFactor);
+ fullyConsistent = fullyConsistent
+ && (nodeSet.size() == storeDef.getReplicationFactor());
+ }
+ if(fullyConsistent) {
+ return ConsistencyLevel.FULL;
+ } else {
+ Set<Integer> nodeSet = versionNodeSetMap.get(latestVersion);
+ // latest write consistent, effectively consistent
+ if(nodeSet.size() == storeDef.getReplicationFactor()) {
+ return ConsistencyLevel.GREY;
+ }
+ // timeout write
+ if(nodeSet.size() <= storeDef.getRequiredWrites()) {
+ return ConsistencyLevel.GREY;
+ }
+ // DR-category: if other zone does not have as many available nodes
+ // as primary zone
+ Map<Integer, Integer> zoneToAvailableNodeCounts = new HashMap<Integer, Integer>();
+ for(Integer nodeId: nodeSet) {
+ Integer zoneId = cluster.getNodeById(nodeId).getZoneId();
+ Integer count = 0;
+ if(zoneToAvailableNodeCounts.containsKey(zoneId)) {
+ count = zoneToAvailableNodeCounts.get(zoneId);
+ }
+ count++;
+ zoneToAvailableNodeCounts.put(zoneId, count);
+ }
+ Integer primaryZoneAvailableNodeCounts = zoneToAvailableNodeCounts.get(primaryZoneId);
+ if(primaryZoneAvailableNodeCounts == null) {
+ primaryZoneAvailableNodeCounts = 0;
+ }
+ for(Zone zone: cluster.getZones()) {
+ Integer zoneId = zone.getId();
+ // if not primary zone and has less nodes available than primary
+ // zone
+ if(primaryZoneId != zoneId) {
+ if(!zoneToAvailableNodeCounts.containsKey(zoneId)
+ || zoneToAvailableNodeCounts.get(zoneId) == null
+ || zoneToAvailableNodeCounts.get(zoneId) < primaryZoneAvailableNodeCounts) {
+ return ConsistencyLevel.ANTI_DR;
+ }
+ }
+ }
+ // other intermittent inconsistent state
+ return ConsistencyLevel.ORANGE;
}
- return consistent;
}
public static void cleanIneligibleKeys(Map<ByteArray, Map<Version, Set<Integer>>> keyVersionNodeSetMap,
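
Reading the new determineConsistency end to end, the classification order is: FULL when every observed version of a key is present on replicationFactor nodes; otherwise GREY when the latest version (by vector-clock timestamp) is either fully replicated or held by no more nodes than requiredWrites (a timed-out write); otherwise ANTI_DR when, for that latest version, some non-primary zone holds fewer replicas than the primary zone; ORANGE for the remaining intermittent cases. A small illustrative case (replication factor, zone layout, and node ids invented for the example): with replication-factor 4, required-writes 2, nodes {0,1} in primary zone 0 and nodes {2,3} in zone 1, a key whose latest version sits only on nodes {0,1,2} is neither FULL nor GREY (3 < 4 and 3 > 2), and zone 1 holds one replica against zone 0's two, so it is reported ANTI_DR.

The non-verbose output is the CSV-style STATS block assembled in main: a header line, one STATS row per checked partition, and an aggregate row. Roughly (store name and counts invented for illustration):

    TYPE,Store,PartitionId,KeysConsistent,KeysTotal,Consistency
    STATS,my-store,0,998,1000,0.998
    STATS,my-store,1,997,1000,0.997
    STATS,my-store,aggregate,1995,2000,0.9975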