Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Experiment with ArrayByteBufferPool performance #11426

Merged
merged 5 commits into from Feb 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -24,7 +24,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Consumer;
import java.util.function.IntUnaryOperator;
Expand Down Expand Up @@ -64,9 +64,8 @@ public class ArrayByteBufferPool implements ByteBufferPool, Dumpable
private final int _maxCapacity;
private final long _maxHeapMemory;
private final long _maxDirectMemory;
private final AtomicLong _heapMemory = new AtomicLong();
private final AtomicLong _directMemory = new AtomicLong();
private final IntUnaryOperator _bucketIndexFor;
private final AtomicBoolean _evictor = new AtomicBoolean(false);
private boolean _statisticsEnabled;

/**
Expand Down Expand Up @@ -215,7 +214,6 @@ public RetainableByteBuffer acquire(int size, boolean direct)
if (entry != null)
{
bucket.recordPooled();
subtractMemory(bucket.getCapacity(), direct);
RetainableByteBuffer buffer = entry.getPooled();
((Buffer)buffer).acquire();
return buffer;
Expand All @@ -228,35 +226,25 @@ private void reserve(RetainedBucket bucket, RetainableByteBuffer buffer)
{
bucket.recordRelease();

boolean direct = buffer.isDirect();
int capacity = bucket.getCapacity();

// Discard the buffer if maxMemory is exceeded.
long excessMemory = addMemoryAndGetExcess(bucket, direct);
if (excessMemory > 0)
{
subtractMemory(capacity, direct);
bucket.recordNonPooled();
return;
}

// Try to reserve an entry to put the buffer into the pool.
Pool.Entry<RetainableByteBuffer> entry = bucket.getPool().reserve();
// Cannot reserve, discard the buffer.
if (entry == null)
{
subtractMemory(capacity, direct);
bucket.recordNonPooled();
return;
}

// Add the buffer to the new entry.
ByteBuffer byteBuffer = buffer.getByteBuffer();
BufferUtil.reset(byteBuffer);
Buffer pooledBuffer = new Buffer(byteBuffer, b -> release(bucket, entry));
if (entry.enable(pooledBuffer, false))
{
checkMaxMemory(bucket, buffer.isDirect());
return;
}

// Discard the buffer if the entry cannot be enabled.
subtractMemory(capacity, direct);
bucket.recordNonPooled();
entry.remove();
}
Expand All @@ -267,46 +255,43 @@ private void release(RetainedBucket bucket, Pool.Entry<RetainableByteBuffer> ent

RetainableByteBuffer buffer = entry.getPooled();
BufferUtil.reset(buffer.getByteBuffer());
boolean direct = buffer.isDirect();
int capacity = bucket.getCapacity();
long excessMemory = addMemoryAndGetExcess(bucket, direct);
if (excessMemory > 0)
{
bucket.recordEvict();
// If we cannot free enough space for the entry, remove it.
if (!evict(excessMemory, bucket, direct))
{
subtractMemory(capacity, direct);
bucket.recordRemove();
entry.remove();
return;
}
}

// We have enough space for this entry, pool it.
// Release the buffer and check the memory 1% of the times.
int used = ((Buffer)buffer).use();
if (entry.release())
{
if (used % 100 == 0)
checkMaxMemory(bucket, buffer.isDirect());
return;
}

// Not enough space, discard this buffer.
subtractMemory(capacity, direct);
// Cannot release, discard this buffer.
bucket.recordRemove();
entry.remove();
}

private long addMemoryAndGetExcess(RetainedBucket bucket, boolean direct)
private void checkMaxMemory(RetainedBucket bucket, boolean direct)
{
long maxMemory = direct ? _maxDirectMemory : _maxHeapMemory;
if (maxMemory < 0)
return -1;

AtomicLong memory = direct ? _directMemory : _heapMemory;
int capacity = bucket.getCapacity();
long newMemory = memory.addAndGet(capacity);
// Account the excess at most for the bucket capacity.
return Math.min(capacity, newMemory - maxMemory);
long max = direct ? _maxDirectMemory : _maxHeapMemory;
if (max <= 0 || !_evictor.compareAndSet(false, true))
gregw marked this conversation as resolved.
Show resolved Hide resolved
return;
try
{
long memory = getMemory(direct);
long excess = memory - max;
if (excess > 0)
{
bucket.recordEvict();
evict(excess, direct);
gregw marked this conversation as resolved.
Show resolved Hide resolved
}
}
finally
{
_evictor.set(false);
}
}

private boolean evict(long excessMemory, RetainedBucket target, boolean direct)
private void evict(long excessMemory, boolean direct)
{
RetainedBucket[] buckets = direct ? _direct : _indirect;
int length = buckets.length;
Expand All @@ -316,28 +301,12 @@ private boolean evict(long excessMemory, RetainedBucket target, boolean direct)
RetainedBucket bucket = buckets[index++];
if (index == length)
index = 0;
// Do not evict from the bucket the buffer is released into.
if (bucket == target)
continue;

int evicted = bucket.evict();
subtractMemory(evicted, direct);

excessMemory -= evicted;
if (excessMemory <= 0)
return true;
return;
}
return false;
}

protected ByteBuffer allocate(int capacity)
{
return ByteBuffer.allocate(capacity);
}

protected ByteBuffer allocateDirect(int capacity)
{
return ByteBuffer.allocateDirect(capacity);
}

private RetainableByteBuffer newRetainableByteBuffer(int capacity, boolean direct, Consumer<RetainableByteBuffer> releaser)
Expand Down Expand Up @@ -415,47 +384,27 @@ public long getHeapMemory()

private long getMemory(boolean direct)
gregw marked this conversation as resolved.
Show resolved Hide resolved
{
AtomicLong memory = direct ? _directMemory : _heapMemory;
return memory.longValue();
}

private void subtractMemory(int amount, boolean direct)
{
AtomicLong memory = direct ? _directMemory : _heapMemory;
memory.addAndGet(-amount);
long size = 0;
for (RetainedBucket bucket : direct ? _direct : _indirect)
size += (long)bucket.getPool().getIdleCount() * bucket.getCapacity();
return size;
}

@ManagedAttribute("The available bytes retained by direct ByteBuffers")
public long getAvailableDirectMemory()
{
return getAvailableMemory(true);
return getDirectMemory();
}

@ManagedAttribute("The available bytes retained by heap ByteBuffers")
public long getAvailableHeapMemory()
{
return getAvailableMemory(false);
}

private long getAvailableMemory(boolean direct)
{
RetainedBucket[] buckets = direct ? _direct : _indirect;
long total = 0L;
for (RetainedBucket bucket : buckets)
{
long capacity = bucket.getCapacity();
total += bucket.getPool().getIdleCount() * capacity;
}
return total;
return getHeapMemory();
}

@ManagedOperation(value = "Clears this ByteBufferPool", impact = "ACTION")
public void clear()
{
clearBuckets(_direct);
_directMemory.set(0);
clearBuckets(_indirect);
_heapMemory.set(0);
}

private void clearBuckets(RetainedBucket[] buckets)
Expand Down Expand Up @@ -484,8 +433,8 @@ public String toString()
super.toString(),
_minCapacity, _maxCapacity,
_direct.length,
getMemory(false), _maxHeapMemory,
getMemory(true), _maxDirectMemory);
getHeapMemory(), _maxHeapMemory,
getDirectMemory(), _maxDirectMemory);
}

private class RetainedBucket
Expand Down Expand Up @@ -634,12 +583,13 @@ private Pool.Entry<RetainableByteBuffer> evict()

private static class Buffer extends AbstractRetainableByteBuffer
{
private final Consumer<RetainableByteBuffer> releaser;
private final Consumer<RetainableByteBuffer> _releaser;
private int _usages;

private Buffer(ByteBuffer buffer, Consumer<RetainableByteBuffer> releaser)
{
super(buffer);
this.releaser = releaser;
this._releaser = releaser;
}

@Override
Expand All @@ -648,17 +598,24 @@ public boolean release()
boolean released = super.release();
if (released)
{
if (releaser != null)
releaser.accept(this);
if (_releaser != null)
_releaser.accept(this);
}
return released;
}

private int use()
{
if (++_usages < 0)
_usages = 0;
return _usages;
}
}

/**
* A variant of the {@link ArrayByteBufferPool} that
* uses buckets of buffers that increase in size by a power of
* 2 (eg 1k, 2k, 4k, 8k, etc.).
* 2 (e.g. 1k, 2k, 4k, 8k, etc.).
*/
public static class Quadratic extends ArrayByteBufferPool
{
Expand Down
Expand Up @@ -28,7 +28,6 @@
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.sameInstance;
Expand All @@ -45,22 +44,8 @@ public void testMaxMemoryEviction()

List<RetainableByteBuffer> buffers = new ArrayList<>();

buffers.add(pool.acquire(10, true));
assertThat(pool.getDirectMemory(), is(0L));
buffers.add(pool.acquire(10, true));
assertThat(pool.getDirectMemory(), is(0L));
buffers.add(pool.acquire(20, true));
assertThat(pool.getDirectMemory(), is(0L));
buffers.add(pool.acquire(20, true));
assertThat(pool.getDirectMemory(), is(0L));
buffers.add(pool.acquire(10, true));
assertThat(pool.getDirectMemory(), is(0L));
buffers.add(pool.acquire(20, true));
assertThat(pool.getDirectMemory(), is(0L));
buffers.add(pool.acquire(10, true));
assertThat(pool.getDirectMemory(), is(0L));
buffers.add(pool.acquire(20, true));
assertThat(pool.getDirectMemory(), is(0L));
for (int i = 0; i < 200; i++)
buffers.add(pool.acquire(10 + i / 10, true));

assertThat(pool.getAvailableDirectByteBufferCount(), is(0L));
assertThat(pool.getDirectByteBufferCount(), is(0L));
Expand All @@ -73,7 +58,22 @@ public void testMaxMemoryEviction()
assertThat(pool.getDirectByteBufferCount(), greaterThan(0L));
assertThat(pool.getDirectByteBufferCount(), lessThan((long)buffers.size()));
assertThat(pool.getDirectMemory(), greaterThan(0L));
assertThat(pool.getDirectMemory(), lessThanOrEqualTo(40L));
assertThat(pool.getDirectMemory(), lessThan(120L));
gregw marked this conversation as resolved.
Show resolved Hide resolved

buffers.clear();
for (int i = 0; i < 200; i++)
buffers.add(pool.acquire(10 + i / 10, true));

long maxSize = 0;
for (RetainableByteBuffer buffer : buffers)
{
buffer.release();
long size = pool.getDirectMemory();
maxSize = Math.max(size, maxSize);
}

// Test that size is never too much over target max
assertThat(maxSize, lessThan(100L));
}

@Test
Expand Down
Expand Up @@ -47,7 +47,7 @@ public static void main(String[] args) throws RunnerException
.measurementTime(TimeValue.milliseconds(500))
.addProfiler(AsyncProfiler.class, "dir=/tmp;output=flamegraph;event=cpu;interval=500000;libPath=" + asyncProfilerPath)
.forks(1)
.threads(10)
.threads(32)
.build();
new Runner(opt).run();
}
Expand Down Expand Up @@ -124,4 +124,12 @@ public void inputFixedCapacityOutputRandomCapacityMigrating()
output.release();
input.release();
}

@Benchmark
@BenchmarkMode({Mode.Throughput})
public void fastPathAcquireRelease()
{
RetainableByteBuffer buffer = pool.acquire(65535, true);
buffer.release();
}
}