Permalink
Browse files

finer timeouts and partial getalls.

  • Loading branch information...
1 parent 32aa7d5 commit 10211c71f35b358ffc38bdbf7d75d6f0cf1c1480 @vinothchandar vinothchandar committed Jun 1, 2012
View
2 src/java/voldemort/client/AbstractStoreClientFactory.java
@@ -114,7 +114,7 @@ public AbstractStoreClientFactory(ClientConfig config) {
this.clientZoneId = config.getClientZoneId();
this.routedStoreFactory = new RoutedStoreFactory(config.isPipelineRoutedStoreEnabled(),
threadPool,
- config.getRoutingTimeout(TimeUnit.MILLISECONDS));
+ config.getTimeoutConfig());
if(this.isJmxEnabled) {
JmxUtils.registerMbean(threadPool,
View
60 src/java/voldemort/client/ClientConfig.java
@@ -36,6 +36,7 @@
import voldemort.utils.ConfigurationException;
import voldemort.utils.Props;
import voldemort.utils.ReflectUtils;
+import voldemort.utils.TimeoutConfig;
import voldemort.utils.Utils;
/**
@@ -55,6 +56,7 @@
private volatile boolean socketKeepAlive = false;
private volatile int selectors = 8;
private volatile long routingTimeoutMs = 15000;
+ private volatile TimeoutConfig timeoutConfig = new TimeoutConfig(routingTimeoutMs, false);
private volatile int socketBufferSize = 64 * 1024;
private volatile SerializerFactory serializerFactory = new DefaultSerializerFactory();
private volatile List<String> bootstrapUrls = null;
@@ -91,6 +93,12 @@ public ClientConfig() {}
public static final String SOCKET_KEEPALIVE_PROPERTY = "socket_keepalive";
public static final String SELECTORS_PROPERTY = "selectors";
public static final String ROUTING_TIMEOUT_MS_PROPERTY = "routing_timeout_ms";
+ public static final String GETALL_ROUTING_TIMEOUT_MS_PROPERTY = "getall_routing_timeout_ms";
+ public static final String PUT_ROUTING_TIMEOUT_MS_PROPERTY = "put_routing_timeout_ms";
+ public static final String GET_ROUTING_TIMEOUT_MS_PROPERTY = "get_routing_timeout_ms";
+ public static final String GET_VERSIONS_ROUTING_TIMEOUT_MS_PROPERTY = "getversions_routing_timeout_ms";
+ public static final String DELETE_ROUTING_TIMEOUT_MS_PROPERTY = "delete_routing_timeout_ms";
+ public static final String ALLOW_PARTIAL_GETALLS_PROPERTY = "allow_partial_getalls";
public static final String NODE_BANNAGE_MS_PROPERTY = "node_bannage_ms";
public static final String SOCKET_BUFFER_SIZE_PROPERTY = "socket_buffer_size";
public static final String SERIALIZER_FACTORY_CLASS_PROPERTY = "serializer_factory_class";
@@ -174,6 +182,36 @@ private void setProperties(Properties properties) {
if(props.containsKey(ROUTING_TIMEOUT_MS_PROPERTY))
this.setRoutingTimeout(props.getInt(ROUTING_TIMEOUT_MS_PROPERTY), TimeUnit.MILLISECONDS);
+ // By default, make all the timeouts equal to routing timeout
+ timeoutConfig = new TimeoutConfig(routingTimeoutMs, false);
+
+ if(props.containsKey(GETALL_ROUTING_TIMEOUT_MS_PROPERTY))
+ timeoutConfig.getAllTimeoutMs(props.getInt(GETALL_ROUTING_TIMEOUT_MS_PROPERTY),
+ TimeUnit.MILLISECONDS);
+
+ if(props.containsKey(GET_ROUTING_TIMEOUT_MS_PROPERTY))
+ timeoutConfig.getTimeoutMs(props.getInt(GET_ROUTING_TIMEOUT_MS_PROPERTY),
+ TimeUnit.MILLISECONDS);
+
+ if(props.containsKey(PUT_ROUTING_TIMEOUT_MS_PROPERTY)) {
+ long putTimeoutMs = props.getInt(PUT_ROUTING_TIMEOUT_MS_PROPERTY);
+ timeoutConfig.putTimeoutMs(putTimeoutMs, TimeUnit.MILLISECONDS);
+ // By default, use the same thing for getVersions() also
+ timeoutConfig.getVersionsTimeoutMs(putTimeoutMs, TimeUnit.MILLISECONDS);
+ }
+
+ // of course, if someone overrides it, we will respect that
+ if(props.containsKey(GET_VERSIONS_ROUTING_TIMEOUT_MS_PROPERTY))
+ timeoutConfig.getVersionsTimeoutMs(props.getInt(GET_VERSIONS_ROUTING_TIMEOUT_MS_PROPERTY),
+ TimeUnit.MILLISECONDS);
+
+ if(props.containsKey(DELETE_ROUTING_TIMEOUT_MS_PROPERTY))
+ timeoutConfig.deleteTimeoutMs(props.getInt(DELETE_ROUTING_TIMEOUT_MS_PROPERTY),
+ TimeUnit.MILLISECONDS);
+
+ if(props.containsKey(ALLOW_PARTIAL_GETALLS_PROPERTY))
+ timeoutConfig.setPartialGetAllAllowed(props.getBoolean(ALLOW_PARTIAL_GETALLS_PROPERTY));
+
if(props.containsKey(SOCKET_BUFFER_SIZE_PROPERTY))
this.setSocketBufferSize(props.getInt(SOCKET_BUFFER_SIZE_PROPERTY));
@@ -325,6 +363,26 @@ public ClientConfig setRoutingTimeout(int routingTimeout, TimeUnit unit) {
}
/**
+ * Set the timeout configuration for the voldemort operations
+ *
+ * @param tConfig
+ * @return
+ */
+ public ClientConfig setTimeoutConfig(TimeoutConfig tConfig) {
+ this.timeoutConfig = tConfig;
+ return this;
+ }
+
+ /**
+ * Get the timeouts for voldemort operations
+ *
+ * @return
+ */
+ public TimeoutConfig getTimeoutConfig() {
+ return timeoutConfig;
+ }
+
+ /**
* @deprecated Use {@link #getFailureDetectorBannagePeriod()} instead
*/
@Deprecated
@@ -517,7 +575,7 @@ public boolean isLazyEnabled() {
/**
* Enable lazy initialization of clients?
- *
+ *
* @param enableLazy If true clients will be lazily initialized
*/
public ClientConfig setEnableLazy(boolean enableLazy) {
View
25 src/java/voldemort/server/VoldemortConfig.java
@@ -21,6 +21,7 @@
import java.io.Serializable;
import java.util.List;
import java.util.Properties;
+import java.util.concurrent.TimeUnit;
import voldemort.client.protocol.RequestFormatType;
import voldemort.cluster.failuredetector.FailureDetectorConfig;
@@ -34,6 +35,7 @@
import voldemort.utils.ConfigurationException;
import voldemort.utils.Props;
import voldemort.utils.Time;
+import voldemort.utils.TimeoutConfig;
import voldemort.utils.UndefinedPropertyException;
import voldemort.utils.Utils;
@@ -112,6 +114,7 @@
private int clientSelectors;
private int clientRoutingTimeoutMs;
+ private TimeoutConfig clientTimeoutConfig;
private int clientMaxConnectionsPerNode;
private int clientConnectionTimeoutMs;
private int clientMaxThreads;
@@ -275,6 +278,24 @@ public VoldemortConfig(Props props) {
this.clientMaxConnectionsPerNode = props.getInt("client.max.connections.per.node", 50);
this.clientConnectionTimeoutMs = props.getInt("client.connection.timeout.ms", 500);
this.clientRoutingTimeoutMs = props.getInt("client.routing.timeout.ms", 15000);
+ this.clientTimeoutConfig = new TimeoutConfig(this.clientRoutingTimeoutMs, false);
+ this.clientTimeoutConfig.getTimeoutMs(props.getInt("client.routing.get.timeout.ms",
+ this.clientRoutingTimeoutMs),
+ TimeUnit.MILLISECONDS);
+ this.clientTimeoutConfig.getAllTimeoutMs(props.getInt("client.routing.getall.timeout.ms",
+ this.clientRoutingTimeoutMs),
+ TimeUnit.MILLISECONDS);
+ this.clientTimeoutConfig.putTimeoutMs(props.getInt("client.routing.put.timeout.ms",
+ this.clientRoutingTimeoutMs),
+ TimeUnit.MILLISECONDS);
+ this.clientTimeoutConfig.getVersionsTimeoutMs(props.getLong("client.routing.getversions.timeout.ms",
+ this.clientTimeoutConfig.putTimeoutMs()),
+ TimeUnit.MILLISECONDS);
+ this.clientTimeoutConfig.deleteTimeoutMs(props.getInt("client.routing.delete.timeout.ms",
+ this.clientRoutingTimeoutMs),
+ TimeUnit.MILLISECONDS);
+ this.clientTimeoutConfig.setPartialGetAllAllowed(props.getBoolean("client.routing.allow.partial.getall",
+ false));
this.clientMaxThreads = props.getInt("client.max.threads", 500);
this.clientThreadIdleMs = props.getInt("client.thread.idle.ms", 100000);
this.clientMaxQueuedRequests = props.getInt("client.max.queued.requests", 1000);
@@ -997,6 +1018,10 @@ public void setClientRoutingTimeoutMs(int routingTimeoutMs) {
this.clientRoutingTimeoutMs = routingTimeoutMs;
}
+ public TimeoutConfig getTimeoutConfig() {
+ return this.clientTimeoutConfig;
+ }
+
public int getClientMaxConnectionsPerNode() {
return clientMaxConnectionsPerNode;
}
View
2 src/java/voldemort/server/storage/StorageService.java
@@ -155,7 +155,7 @@ public StorageService(StoreRepository storeRepository,
this.storeStats = new StoreStats();
this.routedStoreFactory = new RoutedStoreFactory(voldemortConfig.isPipelineRoutedStoreEnabled(),
this.clientThreadPool,
- voldemortConfig.getClientRoutingTimeoutMs());
+ voldemortConfig.getTimeoutConfig());
/*
* Initialize the dynamic throttle limit based on the per node limit
View
46 src/java/voldemort/store/routed/PipelineRoutedStore.java
@@ -58,6 +58,7 @@
import voldemort.utils.ByteUtils;
import voldemort.utils.JmxUtils;
import voldemort.utils.SystemTime;
+import voldemort.utils.TimeoutConfig;
import voldemort.versioning.Version;
import voldemort.versioning.Versioned;
@@ -102,15 +103,15 @@ public PipelineRoutedStore(String name,
StoreDefinition storeDef,
boolean repairReads,
int clientZoneId,
- long timeoutMs,
+ TimeoutConfig timeoutConfig,
FailureDetector failureDetector,
boolean jmxEnabled) {
super(name,
innerStores,
cluster,
storeDef,
repairReads,
- timeoutMs,
+ timeoutConfig,
failureDetector,
SystemTime.INSTANCE);
this.nonblockingSlopStores = nonblockingSlopStores;
@@ -149,7 +150,9 @@ public PipelineRoutedStore(String name,
pipelineData.setZonesRequired(null);
pipelineData.setStats(stats);
- final Pipeline pipeline = new Pipeline(Operation.GET, timeoutMs, TimeUnit.MILLISECONDS);
+ final Pipeline pipeline = new Pipeline(Operation.GET,
+ timeoutConfig.getTimeoutMs(),
+ TimeUnit.MILLISECONDS);
boolean allowReadRepair = repairReads && transforms == null;
StoreRequest<List<Versioned<byte[]>>> blockingStoreRequest = new StoreRequest<List<Versioned<byte[]>>>() {
@@ -177,7 +180,7 @@ public PipelineRoutedStore(String name,
failureDetector,
storeDef.getPreferredReads(),
storeDef.getRequiredReads(),
- timeoutMs,
+ timeoutConfig.getTimeoutMs(),
nonblockingStores,
Event.INSUFFICIENT_SUCCESSES,
Event.INSUFFICIENT_ZONES));
@@ -198,7 +201,7 @@ public PipelineRoutedStore(String name,
new ReadRepair<BasicPipelineData<List<Versioned<byte[]>>>>(pipelineData,
Event.COMPLETED,
storeDef.getPreferredReads(),
- timeoutMs,
+ timeoutConfig.getTimeoutMs(),
nonblockingStores,
readRepairer));
@@ -255,7 +258,9 @@ public PipelineRoutedStore(String name,
pipelineData.setZonesRequired(null);
pipelineData.setStats(stats);
- Pipeline pipeline = new Pipeline(Operation.GET_ALL, timeoutMs, TimeUnit.MILLISECONDS);
+ Pipeline pipeline = new Pipeline(Operation.GET_ALL,
+ timeoutConfig.getAllTimeoutMs(),
+ TimeUnit.MILLISECONDS);
pipeline.addEventAction(Event.STARTED,
new GetAllConfigureNodes(pipelineData,
Event.CONFIGURED,
@@ -270,7 +275,7 @@ public PipelineRoutedStore(String name,
new PerformParallelGetAllRequests(pipelineData,
Event.INSUFFICIENT_SUCCESSES,
failureDetector,
- timeoutMs,
+ timeoutConfig.getAllTimeoutMs(),
nonblockingStores));
pipeline.addEventAction(Event.INSUFFICIENT_SUCCESSES,
new PerformSerialGetAllRequests(pipelineData,
@@ -280,14 +285,15 @@ public PipelineRoutedStore(String name,
failureDetector,
innerStores,
storeDef.getPreferredReads(),
- storeDef.getRequiredReads()));
+ storeDef.getRequiredReads(),
+ timeoutConfig.isPartialGetAllAllowed()));
if(allowReadRepair)
pipeline.addEventAction(Event.RESPONSES_RECEIVED,
new GetAllReadRepair(pipelineData,
Event.COMPLETED,
storeDef.getPreferredReads(),
- timeoutMs,
+ timeoutConfig.getAllTimeoutMs(),
nonblockingStores,
readRepairer));
@@ -323,7 +329,9 @@ public PipelineRoutedStore(String name,
else
pipelineData.setZonesRequired(null);
pipelineData.setStats(stats);
- Pipeline pipeline = new Pipeline(Operation.GET_VERSIONS, timeoutMs, TimeUnit.MILLISECONDS);
+ Pipeline pipeline = new Pipeline(Operation.GET_VERSIONS,
+ timeoutConfig.getVersionsTimeoutMs(),
+ TimeUnit.MILLISECONDS);
StoreRequest<List<Version>> blockingStoreRequest = new StoreRequest<List<Version>>() {
@@ -349,7 +357,7 @@ public PipelineRoutedStore(String name,
failureDetector,
storeDef.getPreferredReads(),
storeDef.getRequiredReads(),
- timeoutMs,
+ timeoutConfig.getVersionsTimeoutMs(),
nonblockingStores,
Event.INSUFFICIENT_SUCCESSES,
Event.INSUFFICIENT_ZONES));
@@ -408,7 +416,9 @@ public boolean delete(final ByteArray key, final Version version) throws Voldemo
pipelineData.setStoreName(name);
pipelineData.setStats(stats);
- Pipeline pipeline = new Pipeline(Operation.DELETE, timeoutMs, TimeUnit.MILLISECONDS);
+ Pipeline pipeline = new Pipeline(Operation.DELETE,
+ timeoutConfig.deleteTimeoutMs(),
+ TimeUnit.MILLISECONDS);
pipeline.setEnableHintedHandoff(isHintedHandoffEnabled());
HintedHandoff hintedHandoff = null;
@@ -419,7 +429,7 @@ public boolean delete(final ByteArray key, final Version version) throws Voldemo
nonblockingSlopStores,
handoffStrategy,
pipelineData.getFailedNodes(),
- timeoutMs);
+ timeoutConfig.deleteTimeoutMs());
pipeline.addEventAction(Event.STARTED,
new ConfigureNodes<Boolean, BasicPipelineData<Boolean>>(pipelineData,
@@ -437,7 +447,7 @@ public boolean delete(final ByteArray key, final Version version) throws Voldemo
failureDetector,
storeDef.getPreferredWrites(),
storeDef.getRequiredWrites(),
- timeoutMs,
+ timeoutConfig.deleteTimeoutMs(),
nonblockingStores,
hintedHandoff,
version));
@@ -496,7 +506,9 @@ public void put(ByteArray key, Versioned<byte[]> versioned, byte[] transforms)
pipelineData.setStoreName(name);
pipelineData.setStats(stats);
- Pipeline pipeline = new Pipeline(Operation.PUT, timeoutMs, TimeUnit.MILLISECONDS);
+ Pipeline pipeline = new Pipeline(Operation.PUT,
+ timeoutConfig.putTimeoutMs(),
+ TimeUnit.MILLISECONDS);
pipeline.setEnableHintedHandoff(isHintedHandoffEnabled());
HintedHandoff hintedHandoff = null;
@@ -507,7 +519,7 @@ public void put(ByteArray key, Versioned<byte[]> versioned, byte[] transforms)
nonblockingSlopStores,
handoffStrategy,
pipelineData.getFailedNodes(),
- timeoutMs);
+ timeoutConfig.putTimeoutMs());
pipeline.addEventAction(Event.STARTED,
new ConfigureNodes<Void, PutPipelineData>(pipelineData,
@@ -537,7 +549,7 @@ public void put(ByteArray key, Versioned<byte[]> versioned, byte[] transforms)
failureDetector,
storeDef.getPreferredWrites(),
storeDef.getRequiredWrites(),
- timeoutMs,
+ timeoutConfig.putTimeoutMs(),
nonblockingStores,
hintedHandoff));
if(isHintedHandoffEnabled()) {
View
7 src/java/voldemort/store/routed/RoutedStore.java
@@ -32,6 +32,7 @@
import voldemort.store.StoreDefinition;
import voldemort.utils.ByteArray;
import voldemort.utils.Time;
+import voldemort.utils.TimeoutConfig;
import voldemort.utils.Utils;
/**
@@ -45,7 +46,7 @@
protected final Map<Integer, Store<ByteArray, byte[], byte[]>> innerStores;
protected final boolean repairReads;
protected final ReadRepairer<ByteArray, byte[]> readRepairer;
- protected final long timeoutMs;
+ protected final TimeoutConfig timeoutConfig;
protected final Time time;
protected final StoreDefinition storeDef;
protected final FailureDetector failureDetector;
@@ -57,7 +58,7 @@ protected RoutedStore(String name,
Cluster cluster,
StoreDefinition storeDef,
boolean repairReads,
- long timeoutMs,
+ TimeoutConfig timeoutConfig,
FailureDetector failureDetector,
Time time) {
if(storeDef.getRequiredReads() < 1)
@@ -77,7 +78,7 @@ protected RoutedStore(String name,
this.innerStores = new ConcurrentHashMap<Integer, Store<ByteArray, byte[], byte[]>>(innerStores);
this.repairReads = repairReads;
this.readRepairer = new ReadRepairer<ByteArray, byte[]>();
- this.timeoutMs = timeoutMs;
+ this.timeoutConfig = timeoutConfig;
this.time = Utils.notNull(time);
this.storeDef = storeDef;
this.failureDetector = failureDetector;
View
11 src/java/voldemort/store/routed/RoutedStoreFactory.java
@@ -18,6 +18,7 @@
import voldemort.store.slop.Slop;
import voldemort.utils.ByteArray;
import voldemort.utils.SystemTime;
+import voldemort.utils.TimeoutConfig;
import com.google.common.collect.Maps;
@@ -27,16 +28,16 @@
private final ExecutorService threadPool;
- private final long routingTimeoutMs;
+ private final TimeoutConfig timeoutConfig;
private final Logger logger = Logger.getLogger(getClass());
public RoutedStoreFactory(boolean isPipelineRoutedStoreEnabled,
ExecutorService threadPool,
- long routingTimeoutMs) {
+ TimeoutConfig timeoutConfig) {
this.isPipelineRoutedStoreEnabled = isPipelineRoutedStoreEnabled;
this.threadPool = threadPool;
- this.routingTimeoutMs = routingTimeoutMs;
+ this.timeoutConfig = timeoutConfig;
}
public NonblockingStore toNonblockingStore(Store<ByteArray, byte[], byte[]> store) {
@@ -90,7 +91,7 @@ public RoutedStore create(Cluster cluster,
storeDefinition,
repairReads,
clientZoneId,
- routingTimeoutMs,
+ timeoutConfig,
failureDetector,
jmxEnabled);
} else {
@@ -111,7 +112,7 @@ public RoutedStore create(Cluster cluster,
storeDefinition,
repairReads,
threadPool,
- routingTimeoutMs,
+ timeoutConfig,
failureDetector,
SystemTime.INSTANCE);
}
View
31 src/java/voldemort/store/routed/ThreadPoolRoutedStore.java
@@ -48,6 +48,7 @@
import voldemort.utils.ByteUtils;
import voldemort.utils.SystemTime;
import voldemort.utils.Time;
+import voldemort.utils.TimeoutConfig;
import voldemort.versioning.ObsoleteVersionException;
import voldemort.versioning.VectorClock;
import voldemort.versioning.Version;
@@ -102,15 +103,15 @@ public ThreadPoolRoutedStore(String name,
StoreDefinition storeDef,
int numberOfThreads,
boolean repairReads,
- long timeoutMs,
+ TimeoutConfig timeoutConfig,
FailureDetector failureDetector) {
this(name,
innerStores,
cluster,
storeDef,
repairReads,
Executors.newFixedThreadPool(numberOfThreads),
- timeoutMs,
+ timeoutConfig,
failureDetector,
SystemTime.INSTANCE);
}
@@ -134,10 +135,17 @@ public ThreadPoolRoutedStore(String name,
StoreDefinition storeDef,
boolean repairReads,
ExecutorService threadPool,
- long timeoutMs,
+ TimeoutConfig timeoutConfig,
FailureDetector failureDetector,
Time time) {
- super(name, innerStores, cluster, storeDef, repairReads, timeoutMs, failureDetector, time);
+ super(name,
+ innerStores,
+ cluster,
+ storeDef,
+ repairReads,
+ timeoutConfig,
+ failureDetector,
+ time);
this.executor = threadPool;
}
@@ -184,7 +192,8 @@ public void run() {
} catch(Exception e) {
failures.add(e);
logger.warn("Error in DELETE on node " + node.getId() + "("
- + node.getHost() + ")", e);
+ + node.getHost() + ")",
+ e);
} finally {
// signal that the operation is complete
semaphore.release();
@@ -199,6 +208,7 @@ public void run() {
} else {
for(int i = 0; i < numNodes; i++) {
try {
+ long timeoutMs = timeoutConfig.deleteTimeoutMs();
boolean acquired = semaphore.tryAcquire(timeoutMs, TimeUnit.MILLISECONDS);
if(!acquired)
logger.warn("Delete operation timed out waiting for operation " + i
@@ -292,6 +302,7 @@ public void run() {
keyToSuccessCount.put(key, new MutableInt(0));
List<Future<GetAllResult>> futures;
+ long timeoutMs = timeoutConfig.getAllTimeoutMs();
try {
// TODO What to do about timeouts? They should be longer as getAll
// is likely to
@@ -377,7 +388,8 @@ public void run() {
throw e;
} catch(Exception e) {
logger.warn("Error in GET_ALL on node " + node.getId() + "("
- + node.getHost() + ")", e);
+ + node.getHost() + ")",
+ e);
failures.add(e);
}
}
@@ -453,6 +465,8 @@ public Void apply(List<GetResult<Versioned<byte[]>>> nodeResults) {
}
List<Future<GetResult<R>>> futures;
+ long timeoutMs = (fetcher == VERSION_OP) ? timeoutConfig.getVersionsTimeoutMs()
+ : timeoutConfig.getTimeoutMs();
try {
futures = executor.invokeAll(callables, timeoutMs, TimeUnit.MILLISECONDS);
} catch(InterruptedException e) {
@@ -498,8 +512,7 @@ public Void apply(List<GetResult<Versioned<byte[]>>> nodeResults) {
key,
fetcher.execute(innerStores.get(node.getId()),
key,
- transforms),
- null));
+ transforms), null));
++successes;
recordSuccess(node, startNs);
} catch(UnreachableStoreException e) {
@@ -760,7 +773,7 @@ private boolean blockOnPut(long startNs,
for(int i = startingIndex; i < blockCount; i++) {
try {
long ellapsedNs = System.nanoTime() - startNs;
- long remainingNs = (timeoutMs * Time.NS_PER_MS) - ellapsedNs;
+ long remainingNs = (timeoutConfig.putTimeoutMs() * Time.NS_PER_MS) - ellapsedNs;
boolean acquiredPermit = semaphore.tryAcquire(Math.max(remainingNs, 0),
TimeUnit.NANOSECONDS);
if(!acquiredPermit) {
View
2 src/java/voldemort/store/routed/action/PerformParallelGetAllRequests.java
@@ -119,7 +119,7 @@ public void requestComplete(Object result, long requestTime) {
}
try {
- latch.await(timeoutMs * 3, TimeUnit.MILLISECONDS);
+ latch.await(timeoutMs, TimeUnit.MILLISECONDS);
} catch(InterruptedException e) {
if(logger.isEnabledFor(Level.WARN))
logger.warn(e, e);
View
34 src/java/voldemort/store/routed/action/PerformSerialGetAllRequests.java
@@ -52,19 +52,23 @@
private final int required;
+ private final boolean allowPartial;
+
public PerformSerialGetAllRequests(GetAllPipelineData pipelineData,
Event completeEvent,
Iterable<ByteArray> keys,
FailureDetector failureDetector,
Map<Integer, Store<ByteArray, byte[], byte[]>> stores,
int preferred,
- int required) {
+ int required,
+ boolean allowPartial) {
super(pipelineData, completeEvent);
this.keys = keys;
this.failureDetector = failureDetector;
this.stores = stores;
this.preferred = preferred;
this.required = required;
+ this.allowPartial = allowPartial;
}
public void execute(Pipeline pipeline) {
@@ -156,17 +160,23 @@ public void execute(Pipeline pipeline) {
MutableInt successCount = pipelineData.getSuccessCount(key);
if(successCount.intValue() < required) {
- pipelineData.setFatalError(new InsufficientOperationalNodesException(required
- + " "
- + pipeline.getOperation()
- .getSimpleName()
- + "s required, but "
- + successCount.intValue()
- + " succeeded. Failing nodes : "
- + pipelineData.getFailedNodes(),
- pipelineData.getFailures()));
- pipeline.addEvent(Event.ERROR);
- return;
+ // if we allow partial results, then just remove keys that did
+ // not meet 'required' guarantee; else raise error
+ if(allowPartial) {
+ result.remove(key);
+ } else {
+ pipelineData.setFatalError(new InsufficientOperationalNodesException(required
+ + " "
+ + pipeline.getOperation()
+ .getSimpleName()
+ + "s required, but "
+ + successCount.intValue()
+ + " succeeded. Failing nodes : "
+ + pipelineData.getFailedNodes(),
+ pipelineData.getFailures()));
+ pipeline.addEvent(Event.ERROR);
+ return;
+ }
}
}
View
114 src/java/voldemort/utils/TimeoutConfig.java
@@ -0,0 +1,114 @@
+package voldemort.utils;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Encapsulates the timeouts for various voldemort operations
+ *
+ */
+public class TimeoutConfig {
+
+ private long getTimeoutMs;
+
+ private long putTimeoutMs;
+
+ private long getAllTimeoutMs;
+
+ private long deleteTimeoutMs;
+
+ private long getVersionsTimeoutMs;
+
+ private boolean partialGetAllAllowed;
+
+ public TimeoutConfig(long globalTimeout, boolean allowPartialGetAlls) {
+ this(globalTimeout,
+ globalTimeout,
+ globalTimeout,
+ globalTimeout,
+ globalTimeout,
+ allowPartialGetAlls);
+ }
+
+ public TimeoutConfig(long getTimeout,
+ long putTimeout,
+ long deleteTimeout,
+ long getAllTimeout,
+ long getVersionsTimeout,
+ boolean allowPartialGetAlls) {
+ getTimeoutMs(getTimeout, TimeUnit.MILLISECONDS);
+ putTimeoutMs(putTimeout, TimeUnit.MILLISECONDS);
+ deleteTimeoutMs(deleteTimeout, TimeUnit.MILLISECONDS);
+ getAllTimeoutMs(getAllTimeout, TimeUnit.MILLISECONDS);
+ getVersionsTimeoutMs(getVersionsTimeout, TimeUnit.MILLISECONDS);
+ setPartialGetAllAllowed(allowPartialGetAlls);
+ }
+
+ public long getTimeoutMs(TimeUnit unit) {
+ return unit.convert(getTimeoutMs, TimeUnit.MILLISECONDS);
+ }
+
+ public long getTimeoutMs() {
+ return getTimeoutMs;
+ }
+
+ public void getTimeoutMs(long getTimeoutMs, TimeUnit unit) {
+ this.getTimeoutMs = unit.toMillis(getTimeoutMs);
+ }
+
+ public long getVersionsTimeoutMs(TimeUnit unit) {
+ return unit.convert(getVersionsTimeoutMs, TimeUnit.MILLISECONDS);
+ }
+
+ public long getVersionsTimeoutMs() {
+ return getVersionsTimeoutMs;
+ }
+
+ public void getVersionsTimeoutMs(long getTimeoutMs, TimeUnit unit) {
+ this.getVersionsTimeoutMs = unit.toMillis(getTimeoutMs);
+ }
+
+ public long putTimeoutMs(TimeUnit unit) {
+ return unit.convert(putTimeoutMs, TimeUnit.MILLISECONDS);
+ }
+
+ public long putTimeoutMs() {
+ return putTimeoutMs;
+ }
+
+ public void putTimeoutMs(long putTimeoutMs, TimeUnit unit) {
+ this.putTimeoutMs = unit.toMillis(putTimeoutMs);
+ }
+
+ public long getAllTimeoutMs(TimeUnit unit) {
+ return unit.convert(getAllTimeoutMs, TimeUnit.MILLISECONDS);
+ }
+
+ public long getAllTimeoutMs() {
+ return getAllTimeoutMs;
+ }
+
+ public void getAllTimeoutMs(long getAllTimeoutMs, TimeUnit unit) {
+ this.getAllTimeoutMs = unit.toMillis(getAllTimeoutMs);
+ }
+
+ public long deleteTimeoutMs(TimeUnit unit) {
+ return unit.convert(deleteTimeoutMs, TimeUnit.MILLISECONDS);
+ }
+
+ public long deleteTimeoutMs() {
+ return deleteTimeoutMs;
+ }
+
+ public void deleteTimeoutMs(long deleteTimeoutMs, TimeUnit unit) {
+ this.deleteTimeoutMs = unit.toMillis(deleteTimeoutMs);
+ }
+
+ public boolean isPartialGetAllAllowed() {
+ return partialGetAllAllowed;
+ }
+
+ public void setPartialGetAllAllowed(boolean allowPartialGetAlls) {
+ this.partialGetAllAllowed = allowPartialGetAlls;
+ }
+
+}
View
39 test/integration/voldemort/performance/ClientConnectionStressTest.java
@@ -1,12 +1,12 @@
/*
* Copyright 2008-2010 LinkedIn, Inc
- *
+ *
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
@@ -16,19 +16,20 @@
package voldemort.performance;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
import joptsimple.OptionParser;
import joptsimple.OptionSet;
import voldemort.client.ClientConfig;
import voldemort.client.SocketStoreClientFactory;
import voldemort.client.StoreClient;
import voldemort.client.StoreClientFactory;
import voldemort.utils.CmdUtils;
-
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
+import voldemort.utils.TimeoutConfig;
/**
* Stress tests the client. Intended to diagnose issues such as connection leaks
@@ -92,8 +93,6 @@ public void run() {
executor.shutdown();
}
-
-
public static void main(String[] args) throws Exception {
OptionParser parser = new OptionParser();
@@ -112,18 +111,14 @@ public static void main(String[] args) throws Exception {
parser.accepts(MAX_CONNECTIONS_TOTAL, "Max total connections")
.withRequiredArg()
.ofType(Integer.class);
- parser.accepts(MAX_THREADS, "Max threads")
- .withRequiredArg()
- .ofType(Integer.class);
+ parser.accepts(MAX_THREADS, "Max threads").withRequiredArg().ofType(Integer.class);
parser.accepts(SELECTORS, "Number of NIO selectors")
.withRequiredArg()
.ofType(Integer.class);
parser.accepts(SOCKET_BUFFER_SIZE, "Socket buffer size")
- .withRequiredArg()
- .ofType(Integer.class);
- parser.accepts(REQS, "Requests per session")
.withRequiredArg()
.ofType(Integer.class);
+ parser.accepts(REQS, "Requests per session").withRequiredArg().ofType(Integer.class);
parser.accepts(CONNECTIONS, "Total connections to make")
.withRequiredArg()
.ofType(Integer.class);
@@ -144,11 +139,15 @@ public static void main(String[] args) throws Exception {
ClientConfig config = new ClientConfig();
if(options.has(CONNECTION_TIMEOUT))
- config.setConnectionTimeout((Integer) options.valueOf(CONNECTION_TIMEOUT), TimeUnit.MILLISECONDS);
+ config.setConnectionTimeout((Integer) options.valueOf(CONNECTION_TIMEOUT),
+ TimeUnit.MILLISECONDS);
if(options.has(ROUTING_TIMEOUT))
- config.setRoutingTimeout((Integer) options.valueOf(ROUTING_TIMEOUT), TimeUnit.MILLISECONDS);
+ config.setTimeoutConfig(new TimeoutConfig(TimeUnit.MILLISECONDS.toMillis((Integer) options.valueOf(ROUTING_TIMEOUT)),
+ false));
+
if(options.has(SOCKET_TIMEOUT))
- config.setSocketTimeout((Integer) options.valueOf(SOCKET_TIMEOUT), TimeUnit.MILLISECONDS);
+ config.setSocketTimeout((Integer) options.valueOf(SOCKET_TIMEOUT),
+ TimeUnit.MILLISECONDS);
if(options.has(MAX_CONNECTIONS))
config.setMaxConnectionsPerNode((Integer) options.valueOf(MAX_CONNECTIONS));
if(options.has(MAX_THREADS))
View
10 test/integration/voldemort/performance/RoutedStoreParallelismTest.java
@@ -28,7 +28,6 @@
import joptsimple.OptionParser;
import joptsimple.OptionSet;
-import voldemort.cluster.failuredetector.MutableStoreVerifier;
import voldemort.ServerTestUtils;
import voldemort.TestUtils;
import voldemort.VoldemortException;
@@ -39,6 +38,7 @@
import voldemort.cluster.failuredetector.FailureDetector;
import voldemort.cluster.failuredetector.FailureDetectorConfig;
import voldemort.cluster.failuredetector.FailureDetectorUtils;
+import voldemort.cluster.failuredetector.MutableStoreVerifier;
import voldemort.server.StoreRepository;
import voldemort.server.VoldemortConfig;
import voldemort.server.VoldemortServer;
@@ -101,7 +101,9 @@ public static void main(String[] args) throws Throwable {
.ofType(Integer.class);
parser.accepts("num-clients",
"The number of threads to make requests concurrently Default = "
- + DEFAULT_NUM_CLIENTS).withRequiredArg().ofType(Integer.class);
+ + DEFAULT_NUM_CLIENTS)
+ .withRequiredArg()
+ .ofType(Integer.class);
parser.accepts("routed-store-type",
"Type of routed store, either \"" + THREAD_POOL_ROUTED_STORE + "\" or \""
+ PIPELINE_ROUTED_STORE + "\" Default = "
@@ -201,7 +203,7 @@ public static void main(String[] args) throws Throwable {
RoutedStoreFactory routedStoreFactory = new RoutedStoreFactory(routedStoreType.trim()
.equalsIgnoreCase(PIPELINE_ROUTED_STORE),
routedStoreThreadPool,
- clientConfig.getRoutingTimeout(TimeUnit.MILLISECONDS));
+ clientConfig.getTimeoutConfig());
final RoutedStore routedStore = routedStoreFactory.create(cluster,
storeDefinition,
@@ -223,7 +225,7 @@ public void run() {
try {
routedStore.get(key, null);
} catch(VoldemortException e) {
- //
+ //
}
}
}
View
5 test/unit/voldemort/store/routed/HintedHandoffTest.java
@@ -61,6 +61,7 @@
import voldemort.store.slop.strategy.HintedHandoffStrategyType;
import voldemort.utils.ByteArray;
import voldemort.utils.ByteUtils;
+import voldemort.utils.TimeoutConfig;
import voldemort.versioning.Version;
import voldemort.versioning.Versioned;
@@ -168,7 +169,9 @@ public void setUp() throws Exception {
setFailureDetector(subStores);
routedStoreThreadPool = Executors.newFixedThreadPool(NUM_THREADS);
- routedStoreFactory = new RoutedStoreFactory(true, routedStoreThreadPool, 1500L);
+ routedStoreFactory = new RoutedStoreFactory(true,
+ routedStoreThreadPool,
+ new TimeoutConfig(1500L, false));
strategy = new RoutingStrategyFactory().updateRoutingStrategy(storeDef, cluster);
Map<Integer, NonblockingStore> nonblockingSlopStores = Maps.newHashMap();
View
15 test/unit/voldemort/store/routed/ReadRepairerTest.java
@@ -21,9 +21,9 @@
import static org.junit.Assert.assertEquals;
import static voldemort.FailureDetectorTestUtils.recordException;
import static voldemort.FailureDetectorTestUtils.recordSuccess;
-import static voldemort.cluster.failuredetector.MutableStoreVerifier.create;
import static voldemort.TestUtils.getClock;
import static voldemort.cluster.failuredetector.FailureDetectorUtils.create;
+import static voldemort.cluster.failuredetector.MutableStoreVerifier.create;
import java.util.ArrayList;
import java.util.Arrays;
@@ -56,6 +56,7 @@
import voldemort.store.memory.InMemoryStorageEngine;
import voldemort.utils.ByteArray;
import voldemort.utils.Time;
+import voldemort.utils.TimeoutConfig;
import voldemort.versioning.Versioned;
import com.google.common.collect.Iterables;
@@ -157,7 +158,8 @@ public void testMissingKeysAreAddedToNodeWhenDoingReadRepair() throws Exception
RoutedStoreFactory routedStoreFactory = new RoutedStoreFactory(isPipelineRoutedStoreEnabled,
routedStoreThreadPool,
- 1000L);
+ new TimeoutConfig(1000L,
+ false));
RoutedStore store = routedStoreFactory.create(cluster,
storeDef,
@@ -198,8 +200,8 @@ public void testNoDuplicates() throws Exception {
public void testSingleSuccessor() throws Exception {
assertVariationsEqual(singletonList(getValue(1, 1, new int[] { 1, 1 })),
- asList(getValue(1, 1, new int[] { 1 }), getValue(2, 1, new int[] { 1,
- 1 })));
+ asList(getValue(1, 1, new int[] { 1 }),
+ getValue(2, 1, new int[] { 1, 1 })));
}
public void testAllConcurrent() throws Exception {
@@ -257,8 +259,9 @@ public void testConcurrentToOneDoesNotImplyConcurrentToAll() throws Exception {
getValue(1, 1, new int[] { 1, 2 }),
getValue(2, 1, new int[] { 1, 3, 3 }),
getValue(3, 1, new int[] { 1, 2 })),
- asList(getValue(1, 1, new int[] { 3, 3 }), getValue(2, 1, new int[] {
- 1, 2 }), getValue(3, 1, new int[] { 1, 3, 3 })));
+ asList(getValue(1, 1, new int[] { 3, 3 }),
+ getValue(2, 1, new int[] { 1, 2 }),
+ getValue(3, 1, new int[] { 1, 3, 3 })));
}
public void testLotsOfVersions() throws Exception {
View
167 test/unit/voldemort/store/routed/RoutedStoreTest.java
@@ -18,10 +18,10 @@
import static voldemort.FailureDetectorTestUtils.recordException;
import static voldemort.FailureDetectorTestUtils.recordSuccess;
-import static voldemort.cluster.failuredetector.MutableStoreVerifier.create;
import static voldemort.TestUtils.getClock;
import static voldemort.VoldemortTestConstants.getNineNodeCluster;
import static voldemort.cluster.failuredetector.FailureDetectorUtils.create;
+import static voldemort.cluster.failuredetector.MutableStoreVerifier.create;
import java.util.ArrayList;
import java.util.Arrays;
@@ -33,6 +33,7 @@
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.Before;
@@ -73,6 +74,7 @@
import voldemort.utils.ByteArray;
import voldemort.utils.ByteUtils;
import voldemort.utils.Time;
+import voldemort.utils.TimeoutConfig;
import voldemort.utils.Utils;
import voldemort.versioning.Occurred;
import voldemort.versioning.VectorClock;
@@ -202,7 +204,8 @@ else if(count < failing + sleepy)
routedStoreThreadPool = Executors.newFixedThreadPool(threads);
RoutedStoreFactory routedStoreFactory = new RoutedStoreFactory(isPipelineRoutedStoreEnabled,
routedStoreThreadPool,
- 1000L);
+ new TimeoutConfig(1000L,
+ false));
return routedStoreFactory.create(cluster, storeDef, subStores, true, failureDetector);
}
@@ -252,7 +255,8 @@ else if(sleepy != null && sleepy.contains(n.getId()))
routedStoreThreadPool = Executors.newFixedThreadPool(threads);
RoutedStoreFactory routedStoreFactory = new RoutedStoreFactory(true,
routedStoreThreadPool,
- timeOutMs);
+ new TimeoutConfig(timeOutMs,
+ false));
return routedStoreFactory.create(cluster, storeDef, subStores, true, failureDetector);
}
@@ -376,7 +380,7 @@ public void testPutIncrementsVersion() throws Exception {
@Test
public void testObsoleteMasterFails() {
- // write me
+ // write me
}
@Test
@@ -779,6 +783,75 @@ public void testGetAllWithNodeDown() throws Exception {
}
}
+ /**
+ * Tests that getAll returns partial results
+ */
+ @Test
+ public void testPartialGetAll() throws Exception {
+ // create a store with rf=1 i.e disjoint partitions
+ StoreDefinition definition = new StoreDefinitionBuilder().setName("test")
+ .setType("foo")
+ .setKeySerializer(new SerializerDefinition("test"))
+ .setValueSerializer(new SerializerDefinition("test"))
+ .setRoutingPolicy(RoutingTier.CLIENT)
+ .setRoutingStrategyType(RoutingStrategyType.CONSISTENT_STRATEGY)
+ .setReplicationFactor(1)
+ .setPreferredReads(1)
+ .setRequiredReads(1)
+ .setPreferredWrites(1)
+ .setRequiredWrites(1)
+ .build();
+
+ Map<Integer, Store<ByteArray, byte[], byte[]>> stores = new HashMap<Integer, Store<ByteArray, byte[], byte[]>>();
+ List<Node> nodes = new ArrayList<Node>();
+ // create nodes with varying speeds - 100ms, 200ms, 300ms
+ for(int i = 0; i < 3; i++) {
+ Store<ByteArray, byte[], byte[]> store = new SleepyStore<ByteArray, byte[], byte[]>(100 * (i + 1),
+ new InMemoryStorageEngine<ByteArray, byte[], byte[]>("test"));
+ stores.put(i, store);
+ List<Integer> partitions = Arrays.asList(i);
+ nodes.add(new Node(i, "none", 0, 0, 0, partitions));
+ }
+ setFailureDetector(stores);
+
+ routedStoreThreadPool = Executors.newFixedThreadPool(3);
+
+ TimeoutConfig timeoutConfig = new TimeoutConfig(1500, true);
+ // This means, the getall will only succeed on two of the nodes
+ timeoutConfig.getAllTimeoutMs(250, TimeUnit.MILLISECONDS);
+ RoutedStoreFactory routedStoreFactory = new RoutedStoreFactory(true,
+ routedStoreThreadPool,
+ timeoutConfig);
+
+ RoutedStore routedStore = routedStoreFactory.create(new Cluster("test", nodes),
+ definition,
+ stores,
+ true,
+ failureDetector);
+ /* do some puts so we have some data to test getalls */
+ Map<ByteArray, byte[]> expectedValues = Maps.newHashMap();
+ for(byte i = 1; i < 11; ++i) {
+ ByteArray key = new ByteArray(new byte[] { i });
+ byte[] value = new byte[] { (byte) (i + 50) };
+ routedStore.put(key, Versioned.value(value), null);
+ expectedValues.put(key, value);
+ }
+
+ /* 1. positive test; if partial is on, should get something back */
+ Map<ByteArray, List<Versioned<byte[]>>> all = routedStore.getAll(expectedValues.keySet(),
+ null);
+ assert (expectedValues.size() > all.size());
+
+ /* 2. negative test; if partial is off, should fail the whole operation */
+ timeoutConfig.setPartialGetAllAllowed(false);
+ try {
+ all = routedStore.getAll(expectedValues.keySet(), null);
+ fail("Should have failed");
+ } catch(Exception e) {
+
+ }
+ }
+
@Test
public void testGetAllWithFailingStore() throws Exception {
cluster = VoldemortTestConstants.getTwoNodeCluster();
@@ -802,7 +875,8 @@ public void testGetAllWithFailingStore() throws Exception {
routedStoreThreadPool = Executors.newFixedThreadPool(1);
RoutedStoreFactory routedStoreFactory = new RoutedStoreFactory(isPipelineRoutedStoreEnabled,
routedStoreThreadPool,
- 1000L);
+ new TimeoutConfig(1000L,
+ false));
RoutedStore routedStore = routedStoreFactory.create(cluster,
storeDef,
@@ -858,7 +932,8 @@ public void testGetAllWithMorePreferredReadsThanNodes() throws Exception {
routedStoreThreadPool = Executors.newFixedThreadPool(1);
RoutedStoreFactory routedStoreFactory = new RoutedStoreFactory(isPipelineRoutedStoreEnabled,
routedStoreThreadPool,
- 1000L);
+ new TimeoutConfig(1000L,
+ false));
RoutedStore routedStore = routedStoreFactory.create(cluster,
storeDef,
@@ -965,7 +1040,8 @@ public void testPutWithOneNodeDownAndOneNodeSlow() throws Exception {
routedStoreThreadPool = Executors.newFixedThreadPool(1);
RoutedStoreFactory routedStoreFactory = new RoutedStoreFactory(isPipelineRoutedStoreEnabled,
routedStoreThreadPool,
- 1000L);
+ new TimeoutConfig(1000L,
+ false));
RoutedStore routedStore = routedStoreFactory.create(cluster,
storeDef,
@@ -1011,7 +1087,8 @@ public void testPutTimeout() throws Exception {
routedStoreThreadPool = Executors.newFixedThreadPool(3);
RoutedStoreFactory routedStoreFactory = new RoutedStoreFactory(isPipelineRoutedStoreEnabled,
routedStoreThreadPool,
- timeout);
+ new TimeoutConfig(timeout,
+ false));
RoutedStore routedStore = routedStoreFactory.create(new Cluster("test", nodes),
definition,
@@ -1064,7 +1141,8 @@ public void testGetTimeout() throws Exception {
routedStoreThreadPool = Executors.newFixedThreadPool(3);
RoutedStoreFactory routedStoreFactory = new RoutedStoreFactory(true,
routedStoreThreadPool,
- timeout);
+ new TimeoutConfig(timeout,
+ false));
RoutedStore routedStore = routedStoreFactory.create(new Cluster("test", nodes),
definition,
@@ -1082,6 +1160,62 @@ public void testGetTimeout() throws Exception {
}
}
+ @Test
+ public void testOperationSpecificTimeouts() throws Exception {
+ StoreDefinition definition = new StoreDefinitionBuilder().setName("test")
+ .setType("foo")
+ .setKeySerializer(new SerializerDefinition("test"))
+ .setValueSerializer(new SerializerDefinition("test"))
+ .setRoutingPolicy(RoutingTier.CLIENT)
+ .setRoutingStrategyType(RoutingStrategyType.CONSISTENT_STRATEGY)
+ .setReplicationFactor(3)
+ .setPreferredReads(3)
+ .setRequiredReads(3)
+ .setPreferredWrites(3)
+ .setRequiredWrites(3)
+ .build();
+ Map<Integer, Store<ByteArray, byte[], byte[]>> stores = new HashMap<Integer, Store<ByteArray, byte[], byte[]>>();
+ List<Node> nodes = new ArrayList<Node>();
+ for(int i = 0; i < 3; i++) {
+ Store<ByteArray, byte[], byte[]> store = new SleepyStore<ByteArray, byte[], byte[]>(200,
+ new InMemoryStorageEngine<ByteArray, byte[], byte[]>("test"));
+ stores.put(i, store);
+ List<Integer> partitions = Arrays.asList(i);
+ nodes.add(new Node(i, "none", 0, 0, 0, partitions));
+ }
+
+ setFailureDetector(stores);
+
+ routedStoreThreadPool = Executors.newFixedThreadPool(3);
+ // with a 500ms general timeout and a 100ms get timeout, only get should
+ // fail
+ TimeoutConfig timeoutConfig = new TimeoutConfig(1500, false);
+ timeoutConfig.getTimeoutMs(100, TimeUnit.MILLISECONDS);
+ RoutedStoreFactory routedStoreFactory = new RoutedStoreFactory(true,
+ routedStoreThreadPool,
+ timeoutConfig);
+
+ RoutedStore routedStore = routedStoreFactory.create(new Cluster("test", nodes),
+ definition,
+ stores,
+ true,
+ failureDetector);
+ try {
+ routedStore.put(new ByteArray("test".getBytes()),
+ new Versioned<byte[]>(new byte[] { 1 }),
+ null);
+ } catch(InsufficientOperationalNodesException e) {
+ fail("Should not have failed");
+ }
+
+ try {
+ routedStore.get(new ByteArray("test".getBytes()), null);
+ fail("Should have thrown");
+ } catch(InsufficientOperationalNodesException e) {
+
+ }
+ }
+
/**
* See Issue #211: Unnecessary read repairs during getAll with more than one
* key
@@ -1113,7 +1247,8 @@ public void testNoReadRepair() throws Exception {
routedStoreThreadPool = Executors.newFixedThreadPool(1);
RoutedStoreFactory routedStoreFactory = new RoutedStoreFactory(isPipelineRoutedStoreEnabled,
routedStoreThreadPool,
- 1000L);
+ new TimeoutConfig(1000L,
+ false));
RoutedStore routedStore = routedStoreFactory.create(cluster,
storeDef,
@@ -1164,7 +1299,8 @@ public void testTardyResponsesNotIncludedInResult() throws Exception {
routedStoreThreadPool = Executors.newFixedThreadPool(cluster.getNumberOfNodes());
RoutedStoreFactory routedStoreFactory = new RoutedStoreFactory(isPipelineRoutedStoreEnabled,
routedStoreThreadPool,
- 10000L);
+ new TimeoutConfig(10000L,
+ false));
RoutedStore routedStore = routedStoreFactory.create(cluster,
storeDef,
@@ -1176,7 +1312,7 @@ public void testTardyResponsesNotIncludedInResult() throws Exception {
routedStoreFactory = new RoutedStoreFactory(isPipelineRoutedStoreEnabled,
routedStoreThreadPool,
- sleepTimeMs / 2);
+ new TimeoutConfig(sleepTimeMs / 2, false));
routedStore = routedStoreFactory.create(cluster, storeDef, subStores, true, failureDetector);
@@ -1218,7 +1354,8 @@ public void testSlowStoreDowngradesFromPreferredToRequired() throws Exception {
routedStoreThreadPool = Executors.newFixedThreadPool(cluster.getNumberOfNodes());
RoutedStoreFactory routedStoreFactory = new RoutedStoreFactory(isPipelineRoutedStoreEnabled,
routedStoreThreadPool,
- 10000L);
+ new TimeoutConfig(10000L,
+ false));
RoutedStore routedStore = routedStoreFactory.create(cluster,
storeDef,
@@ -1230,7 +1367,7 @@ public void testSlowStoreDowngradesFromPreferredToRequired() throws Exception {
routedStoreFactory = new RoutedStoreFactory(isPipelineRoutedStoreEnabled,
routedStoreThreadPool,
- sleepTimeMs / 2);
+ new TimeoutConfig(sleepTimeMs / 2, false));
routedStore = routedStoreFactory.create(cluster, storeDef, subStores, true, failureDetector);
@@ -1279,7 +1416,7 @@ public void testPutDeleteZoneRouting() throws Exception {
routedStoreThreadPool = Executors.newFixedThreadPool(8);
RoutedStoreFactory routedStoreFactory = new RoutedStoreFactory(true,
routedStoreThreadPool,
- 60);
+ new TimeoutConfig(60, false));
Store<ByteArray, byte[], byte[]> s1 = routedStoreFactory.create(cluster,
storeDef,

0 comments on commit 10211c7

Please sign in to comment.