Skip to content

Commit

Permalink
Adding a SlowStorageEngine to permit end-to-end testing with slow
Browse files Browse the repository at this point in the history
servers in a cluster.

src/java/voldemort/store/memory/SlowStorageConfiguration.java
src/java/voldemort/store/memory/SlowStorageEngine.java
- The SlowStorageEngine/Configuration is inspired by the
  SlowStore used in other unit tests. The SlowStorageEngine
  produces delays on a per-operation-type basis. Delays can
  either be concurrent or queued: concurrent delays can overlap
  in time, queued delays occur in serial.

src/java/voldemort/server/VoldemortConfig.java
- Added config options for SlowStorageEngine

test/unit/voldemort/store/memory/SlowStorageEngineTest.java
- Unit test to confirm queued/concurrent delay behavior of a single SlowStorageEngine
  • Loading branch information
jayjwylie committed Oct 9, 2012
1 parent 69f9896 commit c00291a
Show file tree
Hide file tree
Showing 4 changed files with 567 additions and 0 deletions.
25 changes: 25 additions & 0 deletions src/java/voldemort/server/VoldemortConfig.java
Expand Up @@ -30,6 +30,7 @@
import voldemort.store.bdb.BdbStorageConfiguration;
import voldemort.store.memory.CacheStorageConfiguration;
import voldemort.store.memory.InMemoryStorageConfiguration;
import voldemort.store.memory.SlowStorageEngine;
import voldemort.store.mysql.MysqlStorageConfiguration;
import voldemort.store.readonly.BinarySearchStrategy;
import voldemort.store.readonly.ReadOnlyStorageConfiguration;
Expand Down Expand Up @@ -103,6 +104,9 @@ public class VoldemortConfig implements Serializable {
private long reportingIntervalBytes;
private int fetcherBufferSize;

private SlowStorageEngine.OperationDelays slowQueueingDelays;
private SlowStorageEngine.OperationDelays slowConcurrentDelays;

private int coreThreads;
private int maxThreads;

Expand Down Expand Up @@ -252,6 +256,20 @@ public VoldemortConfig(Props props) {
this.mysqlPort = props.getInt("mysql.port", 3306);
this.mysqlDatabaseName = props.getString("mysql.database", "voldemort");

this.slowQueueingDelays = new SlowStorageEngine.OperationDelays();
this.slowQueueingDelays.getMs = props.getInt("slow.queueing.get.ms", 0);
this.slowQueueingDelays.getVersionsMs = props.getInt("slow.queueing.getversions.ms", 0);
this.slowQueueingDelays.getAllMs = props.getInt("slow.queueing.getall.ms", 0);
this.slowQueueingDelays.putMs = props.getInt("slow.queueing.put.ms", 0);
this.slowQueueingDelays.deleteMs = props.getInt("slow.queueing.delete.ms", 0);

this.slowConcurrentDelays = new SlowStorageEngine.OperationDelays();
this.slowConcurrentDelays.getMs = props.getInt("slow.concurrent.get.ms", 0);
this.slowConcurrentDelays.getVersionsMs = props.getInt("slow.concurrent.getversions.ms", 0);
this.slowConcurrentDelays.getAllMs = props.getInt("slow.concurrent.getall.ms", 0);
this.slowConcurrentDelays.putMs = props.getInt("slow.concurrent.put.ms", 0);
this.slowConcurrentDelays.deleteMs = props.getInt("slow.concurrent.delete.ms", 0);

this.maxThreads = props.getInt("max.threads", 100);
this.coreThreads = props.getInt("core.threads", Math.max(1, maxThreads / 2));

Expand Down Expand Up @@ -1547,4 +1565,11 @@ public void setEnableJmxClusterName(boolean enableJmxClusterName) {
this.enableJmxClusterName = enableJmxClusterName;
}

public SlowStorageEngine.OperationDelays getSlowQueueingDelays() {
return this.slowQueueingDelays;
}

public SlowStorageEngine.OperationDelays getSlowConcurrentDelays() {
return this.slowConcurrentDelays;
}
}
62 changes: 62 additions & 0 deletions src/java/voldemort/store/memory/SlowStorageConfiguration.java
@@ -0,0 +1,62 @@
/*
* Copyright 2012 LinkedIn, Inc
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package voldemort.store.memory;

import voldemort.VoldemortException;
import voldemort.server.VoldemortConfig;
import voldemort.store.StorageConfiguration;
import voldemort.store.StorageEngine;
import voldemort.store.StoreDefinition;
import voldemort.utils.ByteArray;

/**
* A storage engine that wraps InMemoryStorageEngine with delays.
*
*
*/
public class SlowStorageConfiguration implements StorageConfiguration {

public static final String TYPE_NAME = "slow";

private final VoldemortConfig voldemortConfig;

public SlowStorageConfiguration(VoldemortConfig config) {
this.voldemortConfig = config;
}

public StorageEngine<ByteArray, byte[], byte[]> getStore(StoreDefinition storeDef) {
if(voldemortConfig != null) {
return new SlowStorageEngine<ByteArray, byte[], byte[]>(storeDef.getName(),
this.voldemortConfig.getSlowQueueingDelays(),
this.voldemortConfig.getSlowConcurrentDelays());
}
return new SlowStorageEngine<ByteArray, byte[], byte[]>(storeDef.getName(),
new SlowStorageEngine.OperationDelays(),
new SlowStorageEngine.OperationDelays());
}

public String getType() {
return TYPE_NAME;
}

public void close() {}

public void update(StoreDefinition storeDef) {
throw new VoldemortException("Storage config updates not permitted for "
+ this.getClass().getCanonicalName());
}
}
184 changes: 184 additions & 0 deletions src/java/voldemort/store/memory/SlowStorageEngine.java
@@ -0,0 +1,184 @@
/*
* Copyright 2012 LinkedIn, Inc
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package voldemort.store.memory;

import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import voldemort.VoldemortException;
import voldemort.store.StorageEngine;
import voldemort.store.StoreCapabilityType;
import voldemort.utils.ClosableIterator;
import voldemort.utils.Pair;
import voldemort.versioning.Version;
import voldemort.versioning.Versioned;

/**
* A wrapped version of InMemoryStorageEngine that can add delay to each
* operation type. Useful for unit testing.
*
* Any operation with queueingDelays of more than 0 ms blocks until it has
* slept. It will queue up behind other queuingDelays operations that need to
* sleep first.
*
* Any operation with concurrentDelays of more than 0 ms sleeps for the
* specified amount of time. This delay does not block other operations. I.e.,
* multiple operations sleep for the specified concurrentDelays simultaneously.
*
* Both queueingDelays and concurrentDelays may be specified for each operation.
* queueingDelays are done before concurrentDelays; time spent in queueingDelays
* does not affect concurrentDelays.
*
*/
public class SlowStorageEngine<K, V, T> implements StorageEngine<K, V, T> {

public static class OperationDelays {

public long getMs;
public long getVersionsMs;
public long getAllMs;
public long putMs;
public long deleteMs;

public OperationDelays() {
this.getMs = 0;
this.getVersionsMs = 0;
this.getAllMs = 0;
this.putMs = 0;
this.deleteMs = 0;
}

public OperationDelays(long getMs,
long getVersionsMs,
long getAllMs,
long putMs,
long deleteMs) {
this.getMs = getMs;
this.getVersionsMs = getVersionsMs;
this.getAllMs = getAllMs;
this.putMs = putMs;
this.deleteMs = deleteMs;
}
}

private final InMemoryStorageEngine<K, V, T> imStore;
private final OperationDelays queueingDelays;
private final OperationDelays concurrentDelays;

public SlowStorageEngine(String name) {
this(name, new OperationDelays(), new OperationDelays());
}

public SlowStorageEngine(String name,
OperationDelays queueingDelays,
OperationDelays concurrentDelays) {
imStore = new InMemoryStorageEngine<K, V, T>(name);
this.queueingDelays = queueingDelays;
this.concurrentDelays = concurrentDelays;
}

private synchronized void queueingSleep(long ms) {
try {
TimeUnit.MILLISECONDS.sleep(ms);
} catch(InterruptedException e) {
e.printStackTrace();
}
}

private void concurrentSleep(long ms) {
try {
TimeUnit.MILLISECONDS.sleep(ms);
} catch(InterruptedException e) {
e.printStackTrace();
}
}

public boolean delete(K key) {
return delete(key, null);
}

public boolean delete(K key, Version version) {
if(queueingDelays.deleteMs > 0)
queueingSleep(queueingDelays.deleteMs);
if(concurrentDelays.deleteMs > 0)
concurrentSleep(concurrentDelays.deleteMs);
return imStore.delete(key, version);
}

public List<Version> getVersions(K key) {
if(queueingDelays.getVersionsMs > 0)
queueingSleep(queueingDelays.getVersionsMs);
if(concurrentDelays.getVersionsMs > 0)
concurrentSleep(concurrentDelays.getVersionsMs);
return imStore.getVersions(key);
}

public List<Versioned<V>> get(K key, T transform) throws VoldemortException {
if(queueingDelays.getMs > 0)
queueingSleep(queueingDelays.getMs);
if(concurrentDelays.getMs > 0)
concurrentSleep(concurrentDelays.getMs);
return imStore.get(key, transform);
}

public Map<K, List<Versioned<V>>> getAll(Iterable<K> keys, Map<K, T> transforms)
throws VoldemortException {
if(queueingDelays.getAllMs > 0)
queueingSleep(queueingDelays.getAllMs);
if(concurrentDelays.getAllMs > 0)
concurrentSleep(concurrentDelays.getAllMs);
return imStore.getAll(keys, transforms);
}

public void put(K key, Versioned<V> value, T transforms) throws VoldemortException {
if(queueingDelays.putMs > 0)
queueingSleep(queueingDelays.putMs);
if(concurrentDelays.putMs > 0)
concurrentSleep(concurrentDelays.putMs);
imStore.put(key, value, transforms);
}

public ClosableIterator<Pair<K, Versioned<V>>> entries() {
return imStore.entries();
}

public ClosableIterator<K> keys() {
return imStore.keys();
}

public void truncate() {
imStore.truncate();
}

public boolean isPartitionAware() {
return imStore.isPartitionAware();
}

public String getName() {
return imStore.getName();
}

public void close() {
imStore.close();
}

public Object getCapability(StoreCapabilityType capability) {
return imStore.getCapability(capability);
}

}

0 comments on commit c00291a

Please sign in to comment.