From 623b845306edb5ca7f89f863ba61ea7cb4b30678 Mon Sep 17 00:00:00 2001
From: Stefania Alborghetti
Date: Mon, 11 May 2015 10:11:11 +0800
Subject: [PATCH] BufferPool is fully atomic, added assert on double free,
 global pool stores chunks

---
 .../cassandra/utils/memory/BufferPool.java | 229 +++++++-----------
 .../utils/memory/BufferPoolTest.java       |   5 +-
 2 files changed, 89 insertions(+), 145 deletions(-)

diff --git a/src/java/org/apache/cassandra/utils/memory/BufferPool.java b/src/java/org/apache/cassandra/utils/memory/BufferPool.java
index 986806f2bf09..4f7c9480adfb 100644
--- a/src/java/org/apache/cassandra/utils/memory/BufferPool.java
+++ b/src/java/org/apache/cassandra/utils/memory/BufferPool.java
@@ -21,8 +21,8 @@
 import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.concurrent.*;
-import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.utils.NoSpamLogger;
@@ -40,7 +40,6 @@
  */
 public class BufferPool
 {
-
     /** The size of a page aligned buffer, 64kbit */
     static final int CHUNK_SIZE = 64 << 10;
@@ -106,15 +105,15 @@ public static long sizeInBytes()
     }
 
     /**
-     * A queue of page aligned buffers, the buffers, which have been sliced from bigger chunks,
-     * the chunks, also page aligned. Chunks are added as long as we have not exceeded the
+     * A queue of page aligned buffers, which have been sliced from bigger chunks,
+     * the macro-chunks, also page aligned. Chunks are allocated as long as we have not exceeded the
      * memory maximum threshold, MEMORY_USAGE_THRESHOLD.
      *
     * This class is shared by multiple threads.
     */
    static final class GlobalPool
    {
-        /** The size of a bigger chunk, 1mbit, must be a multiple of BUFFER_SIZE */
+        /** The size of a bigger chunk, 1 MiB, must be a multiple of BUFFER_SIZE */
        static final int MACRO_CHUNK_SIZE = 1 << 20;
 
        static
@@ -125,17 +124,16 @@
        }
 
        // the collection of large chunks we have allocated;
-        // these are split up into smaller ones and placed in chunks
        private final Queue<Chunk> macroChunks = new ConcurrentLinkedQueue<>();
-        // suggestion: can't we just put Chunks here? TODO
-        private final Queue<ByteBuffer> chunks = new ConcurrentLinkedQueue<>();
+        // the sliced chunks that we dish out on request
+        private final Queue<Chunk> chunks = new ConcurrentLinkedQueue<>();
        private final AtomicLong memoryUsage = new AtomicLong();
 
-        public ByteBuffer get()
+        public Chunk get()
        {
            while (true)
            {
-                ByteBuffer ret = chunks.poll();
+                Chunk ret = chunks.poll();
                if (ret != null)
                    return ret;
@@ -169,15 +167,15 @@ private boolean allocateMoreChunks()
            macroChunks.add(chunk);
            for (int i = 0 ; i < MACRO_CHUNK_SIZE ; i += CHUNK_SIZE)
            {
-                chunks.add(chunk.get(CHUNK_SIZE));
+                chunks.add(new Chunk(chunk.get(CHUNK_SIZE)));
            }
 
            return true;
        }
 
-        public void recycle(ByteBuffer buffer)
+        public void recycle(Chunk chunk)
        {
-            chunks.add(buffer);
+            chunks.add(chunk);
        }
 
        public long sizeInBytes()
@@ -196,10 +194,8 @@ void reset()
    }
 
    /**
-     * A thread local class that slices a chunk into smaller buffers, this class is not thread
-     * safe and must be called only by a single thread. Only the metrics are shared by threads.
-     *
-     * It grabs chunks from the global pool.
+     * A thread local class that grabs a chunk from the global pool for this thread's allocations.
+     * Only one thread may allocate from the chunk, but any thread may release buffers back to it.
     */
    static final class LocalPool
    {
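The hunks above wrap each CHUNK_SIZE slice of a macro-chunk in a Chunk before queueing it, so the global pool now hands out Chunk objects rather than raw buffers. A rough standalone sketch of the slicing arithmetic only, for readers following along: the class name MacroChunkSlicing is invented, plain ByteBuffer.allocateDirect stands in for the pool's allocateDirectAligned, and the memory-usage accounting is omitted.

    import java.nio.ByteBuffer;
    import java.util.ArrayList;
    import java.util.List;

    class MacroChunkSlicing
    {
        static final int CHUNK_SIZE = 64 << 10;      // 64 KiB, as in the patch
        static final int MACRO_CHUNK_SIZE = 1 << 20; // 1 MiB, a multiple of CHUNK_SIZE

        // carve the macro-chunk into CHUNK_SIZE slices, as allocateMoreChunks() does;
        // each slice would become the slab of one Chunk
        static List<ByteBuffer> slice(ByteBuffer macroChunk)
        {
            List<ByteBuffer> slices = new ArrayList<>();
            for (int i = 0; i < MACRO_CHUNK_SIZE; i += CHUNK_SIZE)
            {
                macroChunk.limit(i + CHUNK_SIZE);
                macroChunk.position(i);
                slices.add(macroChunk.slice());
            }
            return slices;
        }

        public static void main(String[] args)
        {
            List<ByteBuffer> slices = slice(ByteBuffer.allocateDirect(MACRO_CHUNK_SIZE));
            System.out.println(slices.size() + " chunks of " + slices.get(0).capacity() + " bytes");
        }
    }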
@@ -221,15 +217,18 @@ private boolean grabChunk()
        {
            Chunk current = currentChunk;
            currentChunk = EMPTY_CHUNK;
+
            // must recycle after clearing currentChunk, so that we synchronize with a concurrent put()
-            if (!current.isRecycled())
-                current.maybeRecycle();
+            current.maybeRecycle(this);
 
-            ByteBuffer buffer = globalPool.get();
-            if (buffer == null)
+            Chunk chunk = globalPool.get();
+            if (chunk == null)
                return false;
 
-            currentChunk = new Chunk(this, buffer);
+            assert chunk.owner == null;
+            chunk.setOwner(this);
+
+            currentChunk = chunk;
            return true;
        }
 
@@ -242,14 +241,6 @@ public ByteBuffer get(int size)
                return allocate(size);
            }
 
-            if (currentChunk.isRecycled())
-            {
-                if (logger.isTraceEnabled())
-                    logger.trace("Current chunk was recycled by different thread", size, CHUNK_SIZE);
-                if (!grabChunk())
-                    return allocate(size);
-            }
-
            ByteBuffer buffer = currentChunk.get(size);
            if (buffer != null)
                return buffer;
@@ -277,14 +268,16 @@ public void put(ByteBuffer buffer)
                // suggestion: clean performs all of these checks already
                // HOWEVER: think we should consider taking a boolean indicating if a heap buffer would be ACCEPTABLE if the pool has run dry
                // this would likely be faster in most cases, since deallocation is slow. possibly we should even just ALWAYS return a heap buffer
-                // if we've run out of pool space, but in this case we should probably configure unit tests to run with the pool disabled, to check we don't break anything
+                // if we've run out of pool space, but in this case we should probably configure unit tests to run with the pool disabled,
+                // to check we don't break anything
                FileUtils.clean(buffer);
                return;
            }
 
-            chunk.free(buffer, this);
+            chunk.free(buffer);
+
            if (chunk != currentChunk)
-                chunk.maybeRecycle();
+                chunk.maybeRecycle(this);
        }
    }
 
@@ -318,36 +311,25 @@ private static ByteBuffer allocateDirectAligned(int capacity)
     */
    final static class Chunk
    {
-        public final LocalPool owner;
        private final ByteBuffer slab;
        private final long baseAddress;
-        private long freeSlots = -1;
+        private final long shift;
 
-        // Incremented when the owning pool releases a buffer, single thread update
-        private int normalFree;
+        private volatile long freeSlots;
+        private static final AtomicLongFieldUpdater<Chunk> atomicFreeSlotUpdater = AtomicLongFieldUpdater.newUpdater(Chunk.class, "freeSlots");
 
-        // Incremented when another pool releases a buffer, multiple thread update
-        private volatile int atomicFree;
-        private static final AtomicIntegerFieldUpdater<Chunk> atomicFreeUpdater = AtomicIntegerFieldUpdater.newUpdater(Chunk.class, "atomicFree");
+        public volatile LocalPool owner;
 
        public Chunk(ByteBuffer slab)
-        {
-            this(null, slab);
-        }
-
-        public Chunk(LocalPool owner, ByteBuffer slab)
        {
            assert slab.isDirect();
-            this.owner = owner;
            this.slab = slab;
            this.baseAddress = MemoryUtil.getAddress(slab);
+            this.shift = 31 & (Integer.numberOfTrailingZeros(slab.capacity() / 64));
 
-            if (slab.capacity() == 0)
-                freeSlots = 0;
-
-            this.normalFree = slab.capacity();
-            this.atomicFree = 0;
+            this.freeSlots = slab.capacity() == 0 ? 0 : -1;
+            this.owner = null;
        }
 
        public static Chunk getAllocatedFrom(ByteBuffer buffer)
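The fields above replace the two free counters with a single 64-bit bitmap: one bit per unit of capacity/64 bytes, so a fully free chunk is simply freeSlots == -1. A minimal sketch of that arithmetic, assuming the patch's 64 KiB chunk and an example 4 KiB allocation (nothing here is the patch's code):

    class FreeSlotMath
    {
        public static void main(String[] args)
        {
            int capacity = 64 << 10;                                    // 64 KiB slab
            int shift = 31 & Integer.numberOfTrailingZeros(capacity / 64);
            int unit = 1 << shift;                                      // 1 KiB per bit

            long freeSlots = -1L;                                       // all 64 bits set: fully free
            System.out.println("unit=" + unit + " free=" + Long.bitCount(freeSlots) * unit);

            // taking a 4 KiB buffer rounds up to whole units and clears 4 contiguous bits
            int slotCount = (4096 - 1 + unit) >>> shift;
            long candidate = (1L << slotCount) - 1;                     // the 4 low bits
            freeSlots &= ~candidate;
            System.out.println("free after get(4096)=" + Long.bitCount(freeSlots) * unit);
        }
    }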
@@ -365,51 +347,50 @@ public static Chunk getAllocatedFrom(ByteBuffer buffer)
            return null;
        }
 
-        public int capacity()
+        LocalPool owner()
        {
-            return 64 << shift();
+            return this.owner;
        }
 
-        final int unit()
+        public void setOwner(LocalPool owner)
        {
-            return 1 << shift();
+            this.owner = owner;
        }
 
-        private int shift()
+        public int capacity()
        {
-            return (31 & (Integer.numberOfTrailingZeros(slab.capacity() / 64)));
+            return 64 << shift;
        }
 
-        final boolean isFree()
+        final int unit()
        {
-            return free() >= capacity();
+            return 1 << shift;
        }
 
-        final boolean isRecycled()
+        final boolean isFree()
        {
-            return free() == capacity() + 1;
+            return free() >= capacity();
        }
 
-        /** The total free size, includes lost free slots that could not be compacted */
+        /** The total free size */
        int free()
        {
-            return normalFree + atomicFree;
+            return Long.bitCount(freeSlots) * unit();
        }
 
-        /** Return the slab to the global pool if we can atomically increment the atomic
-         * free so that our combined free count (atomic and normal) is exactly capacity + 1.
+        /**
+         * Return the chunk to the global pool if it is the owning pool asking.
         */
-        public void maybeRecycle()
+        public void maybeRecycle(LocalPool owner)
        {
            if (!isFree())
+                return; // TODO - relax this, it should be possible to release non-free chunks
+
+            if (this.owner != owner)
                return;
-            // suggestion: recycle whole Chunk, not just the slab...?
-            int cur = atomicFree;
-            int combined = cur + normalFree;
-            if (combined > capacity() && cur != capacity() + 1)
-                throw new AssertionError("Chunk has a corrupted free count (atomic=" + atomicFree + ", normal=" + normalFree + ")");
-            else if (combined == capacity() && atomicFreeUpdater.compareAndSet(this, cur, cur + 1))
-                globalPool.recycle(slab);
+
+            setOwner(null);
+            globalPool.recycle(this);
        }
 
        /**
@@ -418,8 +399,6 @@ else if (combined == capacity() && atomicFreeUpdater.compareAndSet(this, cur, cu
        */
        public ByteBuffer get(int size)
        {
-            int shift = shift();
-
            // how many multiples of our units is the size?
            // we add (unit - 1), so that when we divide by unit (>>> shift), we effectively round up
            int slotCount = (size - 1 + unit()) >>> shift;
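maybeRecycle() now gates recycling on ownership instead of on a sentinel free count: only the owning pool may return a fully free chunk, and it drops ownership before re-queueing. A self-contained sketch of that protocol (RecycleSketch and its plain Object owner are stand-ins, not the patch's types):

    import java.util.Queue;
    import java.util.concurrent.ConcurrentLinkedQueue;

    class RecycleSketch
    {
        static final Queue<RecycleSketch> globalQueue = new ConcurrentLinkedQueue<>();

        volatile Object owner;               // stands in for LocalPool
        volatile long freeSlots = -1L;
        final int capacity = 64 << 10;

        boolean isFree() { return Long.bitCount(freeSlots) * (capacity / 64) >= capacity; }

        void maybeRecycle(Object caller)
        {
            if (!isFree())
                return;                      // only fully free chunks go back, for now
            if (this.owner != caller)
                return;                      // a non-owner never recycles someone else's chunk

            this.owner = null;               // clear ownership first...
            globalQueue.add(this);           // ...then publish to the global pool
        }

        public static void main(String[] args)
        {
            RecycleSketch chunk = new RecycleSketch();
            Object pool = new Object();
            chunk.owner = pool;
            chunk.maybeRecycle(new Object()); // wrong caller: nothing happens
            chunk.maybeRecycle(pool);         // owner: chunk is queued for reuse
            System.out.println("queued=" + (globalQueue.peek() == chunk));
        }
    }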
@@ -469,17 +448,29 @@ public ByteBuffer get(int size)
                long candidate = slotBits << index;
                if ((candidate & cur) == candidate)
                {
-                    freeSlots &= ~candidate;
-                    return get(index << shift, slotCount << shift, size);
+                    // we are sure to eventually CAS successfully with the same candidate bits,
+                    // because only one thread allocates from the chunk at a time; we contend
+                    // only with release operations, which never clear bits
+                    while (true)
+                    {
+                        // clear the candidate bits (freeSlots &= ~candidate)
+                        if (atomicFreeSlotUpdater.compareAndSet(this, cur, cur & ~candidate))
+                            break;
+
+                        cur = freeSlots;
+                        // make sure no other thread has cleared the candidate bits
+                        assert ((candidate & cur) == candidate);
+                    }
+                    return get(index << shift, size);
                }
            }
        }
 
-        private ByteBuffer get(int offset, long roundedSize, int size)
+        private ByteBuffer get(int offset, int size)
        {
            slab.limit(offset + size);
            slab.position(offset);
-            normalFree -= roundedSize;
 
            ByteBuffer ret = slab.slice();
            MemoryUtil.setAttachment(ret, this);
            if (Ref.DEBUG_ENABLED)
@@ -497,88 +488,44 @@ int roundUp(int v)
        }
 
        /** Release a buffer */
-        public void free(ByteBuffer buffer, LocalPool pool)
+        public void free(ByteBuffer buffer)
        {
            // can just clear attachment, since we have a strong reference to its parent, and no point adding work for GC
            MemoryUtil.setAttachment(buffer, null);
-            long address = getAddress(buffer);
-            if (pool != owner)
-                releaseDifferentPool(buffer);
-            else
-                releaseSamePool(buffer, address);
-        }
-
-        private long getAddress(ByteBuffer buffer)
-        {
            long address = MemoryUtil.getAddress(buffer);
            assert (address >= baseAddress) & (address <= baseAddress + capacity());
-            return address;
-        }
-
-        /** This is called by a thread local pool that has been mistakenly given
-         * a byte buffer allocated by another thread local pool, we simply increment an atomic
-         * counter so that we can be recycled at the end of our life and we do not
-         * attempt to reuse the buffer.
-         */
-        public void releaseDifferentPool(ByteBuffer buffer)
-        {
-            if (logger.isDebugEnabled())
-                logger.debug("Byte buffer {} was released from different thread: {}, original thread was {}",
-                             buffer,
-                             Thread.currentThread().getName(),
-                             getOwningThreadName());
-
-            int size = roundUp(buffer.capacity());
-            while (true)
-            {
-                int cur = atomicFree;
-                if (atomicFreeUpdater.compareAndSet(this, cur, cur + size))
-                    break;
-            }
-        }
-
-        private String getOwningThreadName()
-        {
-            assert owner != null;
-            Thread[] threads = new Thread[Thread.activeCount()];
-            Thread.enumerate(threads);
-            for (Thread t : threads)
-            {
-                if (t.getId() == owner.threadId)
-                    return t.getName();
-            }
-            return "Thread name not available";
-        }
-
-        /**
-         * This is called by the local thread pool owning this chunk,
-         * we try and re-use the buffer by storing it in the free slots
-         * or appending it to the end, if possible.
-         *
-         * We also increment the ordinary free counter (non atomic).
-         */
-        private void releaseSamePool(ByteBuffer buffer, long address)
-        {
            int position = (int)(address - baseAddress);
            int size = roundUp(buffer.capacity());
-            int shift = shift();
-
-            normalFree += size;
 
            position >>= shift;
            int slotCount = size >> shift;
+
            if (slotCount == 64)
            {
                assert size == capacity();
                assert position == 0;
-                freeSlots = -1;
+
+                while (true)
+                {
+                    long cur = freeSlots;
+                    assert cur == 0; // ensure no double free
+                    if (atomicFreeSlotUpdater.compareAndSet(this, cur, -1))
+                        break;
+                }
            }
            else
            {
                long slotBits = (1 << slotCount) - 1;
-                freeSlots |= slotBits << position;
+
+                while (true)
+                {
+                    long cur = freeSlots;
+                    assert (cur & (slotBits << position)) == 0; // ensure no double free
+                    if (atomicFreeSlotUpdater.compareAndSet(this, cur, cur | (slotBits << position)))
+                        break;
+                }
            }
        }
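The two hunks above carry the core concurrency argument of this patch: the single allocating thread clears bits with a CAS that can only fail spuriously (releases set bits, never clear them), while releases CAS bits back in and assert they were clear, which is what catches a double free. A compilable sketch of that discipline under the same single-allocator / multi-releaser assumption (CasBitmapSketch is invented; run with -ea to arm the asserts):

    import java.util.concurrent.atomic.AtomicLongFieldUpdater;

    class CasBitmapSketch
    {
        private volatile long freeSlots = -1L;
        private static final AtomicLongFieldUpdater<CasBitmapSketch> updater =
            AtomicLongFieldUpdater.newUpdater(CasBitmapSketch.class, "freeSlots");

        // called by the single owning thread only
        void take(long candidate)
        {
            long cur = freeSlots;
            while (!updater.compareAndSet(this, cur, cur & ~candidate))
            {
                cur = freeSlots;
                assert (candidate & cur) == candidate; // releases never cleared our bits
            }
        }

        // may be called by any thread
        void release(long bits)
        {
            while (true)
            {
                long cur = freeSlots;
                assert (cur & bits) == 0;              // a double free would find these bits already set
                if (updater.compareAndSet(this, cur, cur | bits))
                    return;
            }
        }

        public static void main(String[] args)
        {
            CasBitmapSketch c = new CasBitmapSketch();
            c.take(0xFL);
            c.release(0xFL);
            // c.release(0xFL); // would now trip the double-free assert
        }
    }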
diff --git a/test/unit/org/apache/cassandra/utils/memory/BufferPoolTest.java b/test/unit/org/apache/cassandra/utils/memory/BufferPoolTest.java
index efcc9d43d70b..bd9bbd7643af 100644
--- a/test/unit/org/apache/cassandra/utils/memory/BufferPoolTest.java
+++ b/test/unit/org/apache/cassandra/utils/memory/BufferPoolTest.java
@@ -710,9 +710,6 @@ private void doMultipleThreadsReleaseBuffers(final int threadCount, final int ..
                        BufferPool.put(buffer);
-
-                        if (chunk.isFree())
-                            assertTrue(chunk.isRecycled());
                    }
                    catch (Exception ex)
                    {
@@ -739,6 +736,6 @@ private void doMultipleThreadsReleaseBuffers(final int threadCount, final int ..
        assertNotNull(buffer);
        assertEquals(sizes[0], buffer.capacity());
 
-        assertNotSame(chunk, BufferPool.currentChunk());
+        assertEquals(chunk, BufferPool.currentChunk());
    }
 }
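The amended assertion encodes the new life cycle: a chunk that becomes fully free is recycled through the global pool rather than abandoned, so after every buffer is released the next allocation can observe the same chunk as the thread's current chunk again. A hypothetical single-threaded round trip showing the shape of that check; it assumes package-level access to the test hook BufferPool.currentChunk() used above, and the real test exercises the equality across multiple releasing threads:

    import java.nio.ByteBuffer;

    class RecycleRoundTrip
    {
        public static void main(String[] args)
        {
            ByteBuffer buffer = BufferPool.get(1024);   // grabs a chunk for this thread
            Object chunk = BufferPool.currentChunk();   // test-visibility hook, as in the test above
            BufferPool.put(buffer);                     // the chunk is now fully free

            ByteBuffer again = BufferPool.get(1024);
            assert chunk == BufferPool.currentChunk();  // same chunk serves the next allocation
            BufferPool.put(again);
        }
    }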