Implement Thread caches for pooled buffers to minimize contention. This fixes [#2264] and [#808].

Motivation:
Remove the synchronization bottleneck in PoolArena and so speed things up.

Modifications:

This implementation uses techniques similar to those outlined in the jemalloc paper and the jemalloc
blog post https://www.facebook.com/notes/facebook-engineering/scalable-memory-allocation-using-jemalloc/480222803919.

At the moment we only cache for "known" Threads (those that power EventExecutors) and not for others, to keep the
overhead minimal when unused buffers in the cache need to be freed and when cached buffers are released once the
Thread completes. Here we use multi-level caches for tiny, small and normal allocations. Huge allocations are not
cached at all, to keep memory usage at a sane level. All the different cache configurations can be adjusted via
system properties or directly via the constructor where it makes sense.
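Below is a minimal, self-contained sketch of that idea, using hypothetical names (SizeClass, ThreadBufferCache, MAX_CACHED_PER_CLASS); it is not the committed PoolThreadCache, which additionally buckets each size class by normalized capacity and handles heap and direct arenas separately. Each thread keeps bounded queues of reusable buffers per size class, so repeated allocations of the same class never have to touch the shared arena lock.

import java.util.ArrayDeque;
import java.util.EnumMap;
import java.util.Map;

// Hypothetical sketch of a per-thread, multi-level buffer cache: tiny/small/normal
// allocations are served from thread-local queues, huge allocations always bypass it.
enum SizeClass { TINY, SMALL, NORMAL }

final class ThreadBufferCache {
    private static final int MAX_CACHED_PER_CLASS = 64; // bounded to keep memory usage sane

    // One queue of reusable buffers per size class; the real implementation would
    // additionally bucket by normalized capacity inside each class.
    private final Map<SizeClass, ArrayDeque<byte[]>> queues = new EnumMap<>(SizeClass.class);

    ThreadBufferCache() {
        for (SizeClass sc : SizeClass.values()) {
            queues.put(sc, new ArrayDeque<>());
        }
    }

    /** Try to serve an allocation from the cache; null means "fall back to the arena". */
    byte[] allocate(SizeClass sizeClass) {
        return queues.get(sizeClass).pollLast();
    }

    /** Offer a freed buffer to the cache; false means "really free it". */
    boolean add(SizeClass sizeClass, byte[] buffer) {
        ArrayDeque<byte[]> queue = queues.get(sizeClass);
        if (queue.size() >= MAX_CACHED_PER_CLASS) {
            return false; // cache full, caller releases the buffer for real
        }
        queue.addLast(buffer);
        return true;
    }

    // Per-thread instance, comparable in spirit to PooledByteBufAllocator.threadCache.
    static final ThreadLocal<ThreadBufferCache> CACHE =
            ThreadLocal.withInitial(ThreadBufferCache::new);
}

The free path in PoolArena (see the free(...) hunk further down) performs the analogous step: it first offers the chunk/handle to the calling thread's cache and only falls back to the synchronized release when the cache declines.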

Result:
Less contention, as most allocations can be served by the cache itself
Norman Maurer committed Mar 20, 2014
1 parent 92037e8 commit d27679f
Showing 4 changed files with 602 additions and 52 deletions.
99 changes: 75 additions & 24 deletions buffer/src/main/java/io/netty/buffer/PoolArena.java
@@ -23,14 +23,16 @@

abstract class PoolArena<T> {

static final int numTinySubpagePools = 512 >>> 4;

final PooledByteBufAllocator parent;

private final int pageSize;
private final int maxOrder;
private final int pageShifts;
private final int chunkSize;
private final int subpageOverflowMask;

final int pageSize;
final int pageShifts;
final int chunkSize;
final int subpageOverflowMask;
final int numSmallSubpagePools;
private final PoolSubpage<T>[] tinySubpagePools;
private final PoolSubpage<T>[] smallSubpagePools;

@@ -51,13 +53,13 @@ protected PoolArena(PooledByteBufAllocator parent, int pageSize, int maxOrder, i
this.pageShifts = pageShifts;
this.chunkSize = chunkSize;
subpageOverflowMask = ~(pageSize - 1);

tinySubpagePools = newSubpagePoolArray(512 >>> 4);
tinySubpagePools = newSubpagePoolArray(numTinySubpagePools);
for (int i = 0; i < tinySubpagePools.length; i ++) {
tinySubpagePools[i] = newSubpagePoolHead(pageSize);
}

smallSubpagePools = newSubpagePoolArray(pageShifts - 9);
numSmallSubpagePools = pageShifts - 9;
smallSubpagePools = newSubpagePoolArray(numSmallSubpagePools);
for (int i = 0; i < smallSubpagePools.length; i ++) {
smallSubpagePools[i] = newSubpagePoolHead(pageSize);
}
@@ -89,27 +91,56 @@ private PoolSubpage<T>[] newSubpagePoolArray(int size) {
return new PoolSubpage[size];
}

abstract boolean isDirect();

PooledByteBuf<T> allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity) {
PooledByteBuf<T> buf = newByteBuf(maxCapacity);
allocate(cache, buf, reqCapacity);
return buf;
}

static int tinyIdx(int normCapacity) {
return normCapacity >>> 4;
}

static int smallIdx(int normCapacity) {
int tableIdx = 0;
int i = normCapacity >>> 10;
while (i != 0) {
i >>>= 1;
tableIdx ++;
}
return tableIdx;
}

// capacity < pageSize
boolean isTinyOrSmall(int normCapacity) {
return (normCapacity & subpageOverflowMask) == 0;
}

// normCapacity < 512
static boolean isTiny(int normCapacity) {
return (normCapacity & 0xFFFFFE00) == 0;
}

private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {
final int normCapacity = normalizeCapacity(reqCapacity);
if ((normCapacity & subpageOverflowMask) == 0) { // capacity < pageSize
if (isTinyOrSmall(normCapacity)) { // capacity < pageSize
int tableIdx;
PoolSubpage<T>[] table;
if ((normCapacity & 0xFFFFFE00) == 0) { // < 512
tableIdx = normCapacity >>> 4;
if (isTiny(normCapacity)) { // < 512
if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) {
// was able to allocate out of the cache so move on
return;
}
tableIdx = tinyIdx(normCapacity);
table = tinySubpagePools;
} else {
tableIdx = 0;
int i = normCapacity >>> 10;
while (i != 0) {
i >>>= 1;
tableIdx ++;
if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) {
// was able to allocate out of the cache so move on
return;
}
tableIdx = smallIdx(normCapacity);
table = smallSubpagePools;
}
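As a side note on the index helpers factored out above, here is a tiny standalone demo (not part of the commit) of how tinyIdx and smallIdx map normalized capacities onto the subpage pool arrays.

public final class SizeIndexDemo {
    // tiny sizes are multiples of 16, so the bucket index is simply size / 16
    static int tinyIdx(int normCapacity) {
        return normCapacity >>> 4;
    }

    // small sizes start at 512; the index counts how many doublings above 512 the size is
    static int smallIdx(int normCapacity) {
        int tableIdx = 0;
        int i = normCapacity >>> 10;
        while (i != 0) {
            i >>>= 1;
            tableIdx++;
        }
        return tableIdx;
    }

    public static void main(String[] args) {
        // isTiny(cap) above is equivalent to cap < 512: (cap & 0xFFFFFE00) == 0
        System.out.println(tinyIdx(16));    // 1  -> second tiny bucket
        System.out.println(tinyIdx(496));   // 31 -> largest tiny bucket (numTinySubpagePools = 32)
        System.out.println(smallIdx(512));  // 0  -> smallSubpagePools[0]
        System.out.println(smallIdx(1024)); // 1
        System.out.println(smallIdx(4096)); // 3
    }
}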

@@ -124,11 +155,16 @@ private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int req
return;
}
}
} else if (normCapacity > chunkSize) {
} else if (normCapacity <= chunkSize) {
if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) {
// was able to allocate out of the cache so move on
return;
}
} else {
// Huge allocations are never served via the cache so just call allocateHuge
allocateHuge(buf, reqCapacity);
return;
}

allocateNormal(buf, reqCapacity, normCapacity);
}

@@ -151,10 +187,15 @@ private void allocateHuge(PooledByteBuf<T> buf, int reqCapacity) {
buf.initUnpooled(newUnpooledChunk(reqCapacity), reqCapacity);
}

void free(PoolChunk<T> chunk, long handle) {
void free(PoolChunk<T> chunk, long handle, int normCapacity) {
if (chunk.unpooled) {
destroyChunk(chunk);
} else {
PoolThreadCache cache = parent.threadCache.get();
if (cache.add(this, chunk, handle, normCapacity)) {
// cached so do not free it.
return;
}
synchronized (this) {
chunk.parent.free(chunk, handle);
}
@@ -164,7 +205,7 @@ void free(PoolChunk<T> chunk, long handle) {
PoolSubpage<T> findSubpagePoolHead(int elemSize) {
int tableIdx;
PoolSubpage<T>[] table;
if ((elemSize & 0xFFFFFE00) == 0) { // < 512
if (isTiny(elemSize)) { // < 512
tableIdx = elemSize >>> 4;
table = tinySubpagePools;
} else {
@@ -180,15 +221,15 @@ PoolSubpage<T> findSubpagePoolHead(int elemSize) {
return table[tableIdx];
}

private int normalizeCapacity(int reqCapacity) {
int normalizeCapacity(int reqCapacity) {
if (reqCapacity < 0) {
throw new IllegalArgumentException("capacity: " + reqCapacity + " (expected: 0+)");
}
if (reqCapacity >= chunkSize) {
return reqCapacity;
}

if ((reqCapacity & 0xFFFFFE00) != 0) { // >= 512
if (!isTiny(reqCapacity)) { // >= 512
// Doubled

int normalizedCapacity = reqCapacity;
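The rest of normalizeCapacity is collapsed in this hunk. As an illustration of the general scheme it implements (a simplified sketch under that assumption, ignoring the chunkSize bound handled above): requests of at least 512 bytes are rounded up to the next power of two, while tiny requests are rounded up to a multiple of 16.

public final class NormalizeDemo {
    static int normalize(int reqCapacity) {
        if (reqCapacity >= 512) {
            // round up to the next power of two by smearing the highest set bit downwards
            int n = reqCapacity - 1;
            n |= n >>> 1;
            n |= n >>> 2;
            n |= n >>> 4;
            n |= n >>> 8;
            n |= n >>> 16;
            return n + 1;
        }
        // tiny: align to the 16-byte quantum
        return (reqCapacity & 15) == 0 ? reqCapacity : (reqCapacity & ~15) + 16;
    }

    public static void main(String[] args) {
        System.out.println(normalize(1000)); // 1024
        System.out.println(normalize(19));   // 32
        System.out.println(normalize(48));   // 48
    }
}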
@@ -228,7 +269,7 @@ void reallocate(PooledByteBuf<T> buf, int newCapacity, boolean freeOldMemory) {
long oldHandle = buf.handle;
T oldMemory = buf.memory;
int oldOffset = buf.offset;

int oldMaxLength = buf.maxLength;
int readerIndex = buf.readerIndex();
int writerIndex = buf.writerIndex();

@@ -253,7 +294,7 @@ void reallocate(PooledByteBuf<T> buf, int newCapacity, boolean freeOldMemory) {
buf.setIndex(readerIndex, writerIndex);

if (freeOldMemory) {
free(oldChunk, oldHandle);
free(oldChunk, oldHandle, oldMaxLength);
}
}

@@ -339,6 +380,11 @@ static final class HeapArena extends PoolArena<byte[]> {
super(parent, pageSize, maxOrder, pageShifts, chunkSize);
}

@Override
boolean isDirect() {
return false;
}

@Override
protected PoolChunk<byte[]> newChunk(int pageSize, int maxOrder, int pageShifts, int chunkSize) {
return new PoolChunk<byte[]>(this, new byte[chunkSize], pageSize, maxOrder, pageShifts, chunkSize);
@@ -377,6 +423,11 @@ static final class DirectArena extends PoolArena<ByteBuffer> {
super(parent, pageSize, maxOrder, pageShifts, chunkSize);
}

@Override
boolean isDirect() {
return true;
}

@Override
protected PoolChunk<ByteBuffer> newChunk(int pageSize, int maxOrder, int pageShifts, int chunkSize) {
return new PoolChunk<ByteBuffer>(
