diff --git a/src/java/voldemort/server/VoldemortConfig.java b/src/java/voldemort/server/VoldemortConfig.java index e1626e0a81..aa0554a04d 100644 --- a/src/java/voldemort/server/VoldemortConfig.java +++ b/src/java/voldemort/server/VoldemortConfig.java @@ -30,6 +30,7 @@ import voldemort.store.bdb.BdbStorageConfiguration; import voldemort.store.memory.CacheStorageConfiguration; import voldemort.store.memory.InMemoryStorageConfiguration; +import voldemort.store.memory.SlowStorageEngine; import voldemort.store.mysql.MysqlStorageConfiguration; import voldemort.store.readonly.BinarySearchStrategy; import voldemort.store.readonly.ReadOnlyStorageConfiguration; @@ -103,6 +104,9 @@ public class VoldemortConfig implements Serializable { private long reportingIntervalBytes; private int fetcherBufferSize; + private SlowStorageEngine.OperationDelays slowQueueingDelays; + private SlowStorageEngine.OperationDelays slowConcurrentDelays; + private int coreThreads; private int maxThreads; @@ -252,6 +256,20 @@ public VoldemortConfig(Props props) { this.mysqlPort = props.getInt("mysql.port", 3306); this.mysqlDatabaseName = props.getString("mysql.database", "voldemort"); + this.slowQueueingDelays = new SlowStorageEngine.OperationDelays(); + this.slowQueueingDelays.getMs = props.getInt("slow.queueing.get.ms", 0); + this.slowQueueingDelays.getVersionsMs = props.getInt("slow.queueing.getversions.ms", 0); + this.slowQueueingDelays.getAllMs = props.getInt("slow.queueing.getall.ms", 0); + this.slowQueueingDelays.putMs = props.getInt("slow.queueing.put.ms", 0); + this.slowQueueingDelays.deleteMs = props.getInt("slow.queueing.delete.ms", 0); + + this.slowConcurrentDelays = new SlowStorageEngine.OperationDelays(); + this.slowConcurrentDelays.getMs = props.getInt("slow.concurrent.get.ms", 0); + this.slowConcurrentDelays.getVersionsMs = props.getInt("slow.concurrent.getversions.ms", 0); + this.slowConcurrentDelays.getAllMs = props.getInt("slow.concurrent.getall.ms", 0); + this.slowConcurrentDelays.putMs = props.getInt("slow.concurrent.put.ms", 0); + this.slowConcurrentDelays.deleteMs = props.getInt("slow.concurrent.delete.ms", 0); + this.maxThreads = props.getInt("max.threads", 100); this.coreThreads = props.getInt("core.threads", Math.max(1, maxThreads / 2)); @@ -1547,4 +1565,11 @@ public void setEnableJmxClusterName(boolean enableJmxClusterName) { this.enableJmxClusterName = enableJmxClusterName; } + public SlowStorageEngine.OperationDelays getSlowQueueingDelays() { + return this.slowQueueingDelays; + } + + public SlowStorageEngine.OperationDelays getSlowConcurrentDelays() { + return this.slowConcurrentDelays; + } } diff --git a/src/java/voldemort/store/memory/SlowStorageConfiguration.java b/src/java/voldemort/store/memory/SlowStorageConfiguration.java new file mode 100644 index 0000000000..0c2d2034e6 --- /dev/null +++ b/src/java/voldemort/store/memory/SlowStorageConfiguration.java @@ -0,0 +1,62 @@ +/* + * Copyright 2012 LinkedIn, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package voldemort.store.memory; + +import voldemort.VoldemortException; +import voldemort.server.VoldemortConfig; +import voldemort.store.StorageConfiguration; +import voldemort.store.StorageEngine; +import voldemort.store.StoreDefinition; +import voldemort.utils.ByteArray; + +/** + * A storage engine that wraps InMemoryStorageEngine with delays. + * + * + */ +public class SlowStorageConfiguration implements StorageConfiguration { + + public static final String TYPE_NAME = "slow"; + + private final VoldemortConfig voldemortConfig; + + public SlowStorageConfiguration(VoldemortConfig config) { + this.voldemortConfig = config; + } + + public StorageEngine getStore(StoreDefinition storeDef) { + if(voldemortConfig != null) { + return new SlowStorageEngine(storeDef.getName(), + this.voldemortConfig.getSlowQueueingDelays(), + this.voldemortConfig.getSlowConcurrentDelays()); + } + return new SlowStorageEngine(storeDef.getName(), + new SlowStorageEngine.OperationDelays(), + new SlowStorageEngine.OperationDelays()); + } + + public String getType() { + return TYPE_NAME; + } + + public void close() {} + + public void update(StoreDefinition storeDef) { + throw new VoldemortException("Storage config updates not permitted for " + + this.getClass().getCanonicalName()); + } +} diff --git a/src/java/voldemort/store/memory/SlowStorageEngine.java b/src/java/voldemort/store/memory/SlowStorageEngine.java new file mode 100644 index 0000000000..5c0f276766 --- /dev/null +++ b/src/java/voldemort/store/memory/SlowStorageEngine.java @@ -0,0 +1,184 @@ +/* + * Copyright 2012 LinkedIn, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package voldemort.store.memory; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import voldemort.VoldemortException; +import voldemort.store.StorageEngine; +import voldemort.store.StoreCapabilityType; +import voldemort.utils.ClosableIterator; +import voldemort.utils.Pair; +import voldemort.versioning.Version; +import voldemort.versioning.Versioned; + +/** + * A wrapped version of InMemoryStorageEngine that can add delay to each + * operation type. Useful for unit testing. + * + * Any operation with queueingDelays of more than 0 ms blocks until it has + * slept. It will queue up behind other queuingDelays operations that need to + * sleep first. + * + * Any operation with concurrentDelays of more than 0 ms sleeps for the + * specified amount of time. This delay does not block other operations. I.e., + * multiple operations sleep for the specified concurrentDelays simultaneously. + * + * Both queueingDelays and concurrentDelays may be specified for each operation. + * queueingDelays are done before concurrentDelays; time spent in queueingDelays + * does not affect concurrentDelays. + * + */ +public class SlowStorageEngine implements StorageEngine { + + public static class OperationDelays { + + public long getMs; + public long getVersionsMs; + public long getAllMs; + public long putMs; + public long deleteMs; + + public OperationDelays() { + this.getMs = 0; + this.getVersionsMs = 0; + this.getAllMs = 0; + this.putMs = 0; + this.deleteMs = 0; + } + + public OperationDelays(long getMs, + long getVersionsMs, + long getAllMs, + long putMs, + long deleteMs) { + this.getMs = getMs; + this.getVersionsMs = getVersionsMs; + this.getAllMs = getAllMs; + this.putMs = putMs; + this.deleteMs = deleteMs; + } + } + + private final InMemoryStorageEngine imStore; + private final OperationDelays queueingDelays; + private final OperationDelays concurrentDelays; + + public SlowStorageEngine(String name) { + this(name, new OperationDelays(), new OperationDelays()); + } + + public SlowStorageEngine(String name, + OperationDelays queueingDelays, + OperationDelays concurrentDelays) { + imStore = new InMemoryStorageEngine(name); + this.queueingDelays = queueingDelays; + this.concurrentDelays = concurrentDelays; + } + + private synchronized void queueingSleep(long ms) { + try { + TimeUnit.MILLISECONDS.sleep(ms); + } catch(InterruptedException e) { + e.printStackTrace(); + } + } + + private void concurrentSleep(long ms) { + try { + TimeUnit.MILLISECONDS.sleep(ms); + } catch(InterruptedException e) { + e.printStackTrace(); + } + } + + public boolean delete(K key) { + return delete(key, null); + } + + public boolean delete(K key, Version version) { + if(queueingDelays.deleteMs > 0) + queueingSleep(queueingDelays.deleteMs); + if(concurrentDelays.deleteMs > 0) + concurrentSleep(concurrentDelays.deleteMs); + return imStore.delete(key, version); + } + + public List getVersions(K key) { + if(queueingDelays.getVersionsMs > 0) + queueingSleep(queueingDelays.getVersionsMs); + if(concurrentDelays.getVersionsMs > 0) + concurrentSleep(concurrentDelays.getVersionsMs); + return imStore.getVersions(key); + } + + public List> get(K key, T transform) throws VoldemortException { + if(queueingDelays.getMs > 0) + queueingSleep(queueingDelays.getMs); + if(concurrentDelays.getMs > 0) + concurrentSleep(concurrentDelays.getMs); + return imStore.get(key, transform); + } + + public Map>> getAll(Iterable keys, Map transforms) + throws VoldemortException { + if(queueingDelays.getAllMs > 0) + queueingSleep(queueingDelays.getAllMs); + if(concurrentDelays.getAllMs > 0) + concurrentSleep(concurrentDelays.getAllMs); + return imStore.getAll(keys, transforms); + } + + public void put(K key, Versioned value, T transforms) throws VoldemortException { + if(queueingDelays.putMs > 0) + queueingSleep(queueingDelays.putMs); + if(concurrentDelays.putMs > 0) + concurrentSleep(concurrentDelays.putMs); + imStore.put(key, value, transforms); + } + + public ClosableIterator>> entries() { + return imStore.entries(); + } + + public ClosableIterator keys() { + return imStore.keys(); + } + + public void truncate() { + imStore.truncate(); + } + + public boolean isPartitionAware() { + return imStore.isPartitionAware(); + } + + public String getName() { + return imStore.getName(); + } + + public void close() { + imStore.close(); + } + + public Object getCapability(StoreCapabilityType capability) { + return imStore.getCapability(capability); + } + +} diff --git a/test/unit/voldemort/store/memory/SlowStorageEngineTest.java b/test/unit/voldemort/store/memory/SlowStorageEngineTest.java new file mode 100644 index 0000000000..8e6b63fd8e --- /dev/null +++ b/test/unit/voldemort/store/memory/SlowStorageEngineTest.java @@ -0,0 +1,296 @@ +/* + * Copyright 2012 LinkedIn, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package voldemort.store.memory; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.apache.log4j.Logger; +import org.junit.Test; + +import voldemort.TestUtils; +import voldemort.common.VoldemortOpCode; +import voldemort.store.AbstractStorageEngineTest; +import voldemort.store.StorageEngine; +import voldemort.utils.ByteArray; +import voldemort.utils.ByteUtils; +import voldemort.utils.pool.KeyedResourcePool; +import voldemort.versioning.ObsoleteVersionException; +import voldemort.versioning.VectorClock; +import voldemort.versioning.Versioned; + +public class SlowStorageEngineTest extends AbstractStorageEngineTest { + + private static final Logger logger = Logger.getLogger(KeyedResourcePool.class.getName()); + + private StorageEngine store; + private final List opList; + + public SlowStorageEngineTest() { + opList = new ArrayList(); + opList.add(VoldemortOpCode.GET_OP_CODE); + opList.add(VoldemortOpCode.GET_VERSION_OP_CODE); + opList.add(VoldemortOpCode.GET_ALL_OP_CODE); + opList.add(VoldemortOpCode.PUT_OP_CODE); + opList.add(VoldemortOpCode.DELETE_OP_CODE); + } + + @Override + public StorageEngine getStorageEngine() { + return store; + } + + @Override + public void setUp() throws Exception { + super.setUp(); + // Do not change the magic constants in the next two constructors! The + // unit tests assert on specific delays occurring. + SlowStorageEngine.OperationDelays queued = new SlowStorageEngine.OperationDelays(10, + 20, + 30, + 40, + 50); + SlowStorageEngine.OperationDelays concurrent = new SlowStorageEngine.OperationDelays(50, + 40, + 30, + 20, + 10); + this.store = new SlowStorageEngine("test", queued, concurrent); + } + + @Override + public List getKeys(int numKeys) { + List keys = new ArrayList(numKeys); + for(int i = 0; i < numKeys; i++) + keys.add(new ByteArray(TestUtils.randomBytes(10))); + return keys; + } + + // Two modes for the runnable: give it a time to check (expectedTimeMs) or + // give it a concurrent queue upon which to add its run time. + public class OpInvoker implements Runnable { + + private final CountDownLatch signal; + private final byte opCode; + + private long expectedTimeMs; + private ConcurrentLinkedQueue runTimes; + + private final ByteArray key; + private final byte[] value; + + protected OpInvoker(CountDownLatch signal, byte opCode) { + this.signal = signal; + this.opCode = opCode; + + this.expectedTimeMs = -1; + this.runTimes = null; + + this.key = new ByteArray(ByteUtils.getBytes("key", "UTF-8")); + this.value = ByteUtils.getBytes("value", "UTF-8"); + logger.debug("OpInvoker created for operation " + getOpName() + "(Thread: " + + Thread.currentThread().getName() + ")"); + } + + // expectedTimeMs <= 0 means not checking the runtime + OpInvoker(CountDownLatch signal, byte opCode, long expectedTimeMs) { + this(signal, opCode); + this.expectedTimeMs = expectedTimeMs; + } + + OpInvoker(CountDownLatch signal, byte opCode, ConcurrentLinkedQueue runTimes) { + this(signal, opCode); + this.runTimes = runTimes; + } + + private String getOpName() { + switch(this.opCode) { + case VoldemortOpCode.GET_OP_CODE: + return "Get"; + case VoldemortOpCode.GET_VERSION_OP_CODE: + return "GetVersion"; + case VoldemortOpCode.GET_ALL_OP_CODE: + return "GetAll"; + case VoldemortOpCode.DELETE_OP_CODE: + return "Delete"; + case VoldemortOpCode.PUT_OP_CODE: + return "Put"; + default: + logger.error("getOpName invoked with bad operation code: " + opCode); + } + return null; + } + + private void doGet() { + store.get(key, null); + } + + private void doGetAll() { + List keys = new ArrayList(); + keys.add(key); + store.getAll(keys, null); + } + + private void doGetVersion() { + store.getVersions(key); + } + + private void doPut() { + try { + store.put(key, new Versioned(value), null); + } catch(ObsoleteVersionException e) { + // This exception is expected in some tests. + } + } + + private void doDelete() { + store.delete(key, new VectorClock()); + } + + public void run() { + long startTimeNs = System.nanoTime(); + + switch(this.opCode) { + case VoldemortOpCode.GET_OP_CODE: + doGet(); + break; + case VoldemortOpCode.GET_VERSION_OP_CODE: + doGetVersion(); + break; + case VoldemortOpCode.GET_ALL_OP_CODE: + doGetAll(); + break; + case VoldemortOpCode.PUT_OP_CODE: + doPut(); + break; + case VoldemortOpCode.DELETE_OP_CODE: + doDelete(); + break; + default: + logger.error("OpInvoker issued with bad operation code: " + this.opCode); + } + long runTimeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeNs); + + if(this.expectedTimeMs > 0) { + String details = "(runTimeMs: " + runTimeMs + ", Thread: " + + Thread.currentThread().getName() + ")"; + assertFalse("OpInvoker operation time is bad " + details, + !isRunTimeBad(runTimeMs, this.expectedTimeMs)); + } + + if(this.runTimes != null) + runTimes.add(runTimeMs); + logger.debug("OpInvoker finished operation " + getOpName() + "(Thread: " + + Thread.currentThread().getName() + ")"); + signal.countDown(); + } + } + + // true if runtime is not within a "reasonable" range. Reasonable + // defined by a 10% fudge factor. + private boolean isRunTimeBad(long runTimeMs, long expectedTimeMs) { + if((runTimeMs < expectedTimeMs || runTimeMs > (expectedTimeMs * 1.1))) { + return false; + } + return true; + } + + /** + * Test the time of each op type individually. + */ + @Test + public void testEachOpTypeIndividually() { + for(int i = 0; i < 5; ++i) { + long timeoutMs = 60; + if(i == 0) + timeoutMs = 0; + + for(byte op: opList) { + CountDownLatch waitForOp = new CountDownLatch(1); + new Thread(new OpInvoker(waitForOp, op, timeoutMs)).start(); + try { + waitForOp.await(); + } catch(InterruptedException e) { + e.printStackTrace(); + } + } + } + + } + + /** + * Test repeated operations. + */ + @Test + public void testEachOpTypeRepeated() { + // Magic number '2': Run once to warm up, run again and test asserts on + // second pass + for(int j = 0; j < 2; j++) { + for(byte op: opList) { + ConcurrentLinkedQueue runTimes = new ConcurrentLinkedQueue(); + CountDownLatch waitForOps = new CountDownLatch(5 + 1); + for(int i = 0; i < 5; ++i) { + new Thread(new OpInvoker(waitForOps, op, runTimes)).start(); + } + + waitForOps.countDown(); + try { + waitForOps.await(); + } catch(InterruptedException e) { + e.printStackTrace(); + } + + // Test runs after the single warm up run. + if(j > 0) { + // Determine what the longest delay should be and test the + // maximum delay against that value. The magic constants + // used to construct the SlowStorageEngine determine the + // longest delay. + Long[] allTimes = runTimes.toArray(new Long[0]); + Arrays.sort(allTimes); + long maxTimeMs = allTimes[4]; + long expectedTimeMs = 0; + switch(op) { + case VoldemortOpCode.GET_OP_CODE: + expectedTimeMs = (5 * 10) + 50; + break; + case VoldemortOpCode.GET_VERSION_OP_CODE: + expectedTimeMs = (5 * 20) + 40; + break; + case VoldemortOpCode.GET_ALL_OP_CODE: + expectedTimeMs = (5 * 30) + 30; + break; + case VoldemortOpCode.PUT_OP_CODE: + expectedTimeMs = (5 * 40) + 20; + break; + case VoldemortOpCode.DELETE_OP_CODE: + expectedTimeMs = (5 * 50) + 10; + break; + } + String details = "(maxTimeMs: " + maxTimeMs + ", " + expectedTimeMs + ")"; + assertFalse("OpInvoker operation time is bad " + details, + !isRunTimeBad(maxTimeMs, expectedTimeMs)); + } + } + } + } + +}