From 10211c71f35b358ffc38bdbf7d75d6f0cf1c1480 Mon Sep 17 00:00:00 2001 From: Vinoth Chandar Date: Fri, 1 Jun 2012 14:48:26 -0700 Subject: [PATCH] finer timeouts and partial getalls. --- .../client/AbstractStoreClientFactory.java | 2 +- src/java/voldemort/client/ClientConfig.java | 60 ++++++- .../voldemort/server/VoldemortConfig.java | 25 +++ .../server/storage/StorageService.java | 2 +- .../store/routed/PipelineRoutedStore.java | 46 +++-- .../voldemort/store/routed/RoutedStore.java | 7 +- .../store/routed/RoutedStoreFactory.java | 11 +- .../store/routed/ThreadPoolRoutedStore.java | 31 +++- .../action/PerformParallelGetAllRequests.java | 2 +- .../action/PerformSerialGetAllRequests.java | 34 ++-- src/java/voldemort/utils/TimeoutConfig.java | 114 ++++++++++++ .../ClientConnectionStressTest.java | 39 ++-- .../RoutedStoreParallelismTest.java | 10 +- .../store/routed/HintedHandoffTest.java | 5 +- .../store/routed/ReadRepairerTest.java | 15 +- .../store/routed/RoutedStoreTest.java | 167 ++++++++++++++++-- 16 files changed, 474 insertions(+), 96 deletions(-) create mode 100644 src/java/voldemort/utils/TimeoutConfig.java diff --git a/src/java/voldemort/client/AbstractStoreClientFactory.java b/src/java/voldemort/client/AbstractStoreClientFactory.java index ab0c7d998e..bf523cfdc6 100644 --- a/src/java/voldemort/client/AbstractStoreClientFactory.java +++ b/src/java/voldemort/client/AbstractStoreClientFactory.java @@ -114,7 +114,7 @@ public AbstractStoreClientFactory(ClientConfig config) { this.clientZoneId = config.getClientZoneId(); this.routedStoreFactory = new RoutedStoreFactory(config.isPipelineRoutedStoreEnabled(), threadPool, - config.getRoutingTimeout(TimeUnit.MILLISECONDS)); + config.getTimeoutConfig()); if(this.isJmxEnabled) { JmxUtils.registerMbean(threadPool, diff --git a/src/java/voldemort/client/ClientConfig.java b/src/java/voldemort/client/ClientConfig.java index f74f31c04d..19ab3bde3a 100644 --- a/src/java/voldemort/client/ClientConfig.java +++ b/src/java/voldemort/client/ClientConfig.java @@ -36,6 +36,7 @@ import voldemort.utils.ConfigurationException; import voldemort.utils.Props; import voldemort.utils.ReflectUtils; +import voldemort.utils.TimeoutConfig; import voldemort.utils.Utils; /** @@ -55,6 +56,7 @@ public class ClientConfig { private volatile boolean socketKeepAlive = false; private volatile int selectors = 8; private volatile long routingTimeoutMs = 15000; + private volatile TimeoutConfig timeoutConfig = new TimeoutConfig(routingTimeoutMs, false); private volatile int socketBufferSize = 64 * 1024; private volatile SerializerFactory serializerFactory = new DefaultSerializerFactory(); private volatile List bootstrapUrls = null; @@ -91,6 +93,12 @@ public ClientConfig() {} public static final String SOCKET_KEEPALIVE_PROPERTY = "socket_keepalive"; public static final String SELECTORS_PROPERTY = "selectors"; public static final String ROUTING_TIMEOUT_MS_PROPERTY = "routing_timeout_ms"; + public static final String GETALL_ROUTING_TIMEOUT_MS_PROPERTY = "getall_routing_timeout_ms"; + public static final String PUT_ROUTING_TIMEOUT_MS_PROPERTY = "put_routing_timeout_ms"; + public static final String GET_ROUTING_TIMEOUT_MS_PROPERTY = "get_routing_timeout_ms"; + public static final String GET_VERSIONS_ROUTING_TIMEOUT_MS_PROPERTY = "getversions_routing_timeout_ms"; + public static final String DELETE_ROUTING_TIMEOUT_MS_PROPERTY = "delete_routing_timeout_ms"; + public static final String ALLOW_PARTIAL_GETALLS_PROPERTY = "allow_partial_getalls"; public static final String NODE_BANNAGE_MS_PROPERTY = "node_bannage_ms"; public static final String SOCKET_BUFFER_SIZE_PROPERTY = "socket_buffer_size"; public static final String SERIALIZER_FACTORY_CLASS_PROPERTY = "serializer_factory_class"; @@ -174,6 +182,36 @@ private void setProperties(Properties properties) { if(props.containsKey(ROUTING_TIMEOUT_MS_PROPERTY)) this.setRoutingTimeout(props.getInt(ROUTING_TIMEOUT_MS_PROPERTY), TimeUnit.MILLISECONDS); + // By default, make all the timeouts equal to routing timeout + timeoutConfig = new TimeoutConfig(routingTimeoutMs, false); + + if(props.containsKey(GETALL_ROUTING_TIMEOUT_MS_PROPERTY)) + timeoutConfig.getAllTimeoutMs(props.getInt(GETALL_ROUTING_TIMEOUT_MS_PROPERTY), + TimeUnit.MILLISECONDS); + + if(props.containsKey(GET_ROUTING_TIMEOUT_MS_PROPERTY)) + timeoutConfig.getTimeoutMs(props.getInt(GET_ROUTING_TIMEOUT_MS_PROPERTY), + TimeUnit.MILLISECONDS); + + if(props.containsKey(PUT_ROUTING_TIMEOUT_MS_PROPERTY)) { + long putTimeoutMs = props.getInt(PUT_ROUTING_TIMEOUT_MS_PROPERTY); + timeoutConfig.putTimeoutMs(putTimeoutMs, TimeUnit.MILLISECONDS); + // By default, use the same thing for getVersions() also + timeoutConfig.getVersionsTimeoutMs(putTimeoutMs, TimeUnit.MILLISECONDS); + } + + // of course, if someone overrides it, we will respect that + if(props.containsKey(GET_VERSIONS_ROUTING_TIMEOUT_MS_PROPERTY)) + timeoutConfig.getVersionsTimeoutMs(props.getInt(GET_VERSIONS_ROUTING_TIMEOUT_MS_PROPERTY), + TimeUnit.MILLISECONDS); + + if(props.containsKey(DELETE_ROUTING_TIMEOUT_MS_PROPERTY)) + timeoutConfig.deleteTimeoutMs(props.getInt(DELETE_ROUTING_TIMEOUT_MS_PROPERTY), + TimeUnit.MILLISECONDS); + + if(props.containsKey(ALLOW_PARTIAL_GETALLS_PROPERTY)) + timeoutConfig.setPartialGetAllAllowed(props.getBoolean(ALLOW_PARTIAL_GETALLS_PROPERTY)); + if(props.containsKey(SOCKET_BUFFER_SIZE_PROPERTY)) this.setSocketBufferSize(props.getInt(SOCKET_BUFFER_SIZE_PROPERTY)); @@ -324,6 +362,26 @@ public ClientConfig setRoutingTimeout(int routingTimeout, TimeUnit unit) { return this; } + /** + * Set the timeout configuration for the voldemort operations + * + * @param tConfig + * @return + */ + public ClientConfig setTimeoutConfig(TimeoutConfig tConfig) { + this.timeoutConfig = tConfig; + return this; + } + + /** + * Get the timeouts for voldemort operations + * + * @return + */ + public TimeoutConfig getTimeoutConfig() { + return timeoutConfig; + } + /** * @deprecated Use {@link #getFailureDetectorBannagePeriod()} instead */ @@ -517,7 +575,7 @@ public boolean isLazyEnabled() { /** * Enable lazy initialization of clients? - * + * * @param enableLazy If true clients will be lazily initialized */ public ClientConfig setEnableLazy(boolean enableLazy) { diff --git a/src/java/voldemort/server/VoldemortConfig.java b/src/java/voldemort/server/VoldemortConfig.java index 119c37040c..ae1991d6fa 100644 --- a/src/java/voldemort/server/VoldemortConfig.java +++ b/src/java/voldemort/server/VoldemortConfig.java @@ -21,6 +21,7 @@ import java.io.Serializable; import java.util.List; import java.util.Properties; +import java.util.concurrent.TimeUnit; import voldemort.client.protocol.RequestFormatType; import voldemort.cluster.failuredetector.FailureDetectorConfig; @@ -34,6 +35,7 @@ import voldemort.utils.ConfigurationException; import voldemort.utils.Props; import voldemort.utils.Time; +import voldemort.utils.TimeoutConfig; import voldemort.utils.UndefinedPropertyException; import voldemort.utils.Utils; @@ -112,6 +114,7 @@ public class VoldemortConfig implements Serializable { private int clientSelectors; private int clientRoutingTimeoutMs; + private TimeoutConfig clientTimeoutConfig; private int clientMaxConnectionsPerNode; private int clientConnectionTimeoutMs; private int clientMaxThreads; @@ -275,6 +278,24 @@ public VoldemortConfig(Props props) { this.clientMaxConnectionsPerNode = props.getInt("client.max.connections.per.node", 50); this.clientConnectionTimeoutMs = props.getInt("client.connection.timeout.ms", 500); this.clientRoutingTimeoutMs = props.getInt("client.routing.timeout.ms", 15000); + this.clientTimeoutConfig = new TimeoutConfig(this.clientRoutingTimeoutMs, false); + this.clientTimeoutConfig.getTimeoutMs(props.getInt("client.routing.get.timeout.ms", + this.clientRoutingTimeoutMs), + TimeUnit.MILLISECONDS); + this.clientTimeoutConfig.getAllTimeoutMs(props.getInt("client.routing.getall.timeout.ms", + this.clientRoutingTimeoutMs), + TimeUnit.MILLISECONDS); + this.clientTimeoutConfig.putTimeoutMs(props.getInt("client.routing.put.timeout.ms", + this.clientRoutingTimeoutMs), + TimeUnit.MILLISECONDS); + this.clientTimeoutConfig.getVersionsTimeoutMs(props.getLong("client.routing.getversions.timeout.ms", + this.clientTimeoutConfig.putTimeoutMs()), + TimeUnit.MILLISECONDS); + this.clientTimeoutConfig.deleteTimeoutMs(props.getInt("client.routing.delete.timeout.ms", + this.clientRoutingTimeoutMs), + TimeUnit.MILLISECONDS); + this.clientTimeoutConfig.setPartialGetAllAllowed(props.getBoolean("client.routing.allow.partial.getall", + false)); this.clientMaxThreads = props.getInt("client.max.threads", 500); this.clientThreadIdleMs = props.getInt("client.thread.idle.ms", 100000); this.clientMaxQueuedRequests = props.getInt("client.max.queued.requests", 1000); @@ -997,6 +1018,10 @@ public void setClientRoutingTimeoutMs(int routingTimeoutMs) { this.clientRoutingTimeoutMs = routingTimeoutMs; } + public TimeoutConfig getTimeoutConfig() { + return this.clientTimeoutConfig; + } + public int getClientMaxConnectionsPerNode() { return clientMaxConnectionsPerNode; } diff --git a/src/java/voldemort/server/storage/StorageService.java b/src/java/voldemort/server/storage/StorageService.java index 5a26c3e86d..8bb3919c6f 100644 --- a/src/java/voldemort/server/storage/StorageService.java +++ b/src/java/voldemort/server/storage/StorageService.java @@ -155,7 +155,7 @@ public StorageService(StoreRepository storeRepository, this.storeStats = new StoreStats(); this.routedStoreFactory = new RoutedStoreFactory(voldemortConfig.isPipelineRoutedStoreEnabled(), this.clientThreadPool, - voldemortConfig.getClientRoutingTimeoutMs()); + voldemortConfig.getTimeoutConfig()); /* * Initialize the dynamic throttle limit based on the per node limit diff --git a/src/java/voldemort/store/routed/PipelineRoutedStore.java b/src/java/voldemort/store/routed/PipelineRoutedStore.java index c5aa1ba06b..9ed094c03c 100644 --- a/src/java/voldemort/store/routed/PipelineRoutedStore.java +++ b/src/java/voldemort/store/routed/PipelineRoutedStore.java @@ -58,6 +58,7 @@ import voldemort.utils.ByteUtils; import voldemort.utils.JmxUtils; import voldemort.utils.SystemTime; +import voldemort.utils.TimeoutConfig; import voldemort.versioning.Version; import voldemort.versioning.Versioned; @@ -102,7 +103,7 @@ public PipelineRoutedStore(String name, StoreDefinition storeDef, boolean repairReads, int clientZoneId, - long timeoutMs, + TimeoutConfig timeoutConfig, FailureDetector failureDetector, boolean jmxEnabled) { super(name, @@ -110,7 +111,7 @@ public PipelineRoutedStore(String name, cluster, storeDef, repairReads, - timeoutMs, + timeoutConfig, failureDetector, SystemTime.INSTANCE); this.nonblockingSlopStores = nonblockingSlopStores; @@ -149,7 +150,9 @@ public List> get(final ByteArray key, final byte[] transforms) pipelineData.setZonesRequired(null); pipelineData.setStats(stats); - final Pipeline pipeline = new Pipeline(Operation.GET, timeoutMs, TimeUnit.MILLISECONDS); + final Pipeline pipeline = new Pipeline(Operation.GET, + timeoutConfig.getTimeoutMs(), + TimeUnit.MILLISECONDS); boolean allowReadRepair = repairReads && transforms == null; StoreRequest>> blockingStoreRequest = new StoreRequest>>() { @@ -177,7 +180,7 @@ public List> request(Store store) { failureDetector, storeDef.getPreferredReads(), storeDef.getRequiredReads(), - timeoutMs, + timeoutConfig.getTimeoutMs(), nonblockingStores, Event.INSUFFICIENT_SUCCESSES, Event.INSUFFICIENT_ZONES)); @@ -198,7 +201,7 @@ public List> request(Store store) { new ReadRepair>>>(pipelineData, Event.COMPLETED, storeDef.getPreferredReads(), - timeoutMs, + timeoutConfig.getTimeoutMs(), nonblockingStores, readRepairer)); @@ -255,7 +258,9 @@ public Map>> getAll(Iterable keys, pipelineData.setZonesRequired(null); pipelineData.setStats(stats); - Pipeline pipeline = new Pipeline(Operation.GET_ALL, timeoutMs, TimeUnit.MILLISECONDS); + Pipeline pipeline = new Pipeline(Operation.GET_ALL, + timeoutConfig.getAllTimeoutMs(), + TimeUnit.MILLISECONDS); pipeline.addEventAction(Event.STARTED, new GetAllConfigureNodes(pipelineData, Event.CONFIGURED, @@ -270,7 +275,7 @@ public Map>> getAll(Iterable keys, new PerformParallelGetAllRequests(pipelineData, Event.INSUFFICIENT_SUCCESSES, failureDetector, - timeoutMs, + timeoutConfig.getAllTimeoutMs(), nonblockingStores)); pipeline.addEventAction(Event.INSUFFICIENT_SUCCESSES, new PerformSerialGetAllRequests(pipelineData, @@ -280,14 +285,15 @@ public Map>> getAll(Iterable keys, failureDetector, innerStores, storeDef.getPreferredReads(), - storeDef.getRequiredReads())); + storeDef.getRequiredReads(), + timeoutConfig.isPartialGetAllAllowed())); if(allowReadRepair) pipeline.addEventAction(Event.RESPONSES_RECEIVED, new GetAllReadRepair(pipelineData, Event.COMPLETED, storeDef.getPreferredReads(), - timeoutMs, + timeoutConfig.getAllTimeoutMs(), nonblockingStores, readRepairer)); @@ -323,7 +329,9 @@ public List getVersions(final ByteArray key) { else pipelineData.setZonesRequired(null); pipelineData.setStats(stats); - Pipeline pipeline = new Pipeline(Operation.GET_VERSIONS, timeoutMs, TimeUnit.MILLISECONDS); + Pipeline pipeline = new Pipeline(Operation.GET_VERSIONS, + timeoutConfig.getVersionsTimeoutMs(), + TimeUnit.MILLISECONDS); StoreRequest> blockingStoreRequest = new StoreRequest>() { @@ -349,7 +357,7 @@ public List request(Store store) { failureDetector, storeDef.getPreferredReads(), storeDef.getRequiredReads(), - timeoutMs, + timeoutConfig.getVersionsTimeoutMs(), nonblockingStores, Event.INSUFFICIENT_SUCCESSES, Event.INSUFFICIENT_ZONES)); @@ -408,7 +416,9 @@ public boolean delete(final ByteArray key, final Version version) throws Voldemo pipelineData.setStoreName(name); pipelineData.setStats(stats); - Pipeline pipeline = new Pipeline(Operation.DELETE, timeoutMs, TimeUnit.MILLISECONDS); + Pipeline pipeline = new Pipeline(Operation.DELETE, + timeoutConfig.deleteTimeoutMs(), + TimeUnit.MILLISECONDS); pipeline.setEnableHintedHandoff(isHintedHandoffEnabled()); HintedHandoff hintedHandoff = null; @@ -419,7 +429,7 @@ public boolean delete(final ByteArray key, final Version version) throws Voldemo nonblockingSlopStores, handoffStrategy, pipelineData.getFailedNodes(), - timeoutMs); + timeoutConfig.deleteTimeoutMs()); pipeline.addEventAction(Event.STARTED, new ConfigureNodes>(pipelineData, @@ -437,7 +447,7 @@ public boolean delete(final ByteArray key, final Version version) throws Voldemo failureDetector, storeDef.getPreferredWrites(), storeDef.getRequiredWrites(), - timeoutMs, + timeoutConfig.deleteTimeoutMs(), nonblockingStores, hintedHandoff, version)); @@ -496,7 +506,9 @@ public void put(ByteArray key, Versioned versioned, byte[] transforms) pipelineData.setStoreName(name); pipelineData.setStats(stats); - Pipeline pipeline = new Pipeline(Operation.PUT, timeoutMs, TimeUnit.MILLISECONDS); + Pipeline pipeline = new Pipeline(Operation.PUT, + timeoutConfig.putTimeoutMs(), + TimeUnit.MILLISECONDS); pipeline.setEnableHintedHandoff(isHintedHandoffEnabled()); HintedHandoff hintedHandoff = null; @@ -507,7 +519,7 @@ public void put(ByteArray key, Versioned versioned, byte[] transforms) nonblockingSlopStores, handoffStrategy, pipelineData.getFailedNodes(), - timeoutMs); + timeoutConfig.putTimeoutMs()); pipeline.addEventAction(Event.STARTED, new ConfigureNodes(pipelineData, @@ -537,7 +549,7 @@ public void put(ByteArray key, Versioned versioned, byte[] transforms) failureDetector, storeDef.getPreferredWrites(), storeDef.getRequiredWrites(), - timeoutMs, + timeoutConfig.putTimeoutMs(), nonblockingStores, hintedHandoff)); if(isHintedHandoffEnabled()) { diff --git a/src/java/voldemort/store/routed/RoutedStore.java b/src/java/voldemort/store/routed/RoutedStore.java index 613f195bf8..ad2eb4a14d 100644 --- a/src/java/voldemort/store/routed/RoutedStore.java +++ b/src/java/voldemort/store/routed/RoutedStore.java @@ -32,6 +32,7 @@ import voldemort.store.StoreDefinition; import voldemort.utils.ByteArray; import voldemort.utils.Time; +import voldemort.utils.TimeoutConfig; import voldemort.utils.Utils; /** @@ -45,7 +46,7 @@ public abstract class RoutedStore implements Store { protected final Map> innerStores; protected final boolean repairReads; protected final ReadRepairer readRepairer; - protected final long timeoutMs; + protected final TimeoutConfig timeoutConfig; protected final Time time; protected final StoreDefinition storeDef; protected final FailureDetector failureDetector; @@ -57,7 +58,7 @@ protected RoutedStore(String name, Cluster cluster, StoreDefinition storeDef, boolean repairReads, - long timeoutMs, + TimeoutConfig timeoutConfig, FailureDetector failureDetector, Time time) { if(storeDef.getRequiredReads() < 1) @@ -77,7 +78,7 @@ protected RoutedStore(String name, this.innerStores = new ConcurrentHashMap>(innerStores); this.repairReads = repairReads; this.readRepairer = new ReadRepairer(); - this.timeoutMs = timeoutMs; + this.timeoutConfig = timeoutConfig; this.time = Utils.notNull(time); this.storeDef = storeDef; this.failureDetector = failureDetector; diff --git a/src/java/voldemort/store/routed/RoutedStoreFactory.java b/src/java/voldemort/store/routed/RoutedStoreFactory.java index 0a608bac4d..270e959fa0 100644 --- a/src/java/voldemort/store/routed/RoutedStoreFactory.java +++ b/src/java/voldemort/store/routed/RoutedStoreFactory.java @@ -18,6 +18,7 @@ import voldemort.store.slop.Slop; import voldemort.utils.ByteArray; import voldemort.utils.SystemTime; +import voldemort.utils.TimeoutConfig; import com.google.common.collect.Maps; @@ -27,16 +28,16 @@ public class RoutedStoreFactory { private final ExecutorService threadPool; - private final long routingTimeoutMs; + private final TimeoutConfig timeoutConfig; private final Logger logger = Logger.getLogger(getClass()); public RoutedStoreFactory(boolean isPipelineRoutedStoreEnabled, ExecutorService threadPool, - long routingTimeoutMs) { + TimeoutConfig timeoutConfig) { this.isPipelineRoutedStoreEnabled = isPipelineRoutedStoreEnabled; this.threadPool = threadPool; - this.routingTimeoutMs = routingTimeoutMs; + this.timeoutConfig = timeoutConfig; } public NonblockingStore toNonblockingStore(Store store) { @@ -90,7 +91,7 @@ public RoutedStore create(Cluster cluster, storeDefinition, repairReads, clientZoneId, - routingTimeoutMs, + timeoutConfig, failureDetector, jmxEnabled); } else { @@ -111,7 +112,7 @@ public RoutedStore create(Cluster cluster, storeDefinition, repairReads, threadPool, - routingTimeoutMs, + timeoutConfig, failureDetector, SystemTime.INSTANCE); } diff --git a/src/java/voldemort/store/routed/ThreadPoolRoutedStore.java b/src/java/voldemort/store/routed/ThreadPoolRoutedStore.java index 9fec120cd2..5a8a667dff 100644 --- a/src/java/voldemort/store/routed/ThreadPoolRoutedStore.java +++ b/src/java/voldemort/store/routed/ThreadPoolRoutedStore.java @@ -48,6 +48,7 @@ import voldemort.utils.ByteUtils; import voldemort.utils.SystemTime; import voldemort.utils.Time; +import voldemort.utils.TimeoutConfig; import voldemort.versioning.ObsoleteVersionException; import voldemort.versioning.VectorClock; import voldemort.versioning.Version; @@ -102,7 +103,7 @@ public ThreadPoolRoutedStore(String name, StoreDefinition storeDef, int numberOfThreads, boolean repairReads, - long timeoutMs, + TimeoutConfig timeoutConfig, FailureDetector failureDetector) { this(name, innerStores, @@ -110,7 +111,7 @@ public ThreadPoolRoutedStore(String name, storeDef, repairReads, Executors.newFixedThreadPool(numberOfThreads), - timeoutMs, + timeoutConfig, failureDetector, SystemTime.INSTANCE); } @@ -134,10 +135,17 @@ public ThreadPoolRoutedStore(String name, StoreDefinition storeDef, boolean repairReads, ExecutorService threadPool, - long timeoutMs, + TimeoutConfig timeoutConfig, FailureDetector failureDetector, Time time) { - super(name, innerStores, cluster, storeDef, repairReads, timeoutMs, failureDetector, time); + super(name, + innerStores, + cluster, + storeDef, + repairReads, + timeoutConfig, + failureDetector, + time); this.executor = threadPool; } @@ -184,7 +192,8 @@ public void run() { } catch(Exception e) { failures.add(e); logger.warn("Error in DELETE on node " + node.getId() + "(" - + node.getHost() + ")", e); + + node.getHost() + ")", + e); } finally { // signal that the operation is complete semaphore.release(); @@ -199,6 +208,7 @@ public void run() { } else { for(int i = 0; i < numNodes; i++) { try { + long timeoutMs = timeoutConfig.deleteTimeoutMs(); boolean acquired = semaphore.tryAcquire(timeoutMs, TimeUnit.MILLISECONDS); if(!acquired) logger.warn("Delete operation timed out waiting for operation " + i @@ -292,6 +302,7 @@ public Map>> getAll(Iterable keys, keyToSuccessCount.put(key, new MutableInt(0)); List> futures; + long timeoutMs = timeoutConfig.getAllTimeoutMs(); try { // TODO What to do about timeouts? They should be longer as getAll // is likely to @@ -377,7 +388,8 @@ public Map>> getAll(Iterable keys, throw e; } catch(Exception e) { logger.warn("Error in GET_ALL on node " + node.getId() + "(" - + node.getHost() + ")", e); + + node.getHost() + ")", + e); failures.add(e); } } @@ -453,6 +465,8 @@ private List get(final ByteArray key, } List>> futures; + long timeoutMs = (fetcher == VERSION_OP) ? timeoutConfig.getVersionsTimeoutMs() + : timeoutConfig.getTimeoutMs(); try { futures = executor.invokeAll(callables, timeoutMs, TimeUnit.MILLISECONDS); } catch(InterruptedException e) { @@ -498,8 +512,7 @@ private List get(final ByteArray key, key, fetcher.execute(innerStores.get(node.getId()), key, - transforms), - null)); + transforms), null)); ++successes; recordSuccess(node, startNs); } catch(UnreachableStoreException e) { @@ -760,7 +773,7 @@ private boolean blockOnPut(long startNs, for(int i = startingIndex; i < blockCount; i++) { try { long ellapsedNs = System.nanoTime() - startNs; - long remainingNs = (timeoutMs * Time.NS_PER_MS) - ellapsedNs; + long remainingNs = (timeoutConfig.putTimeoutMs() * Time.NS_PER_MS) - ellapsedNs; boolean acquiredPermit = semaphore.tryAcquire(Math.max(remainingNs, 0), TimeUnit.NANOSECONDS); if(!acquiredPermit) { diff --git a/src/java/voldemort/store/routed/action/PerformParallelGetAllRequests.java b/src/java/voldemort/store/routed/action/PerformParallelGetAllRequests.java index a6bc581a7d..444fdd930e 100644 --- a/src/java/voldemort/store/routed/action/PerformParallelGetAllRequests.java +++ b/src/java/voldemort/store/routed/action/PerformParallelGetAllRequests.java @@ -119,7 +119,7 @@ public void requestComplete(Object result, long requestTime) { } try { - latch.await(timeoutMs * 3, TimeUnit.MILLISECONDS); + latch.await(timeoutMs, TimeUnit.MILLISECONDS); } catch(InterruptedException e) { if(logger.isEnabledFor(Level.WARN)) logger.warn(e, e); diff --git a/src/java/voldemort/store/routed/action/PerformSerialGetAllRequests.java b/src/java/voldemort/store/routed/action/PerformSerialGetAllRequests.java index bc8cf957e6..059545e90b 100644 --- a/src/java/voldemort/store/routed/action/PerformSerialGetAllRequests.java +++ b/src/java/voldemort/store/routed/action/PerformSerialGetAllRequests.java @@ -52,19 +52,23 @@ public class PerformSerialGetAllRequests private final int required; + private final boolean allowPartial; + public PerformSerialGetAllRequests(GetAllPipelineData pipelineData, Event completeEvent, Iterable keys, FailureDetector failureDetector, Map> stores, int preferred, - int required) { + int required, + boolean allowPartial) { super(pipelineData, completeEvent); this.keys = keys; this.failureDetector = failureDetector; this.stores = stores; this.preferred = preferred; this.required = required; + this.allowPartial = allowPartial; } public void execute(Pipeline pipeline) { @@ -156,17 +160,23 @@ public void execute(Pipeline pipeline) { MutableInt successCount = pipelineData.getSuccessCount(key); if(successCount.intValue() < required) { - pipelineData.setFatalError(new InsufficientOperationalNodesException(required - + " " - + pipeline.getOperation() - .getSimpleName() - + "s required, but " - + successCount.intValue() - + " succeeded. Failing nodes : " - + pipelineData.getFailedNodes(), - pipelineData.getFailures())); - pipeline.addEvent(Event.ERROR); - return; + // if we allow partial results, then just remove keys that did + // not meet 'required' guarantee; else raise error + if(allowPartial) { + result.remove(key); + } else { + pipelineData.setFatalError(new InsufficientOperationalNodesException(required + + " " + + pipeline.getOperation() + .getSimpleName() + + "s required, but " + + successCount.intValue() + + " succeeded. Failing nodes : " + + pipelineData.getFailedNodes(), + pipelineData.getFailures())); + pipeline.addEvent(Event.ERROR); + return; + } } } diff --git a/src/java/voldemort/utils/TimeoutConfig.java b/src/java/voldemort/utils/TimeoutConfig.java new file mode 100644 index 0000000000..1752d1eab2 --- /dev/null +++ b/src/java/voldemort/utils/TimeoutConfig.java @@ -0,0 +1,114 @@ +package voldemort.utils; + +import java.util.concurrent.TimeUnit; + +/** + * Encapsulates the timeouts for various voldemort operations + * + */ +public class TimeoutConfig { + + private long getTimeoutMs; + + private long putTimeoutMs; + + private long getAllTimeoutMs; + + private long deleteTimeoutMs; + + private long getVersionsTimeoutMs; + + private boolean partialGetAllAllowed; + + public TimeoutConfig(long globalTimeout, boolean allowPartialGetAlls) { + this(globalTimeout, + globalTimeout, + globalTimeout, + globalTimeout, + globalTimeout, + allowPartialGetAlls); + } + + public TimeoutConfig(long getTimeout, + long putTimeout, + long deleteTimeout, + long getAllTimeout, + long getVersionsTimeout, + boolean allowPartialGetAlls) { + getTimeoutMs(getTimeout, TimeUnit.MILLISECONDS); + putTimeoutMs(putTimeout, TimeUnit.MILLISECONDS); + deleteTimeoutMs(deleteTimeout, TimeUnit.MILLISECONDS); + getAllTimeoutMs(getAllTimeout, TimeUnit.MILLISECONDS); + getVersionsTimeoutMs(getVersionsTimeout, TimeUnit.MILLISECONDS); + setPartialGetAllAllowed(allowPartialGetAlls); + } + + public long getTimeoutMs(TimeUnit unit) { + return unit.convert(getTimeoutMs, TimeUnit.MILLISECONDS); + } + + public long getTimeoutMs() { + return getTimeoutMs; + } + + public void getTimeoutMs(long getTimeoutMs, TimeUnit unit) { + this.getTimeoutMs = unit.toMillis(getTimeoutMs); + } + + public long getVersionsTimeoutMs(TimeUnit unit) { + return unit.convert(getVersionsTimeoutMs, TimeUnit.MILLISECONDS); + } + + public long getVersionsTimeoutMs() { + return getVersionsTimeoutMs; + } + + public void getVersionsTimeoutMs(long getTimeoutMs, TimeUnit unit) { + this.getVersionsTimeoutMs = unit.toMillis(getTimeoutMs); + } + + public long putTimeoutMs(TimeUnit unit) { + return unit.convert(putTimeoutMs, TimeUnit.MILLISECONDS); + } + + public long putTimeoutMs() { + return putTimeoutMs; + } + + public void putTimeoutMs(long putTimeoutMs, TimeUnit unit) { + this.putTimeoutMs = unit.toMillis(putTimeoutMs); + } + + public long getAllTimeoutMs(TimeUnit unit) { + return unit.convert(getAllTimeoutMs, TimeUnit.MILLISECONDS); + } + + public long getAllTimeoutMs() { + return getAllTimeoutMs; + } + + public void getAllTimeoutMs(long getAllTimeoutMs, TimeUnit unit) { + this.getAllTimeoutMs = unit.toMillis(getAllTimeoutMs); + } + + public long deleteTimeoutMs(TimeUnit unit) { + return unit.convert(deleteTimeoutMs, TimeUnit.MILLISECONDS); + } + + public long deleteTimeoutMs() { + return deleteTimeoutMs; + } + + public void deleteTimeoutMs(long deleteTimeoutMs, TimeUnit unit) { + this.deleteTimeoutMs = unit.toMillis(deleteTimeoutMs); + } + + public boolean isPartialGetAllAllowed() { + return partialGetAllAllowed; + } + + public void setPartialGetAllAllowed(boolean allowPartialGetAlls) { + this.partialGetAllAllowed = allowPartialGetAlls; + } + +} diff --git a/test/integration/voldemort/performance/ClientConnectionStressTest.java b/test/integration/voldemort/performance/ClientConnectionStressTest.java index a26beb984e..6bf7e23ada 100644 --- a/test/integration/voldemort/performance/ClientConnectionStressTest.java +++ b/test/integration/voldemort/performance/ClientConnectionStressTest.java @@ -1,12 +1,12 @@ /* * Copyright 2008-2010 LinkedIn, Inc - * + * * Licensed under the Apache License, Version 2.0 (the "License"); you may not * use this file except in compliance with the License. You may obtain a copy of * the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the @@ -16,6 +16,12 @@ package voldemort.performance; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + import joptsimple.OptionParser; import joptsimple.OptionSet; import voldemort.client.ClientConfig; @@ -23,12 +29,7 @@ import voldemort.client.StoreClient; import voldemort.client.StoreClientFactory; import voldemort.utils.CmdUtils; - -import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; +import voldemort.utils.TimeoutConfig; /** * Stress tests the client. Intended to diagnose issues such as connection leaks @@ -92,8 +93,6 @@ public void run() { executor.shutdown(); } - - public static void main(String[] args) throws Exception { OptionParser parser = new OptionParser(); @@ -112,18 +111,14 @@ public static void main(String[] args) throws Exception { parser.accepts(MAX_CONNECTIONS_TOTAL, "Max total connections") .withRequiredArg() .ofType(Integer.class); - parser.accepts(MAX_THREADS, "Max threads") - .withRequiredArg() - .ofType(Integer.class); + parser.accepts(MAX_THREADS, "Max threads").withRequiredArg().ofType(Integer.class); parser.accepts(SELECTORS, "Number of NIO selectors") .withRequiredArg() .ofType(Integer.class); parser.accepts(SOCKET_BUFFER_SIZE, "Socket buffer size") - .withRequiredArg() - .ofType(Integer.class); - parser.accepts(REQS, "Requests per session") .withRequiredArg() .ofType(Integer.class); + parser.accepts(REQS, "Requests per session").withRequiredArg().ofType(Integer.class); parser.accepts(CONNECTIONS, "Total connections to make") .withRequiredArg() .ofType(Integer.class); @@ -144,11 +139,15 @@ public static void main(String[] args) throws Exception { ClientConfig config = new ClientConfig(); if(options.has(CONNECTION_TIMEOUT)) - config.setConnectionTimeout((Integer) options.valueOf(CONNECTION_TIMEOUT), TimeUnit.MILLISECONDS); + config.setConnectionTimeout((Integer) options.valueOf(CONNECTION_TIMEOUT), + TimeUnit.MILLISECONDS); if(options.has(ROUTING_TIMEOUT)) - config.setRoutingTimeout((Integer) options.valueOf(ROUTING_TIMEOUT), TimeUnit.MILLISECONDS); + config.setTimeoutConfig(new TimeoutConfig(TimeUnit.MILLISECONDS.toMillis((Integer) options.valueOf(ROUTING_TIMEOUT)), + false)); + if(options.has(SOCKET_TIMEOUT)) - config.setSocketTimeout((Integer) options.valueOf(SOCKET_TIMEOUT), TimeUnit.MILLISECONDS); + config.setSocketTimeout((Integer) options.valueOf(SOCKET_TIMEOUT), + TimeUnit.MILLISECONDS); if(options.has(MAX_CONNECTIONS)) config.setMaxConnectionsPerNode((Integer) options.valueOf(MAX_CONNECTIONS)); if(options.has(MAX_THREADS)) diff --git a/test/integration/voldemort/performance/RoutedStoreParallelismTest.java b/test/integration/voldemort/performance/RoutedStoreParallelismTest.java index 0bbbddb76b..0397416613 100644 --- a/test/integration/voldemort/performance/RoutedStoreParallelismTest.java +++ b/test/integration/voldemort/performance/RoutedStoreParallelismTest.java @@ -28,7 +28,6 @@ import joptsimple.OptionParser; import joptsimple.OptionSet; -import voldemort.cluster.failuredetector.MutableStoreVerifier; import voldemort.ServerTestUtils; import voldemort.TestUtils; import voldemort.VoldemortException; @@ -39,6 +38,7 @@ import voldemort.cluster.failuredetector.FailureDetector; import voldemort.cluster.failuredetector.FailureDetectorConfig; import voldemort.cluster.failuredetector.FailureDetectorUtils; +import voldemort.cluster.failuredetector.MutableStoreVerifier; import voldemort.server.StoreRepository; import voldemort.server.VoldemortConfig; import voldemort.server.VoldemortServer; @@ -101,7 +101,9 @@ public static void main(String[] args) throws Throwable { .ofType(Integer.class); parser.accepts("num-clients", "The number of threads to make requests concurrently Default = " - + DEFAULT_NUM_CLIENTS).withRequiredArg().ofType(Integer.class); + + DEFAULT_NUM_CLIENTS) + .withRequiredArg() + .ofType(Integer.class); parser.accepts("routed-store-type", "Type of routed store, either \"" + THREAD_POOL_ROUTED_STORE + "\" or \"" + PIPELINE_ROUTED_STORE + "\" Default = " @@ -201,7 +203,7 @@ public static void main(String[] args) throws Throwable { RoutedStoreFactory routedStoreFactory = new RoutedStoreFactory(routedStoreType.trim() .equalsIgnoreCase(PIPELINE_ROUTED_STORE), routedStoreThreadPool, - clientConfig.getRoutingTimeout(TimeUnit.MILLISECONDS)); + clientConfig.getTimeoutConfig()); final RoutedStore routedStore = routedStoreFactory.create(cluster, storeDefinition, @@ -223,7 +225,7 @@ public void run() { try { routedStore.get(key, null); } catch(VoldemortException e) { - // + // } } } diff --git a/test/unit/voldemort/store/routed/HintedHandoffTest.java b/test/unit/voldemort/store/routed/HintedHandoffTest.java index 64d1662192..653ccb2d70 100644 --- a/test/unit/voldemort/store/routed/HintedHandoffTest.java +++ b/test/unit/voldemort/store/routed/HintedHandoffTest.java @@ -61,6 +61,7 @@ import voldemort.store.slop.strategy.HintedHandoffStrategyType; import voldemort.utils.ByteArray; import voldemort.utils.ByteUtils; +import voldemort.utils.TimeoutConfig; import voldemort.versioning.Version; import voldemort.versioning.Versioned; @@ -168,7 +169,9 @@ public void setUp() throws Exception { setFailureDetector(subStores); routedStoreThreadPool = Executors.newFixedThreadPool(NUM_THREADS); - routedStoreFactory = new RoutedStoreFactory(true, routedStoreThreadPool, 1500L); + routedStoreFactory = new RoutedStoreFactory(true, + routedStoreThreadPool, + new TimeoutConfig(1500L, false)); strategy = new RoutingStrategyFactory().updateRoutingStrategy(storeDef, cluster); Map nonblockingSlopStores = Maps.newHashMap(); diff --git a/test/unit/voldemort/store/routed/ReadRepairerTest.java b/test/unit/voldemort/store/routed/ReadRepairerTest.java index 3c0958f09b..7adfde569c 100644 --- a/test/unit/voldemort/store/routed/ReadRepairerTest.java +++ b/test/unit/voldemort/store/routed/ReadRepairerTest.java @@ -21,9 +21,9 @@ import static org.junit.Assert.assertEquals; import static voldemort.FailureDetectorTestUtils.recordException; import static voldemort.FailureDetectorTestUtils.recordSuccess; -import static voldemort.cluster.failuredetector.MutableStoreVerifier.create; import static voldemort.TestUtils.getClock; import static voldemort.cluster.failuredetector.FailureDetectorUtils.create; +import static voldemort.cluster.failuredetector.MutableStoreVerifier.create; import java.util.ArrayList; import java.util.Arrays; @@ -56,6 +56,7 @@ import voldemort.store.memory.InMemoryStorageEngine; import voldemort.utils.ByteArray; import voldemort.utils.Time; +import voldemort.utils.TimeoutConfig; import voldemort.versioning.Versioned; import com.google.common.collect.Iterables; @@ -157,7 +158,8 @@ public void testMissingKeysAreAddedToNodeWhenDoingReadRepair() throws Exception RoutedStoreFactory routedStoreFactory = new RoutedStoreFactory(isPipelineRoutedStoreEnabled, routedStoreThreadPool, - 1000L); + new TimeoutConfig(1000L, + false)); RoutedStore store = routedStoreFactory.create(cluster, storeDef, @@ -198,8 +200,8 @@ public void testNoDuplicates() throws Exception { public void testSingleSuccessor() throws Exception { assertVariationsEqual(singletonList(getValue(1, 1, new int[] { 1, 1 })), - asList(getValue(1, 1, new int[] { 1 }), getValue(2, 1, new int[] { 1, - 1 }))); + asList(getValue(1, 1, new int[] { 1 }), + getValue(2, 1, new int[] { 1, 1 }))); } public void testAllConcurrent() throws Exception { @@ -257,8 +259,9 @@ public void testConcurrentToOneDoesNotImplyConcurrentToAll() throws Exception { getValue(1, 1, new int[] { 1, 2 }), getValue(2, 1, new int[] { 1, 3, 3 }), getValue(3, 1, new int[] { 1, 2 })), - asList(getValue(1, 1, new int[] { 3, 3 }), getValue(2, 1, new int[] { - 1, 2 }), getValue(3, 1, new int[] { 1, 3, 3 }))); + asList(getValue(1, 1, new int[] { 3, 3 }), + getValue(2, 1, new int[] { 1, 2 }), + getValue(3, 1, new int[] { 1, 3, 3 }))); } public void testLotsOfVersions() throws Exception { diff --git a/test/unit/voldemort/store/routed/RoutedStoreTest.java b/test/unit/voldemort/store/routed/RoutedStoreTest.java index 4ec00e514b..8da982f487 100644 --- a/test/unit/voldemort/store/routed/RoutedStoreTest.java +++ b/test/unit/voldemort/store/routed/RoutedStoreTest.java @@ -18,10 +18,10 @@ import static voldemort.FailureDetectorTestUtils.recordException; import static voldemort.FailureDetectorTestUtils.recordSuccess; -import static voldemort.cluster.failuredetector.MutableStoreVerifier.create; import static voldemort.TestUtils.getClock; import static voldemort.VoldemortTestConstants.getNineNodeCluster; import static voldemort.cluster.failuredetector.FailureDetectorUtils.create; +import static voldemort.cluster.failuredetector.MutableStoreVerifier.create; import java.util.ArrayList; import java.util.Arrays; @@ -33,6 +33,7 @@ import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import org.junit.After; import org.junit.Before; @@ -73,6 +74,7 @@ import voldemort.utils.ByteArray; import voldemort.utils.ByteUtils; import voldemort.utils.Time; +import voldemort.utils.TimeoutConfig; import voldemort.utils.Utils; import voldemort.versioning.Occurred; import voldemort.versioning.VectorClock; @@ -202,7 +204,8 @@ else if(count < failing + sleepy) routedStoreThreadPool = Executors.newFixedThreadPool(threads); RoutedStoreFactory routedStoreFactory = new RoutedStoreFactory(isPipelineRoutedStoreEnabled, routedStoreThreadPool, - 1000L); + new TimeoutConfig(1000L, + false)); return routedStoreFactory.create(cluster, storeDef, subStores, true, failureDetector); } @@ -252,7 +255,8 @@ else if(sleepy != null && sleepy.contains(n.getId())) routedStoreThreadPool = Executors.newFixedThreadPool(threads); RoutedStoreFactory routedStoreFactory = new RoutedStoreFactory(true, routedStoreThreadPool, - timeOutMs); + new TimeoutConfig(timeOutMs, + false)); return routedStoreFactory.create(cluster, storeDef, subStores, true, failureDetector); } @@ -376,7 +380,7 @@ public void testPutIncrementsVersion() throws Exception { @Test public void testObsoleteMasterFails() { - // write me + // write me } @Test @@ -779,6 +783,75 @@ public void testGetAllWithNodeDown() throws Exception { } } + /** + * Tests that getAll returns partial results + */ + @Test + public void testPartialGetAll() throws Exception { + // create a store with rf=1 i.e disjoint partitions + StoreDefinition definition = new StoreDefinitionBuilder().setName("test") + .setType("foo") + .setKeySerializer(new SerializerDefinition("test")) + .setValueSerializer(new SerializerDefinition("test")) + .setRoutingPolicy(RoutingTier.CLIENT) + .setRoutingStrategyType(RoutingStrategyType.CONSISTENT_STRATEGY) + .setReplicationFactor(1) + .setPreferredReads(1) + .setRequiredReads(1) + .setPreferredWrites(1) + .setRequiredWrites(1) + .build(); + + Map> stores = new HashMap>(); + List nodes = new ArrayList(); + // create nodes with varying speeds - 100ms, 200ms, 300ms + for(int i = 0; i < 3; i++) { + Store store = new SleepyStore(100 * (i + 1), + new InMemoryStorageEngine("test")); + stores.put(i, store); + List partitions = Arrays.asList(i); + nodes.add(new Node(i, "none", 0, 0, 0, partitions)); + } + setFailureDetector(stores); + + routedStoreThreadPool = Executors.newFixedThreadPool(3); + + TimeoutConfig timeoutConfig = new TimeoutConfig(1500, true); + // This means, the getall will only succeed on two of the nodes + timeoutConfig.getAllTimeoutMs(250, TimeUnit.MILLISECONDS); + RoutedStoreFactory routedStoreFactory = new RoutedStoreFactory(true, + routedStoreThreadPool, + timeoutConfig); + + RoutedStore routedStore = routedStoreFactory.create(new Cluster("test", nodes), + definition, + stores, + true, + failureDetector); + /* do some puts so we have some data to test getalls */ + Map expectedValues = Maps.newHashMap(); + for(byte i = 1; i < 11; ++i) { + ByteArray key = new ByteArray(new byte[] { i }); + byte[] value = new byte[] { (byte) (i + 50) }; + routedStore.put(key, Versioned.value(value), null); + expectedValues.put(key, value); + } + + /* 1. positive test; if partial is on, should get something back */ + Map>> all = routedStore.getAll(expectedValues.keySet(), + null); + assert (expectedValues.size() > all.size()); + + /* 2. negative test; if partial is off, should fail the whole operation */ + timeoutConfig.setPartialGetAllAllowed(false); + try { + all = routedStore.getAll(expectedValues.keySet(), null); + fail("Should have failed"); + } catch(Exception e) { + + } + } + @Test public void testGetAllWithFailingStore() throws Exception { cluster = VoldemortTestConstants.getTwoNodeCluster(); @@ -802,7 +875,8 @@ public void testGetAllWithFailingStore() throws Exception { routedStoreThreadPool = Executors.newFixedThreadPool(1); RoutedStoreFactory routedStoreFactory = new RoutedStoreFactory(isPipelineRoutedStoreEnabled, routedStoreThreadPool, - 1000L); + new TimeoutConfig(1000L, + false)); RoutedStore routedStore = routedStoreFactory.create(cluster, storeDef, @@ -858,7 +932,8 @@ public void testGetAllWithMorePreferredReadsThanNodes() throws Exception { routedStoreThreadPool = Executors.newFixedThreadPool(1); RoutedStoreFactory routedStoreFactory = new RoutedStoreFactory(isPipelineRoutedStoreEnabled, routedStoreThreadPool, - 1000L); + new TimeoutConfig(1000L, + false)); RoutedStore routedStore = routedStoreFactory.create(cluster, storeDef, @@ -965,7 +1040,8 @@ public void testPutWithOneNodeDownAndOneNodeSlow() throws Exception { routedStoreThreadPool = Executors.newFixedThreadPool(1); RoutedStoreFactory routedStoreFactory = new RoutedStoreFactory(isPipelineRoutedStoreEnabled, routedStoreThreadPool, - 1000L); + new TimeoutConfig(1000L, + false)); RoutedStore routedStore = routedStoreFactory.create(cluster, storeDef, @@ -1011,7 +1087,8 @@ public void testPutTimeout() throws Exception { routedStoreThreadPool = Executors.newFixedThreadPool(3); RoutedStoreFactory routedStoreFactory = new RoutedStoreFactory(isPipelineRoutedStoreEnabled, routedStoreThreadPool, - timeout); + new TimeoutConfig(timeout, + false)); RoutedStore routedStore = routedStoreFactory.create(new Cluster("test", nodes), definition, @@ -1064,7 +1141,8 @@ public void testGetTimeout() throws Exception { routedStoreThreadPool = Executors.newFixedThreadPool(3); RoutedStoreFactory routedStoreFactory = new RoutedStoreFactory(true, routedStoreThreadPool, - timeout); + new TimeoutConfig(timeout, + false)); RoutedStore routedStore = routedStoreFactory.create(new Cluster("test", nodes), definition, @@ -1082,6 +1160,62 @@ public void testGetTimeout() throws Exception { } } + @Test + public void testOperationSpecificTimeouts() throws Exception { + StoreDefinition definition = new StoreDefinitionBuilder().setName("test") + .setType("foo") + .setKeySerializer(new SerializerDefinition("test")) + .setValueSerializer(new SerializerDefinition("test")) + .setRoutingPolicy(RoutingTier.CLIENT) + .setRoutingStrategyType(RoutingStrategyType.CONSISTENT_STRATEGY) + .setReplicationFactor(3) + .setPreferredReads(3) + .setRequiredReads(3) + .setPreferredWrites(3) + .setRequiredWrites(3) + .build(); + Map> stores = new HashMap>(); + List nodes = new ArrayList(); + for(int i = 0; i < 3; i++) { + Store store = new SleepyStore(200, + new InMemoryStorageEngine("test")); + stores.put(i, store); + List partitions = Arrays.asList(i); + nodes.add(new Node(i, "none", 0, 0, 0, partitions)); + } + + setFailureDetector(stores); + + routedStoreThreadPool = Executors.newFixedThreadPool(3); + // with a 500ms general timeout and a 100ms get timeout, only get should + // fail + TimeoutConfig timeoutConfig = new TimeoutConfig(1500, false); + timeoutConfig.getTimeoutMs(100, TimeUnit.MILLISECONDS); + RoutedStoreFactory routedStoreFactory = new RoutedStoreFactory(true, + routedStoreThreadPool, + timeoutConfig); + + RoutedStore routedStore = routedStoreFactory.create(new Cluster("test", nodes), + definition, + stores, + true, + failureDetector); + try { + routedStore.put(new ByteArray("test".getBytes()), + new Versioned(new byte[] { 1 }), + null); + } catch(InsufficientOperationalNodesException e) { + fail("Should not have failed"); + } + + try { + routedStore.get(new ByteArray("test".getBytes()), null); + fail("Should have thrown"); + } catch(InsufficientOperationalNodesException e) { + + } + } + /** * See Issue #211: Unnecessary read repairs during getAll with more than one * key @@ -1113,7 +1247,8 @@ public void testNoReadRepair() throws Exception { routedStoreThreadPool = Executors.newFixedThreadPool(1); RoutedStoreFactory routedStoreFactory = new RoutedStoreFactory(isPipelineRoutedStoreEnabled, routedStoreThreadPool, - 1000L); + new TimeoutConfig(1000L, + false)); RoutedStore routedStore = routedStoreFactory.create(cluster, storeDef, @@ -1164,7 +1299,8 @@ public void testTardyResponsesNotIncludedInResult() throws Exception { routedStoreThreadPool = Executors.newFixedThreadPool(cluster.getNumberOfNodes()); RoutedStoreFactory routedStoreFactory = new RoutedStoreFactory(isPipelineRoutedStoreEnabled, routedStoreThreadPool, - 10000L); + new TimeoutConfig(10000L, + false)); RoutedStore routedStore = routedStoreFactory.create(cluster, storeDef, @@ -1176,7 +1312,7 @@ public void testTardyResponsesNotIncludedInResult() throws Exception { routedStoreFactory = new RoutedStoreFactory(isPipelineRoutedStoreEnabled, routedStoreThreadPool, - sleepTimeMs / 2); + new TimeoutConfig(sleepTimeMs / 2, false)); routedStore = routedStoreFactory.create(cluster, storeDef, subStores, true, failureDetector); @@ -1218,7 +1354,8 @@ public void testSlowStoreDowngradesFromPreferredToRequired() throws Exception { routedStoreThreadPool = Executors.newFixedThreadPool(cluster.getNumberOfNodes()); RoutedStoreFactory routedStoreFactory = new RoutedStoreFactory(isPipelineRoutedStoreEnabled, routedStoreThreadPool, - 10000L); + new TimeoutConfig(10000L, + false)); RoutedStore routedStore = routedStoreFactory.create(cluster, storeDef, @@ -1230,7 +1367,7 @@ public void testSlowStoreDowngradesFromPreferredToRequired() throws Exception { routedStoreFactory = new RoutedStoreFactory(isPipelineRoutedStoreEnabled, routedStoreThreadPool, - sleepTimeMs / 2); + new TimeoutConfig(sleepTimeMs / 2, false)); routedStore = routedStoreFactory.create(cluster, storeDef, subStores, true, failureDetector); @@ -1279,7 +1416,7 @@ public void testPutDeleteZoneRouting() throws Exception { routedStoreThreadPool = Executors.newFixedThreadPool(8); RoutedStoreFactory routedStoreFactory = new RoutedStoreFactory(true, routedStoreThreadPool, - 60); + new TimeoutConfig(60, false)); Store s1 = routedStoreFactory.create(cluster, storeDef,