Skip to content

Commit

Permalink
Revert "ConcurrentTest optimisation"
Browse files Browse the repository at this point in the history
This reverts commit d9610a2.
  • Loading branch information
Rui Gu committed Apr 28, 2016
1 parent 4cbae15 commit 9eeb4c0
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 15 deletions.
65 changes: 51 additions & 14 deletions src/test/java/org/redisson/BaseConcurrentTest.java
Expand Up @@ -14,6 +14,43 @@
public abstract class BaseConcurrentTest extends BaseTest { public abstract class BaseConcurrentTest extends BaseTest {


protected void testMultiInstanceConcurrency(int iterations, final RedissonRunnable runnable) throws InterruptedException { protected void testMultiInstanceConcurrency(int iterations, final RedissonRunnable runnable) throws InterruptedException {
System.out.println("Multi Instance Concurrent Job Interation: " + iterations);
ForkJoinPool pool = new ForkJoinPool(Runtime.getRuntime().availableProcessors() * 2);
final Map<Integer, RedissonClient> instances = new HashMap<>();

pool.submit(() -> {
IntStream.range(0, iterations)
.parallel()
.forEach((i) -> instances.put(i, BaseTest.createInstance()));
});

long watch = System.currentTimeMillis();
pool.awaitQuiescence(5, TimeUnit.MINUTES);

pool.submit(() -> {
IntStream.range(0, iterations)
.parallel()
.forEach((i) -> runnable.run(instances.get(i)));
});

pool.shutdown();
Assert.assertTrue(pool.awaitTermination(RedissonRuntimeEnvironment.isTravis ? 10 : 3, TimeUnit.MINUTES));

System.out.println("multi: " + (System.currentTimeMillis() - watch));

pool = new ForkJoinPool();

pool.submit(() -> {
instances.values()
.parallelStream()
.<RedisClient>forEach((r) -> r.shutdown());
});

pool.shutdown();
Assert.assertTrue(pool.awaitTermination(5, TimeUnit.MINUTES));
}

protected void testMultiInstanceConcurrencySequentiallyLaunched(int iterations, final RedissonRunnable runnable) throws InterruptedException {
System.out.println("Multi Instance Concurrent Job Interation: " + iterations); System.out.println("Multi Instance Concurrent Job Interation: " + iterations);
ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2); ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2);


Expand Down Expand Up @@ -45,25 +82,25 @@ protected void testMultiInstanceConcurrency(int iterations, final RedissonRunnab


protected void testSingleInstanceConcurrency(int iterations, final RedissonRunnable runnable) throws InterruptedException { protected void testSingleInstanceConcurrency(int iterations, final RedissonRunnable runnable) throws InterruptedException {
System.out.println("Single Instance Concurrent Job Interation: " + iterations); System.out.println("Single Instance Concurrent Job Interation: " + iterations);
ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2); final RedissonClient r = BaseTest.createInstance();

final RedissonClient redisson = BaseTest.createInstance();
long watch = System.currentTimeMillis(); long watch = System.currentTimeMillis();
for (int i = 0; i < iterations; i++) {
executor.execute(new Runnable() {
@Override
public void run() {
runnable.run(redisson);
}
});
}


executor.shutdown(); ForkJoinPool pool = new ForkJoinPool(Runtime.getRuntime().availableProcessors() * 2);
Assert.assertTrue(executor.awaitTermination(5, TimeUnit.MINUTES));
pool.submit(() -> {
IntStream.range(0, iterations)
.parallel()
.forEach((i) -> {
runnable.run(r);
});
});

pool.shutdown();
Assert.assertTrue(pool.awaitTermination(RedissonRuntimeEnvironment.isTravis ? 10 : 3, TimeUnit.MINUTES));


System.out.println(System.currentTimeMillis() - watch); System.out.println(System.currentTimeMillis() - watch);


redisson.shutdown(); r.shutdown();
} }


} }
2 changes: 1 addition & 1 deletion src/test/java/org/redisson/RedissonSemaphoreTest.java
Expand Up @@ -234,7 +234,7 @@ public void testConcurrency_MultiInstance_10_permits() throws InterruptedExcepti


final AtomicInteger checkPermits = new AtomicInteger(s.availablePermits()); final AtomicInteger checkPermits = new AtomicInteger(s.availablePermits());
final CyclicBarrier barrier = new CyclicBarrier(s.availablePermits()); final CyclicBarrier barrier = new CyclicBarrier(s.availablePermits());
testMultiInstanceConcurrency(iterations, r -> { testMultiInstanceConcurrencySequentiallyLaunched(iterations, r -> {
RSemaphore s1 = r.getSemaphore("test"); RSemaphore s1 = r.getSemaphore("test");
try { try {
s1.acquire(); s1.acquire();
Expand Down

0 comments on commit 9eeb4c0

Please sign in to comment.