Skip to content

Commit

Permalink
PooledByteBufAllocatorTest may has memory visiblity issues as it uses…
Browse files Browse the repository at this point in the history
… non concurrent queue

Motivation:

PooledByteBufAllocatorTest uses an ArrayQueue but access it from multiple threads (not concurrently but still from different threads). This may leak to memory visibility issues.

Modifications:

- Use a concurrent queue
- Some cleanup

Result:

Non racy test code.
  • Loading branch information
normanmaurer committed Dec 2, 2016
1 parent 2b8fd8d commit 243b2b9
Showing 1 changed file with 14 additions and 15 deletions.
Expand Up @@ -21,12 +21,13 @@
import io.netty.util.internal.SystemPropertyUtil;
import org.junit.Test;

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;

import static java.util.concurrent.TimeUnit.MILLISECONDS;
Expand Down Expand Up @@ -339,11 +340,9 @@ private static final class AllocationThread extends Thread {
}
}

private final CountDownLatch latch = new CountDownLatch(1);
private final Queue<ByteBuf> buffers = new ArrayDeque<ByteBuf>(10);
private final Queue<ByteBuf> buffers = new ConcurrentLinkedQueue<ByteBuf>();
private final ByteBufAllocator allocator;
private volatile boolean finished;
private volatile Throwable error;
private final AtomicReference<Object> finish = new AtomicReference<Object>();

public AllocationThread(ByteBufAllocator allocator) {
this.allocator = allocator;
Expand All @@ -353,7 +352,7 @@ public AllocationThread(ByteBufAllocator allocator) {
public void run() {
try {
int idx = 0;
while (!finished) {
while (finish.get() == null) {
for (int i = 0; i < 10; i++) {
buffers.add(allocator.directBuffer(
ALLOCATION_SIZES[Math.abs(idx++ % ALLOCATION_SIZES.length)],
Expand All @@ -362,12 +361,10 @@ public void run() {
releaseBuffers();
}
} catch (Throwable cause) {
error = cause;
finished = true;
finish.set(cause);
} finally {
releaseBuffers();
}
latch.countDown();
}

private void releaseBuffers() {
Expand All @@ -381,22 +378,24 @@ private void releaseBuffers() {
}

public boolean isFinished() {
return finished;
return finish.get() != null;
}

public void finish() throws Throwable {
try {
finished = true;
latch.await();
checkForError();
// Mark as finish if not already done but ensure we not override the previous set error.
finish.compareAndSet(null, Boolean.TRUE);
join();
} finally {
releaseBuffers();
}
checkForError();
}

public void checkForError() throws Throwable {
if (error != null) {
throw error;
Object obj = finish.get();
if (obj instanceof Throwable) {
throw (Throwable) obj;
}
}
}
Expand Down

0 comments on commit 243b2b9

Please sign in to comment.