Skip to content

Commit

Permalink
Trying to improve concurrent tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Rui Gu committed Mar 31, 2016
1 parent 5e4267d commit d6e8d00
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 49 deletions.
88 changes: 40 additions & 48 deletions src/test/java/org/redisson/BaseConcurrentTest.java
Expand Up @@ -2,76 +2,68 @@

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

import org.junit.Assert;
import org.redisson.client.RedisClient;

public abstract class BaseConcurrentTest extends BaseTest {

protected void testMultiInstanceConcurrency(int iterations, final RedissonRunnable runnable) throws InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()*2);
ForkJoinPool pool = new ForkJoinPool(Runtime.getRuntime().availableProcessors() * 2);
final Map<Integer, RedissonClient> instances = new HashMap<>();

final Map<Integer, RedissonClient> instances = new HashMap<Integer, RedissonClient>();
for (int i = 0; i < iterations; i++) {
instances.put(i, BaseTest.createInstance());
}
pool.submit(() -> {
IntStream.range(0, iterations)
.parallel()
.forEach((i) -> instances.put(i, BaseTest.createInstance()));
});

long watch = System.currentTimeMillis();
for (int i = 0; i < iterations; i++) {
final int n = i;
executor.execute(new Runnable() {
@Override
public void run() {
RedissonClient redisson = instances.get(n);
runnable.run(redisson);
}
});
}

executor.shutdown();
Assert.assertTrue(executor.awaitTermination(5, TimeUnit.MINUTES));
pool.awaitQuiescence(5, TimeUnit.MINUTES);

pool.submit(() -> {
IntStream.range(0, iterations)
.parallel()
.forEach((i) -> runnable.run(instances.get(i)));
});

pool.shutdown();
Assert.assertTrue(pool.awaitTermination(5, TimeUnit.MINUTES));

System.out.println("multi: " + (System.currentTimeMillis() - watch));

executor = Executors.newCachedThreadPool();
pool = new ForkJoinPool();

for (final RedissonClient redisson : instances.values()) {
executor.execute(new Runnable() {
@Override
public void run() {
redisson.shutdown();
}
});
}
pool.submit(() -> {
instances.values()
.parallelStream()
.<RedisClient>forEach((r) -> r.shutdown());
});

executor.shutdown();
Assert.assertTrue(executor.awaitTermination(5, TimeUnit.MINUTES));
pool.shutdown();
Assert.assertTrue(pool.awaitTermination(5, TimeUnit.MINUTES));
}

protected void testSingleInstanceConcurrency(int iterations, final RedissonRunnable runnable) throws InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()*2);

final RedissonClient redisson = BaseTest.createInstance();
final RedissonClient r = BaseTest.createInstance();
long watch = System.currentTimeMillis();
for (int i = 0; i < iterations; i++) {
executor.execute(new Runnable() {
@Override
public void run() {
runnable.run(redisson);
}
});
}

executor.shutdown();
Assert.assertTrue(executor.awaitTermination(5, TimeUnit.MINUTES));

System.out.println(System.currentTimeMillis() - watch);
ForkJoinPool pool = new ForkJoinPool(Runtime.getRuntime().availableProcessors() * 2);
pool.submit(() -> {
IntStream.range(0, iterations)
.parallel()
.forEach((i) -> runnable.run(r));
});

redisson.shutdown();
}
pool.shutdown();
Assert.assertTrue(pool.awaitTermination(5, TimeUnit.MINUTES));

System.out.println(System.currentTimeMillis() - watch);

r.shutdown();
}

}
3 changes: 2 additions & 1 deletion src/test/java/org/redisson/BaseTest.java
Expand Up @@ -45,7 +45,8 @@ public static RedissonClient createInstance() {
}

@Before
public void before() {
public void before() throws InterruptedException {
Thread.sleep(5000l);
redisson.getKeys().flushall();
}
}

0 comments on commit d6e8d00

Please sign in to comment.