From e5ddb340c7df3ee498ec5a910d921a29cb549666 Mon Sep 17 00:00:00 2001 From: Jay J Wylie Date: Fri, 8 Feb 2013 16:33:10 -0800 Subject: [PATCH] Re-commiting a series of ZWu's commits to the consistency fix in a single batch. The other commits were not made against the same master and so this is easier than trying to figure out what went wrong in the merge/rebase. --- .../voldemort/utils/ConsistencyCheck.java | 839 ++++++++++-------- test/common/voldemort/config/stores.xml | 19 + 2 files changed, 500 insertions(+), 358 deletions(-) diff --git a/src/java/voldemort/utils/ConsistencyCheck.java b/src/java/voldemort/utils/ConsistencyCheck.java index f26c7a06d5..ec7d38ba71 100644 --- a/src/java/voldemort/utils/ConsistencyCheck.java +++ b/src/java/voldemort/utils/ConsistencyCheck.java @@ -26,223 +26,43 @@ public class ConsistencyCheck { - private enum ConsistencyLevel { - FULL, - LATEST_CONSISTENT, - ORANGE - } - - private static class PrefixNode { - - private Integer prefixId; - private Node node; - - public PrefixNode(Integer prefixId, Node node) { - this.prefixId = prefixId; - this.node = node; - } - - public Node getNode() { - return node; - } - - public Integer getPrefixId() { - return prefixId; - } - - @Override - public boolean equals(Object o) { - if(this == o) - return true; - if(!(o instanceof PrefixNode)) - return false; - - PrefixNode n = (PrefixNode) o; - return prefixId.equals(n.getPrefixId()) && node.equals(n.getNode()); - } - - @Override - public String toString() { - return prefixId + "." + node.getId(); - } - - } - - private static class ConsistencyCheckStats { - - private long consistentKeys; - private long totalKeys; - - public ConsistencyCheckStats() { - consistentKeys = 0; - totalKeys = 0; - } - - public void setConsistentKeys(long count) { - consistentKeys += count; - } - - public void setTotalKeys(long count) { - totalKeys += count; - } - - public long getConsistentKeys() { - return consistentKeys; - } - - public long getTotalKeys() { - return totalKeys; - } - - public void append(ConsistencyCheckStats that) { - consistentKeys += that.getConsistentKeys(); - totalKeys += that.getTotalKeys(); - } - } - - private static class HashedValue implements Version { - - final private Version innerVersion; - final private Integer valueHash; - - public HashedValue(Versioned versioned) { - innerVersion = versioned.getVersion(); - valueHash = new FnvHashFunction().hash(versioned.getValue()); - } - - public int getValueHash() { - return valueHash; - } - - public Version getInner() { - return innerVersion; - } + private Boolean verbose = false; + private List urls; + private String storeName; + private Integer partitionId; + + private Integer retentionDays = 0; + private Integer replicationFactor = 0; + private Integer requiredWrites = 0; + private Map>>> nodeEntriesMap; + + public ConsistencyCheck(List urls, String storeName, int partitionId, boolean verbose) { + this.urls = urls; + this.storeName = storeName; + this.partitionId = partitionId; + this.verbose = verbose; - @Override - public boolean equals(Object object) { - if(this == object) - return true; - if(object == null) - return false; - if(!object.getClass().equals(HashedValue.class)) - return false; - HashedValue hash = (HashedValue) object; - boolean result = valueHash.equals(hash.getValueHash()); - return result; - } - - @Override - public int hashCode() { - return valueHash; - } - - public Occurred compare(Version v) { - return Occurred.CONCURRENTLY; - } } - @SuppressWarnings("unchecked") - public static void main(String[] args) throws Exception { - /* parse options */ - OptionParser parser = new OptionParser(); - parser.accepts("help", "print help information"); - parser.accepts("urls", "[REQUIRED] bootstrap URLs") - .withRequiredArg() - .describedAs("bootstrap-url") - .withValuesSeparatedBy(',') - .ofType(String.class); - parser.accepts("partitions", "partition-id") - .withRequiredArg() - .describedAs("partition-id") - .withValuesSeparatedBy(',') - .ofType(Integer.class); - parser.accepts("store", "store name") - .withRequiredArg() - .describedAs("store-name") - .ofType(String.class); - parser.accepts("verbose", "verbose"); - OptionSet options = parser.parse(args); - - /* validate options */ - boolean verbose = false; - if(options.hasArgument("help")) { - printUsage(); - return; - } - if(!options.hasArgument("urls") || !options.hasArgument("partitions") - || !options.hasArgument("store")) { - printUsage(); - return; - } - if(options.has("verbose")) { - verbose = true; - } - - List urls = (List) options.valuesOf("urls"); - String storeName = (String) options.valueOf("store"); - List partitionIds = (List) options.valuesOf("partitions"); - - ConsistencyCheckStats globalStats = new ConsistencyCheckStats(); - Map partitionStatsMap = new HashMap(); - for(Integer partitionId: partitionIds) { - ConsistencyCheckStats partitionStats = doConsistencyCheck(storeName, - partitionId, - urls, - verbose); - partitionStatsMap.put(partitionId, partitionStats); - globalStats.append(partitionStats); - } - - /* print stats */ - // partition based - StringBuilder statsString = new StringBuilder(); - // each partition - statsString.append("TYPE,Store,ParitionId,KeysConsistent,KeysTotal,Consistency\n"); - for(Map.Entry entry: partitionStatsMap.entrySet()) { - Integer partitionId = entry.getKey(); - ConsistencyCheckStats partitionStats = entry.getValue(); - statsString.append("STATS,"); - statsString.append(storeName + ","); - statsString.append(partitionId + ","); - statsString.append(partitionStats.getConsistentKeys() + ","); - statsString.append(partitionStats.getTotalKeys() + ","); - statsString.append((double) (partitionStats.getConsistentKeys()) - / (double) partitionStats.getTotalKeys()); - statsString.append("\n"); - } - // all partitions - statsString.append("STATS,"); - statsString.append(storeName + ","); - statsString.append("aggregate,"); - statsString.append(globalStats.getConsistentKeys() + ","); - statsString.append(globalStats.getTotalKeys() + ","); - statsString.append((double) (globalStats.getConsistentKeys()) - / (double) globalStats.getTotalKeys()); - statsString.append("\n"); - - System.out.println(); - System.out.println(statsString.toString()); - } - - public static ConsistencyCheckStats doConsistencyCheck(String storeName, - Integer partitionId, - List urls, - boolean verbose) throws Exception { + /** + * Connect to the clusters using given urls and start fetching process on + * correct nodes + * + * @throws Exception When no such store is found + */ + public void connect() throws Exception { List singlePartition = new ArrayList(); singlePartition.add(partitionId); - /* connect to cluster */ - List adminClients = new ArrayList(urls.size()); - Map>>> nodeEntriesMap; nodeEntriesMap = new HashMap>>>(); - RetentionChecker retentionChecker = null; - int leastRetentionDays = 0; - List nodeList = new ArrayList(); - Integer replicationFactor = 0; - Integer requiredWrites = null; + + List adminClients = new ArrayList(urls.size()); int urlId = 0; - for(String url: urls) { - if(verbose) { + List nodeList = new ArrayList(); + + for(String url: this.urls) { + /* connect to cluster through admin port */ + if(this.verbose) { System.out.println("Connecting to bootstrap server: " + url); } AdminClient adminClient = new AdminClient(url, new AdminClientConfig(), 0); @@ -263,15 +83,14 @@ public static ConsistencyCheckStats doConsistencyCheck(String storeName, throw new Exception("No such store found: " + storeName); } - /* construct rententionChecker */ - int retentionDays = 0; + /* find the shorted retention policy */ + int storeRetentionDays = 0; if(storeDefinition.getRetentionDays() != null) { - retentionDays = storeDefinition.getRetentionDays().intValue(); + storeRetentionDays = storeDefinition.getRetentionDays().intValue(); } - if(retentionChecker == null - || (retentionDays != 0 && retentionDays < leastRetentionDays)) { - retentionChecker = new RetentionChecker(retentionDays); - leastRetentionDays = retentionDays; + if((retentionDays == 0) + || (storeRetentionDays != 0 && storeRetentionDays < retentionDays)) { + retentionDays = storeRetentionDays; } /* make partitionId -> node mapping */ @@ -322,37 +141,48 @@ public static ConsistencyCheckStats doConsistencyCheck(String storeName, false); nodeEntriesMap.put(new PrefixNode(urlId, cluster.getNodeById(nodeId)), entries); } + + // calculate overall replication factor and required writes replicationFactor += storeDefinition.getReplicationFactor(); - if(requiredWrites == null) { + if(requiredWrites == 0) { requiredWrites = storeDefinition.getRequiredWrites(); } urlId++; } + } - /* start fetch */ + /** + * Run consistency check on connected key-value iterators + * + * @return Results in form of ConsistencyCheckStats + */ + public ConsistencyCheckStats execute() { + // retention checker + RetentionChecker retentionChecker = new RetentionChecker(retentionDays); + + // map to remember key-version-node information Map>> keyVersionNodeSetMap; - Map>>>> fullyFetchedKeys; - Map>>, ByteArray> lastFetchedKey; keyVersionNodeSetMap = new HashMap>>(); + + // variables to sweep good keys on the fly + Map>>>> fullyFetchedKeys; fullyFetchedKeys = new HashMap>>>>(); + Map>>, ByteArray> lastFetchedKey; lastFetchedKey = new HashMap>>, ByteArray>(); - long numRecordsScanned = 0; - long numRecordsScannedLast = 0; - long lastReportTimeMs = 0; - long reportPeriodMs = 5000; - long expiredRecords = 0; - long preQualifiedKeys = 0; + /* start fetch */ boolean anyNodeHasNext; + long consistentKeys = 0; + ProgressReporter reporter = new ProgressReporter(); do { anyNodeHasNext = false; - /* for each iterator */ + /* for each iterator(fetch one key at a time) */ for(Map.Entry>>> nodeEntriesMapEntry: nodeEntriesMap.entrySet()) { PrefixNode node = nodeEntriesMapEntry.getKey(); Iterator>> nodeEntries = nodeEntriesMapEntry.getValue(); if(nodeEntries.hasNext()) { anyNodeHasNext = true; - numRecordsScanned++; + reporter.recordScans(1); Pair> nodeEntry = nodeEntries.next(); ByteArray key = nodeEntry.getFirst(); Versioned versioned = nodeEntry.getSecond(); @@ -362,93 +192,81 @@ public static ConsistencyCheckStats doConsistencyCheck(String storeName, } else { version = new HashedValue(versioned); } - + // skip version if expired if(retentionChecker.isExpired(version)) { - expiredRecords++; + reporter.recordExpired(1); continue; - } else { - // try sweep last key fetched by this iterator - if(lastFetchedKey.containsKey(nodeEntries)) { - ByteArray lastKey = lastFetchedKey.get(nodeEntries); - if(!key.equals(lastKey)) { - if(!fullyFetchedKeys.containsKey(lastKey)) { - fullyFetchedKeys.put(lastKey, - new HashSet>>>()); - } - Set>>> lastKeyIterSet = fullyFetchedKeys.get(lastKey); - lastKeyIterSet.add(nodeEntries); - - // sweep if fully fetched by all iterators - if(lastKeyIterSet.size() == nodeEntriesMap.size()) { - // keyFetchComplete - ConsistencyLevel level = determineConsistency(keyVersionNodeSetMap.get(lastKey), - replicationFactor); - if(level == ConsistencyLevel.FULL - || level == ConsistencyLevel.LATEST_CONSISTENT) { - keyVersionNodeSetMap.remove(lastKey); - preQualifiedKeys++; - } - fullyFetchedKeys.remove(lastKey); + } + // try sweep last key fetched by this iterator + if(lastFetchedKey.containsKey(nodeEntries)) { + ByteArray lastKey = lastFetchedKey.get(nodeEntries); + if(!key.equals(lastKey)) { + if(!fullyFetchedKeys.containsKey(lastKey)) { + fullyFetchedKeys.put(lastKey, + new HashSet>>>()); + } + Set>>> lastKeyIterSet = fullyFetchedKeys.get(lastKey); + lastKeyIterSet.add(nodeEntries); + + // sweep if fully fetched by all iterators + if(lastKeyIterSet.size() == nodeEntriesMap.size()) { + // keyFetchComplete + ConsistencyLevel level = determineConsistency(keyVersionNodeSetMap.get(lastKey), + replicationFactor); + if(level == ConsistencyLevel.FULL + || level == ConsistencyLevel.LATEST_CONSISTENT) { + keyVersionNodeSetMap.remove(lastKey); + consistentKeys++; } + fullyFetchedKeys.remove(lastKey); } } - lastFetchedKey.put(nodeEntries, key); - // initialize key -> Map> - if(!keyVersionNodeSetMap.containsKey(key)) { - keyVersionNodeSetMap.put(key, new HashMap>()); - } - Map> versionNodeSetMap = keyVersionNodeSetMap.get(key); - // Initialize Version -> Set - if(!versionNodeSetMap.containsKey(version)) { - // decide if this is the newest version - Iterator iter = versionNodeSetMap.keySet().iterator(); - // if after any one in the map, then reset map - if(iter.hasNext()) { - Version existingVersion = iter.next(); - // existing version(s) are old - if(version.compare(existingVersion) == Occurred.AFTER) { - // swap out the old map and put a new map - versionNodeSetMap = new HashMap>(); - keyVersionNodeSetMap.put(key, versionNodeSetMap); - } else if(existingVersion.compare(version) == Occurred.AFTER) { - // ignore this version - continue; - } else if(existingVersion.compare(version) == Occurred.CONCURRENTLY) { - - } else { - System.err.print("[ERROR]Two versions are not after each other nor currently(key, v1, v2)"); - System.err.print(key + ", " + existingVersion + ", " + version); - } + } + // remember key fetch states + lastFetchedKey.put(nodeEntries, key); + // initialize key -> Map> + if(!keyVersionNodeSetMap.containsKey(key)) { + keyVersionNodeSetMap.put(key, new HashMap>()); + } + Map> versionNodeSetMap = keyVersionNodeSetMap.get(key); + // Initialize Version -> Set + if(!versionNodeSetMap.containsKey(version)) { + // decide if this is the newest version + Iterator iter = versionNodeSetMap.keySet().iterator(); + // if after any one in the map, then reset map + if(iter.hasNext()) { + Version existingVersion = iter.next(); + // existing version(s) are old + if(version.compare(existingVersion) == Occurred.AFTER) { + // swap out the old map and put a new + // map + versionNodeSetMap = new HashMap>(); + keyVersionNodeSetMap.put(key, versionNodeSetMap); + } else if(existingVersion.compare(version) == Occurred.AFTER) { + // ignore this version + continue; + } else if(existingVersion.compare(version) == Occurred.CONCURRENTLY) { + // put it into the node set + } else { + System.err.print("[ERROR]Two versions are not after each other nor currently(key, v1, v2)"); + System.err.print(key + ", " + existingVersion + ", " + version); } - // insert nodeIdSet into the map - versionNodeSetMap.put(version, new HashSet()); } - // add nodeId to set - Set nodeSet = versionNodeSetMap.get(version); - nodeSet.add(node); + // insert nodeIdSet into the map + versionNodeSetMap.put(version, new HashSet()); } + // add nodeId to set + Set nodeSet = versionNodeSetMap.get(version); + nodeSet.add(node); } } // stats reporting - if(verbose && System.currentTimeMillis() > lastReportTimeMs + reportPeriodMs) { - long currentTimeMs = System.currentTimeMillis(); - System.out.println("Progress Report"); - System.out.println("==============="); - System.out.println(" Number of records Scanned: " + numRecordsScanned); - System.out.println(" Records Ignored(Retention): " + expiredRecords); - System.out.println("Recent fetch speed(records/s): " - + (numRecordsScanned - numRecordsScannedLast) - / ((currentTimeMs - lastReportTimeMs) / 1000)); - System.out.println(); - lastReportTimeMs = currentTimeMs; - numRecordsScannedLast = numRecordsScanned; + if(verbose) { + reporter.tryReport(); } } while(anyNodeHasNext); - // analyzing - if(verbose) { - System.out.println("Analyzing...."); - } + // clean keys not sufficient for write cleanIneligibleKeys(keyVersionNodeSetMap, requiredWrites); // clean the rest of consistent keys @@ -462,12 +280,9 @@ public static ConsistencyCheckStats doConsistencyCheck(String storeName, } for(ByteArray key: keysToDelete) { keyVersionNodeSetMap.remove(key); - preQualifiedKeys++; + consistentKeys++; } - long totalKeys = keyVersionNodeSetMap.size() + preQualifiedKeys; - long totalKeysConsistent = preQualifiedKeys; - // print inconsistent keys if(verbose) { System.out.println("TYPE,Store,ParId,Key,ServerSet,VersionTS,VectorClock[,ValueHash]"); @@ -479,52 +294,269 @@ public static ConsistencyCheckStats doConsistencyCheck(String storeName, } ConsistencyCheckStats stats = new ConsistencyCheckStats(); - stats.setConsistentKeys(totalKeysConsistent); - stats.setTotalKeys(totalKeys); + stats.consistentKeys = consistentKeys; + stats.totalKeys = keyVersionNodeSetMap.size() + consistentKeys; return stats; } - public static String keyVersionToString(ByteArray key, - Map> versionMap, - String storeName, - Integer partitionId) { - StringBuilder record = new StringBuilder(); - for(Map.Entry> versionSet: versionMap.entrySet()) { - Version version = versionSet.getKey(); - Set nodeSet = versionSet.getValue(); + protected enum ConsistencyLevel { + FULL, + LATEST_CONSISTENT, + INCONSISTENT + } - record.append("BAD_KEY,"); - record.append(storeName + ","); - record.append(partitionId + ","); - record.append(ByteUtils.toHexString(key.get()) + ","); - record.append(nodeSet.toString().replace(", ", ";") + ","); - if(version instanceof VectorClock) { - record.append(((VectorClock) version).getTimestamp() + ","); - record.append(version.toString() - .replaceAll(", ", ";") - .replaceAll(" ts:[0-9]*", "") - .replaceAll("version\\((.*)\\)", "[$1]")); + protected static class PrefixNode { + + private Integer prefixId; + private Node node; + + /** + * Used to track nodes that may share the same nodeId in different + * clusters + * + * @param prefixId a prefix to be associated different clusters + * @param node the real node + */ + public PrefixNode(Integer prefixId, Node node) { + this.prefixId = prefixId; + this.node = node; + } + + public Node getNode() { + return node; + } + + public Integer getPrefixId() { + return prefixId; + } + + @Override + public boolean equals(Object o) { + if(this == o) + return true; + if(!(o instanceof PrefixNode)) + return false; + + PrefixNode n = (PrefixNode) o; + return prefixId.equals(n.getPrefixId()) && node.equals(n.getNode()); + } + + @Override + public String toString() { + return prefixId + "." + node.getId(); + } + + } + + protected static class ConsistencyCheckStats { + + public long consistentKeys; + public long totalKeys; + + /** + * Used to track consistency results + */ + public ConsistencyCheckStats() { + consistentKeys = 0; + totalKeys = 0; + } + + public void append(ConsistencyCheckStats that) { + consistentKeys += that.consistentKeys; + totalKeys += that.totalKeys; + } + } + + protected static class HashedValue implements Version { + + final private Version innerVersion; + final private Integer valueHash; + + /** + * A class to save version and value hash It is used to compare versions + * by the value hash + * + * @param versioned Versioned value with version information and value + * itself + */ + public HashedValue(Versioned versioned) { + innerVersion = versioned.getVersion(); + valueHash = new FnvHashFunction().hash(versioned.getValue()); + } + + public int getValueHash() { + return valueHash; + } + + public Version getInner() { + return innerVersion; + } + + @Override + public boolean equals(Object object) { + if(this == object) + return true; + if(object == null) + return false; + if(!object.getClass().equals(HashedValue.class)) + return false; + HashedValue hash = (HashedValue) object; + boolean result = valueHash.equals(hash.getValueHash()); + return result; + } + + @Override + public int hashCode() { + return valueHash; + } + + @Override + public Occurred compare(Version v) { + return Occurred.CONCURRENTLY; // always regard as conflict + } + } + + protected static class RetentionChecker { + + final private long bufferTimeSeconds = 600; // expire N seconds earlier + final private long expiredTimeMs; + + /** + * A checker to determine if a key is to be cleaned according to + * retention policy + * + * @param days number of days ago from now to retain keys + */ + public RetentionChecker(int days) { + if(days <= 0) { + expiredTimeMs = 0; + } else { + long now = System.currentTimeMillis(); + expiredTimeMs = now - (Time.SECONDS_PER_DAY * days - bufferTimeSeconds) + * Time.MS_PER_SECOND; } - if(version instanceof HashedValue) { - Integer hashValue = ((HashedValue) version).getValueHash(); - Version realVersion = ((HashedValue) version).getInner(); - record.append(((VectorClock) realVersion).getTimestamp() + ","); - record.append(realVersion.toString() - .replaceAll(", ", ";") - .replaceAll(" ts:[0-9]*", "") - .replaceAll("version\\((.*)\\)", "[$1],")); - record.append(hashValue); + } + + /** + * Determine if a version is expired + * + * @param v version to be checked + * @return if the version is expired according to retention policy + */ + public boolean isExpired(Version v) { + if(v instanceof VectorClock) { + return ((VectorClock) v).getTimestamp() < expiredTimeMs; + } else if(v instanceof HashedValue) { + return false; + } else { + System.err.println("[WARNING]Version type is not supported for checking expiration"); + return false; } - record.append("\n"); } - return record.toString(); } - public static void printUsage() { - System.out.println("Usage: \n--partitions --url --store "); + protected static class ProgressReporter { + + long lastReportTimeMs = 0; + long reportPeriodMs = 0; + long numRecordsScanned = 0; + long numRecordsScannedLast = 0; + long numExpiredRecords = 0; + + public ProgressReporter() { + reportPeriodMs = 5000; + } + + /** + * Progress Reporter + * + * @param intervalMs interval between printing progress in miliseconds + */ + public ProgressReporter(long intervalMs) { + reportPeriodMs = intervalMs; + } + + public void recordScans(long count) { + numRecordsScanned += count; + } + + public void recordExpired(long count) { + numExpiredRecords += count; + } + + public void tryReport() { + if(System.currentTimeMillis() > lastReportTimeMs + reportPeriodMs) { + long currentTimeMs = System.currentTimeMillis(); + StringBuilder s = new StringBuilder(); + s.append("Progress Report\n"); + s.append("===============\n"); + s.append(" Number of records Scanned: " + numRecordsScanned + "\n"); + s.append(" Records Ignored(Retention): " + numExpiredRecords + "\n"); + s.append("Recent fetch speed(records/s): " + + (numRecordsScanned - numRecordsScannedLast) + / ((currentTimeMs - lastReportTimeMs) / 1000) + "\n"); + System.out.print(s.toString()); + lastReportTimeMs = currentTimeMs; + numRecordsScannedLast = numRecordsScanned; + } + } + } + + /** + * Return args parser + * + * @return program parser + * */ + private static OptionParser getParser() { + /* parse options */ + OptionParser parser = new OptionParser(); + parser.accepts("help", "print help information"); + parser.accepts("urls", "[REQUIRED] bootstrap URLs") + .withRequiredArg() + .describedAs("bootstrap-url") + .withValuesSeparatedBy(',') + .ofType(String.class); + parser.accepts("partitions", "partition-id") + .withRequiredArg() + .describedAs("partition-id") + .withValuesSeparatedBy(',') + .ofType(Integer.class); + parser.accepts("store", "store name") + .withRequiredArg() + .describedAs("store-name") + .ofType(String.class); + parser.accepts("verbose", "verbose"); + return parser; + } + + /** + * Print Usage to STDOUT + */ + private static void printUsage() { + StringBuilder help = new StringBuilder(); + help.append("ConsistencyCheck Tool\n Scan partitions of a store by bootstrap url(s) "); + help.append("for consistency and optionally print out inconsistent keys\n"); + help.append("Options:\n"); + help.append(" Required:\n"); + help.append(" --partitions [,...]\n"); + help.append(" --urls [,...]\n"); + help.append(" --store \n"); + help.append(" Optional:\n"); + help.append(" --verbose\n"); + help.append(" --help\n"); + help.append(" Note:\n"); + help.append(" When multiple urls are used, the versions are identified by value hashes, instead of VectorClocks\n"); + System.out.print(help.toString()); } + /** + * Determine the consistency level of a key + * + * @param versionNodeSetMap A map that maps version to set of PrefixNodes + * @param replicationFactor Total replication factor for the set of clusters + * @return ConsistencyLevel Enum + */ public static ConsistencyLevel determineConsistency(Map> versionNodeSetMap, int replicationFactor) { boolean fullyConsistent = true; @@ -549,10 +581,18 @@ public static ConsistencyLevel determineConsistency(Map return ConsistencyLevel.LATEST_CONSISTENT; } // all other states inconsistent - return ConsistencyLevel.ORANGE; + return ConsistencyLevel.INCONSISTENT; } } + /** + * Determine if a key version is invalid by comparing the version's + * existance and required writes configuration + * + * @param keyVersionNodeSetMap A map that contains keys mapping to a map + * that maps versions to set of PrefixNodes + * @param requiredWrite Required Write configuration + */ public static void cleanIneligibleKeys(Map>> keyVersionNodeSetMap, int requiredWrite) { Set keysToDelete = new HashSet(); @@ -583,29 +623,112 @@ public static void cleanIneligibleKeys(Map urls = (List) options.valuesOf("urls"); + String storeName = (String) options.valueOf("store"); + List partitionIds = (List) options.valuesOf("partitions"); + + ConsistencyCheckStats globalStats = new ConsistencyCheckStats(); + Map partitionStatsMap = new HashMap(); + /* scan each partitions */ + for(Integer partitionId: partitionIds) { + ConsistencyCheck checker = new ConsistencyCheck(urls, storeName, partitionId, verbose); + checker.connect(); + ConsistencyCheckStats partitionStats = checker.execute(); + partitionStatsMap.put(partitionId, partitionStats); + globalStats.append(partitionStats); } - public boolean isExpired(Version v) { - if(v instanceof VectorClock) { - return ((VectorClock) v).getTimestamp() < expiredTimeMs; - } else if(v instanceof HashedValue) { - return false; - } else { - System.err.println("[WARNING]Version type is not supported for checking expiration"); - return false; + /* print stats */ + StringBuilder statsString = new StringBuilder(); + // each partition + statsString.append("TYPE,Store,ParitionId,KeysConsistent,KeysTotal,Consistency\n"); + for(Map.Entry entry: partitionStatsMap.entrySet()) { + Integer partitionId = entry.getKey(); + ConsistencyCheckStats partitionStats = entry.getValue(); + statsString.append("STATS,"); + statsString.append(storeName + ","); + statsString.append(partitionId + ","); + statsString.append(partitionStats.consistentKeys + ","); + statsString.append(partitionStats.totalKeys + ","); + statsString.append((double) (partitionStats.consistentKeys) + / (double) partitionStats.totalKeys); + statsString.append("\n"); + } + // all partitions + statsString.append("STATS,"); + statsString.append(storeName + ","); + statsString.append("aggregate,"); + statsString.append(globalStats.consistentKeys + ","); + statsString.append(globalStats.totalKeys + ","); + statsString.append((double) (globalStats.consistentKeys) / (double) globalStats.totalKeys); + statsString.append("\n"); + + System.out.println(); + System.out.println(statsString.toString()); + } + + /** + * Convert a key-version-nodeSet information to string + * + * @param key The key + * @param versionMap mapping versions to set of PrefixNodes + * @param storeName store's name + * @param partitionId partition scanned + * @return a string that describe the information passed in + */ + public static String keyVersionToString(ByteArray key, + Map> versionMap, + String storeName, + Integer partitionId) { + StringBuilder record = new StringBuilder(); + for(Map.Entry> versionSet: versionMap.entrySet()) { + Version version = versionSet.getKey(); + Set nodeSet = versionSet.getValue(); + + record.append("BAD_KEY,"); + record.append(storeName + ","); + record.append(partitionId + ","); + record.append(ByteUtils.toHexString(key.get()) + ","); + record.append(nodeSet.toString().replace(", ", ";") + ","); + if(version instanceof VectorClock) { + record.append(((VectorClock) version).getTimestamp() + ","); + record.append(version.toString() + .replaceAll(", ", ";") + .replaceAll(" ts:[0-9]*", "") + .replaceAll("version\\((.*)\\)", "[$1]")); + } + if(version instanceof HashedValue) { + Integer hashValue = ((HashedValue) version).getValueHash(); + Version realVersion = ((HashedValue) version).getInner(); + record.append(((VectorClock) realVersion).getTimestamp() + ","); + record.append(realVersion.toString() + .replaceAll(", ", ";") + .replaceAll(" ts:[0-9]*", "") + .replaceAll("version\\((.*)\\)", "[$1],")); + record.append(hashValue); } + record.append("\n"); } + return record.toString(); } + } diff --git a/test/common/voldemort/config/stores.xml b/test/common/voldemort/config/stores.xml index 3dba316a34..d5b8782598 100644 --- a/test/common/voldemort/config/stores.xml +++ b/test/common/voldemort/config/stores.xml @@ -232,4 +232,23 @@ java-serialization + + consistency-check + memory + client + 4 + 1 + 1 + 2 + 2 + + string + UTF-8 + + + string + UTF-8 + + 1 +