Permalink
Browse files

Added unit tests for ConsistencyFix, ConsistencyFixWorker, and QueryKeyResult.

Many other fixes and cleanup:

src/java/voldemort/utils/ConsistencyFix.java
- tweak many variable names
- add close method to stop adminClient
- broke out BadKey to wrap a key with its string representation, so that failed fixes of bad keys can be dumped in full to a file and retried without any additional effort
- marked 'parseVersion' as deprecated since, if we do this again, we should dump bytes not strings
- track obsolete version exceptions and various statuses in Stats

src/java/voldemort/utils/ConsistencyFixCLI.java
- clean up of arguments, variable names, etc.
- cleanly close down fixer...

src/java/voldemort/utils/ConsistencyFixWorker.java
- more logger.trace output
- minor cleanup

test/common/voldemort/TestUtils.java
- added getVersioned() helper method

test/common/voldemort/config/stores.xml
- added consistency-fix store

test/unit/voldemort/store/routed/ReadRepairerTest.java
- marked all tests as @Test

test/unit/voldemort/utils/ConsistencyCheckTest.java
- update copyright notice
  • Loading branch information...
1 parent b0a000d commit 78e9417e91107a23b29be1d2f9c64c6fbfd1fc81 @jayjwylie jayjwylie committed Mar 8, 2013

Large diffs are not rendered by default.

Oops, something went wrong.
@@ -34,7 +34,7 @@ public static void printUsage() {
sb.append("Optional arguments: \n");
sb.append("\t--orphan-format\n");
sb.append("\t--dry-run\n");
- sb.append("\t--progress-bar <progressBarPeriod>\n");
+ sb.append("\t--progress-period-ops <progressPeriodOps>\n");
sb.append("\t--parallelism <parallelism>\n");
sb.append("\t--per-server-iops-limit <perServerIOPSLimit>\n");
sb.append("\n");
@@ -57,18 +57,17 @@ public static void printUsage(String errMessage, OptionParser parser) {
private static class Options {
public final static int defaultParallelism = 8;
- // TODO: change name to progressPeriodMs (or S, as case may be)
- public final static long defaultProgressBar = 1000;
- public final static long defaultPerServerIOPSLimit = 100;
+ public final static long defaultProgressPeriodOps = 1000;
+ public final static long defaultPerServerQPSLimit = 100;
public String url = null;
public String storeName = null;
public String badKeyFileIn = null;
public boolean badKeyFileInOrphanFormat = false;
public String badKeyFileOut = null;
public int parallelism = defaultParallelism;
- public long progressBar = defaultProgressBar;
- public long perServerIOPSLimit = defaultPerServerIOPSLimit;
+ public long progressPeriodOps = defaultProgressPeriodOps;
+ public long perServerQPSLimit = defaultPerServerQPSLimit;
public boolean dryRun = false;
public boolean parseOnly = false;
}
@@ -115,15 +114,17 @@ public static void printUsage(String errMessage, OptionParser parser) {
.withRequiredArg()
.describedAs("parallelism [Default value: " + Options.defaultParallelism + "]")
.ofType(Integer.class);
- parser.accepts("progress-bar", "Number of operations between 'info' progress messages. ")
+ parser.accepts("progress-period-ops",
+ "Number of operations between 'info' progress messages. ")
.withRequiredArg()
- .describedAs("progressBar [Default value: " + Options.defaultProgressBar + "]")
+ .describedAs("period (in operations) between outputting progress [Default value: "
+ + Options.defaultProgressPeriodOps + "]")
.ofType(Long.class);
- parser.accepts("per-server-iops-limit",
- "Number of operations that the consistency fixer will issue into any individual server in one second. ")
+ parser.accepts("per-server-qps-limit",
+ "Number of operations that the consistency fixer will issue to any individual server in one second. ")
.withRequiredArg()
- .describedAs("perServerIOPSLimit [Default value: "
- + Options.defaultPerServerIOPSLimit + "]")
+ .describedAs("perServerQPSLimit [Default value: " + Options.defaultPerServerQPSLimit
+ + "]")
.ofType(Long.class);
OptionSet optionSet = parser.parse(args);
@@ -162,11 +163,11 @@ public static void printUsage(String errMessage, OptionParser parser) {
if(optionSet.has("parallelism")) {
options.parallelism = (Integer) optionSet.valueOf("parallelism");
}
- if(optionSet.has("progress-bar")) {
- options.progressBar = (Long) optionSet.valueOf("progress-bar");
+ if(optionSet.has("progress-period-ops")) {
+ options.progressPeriodOps = (Long) optionSet.valueOf("progress-period-ops");
}
- if(optionSet.has("per-server-iops-limit")) {
- options.perServerIOPSLimit = (Long) optionSet.valueOf("per-server-iops-limit");
+ if(optionSet.has("per-server-qps-limit")) {
+ options.perServerQPSLimit = (Long) optionSet.valueOf("per-server-qps-limit");
}
if(optionSet.has("dry-run")) {
options.dryRun = true;
@@ -183,8 +184,8 @@ public static void main(String[] args) throws Exception {
ConsistencyFix consistencyFix = new ConsistencyFix(options.url,
options.storeName,
- options.progressBar,
- options.perServerIOPSLimit,
+ options.progressPeriodOps,
+ options.perServerQPSLimit,
options.dryRun,
options.parseOnly);
@@ -193,6 +194,8 @@ public static void main(String[] args) throws Exception {
options.badKeyFileInOrphanFormat,
options.badKeyFileOut);
+ consistencyFix.close();
+
System.out.println(summary);
}
}
@@ -28,7 +28,8 @@
import voldemort.client.protocol.admin.QueryKeyResult;
import voldemort.store.routed.NodeValue;
import voldemort.store.routed.ReadRepairer;
-import voldemort.utils.ConsistencyFix.BadKeyResult;
+import voldemort.utils.ConsistencyFix.BadKey;
+import voldemort.utils.ConsistencyFix.BadKeyStatus;
import voldemort.utils.ConsistencyFix.Status;
import voldemort.versioning.ObsoleteVersionException;
import voldemort.versioning.VectorClock;
@@ -41,9 +42,9 @@
private static final Logger logger = Logger.getLogger(ConsistencyFixWorker.class);
private static final int fakeNodeID = Integer.MIN_VALUE;
- private final String keyInHexFormat;
+ private final BadKey badKey;
private final ConsistencyFix consistencyFix;
- private final BlockingQueue<BadKeyResult> badKeyQOut;
+ private final BlockingQueue<BadKeyStatus> badKeyQOut;
private final QueryKeyResult orphanedValues;
/**
@@ -53,10 +54,10 @@
* @param consistencyFix
* @param badKeyQOut
*/
- ConsistencyFixWorker(String keyInHexFormat,
+ ConsistencyFixWorker(BadKey badKey,
ConsistencyFix consistencyFix,
- BlockingQueue<BadKeyResult> badKeyQOut) {
- this(keyInHexFormat, consistencyFix, badKeyQOut, null);
+ BlockingQueue<BadKeyStatus> badKeyQOut) {
+ this(badKey, consistencyFix, badKeyQOut, null);
}
/**
@@ -69,11 +70,11 @@
* @param badKeyQOut
* @param orphanedValues Set to null if no orphaned values to be included.
*/
- ConsistencyFixWorker(String keyInHexFormat,
+ ConsistencyFixWorker(BadKey badKey,
ConsistencyFix consistencyFix,
- BlockingQueue<BadKeyResult> badKeyQOut,
+ BlockingQueue<BadKeyStatus> badKeyQOut,
QueryKeyResult orphanedValues) {
- this.keyInHexFormat = keyInHexFormat;
+ this.badKey = badKey;
this.consistencyFix = consistencyFix;
this.badKeyQOut = badKeyQOut;
this.orphanedValues = orphanedValues;
@@ -85,29 +86,28 @@ private String myName() {
@Override
public void run() {
- logger.trace("About to process key " + keyInHexFormat + " (" + myName() + ")");
- Status status = doConsistencyFix(keyInHexFormat);
- logger.trace("Finished processing key " + keyInHexFormat + " (" + myName() + ")");
+ logger.trace("About to process key " + badKey + " (" + myName() + ")");
+ Status status = doConsistencyFix(badKey);
+ logger.trace("Finished processing key " + badKey + " (" + myName() + ")");
consistencyFix.getStats().incrementFixCount();
if(status != Status.SUCCESS) {
try {
- badKeyQOut.put(consistencyFix.new BadKeyResult(keyInHexFormat, status));
+ badKeyQOut.put(new BadKeyStatus(badKey, status));
} catch(InterruptedException ie) {
logger.warn("Worker thread " + myName() + " interrupted.");
}
- consistencyFix.getStats().incrementFailures();
+ consistencyFix.getStats().incrementFailures(status);
}
}
- public Status doConsistencyFix(String keyInHexFormat) {
-
+ public Status doConsistencyFix(BadKey badKey) {
// Initialization.
byte[] keyInBytes;
List<Integer> nodeIdList = null;
int masterPartitionId = -1;
try {
- keyInBytes = ByteUtils.fromHexString(keyInHexFormat);
+ keyInBytes = ByteUtils.fromHexString(badKey.getKeyInHexFormat());
masterPartitionId = consistencyFix.getStoreInstance().getMasterPartitionId(keyInBytes);
nodeIdList = consistencyFix.getStoreInstance()
.getReplicationNodeList(masterPartitionId);
@@ -123,19 +123,27 @@ public Status doConsistencyFix(String keyInHexFormat) {
// Do the reads
Map<Integer, QueryKeyResult> nodeIdToKeyValues = doReads(nodeIdList,
keyInBytes,
- keyInHexFormat);
+ badKey.getKeyInHexFormat());
// Process read replies (i.e., nodeIdToKeyValues)
ProcessReadRepliesResult result = processReadReplies(nodeIdList,
keyAsByteArray,
- keyInHexFormat,
+ badKey.getKeyInHexFormat(),
nodeIdToKeyValues);
if(result.status != Status.SUCCESS) {
return result.status;
}
// Resolve conflicts indicated in nodeValues
List<NodeValue<ByteArray, byte[]>> toReadRepair = resolveReadConflicts(result.nodeValues);
+ if(logger.isTraceEnabled()) {
+ if(toReadRepair.size() == 0) {
+ logger.trace("Nothing to repair");
+ }
+ for(NodeValue<ByteArray, byte[]> nodeValue: toReadRepair) {
+ logger.trace(nodeValue.getNodeId() + " --- " + nodeValue.getKey().toString());
+ }
+ }
// Do the repairs
Status status = doRepairPut(toReadRepair);
@@ -248,6 +256,7 @@ private ProcessReadRepliesResult processReadReplies(final List<Integer> nodeIdLi
logger.info("Aborting fixKey because exceptions were encountered when fetching key-values.");
return new ProcessReadRepliesResult(Status.FETCH_EXCEPTION);
}
+
if(logger.isDebugEnabled()) {
for(NodeValue<ByteArray, byte[]> nkv: nodeValues) {
logger.debug("\tRead NodeKeyValue : " + ByteUtils.toHexString(nkv.getKey().get())
@@ -267,6 +276,17 @@ private ProcessReadRepliesResult processReadReplies(final List<Integer> nodeIdLi
*/
private List<NodeValue<ByteArray, byte[]>> resolveReadConflicts(final List<NodeValue<ByteArray, byte[]>> nodeValues) {
+ if(logger.isTraceEnabled()) {
+ logger.trace("NodeValues passed into resolveReadConflicts.");
+ if(nodeValues.size() == 0) {
+ logger.trace("Empty nodeValues passed to resolveReadConflicts");
+ }
+ for(NodeValue<ByteArray, byte[]> nodeValue: nodeValues) {
+ logger.trace("\t" + nodeValue.getNodeId() + " - " + nodeValue.getKey().toString()
+ + " - " + nodeValue.getVersion().toString());
+ }
+ }
+
// If orphaned values exist, add them to fake nodes to be processed by
// "getRepairs"
int currentFakeNodeId = fakeNodeID;
@@ -279,21 +299,26 @@ private ProcessReadRepliesResult processReadReplies(final List<Integer> nodeIdLi
}
}
- // Some cut-paste-and-modify (CPAM) coding from
+ // Some cut-paste-and-modify coding from
// store/routed/action/AbstractReadRepair.java and
// store/routed/ThreadPoolRoutedStore.java
ReadRepairer<ByteArray, byte[]> readRepairer = new ReadRepairer<ByteArray, byte[]>();
- if(logger.isDebugEnabled()) {
- for(NodeValue<ByteArray, byte[]> nodeKeyValue: readRepairer.getRepairs(nodeValues)) {
- logger.debug("\tNodeKeyValue result from readRepairer.getRepairs : "
+ List<NodeValue<ByteArray, byte[]>> nodeKeyValues = readRepairer.getRepairs(nodeValues);
+
+ if(logger.isTraceEnabled()) {
+ if(nodeKeyValues.size() == 0) {
+ logger.trace("\treadRepairer returned an empty list.");
+ }
+ for(NodeValue<ByteArray, byte[]> nodeKeyValue: nodeKeyValues) {
+ logger.trace("\tNodeKeyValue result from readRepairer.getRepairs : "
+ ByteUtils.toHexString(nodeKeyValue.getKey().get())
+ " on node with id " + nodeKeyValue.getNodeId() + " for version "
+ nodeKeyValue.getVersion());
}
}
List<NodeValue<ByteArray, byte[]>> toReadRepair = Lists.newArrayList();
- for(NodeValue<ByteArray, byte[]> v: readRepairer.getRepairs(nodeValues)) {
+ for(NodeValue<ByteArray, byte[]> v: nodeKeyValues) {
if(v.getNodeId() > currentFakeNodeId) {
// Only copy repairs intended for real nodes.
Versioned<byte[]> versioned = Versioned.value(v.getVersioned().getValue(),
@@ -311,9 +336,12 @@ private ProcessReadRepliesResult processReadReplies(final List<Integer> nodeIdLi
}
- if(logger.isDebugEnabled()) {
+ if(logger.isTraceEnabled()) {
+ if(toReadRepair.size() == 0) {
+ logger.trace("\ttoReadRepair is empty.");
+ }
for(NodeValue<ByteArray, byte[]> nodeKeyValue: toReadRepair) {
- logger.debug("\tRepair key " + ByteUtils.toHexString(nodeKeyValue.getKey().get())
+ logger.trace("\tRepair key " + ByteUtils.toHexString(nodeKeyValue.getKey().get())
+ " on node with id " + nodeKeyValue.getNodeId() + " for version "
+ nodeKeyValue.getVersion());
@@ -342,8 +370,8 @@ public Status doRepairPut(final List<NodeValue<ByteArray, byte[]>> toReadRepair)
nodeKeyValue);
consistencyFix.getStats().incrementPutCount();
} catch(ObsoleteVersionException ove) {
- // TODO: Add OVE catches to some statistics?
- // NOOP. Treat OVE as success.
+ // Treat OVE as success.
+ consistencyFix.getStats().incrementObsoleteVersionExceptions();
} catch(VoldemortException ve) {
allRepairsSuccessful = false;
logger.debug("Repair of key " + nodeKeyValue.getKey() + "on node with id "
@@ -1,5 +1,5 @@
/*
- * Copyright 2008-2009 LinkedIn, Inc
+ * Copyright 2008-2013 LinkedIn, Inc
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
@@ -70,6 +70,16 @@ public static VectorClock getClock(int... nodes) {
return clock;
}
+ /**
+ * Helper method to construct Versioned byte value.
+ *
+ * @param nodes See getClock method for explanation of this argument
+ * @return
+ */
+ public static Versioned<byte[]> getVersioned(byte[] value, int... nodes) {
+ return new Versioned<byte[]>(value, getClock(nodes));
+ }
+
/**
* Record events for the given sequence of nodes
*
@@ -251,4 +251,23 @@
</value-serializer>
<retention-days>1</retention-days>
</store>
+ <store>
+ <name>consistency-fix</name>
+ <persistence>memory</persistence>
+ <routing>client</routing>
+ <replication-factor>4</replication-factor>
+ <preferred-reads>1</preferred-reads>
+ <required-reads>1</required-reads>
+ <preferred-writes>2</preferred-writes>
+ <required-writes>2</required-writes>
+ <key-serializer>
+ <type>string</type>
+ <schema-info>UTF-8</schema-info>
+ </key-serializer>
+ <value-serializer>
+ <type>string</type>
+ <schema-info>UTF-8</schema-info>
+ </value-serializer>
+ <retention-days>1</retention-days>
+ </store>
</stores>
@@ -13,7 +13,6 @@
* License for the specific language governing permissions and limitations under
* the License.
*/
-
package voldemort.client;
import static junit.framework.Assert.assertEquals;
Oops, something went wrong.

0 comments on commit 78e9417

Please sign in to comment.