Skip to content

Commit

Permalink
Move the PoolThreadCache finalizer to a separate object (#13510)
Browse files Browse the repository at this point in the history
Motivation:
PoolThreadCache objects are created even when they are not meant to pool
anything.
In such a case, there is no point in giving them a finalize() method.

Modification:
The finalize() method is moved to a separate object, which is
conditionally referenced from the PoolThreadCache.
Non-pooling PoolThreadCache objects will not create their FreeOnFinalize
objects.

This is an alternative implementation to #13408, which is also reverted
by this PR.

Additionally, #13408 added a condition to create non-caching
PoolThreadCache when the size of the small and normal caches were zero.
However, it turned out that even when these were requested to be zero, a
single-element cache would be created for them.
This PR also reverts the logic to the old behaviour with the
single-element cache for small and normal sizes.

Result:
We avoid creating finalizable objects when we don't pool.
  • Loading branch information
chrisvest committed Jul 24, 2023
1 parent e2f71fe commit dc16c58
Show file tree
Hide file tree
Showing 9 changed files with 71 additions and 115 deletions.
14 changes: 7 additions & 7 deletions buffer/src/main/java/io/netty/buffer/PoolArena.java
Original file line number Diff line number Diff line change
Expand Up @@ -124,13 +124,13 @@ private PoolSubpage<T>[] newSubpagePoolArray(int size) {

abstract boolean isDirect();

PooledByteBuf<T> allocate(PoolArenasCache cache, int reqCapacity, int maxCapacity) {
PooledByteBuf<T> allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity) {
PooledByteBuf<T> buf = newByteBuf(maxCapacity);
allocate(cache, buf, reqCapacity);
return buf;
}

private void allocate(PoolArenasCache cache, PooledByteBuf<T> buf, final int reqCapacity) {
private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {
final int sizeIdx = size2SizeIdx(reqCapacity);

if (sizeIdx <= smallMaxSizeIdx) {
Expand All @@ -145,7 +145,7 @@ private void allocate(PoolArenasCache cache, PooledByteBuf<T> buf, final int req
}
}

private void tcacheAllocateSmall(PoolArenasCache cache, PooledByteBuf<T> buf, final int reqCapacity,
private void tcacheAllocateSmall(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity,
final int sizeIdx) {

if (cache.allocateSmall(this, buf, reqCapacity, sizeIdx)) {
Expand Down Expand Up @@ -186,7 +186,7 @@ private void tcacheAllocateSmall(PoolArenasCache cache, PooledByteBuf<T> buf, fi
incSmallAllocation();
}

private void tcacheAllocateNormal(PoolArenasCache cache, PooledByteBuf<T> buf, final int reqCapacity,
private void tcacheAllocateNormal(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity,
final int sizeIdx) {
if (cache.allocateNormal(this, buf, reqCapacity, sizeIdx)) {
// was able to allocate out of the cache so move on
Expand All @@ -201,7 +201,7 @@ private void tcacheAllocateNormal(PoolArenasCache cache, PooledByteBuf<T> buf, f
}
}

private void allocateNormal(PooledByteBuf<T> buf, int reqCapacity, int sizeIdx, PoolArenasCache threadCache) {
private void allocateNormal(PooledByteBuf<T> buf, int reqCapacity, int sizeIdx, PoolThreadCache threadCache) {
assert lock.isHeldByCurrentThread();
if (q050.allocate(buf, reqCapacity, sizeIdx, threadCache) ||
q025.allocate(buf, reqCapacity, sizeIdx, threadCache) ||
Expand Down Expand Up @@ -229,7 +229,7 @@ private void allocateHuge(PooledByteBuf<T> buf, int reqCapacity) {
allocationsHuge.increment();
}

void free(PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle, int normCapacity, PoolArenasCache cache) {
void free(PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle, int normCapacity, PoolThreadCache cache) {
chunk.decrementPinnedMemory(normCapacity);
if (chunk.unpooled) {
int size = chunk.chunkSize();
Expand Down Expand Up @@ -294,7 +294,7 @@ void reallocate(PooledByteBuf<T> buf, int newCapacity) {
final T oldMemory;
final int oldOffset;
final int oldMaxLength;
final PoolArenasCache oldCache;
final PoolThreadCache oldCache;

// We synchronize on the ByteBuf itself to ensure there is no "concurrent" reallocations for the same buffer.
// We do this to ensure the ByteBuf internal fields that are used to allocate / free are not accessed
Expand Down
58 changes: 0 additions & 58 deletions buffer/src/main/java/io/netty/buffer/PoolArenasCache.java

This file was deleted.

6 changes: 3 additions & 3 deletions buffer/src/main/java/io/netty/buffer/PoolChunk.java
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ private int usage(int freeBytes) {
return 100 - freePercentage;
}

boolean allocate(PooledByteBuf<T> buf, int reqCapacity, int sizeIdx, PoolArenasCache cache) {
boolean allocate(PooledByteBuf<T> buf, int reqCapacity, int sizeIdx, PoolThreadCache cache) {
final long handle;
if (sizeIdx <= arena.smallMaxSizeIdx) {
// small
Expand Down Expand Up @@ -582,7 +582,7 @@ private static long toRunHandle(int runOffset, int runPages, int inUsed) {
}

void initBuf(PooledByteBuf<T> buf, ByteBuffer nioBuffer, long handle, int reqCapacity,
PoolArenasCache threadCache) {
PoolThreadCache threadCache) {
if (isSubpage(handle)) {
initBufWithSubpage(buf, nioBuffer, handle, reqCapacity, threadCache);
} else {
Expand All @@ -593,7 +593,7 @@ void initBuf(PooledByteBuf<T> buf, ByteBuffer nioBuffer, long handle, int reqCap
}

void initBufWithSubpage(PooledByteBuf<T> buf, ByteBuffer nioBuffer, long handle, int reqCapacity,
PoolArenasCache threadCache) {
PoolThreadCache threadCache) {
int runOffset = runOffset(handle);
int bitmapIdx = bitmapIdx(handle);

Expand Down
2 changes: 1 addition & 1 deletion buffer/src/main/java/io/netty/buffer/PoolChunkList.java
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ void prevList(PoolChunkList<T> prevList) {
this.prevList = prevList;
}

boolean allocate(PooledByteBuf<T> buf, int reqCapacity, int sizeIdx, PoolArenasCache threadCache) {
boolean allocate(PooledByteBuf<T> buf, int reqCapacity, int sizeIdx, PoolThreadCache threadCache) {
int normCapacity = arena.sizeIdx2size(sizeIdx);
if (normCapacity > maxCapacity) {
// Either this PoolChunkList is empty or the requested capacity is larger then the capacity which can
Expand Down
54 changes: 36 additions & 18 deletions buffer/src/main/java/io/netty/buffer/PoolThreadCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,14 @@
* <a href="https://www.facebook.com/notes/facebook-engineering/scalable-memory-allocation-using-jemalloc/480222803919">
* Scalable memory allocation using jemalloc</a>.
*/
final class PoolThreadCache extends PoolArenasCache {
final class PoolThreadCache {

private static final InternalLogger logger = InternalLoggerFactory.getInstance(PoolThreadCache.class);
private static final int INTEGER_SIZE_MINUS_ONE = Integer.SIZE - 1;

final PoolArena<byte[]> heapArena;
final PoolArena<ByteBuffer> directArena;

// Hold the caches for the different size classes, which are small and normal.
private final MemoryRegionCache<byte[]>[] smallSubPageHeapCaches;
private final MemoryRegionCache<ByteBuffer>[] smallSubPageDirectCaches;
Expand All @@ -53,6 +58,8 @@ final class PoolThreadCache extends PoolArenasCache {

private final int freeSweepAllocationThreshold;
private final AtomicBoolean freed = new AtomicBoolean();
@SuppressWarnings("unused") // Field is only here for the finalizer.
private final FreeOnFinalize freeOnFinalize;

private int allocations;

Expand All @@ -61,10 +68,11 @@ final class PoolThreadCache extends PoolArenasCache {

PoolThreadCache(PoolArena<byte[]> heapArena, PoolArena<ByteBuffer> directArena,
int smallCacheSize, int normalCacheSize, int maxCachedBufferCapacity,
int freeSweepAllocationThreshold) {
super(heapArena, directArena);
int freeSweepAllocationThreshold, boolean useFinalizer) {
checkPositiveOrZero(maxCachedBufferCapacity, "maxCachedBufferCapacity");
this.freeSweepAllocationThreshold = freeSweepAllocationThreshold;
this.heapArena = heapArena;
this.directArena = directArena;
if (directArena != null) {
smallSubPageDirectCaches = createSubPageCaches(
smallCacheSize, directArena.numSmallSubpagePools);
Expand Down Expand Up @@ -100,6 +108,7 @@ final class PoolThreadCache extends PoolArenasCache {
throw new IllegalArgumentException("freeSweepAllocationThreshold: "
+ freeSweepAllocationThreshold + " (expected: > 0)");
}
freeOnFinalize = useFinalizer ? new FreeOnFinalize(this) : null;
}

private static <T> MemoryRegionCache<T>[] createSubPageCaches(
Expand Down Expand Up @@ -134,18 +143,21 @@ private static <T> MemoryRegionCache<T>[] createNormalCaches(
}
}

// Computes floor(log2(val)) as (Integer.SIZE - 1) minus the count of
// leading zero bits. Caller must guarantee val > 0: for val == 0,
// numberOfLeadingZeros returns 32 and this would yield -1.
static int log2(int val) {
return INTEGER_SIZE_MINUS_ONE - Integer.numberOfLeadingZeros(val);
}

/**
* Try to allocate a small buffer out of the cache. Returns {@code true} if successful {@code false} otherwise
*/
@Override
boolean allocateSmall(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int sizeIdx) {
return allocate(cacheForSmall(area, sizeIdx), buf, reqCapacity);
}

/**
* Try to allocate a normal buffer out of the cache. Returns {@code true} if successful {@code false} otherwise
*/
@Override
boolean allocateNormal(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int sizeIdx) {
return allocate(cacheForNormal(area, sizeIdx), buf, reqCapacity);
}
Expand All @@ -169,7 +181,6 @@ private boolean allocate(MemoryRegionCache<?> cache, PooledByteBuf buf, int reqC
* Returns {@code true} if it fit into the cache {@code false} otherwise.
*/
@SuppressWarnings({ "unchecked", "rawtypes" })
@Override
boolean add(PoolArena<?> area, PoolChunk chunk, ByteBuffer nioBuffer,
long handle, int normCapacity, SizeClass sizeClass) {
int sizeIdx = area.size2SizeIdx(normCapacity);
Expand All @@ -194,20 +205,9 @@ private MemoryRegionCache<?> cache(PoolArena<?> area, int sizeIdx, SizeClass siz
}
}

/// TODO: In the future when we move to Java9+ we should use java.lang.ref.Cleaner.
@Override
protected void finalize() throws Throwable {
try {
super.finalize();
} finally {
free(true);
}
}

/**
* Should be called if the Thread that uses this cache is about to exit to release resources out of the cache
*/
@Override
void free(boolean finalizer) {
// As free() may be called either by the finalizer or by FastThreadLocal.onRemoval(...) we need to ensure
// we only call this one time.
Expand Down Expand Up @@ -266,7 +266,6 @@ private static int free(MemoryRegionCache<?> cache, boolean finalizer) {
return cache.free(finalizer);
}

@Override
void trim() {
trim(smallSubPageDirectCaches);
trim(normalDirectCaches);
Expand Down Expand Up @@ -490,4 +489,23 @@ public Entry newObject(Handle<Entry> handle) {
}
});
}

// Companion object that carries the finalize() method so that PoolThreadCache
// itself is never finalizable. It is only allocated when the cache was
// constructed with useFinalizer == true (see the constructor's
// 'freeOnFinalize = useFinalizer ? new FreeOnFinalize(this) : null'),
// so non-pooling caches never pay finalizer-registration cost.
private static final class FreeOnFinalize {
// The cache to release when this object becomes unreachable.
private final PoolThreadCache cache;

private FreeOnFinalize(PoolThreadCache cache) {
this.cache = cache;
}

/// TODO: In the future when we move to Java9+ we should use java.lang.ref.Cleaner.
@SuppressWarnings({"FinalizeDeclaration", "deprecation"})
@Override
protected void finalize() throws Throwable {
try {
super.finalize();
} finally {
// Release the cache even if the owning thread died without calling
// free() itself; 'true' tells free(...) it runs on the finalizer path.
cache.free(true);
}
}
}
}
6 changes: 3 additions & 3 deletions buffer/src/main/java/io/netty/buffer/PooledByteBuf.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ abstract class PooledByteBuf<T> extends AbstractReferenceCountedByteBuf {
protected int offset;
protected int length;
int maxLength;
PoolArenasCache cache;
PoolThreadCache cache;
ByteBuffer tmpNioBuf;
private ByteBufAllocator allocator;

Expand All @@ -48,7 +48,7 @@ protected PooledByteBuf(Handle<? extends PooledByteBuf<T>> recyclerHandle, int m
}

void init(PoolChunk<T> chunk, ByteBuffer nioBuffer,
long handle, int offset, int length, int maxLength, PoolArenasCache cache) {
long handle, int offset, int length, int maxLength, PoolThreadCache cache) {
init0(chunk, nioBuffer, handle, offset, length, maxLength, cache);
}

Expand All @@ -57,7 +57,7 @@ void initUnpooled(PoolChunk<T> chunk, int length) {
}

private void init0(PoolChunk<T> chunk, ByteBuffer nioBuffer,
long handle, int offset, int length, int maxLength, PoolArenasCache cache) {
long handle, int offset, int length, int maxLength, PoolThreadCache cache) {
assert handle >= 0;
assert chunk != null;
assert !PoolChunk.isSubpage(handle) || chunk.arena.size2SizeIdx(maxLength) <= chunk.arena.smallMaxSizeIdx:
Expand Down

0 comments on commit dc16c58

Please sign in to comment.