Skip to content

Commit

Permalink
Moved SlowStorageEngine into tests/integration.
Browse files Browse the repository at this point in the history
  • Loading branch information
jayjwylie committed Oct 9, 2012
1 parent ad3d0cc commit 0c2bd87
Show file tree
Hide file tree
Showing 9 changed files with 89 additions and 142 deletions.
32 changes: 12 additions & 20 deletions src/java/voldemort/client/TimeoutConfig.java
@@ -1,26 +1,20 @@
package voldemort.client;

import java.util.HashMap;

import voldemort.common.VoldemortOpCode;
import voldemort.utils.OpTimeMap;

/**
* Encapsulates the timeouts for various voldemort operations
* Encapsulates the timeouts, in ms, for various Voldemort operations
*
*/
public class TimeoutConfig {

private HashMap<Byte, Long> timeoutMap;
private OpTimeMap timeoutMap;

private boolean partialGetAllAllowed;

public TimeoutConfig(long globalTimeout, boolean allowPartialGetAlls) {
this(globalTimeout,
globalTimeout,
globalTimeout,
globalTimeout,
globalTimeout,
allowPartialGetAlls);
timeoutMap = new OpTimeMap(globalTimeout);
setPartialGetAllAllowed(allowPartialGetAlls);
}

public TimeoutConfig(long getTimeout,
Expand All @@ -29,22 +23,20 @@ public TimeoutConfig(long getTimeout,
long getAllTimeout,
long getVersionsTimeout,
boolean allowPartialGetAlls) {
timeoutMap = new HashMap<Byte, Long>();
timeoutMap.put(VoldemortOpCode.GET_OP_CODE, getTimeout);
timeoutMap.put(VoldemortOpCode.PUT_OP_CODE, putTimeout);
timeoutMap.put(VoldemortOpCode.DELETE_OP_CODE, deleteTimeout);
timeoutMap.put(VoldemortOpCode.GET_ALL_OP_CODE, getAllTimeout);
timeoutMap.put(VoldemortOpCode.GET_VERSION_OP_CODE, getVersionsTimeout);
timeoutMap = new OpTimeMap(getTimeout,
putTimeout,
deleteTimeout,
getAllTimeout,
getVersionsTimeout);
setPartialGetAllAllowed(allowPartialGetAlls);
}

public long getOperationTimeout(Byte opCode) {
assert timeoutMap.containsKey(opCode);
return timeoutMap.get(opCode);
return timeoutMap.getOpTime(opCode);
}

public void setOperationTimeout(Byte opCode, long timeoutMs) {
timeoutMap.put(opCode, timeoutMs);
timeoutMap.setOpTime(opCode, timeoutMs);
}

public boolean isPartialGetAllAllowed() {
Expand Down
54 changes: 34 additions & 20 deletions src/java/voldemort/server/VoldemortConfig.java
Expand Up @@ -30,11 +30,11 @@
import voldemort.store.bdb.BdbStorageConfiguration;
import voldemort.store.memory.CacheStorageConfiguration;
import voldemort.store.memory.InMemoryStorageConfiguration;
import voldemort.store.memory.SlowStorageEngine;
import voldemort.store.mysql.MysqlStorageConfiguration;
import voldemort.store.readonly.BinarySearchStrategy;
import voldemort.store.readonly.ReadOnlyStorageConfiguration;
import voldemort.utils.ConfigurationException;
import voldemort.utils.OpTimeMap;
import voldemort.utils.Props;
import voldemort.utils.Time;
import voldemort.utils.UndefinedPropertyException;
Expand Down Expand Up @@ -104,8 +104,8 @@ public class VoldemortConfig implements Serializable {
private long reportingIntervalBytes;
private int fetcherBufferSize;

private SlowStorageEngine.OperationDelays slowQueueingDelays;
private SlowStorageEngine.OperationDelays slowConcurrentDelays;
private OpTimeMap testingSlowQueueingDelays;
private OpTimeMap testingSlowConcurrentDelays;

private int coreThreads;
private int maxThreads;
Expand Down Expand Up @@ -256,19 +256,33 @@ public VoldemortConfig(Props props) {
this.mysqlPort = props.getInt("mysql.port", 3306);
this.mysqlDatabaseName = props.getString("mysql.database", "voldemort");

this.slowQueueingDelays = new SlowStorageEngine.OperationDelays();
this.slowQueueingDelays.getMs = props.getInt("slow.queueing.get.ms", 0);
this.slowQueueingDelays.getVersionsMs = props.getInt("slow.queueing.getversions.ms", 0);
this.slowQueueingDelays.getAllMs = props.getInt("slow.queueing.getall.ms", 0);
this.slowQueueingDelays.putMs = props.getInt("slow.queueing.put.ms", 0);
this.slowQueueingDelays.deleteMs = props.getInt("slow.queueing.delete.ms", 0);

this.slowConcurrentDelays = new SlowStorageEngine.OperationDelays();
this.slowConcurrentDelays.getMs = props.getInt("slow.concurrent.get.ms", 0);
this.slowConcurrentDelays.getVersionsMs = props.getInt("slow.concurrent.getversions.ms", 0);
this.slowConcurrentDelays.getAllMs = props.getInt("slow.concurrent.getall.ms", 0);
this.slowConcurrentDelays.putMs = props.getInt("slow.concurrent.put.ms", 0);
this.slowConcurrentDelays.deleteMs = props.getInt("slow.concurrent.delete.ms", 0);
this.testingSlowQueueingDelays = new OpTimeMap(0);
this.testingSlowQueueingDelays.setOpTime(VoldemortOpCode.GET_OP_CODE,
props.getInt("testing.slow.queueing.get.ms", 0));
this.testingSlowQueueingDelays.setOpTime(VoldemortOpCode.GET_ALL_OP_CODE,
props.getInt("testing.slow.queueing.getall.ms", 0));
this.testingSlowQueueingDelays.setOpTime(VoldemortOpCode.GET_VERSION_OP_CODE,
props.getInt("testing.slow.queueing.getversions.ms",
0));
this.testingSlowQueueingDelays.setOpTime(VoldemortOpCode.PUT_OP_CODE,
props.getInt("testing.slow.queueing.put.ms", 0));
this.testingSlowQueueingDelays.setOpTime(VoldemortOpCode.DELETE_OP_CODE,
props.getInt("testing.slow.queueing.delete.ms", 0));

this.testingSlowConcurrentDelays = new OpTimeMap(0);
this.testingSlowConcurrentDelays.setOpTime(VoldemortOpCode.GET_OP_CODE,
props.getInt("testing.slow.concurrent.get.ms", 0));
this.testingSlowConcurrentDelays.setOpTime(VoldemortOpCode.GET_ALL_OP_CODE,
props.getInt("testing.slow.concurrent.getall.ms",
0));
this.testingSlowConcurrentDelays.setOpTime(VoldemortOpCode.GET_VERSION_OP_CODE,
props.getInt("testing.slow.concurrent.getversions.ms",
0));
this.testingSlowConcurrentDelays.setOpTime(VoldemortOpCode.PUT_OP_CODE,
props.getInt("testing.slow.concurrent.put.ms", 0));
this.testingSlowConcurrentDelays.setOpTime(VoldemortOpCode.DELETE_OP_CODE,
props.getInt("testing.slow.concurrent.delete.ms",
0));

this.maxThreads = props.getInt("max.threads", 100);
this.coreThreads = props.getInt("core.threads", Math.max(1, maxThreads / 2));
Expand Down Expand Up @@ -1565,11 +1579,11 @@ public void setEnableJmxClusterName(boolean enableJmxClusterName) {
this.enableJmxClusterName = enableJmxClusterName;
}

public SlowStorageEngine.OperationDelays getSlowQueueingDelays() {
return this.slowQueueingDelays;
public OpTimeMap testingGetSlowQueueingDelays() {
return this.testingSlowQueueingDelays;
}

public SlowStorageEngine.OperationDelays getSlowConcurrentDelays() {
return this.slowConcurrentDelays;
public OpTimeMap testingGetSlowConcurrentDelays() {
return this.testingSlowConcurrentDelays;
}
}
Expand Up @@ -43,6 +43,7 @@
import voldemort.utils.pool.KeyedResourcePool;
import voldemort.utils.pool.QueuedKeyedResourcePool;
import voldemort.utils.pool.ResourcePoolConfig;
import voldemort.utils.pool.ResourceRequest;

/**
* A pool of {@link ClientRequestExecutor} keyed off the
Expand Down Expand Up @@ -231,8 +232,7 @@ public <T> void submitAsync(SocketDestination destination,
* Wrap up an asynchronous request and actually issue it once a
* SocketDestination is checked out.
*/
private class AsyncRequest<T> implements
QueuedKeyedResourcePool.ResourceRequest<ClientRequestExecutor> {
private class AsyncRequest<T> implements ResourceRequest<ClientRequestExecutor> {

private final SocketDestination destination;
public final ClientRequest<T> delegate;
Expand Down
26 changes: 5 additions & 21 deletions src/java/voldemort/utils/pool/QueuedKeyedResourcePool.java
Expand Up @@ -33,23 +33,6 @@
*/
public class QueuedKeyedResourcePool<K, V> extends KeyedResourcePool<K, V> {

public interface ResourceRequest<V> {

// Invoked with checked out resource; resource guaranteed to be
// not-null.
void useResource(V resource);

// Invoked sometime after deadline. Will never invoke useResource.
void handleTimeout();

// Invoked upon resource pool exception. Will never invoke useResource.
void handleException(Exception e);

// Returns deadline (in nanoseconds), after which handleTimeout()
// should be invoked.
long getDeadlineNs();
}

private static final Logger logger = Logger.getLogger(QueuedKeyedResourcePool.class.getName());

private final ConcurrentMap<K, Queue<ResourceRequest<V>>> requestQueueMap;
Expand Down Expand Up @@ -121,7 +104,7 @@ public void requestResource(K key, ResourceRequest<V> resourceRequest) {
try {
resource = attemptCheckout(resourcePool);
} catch(Exception e) {
super.destroyResource(key, resourcePool, resource);
destroyResource(key, resourcePool, resource);
resourceRequest.handleException(e);
}
if(resource != null) {
Expand Down Expand Up @@ -175,7 +158,7 @@ private boolean processQueue(K key) throws Exception {
attemptGrow(key, resourcePool);
resource = attemptCheckout(resourcePool);
} catch(Exception e) {
super.destroyResource(key, resourcePool, resource);
destroyResource(key, resourcePool, resource);
}
if(resource == null) {
return false;
Expand All @@ -184,7 +167,8 @@ private boolean processQueue(K key) throws Exception {
// With resource in hand, process the resource requests
ResourceRequest<V> resourceRequest = getNextUnexpiredResourceRequest(requestQueue);
if(resourceRequest == null) {
// Did not use the resource!
// Did not use the resource! Directly check in via super to avoid
// circular call to processQueue().
super.checkin(key, resource);
return false;
}
Expand All @@ -203,7 +187,7 @@ private boolean processQueue(K key) throws Exception {
public void checkin(K key, V resource) throws Exception {
super.checkin(key, resource);
// NB: Blocking checkout calls may get checked in resource before
// processQueue.
// processQueue() attempts checkout.
while(processQueue(key)) {}
}

Expand Down
Expand Up @@ -47,7 +47,7 @@
import voldemort.store.StoreDefinitionBuilder;
import voldemort.store.bdb.BdbStorageConfiguration;
import voldemort.store.memory.InMemoryStorageConfiguration;
import voldemort.store.memory.SlowStorageConfiguration;
import voldemort.store.slow.SlowStorageConfiguration;
import voldemort.store.socket.SocketStoreFactory;
import voldemort.store.socket.clientrequest.ClientRequestExecutorPool;
import voldemort.versioning.ObsoleteVersionException;
Expand Down
Expand Up @@ -13,15 +13,15 @@
* License for the specific language governing permissions and limitations under
* the License.
*/

package voldemort.store.memory;
package voldemort.store.slow;

import voldemort.VoldemortException;
import voldemort.server.VoldemortConfig;
import voldemort.store.StorageConfiguration;
import voldemort.store.StorageEngine;
import voldemort.store.StoreDefinition;
import voldemort.utils.ByteArray;
import voldemort.utils.OpTimeMap;

/**
* A storage engine that wraps InMemoryStorageEngine with delays.
Expand All @@ -41,12 +41,12 @@ public SlowStorageConfiguration(VoldemortConfig config) {
public StorageEngine<ByteArray, byte[], byte[]> getStore(StoreDefinition storeDef) {
if(voldemortConfig != null) {
return new SlowStorageEngine<ByteArray, byte[], byte[]>(storeDef.getName(),
this.voldemortConfig.getSlowQueueingDelays(),
this.voldemortConfig.getSlowConcurrentDelays());
this.voldemortConfig.testingGetSlowQueueingDelays(),
this.voldemortConfig.testingGetSlowConcurrentDelays());
}
return new SlowStorageEngine<ByteArray, byte[], byte[]>(storeDef.getName(),
new SlowStorageEngine.OperationDelays(),
new SlowStorageEngine.OperationDelays());
new OpTimeMap(0),
new OpTimeMap(0));
}

public String getType() {
Expand Down

0 comments on commit 0c2bd87

Please sign in to comment.