Skip to content

Commit

Permalink
ConcurrentTest optimisation
Browse files Browse the repository at this point in the history
Reverting back BaseConcurrentTest to use ExecutorService instead of
ForkJoinPool.
  • Loading branch information
Rui Gu committed Apr 28, 2016
1 parent 945426f commit d9610a2
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 52 deletions.
65 changes: 14 additions & 51 deletions src/test/java/org/redisson/BaseConcurrentTest.java
Expand Up @@ -14,43 +14,6 @@
public abstract class BaseConcurrentTest extends BaseTest {

protected void testMultiInstanceConcurrency(int iterations, final RedissonRunnable runnable) throws InterruptedException {
System.out.println("Multi Instance Concurrent Job Interation: " + iterations);
ForkJoinPool pool = new ForkJoinPool(Runtime.getRuntime().availableProcessors() * 2);
final Map<Integer, RedissonClient> instances = new HashMap<>();

pool.submit(() -> {
IntStream.range(0, iterations)
.parallel()
.forEach((i) -> instances.put(i, BaseTest.createInstance()));
});

long watch = System.currentTimeMillis();
pool.awaitQuiescence(5, TimeUnit.MINUTES);

pool.submit(() -> {
IntStream.range(0, iterations)
.parallel()
.forEach((i) -> runnable.run(instances.get(i)));
});

pool.shutdown();
Assert.assertTrue(pool.awaitTermination(RedissonRuntimeEnvironment.isTravis ? 10 : 3, TimeUnit.MINUTES));

System.out.println("multi: " + (System.currentTimeMillis() - watch));

pool = new ForkJoinPool();

pool.submit(() -> {
instances.values()
.parallelStream()
.<RedisClient>forEach((r) -> r.shutdown());
});

pool.shutdown();
Assert.assertTrue(pool.awaitTermination(5, TimeUnit.MINUTES));
}

protected void testMultiInstanceConcurrencySequentiallyLaunched(int iterations, final RedissonRunnable runnable) throws InterruptedException {
System.out.println("Multi Instance Concurrent Job Interation: " + iterations);
ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2);

Expand Down Expand Up @@ -82,25 +45,25 @@ protected void testMultiInstanceConcurrencySequentiallyLaunched(int iterations,

protected void testSingleInstanceConcurrency(int iterations, final RedissonRunnable runnable) throws InterruptedException {
System.out.println("Single Instance Concurrent Job Interation: " + iterations);
final RedissonClient r = BaseTest.createInstance();
long watch = System.currentTimeMillis();

ForkJoinPool pool = new ForkJoinPool(Runtime.getRuntime().availableProcessors() * 2);
ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2);

pool.submit(() -> {
IntStream.range(0, iterations)
.parallel()
.forEach((i) -> {
runnable.run(r);
});
});
final RedissonClient redisson = BaseTest.createInstance();
long watch = System.currentTimeMillis();
for (int i = 0; i < iterations; i++) {
executor.execute(new Runnable() {
@Override
public void run() {
runnable.run(redisson);
}
});
}

pool.shutdown();
Assert.assertTrue(pool.awaitTermination(RedissonRuntimeEnvironment.isTravis ? 10 : 3, TimeUnit.MINUTES));
executor.shutdown();
Assert.assertTrue(executor.awaitTermination(5, TimeUnit.MINUTES));

System.out.println(System.currentTimeMillis() - watch);

r.shutdown();
redisson.shutdown();
}

}
2 changes: 1 addition & 1 deletion src/test/java/org/redisson/RedissonSemaphoreTest.java
Expand Up @@ -234,7 +234,7 @@ public void testConcurrency_MultiInstance_10_permits() throws InterruptedExcepti

final AtomicInteger checkPermits = new AtomicInteger(s.availablePermits());
final CyclicBarrier barrier = new CyclicBarrier(s.availablePermits());
testMultiInstanceConcurrencySequentiallyLaunched(iterations, r -> {
testMultiInstanceConcurrency(iterations, r -> {
RSemaphore s1 = r.getSemaphore("test");
try {
s1.acquire();
Expand Down

0 comments on commit d9610a2

Please sign in to comment.