Skip to content

Commit

Permalink
Substantial refactoring of (Queued)KeyedResourcePool tests.
Browse files Browse the repository at this point in the history
- factored out a common base class in which all of the nested helper classes are defined
- separated out the various types of tests into files:
  - simple/basic tests
  - contention tests that spawn threads to generate contention
  - specific race condition test
- The specific race condition test for KeyedResourcePool shows that google issue 276 is resolved: http://code.google.com/p/project-voldemort/issues/detail?id=276
  • Loading branch information
jayjwylie committed Jan 15, 2013
1 parent 1c15b5a commit 1f3a466
Show file tree
Hide file tree
Showing 6 changed files with 496 additions and 482 deletions.
Expand Up @@ -15,80 +15,18 @@
*/
package voldemort.utils.pool;

import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertFalse;

import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import org.junit.Before;
import org.junit.Test;

public class KeyedResourcePoolStressTest {

protected static final int POOL_SIZE = 100;
protected static final long TIMEOUT_MS = 500;
protected static final long NUM_TESTS = 5000;
public class KeyedResourcePoolBaseTest {

protected TestResourceFactory factory;
protected KeyedResourcePool<String, TestResource> pool;
protected ResourcePoolConfig config;

@Before
public void setUp() {
factory = new TestResourceFactory();
config = new ResourcePoolConfig().setMaxPoolSize(POOL_SIZE)
.setTimeout(TIMEOUT_MS, TimeUnit.MILLISECONDS);
this.pool = new KeyedResourcePool<String, TestResource>(factory, config);
}

@Test
public void testAttemptGrow() {
ExecutorService service = Executors.newFixedThreadPool(POOL_SIZE);
for(int i = 0; i < NUM_TESTS; i++) {
if(i % 100 == 0) {
System.out.println("Test run: " + i);
}
final CountDownLatch checkouts = new CountDownLatch(POOL_SIZE);
List<Callable<Boolean>> tasks = new ArrayList<Callable<Boolean>>(POOL_SIZE);
for(int t = 0; t < POOL_SIZE; t++) {
tasks.add(new Callable<Boolean>() {

@Override
public Boolean call() throws Exception {
try {
TestResource resource = pool.checkout("a");
checkouts.countDown();
checkouts.await();
resource.invalidate();
pool.checkin("a", resource);
return true;
} catch(Exception e) {
checkouts.countDown();
throw e;
}
}
});
}
try {
List<Future<Boolean>> futures = service.invokeAll(tasks);
for(Future<Boolean> future: futures) {
assertTrue(future.get());
}
} catch(Exception e) {
fail("Unexpected exception - " + e.getMessage());
}
}
}

protected static class TestResource {

private String value;
Expand Down Expand Up @@ -180,4 +118,64 @@ public void close() {}

}

// TestResourceRequest is only need for the QueuedResourcePool tests, but it
// is easier/cleaner to define here with the other test resources.
protected static class TestResourceRequest implements AsyncResourceRequest<TestResource> {

private AtomicBoolean usedResource;
private AtomicBoolean handledTimeout;
private AtomicBoolean handledException;

static AtomicInteger usedResourceCount = new AtomicInteger(0);
static AtomicInteger handledTimeoutCount = new AtomicInteger(0);
static AtomicInteger handledExceptionCount = new AtomicInteger(0);

long deadlineNs;
final Queue<TestResource> doneQueue;

TestResourceRequest(long deadlineNs, Queue<TestResource> doneQueue) {
this.usedResource = new AtomicBoolean(false);
this.handledTimeout = new AtomicBoolean(false);
this.handledException = new AtomicBoolean(false);
this.deadlineNs = deadlineNs;
this.doneQueue = doneQueue;
}

@Override
public void useResource(TestResource tr) {
// System.err.println("useResource " +
// Thread.currentThread().getName());
assertFalse(this.handledTimeout.get());
assertFalse(this.handledException.get());
usedResource.set(true);
usedResourceCount.getAndIncrement();
doneQueue.add(tr);
}

@Override
public void handleTimeout() {
// System.err.println("handleTimeout " +
// Thread.currentThread().getName());
assertFalse(this.usedResource.get());
assertFalse(this.handledException.get());
handledTimeout.set(true);
handledTimeoutCount.getAndIncrement();
}

@Override
public void handleException(Exception e) {
// System.err.println("handleException " +
// Thread.currentThread().getName());
assertFalse(this.usedResource.get());
assertFalse(this.handledTimeout.get());
handledException.set(true);
handledExceptionCount.getAndIncrement();
}

@Override
public long getDeadlineNs() {
return deadlineNs;
}
}

}
139 changes: 139 additions & 0 deletions test/unit/voldemort/utils/pool/KeyedResourcePoolContentionTest.java
@@ -0,0 +1,139 @@
/*
* Copyright 2012 LinkedIn, Inc
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package voldemort.utils.pool;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.junit.Before;
import org.junit.Test;

public class KeyedResourcePoolContentionTest extends KeyedResourcePoolBaseTest {

protected static int POOL_SIZE = 5;
protected static long TIMEOUT_MS = 500;

@Before
public void setUp() {
factory = new TestResourceFactory();
config = new ResourcePoolConfig().setMaxPoolSize(POOL_SIZE)
.setTimeout(TIMEOUT_MS, TimeUnit.MILLISECONDS);
this.pool = new KeyedResourcePool<String, TestResource>(factory, config);
}

// This method was helpful when developing contendForResources
public void printStats(String key) {
System.err.println("");
System.err.println("getCreated: " + this.factory.getCreated());
System.err.println("getDestroyed: " + this.factory.getDestroyed());
System.err.println("getTotalResourceCount(key): " + this.pool.getTotalResourceCount(key));
System.err.println("getTotalResourceCount(): " + this.pool.getTotalResourceCount());
System.err.println("getCheckedInResourcesCount(key): "
+ this.pool.getCheckedInResourcesCount(key));
System.err.println("getCheckedInResourceCount(): " + this.pool.getCheckedInResourceCount());
}

@Test
public void contendForResources() throws Exception {
int numCheckers = POOL_SIZE * 2;
int numChecks = 10 * 1000;
String key = "Key";
float invalidationRate = (float) 0.25;
CountDownLatch waitForThreads = new CountDownLatch(numCheckers);
CountDownLatch waitForCheckers = new CountDownLatch(numCheckers);
for(int i = 0; i < numCheckers; ++i) {
new Thread(new Checkers(waitForThreads,
waitForCheckers,
key,
numChecks,
invalidationRate)).start();
}

try {
waitForCheckers.await();
assertEquals(this.pool.getCheckedInResourceCount(), this.pool.getTotalResourceCount());
} catch(InterruptedException e) {
e.printStackTrace();
}

}

public class Checkers implements Runnable {

private final CountDownLatch startSignal;
private final CountDownLatch doneSignal;

private final String key;
private final int checks;

private Random random;
private float invalidationRate;

Checkers(CountDownLatch startSignal,
CountDownLatch doneSignal,
String key,
int checks,
float invalidationRate) {
this.startSignal = startSignal;
this.doneSignal = doneSignal;

this.key = key;
this.checks = checks;

this.random = new Random();
this.invalidationRate = invalidationRate;
}

@Override
public void run() {
startSignal.countDown();
try {
startSignal.await();
} catch(InterruptedException e) {
e.printStackTrace();
}

try {
TestResource tr = null;
for(int i = 0; i < checks; ++i) {
tr = pool.checkout(key);
assertTrue(tr.isValid());

// Invalid some resources (except on last checkin)
float f = random.nextFloat();
if(f < invalidationRate && i != checks - 1) {
tr.invalidate();
}
Thread.yield();

pool.checkin(key, tr);
Thread.yield();

// if(i % 1000 == 0) { printStats(key); }
}
} catch(Exception e) {
System.err.println(e.toString());
fail(e.toString());
}
doneSignal.countDown();
}
}
}
83 changes: 83 additions & 0 deletions test/unit/voldemort/utils/pool/KeyedResourcePoolRaceTest.java
@@ -0,0 +1,83 @@
/*
* Copyright 2012 LinkedIn, Inc
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package voldemort.utils.pool;

import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import org.junit.Before;
import org.junit.Test;

public class KeyedResourcePoolRaceTest extends KeyedResourcePoolBaseTest {

protected static final int POOL_SIZE = 100;
protected static final long TIMEOUT_MS = 500;
protected static final long NUM_TESTS = 250;

@Before
public void setUp() {
factory = new TestResourceFactory();
config = new ResourcePoolConfig().setMaxPoolSize(POOL_SIZE)
.setTimeout(TIMEOUT_MS, TimeUnit.MILLISECONDS);
this.pool = new KeyedResourcePool<String, TestResource>(factory, config);
}

// See http://code.google.com/p/project-voldemort/issues/detail?id=276
@Test
public void testAttemptGrow() {
ExecutorService service = Executors.newFixedThreadPool(POOL_SIZE);
for(int i = 0; i < NUM_TESTS; i++) {
final CountDownLatch checkouts = new CountDownLatch(POOL_SIZE);
List<Callable<Boolean>> tasks = new ArrayList<Callable<Boolean>>(POOL_SIZE);
for(int t = 0; t < POOL_SIZE; t++) {
tasks.add(new Callable<Boolean>() {

@Override
public Boolean call() throws Exception {
try {
TestResource resource = pool.checkout("a");
checkouts.countDown();
checkouts.await();
resource.invalidate();
pool.checkin("a", resource);
return true;
} catch(Exception e) {
checkouts.countDown();
throw e;
}
}
});
}
try {
List<Future<Boolean>> futures = service.invokeAll(tasks);
for(Future<Boolean> future: futures) {
assertTrue(future.get());
}
} catch(Exception e) {
fail("Unexpected exception - " + e.getMessage());
}
}
}
}

0 comments on commit 1f3a466

Please sign in to comment.