Skip to content

Commit

Permalink
src/java/voldemort/store/routed/action/PerformParallelPutRequests.java
Browse files Browse the repository at this point in the history
- clean up handling of ObsoleteVersionException to do what the
  client said should be done. This will stop
  ObsoleteVersionExceptions from being escalated to
  InsufficientOperationalNodes exceptions.

test/integration/voldemort/performance/benchmark/Benchmark.java
- Add connections/node to configuration of benchmark tool
- Set client timeouts to recommended values

src/java/voldemort/store/socket/clientrequest/ClientRequestExecutor.java
- refactored a synchronized method to isolate the bare minimum
  steps that need to be synchronized. This allows local.complete
  to be called outside of 'synchronized' which avoids deadlocking
  nio selector threads.
  • Loading branch information
jayjwylie committed Oct 9, 2012
1 parent 4b05dcf commit 8f7c4a7
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 26 deletions.
Expand Up @@ -196,14 +196,12 @@ public void requestComplete(Object result, long requestTime) {

for(Entry<Integer, Response<ByteArray, Object>> responseEntry: responses.entrySet()) {
Response<ByteArray, Object> response = responseEntry.getValue();
if(response.getValue() instanceof Exception) {
if(response.getValue() instanceof ObsoleteVersionException) {
// ignore this completely here
// this means that a higher version was able
// to write on this node and should be termed as
// clean success.
responses.remove(responseEntry.getKey());
} else if(handleResponseError(response, pipeline, failureDetector))
// Treat ObsoleteVersionExceptions as success since such an
// exception means that a higher version was able to write on the
// node.
if(response.getValue() instanceof Exception
&& !(response.getValue() instanceof ObsoleteVersionException)) {
if(handleResponseError(response, pipeline, failureDetector))
return;
} else {
pipelineData.incrementSuccesses();
Expand All @@ -227,14 +225,12 @@ public void requestComplete(Object result, long requestTime) {

for(Entry<Integer, Response<ByteArray, Object>> responseEntry: responses.entrySet()) {
Response<ByteArray, Object> response = responseEntry.getValue();
if(response.getValue() instanceof Exception) {
if(response.getValue() instanceof ObsoleteVersionException) {
// ignore this completely here
// this means that a higher version was able
// to write on this node and should be termed as
// clean success.
responses.remove(responseEntry.getKey());
} else if(handleResponseError(response, pipeline, failureDetector))
// Treat ObsoleteVersionExceptions as success since such an
// exception means that a higher version was able to write
// on the node.
if(response.getValue() instanceof Exception
&& !(response.getValue() instanceof ObsoleteVersionException)) {
if(handleResponseError(response, pipeline, failureDetector))
return;
} else {
pipelineData.incrementSuccesses();
Expand Down
Expand Up @@ -253,20 +253,29 @@ protected void write(SelectionKey selectionKey) throws IOException {
* check in the instance again which causes problems for the pool
* maintenance.
*/
private synchronized ClientRequest<?> atomicNullOutClientRequest() {
ClientRequest<?> local = clientRequest;
clientRequest = null;
expiration = 0;

private synchronized void completeClientRequest() {
if(clientRequest == null) {
return local;
}

/**
* Null out current clientRequest before calling complete. timeOut and
* complete must *not* be within a synchronized block since both eventually
* check in the client request executor. Such a check in can trigger
* additional synchronized methods deeper in the stack.
*/
private void completeClientRequest() {
ClientRequest<?> local = atomicNullOutClientRequest();
if(local == null) {
if(logger.isEnabledFor(Level.WARN))
logger.warn("No client associated with " + socketChannel.socket());

return;
}

// Sorry about this - please see the method comments...
ClientRequest<?> local = clientRequest;
clientRequest = null;
expiration = 0;

if(isExpired)
local.timeOut();
else
Expand Down
29 changes: 26 additions & 3 deletions test/integration/voldemort/performance/benchmark/Benchmark.java
Expand Up @@ -22,6 +22,7 @@
import java.text.NumberFormat;
import java.util.List;
import java.util.Vector;
import java.util.concurrent.TimeUnit;

import joptsimple.OptionParser;
import joptsimple.OptionSet;
Expand All @@ -34,6 +35,7 @@
import voldemort.client.SocketStoreClientFactory;
import voldemort.client.StoreClient;
import voldemort.client.StoreClientFactory;
import voldemort.client.protocol.RequestFormatType;
import voldemort.serialization.IdentitySerializer;
import voldemort.serialization.Serializer;
import voldemort.serialization.SerializerDefinition;
Expand All @@ -59,12 +61,14 @@
public class Benchmark {

private static final int MAX_WORKERS = 8;
private static final int MAX_CONNECTIONS_PER_NODE = 50;

/**
* Constants for the benchmark file
*/
public static final String PROP_FILE = "prop-file";
public static final String THREADS = "threads";
public static final String NUM_CONNECTIONS_PER_NODE = "num-connections-per-node";
public static final String ITERATIONS = "iterations";
public static final String STORAGE_CONFIGURATION_CLASS = "storage-configuration-class";
public static final String INTERVAL = "interval";
Expand Down Expand Up @@ -116,8 +120,8 @@ public class Benchmark {
private StoreClient<Object, Object> storeClient;
private StoreClientFactory factory;

private int numThreads, numIterations, targetThroughput, recordCount, opsCount,
statusIntervalSec;
private int numThreads, numConnectionsPerNode, numIterations, targetThroughput, recordCount,
opsCount, statusIntervalSec;
private double perThreadThroughputPerMs;
private Workload workLoad;
private String pluginName;
Expand Down Expand Up @@ -220,6 +224,7 @@ public void run() {
e.printStackTrace();
}
opsDone++;

if(targetThroughputPerMs > 0) {
double timePerOp = ((double) opsDone) / targetThroughputPerMs;
while(System.currentTimeMillis() - startTime < timePerOp) {
Expand Down Expand Up @@ -315,6 +320,8 @@ public void initializeWorkload(Props workloadProps) throws Exception {
public void initializeStore(Props benchmarkProps) throws Exception {

this.numThreads = benchmarkProps.getInt(THREADS, MAX_WORKERS);
this.numConnectionsPerNode = benchmarkProps.getInt(NUM_CONNECTIONS_PER_NODE,
MAX_CONNECTIONS_PER_NODE);
this.numIterations = benchmarkProps.getInt(ITERATIONS, 1);
this.statusIntervalSec = benchmarkProps.getInt(INTERVAL, 0);
this.verbose = benchmarkProps.getBoolean(VERBOSE, false);
Expand All @@ -334,7 +341,14 @@ public void initializeStore(Props benchmarkProps) throws Exception {

ClientConfig clientConfig = new ClientConfig().setMaxThreads(numThreads)
.setMaxTotalConnections(numThreads)
.setMaxConnectionsPerNode(numThreads)
.setMaxConnectionsPerNode(numConnectionsPerNode)
.setRoutingTimeout(1500,
TimeUnit.MILLISECONDS)
.setSocketTimeout(1500,
TimeUnit.MILLISECONDS)
.setConnectionTimeout(500,
TimeUnit.MILLISECONDS)
.setRequestFormatType(RequestFormatType.VOLDEMORT_V3)
.setBootstrapUrls(socketUrl);

if(clientZoneId >= 0) {
Expand Down Expand Up @@ -519,6 +533,12 @@ public static void main(String args[]) throws IOException {
.withRequiredArg()
.describedAs("num-threads")
.ofType(Integer.class);
parser.accepts(NUM_CONNECTIONS_PER_NODE,
"max number of connections to any node; Default = "
+ MAX_CONNECTIONS_PER_NODE)
.withRequiredArg()
.describedAs("num-connections-per-node")
.ofType(Integer.class);
parser.accepts(ITERATIONS, "number of times to repeat benchmark phase; Default = 1")
.withRequiredArg()
.describedAs("num-iter")
Expand Down Expand Up @@ -658,6 +678,9 @@ public static void main(String args[]) throws IOException {
mainProps.put(VALUE_SIZE, CmdUtils.valueOf(options, VALUE_SIZE, 1024));
mainProps.put(ITERATIONS, CmdUtils.valueOf(options, ITERATIONS, 1));
mainProps.put(THREADS, CmdUtils.valueOf(options, THREADS, MAX_WORKERS));
mainProps.put(NUM_CONNECTIONS_PER_NODE, CmdUtils.valueOf(options,
NUM_CONNECTIONS_PER_NODE,
MAX_CONNECTIONS_PER_NODE));
mainProps.put(PERCENT_CACHED, CmdUtils.valueOf(options, PERCENT_CACHED, 0));
mainProps.put(INTERVAL, CmdUtils.valueOf(options, INTERVAL, 0));
mainProps.put(TARGET_THROUGHPUT, CmdUtils.valueOf(options, TARGET_THROUGHPUT, -1));
Expand Down

0 comments on commit 8f7c4a7

Please sign in to comment.