diff --git a/agrona/src/test/java/org/agrona/concurrent/ringbuffer/ManyToOneRingBufferConcurrentTest.java b/agrona/src/test/java/org/agrona/concurrent/ringbuffer/ManyToOneRingBufferConcurrentTest.java index a1b319db9..59b76cf9d 100644 --- a/agrona/src/test/java/org/agrona/concurrent/ringbuffer/ManyToOneRingBufferConcurrentTest.java +++ b/agrona/src/test/java/org/agrona/concurrent/ringbuffer/ManyToOneRingBufferConcurrentTest.java @@ -75,15 +75,17 @@ public void shouldProvideCorrelationIds() throws Exception } @Test - public void shouldExchangeMessages() + public void shouldExchangeMessages() throws Exception { final int reps = 10_000_000; final int numProducers = 2; - final CyclicBarrier barrier = new CyclicBarrier(numProducers); + final CyclicBarrier barrier = new CyclicBarrier(numProducers + 1); + final Thread[] threads = new Thread[numProducers]; for (int i = 0; i < numProducers; i++) { - new Thread(new Producer(i, barrier, reps)).start(); + threads[i] = new Thread(new Producer(i, barrier, reps)); + threads[i].start(); } final int[] counts = new int[numProducers]; @@ -100,6 +102,8 @@ public void shouldExchangeMessages() counts[producerId]++; }; + barrier.await(); + int msgCount = 0; while (msgCount < (reps * numProducers)) { @@ -113,18 +117,25 @@ public void shouldExchangeMessages() } assertEquals(reps * numProducers, msgCount); + + for (final Thread t : threads) + { + t.join(); + } } @Test - public void shouldExchangeMessagesViaTryClaimCommit() + public void shouldExchangeMessagesViaTryClaimCommit() throws Exception { final int reps = 10_000_000; final int numProducers = 2; - final CyclicBarrier barrier = new CyclicBarrier(numProducers); + final CyclicBarrier barrier = new CyclicBarrier(numProducers + 1); + final Thread[] threads = new Thread[numProducers]; for (int i = 0; i < numProducers; i++) { - new Thread(new ClaimCommit(i, barrier, reps)).start(); + threads[i] = new Thread(new ClaimCommit(i, barrier, reps)); + threads[i].start(); } final int[] counts = new int[numProducers]; @@ -141,6 +152,8 @@ public void shouldExchangeMessagesViaTryClaimCommit() counts[producerId]++; }; + barrier.await(); + int msgCount = 0; while (msgCount < (reps * numProducers)) { @@ -154,18 +167,25 @@ public void shouldExchangeMessagesViaTryClaimCommit() } assertEquals(reps * numProducers, msgCount); + + for (final Thread t : threads) + { + t.join(); + } } @Test - public void shouldExchangeMessagesViaTryClaimAbort() + public void shouldExchangeMessagesViaTryClaimAbort() throws Exception { final int reps = 10_000_000; final int numProducers = 2; - final CyclicBarrier barrier = new CyclicBarrier(numProducers); + final CyclicBarrier barrier = new CyclicBarrier(numProducers + 1); + final Thread[] threads = new Thread[numProducers]; for (int i = 0; i < numProducers; i++) { - new Thread(new ClaimAbort(i, barrier, reps)).start(); + threads[i] = new Thread(new ClaimAbort(i, barrier, reps)); + threads[i].start(); } final int[] counts = new int[numProducers]; @@ -182,6 +202,8 @@ public void shouldExchangeMessagesViaTryClaimAbort() counts[producerId]++; }; + barrier.await(); + int msgCount = 0; while (msgCount < (reps * numProducers)) { @@ -195,6 +217,11 @@ public void shouldExchangeMessagesViaTryClaimAbort() } assertEquals(reps * numProducers, msgCount); + + for (final Thread t : threads) + { + t.join(); + } } class Producer implements Runnable diff --git a/agrona/src/test/java/org/agrona/concurrent/ringbuffer/OneToOneRingBufferConcurrentTest.java b/agrona/src/test/java/org/agrona/concurrent/ringbuffer/OneToOneRingBufferConcurrentTest.java index 37195b8ca..7cda02707 100644 --- a/agrona/src/test/java/org/agrona/concurrent/ringbuffer/OneToOneRingBufferConcurrentTest.java +++ b/agrona/src/test/java/org/agrona/concurrent/ringbuffer/OneToOneRingBufferConcurrentTest.java @@ -22,6 +22,7 @@ import org.junit.jupiter.api.Test; import java.nio.ByteBuffer; +import java.util.concurrent.CyclicBarrier; import static org.agrona.BitUtil.SIZE_OF_INT; import static org.agrona.BitUtil.SIZE_OF_LONG; @@ -34,16 +35,18 @@ public class OneToOneRingBufferConcurrentTest { private static final int MSG_TYPE_ID = 7; - public static final int REPETITIONS = 1; + public static final int REPETITIONS = 10_000_000; private final ByteBuffer byteBuffer = ByteBuffer.allocateDirect((16 * 1024) + TRAILER_LENGTH); private final UnsafeBuffer unsafeBuffer = new UnsafeBuffer(byteBuffer); private final RingBuffer ringBuffer = new OneToOneRingBuffer(unsafeBuffer); @Test - public void shouldExchangeMessages() + public void shouldExchangeMessages() throws Exception { - new Producer().start(); + final CyclicBarrier barrier = new CyclicBarrier(2); + final Producer producer = new Producer(barrier); + producer.start(); final MutableInteger count = new MutableInteger(); final MessageHandler handler = @@ -56,6 +59,8 @@ public void shouldExchangeMessages() count.increment(); }; + barrier.await(); + while (count.get() < REPETITIONS) { final int readCount = ringBuffer.read(handler); @@ -64,12 +69,16 @@ public void shouldExchangeMessages() Thread.yield(); } } + + producer.join(); } @Test - public void shouldExchangeMessagesViaTryClaimCommit() + public void shouldExchangeMessagesViaTryClaimCommit() throws Exception { - new ClaimCommit().start(); + final CyclicBarrier barrier = new CyclicBarrier(2); + final ClaimCommit producer = new ClaimCommit(barrier); + producer.start(); final MutableInteger count = new MutableInteger(); final MessageHandler handler = @@ -84,6 +93,8 @@ public void shouldExchangeMessagesViaTryClaimCommit() count.increment(); }; + barrier.await(); + while (count.get() < REPETITIONS) { final int readCount = ringBuffer.read(handler); @@ -92,12 +103,16 @@ public void shouldExchangeMessagesViaTryClaimCommit() Thread.yield(); } } + + producer.join(); } @Test - public void shouldExchangeMessagesViaTryClaimAbort() + public void shouldExchangeMessagesViaTryClaimAbort() throws Exception { - new ClaimAbort().start(); + final CyclicBarrier barrier = new CyclicBarrier(2); + final ClaimAbort producer = new ClaimAbort(barrier); + producer.start(); final MutableInteger count = new MutableInteger(); final MessageHandler handler = @@ -111,6 +126,8 @@ public void shouldExchangeMessagesViaTryClaimAbort() count.increment(); }; + barrier.await(); + while (count.get() < REPETITIONS) { final int readCount = ringBuffer.read(handler); @@ -119,18 +136,30 @@ public void shouldExchangeMessagesViaTryClaimAbort() Thread.yield(); } } + + producer.join(); } class Producer extends Thread { - Producer() + private final CyclicBarrier barrier; + + Producer(final CyclicBarrier barrier) { super("producer"); + this.barrier = barrier; } public void run() { final UnsafeBuffer srcBuffer = new UnsafeBuffer(new byte[1024]); + try + { + barrier.await(); + } + catch (final Exception ignore) + { + } for (int i = 0; i < REPETITIONS; i++) { @@ -146,13 +175,24 @@ public void run() class ClaimCommit extends Thread { - ClaimCommit() + private final CyclicBarrier barrier; + + ClaimCommit(final CyclicBarrier barrier) { super("tryClaim-commit"); + this.barrier = barrier; } public void run() { + try + { + barrier.await(); + } + catch (final Exception ignore) + { + } + final int length = SIZE_OF_INT + SIZE_OF_LONG; for (int i = 0; i < REPETITIONS; i++) { @@ -179,14 +219,25 @@ public void run() class ClaimAbort extends Thread { - ClaimAbort() + private final CyclicBarrier barrier; + + ClaimAbort(final CyclicBarrier barrier) { super("tryClaim-abort"); + this.barrier = barrier; } public void run() { final UnsafeBuffer srcBuffer = new UnsafeBuffer(new byte[1024]); + try + { + barrier.await(); + } + catch (final Exception ignore) + { + } + for (int i = 0; i < REPETITIONS; i++) {