Skip to content
Permalink
Browse files

[#5505] Enforce Recycler limit when recycling from different threads

Motivation:

Currently, the recycler max capacity it's only enforced on the
thread-local stack which is used when the recycling happens on the
same thread that requested the object.

When the recycling happens in a different thread, then the objects
will be queued into a linked list (where each node holds N objects,
default=16). These objects are then transfered into the stack when
new objects are requested and the stack is empty.

The problem is that the queue doesn't have a max capacity and that
can lead to bad scenarios. Eg:

- Allocate 1M object from recycler
- Recycle all of them from different thread
- Recycler WeakOrderQueue will contain 1M objects
- Reference graph will be very long to traverse and GC timeseems to be negatively impacted
- Size of the queue will never shrink after this

Modifications:

Add some shared counter which is used to manage capacity limits when recycle from different thread then the allocation thread. We modify the counter whenever we allocate a new Link to reduce the overhead of increment / decrement it.

Result:

More predictable number of objects mantained in the recycler pool.
  • Loading branch information...
normanmaurer committed Jul 11, 2016
1 parent 771cfae commit afafadd3d7caf1e4b346da049baab0afeae0a4bc
Showing with 114 additions and 10 deletions.
  1. +66 −10 common/src/main/java/io/netty/util/Recycler.java
  2. +48 −0 common/src/test/java/io/netty/util/RecyclerTest.java
@@ -28,6 +28,9 @@
import java.util.WeakHashMap;
import java.util.concurrent.atomic.AtomicInteger;

import static java.lang.Math.max;
import static java.lang.Math.min;

/**
* Light-weight object pool based on a thread-local stack.
*
@@ -48,8 +51,10 @@ public void recycle(Object object) {
private static final int OWN_THREAD_ID = ID_GENERATOR.getAndIncrement();
// TODO: Some arbitrary large number - should adjust as we get more production experience.
private static final int DEFAULT_INITIAL_MAX_CAPACITY = 262144;

private static final int DEFAULT_MAX_CAPACITY;
private static final int INITIAL_CAPACITY;
private static final int MAX_SHARED_CAPACITY_FACTOR;
private static final int LINK_CAPACITY;

static {
@@ -60,30 +65,37 @@ public void recycle(Object object) {
if (maxCapacity < 0) {
maxCapacity = DEFAULT_INITIAL_MAX_CAPACITY;
}

DEFAULT_MAX_CAPACITY = maxCapacity;

MAX_SHARED_CAPACITY_FACTOR = max(2,
SystemPropertyUtil.getInt("io.netty.recycler.maxSharedCapacityFactor",
2));

LINK_CAPACITY = MathUtil.findNextPositivePowerOfTwo(
Math.max(SystemPropertyUtil.getInt("io.netty.recycler.linkCapacity", 16), 16));
max(SystemPropertyUtil.getInt("io.netty.recycler.linkCapacity", 16), 16));

if (logger.isDebugEnabled()) {
if (DEFAULT_MAX_CAPACITY == 0) {
logger.debug("-Dio.netty.recycler.maxCapacity: disabled");
logger.debug("-Dio.netty.recycler.maxSharedCapacityFactor: disabled");
logger.debug("-Dio.netty.recycler.linkCapacity: disabled");
} else {
logger.debug("-Dio.netty.recycler.maxCapacity: {}", DEFAULT_MAX_CAPACITY);
logger.debug("-Dio.netty.recycler.maxSharedCapacityFactor: {}", MAX_SHARED_CAPACITY_FACTOR);
logger.debug("-Dio.netty.recycler.linkCapacity: {}", LINK_CAPACITY);
}
}

INITIAL_CAPACITY = Math.min(DEFAULT_MAX_CAPACITY, 256);
INITIAL_CAPACITY = min(DEFAULT_MAX_CAPACITY, 256);
}

private final int maxCapacity;
private final int maxSharedCapacityFactor;

private final FastThreadLocal<Stack<T>> threadLocal = new FastThreadLocal<Stack<T>>() {
@Override
protected Stack<T> initialValue() {
return new Stack<T>(Recycler.this, Thread.currentThread(), maxCapacity);
return new Stack<T>(Recycler.this, Thread.currentThread(), maxCapacity, maxSharedCapacityFactor);
}
};

@@ -92,7 +104,17 @@ protected Recycler() {
}

protected Recycler(int maxCapacity) {
this.maxCapacity = Math.max(0, maxCapacity);
this(maxCapacity, MAX_SHARED_CAPACITY_FACTOR);
}

protected Recycler(int maxCapacity, int maxSharedCapacityFactor) {
if (maxCapacity <= 0) {
this.maxCapacity = 0;
this.maxSharedCapacityFactor = 1;
} else {
this.maxCapacity = maxCapacity;
this.maxSharedCapacityFactor = max(1, maxSharedCapacityFactor);
}
}

@SuppressWarnings("unchecked")
@@ -201,6 +223,7 @@ public void recycle(Object object) {
private WeakOrderQueue next;
private final WeakReference<Thread> owner;
private final int id = ID_GENERATOR.getAndIncrement();
private final Stack<?> stack;

WeakOrderQueue(Stack<?> stack, Thread thread) {
head = tail = new Link();
@@ -209,6 +232,10 @@ public void recycle(Object object) {
next = stack.head;
stack.head = this;
}
this.stack = stack;
// We allocated a Link so reserve the space
boolean reserved = stack.reserveSpace(LINK_CAPACITY);
assert reserved;
}

void add(DefaultHandle<?> handle) {
@@ -217,7 +244,13 @@ void add(DefaultHandle<?> handle) {
Link tail = this.tail;
int writeIndex;
if ((writeIndex = tail.get()) == LINK_CAPACITY) {
if (!stack.reserveSpace(LINK_CAPACITY)) {
// Drop it.
return;
}
// We allocate a Link so reserve the space
this.tail = tail = tail.next = new Link();

writeIndex = tail.get();
}
tail.elements[writeIndex] = handle;
@@ -259,7 +292,7 @@ boolean transfer(Stack<?> dst) {

if (expectedCapacity > dst.elements.length) {
final int actualCapacity = dst.increaseCapacity(expectedCapacity);
srcEnd = Math.min(srcStart + actualCapacity - dstSize, srcEnd);
srcEnd = min(srcStart + actualCapacity - dstSize, srcEnd);
}

if (srcStart != srcEnd) {
@@ -280,6 +313,9 @@ boolean transfer(Stack<?> dst) {
dst.size = newDstSize;

if (srcEnd == LINK_CAPACITY && head.next != null) {
// Add capacity back as the Link is GCed.
stack.reclaimSpace(LINK_CAPACITY);

this.head = head.next;
}

@@ -303,15 +339,35 @@ boolean transfer(Stack<?> dst) {
private DefaultHandle<?>[] elements;
private final int maxCapacity;
private int size;
private final AtomicInteger availableSharedCapacity;

private volatile WeakOrderQueue head;
private WeakOrderQueue cursor, prev;

Stack(Recycler<T> parent, Thread thread, int maxCapacity) {
Stack(Recycler<T> parent, Thread thread, int maxCapacity, int maxSharedCapacityFactor) {
this.parent = parent;
this.thread = thread;
this.maxCapacity = maxCapacity;
elements = new DefaultHandle[Math.min(INITIAL_CAPACITY, maxCapacity)];
availableSharedCapacity = new AtomicInteger(max(maxCapacity / maxSharedCapacityFactor, LINK_CAPACITY));
elements = new DefaultHandle[min(INITIAL_CAPACITY, maxCapacity)];
}

boolean reserveSpace(int space) {
assert space >= 0;
for (;;) {
int available = availableSharedCapacity.get();
if (available < space) {
return false;
}
if (availableSharedCapacity.compareAndSet(available, available - space)) {
return true;
}
}
}

void reclaimSpace(int space) {
assert space >= 0;
availableSharedCapacity.addAndGet(space);
}

int increaseCapacity(int expectedCapacity) {
@@ -321,7 +377,7 @@ int increaseCapacity(int expectedCapacity) {
newCapacity <<= 1;
} while (newCapacity < expectedCapacity && newCapacity < maxCapacity);

newCapacity = Math.min(newCapacity, maxCapacity);
newCapacity = min(newCapacity, maxCapacity);
if (newCapacity != elements.length) {
elements = Arrays.copyOf(elements, newCapacity);
}
@@ -421,7 +477,7 @@ void push(DefaultHandle<?> item) {
return;
}
if (size == elements.length) {
elements = Arrays.copyOf(elements, Math.min(size << 1, maxCapacity));
elements = Arrays.copyOf(elements, min(size << 1, maxCapacity));
}

elements[size] = item;
@@ -18,6 +18,7 @@
import org.junit.Test;

import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;

import static org.hamcrest.CoreMatchers.*;
import static org.junit.Assert.*;
@@ -199,6 +200,53 @@ public void run() {
assertThat(recycler.threadLocalSize(), is(0));
}

@Test
public void testDiscardingExceedingElementsWithRecycleAtDifferentThread() throws Exception {
final int maxCapacity = 32;
final AtomicInteger instancesCount = new AtomicInteger(0);

final Recycler<HandledObject> recycler = new Recycler<HandledObject>(maxCapacity, 2) {
@Override
protected HandledObject newObject(Recycler.Handle<HandledObject> handle) {
instancesCount.incrementAndGet();
return new HandledObject(handle);
}
};

// Borrow 2 * maxCapacity objects.
final HandledObject[] array = new HandledObject[maxCapacity * 2];
for (int i = 0; i < array.length; i++) {
array[i] = recycler.get();
}

assertEquals(array.length, instancesCount.get());
// Reset counter.
instancesCount.set(0);

// Recycle from other thread.
final Thread thread = new Thread() {
@Override
public void run() {
for (HandledObject object: array) {
object.recycle();
}
}
};
thread.start();
thread.join();

assertEquals(0, instancesCount.get());

// Borrow 2 * maxCapacity objects. Half of them should come from
// the recycler queue, the other half should be freshly allocated.
for (int i = 0; i < array.length; i++) {
recycler.get();
}

// The implementation uses maxCapacity / 2 as limit per WeakOrderQueue
assertEquals(array.length - maxCapacity / 2, instancesCount.get());
}

static final class HandledObject {
Recycler.Handle<HandledObject> handle;

0 comments on commit afafadd

Please sign in to comment.
You can’t perform that action at this time.