Permalink
Browse files

Fixed hashmap issues in AdminClient raised during code review. Added …

…'--parse-only' option to ConsistencyFix.

src/java/voldemort/client/protocol/admin/AdminClient.java
- Added hashCode & equals methods to AdminClient.Nodestore
- cleaned up getSocketStore to not leak concurrently created socket stores.

src/java/voldemort/utils/ConsistencyFix(CLI).java
- added parse only flag which limits that actions of the fixer to bootstrapping and parsing the input file.
  • Loading branch information...
jayjwylie committed Feb 12, 2013
1 parent 213003a commit 0c6893c3e7b72f6f5e15b8f1e644e16d1378811f
@@ -1902,15 +1902,28 @@ public ByteArray computeNext() {
private class NodeStore { private class NodeStore {
@SuppressWarnings("unused") final public Integer nodeId;
final public int nodeId;
@SuppressWarnings("unused")
final public String storeName; final public String storeName;
NodeStore(int nodeId, String storeName) { NodeStore(int nodeId, String storeName) {
this.nodeId = nodeId; this.nodeId = new Integer(nodeId);
this.storeName = storeName; this.storeName = storeName;
} }
@Override
public boolean equals(Object obj) {
if(this == obj)
return true;
if(!(obj instanceof NodeStore))
return false;
NodeStore other = (NodeStore) obj;
return nodeId.equals(other.nodeId) && storeName.equals(other.storeName);
}
@Override
public int hashCode() {
return nodeId.hashCode() + storeName.hashCode();
}
} }
final private ClientConfig clientConfig; final private ClientConfig clientConfig;
@@ -1933,27 +1946,33 @@ public ByteArray computeNext() {
public SocketStore getSocketStore(int nodeId, String storeName) { public SocketStore getSocketStore(int nodeId, String storeName) {
NodeStore nodeStore = new NodeStore(nodeId, storeName); NodeStore nodeStore = new NodeStore(nodeId, storeName);
if(!nodeStoreSocketCache.containsKey(nodeStore)) { SocketStore socketStore = nodeStoreSocketCache.get(nodeStore);
SocketStore socketStore = null; if(socketStore == null) {
Node node = getAdminClientCluster().getNodeById(nodeId); Node node = getAdminClientCluster().getNodeById(nodeId);
SocketStore newSocketStore = null;
try { try {
// Can clientConfig.getRequestFormatType() default to // TODO: Can clientConfig.getRequestFormatType() default to
// something? // something?
socketStore = clientPool.create(storeName, newSocketStore = clientPool.create(storeName,
node.getHost(), node.getHost(),
node.getSocketPort(), node.getSocketPort(),
clientConfig.getRequestFormatType(), clientConfig.getRequestFormatType(),
RequestRoutingType.IGNORE_CHECKS); RequestRoutingType.IGNORE_CHECKS);
} catch(Exception e) { } catch(Exception e) {
clientPool.close(); clientPool.close();
throw new VoldemortException(e); throw new VoldemortException(e);
} }
nodeStoreSocketCache.putIfAbsent(nodeStore, socketStore); socketStore = nodeStoreSocketCache.putIfAbsent(nodeStore, newSocketStore);
if(socketStore == null) {
socketStore = newSocketStore;
} else {
newSocketStore.close();
}
} }
return nodeStoreSocketCache.get(nodeStore); return socketStore;
} }
public void stop() { public void stop() {
@@ -60,12 +60,14 @@
private final long perServerIOPSLimit; private final long perServerIOPSLimit;
private final ConcurrentMap<Integer, EventThrottler> putThrottlers; private final ConcurrentMap<Integer, EventThrottler> putThrottlers;
private final boolean dryRun; private final boolean dryRun;
private final boolean parseOnly;
ConsistencyFix(String url, ConsistencyFix(String url,
String storeName, String storeName,
long progressBar, long progressBar,
long perServerIOPSLimit, long perServerIOPSLimit,
boolean dryRun) { boolean dryRun,
boolean parseOnly) {
this.storeName = storeName; this.storeName = storeName;
logger.info("Connecting to bootstrap server: " + url); logger.info("Connecting to bootstrap server: " + url);
this.adminClient = new AdminClient(url, new AdminClientConfig(), 0); this.adminClient = new AdminClient(url, new AdminClientConfig(), 0);
@@ -85,6 +87,7 @@
this.perServerIOPSLimit = perServerIOPSLimit; this.perServerIOPSLimit = perServerIOPSLimit;
this.putThrottlers = new ConcurrentHashMap<Integer, EventThrottler>(); this.putThrottlers = new ConcurrentHashMap<Integer, EventThrottler>();
this.dryRun = dryRun; this.dryRun = dryRun;
this.parseOnly = parseOnly;
} }
public String getStoreName() { public String getStoreName() {
@@ -107,6 +110,10 @@ public boolean isDryRun() {
return dryRun; return dryRun;
} }
public boolean isParseOnly() {
return parseOnly;
}
/** /**
* Throttle put (repair) activity per server. * Throttle put (repair) activity per server.
* *
@@ -295,9 +302,11 @@ public void run() {
counter++; counter++;
logger.debug("BadKeyReader read line: key (" + key + ") and counter (" logger.debug("BadKeyReader read line: key (" + key + ") and counter ("
+ counter + ")"); + counter + ")");
consistencyFixWorkers.submit(new ConsistencyFixWorker(key, if(!consistencyFix.isParseOnly()) {
consistencyFix, consistencyFixWorkers.submit(new ConsistencyFixWorker(key,
badKeyQOut)); consistencyFix,
badKeyQOut));
}
} }
} }
} catch(IOException ioe) { } catch(IOException ioe) {
@@ -411,10 +420,12 @@ public void run() {
values.add(new Versioned<byte[]>(value, vectorClock)); values.add(new Versioned<byte[]>(value, vectorClock));
} }
QueryKeyResult queryKeyResult = new QueryKeyResult(keyByteArray, values); QueryKeyResult queryKeyResult = new QueryKeyResult(keyByteArray, values);
consistencyFixWorkers.submit(new ConsistencyFixWorker(key, if(!consistencyFix.isParseOnly()) {
consistencyFix, consistencyFixWorkers.submit(new ConsistencyFixWorker(key,
badKeyQOut, consistencyFix,
queryKeyResult)); badKeyQOut,
queryKeyResult));
}
} }
} }
} catch(Exception e) { } catch(Exception e) {
@@ -70,6 +70,7 @@ public static void printUsage(String errMessage, OptionParser parser) {
public long progressBar = defaultProgressBar; public long progressBar = defaultProgressBar;
public long perServerIOPSLimit = defaultPerServerIOPSLimit; public long perServerIOPSLimit = defaultPerServerIOPSLimit;
public boolean dryRun = false; public boolean dryRun = false;
public boolean parseOnly = false;
} }
/** /**
@@ -100,6 +101,9 @@ public static void printUsage(String errMessage, OptionParser parser) {
"Indicates format of bad-key-file-in is of 'orphan' key-values."); "Indicates format of bad-key-file-in is of 'orphan' key-values.");
parser.accepts("dry-run", parser.accepts("dry-run",
"Indicates to go through all of the read actions until the point of issuing repair puts. Then, do a 'no-op'."); "Indicates to go through all of the read actions until the point of issuing repair puts. Then, do a 'no-op'.");
parser.accepts("parse-only",
"Indicates to only parse the input file. Does not perform any key queries or repair puts. "
+ "Does bootstrap though so bootstrapUrl and storeName must be specified.");
parser.accepts("bad-key-file-out", parser.accepts("bad-key-file-out",
"Name of bad-key-file-out. " "Name of bad-key-file-out. "
+ "Keys that are not mae consistent are output to this file.") + "Keys that are not mae consistent are output to this file.")
@@ -167,6 +171,9 @@ public static void printUsage(String errMessage, OptionParser parser) {
if(optionSet.has("dry-run")) { if(optionSet.has("dry-run")) {
options.dryRun = true; options.dryRun = true;
} }
if(optionSet.has("parse-only")) {
options.parseOnly = true;
}
return options; return options;
} }
@@ -178,7 +185,8 @@ public static void main(String[] args) throws Exception {
options.storeName, options.storeName,
options.progressBar, options.progressBar,
options.perServerIOPSLimit, options.perServerIOPSLimit,
options.dryRun); options.dryRun,
options.parseOnly);
String summary = consistencyFix.execute(options.parallelism, String summary = consistencyFix.execute(options.parallelism,
options.badKeyFileIn, options.badKeyFileIn,

0 comments on commit 0c6893c

Please sign in to comment.