Skip to content

Commit

Permalink
Moved AdminClient.QueryKeyResult into its own file.
Browse files Browse the repository at this point in the history
src/java/voldemort/client/protocol/admin/QueryKeyResult.java
- A more complete, proper class based on the inner class that had been in AdminClient.java
  • Loading branch information
jayjwylie committed Mar 20, 2013
1 parent 5db6a1d commit 83b87bc
Show file tree
Hide file tree
Showing 4 changed files with 115 additions and 80 deletions.
10 changes: 5 additions & 5 deletions src/java/voldemort/VoldemortAdminTool.java
Expand Up @@ -55,8 +55,8 @@
import org.codehaus.jackson.map.ObjectMapper;

import voldemort.client.protocol.admin.AdminClient;
import voldemort.client.protocol.admin.AdminClient.QueryKeyResult;
import voldemort.client.protocol.admin.AdminClientConfig;
import voldemort.client.protocol.admin.QueryKeyResult;
import voldemort.cluster.Cluster;
import voldemort.cluster.Node;
import voldemort.serialization.DefaultSerializerFactory;
Expand Down Expand Up @@ -1670,13 +1670,13 @@ public void writeTo(BufferedWriter out) throws IOException {
while(iterator.hasNext()) {
QueryKeyResult queryKeyResult = iterator.next();
// unserialize and write key
byte[] keyBytes = queryKeyResult.key.get();
byte[] keyBytes = queryKeyResult.getKey().get();
Object keyObject = keySerializer.toObject((null == keyCompressionStrategy) ? keyBytes
: keyCompressionStrategy.inflate(keyBytes));
generator.writeObject(keyObject);

// iterate through, unserialize and write values
List<Versioned<byte[]>> values = queryKeyResult.values;
List<Versioned<byte[]>> values = queryKeyResult.getValues();
if(values != null) {
if(values.size() == 0) {
stringWriter.write(", null");
Expand All @@ -1698,9 +1698,9 @@ public void writeTo(BufferedWriter out) throws IOException {
stringWriter.write(", null");
}
// write out exception
if(queryKeyResult.exception != null) {
if(queryKeyResult.hasException()) {
stringWriter.write(", ");
stringWriter.write(queryKeyResult.exception.toString());
stringWriter.write(queryKeyResult.getException().toString());
}

StringBuffer buf = stringWriter.getBuffer();
Expand Down
55 changes: 55 additions & 0 deletions src/java/voldemort/client/protocol/admin/QueryKeyResult.java
@@ -0,0 +1,55 @@
package voldemort.client.protocol.admin;

import java.util.List;

import voldemort.utils.ByteArray;
import voldemort.versioning.Versioned;

/**
* Return type of AdminClient.QueryKeys. Intended to ensure the following
* invariant: .hasValues() == !.hasException()
*/
public class QueryKeyResult {

private final ByteArray key;
private final List<Versioned<byte[]>> values;
private final Exception exception;

QueryKeyResult(ByteArray key, List<Versioned<byte[]>> values) {
this.key = key;
this.values = values;
this.exception = null;
}

QueryKeyResult(ByteArray key, Exception exception) {
this.key = key;
this.values = null;
this.exception = exception;
}

public ByteArray getKey() {
return key;
}

/**
* @return true iff values were returned.
*/
public boolean hasValues() {
return (values != null);
}

public List<Versioned<byte[]>> getValues() {
return values;
}

/**
* @return true iff exception occured during queryKeys.
*/
public boolean hasException() {
return (exception != null);
}

public Exception getException() {
return exception;
}
}
44 changes: 12 additions & 32 deletions src/java/voldemort/utils/ConsistencyFix.java
Expand Up @@ -35,8 +35,8 @@
import org.apache.commons.codec.binary.Hex;

import voldemort.client.protocol.admin.AdminClient;
import voldemort.client.protocol.admin.AdminClient.QueryKeyResult;
import voldemort.client.protocol.admin.AdminClientConfig;
import voldemort.client.protocol.admin.QueryKeyResult;
import voldemort.cluster.Cluster;
import voldemort.store.StoreDefinition;
import voldemort.store.routed.NodeValue;
Expand Down Expand Up @@ -334,31 +334,31 @@ private static ConsistencyFix.FixKeyResult processReadReplies(final List<Integer
}
keyValue = nodeIdToKeyValues.get(nodeId).next();

if(keyValue.exception != null) {
if(keyValue.hasException()) {
if(verbose) {
System.out.println("\t... Exception encountered while fetching key "
+ keyInHexFormat + " from node with nodeId " + nodeId
+ " : " + keyValue.exception.getMessage());
+ " : " + keyValue.getException().getMessage());
}
exceptionsEncountered = true;
} else {
if(keyValue.values.isEmpty()) {
if(keyValue.getValues().isEmpty()) {
if(verbose) {
System.out.println("\t... Adding null version to nodeValues");
}
Versioned<byte[]> versioned = new Versioned<byte[]>(null);
nodeValues.add(new NodeValue<ByteArray, byte[]>(nodeId,
keyValue.key,
keyValue.getKey(),
versioned));

} else {
for(Versioned<byte[]> value: keyValue.values) {
for(Versioned<byte[]> value: keyValue.getValues()) {
if(verbose) {
System.out.println("\t... Adding following version to nodeValues: "
+ value.getVersion());
}
nodeValues.add(new NodeValue<ByteArray, byte[]>(nodeId,
keyValue.key,
keyValue.getKey(),
value));
}
}
Expand All @@ -383,6 +383,7 @@ private static ConsistencyFix.FixKeyResult processReadReplies(final List<Integer
}

/**
* Decide on the specific key-value to write everywhere.
*
* @param verbose
* @param nodeValues
Expand All @@ -391,27 +392,14 @@ private static ConsistencyFix.FixKeyResult processReadReplies(final List<Integer
private static List<NodeValue<ByteArray, byte[]>> resolveReadConflicts(boolean verbose,
final List<NodeValue<ByteArray, byte[]>> nodeValues) {

// Decide on the specific key-value to write everywhere.
// Some cut-paste-and-modify coding from AbstractReadRepair.java...
// Some cut-paste-and-modify coding from
// store/routed/action/AbstractReadRepair.java and
// store/routed/ThreadPoolRoutedStore.java
if(verbose) {
System.out.println("Resolving conflicts in responses.");
}
// TODO: Figure out if 'cloning' is necessary. It does not seem to be
// necessary. See both store/routed/action/AbstractReadRepair.java and
// store/routed/ThreadPoolRoutedStore.java for other copies of this
// code. I think the cut-and-paste comment below may just be confusing.
// We need to "clone" the subset of the nodeValues that we actually want
// to repair. But, I am not sure we need to clone the versioned part of
// each copied object.
ReadRepairer<ByteArray, byte[]> readRepairer = new ReadRepairer<ByteArray, byte[]>();
List<NodeValue<ByteArray, byte[]>> toReadRepair = Lists.newArrayList();
// TODO: Remove/clean up this comment (and possibly the two copies of
// this comment in the code.
/*
* We clone after computing read repairs in the assumption that the
* output will be smaller than the input. Note that we clone the
* version, but not the key or value as the latter two are not mutated.
*/
for(NodeValue<ByteArray, byte[]> v: readRepairer.getRepairs(nodeValues)) {
Versioned<byte[]> versioned = Versioned.value(v.getVersioned().getValue(),
((VectorClock) v.getVersion()).clone());
Expand All @@ -420,16 +408,8 @@ private static List<NodeValue<ByteArray, byte[]>> resolveReadConflicts(boolean v
+ versioned.getVersion() + ")");
}
toReadRepair.add(new NodeValue<ByteArray, byte[]>(v.getNodeId(), v.getKey(), versioned));
/*-
* The below code seems to work in lieu of the above line. So, not sure
* why it is necessary to construct new versioned object above based
* on cloned timestamp.
*
toReadRepair.add(new NodeValue<ByteArray, byte[]>(v.getNodeId(),
v.getKey(),
v.getVersioned()));
*/
}

// TODO: As we discussed, I don't know the read repair code path very
// well. So, feel free to discard my comments if I am off target w.r.t
// to simply doing a get() to fix everything. Semantically, it then
Expand Down

0 comments on commit 83b87bc

Please sign in to comment.