Refactored Consistency Check

commit 433ce0288798ee0f4a0bf4c7ccc0f2bce4cabbd1 (parent 9d3fb7e)
zhongjiewu authored, jayjwylie committed
src/java/voldemort/utils/ConsistencyCheck.java (681 lines changed)
@@ -1,18 +1,20 @@
package voldemort.utils;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.SortedMap;
-import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
import joptsimple.OptionParser;
import joptsimple.OptionSet;
+
+import org.apache.log4j.Logger;
+
import voldemort.client.protocol.admin.AdminClient;
import voldemort.client.protocol.admin.AdminClientConfig;
import voldemort.cluster.Cluster;
@@ -26,22 +28,28 @@
public class ConsistencyCheck {
- private Boolean verbose = false;
- private List<String> urls;
- private String storeName;
- private Integer partitionId;
+ private static Logger logger = Logger.getLogger(ConsistencyCheck.class);
+ private final Boolean quiet;
+ private final List<String> urls;
+ private final String storeName;
+ private final Integer partitionId;
- private Integer retentionDays = 0;
+ private Integer retentionDays = null;
private Integer replicationFactor = 0;
private Integer requiredWrites = 0;
- private Map<PrefixNode, Iterator<Pair<ByteArray, Versioned<byte[]>>>> nodeEntriesMap;
- public ConsistencyCheck(List<String> urls, String storeName, int partitionId, boolean verbose) {
+ private List<AdminClient> adminClients;
+ private List<ClusterNode> clusterNodeList = new ArrayList<ClusterNode>();
+ private final ProgressReporter reporter = new ProgressReporter();
+ private final Map<ByteArray, Map<Version, Set<ClusterNode>>> keyVersionNodeSetMap = new HashMap<ByteArray, Map<Version, Set<ClusterNode>>>();
+ private RetentionChecker retentionChecker;
+ private KeyFetchTracker keyFetchTracker;
+
+ public ConsistencyCheck(List<String> urls, String storeName, int partitionId, boolean quiet) {
this.urls = urls;
this.storeName = storeName;
this.partitionId = partitionId;
- this.verbose = verbose;
-
+ this.quiet = quiet;
}
/**
@@ -50,105 +58,92 @@ public ConsistencyCheck(List<String> urls, String storeName, int partitionId, bo
*
* @throws Exception When no such store is found
*/
- public void connect() throws Exception {
- List<Integer> singlePartition = new ArrayList<Integer>();
- singlePartition.add(partitionId);
- nodeEntriesMap = new HashMap<PrefixNode, Iterator<Pair<ByteArray, Versioned<byte[]>>>>();
-
- List<AdminClient> adminClients = new ArrayList<AdminClient>(urls.size());
- int urlId = 0;
- List<PrefixNode> nodeList = new ArrayList<PrefixNode>();
+ public void connect() throws Exception {
+ adminClients = new ArrayList<AdminClient>(urls.size());
+ // bootstrap from two urls
+ Map<String, Cluster> clusterMap = new HashMap<String, Cluster>(urls.size());
+ Map<String, StoreDefinition> storeDefinitionMap = new HashMap<String, StoreDefinition>(urls.size());
- for(String url: this.urls) {
+ for(String url: urls) {
/* connect to cluster through admin port */
- if(this.verbose) {
- System.out.println("Connecting to bootstrap server: " + url);
+ if(logger.isInfoEnabled()) {
+ logger.info("Connecting to bootstrap server: " + url);
}
AdminClient adminClient = new AdminClient(url, new AdminClientConfig(), 0);
adminClients.add(adminClient);
+ /* get Cluster */
Cluster cluster = adminClient.getAdminClientCluster();
-
- /* find store */
+ clusterMap.put(url, cluster);
+ /* get StoreDefinition */
Versioned<List<StoreDefinition>> storeDefinitions = adminClient.metadataMgmtOps.getRemoteStoreDefList(0);
- List<StoreDefinition> StoreDefitions = storeDefinitions.getValue();
- StoreDefinition storeDefinition = null;
- for(StoreDefinition def: StoreDefitions) {
- if(def.getName().equals(storeName)) {
- storeDefinition = def;
- break;
- }
- }
- if(storeDefinition == null) {
- throw new Exception("No such store found: " + storeName);
- }
-
- /* find the shorted retention policy */
- int storeRetentionDays = 0;
- if(storeDefinition.getRetentionDays() != null) {
- storeRetentionDays = storeDefinition.getRetentionDays().intValue();
- }
- if((retentionDays == 0)
- || (storeRetentionDays != 0 && storeRetentionDays < retentionDays)) {
- retentionDays = storeRetentionDays;
- }
+ StoreDefinition storeDefinition = StoreDefinitionUtils.getStoreDefinitionWithName(storeDefinitions.getValue(),
+ storeName);
+ storeDefinitionMap.put(url, storeDefinition);
+ }
- /* make partitionId -> node mapping */
- SortedMap<Integer, Node> partitionToNodeMap = new TreeMap<Integer, Node>();
- Collection<Node> nodes = cluster.getNodes();
- for(Node n: nodes) {
- for(Integer partition: n.getPartitionIds()) {
- if(partitionToNodeMap.containsKey(partition))
- throw new IllegalArgumentException("Duplicate partition id " + partition
- + " in cluster configuration " + nodes);
- partitionToNodeMap.put(partition, n);
- }
- }
+ /* calculate nodes to scan */
+ for(String url: urls) {
+ StoreDefinition storeDefinition = storeDefinitionMap.get(url);
+ Cluster cluster = clusterMap.get(url);
+ Map<Integer, Integer> partitionToNodeMap = ClusterUtils.getCurrentPartitionMapping(cluster);
/* find list of nodeId hosting partition */
List<Integer> partitionList = new RoutingStrategyFactory().updateRoutingStrategy(storeDefinition,
cluster)
.getReplicatingPartitionList(partitionId);
- List<Integer> nodeIdList = new ArrayList<Integer>(partitionList.size());
for(int partition: partitionList) {
- Integer nodeId = partitionToNodeMap.get(partition).getId();
- nodeIdList.add(nodeId);
- nodeList.add(new PrefixNode(urlId, cluster.getNodeById(nodeId)));
+ Integer nodeId = partitionToNodeMap.get(partition);
+ Node node = cluster.getNodeById(nodeId);
+ clusterNodeList.add(new ClusterNode(urls.indexOf(url), node));
}
+ }
- /* print config info */
- if(verbose) {
- StringBuilder configInfo = new StringBuilder();
- configInfo.append("TYPE,Store,PartitionId,Node,ZoneId,BootstrapUrl\n");
- for(Integer nodeId: nodeIdList) {
- configInfo.append("CONFIG,");
- configInfo.append(storeName + ",");
- configInfo.append(partitionId + ",");
- configInfo.append(nodeId + ",");
- configInfo.append(cluster.getNodeById(nodeId).getZoneId() + ",");
- configInfo.append(url + "\n");
- }
- System.out.println(configInfo);
+ /* print config info */
+ if(!quiet) {
+ StringBuilder configInfo = new StringBuilder();
+ configInfo.append("TYPE,Store,PartitionId,Node,ZoneId,BootstrapUrl\n");
+ for(ClusterNode clusterNode: clusterNodeList) {
+ configInfo.append("CONFIG,");
+ configInfo.append(storeName + ",");
+ configInfo.append(partitionId + ",");
+ configInfo.append(clusterNode.getNode().getId() + ",");
+ configInfo.append(clusterNode.getNode().getZoneId() + ",");
+ configInfo.append(urls.get(clusterNode.getPrefixId()) + "\n");
}
+ for(String line: configInfo.toString().split("\n")) {
+ logger.info(line);
+ }
+ }
- /* get entry Iterator from each node */
- for(Integer nodeId: nodeIdList) {
- Iterator<Pair<ByteArray, Versioned<byte[]>>> entries;
- entries = adminClient.bulkFetchOps.fetchEntries(nodeId,
- storeName,
- singlePartition,
- null,
- false);
- nodeEntriesMap.put(new PrefixNode(urlId, cluster.getNodeById(nodeId)), entries);
+ /* calculate retention days and more */
+ for(String url: urls) {
+ StoreDefinition storeDefinition = storeDefinitionMap.get(url);
+ /* retention */
+ int storeRetentionDays = 0;
+ if(storeDefinition.getRetentionDays() != null) {
+ storeRetentionDays = storeDefinition.getRetentionDays().intValue();
+ }
+ if(retentionDays == null) {
+ retentionDays = storeRetentionDays;
+ }
+ if(retentionDays != storeRetentionDays) {
+ if(storeRetentionDays != 0 && (storeRetentionDays < retentionDays)) {
+ retentionDays = storeRetentionDays;
+ }
+ logger.warn("Retention-days is not consistent between clusters by urls. Will use the shorter.");
}
- // calculate overall replication factor and required writes
+ /* replication writes */
replicationFactor += storeDefinition.getReplicationFactor();
- if(requiredWrites == 0) {
- requiredWrites = storeDefinition.getRequiredWrites();
- }
- urlId++;
+
+ /* required writes */
+ requiredWrites += storeDefinition.getRequiredWrites();
+ }
+ if(replicationFactor != clusterNodeList.size()) {
+ logger.error("Replication factor is not consistent with number of nodes routed to.");
}
+ retentionChecker = new RetentionChecker(retentionDays);
}
/**
@@ -156,227 +151,282 @@ public void connect() throws Exception {
*
* @return Results in form of ConsistencyCheckStats
*/
- public ConsistencyCheckStats execute() {
- // retention checker
- RetentionChecker retentionChecker = new RetentionChecker(retentionDays);
-
- // map to remember key-version-node information
- Map<ByteArray, Map<Version, Set<PrefixNode>>> keyVersionNodeSetMap;
- keyVersionNodeSetMap = new HashMap<ByteArray, Map<Version, Set<PrefixNode>>>();
-
- // variables to sweep good keys on the fly
- Map<ByteArray, Set<Iterator<Pair<ByteArray, Versioned<byte[]>>>>> fullyFetchedKeys;
- fullyFetchedKeys = new HashMap<ByteArray, Set<Iterator<Pair<ByteArray, Versioned<byte[]>>>>>();
- Map<Iterator<Pair<ByteArray, Versioned<byte[]>>>, ByteArray> lastFetchedKey;
- lastFetchedKey = new HashMap<Iterator<Pair<ByteArray, Versioned<byte[]>>>, ByteArray>();
-
- /* start fetch */
- boolean anyNodeHasNext;
- long consistentKeys = 0;
- ProgressReporter reporter = new ProgressReporter();
+ public ProgressReporter execute() {
+ Map<ClusterNode, Iterator<Pair<ByteArray, Versioned<byte[]>>>> nodeFetchIteratorMap;
+ nodeFetchIteratorMap = new HashMap<ClusterNode, Iterator<Pair<ByteArray, Versioned<byte[]>>>>();
+ /* start fetch from each node */
+ for(ClusterNode clusterNode: clusterNodeList) {
+ AdminClient adminClient = adminClients.get(clusterNode.getPrefixId());
+ List<Integer> singlePartition = new ArrayList<Integer>();
+ singlePartition.add(partitionId);
+ if(logger.isDebugEnabled()) {
+ logger.debug("Start fetch request to Node[" + clusterNode.toString()
+ + "] for partition[" + partitionId + "] of store[" + storeName + "]");
+ }
+
+ Iterator<Pair<ByteArray, Versioned<byte[]>>> fetchIterator;
+ fetchIterator = adminClient.bulkFetchOps.fetchEntries(clusterNode.getNode().getId(),
+ storeName,
+ singlePartition,
+ null,
+ false);
+ nodeFetchIteratorMap.put(clusterNode, fetchIterator);
+ }
+ keyFetchTracker = new KeyFetchTracker(clusterNodeList.size());
+
+ /* start to fetch */
+ boolean fetchFinished;
do {
- anyNodeHasNext = false;
- /* for each iterator(fetch one key at a time) */
- for(Map.Entry<PrefixNode, Iterator<Pair<ByteArray, Versioned<byte[]>>>> nodeEntriesMapEntry: nodeEntriesMap.entrySet()) {
- PrefixNode node = nodeEntriesMapEntry.getKey();
- Iterator<Pair<ByteArray, Versioned<byte[]>>> nodeEntries = nodeEntriesMapEntry.getValue();
- if(nodeEntries.hasNext()) {
- anyNodeHasNext = true;
+ fetchFinished = true;
+ for(Map.Entry<ClusterNode, Iterator<Pair<ByteArray, Versioned<byte[]>>>> nodeFetchIteratorMapEntry: nodeFetchIteratorMap.entrySet()) {
+ ClusterNode clusterNode = nodeFetchIteratorMapEntry.getKey();
+ Iterator<Pair<ByteArray, Versioned<byte[]>>> fetchIterator = nodeFetchIteratorMapEntry.getValue();
+ if(fetchIterator.hasNext()) {
+ fetchFinished = false;
reporter.recordScans(1);
- Pair<ByteArray, Versioned<byte[]>> nodeEntry = nodeEntries.next();
- ByteArray key = nodeEntry.getFirst();
- Versioned<byte[]> versioned = nodeEntry.getSecond();
- Version version;
- if(urls.size() == 1) {
- version = nodeEntry.getSecond().getVersion();
- } else {
- version = new HashedValue(versioned);
- }
- // skip version if expired
- if(retentionChecker.isExpired(version)) {
- reporter.recordExpired(1);
- continue;
- }
+
+ Pair<ByteArray, Versioned<byte[]>> fetchedEntry = fetchIterator.next();
+ ByteArray key = fetchedEntry.getFirst();
+ Versioned<byte[]> versioned = fetchedEntry.getSecond();
+
+ // record fetch
+ recordFetch(clusterNode, key, versioned);
+
// try sweep last key fetched by this iterator
- if(lastFetchedKey.containsKey(nodeEntries)) {
- ByteArray lastKey = lastFetchedKey.get(nodeEntries);
- if(!key.equals(lastKey)) {
- if(!fullyFetchedKeys.containsKey(lastKey)) {
- fullyFetchedKeys.put(lastKey,
- new HashSet<Iterator<Pair<ByteArray, Versioned<byte[]>>>>());
- }
- Set<Iterator<Pair<ByteArray, Versioned<byte[]>>>> lastKeyIterSet = fullyFetchedKeys.get(lastKey);
- lastKeyIterSet.add(nodeEntries);
-
- // sweep if fully fetched by all iterators
- if(lastKeyIterSet.size() == nodeEntriesMap.size()) {
- // keyFetchComplete
- ConsistencyLevel level = determineConsistency(keyVersionNodeSetMap.get(lastKey),
- replicationFactor);
- if(level == ConsistencyLevel.FULL
- || level == ConsistencyLevel.LATEST_CONSISTENT) {
- keyVersionNodeSetMap.remove(lastKey);
- consistentKeys++;
- }
- fullyFetchedKeys.remove(lastKey);
- }
- }
- }
- // remember key fetch states
- lastFetchedKey.put(nodeEntries, key);
- // initialize key -> Map<Version, Set<nodeId>>
- if(!keyVersionNodeSetMap.containsKey(key)) {
- keyVersionNodeSetMap.put(key, new HashMap<Version, Set<PrefixNode>>());
- }
- Map<Version, Set<PrefixNode>> versionNodeSetMap = keyVersionNodeSetMap.get(key);
- // Initialize Version -> Set<nodeId>
- if(!versionNodeSetMap.containsKey(version)) {
- // decide if this is the newest version
- Iterator<Version> iter = versionNodeSetMap.keySet().iterator();
- // if after any one in the map, then reset map
- if(iter.hasNext()) {
- Version existingVersion = iter.next();
- // existing version(s) are old
- if(version.compare(existingVersion) == Occurred.AFTER) {
- // swap out the old map and put a new
- // map
- versionNodeSetMap = new HashMap<Version, Set<PrefixNode>>();
- keyVersionNodeSetMap.put(key, versionNodeSetMap);
- } else if(existingVersion.compare(version) == Occurred.AFTER) {
- // ignore this version
- continue;
- } else if(existingVersion.compare(version) == Occurred.CONCURRENTLY) {
- // put it into the node set
- } else {
- System.err.print("[ERROR]Two versions are not after each other nor currently(key, v1, v2)");
- System.err.print(key + ", " + existingVersion + ", " + version);
- }
- }
- // insert nodeIdSet into the map
- versionNodeSetMap.put(version, new HashSet<PrefixNode>());
- }
- // add nodeId to set
- Set<PrefixNode> nodeSet = versionNodeSetMap.get(version);
- nodeSet.add(node);
+ keyFetchTracker.recordFetch(clusterNode, key);
+ System.out.println("fetched " + new String(key.get()));
+ System.out.println("map has keys: " + keyVersionNodeSetMap.size());
+ trySweepAll();
+ System.out.println("sweeped; keys left: " + keyVersionNodeSetMap.size());
}
}
+
// stats reporting
- if(verbose) {
- reporter.tryReport();
+ if(logger.isInfoEnabled() && !quiet) {
+ String report = reporter.tryReport();
+ if(report != null) {
+ for(String line: report.split("\n")) {
+ logger.info(line);
+ }
+ }
+ }
+ } while(!fetchFinished);
+
+ /* adminClient shutdown */
+ for(AdminClient adminClient: adminClients) {
+ if(adminClient != null) {
+ adminClient.stop();
}
- } while(anyNodeHasNext);
+ }
// clean keys not sufficient for write
cleanIneligibleKeys(keyVersionNodeSetMap, requiredWrites);
- // clean the rest of consistent keys
- Set<ByteArray> keysToDelete = new HashSet<ByteArray>();
- for(ByteArray key: keyVersionNodeSetMap.keySet()) {
- ConsistencyLevel level = determineConsistency(keyVersionNodeSetMap.get(key),
- replicationFactor);
- if(level == ConsistencyLevel.FULL || level == ConsistencyLevel.LATEST_CONSISTENT) {
- keysToDelete.add(key);
+ keyFetchTracker.finishAll();
+ trySweepAll();
+
+ // print inconsistent keys
+ if(!quiet) {
+ logger.warn("TYPE,Store,ParId,Key,ServerSet,VersionTS,VectorClock[,ValueHash]");
+ for(Map.Entry<ByteArray, Map<Version, Set<ClusterNode>>> entry: keyVersionNodeSetMap.entrySet()) {
+ ByteArray key = entry.getKey();
+ Map<Version, Set<ClusterNode>> versionMap = entry.getValue();
+ logger.warn(keyVersionToString(key, versionMap, storeName, partitionId));
}
}
- for(ByteArray key: keysToDelete) {
- keyVersionNodeSetMap.remove(key);
- consistentKeys++;
+
+ reporter.recordInconsistentKey(keyVersionNodeSetMap.size());
+
+ return reporter;
+ }
+
+ public void trySweepAll() {
+ for(ByteArray finishedKey = keyFetchTracker.nextFinished(); finishedKey != null; finishedKey = keyFetchTracker.nextFinished()) {
+ if(keyVersionNodeSetMap.containsKey(finishedKey)) {
+ ConsistencyLevel level = determineConsistency(keyVersionNodeSetMap.get(finishedKey),
+ replicationFactor);
+ if(level == ConsistencyLevel.FULL || level == ConsistencyLevel.LATEST_CONSISTENT) {
+ keyVersionNodeSetMap.remove(finishedKey);
+ reporter.recordGoodKey(1);
+ }
+ }
}
+ }
- // print inconsistent keys
- // if(verbose) {
- System.out.println("TYPE,Store,ParId,Key,ServerSet,VersionTS,VectorClock[,ValueHash]");
- for(Map.Entry<ByteArray, Map<Version, Set<PrefixNode>>> entry: keyVersionNodeSetMap.entrySet()) {
- ByteArray key = entry.getKey();
- Map<Version, Set<PrefixNode>> versionMap = entry.getValue();
- System.out.print(keyVersionToString(key, versionMap, storeName, partitionId));
+ public void recordFetch(ClusterNode clusterNode, ByteArray key, Versioned<byte[]> versioned) {
+ Version version;
+ if(urls.size() == 1) {
+ version = versioned.getVersion();
+ } else {
+ version = new HashedValue(versioned);
+ }
+
+ // skip version if expired
+ if(retentionChecker.isExpired(version)) {
+ reporter.recordExpired(1);
+ return;
}
- // }
- ConsistencyCheckStats stats = new ConsistencyCheckStats();
- stats.consistentKeys = consistentKeys;
- stats.totalKeys = keyVersionNodeSetMap.size() + consistentKeys;
+ // initialize key -> Map<Version, Set<nodeId>>
+ if(!keyVersionNodeSetMap.containsKey(key)) {
+ keyVersionNodeSetMap.put(key, new HashMap<Version, Set<ClusterNode>>());
+ }
+ Map<Version, Set<ClusterNode>> versionNodeSetMap = keyVersionNodeSetMap.get(key);
- return stats;
+ // check existing version
+ if(!versionNodeSetMap.containsKey(version) && versionNodeSetMap.size() != 0) {
+ // if this version is new, sweep old version
+ // if this version is old, ignore this version
+ Version oneExistingVersion = versionNodeSetMap.keySet().iterator().next();
+ if(version.compare(oneExistingVersion) == Occurred.AFTER) {
+ versionNodeSetMap.clear();
+ } else if(oneExistingVersion.compare(version) == Occurred.AFTER) {
+ return;
+ }
+ }
+
+ if(!versionNodeSetMap.containsKey(version)) {
+ // insert nodeSet into the map
+ versionNodeSetMap.put(version, new HashSet<ClusterNode>());
+ }
+
+ // add node to set
+ versionNodeSetMap.get(version).add(clusterNode);
+ }
+
+ /**
+ * A class to track which keys have been fetched and which keys will not appear
+ * again. It is used to detect keys that will not show up any more so that
+ * their accumulated versions can be processed.
+ */
+ protected static class KeyFetchTracker {
+
+ private final Integer fetcherCount;
+ Map<ByteArray, Set<ClusterNode>> fullyFetchedKeyMap = new HashMap<ByteArray, Set<ClusterNode>>();
+ Map<ClusterNode, ByteArray> lastFetchedKey = new HashMap<ClusterNode, ByteArray>();
+ List<ByteArray> fullyFetchedKeys = new LinkedList<ByteArray>();
+
+ public KeyFetchTracker(Integer fetcherCount) {
+ this.fetcherCount = fetcherCount;
+ }
+
+ /**
+ * Record a fetched result
+ *
+ * @param clusterNode The clusterNode from which the key has been
+ * fetched
+ * @param key The key itself
+ */
+ public void recordFetch(ClusterNode clusterNode, ByteArray key) {
+ if(lastFetchedKey.containsKey(clusterNode)) {
+ ByteArray lastKey = lastFetchedKey.get(clusterNode);
+ if(!key.equals(lastKey)) {
+ if(!fullyFetchedKeyMap.containsKey(lastKey)) {
+ fullyFetchedKeyMap.put(lastKey, new HashSet<ClusterNode>());
+ }
+ Set<ClusterNode> lastKeyIterSet = fullyFetchedKeyMap.get(lastKey);
+ lastKeyIterSet.add(clusterNode);
+
+ // sweep if fully fetched by all iterators
+ if(lastKeyIterSet.size() == fetcherCount) {
+ fullyFetchedKeys.add(lastKey);
+ fullyFetchedKeyMap.remove(lastKey);
+ }
+ }
+ }
+ // remember key fetch states
+ lastFetchedKey.put(clusterNode, key);
+ }
+
+ /**
+ * Mark all keys that have appeared as finished, so that they are all placed
+ * in the finished-keys queue.
+ */
+ public void finishAll() {
+ Set<ByteArray> keySet = new HashSet<ByteArray>();
+ keySet.addAll(fullyFetchedKeyMap.keySet());
+ keySet.addAll(lastFetchedKey.values());
+ fullyFetchedKeys.addAll(keySet);
+ fullyFetchedKeyMap.clear();
+ }
+
+ /**
+ * Get a key whose fetching has completed on all nodes
+ *
+ * @return key considered finished; otherwise null
+ */
+ public ByteArray nextFinished() {
+ if(fullyFetchedKeys.size() > 0) {
+ return fullyFetchedKeys.remove(0);
+ } else {
+ return null;
+ }
+ }
}
protected enum ConsistencyLevel {
FULL,
LATEST_CONSISTENT,
- INCONSISTENT
+ INCONSISTENT,
+ EXPIRED,
+ INSUFFICIENT_WRITE
}
- protected static class PrefixNode {
+ /**
+ * Used to track nodes that may share the same nodeId in different clusters
+ *
+ */
+ protected static class ClusterNode {
- private Integer prefixId;
- private Node node;
+ private final Integer clusterId;
+ private final Node node;
/**
- * Used to track nodes that may share the same nodeId in different
- * clusters
- *
- * @param prefixId a prefix to be associated different clusters
+ * @param clusterId a prefix to be associated different clusters
* @param node the real node
*/
- public PrefixNode(Integer prefixId, Node node) {
- this.prefixId = prefixId;
+ public ClusterNode(Integer clusterId, Node node) {
+ this.clusterId = clusterId;
this.node = node;
}
- public Node getNode() {
- return node;
+ public Integer getPrefixId() {
+ return clusterId;
}
- public Integer getPrefixId() {
- return prefixId;
+ public Node getNode() {
+ return node;
}
@Override
public boolean equals(Object o) {
if(this == o)
return true;
- if(!(o instanceof PrefixNode))
+ if(!(o instanceof ClusterNode))
return false;
- PrefixNode n = (PrefixNode) o;
- return prefixId.equals(n.getPrefixId()) && node.equals(n.getNode());
+ ClusterNode n = (ClusterNode) o;
+ return clusterId.equals(n.getPrefixId()) && node.equals(n.getNode());
}
@Override
public String toString() {
- return prefixId + "." + node.getId();
+ return clusterId + "." + node.getId();
}
}
- protected static class ConsistencyCheckStats {
-
- public long consistentKeys;
- public long totalKeys;
-
- /**
- * Used to track consistency results
- */
- public ConsistencyCheckStats() {
- consistentKeys = 0;
- totalKeys = 0;
- }
-
- public void append(ConsistencyCheckStats that) {
- consistentKeys += that.consistentKeys;
- totalKeys += that.totalKeys;
- }
- }
-
+ /**
+ * A class to save a version together with a value hash. It is used to compare
+ * versions by the value hash.
+ *
+ */
protected static class HashedValue implements Version {
final private Version innerVersion;
final private Integer valueHash;
/**
- * A class to save version and value hash It is used to compare versions
- * by the value hash
- *
* @param versioned Versioned value with version information and value
* itself
*/
@@ -417,15 +467,17 @@ public Occurred compare(Version v) {
}
}
+ /**
+ * A checker to determine if a key is to be cleaned according to retention
+ * policy
+ *
+ */
protected static class RetentionChecker {
final private long bufferTimeSeconds = 600; // expire N seconds earlier
final private long expiredTimeMs;
/**
- * A checker to determine if a key is to be cleaned according to
- * retention policy
- *
* @param days number of days ago from now to retain keys
*/
public RetentionChecker(int days) {
@@ -433,8 +485,8 @@ public RetentionChecker(int days) {
expiredTimeMs = 0;
} else {
long now = System.currentTimeMillis();
- expiredTimeMs = now - (Time.SECONDS_PER_DAY * days - bufferTimeSeconds)
- * Time.MS_PER_SECOND;
+ long expirationTimeS = TimeUnit.DAYS.toSeconds(days) - bufferTimeSeconds;
+ expiredTimeMs = now - TimeUnit.SECONDS.toMillis(expirationTimeS);
}
}
@@ -456,6 +508,10 @@ public boolean isExpired(Version v) {
}
}
+ /**
+ * Used to record progress and statistics
+ *
+ */
protected static class ProgressReporter {
long lastReportTimeMs = 0;
@@ -463,6 +519,8 @@ public boolean isExpired(Version v) {
long numRecordsScanned = 0;
long numRecordsScannedLast = 0;
long numExpiredRecords = 0;
+ long numGoodKeys = 0;
+ long numTotalKeys = 0;
public ProgressReporter() {
reportPeriodMs = 5000;
@@ -485,22 +543,31 @@ public void recordExpired(long count) {
numExpiredRecords += count;
}
- public void tryReport() {
+ public String tryReport() {
if(System.currentTimeMillis() > lastReportTimeMs + reportPeriodMs) {
long currentTimeMs = System.currentTimeMillis();
StringBuilder s = new StringBuilder();
- s.append("Progress Report\n");
- s.append("===============\n");
- s.append(" Number of records Scanned: " + numRecordsScanned + "\n");
- s.append(" Records Ignored(Retention): " + numExpiredRecords + "\n");
- s.append("Recent fetch speed(records/s): "
- + (numRecordsScanned - numRecordsScannedLast)
- / ((currentTimeMs - lastReportTimeMs) / 1000) + "\n");
- System.out.print(s.toString());
+ s.append("=====Progress=====\n");
+ s.append("Records Scanned: " + numRecordsScanned + "\n");
+ s.append("Records Ignored: " + numExpiredRecords + " (Out of Retention)\n");
+ s.append("Last Fetch Rate: " + (numRecordsScanned - numRecordsScannedLast)
+ / ((currentTimeMs - lastReportTimeMs) / 1000) + " (records/s)\n");
lastReportTimeMs = currentTimeMs;
numRecordsScannedLast = numRecordsScanned;
+ return s.toString();
+ } else {
+ return null;
}
}
+
+ public void recordGoodKey(long count) {
+ numGoodKeys += count;
+ numTotalKeys += count;
+ }
+
+ public void recordInconsistentKey(long count) {
+ numTotalKeys += count;
+ }
}
/**
@@ -526,7 +593,7 @@ private static OptionParser getParser() {
.withRequiredArg()
.describedAs("store-name")
.ofType(String.class);
- parser.accepts("verbose", "verbose");
+ parser.accepts("quiet", "quiet");
return parser;
}
@@ -535,18 +602,25 @@ private static OptionParser getParser() {
*/
private static void printUsage() {
StringBuilder help = new StringBuilder();
- help.append("ConsistencyCheck Tool\n Scan partitions of a store by bootstrap url(s) ");
- help.append("for consistency and optionally print out inconsistent keys\n");
+ help.append("ConsistencyCheck Tool\n");
+ help.append(" Scan partitions of a store by bootstrap url(s) for consistency and\n");
+ help.append(" optionally print out inconsistent keys\n");
help.append("Options:\n");
help.append(" Required:\n");
help.append(" --partitions <partitionId>[,<partitionId>...]\n");
help.append(" --urls <url>[,<url>...]\n");
help.append(" --store <storeName>\n");
help.append(" Optional:\n");
- help.append(" --verbose\n");
+ help.append(" --quiet\n");
help.append(" --help\n");
help.append(" Note:\n");
- help.append(" When multiple urls are used, the versions are identified by value hashes, instead of VectorClocks\n");
+ help.append(" If you have two or more clusters to scan for consistency across them,\n");
+ help.append(" You will need to supply multiple bootstrap urls, one for each cluster.\n");
+ help.append(" When multiple urls are used, all versions are considered as concurrent.\n");
+ help.append(" Versioned objects from different nodes are identified by value hashes,\n");
+ help.append(" instead of VectorClocks\n");
+ help.append(" The behavior will be undefined if clusters does not share the same\n");
+ help.append(" number of partitions\n");
System.out.print(help.toString());
}
@@ -557,11 +631,11 @@ private static void printUsage() {
* @param replicationFactor Total replication factor for the set of clusters
* @return ConsistencyLevel Enum
*/
- public static ConsistencyLevel determineConsistency(Map<Version, Set<PrefixNode>> versionNodeSetMap,
+ public static ConsistencyLevel determineConsistency(Map<Version, Set<ClusterNode>> versionNodeSetMap,
int replicationFactor) {
boolean fullyConsistent = true;
Version latestVersion = null;
- for(Map.Entry<Version, Set<PrefixNode>> versionNodeSetEntry: versionNodeSetMap.entrySet()) {
+ for(Map.Entry<Version, Set<ClusterNode>> versionNodeSetEntry: versionNodeSetMap.entrySet()) {
Version version = versionNodeSetEntry.getKey();
if(version instanceof VectorClock) {
if(latestVersion == null
@@ -569,7 +643,7 @@ public static ConsistencyLevel determineConsistency(Map<Version, Set<PrefixNode>
latestVersion = version;
}
}
- Set<PrefixNode> nodeSet = versionNodeSetEntry.getValue();
+ Set<ClusterNode> nodeSet = versionNodeSetEntry.getValue();
fullyConsistent = fullyConsistent && (nodeSet.size() == replicationFactor);
}
if(fullyConsistent) {
@@ -593,17 +667,17 @@ public static ConsistencyLevel determineConsistency(Map<Version, Set<PrefixNode>
* that maps versions to set of PrefixNodes
* @param requiredWrite Required Write configuration
*/
- public static void cleanIneligibleKeys(Map<ByteArray, Map<Version, Set<PrefixNode>>> keyVersionNodeSetMap,
+ public static void cleanIneligibleKeys(Map<ByteArray, Map<Version, Set<ClusterNode>>> keyVersionNodeSetMap,
int requiredWrite) {
Set<ByteArray> keysToDelete = new HashSet<ByteArray>();
- for(Map.Entry<ByteArray, Map<Version, Set<PrefixNode>>> entry: keyVersionNodeSetMap.entrySet()) {
+ for(Map.Entry<ByteArray, Map<Version, Set<ClusterNode>>> entry: keyVersionNodeSetMap.entrySet()) {
Set<Version> versionsToDelete = new HashSet<Version>();
ByteArray key = entry.getKey();
- Map<Version, Set<PrefixNode>> versionNodeSetMap = entry.getValue();
+ Map<Version, Set<ClusterNode>> versionNodeSetMap = entry.getValue();
// mark version for deletion if not enough writes
- for(Map.Entry<Version, Set<PrefixNode>> versionNodeSetEntry: versionNodeSetMap.entrySet()) {
- Set<PrefixNode> nodeSet = versionNodeSetEntry.getValue();
+ for(Map.Entry<Version, Set<ClusterNode>> versionNodeSetEntry: versionNodeSetMap.entrySet()) {
+ Set<ClusterNode> nodeSet = versionNodeSetEntry.getValue();
if(nodeSet.size() < requiredWrite) {
versionsToDelete.add(versionNodeSetEntry.getKey());
}
@@ -628,7 +702,7 @@ public static void main(String[] args) throws Exception {
OptionSet options = getParser().parse(args);
/* validate options */
- boolean verbose = false;
+ boolean quiet = false;
if(options.hasArgument("help")) {
printUsage();
return;
@@ -638,52 +712,54 @@ public static void main(String[] args) throws Exception {
printUsage();
return;
}
- if(options.has("verbose")) {
- verbose = true;
+ if(options.has("quiet")) {
+ quiet = true;
}
List<String> urls = (List<String>) options.valuesOf("urls");
String storeName = (String) options.valueOf("store");
List<Integer> partitionIds = (List<Integer>) options.valuesOf("partitions");
- ConsistencyCheckStats globalStats = new ConsistencyCheckStats();
- Map<Integer, ConsistencyCheckStats> partitionStatsMap = new HashMap<Integer, ConsistencyCheckStats>();
+ Map<Integer, ProgressReporter> partitionStatsMap = new HashMap<Integer, ProgressReporter>();
+ long numGoodKeys = 0;
+ long numTotalKeys = 0;
/* scan each partitions */
for(Integer partitionId: partitionIds) {
- ConsistencyCheck checker = new ConsistencyCheck(urls, storeName, partitionId, verbose);
+ ConsistencyCheck checker = new ConsistencyCheck(urls, storeName, partitionId, quiet);
checker.connect();
- ConsistencyCheckStats partitionStats = checker.execute();
- partitionStatsMap.put(partitionId, partitionStats);
- globalStats.append(partitionStats);
+ ProgressReporter reporter = checker.execute();
+ partitionStatsMap.put(partitionId, reporter);
+ numGoodKeys += reporter.numGoodKeys;
+ numTotalKeys += reporter.numTotalKeys;
}
/* print stats */
StringBuilder statsString = new StringBuilder();
// each partition
statsString.append("TYPE,Store,ParitionId,KeysConsistent,KeysTotal,Consistency\n");
- for(Map.Entry<Integer, ConsistencyCheckStats> entry: partitionStatsMap.entrySet()) {
+ for(Map.Entry<Integer, ProgressReporter> entry: partitionStatsMap.entrySet()) {
Integer partitionId = entry.getKey();
- ConsistencyCheckStats partitionStats = entry.getValue();
+ ProgressReporter reporter = entry.getValue();
statsString.append("STATS,");
statsString.append(storeName + ",");
statsString.append(partitionId + ",");
- statsString.append(partitionStats.consistentKeys + ",");
- statsString.append(partitionStats.totalKeys + ",");
- statsString.append((double) (partitionStats.consistentKeys)
- / (double) partitionStats.totalKeys);
+ statsString.append(reporter.numGoodKeys + ",");
+ statsString.append(reporter.numTotalKeys + ",");
+ statsString.append((double) (reporter.numGoodKeys) / (double) reporter.numTotalKeys);
statsString.append("\n");
}
// all partitions
statsString.append("STATS,");
statsString.append(storeName + ",");
statsString.append("aggregate,");
- statsString.append(globalStats.consistentKeys + ",");
- statsString.append(globalStats.totalKeys + ",");
- statsString.append((double) (globalStats.consistentKeys) / (double) globalStats.totalKeys);
+ statsString.append(numGoodKeys + ",");
+ statsString.append(numTotalKeys + ",");
+ statsString.append((double) (numGoodKeys) / (double) numTotalKeys);
statsString.append("\n");
- System.out.println();
- System.out.println(statsString.toString());
+ for(String line: statsString.toString().split("\n")) {
+ logger.info(line);
+ }
}
/**
@@ -696,13 +772,13 @@ public static void main(String[] args) throws Exception {
* @return a string that describe the information passed in
*/
public static String keyVersionToString(ByteArray key,
- Map<Version, Set<PrefixNode>> versionMap,
+ Map<Version, Set<ClusterNode>> versionMap,
String storeName,
Integer partitionId) {
StringBuilder record = new StringBuilder();
- for(Map.Entry<Version, Set<PrefixNode>> versionSet: versionMap.entrySet()) {
+ for(Map.Entry<Version, Set<ClusterNode>> versionSet: versionMap.entrySet()) {
Version version = versionSet.getKey();
- Set<PrefixNode> nodeSet = versionSet.getValue();
+ Set<ClusterNode> nodeSet = versionSet.getValue();
record.append("BAD_KEY,");
record.append(storeName + ",");
@@ -726,7 +802,6 @@ public static String keyVersionToString(ByteArray key,
.replaceAll("version\\((.*)\\)", "[$1],"));
record.append(hashValue);
}
- record.append("\n");
}
return record.toString();
}
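
For orientation, here is a minimal sketch of driving the refactored checker programmatically, mirroring what main() and the end-to-end test below do. The wrapper class name, bootstrap URL, and store name are placeholders and not part of this commit; the class is placed in the voldemort.utils package because ProgressReporter and its counters are package-visible rather than public.

package voldemort.utils;

import java.util.Arrays;
import java.util.List;

import voldemort.utils.ConsistencyCheck.ProgressReporter;

public class ConsistencyCheckExample {

    public static void main(String[] args) throws Exception {
        // placeholder bootstrap URL and store name; pass one URL per cluster to compare
        List<String> urls = Arrays.asList("tcp://localhost:6666");
        ConsistencyCheck checker = new ConsistencyCheck(urls, "my-store", 0, true);

        checker.connect();                             // bootstrap metadata, resolve replicating nodes
        ProgressReporter reporter = checker.execute(); // stream the partition from each replica

        System.out.println("good keys:  " + reporter.numGoodKeys);
        System.out.println("total keys: " + reporter.numTotalKeys);
    }
}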
test/unit/voldemort/utils/ConsistencyCheckTest.java (291 lines changed)
@@ -3,6 +3,7 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.util.ArrayList;
@@ -13,6 +14,7 @@
import java.util.Properties;
import java.util.Set;
+import org.junit.Before;
import org.junit.Test;
import voldemort.ServerTestUtils;
@@ -26,125 +28,121 @@
import voldemort.store.StoreDefinition;
import voldemort.store.socket.SocketStoreFactory;
import voldemort.store.socket.clientrequest.ClientRequestExecutorPool;
-import voldemort.utils.ConsistencyCheck.ConsistencyCheckStats;
+import voldemort.utils.ConsistencyCheck.ClusterNode;
import voldemort.utils.ConsistencyCheck.HashedValue;
-import voldemort.utils.ConsistencyCheck.PrefixNode;
+import voldemort.utils.ConsistencyCheck.KeyFetchTracker;
+import voldemort.utils.ConsistencyCheck.ProgressReporter;
import voldemort.versioning.VectorClock;
import voldemort.versioning.Version;
import voldemort.versioning.Versioned;
public class ConsistencyCheckTest {
+ final String STORE_NAME = "consistency-check";
+ final String STORES_XML = "test/common/voldemort/config/stores.xml";
+
+ Node n1 = new Node(1, "localhost", 10000, 10001, 10002, 0, new ArrayList<Integer>());
+ Node n1_dup = new Node(1, "localhost", 10000, 10001, 10002, 0, new ArrayList<Integer>());
+ Node n2 = new Node(2, "localhost", 10000, 10001, 10002, 0, new ArrayList<Integer>());
+ Node n3 = new Node(3, "localhost", 10000, 10001, 10002, 0, new ArrayList<Integer>());
+ Node n4 = new Node(4, "localhost", 10000, 10001, 10002, 0, new ArrayList<Integer>());
+ ClusterNode cn0_1 = new ClusterNode(0, n1);
+ ClusterNode cn0_1_dup = new ClusterNode(0, n1);
+ ClusterNode cn1_1dup = new ClusterNode(1, n1_dup);
+ ClusterNode cn0_2 = new ClusterNode(0, n2);
+ ClusterNode cn0_3 = new ClusterNode(0, n3);
+ ClusterNode cn0_4 = new ClusterNode(0, n4);
+ ClusterNode cn1_2 = new ClusterNode(1, n2); // 1.1
+
+ byte[] value1 = { 0, 1, 2, 3, 4 };
+ byte[] value2 = { 0, 1, 2, 3, 5 };
+ byte[] value3 = { 0, 1, 2, 3, 6 };
+ byte[] value4 = { 0, 1, 2, 3, 7 };
+ Versioned<byte[]> versioned1 = new Versioned<byte[]>(value1);
+ Versioned<byte[]> versioned2 = new Versioned<byte[]>(value2);
+ Version hv1 = new ConsistencyCheck.HashedValue(versioned1);
+ Version hv1_dup = new ConsistencyCheck.HashedValue(versioned1);
+ Version hv2 = new ConsistencyCheck.HashedValue(versioned2);
+
+ long now = System.currentTimeMillis();
+ Version vc1 = new VectorClock(now - Time.MS_PER_DAY);
+ Version vc2 = new VectorClock(now);
+ Version hv3 = new ConsistencyCheck.HashedValue(new Versioned<byte[]>(value1));
+ Version vc3 = new VectorClock(now - Time.MS_PER_HOUR * 24 + 500 * Time.MS_PER_SECOND);
+
+ // make set
+ Set<ConsistencyCheck.ClusterNode> setFourNodes = new HashSet<ConsistencyCheck.ClusterNode>();
+ Set<ConsistencyCheck.ClusterNode> setThreeNodes = new HashSet<ConsistencyCheck.ClusterNode>();
+
+ @Before
+ public void setUp() {
+ setFourNodes.add(cn0_1);
+ setFourNodes.add(cn0_2);
+ setFourNodes.add(cn0_3);
+ setFourNodes.add(cn0_4);
+ setThreeNodes.add(cn0_1);
+ setThreeNodes.add(cn0_2);
+ setThreeNodes.add(cn0_3);
+ }
+
@Test
- public void testPrefixNode() {
- Node n1 = new Node(1, "localhost", 10000, 10001, 10002, 0, new ArrayList<Integer>());
- Node n2 = new Node(1, "localhost", 10000, 10001, 10002, 0, new ArrayList<Integer>());
- Node n3 = new Node(2, "localhost", 10000, 10001, 10002, 0, new ArrayList<Integer>());
- ConsistencyCheck.PrefixNode pn1 = new ConsistencyCheck.PrefixNode(0, n1);
- ConsistencyCheck.PrefixNode pn1dup = new ConsistencyCheck.PrefixNode(0, n1);
- ConsistencyCheck.PrefixNode pn2 = new ConsistencyCheck.PrefixNode(1, n2);
- ConsistencyCheck.PrefixNode pn3 = new ConsistencyCheck.PrefixNode(0, n3);
+ public void testClusterNode() {
// test getter
- assertEquals(pn1.getNode(), n1);
- assertEquals(pn2.getNode(), n2);
- assertEquals(pn3.getNode(), n3);
- assertEquals(new Integer(0), pn1.getPrefixId());
- assertEquals(new Integer(1), pn2.getPrefixId());
- assertEquals(new Integer(0), pn3.getPrefixId());
+ assertEquals(cn0_1.getNode(), n1);
+ assertEquals(cn1_1dup.getNode(), n1_dup);
+ assertEquals(cn0_2.getNode(), n2);
+ assertEquals(new Integer(0), cn0_1.getPrefixId());
+ assertEquals(new Integer(1), cn1_1dup.getPrefixId());
+ assertEquals(new Integer(0), cn0_2.getPrefixId());
// test equals function
- assertTrue(pn1.equals(pn1dup));
- assertFalse(pn2.equals(pn1));
- assertFalse(pn3.equals(pn1));
- assertFalse(pn3.equals(pn2));
+ assertTrue(cn0_1.equals(cn0_1_dup));
+ assertFalse(cn1_1dup.equals(cn0_1));
+ assertFalse(cn0_2.equals(cn0_1));
+ assertFalse(cn0_2.equals(cn1_1dup));
// test toString function
- assertEquals("0.1", pn1.toString());
- assertEquals("1.1", pn2.toString());
- assertEquals("0.2", pn3.toString());
+ assertEquals("0.1", cn0_1.toString());
+ assertEquals("1.1", cn1_1dup.toString());
+ assertEquals("0.2", cn0_2.toString());
}
@Test
public void testHashedValue() {
- byte[] value1 = { 0, 1, 2, 3, 4 };
- byte[] value2 = { 0, 1, 2, 3, 5 };
- Versioned<byte[]> versioned1 = new Versioned<byte[]>(value1);
- Versioned<byte[]> versioned2 = new Versioned<byte[]>(value2);
- Version v1 = new ConsistencyCheck.HashedValue(versioned1);
- Version v2 = new ConsistencyCheck.HashedValue(versioned1);
- Version v3 = new ConsistencyCheck.HashedValue(versioned2);
-
- assertTrue(v1.equals(v2));
- assertEquals(v1.hashCode(), v2.hashCode());
- assertFalse(v1.hashCode() == v3.hashCode());
-
- assertEquals(versioned1.getVersion(), ((ConsistencyCheck.HashedValue) v1).getInner());
- assertEquals(((ConsistencyCheck.HashedValue) v1).getValueHash(), v1.hashCode());
+
+ assertTrue(hv1.equals(hv1_dup));
+ assertEquals(hv1.hashCode(), hv1_dup.hashCode());
+ assertFalse(hv1.hashCode() == hv2.hashCode());
+ assertFalse(hv1.equals(hv2));
+ assertFalse(hv1.equals(null));
+ assertFalse(hv1.equals(new Versioned<byte[]>(null)));
+ assertFalse(hv1.equals(new Integer(0)));
+
+ assertEquals(versioned1.getVersion(), ((ConsistencyCheck.HashedValue) hv1).getInner());
+ assertEquals(((ConsistencyCheck.HashedValue) hv1).getValueHash(), hv1.hashCode());
}
@Test
public void testRetentionChecker() {
- byte[] value = { 0, 1, 2, 3, 5 };
- long now = System.currentTimeMillis();
- Version v1 = new VectorClock(now - Time.MS_PER_DAY);
- Version v2 = new VectorClock(now);
- Version v3 = new ConsistencyCheck.HashedValue(new Versioned<byte[]>(value));
- Version v4 = new VectorClock(now - Time.MS_PER_HOUR * 24 + 500 * Time.MS_PER_SECOND);
ConsistencyCheck.RetentionChecker rc1 = new ConsistencyCheck.RetentionChecker(0);
ConsistencyCheck.RetentionChecker rc2 = new ConsistencyCheck.RetentionChecker(1);
- assertFalse(rc1.isExpired(v1));
- assertFalse(rc1.isExpired(v2));
- assertFalse(rc1.isExpired(v3));
- assertTrue(rc2.isExpired(v1));
- assertFalse(rc2.isExpired(v2));
- assertFalse(rc2.isExpired(v3));
- assertTrue(rc2.isExpired(v4));
+ assertFalse(rc1.isExpired(vc1));
+ assertFalse(rc1.isExpired(vc2));
+ assertFalse(rc1.isExpired(hv3));
+ assertFalse(rc1.isExpired(vc3));
+ assertTrue(rc2.isExpired(vc1));
+ assertFalse(rc2.isExpired(vc2));
+ assertFalse(rc2.isExpired(hv3));
+ assertTrue(rc2.isExpired(vc3));
}
@Test
- public void testConsistencyCheckStats() {
- ConsistencyCheck.ConsistencyCheckStats stats1 = new ConsistencyCheck.ConsistencyCheckStats();
- ConsistencyCheck.ConsistencyCheckStats stats2 = new ConsistencyCheck.ConsistencyCheckStats();
-
- assertEquals(0, stats1.consistentKeys);
- assertEquals(0, stats1.totalKeys);
-
- stats1.consistentKeys = 500;
- stats1.totalKeys = 2000;
- stats2.consistentKeys = 600;
- stats2.totalKeys = 2400;
-
- stats1.append(stats2);
- assertEquals(1100, stats1.consistentKeys);
- assertEquals(4400, stats1.totalKeys);
- }
-
- @Test
- public void testDetermineConsistency() {
- Node n1 = new Node(1, "localhost", 10000, 10001, 10002, 0, new ArrayList<Integer>());
- Node n2 = new Node(2, "localhost", 10000, 10001, 10002, 0, new ArrayList<Integer>());
- Node n3 = new Node(3, "localhost", 10000, 10001, 10002, 0, new ArrayList<Integer>());
- Node n4 = new Node(4, "localhost", 10000, 10001, 10002, 0, new ArrayList<Integer>());
- PrefixNode pn1 = new PrefixNode(0, n1);
- PrefixNode pn2 = new PrefixNode(0, n2);
- PrefixNode pn3 = new PrefixNode(0, n3);
- PrefixNode pn4 = new PrefixNode(0, n4);
- Map<Version, Set<ConsistencyCheck.PrefixNode>> versionNodeSetMap = new HashMap<Version, Set<ConsistencyCheck.PrefixNode>>();
+ public void testDetermineConsistencyVectorClock() {
+ Map<Version, Set<ConsistencyCheck.ClusterNode>> versionNodeSetMap = new HashMap<Version, Set<ConsistencyCheck.ClusterNode>>();
int replicationFactor = 4;
- // make set
- Set<ConsistencyCheck.PrefixNode> setFourNodes = new HashSet<ConsistencyCheck.PrefixNode>();
- setFourNodes.add(pn1);
- setFourNodes.add(pn2);
- setFourNodes.add(pn3);
- setFourNodes.add(pn4);
- Set<ConsistencyCheck.PrefixNode> setThreeNodes = new HashSet<ConsistencyCheck.PrefixNode>();
- setFourNodes.add(pn1);
- setFourNodes.add(pn2);
- setFourNodes.add(pn3);
-
// Version is vector clock
Version v1 = new VectorClock();
((VectorClock) v1).incrementVersion(1, 100000001);
@@ -196,12 +194,24 @@ public void testDetermineConsistency() {
versionNodeSetMap.put(v3, setThreeNodes);
assertEquals(ConsistencyCheck.ConsistencyLevel.INCONSISTENT,
ConsistencyCheck.determineConsistency(versionNodeSetMap, replicationFactor));
+ }
+
+ public void testDetermineConsistencyHashValue() {
+ Map<Version, Set<ConsistencyCheck.ClusterNode>> versionNodeSetMap = new HashMap<Version, Set<ConsistencyCheck.ClusterNode>>();
+ int replicationFactor = 4;
+
+ // vector clocks
+ Version v1 = new VectorClock();
+ ((VectorClock) v1).incrementVersion(1, 100000001);
+ ((VectorClock) v1).incrementVersion(2, 100000003);
+ Version v2 = new VectorClock();
+ ((VectorClock) v2).incrementVersion(1, 100000001);
+ ((VectorClock) v2).incrementVersion(3, 100000002);
+ Version v3 = new VectorClock();
+ ((VectorClock) v3).incrementVersion(1, 100000001);
+ ((VectorClock) v3).incrementVersion(4, 100000001);
// Version is HashedValue
- // Version is vector clock
- byte[] value1 = { 0, 1, 2, 3, 4 };
- byte[] value2 = { 0, 1, 2, 3, 5 };
- byte[] value3 = { 0, 1, 2, 3, 6 };
Versioned<byte[]> versioned1 = new Versioned<byte[]>(value1, v1);
Versioned<byte[]> versioned2 = new Versioned<byte[]>(value2, v2);
Versioned<byte[]> versioned3 = new Versioned<byte[]>(value3, v3);
@@ -241,12 +251,6 @@ public void testDetermineConsistency() {
@Test
public void testCleanInlegibleKeys() {
- // nodes
- Node n1 = new Node(1, "localhost", 10000, 10001, 10002, 0, new ArrayList<Integer>());
- Node n2 = new Node(2, "localhost", 10000, 10001, 10002, 0, new ArrayList<Integer>());
- PrefixNode pn1 = new PrefixNode(0, n1);
- PrefixNode pn2 = new PrefixNode(0, n2);
-
// versions
Version v1 = new VectorClock();
((VectorClock) v1).incrementVersion(1, 100000001);
@@ -255,16 +259,15 @@ public void testCleanInlegibleKeys() {
((VectorClock) v2).incrementVersion(1, 100000002);
// setup
- Map<ByteArray, Map<Version, Set<PrefixNode>>> map = new HashMap<ByteArray, Map<Version, Set<PrefixNode>>>();
- Map<Version, Set<PrefixNode>> nodeSetMap = new HashMap<Version, Set<PrefixNode>>();
- Set<PrefixNode> oneNodeSet = new HashSet<PrefixNode>();
- oneNodeSet.add(pn1);
- Set<PrefixNode> twoNodeSet = new HashSet<PrefixNode>();
- twoNodeSet.add(pn1);
- twoNodeSet.add(pn2);
+ Map<ByteArray, Map<Version, Set<ClusterNode>>> map = new HashMap<ByteArray, Map<Version, Set<ClusterNode>>>();
+ Map<Version, Set<ClusterNode>> nodeSetMap = new HashMap<Version, Set<ClusterNode>>();
+ Set<ClusterNode> oneNodeSet = new HashSet<ClusterNode>();
+ oneNodeSet.add(cn0_1);
+ Set<ClusterNode> twoNodeSet = new HashSet<ClusterNode>();
+ twoNodeSet.add(cn0_1);
+ twoNodeSet.add(cn0_2);
int requiredWrite = 2;
- byte[] keybytes1 = { 1, 2, 3, 4, 5 };
- ByteArray key1 = new ByteArray(keybytes1);
+ ByteArray key1 = new ByteArray(value1);
// delete one key
map.clear();
@@ -294,31 +297,24 @@ public void testCleanInlegibleKeys() {
public void testKeyVersionToString() {
byte[] keyBytes = { 0, 1, 2, 17, 4 };
ByteArray key = new ByteArray(keyBytes);
- byte[] value1 = { 0, 1, 2, 3, 4 };
long now = System.currentTimeMillis();
Version v1 = new VectorClock(now);
Version v2 = new VectorClock(now + 1);
Versioned<byte[]> versioned = new Versioned<byte[]>(value1, v1);
// make Prefix Nodes
- Node n1 = new Node(1, "localhost", 10000, 10001, 10002, 0, new ArrayList<Integer>());
- Node n2 = new Node(1, "localhost", 10000, 10001, 10002, 0, new ArrayList<Integer>());
- Node n3 = new Node(2, "localhost", 10000, 10001, 10002, 0, new ArrayList<Integer>());
- ConsistencyCheck.PrefixNode pn1 = new ConsistencyCheck.PrefixNode(0, n1); // 0.1
- ConsistencyCheck.PrefixNode pn2 = new ConsistencyCheck.PrefixNode(1, n2); // 1.1
- ConsistencyCheck.PrefixNode pn3 = new ConsistencyCheck.PrefixNode(0, n3); // 0.2
- Set<PrefixNode> set = new HashSet<PrefixNode>();
- set.add(pn1);
- set.add(pn2);
- set.add(pn3);
+ Set<ClusterNode> set = new HashSet<ClusterNode>();
+ set.add(cn0_1);
+ set.add(cn1_2);
+ set.add(cn0_3);
// test vector clock
- Map<Version, Set<PrefixNode>> mapVector = new HashMap<Version, Set<PrefixNode>>();
+ Map<Version, Set<ClusterNode>> mapVector = new HashMap<Version, Set<ClusterNode>>();
mapVector.put(v1, set);
((VectorClock) v1).incrementVersion(1, now);
String sVector = ConsistencyCheck.keyVersionToString(key, mapVector, "testStore", 99);
assertEquals("BAD_KEY,testStore,99,0001021104," + set.toString().replace(", ", ";") + ","
- + now + ",[1:1]\n", sVector);
+ + now + ",[1:1]", sVector);
// test two lines
((VectorClock) v2).incrementVersion(1, now);
@@ -326,29 +322,62 @@ public void testKeyVersionToString() {
mapVector.put(v2, set);
String sVector2 = ConsistencyCheck.keyVersionToString(key, mapVector, "testStore", 99);
String s1 = "BAD_KEY,testStore,99,0001021104," + set.toString().replace(", ", ";") + ","
- + now + ",[1:1]\n";
+ + now + ",[1:1]";
String s2 = "BAD_KEY,testStore,99,0001021104," + set.toString().replace(", ", ";") + ","
- + (now + 1) + ",[1:2]\n";
+ + (now + 1) + ",[1:2]";
assertTrue(sVector2.equals(s1 + s2) || sVector2.equals(s2 + s1));
// test value hash
Version v3 = new HashedValue(versioned);
- Map<Version, Set<PrefixNode>> mapHashed = new HashMap<Version, Set<PrefixNode>>();
+ Map<Version, Set<ClusterNode>> mapHashed = new HashMap<Version, Set<ClusterNode>>();
mapHashed.put(v3, set);
assertEquals("BAD_KEY,testStore,99,0001021104," + set.toString().replace(", ", ";") + ","
- + now + ",[1:1],-1172398097\n",
+ + now + ",[1:1],-1172398097",
ConsistencyCheck.keyVersionToString(key, mapHashed, "testStore", 99));
}
@Test
+ public void testKeyFetchTracker() {
+ KeyFetchTracker tracker = new KeyFetchTracker(4);
+ tracker.recordFetch(cn0_1, new ByteArray(value1));
+ tracker.recordFetch(cn0_2, new ByteArray(value1));
+ tracker.recordFetch(cn0_3, new ByteArray(value1));
+ tracker.recordFetch(cn0_4, new ByteArray(value1));
+ tracker.recordFetch(cn0_1, new ByteArray(value2));
+ tracker.recordFetch(cn0_2, new ByteArray(value2));
+ tracker.recordFetch(cn0_3, new ByteArray(value2));
+ assertNull(tracker.nextFinished());
+ tracker.recordFetch(cn0_4, new ByteArray(value2));
+ assertEquals(new ByteArray(value1), tracker.nextFinished());
+ assertNull(tracker.nextFinished());
+ // multiple fetch on same node same key
+ tracker.recordFetch(cn0_1, new ByteArray(value3));
+ tracker.recordFetch(cn0_2, new ByteArray(value3));
+ tracker.recordFetch(cn0_3, new ByteArray(value3));
+ tracker.recordFetch(cn0_4, new ByteArray(value3));
+ tracker.recordFetch(cn0_4, new ByteArray(value3));
+ tracker.recordFetch(cn0_4, new ByteArray(value3));
+ assertEquals(new ByteArray(value2), tracker.nextFinished());
+
+ tracker.recordFetch(cn0_1, new ByteArray(value4));
+ tracker.recordFetch(cn0_2, new ByteArray(value4));
+ tracker.recordFetch(cn0_3, new ByteArray(value4));
+
+ assertNull(tracker.nextFinished());
+
+ tracker.finishAll();
+ assertEquals(new ByteArray(value3), tracker.nextFinished());
+ assertEquals(new ByteArray(value4), tracker.nextFinished());
+ assertNull(tracker.nextFinished());
+ }
+
+ @Test
public void testOnePartitionEndToEnd() throws Exception {
long now = System.currentTimeMillis();
// setup four nodes with one store and one partition
- final String STORE_NAME = "consistency-check";
- final String STORES_XML = "test/common/voldemort/config/stores.xml";
final SocketStoreFactory socketStoreFactory = new ClientRequestExecutorPool(2,
10000,
100000,
@@ -480,11 +509,11 @@ public void testOnePartitionEndToEnd() throws Exception {
List<String> urls = new ArrayList<String>();
urls.add(bootstrapUrl);
ConsistencyCheck checker = new ConsistencyCheck(urls, STORE_NAME, 0, true);
- ConsistencyCheckStats partitionStats = null;
+ ProgressReporter reporter = null;
checker.connect();
- partitionStats = checker.execute();
+ reporter = checker.execute();
- assertEquals(7 - 2, partitionStats.totalKeys);
- assertEquals(3, partitionStats.consistentKeys);
+ assertEquals(7 - 2, reporter.numTotalKeys);
+ assertEquals(3, reporter.numGoodKeys);
}
}
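
As a closing usage note, the command-line surface described in printUsage() can also be exercised by calling main() directly. This is a hedged sketch with placeholder bootstrap URLs and store name, and it assumes the comma-separated option values documented in the help text; with more than one URL, versions are compared by value hash instead of by VectorClock, as the help text states.

import voldemort.utils.ConsistencyCheck;

public class ConsistencyCheckCliExample {

    public static void main(String[] unused) throws Exception {
        String[] args = {
            "--urls", "tcp://cluster-a:6666,tcp://cluster-b:6666",
            "--store", "my-store",
            "--partitions", "0,1,2",
            "--quiet"
        };
        // runs connect()/execute() for each partition and logs per-partition
        // and aggregate STATS lines via log4j
        ConsistencyCheck.main(args);
    }
}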